# Source code for duck.utils.multiprocessing.proxy

"""
Multiprocessing proxy implementation for optimum performance. This module enables 
fast inter-process communication using `msgpack` and shared memory buffers.

Usage:

```py
import multiprocessing

from duck.utils.multiprocessing import ProxyServer, Proxy

class myObject:
    pass
    
server = ProxyServer(bufsize=200)
server.run()

obj = myObject()
obj.a = 100

proxy = server.create_proxy(obj)

def process_entry(proxy: Proxy):
    with proxy as p:
        print(p.a) # Output: 100
        
p1 = multiprocessing.Process(target=process_entry, args=[proxy])
p1.start()
p1.join()

```

TODO:
- Make sure dunder methods like `__iter__` and `__await__` work seamlessly, the same way they behave on the original objects.
- Implement asynchronous protocol
- Enable multiprocessing when starting proxy server, for isolation.
- Make iterables work with `create_proxy`.
      Example:
      ```py
      proxy = Proxy([1, 2, 3])
      
      def process_entry(proxy: Proxy):
          with proxy as p:
              print(list(p)) # Must print [1, 2, 3]
              p.append(4)
              print(list(p)) # Must print [1, 2, 3, 4]
      ```
"""
import enum
import time
import struct
import threading
import msgpack
import inspect
import functools
import multiprocessing
import multiprocessing.shared_memory as sm

from types import MethodType
from typing import (
    List,
    Dict,
    Optional,
    Any,
    Callable,
    Union,
    Iterable,
)

from duck.exceptions.all import SettingsError


try:
    from duck.logging import logger
except SettingsError:
    # Not inside a Duck project
    from duck.logging import console as logger


def get_callable_name(target):
    """
    Produce a human-readable name for *target*.

    Handles plain functions, builtins, bound methods, ``functools.partial``
    wrappers and arbitrary callable instances.
    """
    # Peel off any functools.partial layers first.
    while isinstance(target, functools.partial):
        target = target.func
    # Bound method: the underlying function carries the name.
    if inspect.ismethod(target):
        return target.__func__.__name__
    # Plain Python function or builtin.
    if inspect.isfunction(target) or inspect.isbuiltin(target):
        return target.__name__
    # Callable instance: prefer "<Class>.__call__" when __call__ has a
    # proper (non-lambda) name, otherwise fall back to the class name.
    if hasattr(target, "__call__"):
        call_name = getattr(target.__call__, "__name__", None)
        if call_name and call_name != "<lambda>":
            return f"{type(target).__name__}.__call__"
        return type(target).__name__
    # Last resort: any __name__ the object exposes, else its type name.
    return getattr(target, "__name__", type(target).__name__)
def is_method_of(candidate, owner):
    """
    Tell whether *candidate* is (or wraps) a method belonging to *owner*.

    True for bound methods of ``owner`` and for plain functions that are the
    implementation behind an attribute of the same name on ``owner``.

    Examples:
    - is_method_of(a.foo, a) -> True
    - is_method_of(A.foo, a) -> True (the underlying function of a's method)
    - is_method_of(a.bar, a) -> False if bar is not an attribute of a
    """
    # A bound method knows its instance directly.
    if inspect.ismethod(candidate):
        return candidate.__self__ is owner
    # Look through partial wrappers.
    if isinstance(candidate, functools.partial):
        return is_method_of(candidate.func, owner)
    # Anonymous callables cannot be looked up on the owner.
    name = getattr(candidate, "__name__", None)
    if name is None:
        return False
    # Fetch the same-named attribute from the owner, if any.
    try:
        owner_attr = getattr(owner, name)
    except AttributeError:
        return False
    # Bound method on the owner: compare the raw function objects behind both sides.
    if inspect.ismethod(owner_attr):
        raw = getattr(candidate, "__func__", candidate)
        return owner_attr.__func__ is raw
    # Static/plain attribute: identity match, or a textual "of <Class>" hint
    # in the candidate's repr.
    marker = f"of {owner.__class__.__name__}"
    return owner_attr is candidate or marker in str(candidate)
class ProxyError(Exception):
    """
    Base exception for multiprocessing proxy failures.
    """
class ProxyObjectNotFound(ProxyError):
    """
    Raised when the real object a proxy points at cannot be located.
    """
class LimitedProxyChaining(ProxyError):
    """
    Raised when proxy chaining reaches the configured maximum depth.

    Carries a descriptive message plus suggested mitigations.
    """

    def __init__(self, max_level: int, target_obj: Optional[str] = None):
        # Keep the limit around for programmatic inspection.
        self.max_level = max_level
        message = (
            f"Proxy chaining reached the maximum allowed level ({max_level}). "
            "This usually means your code repeatedly returns non-serializable objects "
            "which get wrapped as proxy references. Consider increasing the server "
            "proxy chaining limit, refactoring to return serializable data, or "
            "exposing a higher-level API on the server side to avoid deep chaining."
        )
        if target_obj:
            message = f"{message} Offending object description: {target_obj!r}."
        super().__init__(message)
class DataDecodeError(ProxyError):
    """
    Raised when data read from shared memory cannot be decoded.
    """
class DataEncodeError(ProxyError):
    """
    Raised when data destined for shared memory cannot be encoded.
    """
class DataTooBigError(ProxyError):
    """
    Raised when a payload does not fit into the shared memory buffer.
    """
class EmptyData(ProxyError):
    """
    Raised when the shared memory holds no data to read (zero-length payload).
    """
class ProxyOpCode(enum.IntEnum):
    """
    Operation codes exchanged between proxy clients and the proxy server.

    Each frame is serialized as ``[opcode, payload]``; the payload layout
    per opcode is documented next to each member.
    """

    # Client -> server: read an attribute from the real object.
    # Payload: [target_object_id, attr]
    GET = 0

    # Client -> server: assign an attribute on the real object.
    # Payload: [target_object_id, attr, value]
    SET = 1

    # Client -> server: invoke a callable on the real object.
    # Payload: [target_object_id, attr, args, kwargs]
    EXECUTE = 2

    # Server -> client: outcome of a request.
    # Payload: [target_object_id, value, error]
    EXECUTION_RESULT = -1

    # Server -> client: request received, response not ready yet.
    # Payload: []
    RESPONSE_PENDING = -2
class Frame:
    """
    A single unit of data written to / read from the shared memory buffer.

    A frame serializes as ``[opcode, payload]`` via msgpack, preceded on the
    wire by a 4-byte big-endian length prefix (written by the frame writer,
    not by this class).
    """

    __slots__ = ("opcode", "payload")

    def __init__(self, opcode: int, payload: List[Any]):
        """
        Initialize the frame.

        Args:
            opcode (int): The operation code for this frame.
            payload (List[Any]): The payload for the frame as a list.
        """
        self.opcode = opcode
        self.payload = payload

    @classmethod
    def parse(cls, data: bytes) -> "Frame":
        """
        Parse msgpack-encoded bytes and produce a frame object.

        FIX: the first parameter was previously named ``self`` and the body
        hard-coded ``Frame(...)``; using ``cls`` lets subclasses parse to
        their own type and follows the classmethod convention.

        Args:
            data (bytes): The msgpack-encoded ``[opcode, payload]`` pair.

        Raises:
            DataDecodeError: If the bytes cannot be unpacked into a frame.
        """
        try:
            opcode, payload = msgpack.unpackb(data, raw=False)
            return cls(opcode, payload)
        except Exception as e:
            raise DataDecodeError(f"Error decoding data: {e}.")

    def __repr__(self):
        return f"<[{self.__class__.__name__} opcode={self.opcode}]>"

    __str__ = __repr__
class Proxy:
    """
    Multiprocessing proxy object.

    This performs actions like `get` or `set` of anything indirectly on a real object.

    Use Case:
    - Indirectly performing actions on objects that are heavy or complex
      (not serializable) between processes.

    Notes:
    - This must be created in the process where the real object resides and then
      be used as a proxy in another process.
    - Make sure to delete the proxy object after use; this frees the shared memory also.
    """

    # Prefix marking a chained-proxy descriptor, sent by the server for
    # results that cannot be serialized.
    _proxy_prefix = "<proxy>-"

    # Prefix marking a callable descriptor that cannot be sent directly
    # through shared memory.
    _callable_prefix = "<callable>-"

    # Attributes resolved on the proxy itself; any other attribute access
    # is forwarded to the real object through the server.
    _cls_attrs = {
        "get_shared_memory",
        "get_response",
        "read_frame",
        "write_frame",
        "idx",
        "close",
        "target_obj_str",
        "shared_memory_name",
        "_sm",
        "_bufsize",
        "_server_running",
        "_callable_prefix",
        "_proxy_prefix",
        "_close_on_delete",
        "_DATA_SIZE_PREFIX_LEN",
        "__str__",
        "__repr__",
        "__del__",
        "__enter__",
        "__exit__",
        "__class__",
        "__dir__",
        "__setitem__",
        "__getitem__",
        "_chain_level",
        "_proxy_chaining_max_level",
        "_last_error",
    }

    # Byte length of the big-endian size prefix written before each payload
    # in shared memory, e.g. `20 [0, ...]` where 20 is the payload size.
    _DATA_SIZE_PREFIX_LEN: int = 4

    __slots__ = {
        "idx",
        "target_obj_str",
        "shared_memory_name",
        "_sm",
        "_bufsize",
        "_server_running",
        "_close_on_delete",
        "_chain_level",
        "_proxy_chaining_max_level",
        "_last_error",
    }

    def __init__(
        self,
        proxy_server: "ProxyServer",
        idx: int,
        target_obj: Any,
        shared_memory: sm.SharedMemory,
    ):
        """
        Initialize the proxy object.

        Args:
            proxy_server (ProxyServer): The proxy server which wants to create this proxy object.
            idx (int): Unique ID for the proxy object.
            target_obj (Any): This is the real/target object you want to proxy to.
            shared_memory (sm.SharedMemory): The target shared memory for this proxy object.
        """
        assert isinstance(proxy_server, ProxyServer), f"The proxy_server must be an instance of ProxyServer not {type(proxy_server)}"
        assert target_obj is not None, f"Target object must not be None."
        assert isinstance(shared_memory, sm.SharedMemory), f"The shared_memory must be an instance of SharedMemory not {type(shared_memory)}."
        self.idx = idx
        self.target_obj_str = str(target_obj)
        self.shared_memory_name = shared_memory.name
        self._sm: Optional[sm.SharedMemory] = None
        self._bufsize = proxy_server._bufsize
        self._close_on_delete = True
        self._last_error = None
        if isinstance(target_obj, Iterable):
            # str() of an iterable may not reflect its live contents, so show
            # the type instead of a possibly stale representation.
            self.target_obj_str = str(type(target_obj))
        # Chain-level (how many proxies sit between the original server and this client).
        self._chain_level = 0
        # Default max; a server-provided limit is applied later in get_response.
        self._proxy_chaining_max_level = getattr(proxy_server, "_proxy_chaining_max_level", 7) if proxy_server else 7
        if not proxy_server.running:
            raise ProxyError("The provided proxy_server is not running. Make sure `run` is called on the proxy_server for running the server.")
        self._server_running = True

    def get_shared_memory(self) -> sm.SharedMemory:
        """
        Return the shared memory block backing this proxy, attaching lazily.
        """
        if not self._sm:
            # Open the existing shared memory block by name (created by the server).
            self._sm = sm.SharedMemory(name=self.shared_memory_name)
        return self._sm

    def read_frame(self, shared_memory: sm.SharedMemory, timeout: Optional[float] = 0.5) -> "Frame":
        """
        Read a frame from shared memory and return the parsed Frame.

        Waits until the full payload (as indicated by the 4-byte big-endian
        length prefix) is available or until the optional timeout expires.

        Raises:
            TimeoutError: If the full payload isn't available within `timeout`.
            DataTooBigError: If the indicated payload would exceed the configured buffer size.
            DataDecodeError: For other decoding/parsing errors.
            EmptyData: If data is empty or size is less than 1 byte.
        """
        buffer = shared_memory.buf  # memoryview over the whole block
        # Ensure the prefix bytes exist at all.
        if len(buffer) < self._DATA_SIZE_PREFIX_LEN:
            raise DataDecodeError("Shared memory buffer is smaller than the length-prefix size.")
        try:
            # Read the 4-byte big-endian length prefix.
            prefix_bytes = bytes(buffer[: self._DATA_SIZE_PREFIX_LEN])
            data_length = struct.unpack(">I", prefix_bytes)[0]
        except struct.error as e:
            raise DataDecodeError(f"Failed to unpack data length prefix: {e}")
        # Reject payloads that could never fit in the buffer.
        total_bytes = data_length + self._DATA_SIZE_PREFIX_LEN
        if total_bytes > self._bufsize:
            raise DataTooBigError(
                f"The data to be read is too big. Max is {self._bufsize} byte(s) but data to be read is {total_bytes} bytes."
            )
        start = self._DATA_SIZE_PREFIX_LEN
        end = start + data_length
        start_time = time.time()
        if data_length == 0:
            raise EmptyData("Data to be read is empty or zero.")
        try:
            # Poll until the expected number of payload bytes is present or timeout.
            while True:
                payload = bytes(buffer[start:end])
                if len(payload) == data_length:
                    break
                if timeout is not None and (time.time() - start_time >= timeout):
                    raise TimeoutError(
                        f"Timed out reading full data. Expected {data_length} byte(s) but got {len(payload)} byte(s)."
                    )
                # Small sleep to avoid a tight busy loop.
                time.sleep(0.001)
            # Parse the frame payload.
            return Frame.parse(payload)
        except Exception as e:
            raise DataDecodeError(f"Error decoding data: {e}") from e

    def write_frame(self, shared_memory: sm.SharedMemory, frame: "Frame") -> int:
        """
        Write a frame to the shared memory.

        The written layout is:
            [4-byte big-endian uint32 payload-length][payload bytes]

        Returns:
            int: Written payload size in bytes (does not include the 4-byte length prefix).

        Raises:
            DataTooBigError: If the payload+prefix would not fit into the configured buffer.
        """
        assert isinstance(frame, Frame), f"Frame must be an instance of Frame not {type(frame)}."
        # Pack payload using msgpack.
        data = msgpack.packb([frame.opcode, frame.payload], use_bin_type=True)
        size = len(data)
        total_bytes = size + self._DATA_SIZE_PREFIX_LEN
        if total_bytes > self._bufsize:
            raise DataTooBigError(
                f"Data too large to fit into shared memory buffer: {total_bytes} byte(s) is greater than {self._bufsize} byte(s)."
            )
        # Write the 4-byte length prefix, then the payload.
        buffer = shared_memory.buf
        buffer[: self._DATA_SIZE_PREFIX_LEN] = struct.pack(">I", size)
        buffer[self._DATA_SIZE_PREFIX_LEN : total_bytes] = data
        # Return written bytes.
        return size

    def get_response(self, frame: "Frame", timeout: Optional[float] = None) -> Union[Any, "Proxy", Callable]:
        """
        Make a request to the proxy server and return the decoded response.

        Behaviour:
        - A proxy descriptor (``<proxy>-...``) yields a new chained Proxy with an
          incremented chain level (bounded by ``_proxy_chaining_max_level``).
        - A callable descriptor (``<callable>-...``) yields a local function that
          triggers an EXECUTE request when called; chain level resets to 0.
        - A plain serializable value is returned as-is; chain level resets to 0.

        Raises:
            TimeoutError: If no response arrives within `timeout`.
            ProxyError: If the server reported an error or the response is unrecognized.
            LimitedProxyChaining: If chaining would exceed the configured maximum.
        """
        valid_opcodes = [
            ProxyOpCode.GET,
            ProxyOpCode.SET,
            ProxyOpCode.EXECUTE,
        ]
        assert frame.opcode in valid_opcodes, f"OpCode not recognized, available opcodes: {valid_opcodes}."
        # NOTE: local renamed from `sm` to `shm` to stop shadowing the
        # multiprocessing.shared_memory module alias.
        shm = self.get_shared_memory()
        # Send/write the request to shared memory.
        self.write_frame(shm, frame)
        # Poll for the response frame.
        response_frame = None
        start_time = time.time()
        while True:
            if timeout and (time.time() - start_time >= timeout):
                raise TimeoutError(f"Request timed out: Got no response in {timeout: .2f} seconds.")
            try:
                response_frame = self.read_frame(shm, timeout=0.001)
                if response_frame.opcode == ProxyOpCode.EXECUTION_RESULT:
                    break
            except DataDecodeError:
                # Maybe data is not enough yet to be decoded.
                pass
            except TimeoutError:
                pass
            # Sleep to avoid busy loop.
            time.sleep(0.0005)
        if response_frame.opcode == ProxyOpCode.EXECUTION_RESULT:
            target_object_id, value, error = response_frame.payload
            if error:
                # Surface the error type and message from the server.
                raise ProxyError(f"Server-side error when operating on target object id={target_object_id}: {error}")
            if isinstance(value, str) and value.startswith(self._proxy_prefix):
                # The server wrapped a non-serializable result as another proxy.
                target_object_str = value.split(self._proxy_prefix, 1)[-1]
                max_level = getattr(self, "_proxy_chaining_max_level", 7)
                chain_level = getattr(self, "_chain_level", 0)
                # Enforce the proxy chaining limit.
                if chain_level + 1 > max_level:
                    raise LimitedProxyChaining(max_level, target_obj=target_object_str)
                # A throwaway server/target pair satisfies Proxy.__init__'s checks;
                # the real routing happens through target_object_id on the wire.
                dummy_proxy_server = ProxyServer(bufsize=self._bufsize)
                dummy_proxy_server._running = 1
                dummy_target_object = object()
                proxy = Proxy(
                    proxy_server=dummy_proxy_server,
                    idx=target_object_id,
                    target_obj=dummy_target_object,
                    shared_memory=shm,
                )
                # Update vital attributes on the chained proxy.
                proxy.target_obj_str = target_object_str
                proxy._chain_level = chain_level + 1
                proxy._proxy_chaining_max_level = max_level
                return proxy
            elif isinstance(value, str) and value.startswith(self._callable_prefix):
                # This is a method descriptor.
                method = value.split(self._callable_prefix, 1)[-1]
                # Reset chain level: we received a real (callable descriptor) response.
                try:
                    self._chain_level = 0
                except Exception:
                    # best-effort; ignore if attribute not present
                    pass

                # Custom callable that requests execution on the real object.
                def execute_on_target(*args, **kwargs):
                    request_frame = Frame(opcode=ProxyOpCode.EXECUTE, payload=[target_object_id, method, args, kwargs])
                    return self.get_response(request_frame)

                return execute_on_target
            # A real/serializable value: reset chain level.
            try:
                self._chain_level = 0
            except Exception:
                # best-effort; ignore if attribute not present
                pass
            return value
        else:
            raise ProxyError(f"Unknown response from server: {response_frame}.")

    def close(self):
        """
        Close (and unlink) the shared memory for the proxy object; best-effort.
        """
        try:
            shm = self.get_shared_memory()
            shm.close()
            shm.unlink()
        except FileNotFoundError:
            # Shared memory no longer available.
            pass
        except Exception:
            # Be quiet on other cleanup errors.
            pass

    def __getattribute__(self, key):
        super_getattr = super().__getattribute__
        super_setattr = super().__setattr__
        last_error = super_getattr('_last_error')
        if last_error:
            # Return immediately (once) after a context-manager error so the
            # original exception is not masked by a follow-up proxy request.
            super_setattr('_last_error', None)
            return
        if key in type(self)._cls_attrs:
            return super().__getattribute__(key)
        # Forward the attribute lookup to the server.
        get_response = super_getattr("get_response")
        idx = super_getattr('idx')
        request_frame = Frame(opcode=ProxyOpCode.GET, payload=[idx, key])
        result = get_response(request_frame)
        return result

    def __setattr__(self, key, value):
        super_getattr = super().__getattribute__
        if key in type(self)._cls_attrs:
            return super().__setattr__(key, value)
        # Forward the attribute assignment to the server.
        get_response = super_getattr("get_response")
        idx = super_getattr('idx')
        request_frame = Frame(opcode=ProxyOpCode.SET, payload=[idx, key, value])
        return get_response(request_frame)

    def __delattr__(self, key):
        super_getattr = super().__getattribute__
        if key in type(self)._cls_attrs:
            return super().__delattr__(key)
        # Forward the attribute deletion to the server.
        get_response = super_getattr("get_response")
        idx = super_getattr('idx')
        # FIX: was `Frame(opcode.ProxyOpCode.EXECUTE, ...)` (NameError at call
        # time) and the attribute name was never forwarded; pass it as the
        # single positional argument for the remote __delattr__ call.
        request_frame = Frame(opcode=ProxyOpCode.EXECUTE, payload=[idx, "__delattr__", (key,), {}])
        return get_response(request_frame)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, exc_tb):
        try:
            self.close()
        finally:
            if exc:
                # Remember the error so __getattribute__ can avoid masking it.
                super().__setattr__("_last_error", exc)
        return False

    def __getitem__(self, key):
        super_getattr = super().__getattribute__
        get_response = super_getattr("get_response")
        idx = super_getattr('idx')
        args = [key]
        kwargs = {}
        request_frame = Frame(opcode=ProxyOpCode.EXECUTE, payload=[idx, "__getitem__", args, kwargs])
        result = get_response(request_frame)
        return result

    def __setitem__(self, key, value):
        super_getattr = super().__getattribute__
        get_response = super_getattr("get_response")
        idx = super_getattr('idx')
        args = [key, value]
        kwargs = {}
        request_frame = Frame(opcode=ProxyOpCode.EXECUTE, payload=[idx, "__setitem__", args, kwargs])
        result = get_response(request_frame)
        return result

    def __del__(self):
        if not self._close_on_delete:
            super_del = getattr(super(), '__del__', None)
            if super_del is not None:
                super_del()
            return
        try:
            if not getattr(self, "_server_running", False):
                # Server is not running.
                self.close()
            # Try sending a cleanup request through the server.
            super_getattr = super().__getattribute__
            get_response = super_getattr("get_response")
            idx = super_getattr('idx')
            # FIX: the payload previously omitted args/kwargs, which broke the
            # server-side 4-tuple unpack for EXECUTE frames.
            request_frame = Frame(opcode=ProxyOpCode.EXECUTE, payload=[idx, "__del__", (), {}])
            try:
                return get_response(request_frame)
            except Exception:
                # Failed to do cleanup using the server as a mediator.
                pass
            finally:
                self.close()
        finally:
            super_del = getattr(super(), '__del__', None)
            if super_del is not None:
                super_del()
            # Close on our end just in case.
            self.close()

    def __repr__(self):
        # Only define __repr__; __str__ must be resolved on the real object.
        return f"<[{self.__class__.__name__} {self.target_obj_str}]>"

    __str__ = __repr__
class ProxyServer:
    """
    Server for handling proxy objects.

    Polls every registered proxy's shared memory for request frames and
    dispatches GET/SET/EXECUTE operations against the real objects.
    """

    # Byte length of the big-endian size prefix written before each payload
    # in shared memory, e.g. `20 [0, ...]` where 20 is the payload size.
    _DATA_SIZE_PREFIX_LEN: int = 4

    def __init__(self, bufsize: int):
        """
        Initialize the proxy server.

        Args:
            bufsize (int): Size in bytes of each proxy's shared memory buffer.
        """
        # Map id(target) -> [target_obj, proxy, shared_memory].
        # FIX: the annotation was `Dict[int, List[Any, Proxy, sm.SharedMemory]]`;
        # complex-target annotations are evaluated at runtime and typing.List
        # only accepts a single parameter, so constructing the server crashed.
        self.proxy_objects: Dict[int, List[Any]] = {}
        self.creator_process = multiprocessing.current_process()
        self.server_thread: Optional[threading.Thread] = None
        self._running = False
        self._bufsize = bufsize
        self._threaded = False
        # Whether to wrap non-serializable results as chained Proxy objects.
        self._wrap_unserializable_objects = True
        self._proxy_chaining_max_level = 7
        # Guards proxy_objects against concurrent mutation.
        self._lock = threading.RLock()

    @property
    def running(self) -> bool:
        """
        Whether the server loop is (or should be) running.
        """
        return self._running

    def run(self, threaded: bool = True):
        """
        Run the proxy server loop.

        Args:
            threaded (bool): Whether to run the server in a thread. Defaults to True.
        """
        assert not self._running, "Proxy server already running."
        self._running = True
        self._threaded = threaded

        def _run():
            while self._running:
                try:
                    # Copy items to avoid "dictionary changed size during iteration".
                    with self._lock:
                        items = list(self.proxy_objects.items())
                    for _, data in items:
                        # data is [target_object, proxy, shared_memory]
                        _, proxy, shared_memory = data
                        try:
                            request_frame = self.read_frame(shared_memory)
                            if request_frame.opcode in [ProxyOpCode.GET, ProxyOpCode.SET, ProxyOpCode.EXECUTE]:
                                # Only handle frames written by the client, not our own responses.
                                self.handle_request_frame(shared_memory, request_frame)
                        except EmptyData:
                            # Nothing to do.
                            pass
                        except DataDecodeError:
                            # Malformed data; surface it to the outer handler.
                            raise
                except RuntimeError as e:
                    if "dictionary changed size" in str(e):
                        continue
                    else:
                        raise e
                except Exception as e:
                    logger.log_exception(e)  # Log exception
                # Sleep a little bit.
                time.sleep(0.001)

        if threaded:
            self.server_thread = threading.Thread(target=_run)
            self.server_thread.start()
        else:
            _run()  # Run directly

    def read_frame(self, shared_memory: sm.SharedMemory, timeout: Optional[float] = 0.5) -> "Frame":
        """
        Read a frame from shared memory and return the parsed Frame.

        Delegates to Proxy.read_frame (duck-typed on `_bufsize` and
        `_DATA_SIZE_PREFIX_LEN`, both of which this class also defines).

        Raises:
            TimeoutError: If the full payload isn't available within `timeout`.
            DataTooBigError: If the indicated payload would exceed the configured buffer size.
            DataDecodeError: For other decoding/parsing errors.
            EmptyData: If data is empty or size is less than 1 byte.
        """
        return Proxy.read_frame(self, shared_memory, timeout)

    def write_frame(self, shared_memory: sm.SharedMemory, frame: "Frame") -> int:
        """
        Write a frame to the shared memory (see Proxy.write_frame).

        Returns:
            int: Written payload size in bytes (does not include the 4-byte length prefix).

        Raises:
            DataTooBigError: If the payload+prefix would not fit into the configured buffer.
        """
        return Proxy.write_frame(self, shared_memory, frame)

    def handle_request_frame(self, shared_memory: sm.SharedMemory, frame: "Frame"):
        """
        Handle a request frame from the client, in a thread when threaded.
        """
        if self._threaded:
            th = threading.Thread(target=self.handle_frame, args=[shared_memory, frame])
            th.start()
        else:
            self.handle_frame(shared_memory, frame)

    def handle_frame(self, shared_memory: sm.SharedMemory, frame: "Frame"):
        """
        Handle a single request frame and write a response into shared memory.
        """
        error = None
        result = None
        target_object_id = None
        target_object = None
        # Tell the client we picked the request up and are working on it.
        self.write_frame(shared_memory, Frame(ProxyOpCode.RESPONSE_PENDING, []))
        try:
            target_object_id = frame.payload[0]
            # Look the object up under the lock.
            with self._lock:
                entry = self.proxy_objects.get(target_object_id, [None, None, None])
            target_object, _, __ = entry
            if target_object is None:
                # `is None` (not truthiness): empty iterables like [] are valid targets.
                raise ProxyObjectNotFound(f"Proxy target object with ID `{target_object_id}` not found. It may have been deleted or never declared.")
            if frame.opcode == ProxyOpCode.GET:
                _, key = frame.payload
                result = getattr(target_object, key)
            elif frame.opcode == ProxyOpCode.SET:
                _, key, value = frame.payload
                setattr(target_object, key, value)
            elif frame.opcode == ProxyOpCode.EXECUTE:
                _, method_name, args, kwargs = frame.payload
                resolved_method = getattr(target_object, method_name)
                result = resolved_method(*args, **kwargs)
        except Exception as e:
            error = e

        def is_serializable(data: Any) -> bool:
            """
            Best-effort check for whether msgpack can serialize the data directly.
            """
            return isinstance(data, (tuple, int, str, dict, set, list, bool)) if data is not None else True

        if not error and callable(result):
            # A method came back as the result: send a callable descriptor.
            if not is_method_of(result, target_object):
                # FIX: this previously raised out of the handler, which left the
                # client hanging on RESPONSE_PENDING forever; report it instead.
                error = ProxyError(f"Callable must be a method of {self}. Independant callables are not allowed. Got {result}.")
            else:
                method_name = get_callable_name(result)
                result = f"{Proxy._callable_prefix}{method_name}"

        if not error:
            if not is_serializable(result) and self._wrap_unserializable_objects:
                # Wrap the non-serializable result as a chained proxy reference.
                proxy_result = self.create_proxy(result, shared_memory)
                target_object_id = proxy_result.idx
                target_object_str = proxy_result.target_obj_str
                result = f"{Proxy._proxy_prefix}{target_object_str}"
                # The shared memory must outlive this helper proxy.
                proxy_result._close_on_delete = False
                del proxy_result
            # Write the response frame.
            response_frame = Frame(ProxyOpCode.EXECUTION_RESULT, [target_object_id, result, None])
            try:
                self.write_frame(shared_memory, response_frame)
            except TypeError as e:
                # msgpack could not serialize the value after all.
                if self._wrap_unserializable_objects:
                    proxy_result = self.create_proxy(result, shared_memory)
                    target_object_id = proxy_result.idx
                    target_object_str = proxy_result.target_obj_str
                    result = f"{Proxy._proxy_prefix}{target_object_str}"
                    proxy_result._close_on_delete = False
                    del proxy_result
                    # Alter the response frame in place and retry writing.
                    response_frame.payload = [target_object_id, result, None]
                    self.write_frame(shared_memory, response_frame)
                else:
                    raise e  # Reraise exception
        else:
            error = f"{error.__class__.__name__}: {error}"
            response_frame = Frame(ProxyOpCode.EXECUTION_RESULT, [target_object_id, None, error])
            self.write_frame(shared_memory, response_frame)

    def create_proxy(self, target_object: Any, shared_memory: Optional[sm.SharedMemory] = None) -> "Proxy":
        """
        Create a process-safe proxy object.

        Args:
            target_object (Any): The target object to create the proxy for.
            shared_memory (Optional[sm.SharedMemory]): The shared memory to use for
                communication. None will create a new shared memory block.

        Raises:
            ProxyError: If a proxy already exists for this object.
        """
        super_del = getattr(target_object, '__del__', None)

        def wrapped_del():
            """
            Replacement __del__ that releases the shared memory and unregisters the proxy.
            """
            try:
                data = self.proxy_objects.get(idx)
                if data:
                    _, __, shared_mem = data
                    try:
                        shared_mem.close()
                        shared_mem.unlink()
                    except Exception:
                        # best-effort cleanup
                        pass
                with self._lock:
                    self.proxy_objects.pop(idx, None)
            finally:
                # Finally call the original __del__ (if available).
                if super_del is not None:
                    super_del()

        idx = id(target_object)
        with self._lock:
            if idx in self.proxy_objects:
                raise ProxyError("The provided object already exists as a proxy. Please provide a different object or remove the existing proxy first.")
            # Create (or reuse) a shared memory block for the proxy object.
            shared_memory = shared_memory or sm.SharedMemory(create=True, size=self._bufsize, name=f"shared-memory-{idx}")
            proxy = Proxy(self, idx, target_object, shared_memory)
            self.proxy_objects[idx] = [target_object, proxy, shared_memory]
        # Patch the target object's __del__ to perform cleanup; best-effort only,
        # since Python resolves dunders on the type and some objects forbid
        # attribute assignment entirely.
        try:
            target_object.__del__ = wrapped_del
        except Exception:
            pass
        # Finally, return the proxy object.
        return proxy