Source code for duck.utils.timer

"""
Timer Utility module for scheduling callables.
"""
import time
import threading


[docs] class TimerThreadPool: all: list = [] """ List of all threads added through the use of any `TimerΛ‹ class. """
[docs] class Timer: """ Timer Utility for callables scheduling. """ all_threads: list[threading.Thread] = [] """ List of all alive or dead threads scheduled. """
[docs] @classmethod def _schedule_interval(cls, function: callable, seconds: int): while True: time.sleep(seconds) function()
[docs] @classmethod def _schedule(cls, function: callable, seconds: int): time.sleep(seconds) function()
[docs] @classmethod def schedule(cls, function: callable, seconds: int): th = threading.Thread(target=cls._schedule, args=[function, seconds]) th.start() cls.all_threads.append(th) TimerThreadPool.all.append(th)
[docs] @classmethod def schedule_interval(cls, function: callable, seconds: int): th = threading.Thread(target=cls._schedule_interval, args=[function, seconds]) th.start() cls.all_threads.append(th) TimerThreadPool.all.append(th)
[docs] class OverlappingTimer: """ This Timer Utility stops all running threads scheduled with the use of this class and then runs the new schedule in a new thread. **NOTE:** This only share threads from an initialized object. """ def __init__(self): self.all_threads: list[threading.Thread] = ([]) # List of all threads scheduled
[docs] def get_running_thread(self): """ Get the latest thread that was created when using any of the methods (schedule or schedule_interval). **WARNING:**# If the latest thread is done running, None will be returned """ if not self.all_threads: return None last_thread = self.all_threads[-1] return last_thread if last_thread.is_alive() else None
[docs] @staticmethod def _schedule_interval(function: callable, seconds: int) -> None: while True: time.sleep(seconds) function()
[docs] @staticmethod def _schedule(function: callable, seconds: int) -> None: time.sleep(seconds) function()
[docs] def schedule(self, function: callable, seconds: int): th = threading.Thread(target=self._schedule, args=[function, seconds]) # Stop all threads that were created by using schedule method for thread in self.all_threads: if thread.is_alive(): try: thread._stop() except Exception: try: thread.join() except Exception: pass th.start() self.all_threads.append(th) TimerThreadPool.all.append(th)
[docs] def schedule_interval(self, function: callable, seconds: int): th = threading.Thread(target=self._schedule_interval, args=[function, seconds]) # Stop all threads that were created by using schedule_interval method for thread in self.all_threads: if thread.is_alive(): try: thread._stop() except Exception: try: thread.join() except Exception: pass th.start() self.all_threads.append(th) TimerThreadPool.all.append(th)
[docs] class SharedOverlappingTimer: """ This Timer Utility stop all running threads scheduled with the use of this class and then runs the new schedule in a new thread. **NOTE:** This shares all threads from this class """ all_threads: list[threading.Thread] = [] """ List of all threads scheduled. """
[docs] def get_running_thread(self): """ Get the latest thread that was created when using any of the methods (schedule or schedule_interval). **WARNING:** If the latest thread is done running, None will be returned """ if not self.all_threads: return None last_thread = self.all_threads[-1] return last_thread if last_thread.is_alive() else None
[docs] @staticmethod def _schedule_interval(function: callable, seconds: int) -> None: while True: time.sleep(seconds) function()
[docs] @staticmethod def _schedule(function: callable, seconds: int) -> None: time.sleep(seconds) function()
[docs] def schedule(self, function: callable, seconds: int): th = threading.Thread(target=self._schedule, args=[function, seconds]) # Stop all threads that were created by using schedule method for thread in self.all_threads: if thread.is_alive(): try: thread._stop() except Exception: try: thread.join() except Exception: pass th.start() type(self).all_threads.append(th) TimerThreadPool.all.append(th)
[docs] def schedule_interval(self, function: callable, seconds: int): th = threading.Thread(target=self._schedule_interval, args=[function, seconds]) # Stop all threads that were created by using schedule_interval method for thread in self.all_threads: if thread.is_alive(): try: thread._stop() except Exception: try: thread.join() except Exception: pass th.start() type(self).all_threads.append(th) TimerThreadPool.all.append(th)