Skip to content

cortex.messages

messages

Messages module for Cortex framework.

Classes

Message dataclass

Base class for all Cortex messages.

Subclasses should be decorated with @dataclass and define their fields. The message will automatically compute its fingerprint based on the class name and field structure.

Example

@dataclass
class PointCloud(Message):
    points: np.ndarray
    colors: np.ndarray
    intensity: float = 1.0

Source code in src/cortex/messages/base.py
@dataclass
class Message:
    """
    Base class for all Cortex messages.

    Subclasses should be decorated with @dataclass and define their
    fields. The message will automatically compute its fingerprint
    based on the class name and field structure.

    Example:
        @dataclass
        class PointCloud(Message):
            points: np.ndarray
            colors: np.ndarray
            intensity: float = 1.0
    """

    # Class-level sequence counter, kept as a fallback for callers that
    # serialize a message directly via ``to_bytes``/``to_frames`` without
    # going through ``Publisher``. Real per-publisher gap detection lives
    # on :class:`cortex.core.publisher.Publisher` (one counter per topic).
    _sequence_counter: ClassVar[int] = 0
    _field_names_cache: ClassVar[tuple[str, ...] | None] = None

    def __init_subclass__(cls, **kwargs):
        """Automatically register subclasses."""
        super().__init_subclass__(**kwargs)
        # Only register concrete classes (not abstract ones)
        if not getattr(cls, "__abstractmethods__", None):
            MessageType.register(cls)

    @classmethod
    def fingerprint(cls) -> int:
        """Get the 64-bit fingerprint for this message type."""
        return get_cached_fingerprint(cls)

    @classmethod
    def _next_sequence(cls) -> int:
        """Get the next sequence number on the class-level counter.

        Used as a fallback when no explicit ``sequence`` is supplied to
        :meth:`to_bytes` / :meth:`to_frames`.
        """
        seq = cls._sequence_counter
        cls._sequence_counter += 1
        return seq

    @classmethod
    def _field_names(cls) -> tuple[str, ...]:
        """Get cached dataclass field names in declaration order."""
        cached = cls.__dict__.get("_field_names_cache")
        if cached is None:
            cached = tuple(field.name for field in fields(cls))
            cls._field_names_cache = cached
        return cached

    def _field_values(self) -> list[object]:
        """Get field values in schema order."""
        return [getattr(self, name) for name in self._field_names()]

    @classmethod
    def _build_instance(cls: type[T], values: list[object]) -> T:
        """Create a message instance from ordered field values."""
        field_names = cls._field_names()
        if len(values) != len(field_names):
            raise ValueError(
                f"Expected {len(field_names)} fields for {cls.__name__}, got {len(values)}"
            )
        return cls(**dict(zip(field_names, values, strict=True)))

    def _build_header(self, sequence: int | None = None) -> MessageHeader:
        """Create a message header for the current instance.

        Args:
            sequence: Explicit sequence number (typically supplied by the
                owning :class:`Publisher`). When ``None``, falls back to
                the class-level counter so direct ``to_bytes`` / ``to_frames``
                calls keep working in tests and ad-hoc serialization.
        """
        return MessageHeader(
            fingerprint=self.fingerprint(),
            timestamp_ns=time.time_ns(),
            sequence=self._next_sequence() if sequence is None else sequence,
        )

    def to_bytes(self, sequence: int | None = None) -> bytes:
        """
        Serialize the message to bytes.

        Format:
        - 24 bytes: header (fingerprint, timestamp, sequence)
        - remaining: serialized field data
        """
        header_bytes = self._build_header(sequence).to_bytes()
        data_bytes = serialize_message_values(self._field_values())
        return header_bytes + data_bytes

    def to_frames(self, sequence: int | None = None) -> list[object]:
        """Serialize the message into transport frames.

        The first frame is always the fixed-size header. The second frame holds
        packed metadata, and any remaining frames are raw out-of-band buffers.
        """
        return [
            self._build_header(sequence).to_bytes(),
            *serialize_message_frames(self._field_values()),
        ]

    @classmethod
    def from_bytes(cls: type[T], data: bytes) -> tuple[T, MessageHeader]:
        """
        Deserialize a message from bytes.

        Returns:
            Tuple of (message instance, header)
        """
        header = MessageHeader.from_bytes(data)
        values = deserialize_message_values(data[MessageHeader.size() :])
        return cls._build_instance(values), header

    @classmethod
    def from_frames(cls: type[T], frames: list[object]) -> tuple[T, MessageHeader]:
        """Deserialize a message from transport frames."""
        if len(frames) < 2:
            raise ValueError("Message frame payload must include header and metadata")

        header = MessageHeader.from_bytes(_frame_to_bytes_like(frames[0]))
        values = deserialize_message_frames(frames[1:])
        return cls._build_instance(values), header

    @staticmethod
    def decode(data: bytes) -> tuple["Message", MessageHeader]:
        """
        Decode a message without knowing its type in advance.

        Uses the fingerprint in the header to look up the message type.

        Returns:
            Tuple of (message instance, header)

        Raises:
            ValueError: If the message type is not registered
        """
        header = MessageHeader.from_bytes(data)
        message_class = MessageType.get(header.fingerprint)

        if message_class is None:
            raise ValueError(
                f"Unknown message type with fingerprint: {header.fingerprint:#018x}"
            )

        return message_class.from_bytes(data)
Functions
fingerprint classmethod
fingerprint() -> int

Get the 64-bit fingerprint for this message type.

Source code in src/cortex/messages/base.py
@classmethod
def fingerprint(cls) -> int:
    """Return the 64-bit fingerprint identifying this message type."""
    type_fingerprint = get_cached_fingerprint(cls)
    return type_fingerprint
to_bytes
to_bytes(sequence: int | None = None) -> bytes

Serialize the message to bytes.

Format:

- 24 bytes: header (fingerprint, timestamp, sequence)
- remaining: serialized field data

Source code in src/cortex/messages/base.py
def to_bytes(self, sequence: int | None = None) -> bytes:
    """
    Serialize the message into one contiguous byte string.

    Format:
    - 24 bytes: header (fingerprint, timestamp, sequence)
    - remaining: serialized field data

    Args:
        sequence: Optional explicit sequence number for the header.
    """
    head = self._build_header(sequence).to_bytes()
    body = serialize_message_values(self._field_values())
    return head + body
to_frames
to_frames(sequence: int | None = None) -> list[object]

Serialize the message into transport frames.

The first frame is always the fixed-size header. The second frame holds packed metadata, and any remaining frames are raw out-of-band buffers.

Source code in src/cortex/messages/base.py
def to_frames(self, sequence: int | None = None) -> list[object]:
    """Serialize the message into a list of transport frames.

    The first frame is always the fixed-size header. The second frame holds
    packed metadata, and any remaining frames are raw out-of-band buffers.

    Args:
        sequence: Optional explicit sequence number for the header.
    """
    frames: list[object] = [self._build_header(sequence).to_bytes()]
    frames.extend(serialize_message_frames(self._field_values()))
    return frames
from_bytes classmethod
from_bytes(data: bytes) -> tuple[T, MessageHeader]

Deserialize a message from bytes.

Returns:

Type Description
tuple[T, MessageHeader]

Tuple of (message instance, header)

Source code in src/cortex/messages/base.py
@classmethod
def from_bytes(cls: type[T], data: bytes) -> tuple[T, MessageHeader]:
    """
    Rebuild a message of this type from its byte representation.

    Args:
        data: Header immediately followed by the serialized fields.

    Returns:
        Tuple of (message instance, header)
    """
    header = MessageHeader.from_bytes(data)
    # Everything after the fixed-size header is field data.
    payload = data[MessageHeader.size() :]
    instance = cls._build_instance(deserialize_message_values(payload))
    return instance, header
from_frames classmethod
from_frames(
    frames: list[object],
) -> tuple[T, MessageHeader]

Deserialize a message from transport frames.

Source code in src/cortex/messages/base.py
@classmethod
def from_frames(cls: type[T], frames: list[object]) -> tuple[T, MessageHeader]:
    """Rebuild a message of this type from transport frames.

    Args:
        frames: Header frame followed by at least the metadata frame.

    Returns:
        Tuple of (message instance, header)

    Raises:
        ValueError: If fewer than two frames are supplied.
    """
    if len(frames) < 2:
        raise ValueError("Message frame payload must include header and metadata")

    header = MessageHeader.from_bytes(_frame_to_bytes_like(frames[0]))
    field_values = deserialize_message_frames(frames[1:])
    return cls._build_instance(field_values), header
decode staticmethod
decode(data: bytes) -> tuple[Message, MessageHeader]

Decode a message without knowing its type in advance.

Uses the fingerprint in the header to look up the message type.

Returns:

Type Description
tuple[Message, MessageHeader]

Tuple of (message instance, header)

Raises:

Type Description
ValueError

If the message type is not registered

Source code in src/cortex/messages/base.py
@staticmethod
def decode(data: bytes) -> tuple["Message", MessageHeader]:
    """
    Decode a message whose concrete type is not known up front.

    The header is parsed first; its fingerprint is then looked up in
    ``MessageType`` and the matching class does the actual decoding.

    Returns:
        Tuple of (message instance, header)

    Raises:
        ValueError: If the message type is not registered
    """
    header = MessageHeader.from_bytes(data)
    message_class = MessageType.get(header.fingerprint)

    if message_class is not None:
        return message_class.from_bytes(data)

    raise ValueError(
        f"Unknown message type with fingerprint: {header.fingerprint:#018x}"
    )

MessageType

Registry mapping 64-bit fingerprints to Message subclasses.

Populated automatically via `Message.__init_subclass__`. Used by `Message.decode` to dispatch an incoming byte stream to the right concrete class based on the fingerprint in its header.

Example

>>> from cortex.messages.standard import ArrayMessage
>>> MessageType.get(ArrayMessage.fingerprint()) is ArrayMessage
True

Source code in src/cortex/messages/base.py
class MessageType:
    """Registry mapping 64-bit fingerprints to ``Message`` subclasses.

    Populated automatically via :meth:`Message.__init_subclass__`. Used by
    :meth:`Message.decode` to dispatch an incoming byte stream to the right
    concrete class based on the fingerprint in its header.

    Example:
        >>> from cortex.messages.standard import ArrayMessage
        >>> MessageType.get(ArrayMessage.fingerprint()) is ArrayMessage
        True
    """

    _registry: ClassVar[dict[int, type["Message"]]] = {}

    @classmethod
    def register(cls, message_class: type["Message"]) -> type["Message"]:
        """Register a message class by its fingerprint."""
        fingerprint = get_cached_fingerprint(message_class)
        cls._registry[fingerprint] = message_class
        return message_class

    @classmethod
    def get(cls, fingerprint: int) -> type["Message"] | None:
        """Get a message class by fingerprint."""
        return cls._registry.get(fingerprint)

    @classmethod
    def get_all(cls) -> dict[int, type["Message"]]:
        """Get all registered message types."""
        return cls._registry.copy()

    @classmethod
    def clear(cls) -> None:
        """Clear the registry (useful for testing)."""
        cls._registry.clear()
Functions
register classmethod
register(message_class: type[Message]) -> type[Message]

Register a message class by its fingerprint.

Source code in src/cortex/messages/base.py
@classmethod
def register(cls, message_class: type["Message"]) -> type["Message"]:
    """Store ``message_class`` under its fingerprint.

    An existing entry with the same fingerprint is replaced.

    Returns:
        ``message_class`` itself, unchanged.
    """
    key = get_cached_fingerprint(message_class)
    registry = cls._registry
    registry[key] = message_class
    return message_class
get classmethod
get(fingerprint: int) -> type[Message] | None

Get a message class by fingerprint.

Source code in src/cortex/messages/base.py
@classmethod
def get(cls, fingerprint: int) -> type["Message"] | None:
    """Get a message class by fingerprint."""
    return cls._registry.get(fingerprint)
get_all classmethod
get_all() -> dict[int, type[Message]]

Get all registered message types.

Source code in src/cortex/messages/base.py
@classmethod
def get_all(cls) -> dict[int, type["Message"]]:
    """Return a shallow copy of the fingerprint-to-class mapping."""
    return dict(cls._registry)
clear classmethod
clear() -> None

Clear the registry (useful for testing).

Source code in src/cortex/messages/base.py
@classmethod
def clear(cls) -> None:
    """Empty the registry in place (useful for testing)."""
    registry = cls._registry
    registry.clear()

ArrayMessage dataclass

Bases: Message

NumPy array message.

Efficiently serializes numpy arrays of any dtype and shape.

Source code in src/cortex/messages/standard.py
@dataclass
class ArrayMessage(Message):
    """
    NumPy array message.

    Efficiently serializes numpy arrays of any dtype and shape.

    Fields (declaration order is the order used when the message is
    serialized and rebuilt):
        data: The array payload (required).
        name: Optional label; defaults to the empty string.
        frame_id: Optional identifier; defaults to the empty string.
            NOTE(review): presumably names a coordinate frame -- confirm.
    """

    # Required array payload.
    data: np.ndarray

    # Optional metadata
    name: str = ""
    frame_id: str = ""

DictMessage dataclass

Bases: Message

Dictionary message supporting nested structures.

Values can be primitives, numpy arrays, torch tensors, or nested dicts/lists.

Source code in src/cortex/messages/standard.py
@dataclass
class DictMessage(Message):
    """
    Dictionary message supporting nested structures.

    Values can be primitives, numpy arrays, torch tensors, or nested dicts/lists.

    Fields:
        data: The dictionary payload, keyed by strings (required).
    """

    # Required payload; keys are strings, values are unconstrained here.
    data: dict[str, Any]

FloatMessage dataclass

Bases: Message

Simple float message.

Source code in src/cortex/messages/standard.py
@dataclass
class FloatMessage(Message):
    """Simple float message.

    Fields:
        data: The float payload (required).
    """

    # Required single float value.
    data: float

IntMessage dataclass

Bases: Message

Simple integer message.

Source code in src/cortex/messages/standard.py
@dataclass
class IntMessage(Message):
    """Simple integer message.

    Fields:
        data: The integer payload (required).
    """

    # Required single integer value.
    data: int

StringMessage dataclass

Bases: Message

Simple string message.

Source code in src/cortex/messages/standard.py
@dataclass
class StringMessage(Message):
    """Simple string message.

    Fields:
        data: The string payload (required).
    """

    # Required single string value.
    data: str

TensorMessage dataclass

Bases: Message

PyTorch tensor message.

Preserves tensor device and requires_grad attributes. Note: Tensors are moved to CPU for serialization.

Source code in src/cortex/messages/standard.py
@dataclass
class TensorMessage(Message):
    """
    PyTorch tensor message.

    Preserves tensor device and requires_grad attributes.
    Note: Tensors are moved to CPU for serialization.

    Fields:
        data: The tensor payload (required). Annotated as ``Any`` so that
            this module does not need torch at import time.
        name: Optional label; defaults to the empty string.
    """

    data: Any  # torch.Tensor, but using Any to avoid import issues

    # Optional metadata
    name: str = ""

    def __post_init__(self):
        """Reject non-tensor payloads when torch is available.

        Raises:
            TypeError: If torch is available and ``data`` is not a
                ``torch.Tensor``.
        """
        # Without torch there is nothing to check against, so accept as-is.
        if not TORCH_AVAILABLE:
            return
        if isinstance(self.data, torch.Tensor):
            return
        raise TypeError(f"Expected torch.Tensor, got {type(self.data)}")