duck.utils.multiprocessing.proxy¶
Multiprocessing proxy implementation for optimum performance. This module enables
fast inter-process communication using msgpack and shared memory buffers.
Usage:
import multiprocessing
from duck.utils.multiprocessing import ProxyServer, Proxy
class myObject:
pass
server = ProxyServer(bufsize=200)
server.run()
obj = myObject()
obj.a = 100
proxy = server.create_proxy(obj)
def process_entry(proxy: Proxy):
with proxy as p:
print(p.a) # Output: 100
p1 = multiprocessing.Process(target=process_entry, args=[proxy])
p1.start()
p1.join()
TODO:
Make sure dunder methods like
__iter__,__await__to work seemless the same way original objects act.Implement asynchronous protocol
Enable multiprocessing when starting proxy server, for isolation.
Make iterables to work with
create_proxy. Example: ```py proxy = Proxy([1, 2, 3])def process_entry(proxy: Proxy): with proxy as p: print(list(p)) # Must print [1, 2, 3] p.append(4) print(list(p)) # Must print [1, 2, 3, 4] ```
Module Contents¶
Classes¶
Frame that will be written to the shared memory. |
|
Multiprocessing proxy object. This performs actions like |
|
Opcodes for proxy operations. |
|
Server for handling proxy objects. |
Functions¶
Return a readable name for fn. Works for functions, bound methods, functools.partial, and callable objects. |
|
True if callable_obj is a method of obj (bound to that object) or if the function underlying callable_obj is the attribute on obj. |
API¶
- exception duck.utils.multiprocessing.proxy.DataDecodeError[source]¶
Bases:
duck.utils.multiprocessing.proxy.ProxyErrorRaised when there are issues decoding data written to the shared memory.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception duck.utils.multiprocessing.proxy.DataEncodeError[source]¶
Bases:
duck.utils.multiprocessing.proxy.ProxyErrorRaised when there are issues encoding data to be written to the shared memory.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception duck.utils.multiprocessing.proxy.DataTooBigError[source]¶
Bases:
duck.utils.multiprocessing.proxy.ProxyErrorRaised when trying to write data that is too big to the shared memory.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception duck.utils.multiprocessing.proxy.EmptyData[source]¶
Bases:
duck.utils.multiprocessing.proxy.ProxyErrorRaised if data to be read is empty or zero.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- class duck.utils.multiprocessing.proxy.Frame(opcode: int, payload: List[Any])[source]¶
Frame that will be written to the shared memory.
Initialization
Initialize the frame.
- Parameters:
opcode – The operation code for this frame.
payload – The payload for the frame as a list.
- __slots__¶
(‘opcode’, ‘payload’)
- __str__¶
None
- classmethod parse(data: bytes) duck.utils.multiprocessing.proxy.Frame[source]¶
Parse data and produce a frame object.
- exception duck.utils.multiprocessing.proxy.LimitedProxyChaining(max_level: int, target_obj: Optional[str] = None)[source]¶
Bases:
duck.utils.multiprocessing.proxy.ProxyErrorRaised when proxy chaining reaches configured maximum depth. Provides a clear message and suggests mitigations.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- class duck.utils.multiprocessing.proxy.Proxy(proxy_server: duck.utils.multiprocessing.proxy.ProxyServer, idx: int, target_obj: Any, shared_memory: multiprocessing.shared_memory.SharedMemory)[source]¶
Multiprocessing proxy object. This performs actions like
getorsetof anything indirectly on a real object.Use Case:
Indirectly performing actions on objects that are heavy or complex (not serializable) between processes.
Notes:
This must be created in process where the real object resides and then be used as proxy in another process.
Make sure to delete the proxy object after use, this frees the shared memory also.
Initialization
Initialize the proxy object.
- Parameters:
proxy_server – The proxy server which wants to create this proxy object.
idx – Unique ID for the proxy object.
target_obj – This is the real/target object you want to proxy to.
shared_memory – The target shared memory for this proxy object.
- _DATA_SIZE_PREFIX_LEN: int¶
4
This is the 4-byte length of the
data size prefixwhen placing data in shared memory.… rubric:: Example
20 [0, …]
Where:
20: Is the data size prefix.0: This is the OpCode forGET....: This is extra payload for the data.
- __slots__¶
None
- __str__¶
None
- _callable_prefix¶
‘
-’
- _cls_attrs¶
None
These are attributes that belong soley on this proxy object but not the target object.
- _proxy_prefix¶
‘
-’
- get_response(frame: duck.utils.multiprocessing.proxy.Frame, timeout: Optional[float] = None) Union[Any, duck.utils.multiprocessing.proxy.Proxy, Callable][source]¶
This makes a request to the proxy server and returns the correct response/result.
Important behaviour:
If the response is a Proxy descriptor (structured dict or legacy string), a new Proxy object is returned and its chain level is incremented.
If the response is a normal (real/serializable) value or a callable descriptor, this proxy’s chain level is reset to 0 (per user’s requirement).
Gets the target shared memory for this proxy object.
- read_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, timeout: Optional[float] = 0.5) duck.utils.multiprocessing.proxy.Frame[source]¶
Reads a frame from shared memory and returns the parsed Frame (or result of Frame.parse).
This waits until the full payload (as indicated by the 4-byte big-endian length prefix) is available in the shared memory or until the optional timeout expires.
- Raises:
TimeoutError – if the full payload isn’t available within
timeout.DataTooBigError – if the indicated payload would exceed the configured buffer size.
DataDecodeError – for other decoding/parsing errors.
EmptyData – If data is empty or size is less than 1 byte.
- write_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, frame: duck.utils.multiprocessing.proxy.Frame) int[source]¶
Write a frame to the shared memory.
The written layout is: [4-byte big-endian uint32 payload-length][payload bytes]
- Returns:
Written payload size in bytes (does not include the 4-byte length prefix).
- Return type:
int
- Raises:
DataTooBigError – if the payload+prefix would not fit into the configured buffer.
- exception duck.utils.multiprocessing.proxy.ProxyError[source]¶
Bases:
ExceptionRaised on multiprocessing proxy issues.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- exception duck.utils.multiprocessing.proxy.ProxyObjectNotFound[source]¶
Bases:
duck.utils.multiprocessing.proxy.ProxyErrorRaised if target object linked with the proxy is not found.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- class duck.utils.multiprocessing.proxy.ProxyOpCode[source]¶
Bases:
enum.IntEnumOpcodes for proxy operations.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- EXECUTE¶
2
Execute a callable on the real object.
Format: [opcode, [target_object_id, attr, args, kwargs]]
Where:
opcode (int): The operation code for the request i.e. 2 in this case.
target_object_id (int): The target ID for the real object.
attr (str): The method name to execute.
args (tuple): The arguments to parse when executing (as a tuple).
kwargs (dict): Dictionary of positional keyword arguments.
- EXECUTION_RESULT¶
None
This represent the result of an execution for the client.
Format: [opcode, [target_object_id, value, error]]
Where:
opcode (int): The operation code, i.e., -1 in this case.
target_object_id (int): The ID to the target object.
value (Any): Any serializable data (using msgpack).
error (str): Error if any has been encountered.
- GET¶
0
Get an attribute from the real object.
Format: [opcode, [target_object_id, attr]]
Where:
opcode (int): The operation code.
target_object_id (int): The original object ID.
attr (str): The attribute to retrieve.
- RESPONSE_PENDING¶
None
Represents that the server is still processing request and the response is not ready yet.
Format: [opcode, []]
- SET¶
1
Set an attribute on the real object.
Format: [opcode, [target_object_id, attr, value]]
Where:
opcode (int): The operation code.
target_object_id (int): The original object ID.
attr (str): The attribute to alter.
value (Any): The value to set.
- class duck.utils.multiprocessing.proxy.ProxyServer(bufsize: int)[source]¶
Server for handling proxy objects.
Initialization
- _DATA_SIZE_PREFIX_LEN: int¶
4
This is the 4-byte length of the
data size prefixwhen placing data in shared memory.Example:
20 [0, ...]Where:
20: Is the data size prefix.0: This is the OpCode forGET....: This is extra payload for the data.
- create_proxy(target_object: Any, shared_memory: Optional[multiprocessing.shared_memory.SharedMemory] = None) duck.utils.multiprocessing.proxy.Proxy[source]¶
Create a process-safe proxy object.
- Parameters:
target_object – The target object to create the proxy for.
shared_memory – The shared memory to use for communication. None will create a new shared memory.
- handle_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, frame: duck.utils.multiprocessing.proxy.Frame)[source]¶
Handles a frame from the shared memory.
- handle_request_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, frame: duck.utils.multiprocessing.proxy.Frame)[source]¶
Handle a request frame from the client.
- read_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, timeout: Optional[float] = 0.5) duck.utils.multiprocessing.proxy.Frame[source]¶
Reads a frame from shared memory and returns the parsed Frame (or result of Frame.parse).
This waits until the full payload (as indicated by the 4-byte big-endian length prefix) is available in the shared memory or until the optional timeout expires.
- Raises:
TimeoutError – if the full payload isn’t available within
timeout.DataTooBigError – if the indicated payload would exceed the configured buffer size.
DataDecodeError – for other decoding/parsing errors.
EmptyData – If data is empty or size is less than 1 byte.
- run(threaded: bool = True)[source]¶
Runs the current proxy server on current thread.
- Parameters:
threaded – Whether to run the server in a thread. Defaults to True.
- property running: bool¶
Returns boolean on whether if or if not the server is running.
- write_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, frame: duck.utils.multiprocessing.proxy.Frame) int[source]¶
Write a frame to the shared memory.
The written layout is: [4-byte big-endian uint32 payload-length][payload bytes]
- Returns:
Written payload size in bytes (does not include the 4-byte length prefix).
- Return type:
int
- Raises:
DataTooBigError – if the payload+prefix would not fit into the configured buffer.
- duck.utils.multiprocessing.proxy.get_callable_name(fn)[source]¶
Return a readable name for fn. Works for functions, bound methods, functools.partial, and callable objects.
- duck.utils.multiprocessing.proxy.is_method_of(callable_obj, obj)[source]¶
True if callable_obj is a method of obj (bound to that object) or if the function underlying callable_obj is the attribute on obj.
Examples:
is_method_of(a.foo, a) -> True
is_method_of(A.foo, a) -> True (A.foo is the function, still considered ‘a’ method)
is_method_of(a.bar, a) -> False if bar is not on a