duck.contrib.sync.smart_async¶
Smart module for high-concurrency async execution of synchronous callables, with intelligent handling of transactional/atomic operations.
Features:
Runs regular sync code on any available thread for maximal concurrency.
Routes transactional/atomic database operations to a pool of specialized threads.
Ensures thread (and connection) affinity for sync-to-async calls within a transaction context.
Dynamically scales up transactional threads as needed, up to max_threads.
Reliable queueing and error handling; safe for production usage.
Usage Example:
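import asyncio
import time

# sync_to_async is documented below; it is aliased here to match the
# name used throughout this example.
from duck.contrib.sync.smart_async import (
    sync_to_async as smart_sync_to_async,
    transaction_context,
)


def regular_func(x):
    time.sleep(1)
    return x * x


async def atomic_func(num_times):
    results = []
    x = 0

    def some_db_func():
        nonlocal x
        time.sleep(1)
        results.append(x + 1)
        x += 1

    # Every call inside this block is routed to the same thread.
    with transaction_context():
        for _ in range(num_times):
            await smart_sync_to_async(some_db_func)()
    return results


async def main():
    print("Regular ops concurrently:")
    st_time = time.time()
    results = await asyncio.gather(
        *(smart_sync_to_async(regular_func)(i) for i in range(8))
    )
    print(results)
    print(f"Elapsed: {time.time() - st_time:.2f}s")

    st_time = time.time()
    print("\nAtomic ops sequentially (single transaction context):")
    atomic_results = await atomic_func(8)
    print(atomic_results)
    print(f"Elapsed: {time.time() - st_time:.2f}s")


asyncio.run(main())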
The core principle of async responsiveness: prefer many small tasks over awaiting a single long-running task, or even over converting that task to async.
Module Contents¶
Classes¶
TransactionThread | Dedicated thread for executing atomic/transactional database operations. Each thread maintains its own DB connection context. All tasks submitted are executed serially, preserving transaction context.
TransactionThreadPool | Dynamically scalable pool of TransactionThread objects.
disable_transaction_context | Context manager that temporarily disables any active transaction context. While inside this block, in_transaction_context() returns None.
transaction_context | Custom transaction context manager for testing purposes. This context manager simulates Django’s transaction.atomic() for testing.
Functions¶
in_transaction_context | Returns a unique ID (str) if currently inside a DB transaction context/atomic block.
is_transactional | Heuristically determine if a function is transactional/atomic. For Django: checks for ‘transaction.atomic’ in source or ‘is_atomic’ attribute.
sync_to_async | High-concurrency async wrapper for synchronous functions.
Data¶
API¶
- duck.contrib.sync.smart_async.T¶
‘TypeVar(…)’
- exception duck.contrib.sync.smart_async.TaskTookTooLongWarning[source]¶
Bases: UserWarning
Warning issued when a task takes too long to execute, since this can exhaust the thread pool and cause significant performance degradation (and subsequent hangs).
Initialization
Initialize self. See help(type(self)) for accurate signature.
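As a rough illustration of how a slow-task warning like this could be raised, the sketch below times a call and warns when it exceeds a threshold. The names SlowTaskWarning and run_timed, and the threshold value, are assumptions made for this example; the page does not say where or at what threshold the module issues TaskTookTooLongWarning.

```python
import time
import warnings


class SlowTaskWarning(UserWarning):
    """Hypothetical stand-in for TaskTookTooLongWarning."""


def run_timed(func, *args, threshold=0.5, **kwargs):
    """Run func and warn if it takes longer than `threshold` seconds.

    The threshold is an assumed parameter, not a documented setting.
    """
    start = time.monotonic()
    try:
        return func(*args, **kwargs)
    finally:
        elapsed = time.monotonic() - start
        if elapsed > threshold:
            name = getattr(func, "__name__", repr(func))
            warnings.warn(
                f"task {name} took {elapsed:.2f}s (threshold {threshold}s)",
                SlowTaskWarning,
                stacklevel=2,
            )
```

Because the class derives from UserWarning, callers can silence or escalate it with the standard warnings filters.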
- class duck.contrib.sync.smart_async.TransactionThread(context_id=None)[source]¶
Bases: threading.Thread
Dedicated thread for executing atomic/transactional database operations. Each thread maintains its own DB connection context. All tasks submitted are executed serially, preserving transaction context.
Notes:
This thread can also be used in non-transactional general contexts.
Initialization
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
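The serial execution model described for TransactionThread can be sketched with a queue-driven worker thread. This is a minimal illustration using only the standard library, not the module's implementation: SerialWorker, its submit() and stop() methods, and the use of concurrent.futures.Future are all assumptions made for the example.

```python
import queue
import threading
from concurrent.futures import Future


class SerialWorker(threading.Thread):
    """Hypothetical stand-in: runs submitted callables one at a time."""

    def __init__(self):
        super().__init__(daemon=True)
        self._tasks = queue.Queue()

    def submit(self, func, *args, **kwargs):
        """Queue a callable and return a Future for its result."""
        future = Future()
        self._tasks.put((future, func, args, kwargs))
        return future

    def run(self):
        while True:
            item = self._tasks.get()
            if item is None:  # sentinel: stop the loop
                break
            future, func, args, kwargs = item
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                future.set_exception(exc)
            else:
                future.set_result(result)

    def stop(self):
        """Ask the worker to exit after the queued tasks and wait for it."""
        self._tasks.put(None)
        self.join()
```

Because a single thread drains the queue in FIFO order, every task sees the same thread-local state, which is the property a per-thread DB connection relies on.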
- class duck.contrib.sync.smart_async.TransactionThreadPool(max_threads: Optional[int] = None, auto_free_general_threads: bool = True, general_threads_free_level: int = 50)[source]¶
Dynamically scalable pool of TransactionThread objects.
Features:
Context-affinity: threads created with a context_id are stored and reused for that context. These threads are NOT auto-freed, because callers may rely on persistent context-affinity.
General threads: threads without context_id are pooled and may be auto-freed when there are more idle general threads than the configured ‘general_threads_free_level’ percentage.
Thread creation is bounded by max_threads. max_threads applies to both context and general threads.
Initialization
Initialize the thread pool.
- Parameters:
max_threads – Maximum number of threads to create (per context/non-context threads). If None, a reasonable default from duck.utils.threading.get_max_workers() is used.
auto_free_general_threads – If True, extra idle general threads will be automatically shut down when the pool has more idle threads than the configured free level.
general_threads_free_level – Percentage (0-100) of general threads to keep free before freeing additional idle threads. For example, 50 keeps at least half of the general free threads; any extra idle threads will be candidates for freeing.
- _maybe_free_general_threads(ignore_threads: Optional[List[duck.contrib.sync.smart_async.TransactionThread]] = None) → None[source]¶
Free extra idle general threads to respect general_threads_free_level.
- Parameters:
ignore_threads – Threads to skip when freeing. This is useful when you want to free idle threads but keep a specific thread you are about to use.
Calculation:
desired_free_threads = ceil(max_threads * (free_level / 100))
if current_free_threads > desired_free_threads:
    free (current_free_threads - desired_free_threads)  # but keep at least one general thread
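The calculation above can be worked through as a small function. This is a sketch under assumptions: the function name and parameters are invented for illustration, and the "keep at least one general thread" rule is interpreted here as capping the result at one less than the total number of general threads.

```python
import math


def threads_to_free(max_threads, free_level, idle_general, total_general):
    """Return how many idle general threads to shut down.

    max_threads:   pool-wide thread limit
    free_level:    percentage (0-100) of threads to keep free
    idle_general:  general threads that are currently idle
    total_general: all general threads, idle or busy (assumed input)
    """
    desired_free = math.ceil(max_threads * free_level / 100)
    excess = max(0, idle_general - desired_free)
    # Assumed reading of the rule: never shut down the last general thread.
    return min(excess, max(0, total_general - 1))
```

For example, with max_threads=8 and a free level of 50, the desired number of free threads is 4, so a pool with 6 idle general threads would free 2 of them.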
- get_thread(context_id: Optional[str] = None) → duck.contrib.sync.smart_async.TransactionThread[source]¶
Return a TransactionThread appropriate for the provided context_id.
If a context_id is given, the pool attempts to return a dedicated thread for that context, creating one if necessary (and if under max_threads). Context-bound threads are not auto-freed by the pool.
If context_id is None, returns an available general thread if one is free; otherwise it may create a new general thread (subject to max_threads). This method may trigger auto-freeing of extra general threads when appropriate.
- Parameters:
context_id – This is the context ID of the thread that is needed to run the task.
- Returns:
The thread matching the context ID or any free/appropriate thread if no context_id provided.
- Return type:
TransactionThread
- shutdown(wait: bool = True)[source]¶
Stop all running threads attached to this pool.
- Parameters:
wait – Whether to wait for all threads to stop. Defaults to True.
- submit(func: Callable[..., duck.contrib.sync.smart_async.T], *args, context_id=None, **kwargs) → asyncio.Future[source]¶
Queues the task for execution on the thread that matches the context ID, or on any free/appropriate thread if no context ID is provided.
- Returns:
An asynchronous future you can wait for in async context.
- Return type:
asyncio.Future
- Raises:
AssertionError – If the thread returned by get_thread is dead/not running.
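One common way to hand a result from a worker thread back to an asyncio.Future is loop.call_soon_threadsafe. The sketch below shows that pattern in isolation; submit_to_thread is a hypothetical helper that starts a fresh thread per call for simplicity, whereas the pool described here reuses its threads. How submit() bridges to the event loop internally is not stated on this page.

```python
import asyncio
import threading


def submit_to_thread(loop, func, *args, **kwargs):
    """Run func on a new thread and return an asyncio.Future for it.

    Hypothetical helper: illustrates the thread-to-loop handoff only.
    """
    future = loop.create_future()

    def runner():
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            # Futures are not thread-safe, so resolve them on the loop thread.
            loop.call_soon_threadsafe(future.set_exception, exc)
        else:
            loop.call_soon_threadsafe(future.set_result, result)

    threading.Thread(target=runner, daemon=True).start()
    return future


async def main():
    loop = asyncio.get_running_loop()
    print(await submit_to_thread(loop, pow, 2, 5))


if __name__ == "__main__":
    asyncio.run(main())
```

Exceptions raised by the callable surface at the await site, just as they would for a coroutine.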
- duck.contrib.sync.smart_async._TRANSACTION_THREAD_POOL¶
‘TransactionThreadPool(…)’
- duck.contrib.sync.smart_async._transaction_context_id_var¶
‘ContextVar(…)’
- class duck.contrib.sync.smart_async.disable_transaction_context[source]¶
Context manager that temporarily disables any active transaction context. While inside this block, in_transaction_context() returns None.
Usage:
with transaction_context():
    print(in_transaction_context())  # Not None
    with disable_transaction_context():
        print(in_transaction_context())  # None
    print(in_transaction_context())  # Not None
- duck.contrib.sync.smart_async.in_transaction_context() → Optional[str][source]¶
Returns a unique ID (str) if currently inside a DB transaction context/atomic block.
Notes:
For Django: if inside transaction.atomic, returns the thread identity.
For unsupported ORMs: returns None.
- duck.contrib.sync.smart_async.is_transactional(func: Callable) → bool[source]¶
Heuristically determine if a function is transactional/atomic. For Django: checks for ‘transaction.atomic’ in source or ‘is_atomic’ attribute.
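A heuristic of this kind can be sketched with inspect.getsource. The function below is an illustration written for this page, not the module's code: looks_transactional is a hypothetical name, and treating callables without retrievable source as non-transactional is an assumption.

```python
import inspect


def looks_transactional(func):
    """Guess whether func is transactional (illustrative heuristic only)."""
    # An explicit marker attribute takes precedence.
    if getattr(func, "is_atomic", False):
        return True
    try:
        source = inspect.getsource(func)
    except (OSError, TypeError):
        # Built-ins and dynamically created callables have no source.
        return False
    return "transaction.atomic" in source
```

Source matching is only a best guess: it misses transactions opened in helper functions and can match a mention of transaction.atomic inside a comment or string.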
- duck.contrib.sync.smart_async.sync_to_async(func: Callable[..., duck.contrib.sync.smart_async.T], *outer_args, **outer_kwargs) → Callable[..., asyncio.Future][source]¶
High-concurrency async wrapper for synchronous functions.
Runs sync code in any available thread for maximum concurrency.
Detects atomic/transactional operations and routes them to a pool of specialized threads, ensuring all DB operations within a transaction run on the same thread/connection.
If called inside a transaction context, all sync_to_async calls for that transaction use the same thread.
Returns an awaitable Future with the result.
- class duck.contrib.sync.smart_async.transaction_context[source]¶
Custom transaction context manager for testing purposes. This context manager simulates Django’s transaction.atomic() for testing. Sets a contextvar so that in_transaction_context() can detect when code is running inside a transaction context.
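The contextvar mechanism mentioned above can be sketched as follows. All names here (fake_transaction, suspend_transaction, current_context_id) are hypothetical stand-ins, and generating the ID with uuid is an assumption; the sketch only shows how a ContextVar can be set, cleared and restored by nested context managers.

```python
import contextvars
import uuid

# Hypothetical stand-in for _transaction_context_id_var.
_ctx_id = contextvars.ContextVar("transaction_context_id", default=None)


def current_context_id():
    """Return the active context ID, or None outside any context."""
    return _ctx_id.get()


class fake_transaction:
    """Set a fresh context ID for the duration of the block."""

    def __enter__(self):
        self._token = _ctx_id.set(uuid.uuid4().hex)
        return self

    def __exit__(self, *exc_info):
        _ctx_id.reset(self._token)
        return False


class suspend_transaction:
    """Temporarily hide the active context ID, then restore it."""

    def __enter__(self):
        self._token = _ctx_id.set(None)
        return self

    def __exit__(self, *exc_info):
        _ctx_id.reset(self._token)
        return False
```

Resetting with the saved token restores whatever value was active before the block, which is what lets the two managers nest in either order.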