Skip to content

cortex.utils.serialization

serialization

Serialization utilities for Cortex messages.

Classes

DataType

Bases: IntEnum

Type identifiers for serialized data.

Source code in src/cortex/utils/serialization.py
class DataType(IntEnum):
    """Type identifiers for serialized data.

    serialize() writes one of these as a single leading tag byte so that
    deserialize() can dispatch on the kind of payload that follows.
    """

    NONE = 0       # Python None; tag byte only, no payload follows
    PRIMITIVE = 1  # int, float, str, bool, None (msgpack-encoded, length-prefixed)
    NUMPY = 2      # numpy.ndarray (see serialize_numpy for the payload layout)
    TORCH = 3      # torch.Tensor (grad/device metadata + embedded numpy payload)
    DICT = 4       # dict, msgpack-encoded with a 4-byte length prefix
    LIST = 5       # list or tuple, msgpack-encoded with a 4-byte length prefix
    BYTES = 6      # raw bytes with a 4-byte length prefix

Functions

serialize_numpy

serialize_numpy(arr: ndarray) -> bytes

Serialize a NumPy array to bytes.

Format: - 1 byte: number of dimensions - 4 bytes per dim: shape - variable: dtype string length (2 bytes) + dtype string - remaining: raw array data

Source code in src/cortex/utils/serialization.py
def serialize_numpy(arr: np.ndarray) -> bytes:
    """
    Serialize a NumPy array to bytes.

    Format:
    - 1 byte: number of dimensions
    - 4 bytes per dim: shape
    - variable: dtype string length (2 bytes) + dtype string
    - remaining: raw array data
    """
    contiguous = np.ascontiguousarray(arr)
    ndim = contiguous.ndim
    dtype_str = contiguous.dtype.str.encode("utf-8")

    header_size = 1 + (4 * ndim) + 2 + len(dtype_str)
    header = bytearray(header_size)
    struct.pack_into(f">B{ndim}I", header, 0, ndim, *contiguous.shape)
    offset = 1 + (4 * ndim)
    struct.pack_into(">H", header, offset, len(dtype_str))
    offset += 2
    header[offset:] = dtype_str

    return bytes(header) + contiguous.tobytes(order="C")

deserialize_numpy

deserialize_numpy(
    data: bytes | memoryview, *, copy: bool = False
) -> tuple[np.ndarray, int]

Deserialize bytes to a NumPy array.

Returns:

Type Description
tuple[ndarray, int]

Tuple of (array, bytes_consumed)

Source code in src/cortex/utils/serialization.py
def deserialize_numpy(
    data: bytes | memoryview, *, copy: bool = False
) -> tuple[np.ndarray, int]:
    """
    Reconstruct a NumPy array from bytes produced by serialize_numpy.

    Returns:
        Tuple of (array, bytes_consumed)
    """
    view = _as_buffer_view(data)

    # Header: 1-byte ndim followed by ndim big-endian uint32 shape entries.
    (ndim,) = struct.unpack_from(">B", view, 0)
    shape = struct.unpack_from(f">{ndim}I", view, 1)
    pos = 1 + 4 * ndim

    # dtype: 2-byte length prefix, then the dtype string itself.
    (dtype_len,) = struct.unpack_from(">H", view, pos)
    pos += 2
    dtype = np.dtype(bytes(view[pos : pos + dtype_len]).decode("utf-8"))
    pos += dtype_len

    # Body: raw element data; frombuffer gives a zero-copy (read-only) view.
    nbytes = int(np.prod(shape)) * dtype.itemsize
    result = np.frombuffer(view[pos : pos + nbytes], dtype=dtype).reshape(shape)
    pos += nbytes

    # copy=True detaches the array from the input buffer (and makes it writable).
    return (result.copy() if copy else result), pos

serialize_torch

serialize_torch(tensor: Any) -> bytes

Serialize a PyTorch tensor to bytes.

Converts to NumPy for serialization, preserving device and requires_grad info.

Source code in src/cortex/utils/serialization.py
def serialize_torch(tensor: Any) -> bytes:
    """
    Serialize a PyTorch tensor to bytes.

    The tensor body is stored via serialize_numpy; the original device
    string and requires_grad flag are kept in a small metadata header.
    """
    if not TORCH_AVAILABLE:
        raise RuntimeError("PyTorch is not available")

    device_bytes = str(tensor.device).encode("utf-8")
    grad_flag = tensor.requires_grad

    # Detach from autograd and move to host memory before the numpy dump.
    host_array = tensor.detach().cpu().numpy()

    # Metadata header: 1-byte bool + 2-byte device-string length + device string.
    header = struct.pack(">?H", grad_flag, len(device_bytes)) + device_bytes
    return header + serialize_numpy(host_array)

deserialize_torch

deserialize_torch(data: bytes) -> tuple[Any, int]

Deserialize bytes to a PyTorch tensor.

Returns:

Type Description
tuple[Any, int]

Tuple of (tensor, bytes_consumed)

Source code in src/cortex/utils/serialization.py
def deserialize_torch(data: bytes) -> tuple[Any, int]:
    """
    Reconstruct a PyTorch tensor from bytes produced by serialize_torch.

    Returns:
        Tuple of (tensor, bytes_consumed)
    """
    if not TORCH_AVAILABLE:
        raise RuntimeError("PyTorch is not available")

    # Metadata header: ">?H" is unpadded, so this is the same 3-byte layout
    # as reading the bool and the length separately.
    requires_grad, device_len = struct.unpack_from(">?H", data, 0)
    cursor = 3
    device_str = data[cursor : cursor + device_len].decode("utf-8")
    cursor += device_len

    # Body: embedded numpy payload; copy so the tensor owns its memory.
    arr, consumed = deserialize_numpy(data[cursor:], copy=True)
    cursor += consumed

    tensor = torch.from_numpy(arr)

    # Move back to the recorded CUDA device only when one is present here.
    if device_str.startswith("cuda") and torch.cuda.is_available():
        tensor = tensor.to(device_str)

    if requires_grad:
        tensor.requires_grad_(True)

    return tensor, cursor

serialize

serialize(value: Any) -> bytes

Serialize any supported value to bytes.

Supported types: - None, int, float, str, bool - bytes - list, dict - numpy.ndarray - torch.Tensor

Source code in src/cortex/utils/serialization.py
def serialize(value: Any) -> bytes:
    """
    Serialize any supported value to bytes.

    Supported types:
    - None, int, float, str, bool
    - bytes
    - list, dict
    - numpy.ndarray
    - torch.Tensor
    """
    # Fixed-format types: tag byte (+ optional length) and a custom payload.
    if value is None:
        return struct.pack(">B", DataType.NONE)

    if isinstance(value, np.ndarray):
        return struct.pack(">B", DataType.NUMPY) + serialize_numpy(value)

    if TORCH_AVAILABLE and isinstance(value, torch.Tensor):
        return struct.pack(">B", DataType.TORCH) + serialize_torch(value)

    if isinstance(value, bytes):
        return struct.pack(">BI", DataType.BYTES, len(value)) + value

    # Everything else is msgpack-encoded; only the tag differs by type.
    if isinstance(value, dict):
        tag = DataType.DICT
        payload = msgpack.packb(value, default=_msgpack_default, use_bin_type=True)
    elif isinstance(value, (list, tuple)):
        tag = DataType.LIST
        payload = msgpack.packb(value, default=_msgpack_default, use_bin_type=True)
    else:
        tag = DataType.PRIMITIVE
        payload = msgpack.packb(value, use_bin_type=True)

    return struct.pack(">BI", tag, len(payload)) + payload

deserialize

deserialize(data: bytes) -> tuple[Any, int]

Deserialize bytes to a value.

Returns:

Type Description
tuple[Any, int]

Tuple of (value, bytes_consumed)

Source code in src/cortex/utils/serialization.py
def deserialize(data: bytes) -> tuple[Any, int]:
    """
    Deserialize bytes to a value.

    Returns:
        Tuple of (value, bytes_consumed)
    """
    view = _as_buffer_view(data)
    (tag_byte,) = struct.unpack(">B", view[0:1])
    data_type = DataType(tag_byte)
    pos = 1

    if data_type == DataType.NONE:
        return None, pos

    if data_type == DataType.NUMPY:
        arr, consumed = deserialize_numpy(view[pos:])
        return arr, pos + consumed

    if data_type == DataType.TORCH:
        # deserialize_torch expects real bytes, not a view.
        tensor, consumed = deserialize_torch(bytes(view[pos:]))
        return tensor, pos + consumed

    if data_type == DataType.BYTES:
        (length,) = struct.unpack(">I", view[pos : pos + 4])
        pos += 4
        return bytes(view[pos : pos + length]), pos + length

    if data_type in (DataType.DICT, DataType.LIST, DataType.PRIMITIVE):
        (length,) = struct.unpack(">I", view[pos : pos + 4])
        pos += 4
        payload = view[pos : pos + length]
        value = msgpack.unpackb(payload, raw=False, ext_hook=_msgpack_ext_hook)
        return value, pos + length

    raise ValueError(f"Unknown data type: {data_type}")

serialize_message_data

serialize_message_data(fields: dict[str, Any]) -> bytes

Serialize message fields to bytes.

Thin wrapper over serialize(): the whole field dict is encoded as a single DICT-tagged payload (1-byte type tag + 4-byte big-endian length + msgpack body) rather than field-by-field.

Source code in src/cortex/utils/serialization.py
def serialize_message_data(fields: dict[str, Any]) -> bytes:
    """
    Serialize message fields to bytes.

    Thin wrapper over serialize(): the whole field dict is encoded as a
    single DICT-tagged payload (1-byte type tag, 4-byte big-endian length,
    msgpack body) rather than field-by-field.
    """
    # NOTE(review): the previous docstring described a hand-rolled
    # per-field format (2-byte field count, per-field key/value length
    # prefixes) that this function never implements — it only delegates.
    return serialize(fields)

deserialize_message_data

deserialize_message_data(data: bytes) -> dict[str, Any]

Deserialize bytes to message fields.

Source code in src/cortex/utils/serialization.py
def deserialize_message_data(data: bytes) -> dict[str, Any]:
    """Deserialize bytes back into message fields, ignoring bytes consumed."""
    decoded, _consumed = deserialize(data)
    return decoded

serialize_message_values

serialize_message_values(
    values: list[Any] | tuple[Any, ...],
) -> bytes

Serialize ordered message field values.

Source code in src/cortex/utils/serialization.py
def serialize_message_values(values: list[Any] | tuple[Any, ...]) -> bytes:
    """Serialize ordered message field values (tuples are sent as lists)."""
    return serialize([*values])

deserialize_message_values

deserialize_message_values(
    data: bytes | memoryview,
) -> list[Any]

Deserialize ordered message field values.

Source code in src/cortex/utils/serialization.py
def deserialize_message_values(data: bytes | memoryview) -> list[Any]:
    """Deserialize ordered message field values, ignoring bytes consumed."""
    decoded, _consumed = deserialize(_as_buffer_view(data))
    return decoded

serialize_message_frames

serialize_message_frames(
    values: list[Any] | tuple[Any, ...],
) -> list[Any]

Serialize message values into metadata plus out-of-band buffer frames.

Source code in src/cortex/utils/serialization.py
def serialize_message_frames(values: list[Any] | tuple[Any, ...]) -> list[Any]:
    """Serialize message values into metadata plus out-of-band buffer frames."""
    out_of_band: list[Any] = []
    encoded = []
    # Encoding a value may append zero or more buffers to out_of_band;
    # buffer order must match the order values are encoded in.
    for value in values:
        encoded.append(_encode_transport_value(value, out_of_band))
    metadata_frame = msgpack.packb(encoded, use_bin_type=True)
    return [metadata_frame, *out_of_band]

deserialize_message_frames

deserialize_message_frames(frames: list[Any]) -> list[Any]

Deserialize metadata plus out-of-band buffer frames into message values.

Source code in src/cortex/utils/serialization.py
def deserialize_message_frames(frames: list[Any]) -> list[Any]:
    """Deserialize metadata plus out-of-band buffer frames into message values."""
    if not frames:
        return []

    # Frame 0 is the msgpack metadata; the rest are out-of-band buffers.
    metadata, buffers = frames[0], frames[1:]
    encoded = msgpack.unpackb(_as_buffer_view(metadata), raw=False)
    return [_decode_transport_value(item, buffers) for item in encoded]