duck.utils.multiprocessing.proxy

Multiprocessing proxy implementation for optimum performance. This module enables fast inter-process communication using msgpack and shared memory buffers.

Usage:

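```py
import multiprocessing

from duck.utils.multiprocessing import ProxyServer, Proxy


class MyObject:
    pass


def process_entry(proxy: Proxy):
    # Runs in the child process; attribute access is forwarded to the real object.
    with proxy as p:
        print(p.a)  # Output: 100


if __name__ == "__main__":
    # The main guard is required with the "spawn" start method (Windows, macOS),
    # where the child process re-imports this module.
    server = ProxyServer(bufsize=200)
    server.run()  # runs in a background thread by default (threaded=True)

    obj = MyObject()
    obj.a = 100

    proxy = server.create_proxy(obj)

    p1 = multiprocessing.Process(target=process_entry, args=[proxy])
    p1.start()
    p1.join()
```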

TODO:

  • Make dunder methods such as __iter__ and __await__ work seamlessly, the same way they do on the original objects.

  • Implement an asynchronous protocol.

  • Support starting the proxy server in a separate process, for isolation.

  • Make iterables work with create_proxy. Example:

    ```py
    proxy = server.create_proxy([1, 2, 3])

    def process_entry(proxy: Proxy):
        with proxy as p:
            print(list(p))  # Must print [1, 2, 3]
            p.append(4)
            print(list(p))  # Must print [1, 2, 3, 4]
    ```

Module Contents

Classes

Frame

Frame that will be written to the shared memory.

Proxy

Multiprocessing proxy object. It indirectly performs operations, such as getting or setting attributes and items, on a real object.

ProxyOpCode

Opcodes for proxy operations.

ProxyServer

Server for handling proxy objects.

Functions

get_callable_name

Return a readable name for fn. Works for functions, bound methods, functools.partial, and callable objects.

is_method_of

True if callable_obj is a method of obj (bound to that object) or if the function underlying callable_obj is the attribute on obj.

API

exception duck.utils.multiprocessing.proxy.DataDecodeError

Bases: duck.utils.multiprocessing.proxy.ProxyError

Raised when there are issues decoding data written to the shared memory.

exception duck.utils.multiprocessing.proxy.DataEncodeError

Bases: duck.utils.multiprocessing.proxy.ProxyError

Raised when there are issues encoding data to be written to the shared memory.

exception duck.utils.multiprocessing.proxy.DataTooBigError

Bases: duck.utils.multiprocessing.proxy.ProxyError

Raised when trying to write data that is too big to the shared memory.

exception duck.utils.multiprocessing.proxy.EmptyData

Bases: duck.utils.multiprocessing.proxy.ProxyError

Raised if data to be read is empty or zero.

class duck.utils.multiprocessing.proxy.Frame(opcode: int, payload: List[Any])

Frame that will be written to the shared memory.

Initialization

Initialize the frame.

Parameters:
  • opcode – The operation code for this frame.

  • payload – The payload for the frame as a list.

__repr__()
__slots__

('opcode', 'payload')

__str__

None

classmethod parse(data: bytes) -> duck.utils.multiprocessing.proxy.Frame

Parse data and produce a frame object.
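The frame appears to travel as the two-element list [opcode, payload], matching the formats listed under ProxyOpCode. The stand-in class below illustrates that shape and a parse round trip. It is a sketch, not the module's Frame: FrameSketch and to_bytes are hypothetical names, and json replaces msgpack only so the example runs on the standard library alone.

```py
import json
from typing import Any, List


class FrameSketch:
    """Hypothetical stand-in for Frame: an opcode plus a list payload."""

    __slots__ = ("opcode", "payload")

    def __init__(self, opcode: int, payload: List[Any]) -> None:
        self.opcode = opcode
        self.payload = payload

    def to_bytes(self) -> bytes:
        # Serialize as [opcode, payload]; json stands in for msgpack here.
        return json.dumps([self.opcode, self.payload]).encode("utf-8")

    @classmethod
    def parse(cls, data: bytes) -> "FrameSketch":
        # Inverse of to_bytes: decode the list and unpack its two elements.
        opcode, payload = json.loads(data.decode("utf-8"))
        return cls(opcode, payload)

    def __repr__(self) -> str:
        return f"FrameSketch(opcode={self.opcode!r}, payload={self.payload!r})"
```

Using __slots__ mirrors the documented ('opcode', 'payload') slots and keeps per-frame overhead small.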

exception duck.utils.multiprocessing.proxy.LimitedProxyChaining(max_level: int, target_obj: Optional[str] = None)

Bases: duck.utils.multiprocessing.proxy.ProxyError

Raised when proxy chaining reaches configured maximum depth. Provides a clear message and suggests mitigations.

class duck.utils.multiprocessing.proxy.Proxy(proxy_server: duck.utils.multiprocessing.proxy.ProxyServer, idx: int, target_obj: Any, shared_memory: multiprocessing.shared_memory.SharedMemory)

Multiprocessing proxy object. It indirectly performs operations, such as getting or setting attributes and items, on a real object.

Use Case:

  • Operating indirectly, from another process, on objects that are heavy or complex (for example, not serializable).

Notes:

  • The proxy must be created in the process where the real object resides, and then be used in another process.

  • Delete the proxy object after use; this also frees its shared memory.

Initialization

Initialize the proxy object.

Parameters:
  • proxy_server – The proxy server that creates this proxy object.

  • idx – Unique ID for the proxy object.

  • target_obj – The real (target) object to proxy.

  • shared_memory – The target shared memory for this proxy object.

_DATA_SIZE_PREFIX_LEN: int

4

Length, in bytes, of the data-size prefix written before the data in shared memory.

Example:

    20 [0, ...]

Where:

  • 20: The data-size prefix (payload length in bytes).

  • 0: The opcode for GET.

  • ...: The rest of the payload.

__del__()
__delattr__(key)
__enter__()
__exit__(exc_type, exc, exc_tb)
__getattribute__(key)
__getitem__(key)
__repr__()
__setattr__(key, value)
__setitem__(key, value)
__slots__

None

__str__

None

_callable_prefix

-’

_cls_attrs

None

Attributes that belong solely to this proxy object, not to the target object.

_proxy_prefix

-’

close()

Closes the shared memory for the proxy object.

get_response(frame: duck.utils.multiprocessing.proxy.Frame, timeout: Optional[float] = None) -> Union[Any, duck.utils.multiprocessing.proxy.Proxy, Callable]

Send a request to the proxy server and return the resulting response.

Important behaviour:

  • If the response is a Proxy descriptor (structured dict or legacy string), a new Proxy object is returned and its chain level is incremented.

  • If the response is a normal (real, serializable) value or a callable descriptor, this proxy’s chain level is reset to 0.
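The chain-level rule can be expressed as a small pure function. This is a sketch under stated assumptions: next_chain_level and ChainLimitExceeded are hypothetical names (the latter stands in for LimitedProxyChaining), and whether the real limit trips once the depth exceeds max_level or once it reaches it is not documented.

```py
class ChainLimitExceeded(Exception):
    """Hypothetical stand-in for LimitedProxyChaining."""


def next_chain_level(current_level: int, response_is_proxy: bool, max_level: int) -> int:
    """Return the chain level implied by one get_response() result.

    Proxy descriptor: the new proxy sits one level deeper than this one.
    Plain value or callable descriptor: this proxy's level resets to 0.
    """
    if not response_is_proxy:
        return 0
    new_level = current_level + 1
    # Assumption: the limit is hit once the depth goes past max_level.
    if new_level > max_level:
        raise ChainLimitExceeded(f"proxy chaining exceeded max_level={max_level}")
    return new_level
```

Resetting on plain values means only uninterrupted runs of proxy-returning lookups, such as a.b.c.d, count toward the limit.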

get_shared_memory() -> multiprocessing.shared_memory.SharedMemory

Gets the target shared memory for this proxy object.

read_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, timeout: Optional[float] = 0.5) -> duck.utils.multiprocessing.proxy.Frame

Reads a frame from shared memory and returns the parsed Frame (or result of Frame.parse).

This waits until the full payload (as indicated by the 4-byte big-endian length prefix) is available in the shared memory or until the optional timeout expires.

Raises:
  • TimeoutError – if the full payload isn’t available within timeout.

  • DataTooBigError – if the indicated payload would exceed the configured buffer size.

  • DataDecodeError – for other decoding/parsing errors.

  • EmptyData – If data is empty or size is less than 1 byte.
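A minimal sketch of the polling read described above. It assumes the writer stores the payload before the length prefix, so a non-zero prefix implies the payload is complete. read_payload is a hypothetical helper that accepts any sliceable buffer (pass shm.buf for a SharedMemory block). It raises ValueError where the real method raises DataTooBigError, and it treats a zero prefix as "not written yet" rather than raising EmptyData.

```py
import struct
import time
from typing import Optional

PREFIX_LEN = 4  # mirrors the documented _DATA_SIZE_PREFIX_LEN


def read_payload(buf, timeout: Optional[float] = 0.5, poll_interval: float = 0.001) -> bytes:
    """Poll buf until a length-prefixed payload is present, then return it."""
    capacity = len(buf) - PREFIX_LEN
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        # Decode the 4-byte big-endian unsigned length prefix.
        (size,) = struct.unpack(">I", bytes(buf[:PREFIX_LEN]))
        if size > capacity:
            # The real method raises DataTooBigError here.
            raise ValueError(f"payload of {size} bytes exceeds capacity of {capacity} bytes")
        if size > 0:
            return bytes(buf[PREFIX_LEN:PREFIX_LEN + size])
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("no payload became available within the timeout")
        time.sleep(poll_interval)
```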

write_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, frame: duck.utils.multiprocessing.proxy.Frame) -> int

Write a frame to the shared memory.

The written layout is: [4-byte big-endian uint32 payload-length][payload bytes]

Returns:

Written payload size in bytes (does not include the 4-byte length prefix).

Return type:

int

Raises:

DataTooBigError – if the payload+prefix would not fit into the configured buffer.
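The write side of the same layout can be sketched with struct, where the format ">I" is a 4-byte big-endian unsigned integer. write_payload is a hypothetical helper. It raises ValueError where write_frame raises DataTooBigError, and its body-then-prefix write order is an assumption of this sketch, not documented behaviour.

```py
import struct

PREFIX_LEN = 4  # mirrors the documented _DATA_SIZE_PREFIX_LEN


def write_payload(buf, payload: bytes) -> int:
    """Write [4-byte big-endian length][payload] into buf and return len(payload)."""
    size = len(payload)
    if PREFIX_LEN + size > len(buf):
        # The real method raises DataTooBigError here.
        raise ValueError(f"{PREFIX_LEN + size} bytes do not fit in a {len(buf)}-byte buffer")
    # Body first, prefix last: a reader polling the prefix never sees a
    # length before the bytes it describes (an assumption of this sketch).
    buf[PREFIX_LEN:PREFIX_LEN + size] = payload
    buf[:PREFIX_LEN] = struct.pack(">I", size)
    return size
```

For a 20-byte payload, as in the "20 [0, ...]" example, the prefix bytes are b"\x00\x00\x00\x14". With a SharedMemory block, pass shm.buf as buf.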

exception duck.utils.multiprocessing.proxy.ProxyError

Bases: Exception

Raised on multiprocessing proxy issues.

exception duck.utils.multiprocessing.proxy.ProxyObjectNotFound

Bases: duck.utils.multiprocessing.proxy.ProxyError

Raised if the target object linked to the proxy cannot be found.

class duck.utils.multiprocessing.proxy.ProxyOpCode

Bases: enum.IntEnum

Opcodes for proxy operations.

EXECUTE

2

Execute a callable on the real object.

Format: [opcode, [target_object_id, attr, args, kwargs]]

Where:

  • opcode (int): The operation code for the request i.e. 2 in this case.

  • target_object_id (int): The target ID for the real object.

  • attr (str): The method name to execute.

  • args (tuple): The positional arguments to pass when executing.

  • kwargs (dict): The keyword arguments to pass when executing.

EXECUTION_RESULT

-1

Represents the result of an execution, sent back to the client.

Format: [opcode, [target_object_id, value, error]]

Where:

  • opcode (int): The operation code, i.e., -1 in this case.

  • target_object_id (int): The ID to the target object.

  • value (Any): Any serializable data (using msgpack).

  • error (str): The error message, if an error was encountered.

GET

0

Get an attribute from the real object.

Format: [opcode, [target_object_id, attr]]

Where:

  • opcode (int): The operation code.

  • target_object_id (int): The original object ID.

  • attr (str): The attribute to retrieve.

RESPONSE_PENDING

None

Indicates that the server is still processing the request and the response is not ready yet.

Format: [opcode, []]

SET

1

Set an attribute on the real object.

Format: [opcode, [target_object_id, attr, value]]

Where:

  • opcode (int): The operation code.

  • target_object_id (int): The original object ID.

  • attr (str): The attribute to alter.

  • value (Any): The value to set.
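The request formats listed for GET, SET, and EXECUTE are plain nested lists, so they can be built with a few helpers. This is a minimal sketch: OpCodeSketch, get_request, set_request, and execute_request are hypothetical names, not part of the module. RESPONSE_PENDING is omitted because its value is not documented.

```py
from enum import IntEnum
from typing import Any, Dict, List, Optional, Sequence


class OpCodeSketch(IntEnum):
    # Values as documented for ProxyOpCode; RESPONSE_PENDING is left out.
    EXECUTION_RESULT = -1
    GET = 0
    SET = 1
    EXECUTE = 2


def get_request(target_id: int, attr: str) -> List[Any]:
    # [opcode, [target_object_id, attr]]
    return [OpCodeSketch.GET, [target_id, attr]]


def set_request(target_id: int, attr: str, value: Any) -> List[Any]:
    # [opcode, [target_object_id, attr, value]]
    return [OpCodeSketch.SET, [target_id, attr, value]]


def execute_request(
    target_id: int,
    attr: str,
    args: Sequence[Any] = (),
    kwargs: Optional[Dict[str, Any]] = None,
) -> List[Any]:
    # [opcode, [target_object_id, attr, args, kwargs]]
    return [OpCodeSketch.EXECUTE, [target_id, attr, tuple(args), dict(kwargs or {})]]
```

Because IntEnum members compare equal to plain ints, these lists also compare equal to their decoded, int-only forms.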

class duck.utils.multiprocessing.proxy.ProxyServer(bufsize: int)

Server for handling proxy objects.

Initialization

_DATA_SIZE_PREFIX_LEN: int

4

Length, in bytes, of the data-size prefix written before the data in shared memory.

Example:

    20 [0, ...]

Where:

  • 20: The data-size prefix (payload length in bytes).

  • 0: The opcode for GET.

  • ...: The rest of the payload.

create_proxy(target_object: Any, shared_memory: Optional[multiprocessing.shared_memory.SharedMemory] = None) -> duck.utils.multiprocessing.proxy.Proxy

Create a process-safe proxy object.

Parameters:
  • target_object – The target object to create the proxy for.

  • shared_memory – The shared memory to use for communication. If None, a new shared memory block is created.

handle_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, frame: duck.utils.multiprocessing.proxy.Frame)

Handles a frame from the shared memory.

handle_request_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, frame: duck.utils.multiprocessing.proxy.Frame)

Handle a request frame from the client.
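A request handler dispatches on the opcode and answers with an EXECUTION_RESULT-shaped list. The sketch below rests on several assumptions. handle_request is a hypothetical name. The object registry is a plain dict keyed by target ID. SET is also answered with a result list. Errors are reported as strings. Return values are passed back as-is, whereas the real server appears to wrap non-serializable values in proxy descriptors (see get_response).

```py
from typing import Any, Dict, List

# Opcode values as documented for ProxyOpCode.
GET, SET, EXECUTE, EXECUTION_RESULT = 0, 1, 2, -1


def handle_request(objects: Dict[int, Any], request: List[Any]) -> List[Any]:
    """Apply one [opcode, payload] request to a registered object.

    Returns [EXECUTION_RESULT, [target_object_id, value, error]].
    """
    opcode, payload = request
    target_id = payload[0]
    value, error = None, None
    try:
        target = objects[target_id]  # KeyError stands in for ProxyObjectNotFound
        if opcode == GET:
            _, attr = payload
            value = getattr(target, attr)
        elif opcode == SET:
            _, attr, new_value = payload
            setattr(target, attr, new_value)
        elif opcode == EXECUTE:
            _, attr, args, kwargs = payload
            value = getattr(target, attr)(*args, **kwargs)
        else:
            raise ValueError(f"unknown opcode {opcode!r}")
    except Exception as exc:
        # Report the failure to the client instead of crashing the server loop.
        error = f"{type(exc).__name__}: {exc}"
    return [EXECUTION_RESULT, [target_id, value, error]]
```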

read_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, timeout: Optional[float] = 0.5) -> duck.utils.multiprocessing.proxy.Frame

Reads a frame from shared memory and returns the parsed Frame (or result of Frame.parse).

This waits until the full payload (as indicated by the 4-byte big-endian length prefix) is available in the shared memory or until the optional timeout expires.

Raises:
  • TimeoutError – if the full payload isn’t available within timeout.

  • DataTooBigError – if the indicated payload would exceed the configured buffer size.

  • DataDecodeError – for other decoding/parsing errors.

  • EmptyData – If data is empty or size is less than 1 byte.

run(threaded: bool = True)

Run the proxy server, either in a background thread (the default) or on the current thread.

Parameters:

threaded – Whether to run the server in a thread. Defaults to True.

property running: bool

Whether the server is currently running.

write_frame(shared_memory: multiprocessing.shared_memory.SharedMemory, frame: duck.utils.multiprocessing.proxy.Frame) -> int

Write a frame to the shared memory.

The written layout is: [4-byte big-endian uint32 payload-length][payload bytes]

Returns:

Written payload size in bytes (does not include the 4-byte length prefix).

Return type:

int

Raises:

DataTooBigError – if the payload+prefix would not fit into the configured buffer.

duck.utils.multiprocessing.proxy.get_callable_name(fn)

Return a readable name for fn. Works for functions, bound methods, functools.partial, and callable objects.
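One way to produce such names uses only standard attributes. Functions and bound methods expose __qualname__. functools.partial keeps the wrapped callable in .func. Callable instances fall back to their class name. callable_name_sketch is a hypothetical name, and its output format is an assumption, not necessarily what get_callable_name returns.

```py
import functools


def callable_name_sketch(fn) -> str:
    """Return a readable name for a function, bound method, partial or callable object."""
    if isinstance(fn, functools.partial):
        # partial objects keep the wrapped callable in .func
        return f"partial({callable_name_sketch(fn.func)})"
    # Functions and bound methods carry __qualname__, e.g. "MyClass.method".
    qualname = getattr(fn, "__qualname__", None)
    if isinstance(qualname, str):
        return qualname
    # Callable instances have no __qualname__ of their own; use the class name.
    return type(fn).__qualname__
```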

duck.utils.multiprocessing.proxy.is_method_of(callable_obj, obj)

True if callable_obj is a method of obj (bound to that object) or if the function underlying callable_obj is the attribute on obj.

Examples:

  • is_method_of(a.foo, a) -> True

  • is_method_of(A.foo, a) -> True (A.foo is the plain function, but it is still considered a method of a)

  • is_method_of(a.bar, a) -> False if bar is not on a
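The rule above can be sketched by first checking __self__ and then comparing underlying functions by identity. This assumes "the attribute on obj" means an attribute with the same __name__. is_method_of_sketch is a hypothetical name, not the module's implementation.

```py
_MISSING = object()


def is_method_of_sketch(callable_obj, obj) -> bool:
    # Case 1: a bound method whose __self__ is exactly obj.
    if getattr(callable_obj, "__self__", _MISSING) is obj:
        return True
    # Case 2: unwrap to the underlying function (bound methods expose __func__).
    func = getattr(callable_obj, "__func__", callable_obj)
    name = getattr(func, "__name__", None)
    if name is None:
        return False
    # Look up the same name on obj and compare underlying functions by identity.
    attr = getattr(obj, name, _MISSING)
    if attr is _MISSING:
        return False
    return getattr(attr, "__func__", attr) is func
```

Under this rule, a bound method of another instance of the same class also returns True, and the getattr lookup can trigger properties on obj.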