# Source code for duck.utils.threading
"""
Threading utilities and helpers.
"""
import os
import asyncio
import platform
import threading
from typing import Optional
[docs]
def get_max_workers() -> int:
    """
    Dynamically calculate a safe max_workers value for ThreadPoolExecutor,
    based on CPU count, available memory, stack size, and current system usage.

    Works cross-platform (Linux, Windows, macOS). No root required.
    Every probe is best-effort: if a probe fails (including ``psutil`` not
    being importable), a conservative fallback value is used instead of
    raising.

    Returns:
        int: Suggested max_workers value (min 8, max 2000)
    """
    hard_cap = 2000
    min_workers = 8
    default_stack_size = 8 * 1024 * 1024  # 8 MB fallback

    # psutil is optional here: without it the probes below fall back to
    # their defaults (the attribute access on None is caught as an Exception).
    try:
        import psutil
    except ImportError:
        psutil = None

    # --- System info ---
    cpu_count = os.cpu_count() or 1

    try:
        # Single snapshot; only the "available" figure is needed.
        available_memory = psutil.virtual_memory().available
    except Exception:
        # Fallback: assume 4 GB total with 50% of it available.
        available_memory = 4 * 1024**3 * 0.5

    try:
        all_threads = 0
        for proc in psutil.process_iter():
            try:
                all_threads += proc.num_threads()
            except psutil.Error:
                # The process exited or is not accessible to this user;
                # skip it rather than throwing away the whole count.
                continue
    except Exception:
        all_threads = 500  # fallback if counting fails

    # --- Estimate stack size ---
    try:
        if platform.system() == "Windows":
            stack_size = 1 * 1024 * 1024  # assume 1 MB per thread
        else:
            import resource

            stack_size = resource.getrlimit(resource.RLIMIT_STACK)[0]
            # Reject non-positive or implausibly large (> 1 GiB) soft limits.
            if stack_size <= 0 or stack_size > 1024**3:
                stack_size = default_stack_size
    except Exception:
        stack_size = default_stack_size

    # --- Limits ---
    # 1. CPU limit
    cpu_limit = cpu_count * 4
    # 2. Memory limit (use only a portion of available RAM)
    mem_limit = int(available_memory * 0.75 / stack_size)
    # 3. Adjust for threads already running system-wide (leave room)
    thread_adjustment = max(0, hard_cap - all_threads)

    # --- Final decision ---
    max_workers = min(cpu_limit, mem_limit, thread_adjustment, hard_cap)
    return max(min_workers, max_workers)
[docs]
def async_to_sync_future(async_future: asyncio.Future) -> "SyncFuture":
    """
    Converts an asynchronous future to a synchronous future.

    A done-callback is registered on ``async_future``; when it fires, the
    outcome (result, exception, or cancellation) is copied into a new
    ``SyncFuture`` so that a thread blocked in ``SyncFuture.result()`` is
    always released.

    Args:
        async_future (asyncio.Future): The future whose outcome is mirrored.

    Returns:
        SyncFuture: A future that resolves once ``async_future`` completes.
    """
    sync_future = SyncFuture()

    def _transfer_result(fut: asyncio.Future):
        try:
            result = fut.result()
        except BaseException as e:
            # BaseException, not Exception: asyncio.CancelledError derives
            # from BaseException on Python 3.8+, and a cancelled future must
            # still resolve the SyncFuture or its waiter would block forever.
            sync_future.set_exception(e)
        else:
            sync_future.set_result(result)

    async_future.add_done_callback(_transfer_result)
    return sync_future
[docs]
class SyncFuture:
    """
    A thread-safe future that blocks until a result is set or an exception is raised.

    This class mimics a subset of the behavior of asyncio.Future, but for use in
    synchronous (threaded) code. It allows one thread to wait for a value or error
    that will be provided by another thread.
    """

    def __init__(self):
        """
        Initializes the SyncFuture with no result or exception.
        """
        # Set once a result or an exception has been stored.
        self._event = threading.Event()
        self._result = None
        self._exception = None

    def exception(self) -> Optional[BaseException]:
        """
        Returns the future exception if set.

        Does not block: returns ``None`` when no exception has been set
        (including when the future has not completed yet).
        """
        return self._exception

    def set_result(self, value):
        """
        Sets the result of the future and unblocks any waiting thread.

        Args:
            value (Any): The result to return from the `result()` method.
        """
        self._result = value
        self._event.set()

    def set_exception(self, exception):
        """
        Sets an exception for the future and unblocks any waiting thread.

        Args:
            exception (Exception): The exception to raise when `result()` is called.
        """
        self._exception = exception
        self._event.set()

    def result(self, timeout: Optional[float] = None):
        """
        Blocks until a result or exception is set, then returns or raises it.

        Args:
            timeout (Optional[float]): Maximum number of seconds to wait.
                ``None`` (the default) waits indefinitely.

        Returns:
            Any: The result value previously set by `set_result()`.

        Raises:
            TimeoutError: If `timeout` elapses before the future completes.
            Exception: If an exception was set using `set_exception()`.
        """
        if not self._event.wait(timeout):
            raise TimeoutError("SyncFuture result not available within timeout")
        exc = self.exception()
        # Compare against None rather than relying on truthiness: an exception
        # type may define __bool__/__len__ and evaluate as falsy.
        if exc is not None:
            raise exc
        return self._result