Skip to content

cortex.messages.base

base

Base message classes for Cortex.

Classes

MessageType

Registry mapping 64-bit fingerprints to Message subclasses.

Populated automatically via :meth:Message.__init_subclass__. Used by :meth:Message.decode to dispatch an incoming byte stream to the right concrete class based on the fingerprint in its header.

Example

from cortex.messages.standard import ArrayMessage MessageType.get(ArrayMessage.fingerprint()) is ArrayMessage True

Source code in src/cortex/messages/base.py
class MessageType:
    """Registry mapping 64-bit fingerprints to ``Message`` subclasses.

    Populated automatically via :meth:`Message.__init_subclass__`. Used by
    :meth:`Message.decode` to dispatch an incoming byte stream to the right
    concrete class based on the fingerprint in its header.

    Example:
        >>> from cortex.messages.standard import ArrayMessage
        >>> MessageType.get(ArrayMessage.fingerprint()) is ArrayMessage
        True
    """

    _registry: ClassVar[dict[int, type["Message"]]] = {}

    @classmethod
    def register(cls, message_class: type["Message"]) -> type["Message"]:
        """Register a message class by its fingerprint."""
        fingerprint = get_cached_fingerprint(message_class)
        cls._registry[fingerprint] = message_class
        return message_class

    @classmethod
    def get(cls, fingerprint: int) -> type["Message"] | None:
        """Get a message class by fingerprint."""
        return cls._registry.get(fingerprint)

    @classmethod
    def get_all(cls) -> dict[int, type["Message"]]:
        """Get all registered message types."""
        return cls._registry.copy()

    @classmethod
    def clear(cls) -> None:
        """Clear the registry (useful for testing)."""
        cls._registry.clear()
Functions
register classmethod
register(message_class: type[Message]) -> type[Message]

Register a message class by its fingerprint.

Source code in src/cortex/messages/base.py
@classmethod
def register(cls, message_class: type["Message"]) -> type["Message"]:
    """Register a message class by its fingerprint."""
    fingerprint = get_cached_fingerprint(message_class)
    cls._registry[fingerprint] = message_class
    return message_class
get classmethod
get(fingerprint: int) -> type[Message] | None

Get a message class by fingerprint.

Source code in src/cortex/messages/base.py
@classmethod
def get(cls, fingerprint: int) -> type["Message"] | None:
    """Get a message class by fingerprint."""
    return cls._registry.get(fingerprint)
get_all classmethod
get_all() -> dict[int, type[Message]]

Get all registered message types.

Source code in src/cortex/messages/base.py
@classmethod
def get_all(cls) -> dict[int, type["Message"]]:
    """Get all registered message types."""
    return cls._registry.copy()
clear classmethod
clear() -> None

Clear the registry (useful for testing).

Source code in src/cortex/messages/base.py
@classmethod
def clear(cls) -> None:
    """Clear the registry (useful for testing)."""
    cls._registry.clear()

MessageHeader dataclass

Fixed-size 24-byte header prepended to every Cortex message.

Layout (big-endian): fingerprint u64 | timestamp_ns u64 | sequence u64.

Attributes:

Name Type Description
fingerprint int

64-bit type identifier. See :func:cortex.utils.hashing.compute_fingerprint.

timestamp_ns int

Wall-clock nanoseconds at publish time (time.time_ns()).

sequence int

Per-process, per-message-type monotonic counter.

Source code in src/cortex/messages/base.py
@dataclass
class MessageHeader:
    """Fixed-size 24-byte header prepended to every Cortex message.

    Layout (big-endian): ``fingerprint u64 | timestamp_ns u64 | sequence u64``.

    Attributes:
        fingerprint: 64-bit type identifier. See :func:`cortex.utils.hashing.compute_fingerprint`.
        timestamp_ns: Wall-clock nanoseconds at publish time (``time.time_ns()``).
        sequence: Per-process, per-message-type monotonic counter.
    """

    fingerprint: int
    timestamp_ns: int
    sequence: int

    def to_bytes(self) -> bytes:
        """Serialize header to bytes (24 bytes fixed size)."""
        return struct.pack(">QQQ", self.fingerprint, self.timestamp_ns, self.sequence)

    @classmethod
    def from_bytes(cls, data: bytes) -> "MessageHeader":
        """Deserialize header from bytes."""
        fingerprint, timestamp_ns, sequence = struct.unpack(">QQQ", data[:24])
        return cls(
            fingerprint=fingerprint, timestamp_ns=timestamp_ns, sequence=sequence
        )

    @classmethod
    def size(cls) -> int:
        """Return the fixed header size in bytes."""
        return 24
Functions
to_bytes
to_bytes() -> bytes

Serialize header to bytes (24 bytes fixed size).

Source code in src/cortex/messages/base.py
def to_bytes(self) -> bytes:
    """Serialize header to bytes (24 bytes fixed size)."""
    return struct.pack(">QQQ", self.fingerprint, self.timestamp_ns, self.sequence)
from_bytes classmethod
from_bytes(data: bytes) -> MessageHeader

Deserialize header from bytes.

Source code in src/cortex/messages/base.py
@classmethod
def from_bytes(cls, data: bytes) -> "MessageHeader":
    """Deserialize header from bytes."""
    fingerprint, timestamp_ns, sequence = struct.unpack(">QQQ", data[:24])
    return cls(
        fingerprint=fingerprint, timestamp_ns=timestamp_ns, sequence=sequence
    )
size classmethod
size() -> int

Return the fixed header size in bytes.

Source code in src/cortex/messages/base.py
@classmethod
def size(cls) -> int:
    """Return the fixed header size in bytes."""
    return 24

Message dataclass

Base class for all Cortex messages.

Subclasses should be decorated with @dataclass and define their fields. The message will automatically compute its fingerprint based on the class name and field structure.

Example

@dataclass class PointCloud(Message): points: np.ndarray colors: np.ndarray intensity: float = 1.0

Source code in src/cortex/messages/base.py
@dataclass
class Message:
    """
    Base class for all Cortex messages.

    Subclasses should be decorated with @dataclass and define their
    fields. The message will automatically compute its fingerprint
    based on the class name and field structure.

    Example:
        @dataclass
        class PointCloud(Message):
            points: np.ndarray
            colors: np.ndarray
            intensity: float = 1.0
    """

    # Class-level sequence counter, kept as a fallback for callers that
    # serialize a message directly via ``to_bytes``/``to_frames`` without
    # going through ``Publisher``. Real per-publisher gap detection lives
    # on :class:`cortex.core.publisher.Publisher` (one counter per topic).
    _sequence_counter: ClassVar[int] = 0
    _field_names_cache: ClassVar[tuple[str, ...] | None] = None

    def __init_subclass__(cls, **kwargs):
        """Automatically register subclasses."""
        super().__init_subclass__(**kwargs)
        # Only register concrete classes (not abstract ones)
        if not getattr(cls, "__abstractmethods__", None):
            MessageType.register(cls)

    @classmethod
    def fingerprint(cls) -> int:
        """Get the 64-bit fingerprint for this message type."""
        return get_cached_fingerprint(cls)

    @classmethod
    def _next_sequence(cls) -> int:
        """Get the next sequence number on the class-level counter.

        Used as a fallback when no explicit ``sequence`` is supplied to
        :meth:`to_bytes` / :meth:`to_frames`.
        """
        seq = cls._sequence_counter
        cls._sequence_counter += 1
        return seq

    @classmethod
    def _field_names(cls) -> tuple[str, ...]:
        """Get cached dataclass field names in declaration order."""
        cached = cls.__dict__.get("_field_names_cache")
        if cached is None:
            cached = tuple(field.name for field in fields(cls))
            cls._field_names_cache = cached
        return cached

    def _field_values(self) -> list[object]:
        """Get field values in schema order."""
        return [getattr(self, name) for name in self._field_names()]

    @classmethod
    def _build_instance(cls: type[T], values: list[object]) -> T:
        """Create a message instance from ordered field values."""
        field_names = cls._field_names()
        if len(values) != len(field_names):
            raise ValueError(
                f"Expected {len(field_names)} fields for {cls.__name__}, got {len(values)}"
            )
        return cls(**dict(zip(field_names, values, strict=True)))

    def _build_header(self, sequence: int | None = None) -> MessageHeader:
        """Create a message header for the current instance.

        Args:
            sequence: Explicit sequence number (typically supplied by the
                owning :class:`Publisher`). When ``None``, falls back to
                the class-level counter so direct ``to_bytes`` / ``to_frames``
                calls keep working in tests and ad-hoc serialization.
        """
        return MessageHeader(
            fingerprint=self.fingerprint(),
            timestamp_ns=time.time_ns(),
            sequence=self._next_sequence() if sequence is None else sequence,
        )

    def to_bytes(self, sequence: int | None = None) -> bytes:
        """
        Serialize the message to bytes.

        Format:
        - 24 bytes: header (fingerprint, timestamp, sequence)
        - remaining: serialized field data
        """
        header_bytes = self._build_header(sequence).to_bytes()
        data_bytes = serialize_message_values(self._field_values())
        return header_bytes + data_bytes

    def to_frames(self, sequence: int | None = None) -> list[object]:
        """Serialize the message into transport frames.

        The first frame is always the fixed-size header. The second frame holds
        packed metadata, and any remaining frames are raw out-of-band buffers.
        """
        return [
            self._build_header(sequence).to_bytes(),
            *serialize_message_frames(self._field_values()),
        ]

    @classmethod
    def from_bytes(cls: type[T], data: bytes) -> tuple[T, MessageHeader]:
        """
        Deserialize a message from bytes.

        Returns:
            Tuple of (message instance, header)
        """
        header = MessageHeader.from_bytes(data)
        values = deserialize_message_values(data[MessageHeader.size() :])
        return cls._build_instance(values), header

    @classmethod
    def from_frames(cls: type[T], frames: list[object]) -> tuple[T, MessageHeader]:
        """Deserialize a message from transport frames."""
        if len(frames) < 2:
            raise ValueError("Message frame payload must include header and metadata")

        header = MessageHeader.from_bytes(_frame_to_bytes_like(frames[0]))
        values = deserialize_message_frames(frames[1:])
        return cls._build_instance(values), header

    @staticmethod
    def decode(data: bytes) -> tuple["Message", MessageHeader]:
        """
        Decode a message without knowing its type in advance.

        Uses the fingerprint in the header to look up the message type.

        Returns:
            Tuple of (message instance, header)

        Raises:
            ValueError: If the message type is not registered
        """
        header = MessageHeader.from_bytes(data)
        message_class = MessageType.get(header.fingerprint)

        if message_class is None:
            raise ValueError(
                f"Unknown message type with fingerprint: {header.fingerprint:#018x}"
            )

        return message_class.from_bytes(data)
Functions
fingerprint classmethod
fingerprint() -> int

Get the 64-bit fingerprint for this message type.

Source code in src/cortex/messages/base.py
@classmethod
def fingerprint(cls) -> int:
    """Get the 64-bit fingerprint for this message type."""
    return get_cached_fingerprint(cls)
to_bytes
to_bytes(sequence: int | None = None) -> bytes

Serialize the message to bytes.

Format: - 24 bytes: header (fingerprint, timestamp, sequence) - remaining: serialized field data

Source code in src/cortex/messages/base.py
def to_bytes(self, sequence: int | None = None) -> bytes:
    """
    Serialize the message to bytes.

    Format:
    - 24 bytes: header (fingerprint, timestamp, sequence)
    - remaining: serialized field data
    """
    header_bytes = self._build_header(sequence).to_bytes()
    data_bytes = serialize_message_values(self._field_values())
    return header_bytes + data_bytes
to_frames
to_frames(sequence: int | None = None) -> list[object]

Serialize the message into transport frames.

The first frame is always the fixed-size header. The second frame holds packed metadata, and any remaining frames are raw out-of-band buffers.

Source code in src/cortex/messages/base.py
def to_frames(self, sequence: int | None = None) -> list[object]:
    """Serialize the message into transport frames.

    The first frame is always the fixed-size header. The second frame holds
    packed metadata, and any remaining frames are raw out-of-band buffers.
    """
    return [
        self._build_header(sequence).to_bytes(),
        *serialize_message_frames(self._field_values()),
    ]
from_bytes classmethod
from_bytes(data: bytes) -> tuple[T, MessageHeader]

Deserialize a message from bytes.

Returns:

Type Description
tuple[T, MessageHeader]

Tuple of (message instance, header)

Source code in src/cortex/messages/base.py
@classmethod
def from_bytes(cls: type[T], data: bytes) -> tuple[T, MessageHeader]:
    """
    Deserialize a message from bytes.

    Returns:
        Tuple of (message instance, header)
    """
    header = MessageHeader.from_bytes(data)
    values = deserialize_message_values(data[MessageHeader.size() :])
    return cls._build_instance(values), header
from_frames classmethod
from_frames(
    frames: list[object],
) -> tuple[T, MessageHeader]

Deserialize a message from transport frames.

Source code in src/cortex/messages/base.py
@classmethod
def from_frames(cls: type[T], frames: list[object]) -> tuple[T, MessageHeader]:
    """Deserialize a message from transport frames."""
    if len(frames) < 2:
        raise ValueError("Message frame payload must include header and metadata")

    header = MessageHeader.from_bytes(_frame_to_bytes_like(frames[0]))
    values = deserialize_message_frames(frames[1:])
    return cls._build_instance(values), header
decode staticmethod
decode(data: bytes) -> tuple[Message, MessageHeader]

Decode a message without knowing its type in advance.

Uses the fingerprint in the header to look up the message type.

Returns:

Type Description
tuple[Message, MessageHeader]

Tuple of (message instance, header)

Raises:

Type Description
ValueError

If the message type is not registered

Source code in src/cortex/messages/base.py
@staticmethod
def decode(data: bytes) -> tuple["Message", MessageHeader]:
    """
    Decode a message without knowing its type in advance.

    Uses the fingerprint in the header to look up the message type.

    Returns:
        Tuple of (message instance, header)

    Raises:
        ValueError: If the message type is not registered
    """
    header = MessageHeader.from_bytes(data)
    message_class = MessageType.get(header.fingerprint)

    if message_class is None:
        raise ValueError(
            f"Unknown message type with fingerprint: {header.fingerprint:#018x}"
        )

    return message_class.from_bytes(data)

Functions