duck.contrib.sync.smart_async

Smart module for high-concurrency async execution of synchronous callables, with intelligent handling of transactional/atomic operations.

Features:

  • Runs regular sync code on any available thread for maximal concurrency.

  • Routes transactional/atomic database operations to a pool of specialized threads.

  • Ensures thread (and connection) affinity for sync-to-async calls within a transaction context.

  • Dynamically scales up transactional threads as needed, up to max_threads.

  • Reliable queueing and error handling; safe for production usage.

Usage Example:

import time
import asyncio

def regular_func(x):
    time.sleep(1)
    return x * x

async def atomic_func(num_times):
    results = []
    x = 0

    def some_db_func():
        nonlocal x
        time.sleep(1)
        results.append(x + 1)
        x += 1

    with transaction_context():
        for i in range(num_times):
            await smart_sync_to_async(some_db_func)()
    return results

async def main():
    print("Regular ops concurrently:")
    st_time = time.time()
    results = await asyncio.gather(
        *(smart_sync_to_async(regular_func)(i) for i in range(8))
    )
    print(results)

    st_time = time.time()
    print("
Atomic ops sequentially (single transaction context):")
    atomic_results = await atomic_func(8)
    print(atomic_results)

asyncio.run(main())

The core principal of async responsiveness, use of small tasks rather than awaiting long running task or even converting to async.

Module Contents

Classes

TransactionThread

Dedicated thread for executing atomic/transactional database operations. Each thread maintains its own DB connection context. All tasks submitted are executed serially, preserving transaction context.

TransactionThreadPool

Dynamically scalable pool of TransactionThread objects.

disable_transaction_context

Context manager that temporarily disables any active transaction context. While inside this block, in_transaction_context() returns None.

transaction_context

Custom transaction context manager for testing purposes. This context manager simulates Django’s transaction.atomic() for testing. Sets a contextvar so that in_transaction_context() can detect when code is running inside a transaction context.

Functions

in_transaction_context

Returns a unique ID (str) if currently inside a DB transaction context/atomic block. Notes:

is_transactional

Heuristically determine if a function is transactional/atomic. For Django: checks for ‘transaction.atomic’ in source or ‘is_atomic’ attribute.

sync_to_async

High-concurrency async wrapper for synchronous functions.

Data

T

_TRANSACTION_THREAD_POOL

_transaction_context_id_var

API

duck.contrib.sync.smart_async.T

‘TypeVar(…)’

exception duck.contrib.sync.smart_async.TaskTookTooLongWarning[source]

Bases: UserWarning

Warning when a task took too much time executing as this might exhaust the threadpool and might cause significant performance degradation (hangs subsequently).

Initialization

Initialize self. See help(type(self)) for accurate signature.

class duck.contrib.sync.smart_async.TransactionThread(context_id=None)[source]

Bases: threading.Thread

Dedicated thread for executing atomic/transactional database operations. Each thread maintains its own DB connection context. All tasks submitted are executed serially, preserving transaction context.

Notes:

  • This thread can also be used in non-transactional general contexts.

Initialization

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.

__repr__()[source]
__str__()[source]
current_task_executing() Optional[Any][source]

Returns the current task/callable being executed.

is_free() bool[source]

Returns True if the thread is idle (no task running and queue empty).

run()[source]
shutdown()[source]

Shutsdown the thread but if no task is being executed or after current task finishes.

submit(func: Callable[..., duck.contrib.sync.smart_async.T], *args, **kwargs) asyncio.Future[source]

Puts the task in queue for execution.

Returns:

An asynchronous future you can wait for in async context.

Return type:

asyncio.Fututure

class duck.contrib.sync.smart_async.TransactionThreadPool(max_threads: Optional[int] = None, auto_free_general_threads: bool = True, general_threads_free_level: int = 50)[source]

Dynamically scalable pool of TransactionThread objects.

Features:

  • Context-affinity: threads created with a context_id are stored and reused for that context. These threads are NOT auto-freed, because callers may rely on persistent context-affinity.

  • General threads: threads without context_id are pooled and may be auto-freed when there are more idle general threads than the configured ‘general_threads_free_level’ percentage.

  • Thread creation is bounded by max_threads. max_threads applies to both context and general threads.

Initialization

Initialize the thread pool.

Parameters:
  • max_threads – Maximum number of threads to create (per context/non-context threads). If None will use a reasonable default from duck.utils.threading.get_max_workers().

  • auto_free_general_threads – If True, extra idle general threads will be automatically shut down when the pool has more idle threads than the configured free level.

  • general_threads_free_level – percentage (0-100) of general threads to keep free before freeing additional idle threads. For example, 50 keeps at least half of the general free threads; any extra idle threads will be candidates for freeing.

_maybe_free_general_threads(ignore_threads: Optional[List[duck.contrib.sync.smart_async.TransactionThread]] = None) None[source]

Free extra idle general threads to respect general_threads_free_level.

Parameters:

ignore_threads – This is a list of threads to ignore when freeing threads. This may be useful in cases, you want to free but excluding some thread you wanna use.

Calculation:

desired_free_threads = ceil(max_threads * (free_level / 100))
if current_free_threads > desired_free_threads:
    free (current_free_threads - desired_free_threads) # (but keep at least one general thread).
get_thread(context_id: Optional[str] = None) duck.contrib.sync.smart_async.TransactionThread[source]

Return a TransactionThread appropriate for the provided context_id.

If a context_id is given, the pool attempts to return a dedicated thread for that context, creating one if necessary (and if under max_threads). Context-bound threads are not auto-freed by the pool.

If context_id is None, returns an available general thread if free, otherwise may create a new general thread (subject to max_threads). This method may trigger auto-freeing of extra general threads when appropriate.

Parameters:

context_id – This is the context ID of the thread that is needed to run the task.

Returns:

The thread matching the context ID or any free/appropriate thread if no context_id provided.

Return type:

TransactionThread

shutdown(wait: bool = True)[source]

Stop all running threads attached to this pool.

Parameters:

wait – Whether to wait for all threads to stop. Defaults to True.

submit(func: Callable[..., duck.contrib.sync.smart_async.T], *args, context_id=None, **kwargs) asyncio.Future[source]

Puts the task in queue for execution in correct thread according to context ID or just any free/appropriate thread if no context ID provided.

Returns:

An asynchronous future you can wait for in async context.

Return type:

asyncio.Fututure

Raises:

AssertionError – If the returned thread from get_thread is dead/not running.

duck.contrib.sync.smart_async._TRANSACTION_THREAD_POOL

‘TransactionThreadPool(…)’

duck.contrib.sync.smart_async._transaction_context_id_var

‘ContextVar(…)’

class duck.contrib.sync.smart_async.disable_transaction_context[source]

Context manager that temporarily disables any active transaction context. While inside this block, in_transaction_context() returns None.

Usage:

with transaction_context():
    print(in_transaction_context())  # Not None
    with disable_transaction_context():
        print(in_transaction_context())  # None
    print(in_transaction_context())  # Not None
__enter__()[source]
__exit__(exc_type, exc_val, exc_tb)[source]
duck.contrib.sync.smart_async.in_transaction_context() Optional[str][source]

Returns a unique ID (str) if currently inside a DB transaction context/atomic block. Notes:

  • For Django: True if in transaction.atomic, returns thread identity.

  • For unsupported ORMs: returns None.

duck.contrib.sync.smart_async.is_transactional(func: Callable) bool[source]

Heuristically determine if a function is transactional/atomic. For Django: checks for ‘transaction.atomic’ in source or ‘is_atomic’ attribute.

duck.contrib.sync.smart_async.sync_to_async(func: Callable[..., duck.contrib.sync.smart_async.T], *outer_args, **outer_kwargs) Callable[..., asyncio.Future][source]

High-concurrency async wrapper for synchronous functions.

  • Runs sync code in any available thread for maximum concurrency.

  • Detects atomic/transactional operations and routes them to a pool of specialized threads, ensuring all DB operations within a transaction run on the same thread/connection.

  • If called inside a transaction context, all sync_to_async calls for that transaction use the same thread.

  • Returns an awaitable Future with the result.

class duck.contrib.sync.smart_async.transaction_context[source]

Custom transaction context manager for testing purposes. This context manager simulates Django’s transaction.atomic() for testing. Sets a contextvar so that in_transaction_context() can detect when code is running inside a transaction context.

__enter__()[source]
__exit__(exc_type, exc_val, exc_tb)[source]