Source code for duck.automation

"""
Module for automations used in automating tasks, jobs, actions, and more.

This module provides classes and utilities to define and manage automated tasks. It includes a base `Automation` class for creating and running tasks with various scheduling options, as well as supporting threading and callback functionalities.

## Classes

- **`AutomationError`**: Custom exception for automation-related errors.
- **`AutomationThread`**: Threading class for executing automations.
- **`Automation`**: Base class for automating tasks, jobs, actions, etc.

## Usage Example

```py
import os
from duck.automation.dispatcher import DispatcherV1
from duck.automation.trigger import NoTrigger

class SimpleAutomation(Automation):
    # Execute a shell script
    def execute(self):
        # Or use async def execute for asynchronous execution, but parse `async_` argument to automation.
        os.system('bash some_script.sh')

# Instantiate the automation with specified parameters
automation = SimpleAutomation(
    name="Sample Automation",
    description="Sample automation",
    start_time='immediate',
    schedules=1,
    interval=0,
)  # Set automation schedules to -1 for infinite schedules

# Instantiate a trigger that executes immediately
trigger = NoTrigger()

# You can create a custom trigger by implementing the `check_trigger` method
# in your AutomationTrigger subclass and returning True or False based on
# whether the trigger condition is satisfied

# Create a dispatcher to manage and execute automations
dispatcher = DispatcherV1()  # The first argument is the main running web application (optional)

# Register the trigger and automation with the dispatcher
dispatcher.register(trigger, automation)
dispatcher.start()  # Listen for triggers and execute automations infinitely

# Alternatively, use a callback function for the automation task
def callback():
    # Perform the automation tasks here
    # Avoid using while loops; set the number of schedules to -1 for infinite schedules
    pass

automation = Automation(
    callback=callback,
    name="Sample Automation",
    description="Sample automation"
)

# Register the trigger and automation with the dispatcher
dispatcher.register(trigger, automation)
dispatcher.start()  # Listen for triggers and execute automations infinitely
"""

import time
import asyncio
import datetime
import threading

from duck.contrib.sync import convert_to_async_if_needed
from duck.utils.dateutils import build_readable_date, datetime_difference
from duck.utils.asyncio import create_task
from duck.utils.asyncio.eventloop import get_or_create_loop_manager


[docs] class AutomationError(Exception): """ Automation based exceptions. """
[docs] class AutomationThread(threading.Thread): """ Threading class for Automations execution. """
[docs] def set_on_stop_callback(self, callback: callable): """ Set a callback that will be called whenever the thread finishes execution. Notes: - Make sure callback doesn't allow any positional or keyword arguments. """ if not callable(callback): raise TypeError(f"Callback should be a callable object not {type(callback).__name__}.") self.on_stop_callback = callback
[docs] def on_stop(self): """ Method called whenever a thread finishes execution. The callback set with method 'set_on_stop_callback' will be called if present. """ if hasattr(self, "on_stop_callback"): return self.on_stop_callback()
[docs] def start(self, *args, **kw): returned = super().start(*args, **kw) self.on_stop() return returned
[docs] class Automation: """ Automation class for automating tasks, jobs, actions, etc. Events: - `on_pre_execute`: Called before every execution cycle. - `on_post_execute`: Called after every execution cycle. - `on_start`: Called once when the automation starts. - `on_finish`: Called once when the automation finishes. - `on_error`: Called whenever there is an error during execution. Override this method to suppress or handle all errors. Notes: - The `start` method is the initial method to run an automation. - The `callback` argument is optional; if not provided, the `execute` method must be implemented. - Provide a name and description for more descriptive Automations. - The `join` method can be used to stop the next execution cycle, meaning the automation will stop immediately before entering the next execution cycle. - Using a while loop in the `callback` or `execute` method will cause the automation to hang, and `join` will not work if the callback is in a loop. - The `join` method works properly before the next execution cycle or after the current execution cycle. **Automations can be asynchronously executed by providing the `async_` argument.** """ def __init__( self, callback: callable = None, name: str = None, threaded: bool = True, async_: bool = False, start_time: datetime.datetime | str = "immediate", schedules: int = 1, interval: float | int = None, description: str = None, ): """ Initialize the Automation class. Args: callback (callable, optional): A callable to be executed whenever the automation runs. If you do not provide a callback, implement the 'execute' method. name (str, optional): The name of the Automation. Defaults to None. threaded (bool): Indicates whether to run the automation in a new thread. Defaults to True. async_ (bool): Indicates whether to run automation asynchronously. Defaults to False. start_time (datetime.datetime | str): The datetime to start the Automation. It can be "immediate" or any specific datetime. schedules (int): The number of times to run the Automation. Defaults to 1. Set to -1 for infinite schedules. interval (int | float): The time period in seconds between successive runs of the Automation. description (str, optional): A brief description of the Automation. Defaults to an empty string. """ self.__running_app = None self.__first_execution = False self.__execution_times = 0 self.__started_at = None self.__finished_at = None self.__force_stop = False self.__running = False self.__finished = False self.__latest_error = None self.disable_execution = False # Run some checks # threaded checks if not isinstance(threaded, bool): raise AutomationError( f"Argument `threaded` should be a boolean not {type(threaded).__name__}" ) # Callback checks if callback: if not callable(callback): raise AutomationError( "Callback must be a callable object, function or method." ) # Name checks if name: if not isinstance(name, str): raise AutomationError( f"Name must be an instance of string not {type(name).__name__}" ) if description and not isinstance(description, str): raise AutomationError( f"Description must be a string not {type(description).__name__}" ) # Async/threaded checks if async_ and threaded: raise AutomationError("Arguments `async_` and `threaded` cannot be both True at the same time.") # start_time checks if isinstance(start_time, datetime.datetime): now = datetime.datetime.now() if start_time < now: diff_dict: dict = datetime_difference(now, start_time) diff = build_readable_date(diff_dict) if diff == "Just now": diff = "Less than a second" raise AutomationError( f'Start time for the automation "{self.__class__.__name__}" has already passed, difference: {diff}' ) elif isinstance(start_time, str): if not start_time == "immediate": raise AutomationError( 'Start time provided is not recognized, should be either a datetime object or "immediate" ' ) else: if not start_time: raise AutomationError( "Please provide start time for the automation.") raise AutomationError( "Start time provided is not recognized, should be either a string or datetime object." ) # schedules checks if isinstance(schedules, int): if schedules < 0 and schedules != -1: raise AutomationError( "Number of schedules for automation should not be less than zero, consider setting to -1 for infinite schedules." ) elif schedules == 0: raise AutomationError( "Number of schedules should be at least >=1 or -1 not zero." ) else: if not schedules: raise AutomationError("Number of schedules required.") raise AutomationError( f"Number of shedules should an integer not {type(schedules).__name__}" ) # Interval checks if isinstance(interval, (float, int)): if interval < 0: raise AutomationError( "Interval for automation should be a positive integer or float" ) elif interval > 0 and (schedules < 2 and schedules != -1): raise AutomationError( "Interval for automation is set yet the number of schedules is less than the minimum i.e. 2" ) else: if interval: raise AutomationError( "Invalid interval provided, should be an integer or a float." ) self.callback = callback self.name = name self.threaded = threaded self.async_ = async_ self.start_time = start_time self.schedules = schedules self.interval = interval or 0 # will be 0 if interval is set to None self.description = str(description) @property def latest_error(self): """ Returns the latest exception encountered during execution. """ return self.__latest_error @property def is_running(self) -> bool: """ Whether the automation is running or not. """ return self.__running @property def started_at(self) -> datetime.datetime: """ The datetime where this automation started. """ if not self.is_running and not self.finished: raise AutomationError( "The automation was never started so the time it started cannot be given." ) return self.__started_at @property def finished_at(self) -> datetime.datetime: """ The datetime where this automation finished. """ if not self.finished: if self.is_running: raise AutomationError("Automation still running.") raise AutomationError( "The automation was never started so the time it finished cannot be given." ) return self.__finished_at @property def finished(self): """ Whether the automation has been stopped completely. """ return self.__finished @property def first_execution(self) -> bool: """ Returns whether the automation has already been executed for the first time. Notes: - Take a look a property execution_times to see the number of times the Automation was Executed. """ return self.__first_execution @property def execution_times(self) -> int: """ Returns the number of times the automation has been executed. """ return self.__execution_times @property def execution_cycles(self) -> int: """ Returns the number of times the automation has been executed (same as property 'execution_times'). """ return self.execution_times
[docs] def to_thread(self) -> AutomationThread: """ This wraps an automation in new Thread so that it may be executed nicely without blocking any other automations execution. Return: AutomationThread: Returns an automation in new thread. """ def on_error_wrapper(error: Exception): """ Function called on automation execution error. """ self.__running = False self.__finished = True self.__latest_error = error self.__finished_at = datetime.datetime.now() self.on_error(error) def run_automation(automation): """ Run an automation. """ try: automation._start() except Exception as e: automation.join() # stop next execution cycles just in case if automation.name: e = AutomationError( f'Error executing automation with name "{automation.name}": {e}' ) else: e = AutomationError( f'Error executing automation of class "{type(automation).__name__}": {e}' ) # call on_error event on_error_wrapper(e) if self.name: thread = AutomationThread( target=run_automation, args=[self], name=f'Automation-{self.name.strip().replace(" ", "-")}'.title( ), ) else: thread = AutomationThread(target=run_automation, args=[self]) return thread
[docs] def get_thread(self): """ This returns the Thread for running automation. Notes: - The thread is only created once. """ if not hasattr(self, "_base_thread"): self._base_thread = self.to_thread() return self._base_thread
[docs] def join(self): """ Wait for the automation to finish the current execution cycle and stop the execution """ self.__force_stop = True
[docs] def get_short_description(self): """ Get the short version of description. """ max_limit = 20 short_description = "" if len(self.description) > max_limit: splitted_description = self.description.split(" ") if len(splitted_description) > 1: for word in splitted_description: incr = word + " " if len(short_description) + len(incr) <= max_limit: short_description += incr else: short_description = short_description.strip() short_description += "..." break else: short_description = (self.description[:max_limit].strip() + "...") else: return self.description return short_description
[docs] def prepare_stop(self): """ Called before the main application termination. """ pass
[docs] def get_running_app(self): """ Returns the main running application """ if not self.__running_app: raise AutomationError("Main running application is not yet set.") return self.__running_app
[docs] def set_running_app(self, app): """ Set the main running application """ from duck.app import App if not isinstance(app, App): raise AutomationError( f"Invalid application type: '{type(app).__name__}'. Expected instance of 'duck.app.App'." ) self.__running_app = app
[docs] def start(self): """ Entry method to execute an automation. """ def on_error_wrapper(error: Exception): """ Function called on automation execution error. """ self.__running = False self.__finished = True self.__latest_error = error self.__finished_at = datetime.datetime.now() self.on_error(error) if self.threaded: # Use multithreading thread = self.get_thread() if thread.is_alive(): raise AutomationError( "Automation is already running in another Thread, cannot start automation." ) thread.start() elif self.async_: # Use asynchronous execution task = getattr(self, "_async_task", None) if task and not task.done(): raise AutomationError( "Automation is already running in another async Task, cannot start automation." ) async def async_task_wrapper(): task = create_task(self._async_start()) self._async_task = task # Execute asynchronous task loop_manager = get_or_create_loop_manager(strictly_get=True, id="automations-eventloop-manager") loop_manager.submit_task(async_task_wrapper()) else: # Use synchronous blocking execution try: self._start() except Exception as e: self.join() # stop next execution cycles just in case if self.name: e = AutomationError( f'Error executing automation with name "{self.name}": {e}' ) else: e = AutomationError( f'Error executing automation of class "{type(self).__name__}": {e}' ) # call on_error event on_error_wrapper(e)
[docs] def _start(self): """ Entry method to execute an automation. """ self.__running = False self.__force_stop = False # undo method "join" if it has been used self.__finished = False def on_start_wrapper(): """ Function called on automation start. """ self.__running = True self.__first_execution = True self.__started_at = datetime.datetime.now() self.on_start() def on_finish_wrapper(): """ Function called on automation finish. """ self.__running = False self.__finished = True self.__finished_at = datetime.datetime.now() self.on_finish() if self.schedules == -1: # execute to infinite counter = 0 while True: if self.__force_stop or self.disable_execution: break # force stop, maybe method join was used. if counter == 0: on_start_wrapper() # call function on_start_wrapper just once if self.disable_execution: break # continue with execution self.on_pre_execute() # do some stuff before execution self.execute() # execute the automation self.on_post_execute() # do some stuff after execution self.__executed = (True) # set that the method has executed for the first time self.__execution_times += 1 counter += 1 # counter is more accurate on number of times the loop was run compared to __execution_times. time.sleep(self.interval) # sleep before next execution cycle else: for i in range(0, self.schedules): if self.__force_stop or self.disable_execution: break # force stop, maybe method join was used. if i == 0: on_start_wrapper() # call function on_start_wrapper just once if self.disable_execution: break # continue with execution self.on_pre_execute() # do some stuff before execution self.execute() # execute the automation self.on_post_execute() # do some stuff after execution self.__executed = (True) # automation has executed for the first time self.__execution_times += 1 time.sleep(self.interval) # sleep before next execution cycle # finished or stopped execution on_finish_wrapper()
[docs] async def _async_start(self): """ Asyncronous entry method to execute an automation. """ self.__running = False self.__force_stop = False # undo method "join" if it has been used self.__finished = False async def on_start_wrapper(): """ Async function called on automation start. """ self.__running = True self.__first_execution = True self.__started_at = datetime.datetime.now() await convert_to_async_if_needed(self.on_start)() async def on_finish_wrapper(): """ Async function called on automation finish. """ self.__running = False self.__finished = True self.__finished_at = datetime.datetime.now() await convert_to_async_if_needed(self.on_finish)() if self.schedules == -1: # execute to infinite counter = 0 while True: if self.__force_stop or self.disable_execution: break # force stop, maybe method join was used. if counter == 0: await on_start_wrapper() # call function on_start_wrapper just once if self.disable_execution: break # continue with execution await convert_to_async_if_needed(self.on_pre_execute)() # do some stuff before execution await convert_to_async_if_needed(self.execute)() # execute the automation await convert_to_async_if_needed(self.on_post_execute)() # do some stuff after execution self.__executed = (True) # set that the method has executed for the first time self.__execution_times += 1 counter += 1 # counter is more accurate on number of times the loop was run compared to __execution_times. await asyncio.sleep(self.interval) # sleep before next execution cycle else: for i in range(0, self.schedules): if self.__force_stop or self.disable_execution: break # force stop, maybe method join was used. if i == 0: await on_start_wrapper() # call function on_start_wrapper just once if self.disable_execution: break # continue with execution await convert_to_async_if_needed(self.on_pre_execute)() # do some stuff before execution await convert_to_async_if_needed(self.execute)() # execute the automation await convert_to_async_if_needed(self.on_post_execute)() # do some stuff after execution self.__executed = (True) # automation has executed for the first time self.__execution_times += 1 await asyncio.sleep(self.interval) # sleep before next execution cycle # finished or stopped execution await on_finish_wrapper()
[docs] def execute(self): """ This executes an automation. Do whatever task you want here e.g. running bash scripts or something else. Example: ```py import os class SimpleAutomation(Automation): def execute(self): os.system('bash some_script.bash') automation = SimpleAutomation(start_time='immediate', schedules=1, interval=0) automation.start # start the automation ``` """ if not self.callback: raise NotImplementedError( "AutomationError: The 'execute' method must be implemented. This method is the main entry point for automation execution. Alternatively, pass a callback argument to the Automation class." ) self.callback()
[docs] def on_pre_execute(self): """ Method called before automation starts the current execution cycle. """
[docs] def on_post_execute(self): """ Method called after automation has finished the current execution cycle. """
[docs] def on_start(self): """ Method called once the execution has been started. Notes: - Method "on_start" is called before method "on_pre_execute" """
[docs] def on_finish(self): """ Method called on automation final finish. """
[docs] def on_error(self, e): """ Method called on execution automation after method "start" is called. """ raise e
[docs] def __repr__(self): return ( f"[{self.__class__.__name__}] (\n" f" name={repr(self.name)}\n" f" description={repr(self.get_short_description()) if self.description else None}\n" f" callback={self.callback}\n" f" start_time={repr(self.start_time)}\n" f" execution_times={self.execution_times}\n" f" finished={self.finished}\n" f" schedules={self.schedules}\n" f" interval={self.interval}\n" f" latest_error={type(self.latest_error).__name__ if self.latest_error else None}\n" ")")
[docs] class SampleAutomationBase(Automation): """ A placeholder automation class designed for testing or sampling purposes. This automation is intentionally empty and does not perform any actions. It can be used as a base for creating more complex automations or for testing automation systems without side effects. Use this class when you need a no-op (no operation) automation that will not alter the application's state or behavior during execution. """
[docs] def execute(self): """ No-op method. This method is intentionally left empty and performs no actions. It is used as a placeholder or for testing purposes where no operation is required. """
SampleAutomation = SampleAutomationBase()