Source code for duck.utils.asyncio.eventloop

"""
# Asyncio Loop Manager With Thread-Based Context Propagation

This module provides a structured system for managing asyncio event loops that run
inside dedicated background threads. It enables synchronous code (including worker
threads, component systems, WSGI-like environments, and blocking server logic)
to safely submit and await asyncio coroutines without requiring a running loop
in the current thread.

Unlike a globally shared event loop, which can cause cross-context contamination,
unbounded concurrency, and invalid reentrancy, each loop manager created by this
module is *thread-scoped* and can optionally be restricted to a single task type.
This keeps component rendering, per-request async operations, and specialized
background jobs isolated from one another.

## Core Components

1. **AsyncioLoopManager**
   A manager that:
   - initializes a dedicated asyncio event loop,
   - runs it inside a background thread,
   - allows synchronous code to submit async tasks,
   - optionally restricts allowed task types (e.g., `"update"`, `"render"`),
   - supports blocking behavior via `SyncFuture`.

   This bridges synchronous and asynchronous subsystems safely, without requiring
   callers to understand asyncio internals or manage loops manually.

2. **SyncFuture**
   A thread-safe future that lets synchronous callers block until an async
   coroutine completes. This provides an ergonomic way to mix async/sync code
   without deadlocks or event-loop mismanagement.

3. **Thread-Scoped Manager Registry**
   Loop managers are registered per-thread (and per-ID), and automatically
   inherited by child threads via a lineage-resolution system. This ensures:
   - consistent async context for worker threads,
   - isolation of async operations across independent request handlers,
   - avoidance of global-state conflicts.

4. **Task-Type Protection**
   Optional enforcement that allows only a specific `task_type` to be submitted
   into a given loop. This prevents misrouting workloads—for example:
   - keeping component-rendering coroutines separate from background updates,
   - preventing slow tasks from being scheduled into latency-critical loops,
   - enforcing strict execution domains for sensitive operations.
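
The bridging mechanism behind these components can be sketched with the standard
library alone. This is a minimal illustration under stated assumptions, not this
module's implementation: `MiniLoopManager` and `double` are hypothetical names, a
plain `ValueError` stands in for `UnknownAsyncTaskError`, and the
`concurrent.futures.Future` returned by `asyncio.run_coroutine_threadsafe` plays
the role of `SyncFuture`.

```py
import asyncio
import threading


class MiniLoopManager:
    # Hypothetical, simplified stand-in for AsyncioLoopManager.

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.task_type = None
        self.thread = None

    def start(self, task_type=None):
        # Run the loop forever inside a daemon background thread.
        self.task_type = task_type
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def submit_task(self, coro, task_type=None):
        # Reject work whose type does not match the protected type.
        if self.task_type is not None and task_type != self.task_type:
            coro.close()  # avoid a "coroutine was never awaited" warning
            raise ValueError("task type %r is not allowed" % (task_type,))
        # The returned concurrent.futures.Future can be blocked on by sync code.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join(1)
        if not self.thread.is_alive():
            self.loop.close()


async def double(x):
    await asyncio.sleep(0)
    return x * 2


mgr = MiniLoopManager()
mgr.start(task_type="render")

# Synchronous code blocks here until the coroutine finishes on the loop thread.
result = mgr.submit_task(double(21), task_type="render").result(timeout=5)

try:
    mgr.submit_task(double(1), task_type="update")
    rejected = False
except ValueError:
    rejected = True

mgr.stop()
print(result, rejected)
```

The caller never touches the loop directly; it only hands over a coroutine and
waits on the returned future, which is the same contract `submit_task` offers.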

## Motivation

Frameworks that must bridge the sync/async boundary—such as servers using
thread-per-request models, component rendering systems, hybrid blocking/
non-blocking architectures, or environments where loops cannot be run
directly—need a safe, isolated way to schedule coroutines.

## Typical pitfalls solved by this module:
- **Running async code inside random threads**: leads to `RuntimeError: no running event loop`.
- **Global event loop sharing**: creates race conditions and cross-context pollution.
- **Worker threads needing async helpers** without owning an event loop.
- **Component systems generating async work** inside sync call stacks.
- **Unrelated tasks entering specialized async domains**, which task-type protection prevents.

This module eliminates these pitfalls by providing predictable, isolated event
loops that behave like mini-ASGI environments local to each thread.
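
The first pitfall is easy to reproduce with the standard library. The sketch
below is illustrative only (`plain_worker` is a hypothetical name): a thread
that does not own a running loop asks asyncio for one and gets a `RuntimeError`.

```py
import asyncio
import threading

errors = []


def plain_worker():
    # A plain thread has no running event loop, so this call raises RuntimeError.
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        errors.append(str(exc))


t = threading.Thread(target=plain_worker)
t.start()
t.join()
print(errors)
```

Submitting the coroutine to a loop that runs in another thread avoids this
error, because the worker never needs a loop of its own.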

## Thread Lineage Model

Loop managers are resolved using a hierarchical lookup:

1. Start with the current thread.
2. If it has a manager for the requested ID, return it.
3. If not, recursively check its parent thread.
4. If none exist in the lineage, create and attach a new manager.

This mirrors context inheritance in modern frameworks and avoids global
state entirely.
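
The lookup steps above can be sketched as follows. This is a simplified model,
not the module's code: the standard library does not record a thread's parent,
so the hypothetical `TrackedThread` class stores it explicitly (the real module
relies on `get_parent_thread`), and plain strings stand in for manager objects.

```py
import threading

# Hypothetical registry: thread ident -> {manager id: manager object}
REGISTRY = {}


class TrackedThread(threading.Thread):
    # Hypothetical helper that remembers which thread created it.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parent = threading.current_thread()


def resolve(thread, manager_id):
    # Steps 1-3: walk from `thread` up through its ancestors.
    while thread is not None:
        managers = REGISTRY.get(thread.ident, {})
        if manager_id in managers:
            return managers[manager_id]
        thread = getattr(thread, "parent", None)
    return None


def get_or_create(manager_id, factory):
    # Step 4: create and attach a manager when the lineage has none.
    current = threading.current_thread()
    manager = resolve(current, manager_id)
    if manager is None:
        manager = factory()
        REGISTRY.setdefault(current.ident, {})[manager_id] = manager
    return manager


# The creating thread registers a manager (a placeholder string here).
REGISTRY[threading.get_ident()] = {"render-loop": "main-manager"}

found = []


def child():
    # Inherited from the parent thread.
    found.append(get_or_create("render-loop", lambda: "unused"))
    # Not present anywhere in the lineage, so it is created for this thread.
    found.append(get_or_create("background-io", lambda: "child-manager"))


t = TrackedThread(target=child)
t.start()
t.join()
print(found)
```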

## Multiple Manager Namespaces

Managers are identified by an optional `id`:

- Calling `get_or_create_loop_manager(id="render-loop")`
  creates or retrieves a loop dedicated to rendering operations.
- Calling `get_or_create_loop_manager(id="background-io")`
  yields an entirely separate loop for I/O-heavy async tasks.

This encourages clean separation of execution domains without rewriting
infrastructure.
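
As a rough illustration of what separate namespaces buy you, the sketch below
starts two independent loops with the standard library and confirms that each
one runs its coroutines on its own thread. The `start_loop` and `where_am_i`
helpers and the `loops` table are hypothetical and only mimic the idea of one
loop per `id`.

```py
import asyncio
import threading


def start_loop():
    # Start a fresh event loop in its own daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def where_am_i():
    # Report the ident of the thread that executes this coroutine.
    return threading.get_ident()


# Hypothetical namespace table: one loop (and thread) per id.
loops = {name: start_loop() for name in ("render-loop", "background-io")}

idents = {
    name: asyncio.run_coroutine_threadsafe(where_am_i(), loop).result(timeout=5)
    for name, (loop, _thread) in loops.items()
}

# Shut both loops down again.
for loop, thread in loops.values():
    loop.call_soon_threadsafe(loop.stop)
    thread.join(1)

print(idents["render-loop"] != idents["background-io"])
```

Work submitted under one id cannot delay work submitted under the other,
because the two loops never share a thread.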

## Typical Usage

```py
def worker():
    loop_mgr = get_or_create_loop_manager(id="component")
    loop_mgr.start(task_type="render")  # Raises RuntimeError if already running

    # Submit the coroutine and block until its result is available
    future = loop_mgr.submit_task(
        some_async_function(),
        return_sync_future=True,
        task_type="render",
    )
    return future.result()
```

## Best Practices

- Always resolve loop managers *inside* worker threads to avoid sharing the
  main thread’s manager unintentionally.
- Use task-type protection to ensure coroutine routing guarantees.
- Use unique IDs for logically distinct async domains.
- Never directly interact with the internal REGISTRY.

This module provides a safe, controlled, extensible foundation for mixing
async and synchronous workloads in complex frameworks such as the Duck
Lively Component system.
"""

import asyncio
import threading

from typing import (
    Union,
    Coroutine,
    Optional,
    Any,
    List,
    Dict,
)

from duck.utils.threading import SyncFuture, async_to_sync_future
from duck.utils.threading.patch import get_parent_thread


REGISTRY = {}


def get_or_create_loop_manager(
    id: Optional[Any] = None,
    force_create: bool = False,
    strictly_get: bool = False,
) -> "AsyncioLoopManager":
    """
    Retrieve or create an `AsyncioLoopManager` instance scoped to the current
    thread lineage, with optional namespace isolation.

    This function provides a hierarchical registry: each thread may own multiple
    managers, identified by user-supplied `id` namespaces. If a manager is not
    found in the current thread, its parent thread is searched recursively,
    allowing thread families to inherit managers unless isolation is explicitly
    requested.

    **Thread-scoped behavior**
    - Without `force_create`, the manager resolution walks upward through parent
      threads until it finds a matching manager for the given `id`.
    - With `force_create=True`, a new manager is always created for the current
      thread, regardless of parent managers.

    **Use cases**
    - Worker threads that must run async tasks in their own protected loop.
    - Request-scoped or component-scoped corruption-free async execution.
    - Shared background async loop across a thread tree, unless isolation is desired.

    Args:
        id (Optional[Any]):
            Optional namespace key. Managers with different `id` values can
            coexist within the same thread. Using the same `id` retrieves the
            same manager.

            Examples:

            ```py
            default_mgr = get_or_create_loop_manager()
            io_mgr = get_or_create_loop_manager(id="io")
            job_mgr = get_or_create_loop_manager(id="job-executor")
            ```

        force_create (bool):
            If True, bypasses lineage resolution and creates a fresh manager
            bound to the current thread. This is required inside worker threads
            where isolated loops are needed.

        strictly_get (bool):
            If True, only look up an existing manager and raise
            `ManagerNotFound` if none is found, instead of creating one.

    Raises:
        ManagerNotFound: If no manager is found and `strictly_get=True`.
        TypeError: If `strictly_get` and `force_create` are both True.

    Notes:
        - Calling this function in the main thread makes the returned manager
          automatically inherited by all descendant threads unless they specify
          `force_create=True`.
        - The lookup chain stops at the root thread; if no manager is found, a
          new one is created and registered.
        - If you have already created a manager, pass `strictly_get=True` to
          retrieve it; an exception is raised if it cannot be found, and no new
          manager is created.

    Returns:
        AsyncioLoopManager: The resolved manager or a newly created instance.
    """
    def resolve(thread: threading.Thread):
        managers = REGISTRY.get(thread.ident)
        if managers:
            # Manager namespace found
            manager = managers.get(id, None)
            if manager is None and not strictly_get:
                manager = AsyncioLoopManager(thread, _id=id)
                managers[id] = manager
            return manager

        # No managers registered for this thread; check parent
        parent = get_parent_thread(thread)
        if parent is not None:
            return resolve(parent)
        return None

    if strictly_get and force_create:
        raise TypeError("Arguments 'strictly_get' and 'force_create' cannot be both True.")

    current = threading.current_thread()
    manager = None

    if not force_create:
        manager = resolve(current)

    # If resolution failed (root), create a new entry
    if manager is None:
        if strictly_get:
            raise ManagerNotFound(
                f"Strict getting of manager is True yet the manager with id '{id}' "
                f"could not be resolved. Current thread: {current}."
            )

        # Create new manager
        manager = AsyncioLoopManager(current, _id=id)
        try:
            managers = REGISTRY[current.ident]
            managers[id] = manager
        except KeyError:
            # Registry has no target key
            managers = REGISTRY[current.ident] = {id: manager}
    return manager


class ManagerNotFound(Exception):
    """
    Raised when a manager cannot be resolved and the caller requested a strict
    lookup (`strictly_get=True`), which forbids creating a missing manager.
    """


class UnknownAsyncTaskError(Exception):
    """
    Raised when attempting to submit a coroutine of a disallowed or unexpected
    type to a protected asyncio loop.
    """
    def __init__(self, given_type, expected_type):
        message = (
            f"Async task type '{given_type}' is not permitted in this event loop "
            f"(required type: '{expected_type}').\n"
            "To safely submit this task, restart AsyncioLoopManager with the desired task_type, or reinitialize."
        )
        super().__init__(message)
        self.given_type = given_type
        self.expected_type = expected_type


class AsyncioLoopManager:
    """
    A manager that runs an asyncio event loop in a background thread, with
    optional task type protection.

    Notes:
        - Each manager runs its asyncio event loop in a single dedicated
          background thread.
        - Allows submitting coroutines from synchronous code.
        - Supports synchronous result blocking via SyncFuture.
        - Can restrict tasks by type to prevent inappropriate coroutine
          submission to protected loops.
        - Using the event loop directly is not recommended and does not appear
          to work; use `submit_task` instead.
    """

    __instances = []
    """
    This is the list of created instances.
    """

    def __init__(self, creator_thread: Optional[threading.Thread] = None, _id: Optional[Union[str, int]] = None):
        """
        Initialize the loop manager.

        Args:
            creator_thread (Optional[threading.Thread]): The thread responsible for this manager.
            _id (Optional[Union[str, int]]): A custom unique identifier for the manager.
        """
        self._creator_thread = creator_thread
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._task_type: Optional[str] = None
        self._id = _id or "default-%s" % id(self)
        AsyncioLoopManager.__instances.append(self)

    @classmethod
    def all_instances(cls) -> List["AsyncioLoopManager"]:
        """
        Returns a list of all instances created so far.
        """
        # The original referenced an undefined name (`ThreadPoolManger`);
        # the instance list lives on this class.
        return cls.__instances

    @classmethod
    def registry(cls) -> Dict[int, Dict[Any, "AsyncioLoopManager"]]:
        """
        Returns the registry for created instances. Useful for tracking.
        """
        return REGISTRY

    def start(self, task_type: Optional[str] = None):
        """
        Starts the event loop in a background thread if it's not already running.

        Args:
            task_type (Optional[str]): The expected type of async tasks for this loop.
                If set, only tasks with matching type will be accepted.

        Raises:
            RuntimeError: If the event loop's background thread is already alive.
        """
        if self._loop is None:
            self._loop = asyncio.new_event_loop()
            self._task_type = task_type

        def run_loop():
            asyncio.set_event_loop(self._loop)
            self._loop.run_forever()

        if self._thread is None or not self._thread.is_alive():
            self._thread = threading.Thread(target=run_loop, daemon=True)
            self._thread.start()
        else:
            raise RuntimeError("Cannot start, asyncio event loop is already running.")

    def submit_task(
        self,
        coro: Coroutine,
        return_sync_future: bool = False,
        task_type: Optional[str] = None,
    ) -> Union[SyncFuture, asyncio.Future]:
        """
        Submit an asynchronous coroutine to the event loop, with optional task type protection.

        Args:
            coro (Coroutine): The coroutine to schedule.
            return_sync_future (bool): If True, wraps the result in a SyncFuture for blocking use.
            task_type (Optional[str]): The type of this async task; must match manager's required type if set.

        Returns:
            Union[SyncFuture, asyncio.Future]: A future representing the scheduled coroutine.

        Raises:
            UnknownAsyncTaskError: If the provided type doesn't match the loop's protected type.
            RuntimeError: If the event loop is not running.
        """
        if self._loop is not None and self._loop.is_running():
            # Enforce task type protection if set
            if self._task_type is not None:
                if task_type != self._task_type:
                    raise UnknownAsyncTaskError(task_type, self._task_type)

            future = asyncio.run_coroutine_threadsafe(coro, self._loop)
            if return_sync_future:
                future = async_to_sync_future(future)
            return future
        else:
            raise RuntimeError("Event loop is not running. Method start() must be called first.")

    def get_event_loop(self) -> asyncio.AbstractEventLoop:
        """
        Returns the running asyncio event loop.

        Returns:
            asyncio.AbstractEventLoop: The currently running event loop.

        Raises:
            RuntimeError: If the loop isn't running.
        """
        if self._loop is not None and self._loop.is_running():
            return self._loop
        else:
            raise RuntimeError("Event loop is not running. Call start() first.")

    def stop(self):
        """
        Stops the event loop and waits up to one second for the thread to finish.

        Notes:
            This should be called to clean up the background loop thread.
        """
        if self._loop:
            self._loop.call_soon_threadsafe(self._loop.stop)
        if self._thread:
            self._thread.join(1)
        self._loop = None
        self._thread = None
        self._task_type = None

    def __str__(self):
        return f"<{self.__class__.__name__} id='{self._id}', creator_thread={self._creator_thread}>"

    def __repr__(self):
        return f"<{self.__class__.__name__} id='{self._id}', creator_thread={self._creator_thread}>"