duck.utils.threading.thread_manager

WorkerThreadManager manages and monitors a pool of worker threads.

Features:

  • Automatic restart of dead or unhealthy workers.

  • Customizable health-check hooks per worker.

  • Threaded non-blocking monitoring loop.

  • Configurable logging and verbosity.

  • Status inspection and graceful shutdown.

Example use cases:

  • WSGI/ASGI server worker orchestration.

  • ML/AI multi-threaded task runner watchdog.

  • Long-running web backend with thread self-repair.

Usage Example:

import random
import time

from duck.utils.threading.thread_manager import WorkerThreadManager

def sample_worker(idx, *args):
    print(f"Worker {idx} started...")

    while True:
        time.sleep(1)
        print(f"[Worker {idx}] Sleeping")

        # Simulate a random crash; the manager will restart the worker
        if random.random() < 0.03:
            print(f"[Worker {idx}] Simulating crash")
            raise RuntimeError("simulated crash")

def health_check_fn(thread, idx):
    # Returns True if alive; override for custom checks
    return thread.is_alive()

manager = WorkerThreadManager(
    worker_fn=sample_worker,
    num_workers=4,
    args_fn=lambda idx: (...),
    worker_name_fn=lambda idx: f"duck-worker-{idx}",
    health_check_fn=health_check_fn,  # Or pass a HeartbeatHealthCheck object.
    restart_timeout=2,
    enable_logs=True, verbose_logs=False,
    enable_monitoring=True,
    thread_stop_timeout=3,
)

try:
    manager.start()
    for _ in range(20):  # Monitor for a while
        print("Worker status:", manager.status())
        time.sleep(2)
finally:
    manager.stop()

Module Contents

Classes

HeartbeatHealthCheck

Thread Health Check using heartbeat approach.

WorkerThreadManager

WorkerThreadManager manages and monitors a pool of worker threads.

API

class duck.utils.threading.thread_manager.HeartbeatHealthCheck(heartbeat_timeout: float)

Thread Health Check using heartbeat approach.

Example:

healthcheck = HeartbeatHealthCheck(...)

def worker_fn(idx, healthcheck, *args):
    while True:
        healthcheck.update_heartbeat(idx)
        # Some tasks here
        ...

Initialization

Initialize heartbeat health check.
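
The heartbeat approach can be pictured as a table of last-seen timestamps, one per worker index, compared against heartbeat_timeout. The sketch below is not duck's implementation: SketchHeartbeatCheck, its attributes, and the use of LookupError are hypothetical choices made only to illustrate the idea.

```python
import threading
import time


class SketchHeartbeatCheck:
    """Hypothetical stand-in for illustration; not the duck implementation."""

    def __init__(self, heartbeat_timeout: float):
        self.heartbeat_timeout = heartbeat_timeout
        self._last_beat = {}  # worker index -> time.monotonic() of the last beat
        self._lock = threading.Lock()

    def update_heartbeat(self, idx: int) -> None:
        # Record "now" as the most recent sign of life for worker idx.
        with self._lock:
            self._last_beat[idx] = time.monotonic()

    def check_health(self, idx: int) -> bool:
        with self._lock:
            last = self._last_beat.get(idx)
        if last is None:
            # Fail loudly when no beat was ever recorded (assumed behaviour).
            raise LookupError(f"worker {idx} never sent a heartbeat")
        return (time.monotonic() - last) <= self.heartbeat_timeout


check = SketchHeartbeatCheck(heartbeat_timeout=0.05)
check.update_heartbeat(0)
fresh = check.check_health(0)  # beat was just recorded
time.sleep(0.1)
stale = check.check_health(0)  # beat is now older than the timeout
```

A monotonic clock is used here so that system clock adjustments cannot make a healthy worker look stale.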

__call__(thread: threading.Thread, idx: int) → bool

Checks whether the last heartbeat is still within the timeout. A heartbeat older than the timeout may indicate an unhealthy thread.

Returns:

True if the last heartbeat is within the timeout, otherwise False.

Return type:

bool

Raises:

HeartbeatUpdateNeverCalled –

Raised if no heartbeat has ever been recorded. This guards against selecting the heartbeat approach but never calling update_heartbeat. In a worker loop, update_heartbeat must be called first, before any task is handled.

Example:

healthcheck = HeartbeatHealthCheck(...)

manager = WorkerThreadManager(
    health_check_fn=healthcheck,
    ...
)

def worker_fn(idx, healthcheck, *args):
    while True:
        healthcheck.update_heartbeat(idx)
        # Some tasks here
        ...
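
Because the instance is callable, it can be passed anywhere a plain health-check function taking (thread, idx) is expected. __call__ and check_health share the same description, which suggests one forwards to the other; that is an assumption. A minimal sketch of the pattern, using a hypothetical AlwaysAliveCheck class and run_check helper rather than duck's own code:

```python
import threading


class AlwaysAliveCheck:
    """Hypothetical health check: healthy means the thread is still running."""

    def check_health(self, thread: threading.Thread, idx: int) -> bool:
        return thread.is_alive()

    def __call__(self, thread: threading.Thread, idx: int) -> bool:
        # Calling the object forwards to check_health.
        return self.check_health(thread, idx)


def run_check(health_check_fn, thread, idx):
    # The caller only needs something callable with (thread, idx).
    return health_check_fn(thread, idx)


done = threading.Event()
worker = threading.Thread(target=done.wait)
worker.start()
alive_result = run_check(AlwaysAliveCheck(), worker, 0)  # worker is blocked on the event

done.set()
worker.join()
finished_result = run_check(AlwaysAliveCheck(), worker, 0)  # worker has exited
```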

check_health(thread: threading.Thread, idx: int) → bool

Checks whether the last heartbeat is still within the timeout. A heartbeat older than the timeout may indicate an unhealthy thread.

Returns:

True if the last heartbeat is within the timeout, otherwise False.

Return type:

bool

Raises:

HeartbeatUpdateNeverCalled –

Raised if no heartbeat has ever been recorded. This guards against selecting the heartbeat approach but never calling update_heartbeat. In a worker loop, update_heartbeat must be called first, before any task is handled.

Example:

healthcheck = HeartbeatHealthCheck(...)

def worker_fn(idx, healthcheck, *args):
    while True:
        healthcheck.update_heartbeat(idx)
        # Some tasks here
        ...

update_heartbeat(idx: int)

Update last heartbeat.

Parameters:

idx – Index of the thread, usually provided to worker_fn.

Raises:

RuntimeError – If the function is called from the main thread, that is, not from a child thread.
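
One way such a guard could be written with the standard library is to compare the current thread against threading.main_thread(). This is a sketch of the general technique, not duck's code; require_worker_thread is a hypothetical helper name.

```python
import threading


def require_worker_thread() -> str:
    """Return the current thread's name, or raise if called from the main thread."""
    if threading.current_thread() is threading.main_thread():
        raise RuntimeError("must be called from a worker thread")
    return threading.current_thread().name


results = {}


def worker():
    # Inside a child thread the guard passes.
    results["name"] = require_worker_thread()


t = threading.Thread(target=worker, name="sketch-worker-0")
t.start()
t.join()

# Called directly from the main thread, the guard raises.
try:
    require_worker_thread()
    main_raised = False
except RuntimeError:
    main_raised = True
```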

exception duck.utils.threading.thread_manager.HeartbeatUpdateNeverCalled

Bases: Exception

Raised by HeartbeatHealthCheck.check_health if no heartbeat has been recorded.

Initialization

Initialize self. See help(type(self)) for accurate signature.

class duck.utils.threading.thread_manager.WorkerThreadManager(worker_fn: Callable, num_workers: int, args_fn: Optional[Callable[[int], tuple]] = None, worker_name_fn: Optional[Callable[[int], str]] = None, health_check_fn: Optional[Union[Callable[[threading.Thread], bool], duck.utils.threading.thread_manager.HeartbeatHealthCheck]] = None, restart_timeout: Union[int, float] = 5, enable_logs: bool = True, verbose_logs: bool = True, enable_monitoring: bool = True, thread_stop_timeout: Optional[float] = 5.0, daemon: bool = False)

WorkerThreadManager manages and monitors a pool of worker threads.

Features:

  • Automatic restart of dead or unhealthy workers.

  • Customizable health-check hooks per worker.

  • Threaded non-blocking monitoring loop.

  • Configurable logging and verbosity.

  • Status inspection and graceful shutdown.

Example use cases:

  • WSGI/ASGI server worker orchestration.

  • ML/AI multi-threaded task runner watchdog.

  • Long-running web backend with thread self-repair.

Initialization

Parameters:
  • worker_fn – Function executed by each worker thread.

  • num_workers – Number of worker threads to start.

  • args_fn – Callable (idx) => tuple that returns the arguments for each worker.

  • worker_name_fn – Callable (idx) => str that returns the worker thread name.

  • health_check_fn – Callable (Thread) => bool that checks worker health; it must return True if the worker is healthy, False otherwise. A HeartbeatHealthCheck object can be supplied instead to use heartbeat-based checks.

  • restart_timeout – Seconds to wait before restarting a worker after its thread dies.

  • enable_logs – Enable info/warning logging.

  • verbose_logs – Enable full exception trace logs.

  • enable_monitoring – Start the monitor thread automatically.

  • thread_stop_timeout – Maximum seconds to wait for a worker to stop. Passed to the thread's join() method.

  • daemon – Whether to start daemon threads. Defaults to False.
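
To make the roles of args_fn and worker_name_fn concrete, the sketch below builds plain threading.Thread objects by hand. It assumes each worker is invoked as worker_fn(idx, *args_fn(idx)), which the sample_worker(idx, *args) signature in the usage example suggests but this page does not confirm; the label argument is hypothetical.

```python
import threading

results = {}


def worker_fn(idx, label):
    # Record which thread ran this worker and which argument it received.
    results[idx] = (threading.current_thread().name, label)


def args_fn(idx):
    # Per-worker positional arguments, returned as a tuple.
    return (f"task-{idx}",)


def worker_name_fn(idx):
    return f"duck-worker-{idx}"


threads = [
    threading.Thread(
        target=worker_fn,
        args=(idx, *args_fn(idx)),  # assumed calling convention
        name=worker_name_fn(idx),
        daemon=False,
    )
    for idx in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```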

_monitor_loop()

Monitor thread: checks worker health/liveness and restarts unhealthy/dead workers. Non-blocking for main thread.
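
A monitoring loop of this kind can be sketched with a background thread that polls is_alive() and replaces dead workers. The code below is an illustration under stated assumptions, not duck's _monitor_loop: the workers dict, stop_event, poll_interval, and flaky_worker are all hypothetical names.

```python
import threading
import time

stop_event = threading.Event()
starts = {"count": 0}


def flaky_worker():
    # Returns immediately, so the thread "dies" right after starting.
    starts["count"] += 1


workers = {0: threading.Thread(target=flaky_worker)}
workers[0].start()


def monitor_loop(poll_interval=0.01, restart_timeout=0.01):
    while not stop_event.is_set():
        for idx, thread in list(workers.items()):
            if not thread.is_alive():
                # Wait before restarting; bail out if a stop was requested meanwhile.
                if stop_event.wait(restart_timeout):
                    return
                replacement = threading.Thread(target=flaky_worker)
                workers[idx] = replacement
                replacement.start()
        stop_event.wait(poll_interval)


# The monitor runs in its own thread, so the main thread is never blocked.
monitor = threading.Thread(target=monitor_loop, daemon=True)
monitor.start()
time.sleep(0.2)
stop_event.set()
monitor.join(timeout=1)
```

Waiting on the event instead of calling time.sleep lets the loop react to a stop request without finishing the full delay.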

_restart_worker(idx: int)

Restart a worker thread by index.

start()

Start worker threads and non-blocking monitor loop.

status()

Returns a list of status dicts, one per worker thread. Each dict contains name and alive.
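
The shape of such a status list might look like the following sketch. The exact dict keys are assumed to be "name" and "alive" based on the description above; the threads list and this standalone status function are hypothetical and stand in for the manager's internals.

```python
import threading

release = threading.Event()
threads = [
    threading.Thread(target=release.wait, name="duck-worker-0"),  # stays alive until released
    threading.Thread(target=lambda: None, name="duck-worker-1"),  # exits immediately
]
for t in threads:
    t.start()
threads[1].join()  # make sure the second worker has finished


def status():
    # One dict per worker: its thread name and whether it is still running.
    return [{"name": t.name, "alive": t.is_alive()} for t in threads]


snapshot = status()

# Clean up the blocked worker.
release.set()
threads[0].join()
```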

stop(wait: bool = True, monitor_stop_timeout: float = 0.5, no_logging: bool = False)

Stop all worker threads and monitoring thread.

Parameters:
  • wait – Whether to wait for threads to finish stopping. Defaults to True.

  • monitor_stop_timeout – Timeout in seconds for waiting on the monitor thread.

  • no_logging – Whether to suppress the stop log message. Use this to temporarily disable logging of the stop message.
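
This page does not say how workers are signalled to stop. One common cooperative pattern, shown here purely as an assumption, is a shared threading.Event that workers poll, followed by join() with a timeout when wait is true. The stop_event, worker, and standalone stop function below are hypothetical.

```python
import threading

stop_event = threading.Event()


def worker():
    # Poll the stop flag; Event.wait returns True once the flag is set.
    while not stop_event.wait(0.01):
        pass


threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()


def stop(wait=True, thread_stop_timeout=1.0):
    stop_event.set()
    if wait:
        for t in threads:
            # join returns after the thread exits or the timeout elapses.
            t.join(timeout=thread_stop_timeout)
    return [t.is_alive() for t in threads]


still_alive = stop()
```

A worker that never checks the flag would still be alive after the timeout, which is why join() is bounded rather than open-ended.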