duck.utils.multiprocessing.process_manager

WorkerProcessManager manages and monitors a pool of worker processes.

Features:

  • Automatic restart of dead or unhealthy workers.

  • Customizable health-check hooks per worker.

  • Threaded non-blocking monitoring loop.

  • Configurable logging and verbosity.

  • Status inspection and graceful shutdown.

Example use cases:

  • WSGI/ASGI server worker orchestration.

  • ML/AI multi-process task runner watchdog.

  • Long-running web backend with process self-repair.

Usage Example:

import random
import sys
import time

from duck.utils.multiprocessing.process_manager import WorkerProcessManager

def sample_worker(idx, *args):
    print(f"Worker {idx} started...")

    while True:
        time.sleep(1)
        print(f"[Worker {idx}] Sleeping")

        # Simulate a random crash; the manager will restart the worker
        if random.random() < 0.03:
            print(f"[Worker {idx}] Simulating crash")
            sys.exit(1)

def health_check_fn(proc, idx):
    # Returns True if alive; override for custom checks
    return proc.is_alive()

manager = WorkerProcessManager(
    worker_fn=sample_worker,
    num_workers=4,
    args_fn=lambda idx: (...),
    worker_name_fn=lambda idx: f"duck-worker-{idx}",
    health_check_fn=health_check_fn,  # Or pass a HeartbeatHealthCheck object.
    restart_timeout=2,
    enable_logs=True, verbose_logs=False,
    enable_monitoring=True,
    process_stop_timeout=3,
)

try:
    manager.start()
    for _ in range(20):  # Monitor for a while
        print("Worker status:", manager.status())
        time.sleep(2)
finally:
    manager.stop()

Module Contents

Classes

HeartbeatHealthCheck

Process health check using a heartbeat approach.

WorkerProcessManager

WorkerProcessManager manages and monitors a pool of worker processes.

API

class duck.utils.multiprocessing.process_manager.HeartbeatHealthCheck(heartbeat_timeout: float)

Process health check using a heartbeat approach.

Example:

healthcheck = HeartbeatHealthCheck(...)

def worker_fn(idx, healthcheck, *args):
    while True:
        healthcheck.update_heartbeat(idx)
        # Some tasks here
        ...

Initialization

Initialize heartbeat health check.

__call__(process: multiprocessing.Process, idx: int) → bool

Checks whether the last heartbeat is still within the timeout. A heartbeat older than the timeout may indicate an unhealthy process.

Returns:

True if the last heartbeat has not timed out, otherwise False.

Return type:

bool

Raises:

HeartbeatUpdateNeverCalled – Raised if update_heartbeat has never been called. This guards against enabling heartbeat checks without ever sending heartbeats. In a worker loop, call update_heartbeat first, before handling any tasks.

Example:

healthcheck = HeartbeatHealthCheck(...)

manager = WorkerProcessManager(
    health_check_fn=healthcheck,
    ...
)

def worker_fn(idx, healthcheck, *args):
    while True:
        healthcheck.update_heartbeat(idx)
        # Some tasks here
        ...
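
The timeout rule described above can be sketched in a few lines. This is a minimal illustration, not the library's implementation: the names SimpleHeartbeat and HeartbeatNeverUpdated are hypothetical, the `now` parameter exists only to make the logic easy to test, and timestamps live in a plain list, so sharing them between real processes (for example through shared memory) is left out.

```python
import time
from typing import List, Optional


class HeartbeatNeverUpdated(Exception):
    """Hypothetical stand-in for HeartbeatUpdateNeverCalled."""


class SimpleHeartbeat:
    """Illustrative heartbeat tracker; in-process only."""

    def __init__(self, num_workers: int, heartbeat_timeout: float):
        self.heartbeat_timeout = heartbeat_timeout
        # None means "this worker has never sent a heartbeat".
        self._beats: List[Optional[float]] = [None] * num_workers

    def update_heartbeat(self, idx: int, now: Optional[float] = None) -> None:
        # Record the time of the latest heartbeat for worker `idx`.
        self._beats[idx] = time.time() if now is None else now

    def check_health(self, idx: int, now: Optional[float] = None) -> bool:
        last = self._beats[idx]
        if last is None:
            raise HeartbeatNeverUpdated(f"worker {idx} never sent a heartbeat")
        current = time.time() if now is None else now
        # Healthy while the last heartbeat is younger than the timeout.
        return (current - last) < self.heartbeat_timeout
```

Raising on a missing heartbeat, rather than returning False, separates a misconfigured worker (one that never calls the update) from a worker that has stalled.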

check_health(process: multiprocessing.Process, idx: int) → bool

Checks whether the last heartbeat is still within the timeout. A heartbeat older than the timeout may indicate an unhealthy process.

Returns:

True if the last heartbeat has not timed out, otherwise False.

Return type:

bool

Raises:

HeartbeatUpdateNeverCalled – Raised if update_heartbeat has never been called. This guards against enabling heartbeat checks without ever sending heartbeats. In a worker loop, call update_heartbeat first, before handling any tasks.

Example:

healthcheck = HeartbeatHealthCheck(...)

def worker_fn(idx, healthcheck, *args):
    while True:
        healthcheck.update_heartbeat(idx)
        # Some tasks here
        ...

update_heartbeat(idx: int)

Update the last heartbeat for a worker.

Parameters:

idx – Index of the process, usually provided to worker_fn.

Raises:

RuntimeError – If called from the main process rather than from a child process.
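
One way to detect the main process, shown here as an assumption rather than as the check this module performs, is multiprocessing.parent_process(), which returns None in the main process (Python 3.8+). The helper names below are hypothetical.

```python
import multiprocessing


def in_child_process() -> bool:
    # parent_process() is None unless this process was started
    # by the multiprocessing module.
    return multiprocessing.parent_process() is not None


def require_child_process() -> None:
    # Refuse to run in the main process, mirroring the documented RuntimeError.
    if not in_child_process():
        raise RuntimeError("must be called from a worker (child) process")
```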

exception duck.utils.multiprocessing.process_manager.HeartbeatUpdateNeverCalled

Bases: Exception

Raised by HeartbeatHealthCheck.check_health if no heartbeat has been recorded.


class duck.utils.multiprocessing.process_manager.WorkerProcessManager(worker_fn: Callable, num_workers: int, args_fn: Optional[Callable[[int], tuple]] = None, worker_name_fn: Optional[Callable[[int], str]] = None, health_check_fn: Optional[Union[Callable[[multiprocessing.Process], bool], duck.utils.multiprocessing.process_manager.HeartbeatHealthCheck]] = None, restart_timeout: Union[int, float] = 5, enable_logs: bool = True, verbose_logs: bool = True, enable_monitoring: bool = True, process_stop_timeout: Optional[float] = 5.0, daemon: bool = False)

WorkerProcessManager manages and monitors a pool of worker processes.

Features:

  • Automatic restart of dead or unhealthy workers.

  • Customizable health-check hooks per worker.

  • Threaded non-blocking monitoring loop.

  • Configurable logging and verbosity.

  • Status inspection and graceful shutdown.

Example use cases:

  • WSGI/ASGI server worker orchestration.

  • ML/AI multi-process task runner watchdog.

  • Long-running web backend with process self-repair.

Initialization

Parameters:
  • worker_fn – Function executed by each worker process.

  • num_workers – Number of worker processes to spawn.

  • args_fn – Callable (idx) => tuple that returns the arguments for each worker.

  • worker_name_fn – Callable (idx) => str that returns the worker process name.

  • health_check_fn – Callable (Process) => bool used to check worker health; must return True if the worker is healthy, False otherwise. Alternatively, pass a HeartbeatHealthCheck object to use heartbeat-based checks.

  • restart_timeout – Seconds to wait before restarting a dead worker.

  • enable_logs – Enable info/warning logging.

  • verbose_logs – Enable full exception trace logs.

  • enable_monitoring – Start the monitor thread automatically.

  • process_stop_timeout – Maximum seconds to wait for a worker to stop. Passed to the process's join() method.

  • daemon – Whether to start daemon processes. Defaults to False.
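
To illustrate how args_fn, worker_name_fn and daemon could map onto multiprocessing.Process objects, here is a minimal sketch. The build_workers helper is hypothetical, and passing idx as the first positional argument is an assumption taken from the usage example's sample_worker(idx, *args) signature.

```python
import multiprocessing
from typing import Callable, List, Optional


def build_workers(
    worker_fn: Callable,
    num_workers: int,
    args_fn: Optional[Callable[[int], tuple]] = None,
    worker_name_fn: Optional[Callable[[int], str]] = None,
    daemon: bool = False,
) -> List[multiprocessing.Process]:
    """Create (but do not start) one Process per worker index."""
    workers = []
    for idx in range(num_workers):
        extra = args_fn(idx) if args_fn else ()
        name = worker_name_fn(idx) if worker_name_fn else f"worker-{idx}"
        workers.append(
            multiprocessing.Process(
                target=worker_fn,
                args=(idx, *extra),  # assumption: idx is passed first
                name=name,
                daemon=daemon,
            )
        )
    return workers
```

Keeping construction separate from start() makes it possible to rebuild a single worker by index when it needs to be restarted.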

_monitor_loop()

Monitor thread: checks worker health/liveness and restarts unhealthy/dead workers. Non-blocking for main thread.
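
A monitoring loop of this kind can be sketched as a single pass over the workers plus a background thread that repeats it. This is an illustration under stated assumptions, not the module's code: monitor_once and start_monitor are hypothetical names, restart_fn stands in for _restart_worker, and the (proc, idx) health-check signature follows the usage example.

```python
import threading
from typing import Callable, List, Sequence


def monitor_once(
    workers: Sequence,
    health_check_fn: Callable,
    restart_fn: Callable[[int], None],
) -> List[int]:
    """Run one monitoring pass and return the indexes that were restarted."""
    restarted = []
    for idx, proc in enumerate(workers):
        # A worker is unhealthy if it is dead or fails the health check.
        healthy = proc.is_alive() and health_check_fn(proc, idx)
        if not healthy:
            restart_fn(idx)
            restarted.append(idx)
    return restarted


def start_monitor(
    workers: Sequence,
    health_check_fn: Callable,
    restart_fn: Callable[[int], None],
    interval: float,
    stop_event: threading.Event,
) -> threading.Thread:
    """Repeat monitor_once in a background thread until stop_event is set."""

    def loop() -> None:
        # Event.wait acts as an interruptible sleep; it returns True once set.
        while not stop_event.wait(interval):
            monitor_once(workers, health_check_fn, restart_fn)

    thread = threading.Thread(target=loop, name="monitor", daemon=True)
    thread.start()
    return thread
```

Using an Event instead of time.sleep lets a stop request end the loop promptly, without waiting out the full interval.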

_restart_worker(idx: int)

Restart a worker process by index.

running_pids()

Returns a list of PIDs for currently alive worker processes.

start()

Start worker processes and non-blocking monitor loop.

status()

Returns a list of status dicts, one per worker process. Each dict contains name, pid and alive.
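
As a rough picture of what status() and running_pids() report, the sketch below derives both from a list of multiprocessing.Process objects. The helper names are hypothetical, and the dict keys are assumed from the (name, pid, alive) description above.

```python
import multiprocessing
from typing import List, Sequence


def worker_status(workers: Sequence[multiprocessing.Process]) -> List[dict]:
    # One dict per worker; pid is None until the process has been started.
    return [
        {"name": p.name, "pid": p.pid, "alive": p.is_alive()}
        for p in workers
    ]


def alive_pids(workers: Sequence[multiprocessing.Process]) -> List[int]:
    # Only workers that are currently alive contribute a PID.
    return [p.pid for p in workers if p.is_alive()]
```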

stop(graceful: bool = True, wait: bool = True, monitor_stop_timeout: float = 0.5, no_logging: bool = False)

Stop all worker processes and monitoring thread.

Parameters:
  • graceful – Use terminate() for workers (soft shutdown).

  • wait – Whether to wait for processes to finish stopping. Defaults to True.

  • monitor_stop_timeout – Timeout in seconds for waiting on the monitor thread.

  • no_logging – Whether to suppress the stop log message. Use this to temporarily disable logging of the stop message.
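
The stop sequence for a single worker might look like the sketch below. It is a guess at the general pattern, not the module's behaviour: stop_process is a hypothetical helper, using kill() when graceful is False is an assumption, and so is the escalation to kill() when a terminated worker is still alive after the join timeout.

```python
from typing import Optional


def stop_process(
    proc,
    graceful: bool = True,
    wait: bool = True,
    timeout: Optional[float] = 5.0,
) -> None:
    """Stop one worker; `proc` is expected to behave like multiprocessing.Process."""
    if not proc.is_alive():
        return
    if graceful:
        proc.terminate()  # SIGTERM on POSIX: the worker can clean up
    else:
        proc.kill()  # SIGKILL on POSIX: immediate stop
    if wait:
        proc.join(timeout)
        if proc.is_alive():
            # Assumed escalation: force-stop a worker that ignored terminate().
            proc.kill()
            proc.join(timeout)
```

Bounding join() with a timeout keeps shutdown from hanging on a worker that never exits.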