Source code for duck.utils.multiprocessing

"""
ProcessSafeLRUCache: A process-safe, shared-memory LRU cache for Python multi-process applications.

This class allows multiple processes to share and update a central in-memory cache,
with Least Recently Used (LRU) eviction and optional per-key expiry, using Python's
`multiprocessing.Manager` for process safety.

Features:
- Process-safe via Manager-proxied dicts and lists.
- True LRU eviction: oldest unused entry is removed once maxkeys is exceeded.
- Optional per-key expiry: keys are removed after a set time.
- API is similar to standard in-memory LRU cache: get, set, delete, has, clear.
- Locking ensures atomicity for compound operations.
- Suitable for multi-process WSGI/ASGI, background jobs, ML/AI workers, caching across service forks.

Example use cases:
- Share a cache between worker processes in Gunicorn, Uvicorn, or multiprocessing scripts.
- Protect frequently-accessed data with expiry and auto-eviction.
- Use in ML/AI pipeline for process-wide memoization or inference result caching.

Usage Example:
```py
import time
import multiprocessing

from duck.utils.multiprocesing import ProcessSafeLRUCache

# Must be called in __main__ for Windows fork support!
if __name__ == "__main__":
    manager = multiprocessing.Manager()
    cache = ProcessSafeLRUCache(maxkeys=5)

    def store_and_read(cache, key, value):
        cache.set(key, value, expiry=10)
        print("Stored", key, "got back", cache.get(key))

    ps = []
    for i in range(10):
        p = multiprocessing.Process(target=store_and_read, args=(cache, str(i), i * 2))
        p.start()
        ps.append(p)
        time.sleep(1)
    
    for p in ps:
        p.join()

    print("Final shared cache:", dict(cache.cache))
"""

import datetime
import multiprocessing


[docs] class ProcessSafeLRUCache: """ ProcessSafeLRUCache(maxkeys=None) A process-safe, shared-memory LRU cache with per-key expiry time. Uses multiprocessing.Manager for process-safe shared dict/list storage. Args: maxkeys (int): Optional maximum number of cache entries (evicts oldest if exceeded). **Methods:** - set(key, value, expiry=None): Set key to value, optionally with expiry time in seconds. - get(key, default=None, pop=False): Get value for key (None/default if missing), optionally popping it. - has(key): Check if key exists (not evicted/expired). - delete(key): Delete a cache entry. - clear(): Remove all cache and expiry data. - close(): Aliased to clear. """ __slots__ = { "manager", "expiry_map", "cache", "lru_order", "maxkeys", "lock", } def __init__(self, maxkeys=None): self.manager = multiprocessing.Manager() self.expiry_map = self.manager.dict() self.cache = self.manager.dict() self.lru_order = self.manager.list() self.maxkeys = maxkeys self.lock = self.manager.Lock()
[docs] def set(self, key: str, value, expiry=None): """ Set a value in the cache, marking it as most recently used. Args: key: Key to set. value: Value to associate with key. expiry: Seconds until key expires (None disables expiry). """ with self.lock: if key in self.cache: try: self.lru_order.remove(key) except ValueError: pass self.cache[key] = value self.lru_order.append(key) if expiry: self.expiry_map[key] = datetime.datetime.now() + datetime.timedelta(seconds=expiry) # LRU eviction (preserves maxkeys) while self.maxkeys and len(self.cache) > self.maxkeys: oldest_key = self.lru_order.pop(0) self.cache.pop(oldest_key, None) self.expiry_map.pop(oldest_key, None)
[docs] def get(self, key: str, default=None, pop=False): """ Get a value from the cache (or default if missing/expired). Args: key: Cache key. default: Value to return if key not present. pop: If True, remove the key after retrieval. Returns: Cached value, or default/None. """ with self.lock: # Expiry check if key in self.expiry_map: expiry_date = self.expiry_map[key] if datetime.datetime.now() >= expiry_date: self.cache.pop(key, None) self.expiry_map.pop(key, None) try: self.lru_order.remove(key) except ValueError: pass return None # LRU update (if not popping) if key in self.cache: try: self.lru_order.remove(key) except ValueError: pass if not pop: self.lru_order.append(key) return self.cache[key] else: value = self.cache.pop(key) self.expiry_map.pop(key, None) return value return default
[docs] def has(self, key: str): """ Check if the cache has a key (not expired/evicted). """ with self.lock: return key in self.cache
[docs] def delete(self, key: str): """ Remove a key from the cache. """ with self.lock: self.cache.pop(key, None) self.expiry_map.pop(key, None) try: self.lru_order.remove(key) except ValueError: pass
[docs] def clear(self): """ Clear the entire cache and expiry info. """ with self.lock: self.cache.clear() self.expiry_map.clear() self.lru_order[:] = []
[docs] def close(self): """ Alias for clear(). """ self.clear()