duck.utils.threading.thread_managerยถ
WorkerThreadManager manages and monitors a pool of worker threads.
Features:
Automatic restart of dead or unhealthy workers.
Customizable health-check hooks per worker.
Threaded non-blocking monitoring loop.
Configurable logging and verbosity.
Status inspection and graceful shutdown.
Example use cases:
WSGI/ASGI server worker orchestration.
ML/AI multi-threaded task runner watchdog.
Long-running web backend with thread self-repair.
Usage Example:
def sample_worker(idx, *args):
import time, random
print(f"Worker {idx} started...")
while True:
time.sleep(1)
print(f"[Worker {idx}] Sleeping")
# Simulate random crash; restart will occur
if random.random() < 0.03:
print(f"[Worker {idx}] Simulating crash")
exit(1)
def health_check_fn(thread, idx):
# Returns True if alive; override for custom checks
return thread.is_alive()
manager = WorkerThreadManager(
worker_fn=sample_worker,
num_workers=4,
args_fn=lambda idx: (...),
worker_name_fn=lambda idx: f"duck-worker-{idx}",
health_check_fn=health_check_fn, # Or use HeartbeatHeathCheck object.
restart_timeout=2,
enable_logs=True, verbose_logs=False,
enable_monitoring=True,
thread_stop_timeout=3,
)
try:
manager.start()
for _ in range(20): # Monitor for a while
print("Worker status:", manager.status())
time.sleep(2)
finally:
manager.stop()
Module Contentsยถ
Classesยถ
Thread Health Check using heartbeat approach. |
|
WorkerThreadManager manages and monitors a pool of worker threads. |
APIยถ
- class duck.utils.threading.thread_manager.HeartbeatHealthCheck(heartbeat_timeout: float)[source]ยถ
Thread Health Check using heartbeat approach.
Example:
healthcheck = HeartbeatHealthCheck(...) def worker_fn(idx, healthcheck, ...): while True: healthcheck.update_heartbeat(idx) # Some tasks here ...Initialization
Initialize heartbeat health check.
- __call__(thread: threading.Thread, idx: int) bool[source]ยถ
Checks if last heartbeat hasnโt reached a timeout. This may indicate an unhealthy thread.
- Returns:
True if last heartbeat hasnโt reached a timeout else False.
- Return type:
bool
- Raises:
HeartbeatUpdateNeverCalled โ
Raised if no heartbeat update has never been updated. This avoids mistakenly using this approach but not upating heartbeats by calling
update_heartbeat. In a thread loop, heartbeat update must be called initialialy before handling any tasks.Example:
healthcheck = HeartbeatHealthCheck(...) manager = WorkerThreadManager( health_check_fn=healthcheck, ... ) def worker_fn(idx, healthcheck, ...): while True: healthcheck.update_heartbeat(idx) # Some tasks here ...
- check_health(thread: threading.Thread, idx: int) bool[source]ยถ
Checks if last heartbeat hasnโt reached a timeout. This may indicate an unhealthy thread.
- Returns:
True if last heartbeat hasnโt reached a timeout else False.
- Return type:
bool
- Raises:
HeartbeatUpdateNeverCalled โ
Raised if no heartbeat update has never been called. This avoids mistakenly using this approach but not upating heartbeats by calling
update_heartbeat. In a thread loop, heartbeat update must be called initialialy before handling any tasks.Example:
healthcheck = HeartbeatHealthCheck(...) def worker_fn(idx, healthcheck, ...): while True: healthcheck.update_heartbeat(idx) # Some tasks here ...
- exception duck.utils.threading.thread_manager.HeartbeatUpdateNeverCalled[source]ยถ
Bases:
ExceptionRaised by
HeartbeatHealthCheck.check_healthif heartbeats are empty.Initialization
Initialize self. See help(type(self)) for accurate signature.
- class duck.utils.threading.thread_manager.WorkerThreadManager(worker_fn: Callable, num_workers: int, args_fn: Optional[Callable[[int], tuple]] = None, worker_name_fn: Optional[Callable[[int], str]] = None, health_check_fn: Optional[Union[Callable[[threading.Thread], bool], duck.utils.threading.thread_manager.HeartbeatHealthCheck]] = None, restart_timeout: Union[int, float] = 5, enable_logs: bool = True, verbose_logs: bool = True, enable_monitoring: bool = True, thread_stop_timeout: Optional[float] = 5.0, daemon: bool = False)[source]ยถ
WorkerThreadManager manages and monitors a pool of worker threads.
Features:
Automatic restart of dead or unhealthy workers.
Customizable health-check hooks per worker.
Threaded non-blocking monitoring loop.
Configurable logging and verbosity.
Status inspection and graceful shutdown.
Example use cases:
WSGI/ASGI server worker orchestration.
ML/AI multi-thread task runner watchdog.
Long-running web backend with thread self-repair.
Initialization
- Parameters:
worker_fn โ Function executed by each worker thread.
num_workers โ Number of worker threads to start.
args_fn โ Callable (idx) => tuple for args per worker.
worker_name_fn โ Callable (idx) => str; worker thread name.
health_check_fn โ Callable (Thread) => bool: Function to check health; must return True if worker healthy, False otherwise. You can just supply
HeartbeatHealthCheckobject instead to use heartbeat health check.restart_timeout โ Seconds to wait before restart on thread death.
enable_logs โ Enable info/warning logging.
verbose_logs โ Enable full exception trace logs.
enable_monitoring โ Start monitor thread automatically.
thread_stop_timeout โ Maximum seconds to wait for worker to stop. Will be parsed to
join()method.daemon โ Whether to start daemon threads. Defaults to False.
- _monitor_loop()[source]ยถ
Monitor thread: checks worker health/liveness and restarts unhealthy/dead workers. Non-blocking for main thread.
- stop(wait: bool = True, monitor_stop_timeout: float = 0.5, no_logging: bool = False)[source]ยถ
Stop all worker threads and monitoring thread.
- Parameters:
wait โ Whether to wait for threads to finish stopping. Defaults to True.
monitor_stop_timeout โ Timeout for waiting on monitor thread.
no_logging โ Whether to log stop message. Use this to temporarily disable logging of stop message.