Skip to content

cortex.discovery.protocol

protocol

Discovery protocol definitions for Cortex.

Defines the request/response messages for the discovery service.

Classes

DiscoveryCommand

Bases: IntEnum

Commands for the discovery service.

Source code in src/cortex/discovery/protocol.py
class DiscoveryCommand(IntEnum):
    """Commands for the discovery service."""

    REGISTER_TOPIC = 1
    UNREGISTER_TOPIC = 2
    LOOKUP_TOPIC = 3
    LIST_TOPICS = 4
    PING = 5
    SHUTDOWN = 99

DiscoveryStatus

Bases: IntEnum

Status codes for discovery responses.

Source code in src/cortex/discovery/protocol.py
class DiscoveryStatus(IntEnum):
    """Status codes for discovery responses."""

    OK = 0
    NOT_FOUND = 1
    ALREADY_EXISTS = 2
    ERROR = 3

TopicInfo dataclass

Information about a registered topic.

Source code in src/cortex/discovery/protocol.py
@dataclass
class TopicInfo:
    """Information about a registered topic."""

    name: str  # Topic name (e.g., "/camera/image")
    address: str  # ZMQ IPC address (e.g., "ipc:///tmp/cortex/topics/camera_image")
    message_type: str  # Message type name
    fingerprint: int  # 64-bit message fingerprint
    publisher_node: str  # Name of the publishing node

    def to_bytes(self) -> bytes:
        """Serialize topic info to bytes."""
        data = {
            "name": self.name,
            "address": self.address,
            "message_type": self.message_type,
            "fingerprint": self.fingerprint,
            "publisher_node": self.publisher_node,
        }
        return msgpack.packb(data, use_bin_type=True)

    @classmethod
    def from_bytes(cls, data: bytes) -> "TopicInfo":
        """Deserialize topic info from bytes."""
        d = msgpack.unpackb(data, raw=False)
        return cls(**d)
Functions
to_bytes
to_bytes() -> bytes

Serialize topic info to bytes.

Source code in src/cortex/discovery/protocol.py
def to_bytes(self) -> bytes:
    """Serialize topic info to bytes."""
    data = {
        "name": self.name,
        "address": self.address,
        "message_type": self.message_type,
        "fingerprint": self.fingerprint,
        "publisher_node": self.publisher_node,
    }
    return msgpack.packb(data, use_bin_type=True)
from_bytes classmethod
from_bytes(data: bytes) -> TopicInfo

Deserialize topic info from bytes.

Source code in src/cortex/discovery/protocol.py
@classmethod
def from_bytes(cls, data: bytes) -> "TopicInfo":
    """Deserialize topic info from bytes."""
    d = msgpack.unpackb(data, raw=False)
    return cls(**d)

DiscoveryRequest dataclass

Request message for the discovery service.

Source code in src/cortex/discovery/protocol.py
@dataclass
class DiscoveryRequest:
    """Request message for the discovery service."""

    command: DiscoveryCommand
    topic_info: TopicInfo | None = None  # For REGISTER/UNREGISTER
    topic_name: str | None = None  # For LOOKUP

    def to_bytes(self) -> bytes:
        """Serialize request to bytes."""
        data = {
            "command": int(self.command),
            "topic_name": self.topic_name,
        }
        if self.topic_info:
            data["topic_info"] = self.topic_info.to_bytes()
        return msgpack.packb(data, use_bin_type=True)

    @classmethod
    def from_bytes(cls, data: bytes) -> "DiscoveryRequest":
        """Deserialize request from bytes."""
        d = msgpack.unpackb(data, raw=False)
        topic_info = None
        if "topic_info" in d and d["topic_info"]:
            topic_info = TopicInfo.from_bytes(d["topic_info"])
        return cls(
            command=DiscoveryCommand(d["command"]),
            topic_info=topic_info,
            topic_name=d.get("topic_name"),
        )
Functions
to_bytes
to_bytes() -> bytes

Serialize request to bytes.

Source code in src/cortex/discovery/protocol.py
def to_bytes(self) -> bytes:
    """Serialize request to bytes."""
    data = {
        "command": int(self.command),
        "topic_name": self.topic_name,
    }
    if self.topic_info:
        data["topic_info"] = self.topic_info.to_bytes()
    return msgpack.packb(data, use_bin_type=True)
from_bytes classmethod
from_bytes(data: bytes) -> DiscoveryRequest

Deserialize request from bytes.

Source code in src/cortex/discovery/protocol.py
@classmethod
def from_bytes(cls, data: bytes) -> "DiscoveryRequest":
    """Deserialize request from bytes."""
    d = msgpack.unpackb(data, raw=False)
    topic_info = None
    if "topic_info" in d and d["topic_info"]:
        topic_info = TopicInfo.from_bytes(d["topic_info"])
    return cls(
        command=DiscoveryCommand(d["command"]),
        topic_info=topic_info,
        topic_name=d.get("topic_name"),
    )

DiscoveryResponse dataclass

Response message from the discovery service.

Source code in src/cortex/discovery/protocol.py
@dataclass
class DiscoveryResponse:
    """Response message from the discovery service."""

    status: DiscoveryStatus
    message: str = ""
    topic_info: TopicInfo | None = None  # For LOOKUP
    topics: list[TopicInfo] | None = None  # For LIST_TOPICS

    def to_bytes(self) -> bytes:
        """Serialize response to bytes."""
        data = {
            "status": int(self.status),
            "message": self.message,
        }
        if self.topic_info:
            data["topic_info"] = self.topic_info.to_bytes()
        if self.topics:
            data["topics"] = [t.to_bytes() for t in self.topics]
        return msgpack.packb(data, use_bin_type=True)

    @classmethod
    def from_bytes(cls, data: bytes) -> "DiscoveryResponse":
        """Deserialize response from bytes."""
        d = msgpack.unpackb(data, raw=False)
        topic_info = None
        topics = None
        if "topic_info" in d and d["topic_info"]:
            topic_info = TopicInfo.from_bytes(d["topic_info"])
        if "topics" in d and d["topics"]:
            topics = [TopicInfo.from_bytes(t) for t in d["topics"]]
        return cls(
            status=DiscoveryStatus(d["status"]),
            message=d.get("message", ""),
            topic_info=topic_info,
            topics=topics,
        )
Functions
to_bytes
to_bytes() -> bytes

Serialize response to bytes.

Source code in src/cortex/discovery/protocol.py
def to_bytes(self) -> bytes:
    """Serialize response to bytes."""
    data = {
        "status": int(self.status),
        "message": self.message,
    }
    if self.topic_info:
        data["topic_info"] = self.topic_info.to_bytes()
    if self.topics:
        data["topics"] = [t.to_bytes() for t in self.topics]
    return msgpack.packb(data, use_bin_type=True)
from_bytes classmethod
from_bytes(data: bytes) -> DiscoveryResponse

Deserialize response from bytes.

Source code in src/cortex/discovery/protocol.py
@classmethod
def from_bytes(cls, data: bytes) -> "DiscoveryResponse":
    """Deserialize response from bytes."""
    d = msgpack.unpackb(data, raw=False)
    topic_info = None
    topics = None
    if "topic_info" in d and d["topic_info"]:
        topic_info = TopicInfo.from_bytes(d["topic_info"])
    if "topics" in d and d["topics"]:
        topics = [TopicInfo.from_bytes(t) for t in d["topics"]]
    return cls(
        status=DiscoveryStatus(d["status"]),
        message=d.get("message", ""),
        topic_info=topic_info,
        topics=topics,
    )