duck.utils.multiprocessing.processpool

Module Contents

Classes

ProcessPoolManager

Process pool manager with task type protection.

Functions

get_or_create_process_manager

Retrieve or create the ProcessPoolManager instance bound to the current process.

Data

REGISTRY

API

exception duck.utils.multiprocessing.processpool.ManagerNotFound

Bases: Exception

Raised when no manager can be resolved and the caller requested strict retrieval, so a missing manager may not be created on demand.

Initialization

Initialize self. See help(type(self)) for accurate signature.

class duck.utils.multiprocessing.processpool.ProcessPoolManager(creator_process: Optional[multiprocessing.Process] = None)

Process pool manager with task type protection.

Use start() to initialize a centralized process pool for sync tasks. The manager restricts submitted tasks by their task_type, keeping inappropriate jobs out of critical worker pools.

Initialization

Initialize the process pool.

Parameters:

creator_process – The process responsible for this manager.

__instances

[]

This is the list of created instances.

__repr__()
__str__()
_worker_init()

Method called when a worker process is initialized.

classmethod all_instances() -> List[duck.utils.multiprocessing.processpool.ProcessPoolManager]

Returns a list of all created instances so far.

get_pool() -> concurrent.futures.ProcessPoolExecutor

Returns the running process pool.

Returns:

Running process pool.

Return type:

concurrent.futures.ProcessPoolExecutor

Raises:

RuntimeError – If the process pool is not running.

classmethod registry() -> Dict[int, Dict[Any, duck.utils.multiprocessing.processpool.ProcessPoolManager]]

Returns the registry for created instances. Useful for tracking.

start(max_workers: int, task_type: Optional[str] = None, daemon: bool = False, process_name_prefix: Optional[str] = None, mp_context: Optional[multiprocessing.context.BaseContext] = None)

Starts the process pool, ready to accept tasks.

Parameters:
  • max_workers – Maximum number of worker processes in the pool.

  • task_type – Only allows tasks with this type to be submitted. Useful for protecting pools handling critical jobs (e.g., request_handling only).

  • daemon – Whether the pool's worker processes should be daemon processes.

  • process_name_prefix – The name prefix for each worker process.

  • mp_context – Optional multiprocessing context, as returned by multiprocessing.get_context() (for example 'spawn' or 'fork').

Raises:

RuntimeError – If a process pool has already been initialized.
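How start() passes its arguments on to the standard library is not documented here. As a rough sketch, assuming mp_context is forwarded unchanged to concurrent.futures.ProcessPoolExecutor, a context for that parameter could be built as shown below. The helper name make_pool is hypothetical and is not part of this module.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from typing import Optional


def make_pool(max_workers: int, start_method: Optional[str] = None) -> ProcessPoolExecutor:
    """Hypothetical helper: build an executor with an explicit start method."""
    # get_context(None) returns the platform's default context;
    # "spawn" is available on every platform, "fork" only on POSIX.
    ctx = multiprocessing.get_context(start_method)
    # No worker process is started here; workers appear once tasks are submitted.
    return ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx)
```

Whatever start method is chosen, callables and arguments submitted to such a pool must be picklable, because they cross a process boundary.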

stop(wait: bool = True)

Shuts down the process pool.

Parameters:

wait – Whether to wait for running tasks to finish.

submit_task(task: Callable, *args, task_type: Optional[str] = None, **kwargs) -> concurrent.futures.Future

Submit a task to the process pool.

Parameters:
  • task – Callable to execute.

  • task_type – Type/flag of this task. If the manager was started with a specific allowed task_type, this must match; otherwise UnknownTaskError is raised.

Raises:
  • UnknownTaskError – If task_type mismatches the pool’s allowed type.

  • RuntimeError – If the process pool is None or not running.

Returns:

Future for the executing task.

Return type:

concurrent.futures.Future
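The task-type check described above can be illustrated with a small stand-in. This is a minimal sketch, not the module's implementation: TypedPool and TaskTypeMismatch are hypothetical names, the order of the two checks is a guess, and the behavior of a pool started without a task_type (accept everything) is an assumption.

```python
from concurrent.futures import Future, ProcessPoolExecutor
from typing import Any, Callable, Optional


class TaskTypeMismatch(Exception):
    """Hypothetical stand-in for UnknownTaskError."""


class TypedPool:
    """Illustrative pool wrapper that accepts a single task type."""

    def __init__(self) -> None:
        self._pool: Optional[ProcessPoolExecutor] = None
        self._task_type: Optional[str] = None

    def start(self, max_workers: int, task_type: Optional[str] = None) -> None:
        if self._pool is not None:
            raise RuntimeError("Process pool is already running.")
        self._task_type = task_type
        self._pool = ProcessPoolExecutor(max_workers=max_workers)

    def submit_task(self, task: Callable[..., Any], *args: Any,
                    task_type: Optional[str] = None, **kwargs: Any) -> Future:
        if self._pool is None:
            raise RuntimeError("Process pool is not running.")
        # Assumption: a pool started without a task_type accepts any task.
        if self._task_type is not None and task_type != self._task_type:
            raise TaskTypeMismatch(
                f"got {task_type!r}, pool accepts {self._task_type!r}"
            )
        return self._pool.submit(task, *args, **kwargs)

    def stop(self, wait: bool = True) -> None:
        if self._pool is not None:
            self._pool.shutdown(wait=wait)
            self._pool = None


if __name__ == "__main__":
    pool = TypedPool()
    pool.start(max_workers=2, task_type="component_render")
    try:
        # Rejected before anything reaches a worker process.
        pool.submit_task(pow, 2, 3, task_type="background")
    except TaskTypeMismatch as exc:
        print(f"rejected: {exc}")
    finally:
        pool.stop()
```

Because the check runs in the submitting process, a mismatched task never occupies a worker, which is the point of protecting a critical pool.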

duck.utils.multiprocessing.processpool.REGISTRY: Dict[int, Dict[Any, duck.utils.multiprocessing.processpool.ProcessPoolManager]]

None

exception duck.utils.multiprocessing.processpool.UnknownTaskError(task_type, pool_task_type)

Bases: Exception

Raised when attempting to submit a task of a disallowed or unknown type.

This error indicates a task type was provided (or omitted) that does not match the pool’s configured protection. Typical use is to prevent accidental or inappropriate task submission to specialized or critical pools. If you need to run a different type of task, consider subclassing or reconfiguring the pool.

Initialization

Initialize self. See help(type(self)) for accurate signature.

duck.utils.multiprocessing.processpool.get_or_create_process_manager(id: Optional[Any] = None, force_create: bool = False, strictly_get: bool = False) -> ProcessPoolManager

Retrieve or create the ProcessPoolManager instance bound to the current process.

Returns:

The resolved or newly created manager instance.

Return type:

ProcessPoolManager

Centralized Process Manager

This module provides a structured system for managing process-scoped ProcessPoolManager instances with optional task-type protection. It lets worker processes, request handlers, and subsystem processes each maintain their own dedicated process-pool manager, preventing concurrency issues caused by cross-process state leakage.

Features

  1. Multiple Manager Namespaces: Using the id parameter, a process can maintain several logical managers simultaneously (e.g., CPU-bound pool, IO pool, component-render pool). Each namespace is isolated without requiring separate registry structures.

  2. Task-Type Protection: Each manager can enforce a single allowed task_type for submitted tasks. This prevents accidental or unsafe mixing of workloads, for example by ensuring that:
     - time-sensitive tasks run only in designated pools,
     - component rendering tasks never leak into background worker pools,
     - slow jobs cannot clog request-handling processes.

  3. Daemon and Non-Daemon Worker Control: Managers can start process pools with daemon processes (for non-blocking shutdown) or non-daemon processes (for guaranteed completion of jobs before exit).

Typical Usage

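from duck.utils.multiprocessing.processpool import get_or_create_process_manager

def worker_entrypoint():
    # Initialize or reuse the process's manager
    manager = get_or_create_process_manager(id="render")

    # Start a dedicated process pool for rendering tasks
    manager.start(max_workers=4, task_type="component_render")

    # some_callable is a placeholder for your own picklable, module-level function.
    # The task_type must match the one passed to start().
    future = manager.submit_task(
        some_callable,
        task_type="component_render",
    )
    # Block until the worker process has finished the task
    return future.result()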