Skip to content

Custom messages

A message is any @dataclass that inherits from Message. Registration, fingerprinting, and (de)serialization are derived from the dataclass definition.

Define

messages.py
from dataclasses import dataclass
import numpy as np
from cortex.messages.base import Message


@dataclass
class RobotState(Message):
    timestamp: float
    position: np.ndarray      # shape (3,)
    velocity: np.ndarray      # shape (3,)
    joint_angles: np.ndarray  # shape (N,)
    is_moving: bool
    frame_id: str = "base_link"

Shared module

Put message definitions in a module both the publisher and subscriber import. The fingerprint is module.qualname + field names/types — declaring the same class in two different modules yields two different fingerprints.

Publish

publisher.py
import numpy as np
import cortex
from cortex import Node
from messages import RobotState


class StateBroadcaster(Node):
    def __init__(self):
        super().__init__("robot")
        self.pub = self.create_publisher("/robot/state", RobotState)
        self.create_timer(1 / 100, self.tick)  # 100 Hz
        self._t0 = 0.0

    async def tick(self):
        self._t0 += 0.01
        self.pub.publish(RobotState(
            timestamp=self._t0,
            position=np.array([self._t0, 0.0, 0.5], dtype="f4"),
            velocity=np.array([1.0, 0.0, 0.0], dtype="f4"),
            joint_angles=np.zeros(7, dtype="f4"),
            is_moving=True,
        ))


if __name__ == "__main__":
    cortex.run(StateBroadcaster().run())

Subscribe

subscriber.py
import cortex
from cortex import Node
from cortex.messages.base import MessageHeader
from messages import RobotState   # same import, same fingerprint


async def on_state(msg: RobotState, header: MessageHeader):
    if header.sequence % 100 == 0:
        print(f"t={msg.timestamp:.3f} pos={msg.position}")


class Monitor(Node):
    def __init__(self):
        super().__init__("monitor")
        self.create_subscriber("/robot/state", RobotState, callback=on_state)


if __name__ == "__main__":
    cortex.run(Monitor().run())

Dataclass → wire

flowchart LR
    DC[dataclass fields] --> FP[fingerprint]
    DC --> ORD[declaration order]
    ORD --> Enc[serialize_message_frames<br/>values in order]
    Enc --> Meta[metadata frame]
    Enc --> Bufs[array frames]
    FP --> Hdr[24-byte header]
    Hdr --> Wire[(multipart send)]
    Meta --> Wire
    Bufs --> Wire

See Message wire format for the full encoding.

Supported field types

Field type Notes
int / float / bool / str msgpack primitives
bytes msgpack bin
list[...] / tuple[...] walked recursively
dict[str, Any] walked recursively; arrays inside still go OOB
np.ndarray OOB frame; zero-copy decode
torch.Tensor OOB frame; CPU-transported, device restored on decode
Nested Message not supported — flatten instead

What changes the fingerprint

Any of these makes old and new publishers/subscribers incompatible:

  • Renaming the class, its module, or any field
  • Adding a field (even with a default)
  • Removing a field
  • Changing a field's annotation text

Safe to change:

  • Reordering methods, adding methods
  • Editing docstrings or defaults
  • Changing unrelated classes in the same module

See also