Source code for duck.utils.asyncio

"""
Asyncio utilities and helpers.
"""
import asyncio

from typing import Coroutine, Callable, List, Type, Optional


[docs] def create_task( coro: Coroutine, on_complete: Optional[Callable[[asyncio.Task], None]] = None, raise_on_exception: bool = True, ignore_errors: Optional[List[Type[BaseException]]] = None, loop: Optional = None, ) -> asyncio.Task: """ Create an asyncio task and handle exceptions, optionally ignoring specified errors. Args: coro (Coroutine): The coroutine to run. on_complete (Callable): Called with the task when finished. raise_on_exception (bool): Raises exceptions from the task unless ignored. ignore_errors (List[Type[BaseException]]): Exception types to ignore. loop (Optional): Custom event loop to use for creating task. Raises: RuntimeError: If the loop is provided and is not running. Exception or CancelledError: If not ignored and raise_on_exception is True. """ if ignore_errors is None: ignore_errors = [asyncio.CancelledError] if loop and not loop.is_running(): raise RuntimeError("Event loop provided yet it is not running.") task = loop.create_task(coro) if loop else asyncio.create_task(coro) def on_task_done(t: asyncio.Task): try: if raise_on_exception: try: exc = t.exception() # May raise CancelledError! except BaseException as e: # Handle CancelledError and other ignored exceptions ignore_exception = any(issubclass(type(e), ignore) for ignore in ignore_errors) if not ignore_exception: raise else: if exc and not any(issubclass(type(exc), ignore) for ignore in ignore_errors): raise exc finally: if on_complete: try: on_complete(t) except Exception as e: logger.log_exception(e) # Optionally log task.add_done_callback(on_task_done) return task
[docs] def in_async_context() -> bool: """ Check if the current code is running inside an asynchronous context. Returns: bool: True if called within an `async def` coroutine (i.e., there's a running event loop), False otherwise. Example: ``` >>> in_async_context() False >>> async def main(): ... print(in_async_context()) >>> asyncio.run(main()) True ``` """ try: asyncio.get_running_loop() return True except RuntimeError: return False