Skip to content

cortex.discovery

discovery

Discovery module for Cortex framework.

Classes

DiscoveryClient

Synchronous REQ client for the Cortex discovery daemon.

Built around a single zmq.REQ socket. Because REQ sockets get stuck in a bad state after a missed reply (they block further sends), this client transparently closes and recreates the socket on every timeout via its private `_reconnect` method.

Used internally by `cortex.core.publisher.Publisher` at registration time and by `cortex.core.subscriber.Subscriber` for topic lookup. User code rarely needs to instantiate this class directly.

Example
client = DiscoveryClient()
info = client.wait_for_topic("/camera/image", timeout=5.0)
print(info.address if info else "not found")
Source code in src/cortex/discovery/client.py
class DiscoveryClient:
    """Synchronous REQ client for the Cortex discovery daemon.

    Built around a single ``zmq.REQ`` socket. Because REQ sockets get stuck
    in a bad state after a missed reply (they block further sends), this
    client transparently closes and recreates the socket on every timeout
    via :meth:`_reconnect`.

    Used internally by :class:`cortex.core.publisher.Publisher` at registration
    time and by :class:`cortex.core.subscriber.Subscriber` for topic lookup.
    User code rarely needs to instantiate this class directly.

    Example:
        ```python
        client = DiscoveryClient()
        info = client.wait_for_topic("/camera/image", timeout=5.0)
        print(info.address if info else "not found")
        ```
    """

    def __init__(
        self,
        discovery_address: str = DEFAULT_DISCOVERY_ADDRESS,
        timeout_ms: int = 5000,
        retries: int = 1,
    ):
        """
        Initialize the discovery client.

        Args:
            discovery_address: Address of the discovery daemon
            timeout_ms: Send/receive timeout per attempt, in milliseconds
            retries: Total number of attempts made per request (``1`` means
                a single attempt and no retry). Values below 1 are treated
                as 1 when a request is sent.
        """
        self.discovery_address = discovery_address
        self.timeout_ms = timeout_ms
        self.retries = retries

        self._context: zmq.Context = zmq.Context()
        self._socket: zmq.Socket | None = None

        # Connect immediately
        self._connect()

    def _connect(self) -> None:
        """Create and connect the socket."""
        self._socket = self._context.socket(zmq.REQ)
        self._socket.setsockopt(zmq.RCVTIMEO, self.timeout_ms)
        self._socket.setsockopt(zmq.SNDTIMEO, self.timeout_ms)
        self._socket.setsockopt(zmq.LINGER, 0)  # Immediate shutdown
        self._socket.connect(self.discovery_address)

    def _reconnect(self) -> None:
        """Reconnect by closing and recreating the socket.

        This is needed because REQ sockets get stuck in a bad state
        after a timeout (waiting for reply that will never come).
        """
        if self._socket:
            with contextlib.suppress(Exception):
                self._socket.close()
        self._connect()

    def _send_request(self, request: DiscoveryRequest) -> DiscoveryResponse:
        """Send a request and wait for the response, retrying on failure.

        Args:
            request: The request to serialize and send.

        Returns:
            The parsed response from the daemon.

        Raises:
            RuntimeError: If the client has already been closed.
            TimeoutError: If the final attempt timed out.
            zmq.ZMQError: If the final attempt failed with a ZMQ error.
        """
        if self._socket is None:
            # close() drops the socket; fail with a clear message instead of
            # an AttributeError on None.
            raise RuntimeError("DiscoveryClient is closed")

        # Always make at least one attempt so that a non-positive ``retries``
        # cannot fall through to ``raise None``.
        attempts = max(1, self.retries)
        last_error: Exception | None = None

        for attempt in range(1, attempts + 1):
            try:
                self._socket.send(request.to_bytes())
                response_bytes = self._socket.recv()
                return DiscoveryResponse.from_bytes(response_bytes)
            except zmq.Again:
                # Timeout - need to reconnect because REQ socket is now stuck
                last_error = TimeoutError(
                    f"Discovery request timed out after {self.timeout_ms}ms"
                )
                logger.warning(f"Request timeout, attempt {attempt}/{attempts}")
                self._reconnect()
            except zmq.ZMQError as e:
                # ZMQ error - reconnect, then retry; the last error is raised
                # once all attempts are exhausted.
                last_error = e
                logger.warning(f"ZMQ error: {e}, attempt {attempt}/{attempts}")
                self._reconnect()

        raise last_error

    def register_topic(self, topic_info: TopicInfo) -> bool:
        """
        Register a topic with the discovery daemon.

        Args:
            topic_info: Information about the topic to register

        Returns:
            True if registration was successful, False on a non-OK status
            or on any error while sending the request
        """
        request = DiscoveryRequest(
            command=DiscoveryCommand.REGISTER_TOPIC, topic_info=topic_info
        )

        try:
            response = self._send_request(request)
            if response.status == DiscoveryStatus.OK:
                logger.info(f"Registered topic: {topic_info.name}")
                return True
            else:
                logger.error(f"Failed to register topic: {response.message}")
                return False
        except Exception as e:
            logger.error(f"Failed to register topic: {e}")
            return False

    def unregister_topic(self, topic_name: str) -> bool:
        """
        Unregister a topic from the discovery daemon.

        Args:
            topic_name: Name of the topic to unregister

        Returns:
            True if unregistration was successful, False on a non-OK status
            or on any error while sending the request
        """
        request = DiscoveryRequest(
            command=DiscoveryCommand.UNREGISTER_TOPIC, topic_name=topic_name
        )

        try:
            response = self._send_request(request)
            if response.status == DiscoveryStatus.OK:
                logger.info(f"Unregistered topic: {topic_name}")
                return True
            else:
                logger.warning(f"Failed to unregister topic: {response.message}")
                return False
        except Exception as e:
            logger.error(f"Failed to unregister topic: {e}")
            return False

    def lookup_topic(self, topic_name: str) -> TopicInfo | None:
        """
        Look up a topic by name.

        Args:
            topic_name: Name of the topic to look up

        Returns:
            TopicInfo if found, None otherwise (including on timeout or
            any other request error, which are logged)
        """
        request = DiscoveryRequest(
            command=DiscoveryCommand.LOOKUP_TOPIC, topic_name=topic_name
        )

        try:
            response = self._send_request(request)
            if response.status == DiscoveryStatus.OK:
                return response.topic_info
            else:
                return None
        except TimeoutError:
            logger.error(
                f"Lookup timeout for topic: {topic_name}. Probably Discovery Daemon is not running."
            )
            return None
        except Exception as e:
            logger.error(f"Failed to lookup topic: {e}")
            return None

    def wait_for_topic(
        self,
        topic_name: str,
        timeout: float = 30.0,
        poll_interval: float = 0.5,
    ) -> TopicInfo | None:
        """
        Wait for a topic to become available (blocking).

        Args:
            topic_name: Name of the topic to wait for
            timeout: Maximum time to wait in seconds
            poll_interval: Time between lookup attempts in seconds

        Returns:
            TopicInfo if found within timeout, None otherwise
        """
        # Monotonic clock: a wall-clock adjustment must not stretch or cut
        # the wait short.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            topic_info = self.lookup_topic(topic_name)
            if topic_info:
                return topic_info
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))

        return None

    async def wait_for_topic_async(
        self,
        topic_name: str,
        timeout: float = 600.0,
        poll_interval: float = 0.5,
    ) -> TopicInfo | None:
        """
        Wait for a topic to become available (async).

        Sleeps with ``asyncio.sleep`` between polls so other tasks can run.
        Each lookup is still a synchronous call to :meth:`lookup_topic` and
        blocks the event loop until the daemon replies or the request
        times out.

        Args:
            topic_name: Name of the topic to wait for
            timeout: Maximum time to wait in seconds
            poll_interval: Time between lookup attempts in seconds

        Returns:
            TopicInfo if found within timeout, None otherwise
        """
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            topic_info = self.lookup_topic(topic_name)
            if topic_info:
                return topic_info
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            await asyncio.sleep(min(poll_interval, remaining))

        return None

    def ping(self) -> bool:
        """Check whether the discovery daemon is reachable.

        Returns:
            True if the daemon responded with OK within the configured
            timeout/retries, False otherwise.
        """
        request = DiscoveryRequest(command=DiscoveryCommand.PING)
        try:
            response = self._send_request(request)
        except Exception as e:
            logger.debug(f"Ping failed: {e}")
            return False
        return response.status == DiscoveryStatus.OK

    def list_topics(self) -> list[TopicInfo]:
        """
        List all registered topics.

        Returns:
            List of TopicInfo for all registered topics; an empty list on a
            non-OK status or on any request error
        """
        request = DiscoveryRequest(command=DiscoveryCommand.LIST_TOPICS)

        try:
            response = self._send_request(request)
            if response.status == DiscoveryStatus.OK:
                return response.topics or []
            else:
                logger.warning(f"Failed to list topics: {response.message}")
                return []
        except Exception as e:
            logger.error(f"Failed to list topics: {e}")
            return []

    def close(self) -> None:
        """Close the socket and terminate the ZMQ context.

        Errors from either step are suppressed, so calling this more than
        once is harmless.
        """
        if self._socket:
            with contextlib.suppress(Exception):
                self._socket.close()
            self._socket = None

        with contextlib.suppress(Exception):
            self._context.term()

    def __enter__(self) -> "DiscoveryClient":
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the client when the ``with`` block exits."""
        self.close()
Functions
register_topic
register_topic(topic_info: TopicInfo) -> bool

Register a topic with the discovery daemon.

Parameters:

Name Type Description Default
topic_info TopicInfo

Information about the topic to register

required

Returns:

Type Description
bool

True if registration was successful

Source code in src/cortex/discovery/client.py
def register_topic(self, topic_info: TopicInfo) -> bool:
    """
    Ask the discovery daemon to register a topic.

    Args:
        topic_info: Description of the topic being registered

    Returns:
        True when the daemon answers OK; False on any other status or
        when the request itself fails
    """
    request = DiscoveryRequest(
        command=DiscoveryCommand.REGISTER_TOPIC, topic_info=topic_info
    )

    try:
        reply = self._send_request(request)
        # Guard clause: bail out on anything other than OK.
        if reply.status != DiscoveryStatus.OK:
            logger.error(f"Failed to register topic: {reply.message}")
            return False
        logger.info(f"Registered topic: {topic_info.name}")
        return True
    except Exception as exc:
        logger.error(f"Failed to register topic: {exc}")
        return False
unregister_topic
unregister_topic(topic_name: str) -> bool

Unregister a topic from the discovery daemon.

Parameters:

Name Type Description Default
topic_name str

Name of the topic to unregister

required

Returns:

Type Description
bool

True if unregistration was successful

Source code in src/cortex/discovery/client.py
def unregister_topic(self, topic_name: str) -> bool:
    """
    Ask the discovery daemon to remove a topic.

    Args:
        topic_name: Name of the topic to remove

    Returns:
        True when the daemon answers OK; False on any other status or
        when the request itself fails
    """
    request = DiscoveryRequest(
        command=DiscoveryCommand.UNREGISTER_TOPIC, topic_name=topic_name
    )

    try:
        reply = self._send_request(request)
        # Guard clause: bail out on anything other than OK.
        if reply.status != DiscoveryStatus.OK:
            logger.warning(f"Failed to unregister topic: {reply.message}")
            return False
        logger.info(f"Unregistered topic: {topic_name}")
        return True
    except Exception as exc:
        logger.error(f"Failed to unregister topic: {exc}")
        return False
lookup_topic
lookup_topic(topic_name: str) -> TopicInfo | None

Look up a topic by name.

Parameters:

Name Type Description Default
topic_name str

Name of the topic to look up

required

Returns:

Type Description
TopicInfo | None

TopicInfo if found, None otherwise

Source code in src/cortex/discovery/client.py
def lookup_topic(self, topic_name: str) -> TopicInfo | None:
    """
    Query the discovery daemon for a single topic.

    Args:
        topic_name: Name of the topic to query

    Returns:
        The topic's TopicInfo when the daemon answers OK, otherwise None
        (timeouts and other request errors are logged and also yield None)
    """
    request = DiscoveryRequest(
        command=DiscoveryCommand.LOOKUP_TOPIC, topic_name=topic_name
    )

    try:
        reply = self._send_request(request)
        return reply.topic_info if reply.status == DiscoveryStatus.OK else None
    except TimeoutError:
        logger.error(
            f"Lookup timeout for topic: {topic_name}. Probably Discovery Daemon is not running."
        )
        return None
    except Exception as exc:
        logger.error(f"Failed to lookup topic: {exc}")
        return None
wait_for_topic
wait_for_topic(
    topic_name: str,
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> TopicInfo | None

Wait for a topic to become available (blocking).

Parameters:

Name Type Description Default
topic_name str

Name of the topic to wait for

required
timeout float

Maximum time to wait in seconds

30.0
poll_interval float

Time between lookup attempts in seconds

0.5

Returns:

Type Description
TopicInfo | None

TopicInfo if found within timeout, None otherwise

Source code in src/cortex/discovery/client.py
def wait_for_topic(
    self,
    topic_name: str,
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> TopicInfo | None:
    """
    Wait for a topic to become available (blocking).

    Args:
        topic_name: Name of the topic to wait for
        timeout: Maximum time to wait in seconds
        poll_interval: Time between lookup attempts in seconds

    Returns:
        TopicInfo if found within timeout, None otherwise
    """
    # Monotonic clock: a wall-clock adjustment must not stretch or cut the
    # wait short.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        topic_info = self.lookup_topic(topic_name)
        if topic_info:
            return topic_info
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))

    return None
wait_for_topic_async async
wait_for_topic_async(
    topic_name: str,
    timeout: float = 600.0,
    poll_interval: float = 0.5,
) -> TopicInfo | None

Wait for a topic to become available (async, non-blocking).

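Uses asyncio.sleep between polls so other tasks can run. Each individual lookup is still a synchronous request and blocks the event loop until the daemon replies or the request times out.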

Parameters:

Name Type Description Default
topic_name str

Name of the topic to wait for

required
timeout float

Maximum time to wait in seconds

600.0
poll_interval float

Time between lookup attempts in seconds

0.5

Returns:

Type Description
TopicInfo | None

TopicInfo if found within timeout, None otherwise

Source code in src/cortex/discovery/client.py
async def wait_for_topic_async(
    self,
    topic_name: str,
    timeout: float = 600.0,
    poll_interval: float = 0.5,
) -> TopicInfo | None:
    """
    Wait for a topic to become available (async).

    Sleeps with ``asyncio.sleep`` between polls so other tasks can run.
    Each lookup is still a synchronous call to ``lookup_topic`` and blocks
    the event loop until the daemon replies or the request times out.

    Args:
        topic_name: Name of the topic to wait for
        timeout: Maximum time to wait in seconds
        poll_interval: Time between lookup attempts in seconds

    Returns:
        TopicInfo if found within timeout, None otherwise
    """
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        topic_info = self.lookup_topic(topic_name)
        if topic_info:
            return topic_info
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_interval, remaining))

    return None
ping
ping() -> bool

Check whether the discovery daemon is reachable.

Returns:

Type Description
bool

True if the daemon responded with OK within the configured timeout/retries, False otherwise.

Source code in src/cortex/discovery/client.py
def ping(self) -> bool:
    """Probe the discovery daemon for liveness.

    Returns:
        True when the daemon answers the PING request with an OK status;
        False when the request fails (the failure is logged at debug
        level) or the status is anything else.
    """
    probe = DiscoveryRequest(command=DiscoveryCommand.PING)
    try:
        reply = self._send_request(probe)
    except Exception as exc:
        logger.debug(f"Ping failed: {exc}")
        return False
    else:
        return reply.status == DiscoveryStatus.OK
list_topics
list_topics() -> list[TopicInfo]

List all registered topics.

Returns:

Type Description
list[TopicInfo]

List of TopicInfo for all registered topics

Source code in src/cortex/discovery/client.py
def list_topics(self) -> list[TopicInfo]:
    """
    Fetch every topic currently known to the discovery daemon.

    Returns:
        The registered topics, or an empty list when the daemon answers
        with a non-OK status or the request fails
    """
    request = DiscoveryRequest(command=DiscoveryCommand.LIST_TOPICS)

    try:
        reply = self._send_request(request)
        # Guard clause: bail out on anything other than OK.
        if reply.status != DiscoveryStatus.OK:
            logger.warning(f"Failed to list topics: {reply.message}")
            return []
        return reply.topics or []
    except Exception as exc:
        logger.error(f"Failed to list topics: {exc}")
        return []
close
close() -> None

Close the client connection.

Source code in src/cortex/discovery/client.py
def close(self) -> None:
    """Release the socket (if any) and terminate the ZMQ context.

    Errors raised by either step are suppressed.
    """
    sock = self._socket
    if sock:
        with contextlib.suppress(Exception):
            sock.close()
        # Drop the reference even if close() raised.
        self._socket = None

    with contextlib.suppress(Exception):
        self._context.term()

DiscoveryDaemon

Long-lived REP service that maps topic names to ZMQ endpoints.

Publishers register their topic on startup; subscribers look up the endpoint and then connect directly. The daemon is not on the data path — it sees control traffic only.

Single-threaded by design. Requests are handled one at a time with a 1-second RCVTIMEO so the loop can observe _running for clean shutdown.

Source code in src/cortex/discovery/daemon.py
class DiscoveryDaemon:
    """Long-lived REP service that maps topic names to ZMQ endpoints.

    Publishers register their topic on startup; subscribers look up the
    endpoint and then connect directly. The daemon is **not** on the data
    path — it sees control traffic only.

    Single-threaded by design. Requests are handled one at a time with a
    1-second ``RCVTIMEO`` so the loop can observe ``_running`` for clean
    shutdown.
    """

    def __init__(
        self,
        address: str = DEFAULT_DISCOVERY_ADDRESS,
    ):
        """
        Initialize the discovery daemon.

        Args:
            address: ZMQ address to bind to (default: ipc:///tmp/cortex/discovery.sock)
        """
        self.address = address

        # Topic registry: topic_name -> TopicInfo
        self._topics: dict[str, TopicInfo] = {}

        # ZMQ context and socket (created in start())
        self._context: zmq.Context | None = None
        self._socket: zmq.Socket | None = None

        # Control flag polled by the event loop
        self._running = False

    def _ensure_ipc_path(self) -> None:
        """Prepare the filesystem for an ``ipc://`` address.

        Creates the socket's parent directory if needed and removes any
        existing file at the socket path. Does nothing for other transports.
        """
        if not self.address.startswith("ipc://"):
            return

        path = self.address[len("ipc://"):]
        dir_path = os.path.dirname(path)
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        # Remove a stale socket file; a missing file is not an error, and
        # removing directly avoids the exists-then-remove race.
        with contextlib.suppress(FileNotFoundError):
            os.remove(path)

    def start(self) -> None:
        """Start the discovery daemon and serve until it is stopped.

        Blocks in the event loop until ``stop()`` is called, a SHUTDOWN
        command is handled, or a KeyboardInterrupt arrives. The socket and
        context are always released on exit, including when socket setup
        or ``bind`` fails.
        """
        logger.info(f"Starting discovery daemon at {self.address}")

        self._ensure_ipc_path()

        try:
            self._context = zmq.Context()
            self._socket = self._context.socket(zmq.REP)

            #! We do not set a high water mark on the socket.
            #! It is 1000 by default, which is reasonable for our use case.

            # Set socket options for responsiveness
            self._socket.setsockopt(zmq.RCVTIMEO, 1000)  # 1 second timeout
            self._socket.setsockopt(zmq.LINGER, 0)  # Immediate shutdown

            self._socket.bind(self.address)

            self._running = True

            logger.info("=" * 50)
            logger.info("DISCOVERY DAEMON STARTED")
            logger.info("  Address: %s", self.address)
            logger.info("=" * 50)

            self._run_loop()
        except KeyboardInterrupt:
            logger.info("Received interrupt signal")
        finally:
            self._running = False
            self._cleanup()

    def _run_loop(self) -> None:
        """Main event loop: receive one request, send one reply, repeat."""
        while self._running:
            # A REP socket must alternate recv/send, so an error reply is
            # only valid once a request has actually been received.
            reply_owed = False
            try:
                # Blocks up to RCVTIMEO. Plain recv() returns bytes, which is
                # what _handle_request is annotated to take.
                request_bytes = self._socket.recv()
                reply_owed = True

                response = self._handle_request(request_bytes)
                self._socket.send(response.to_bytes())
            except zmq.Again:
                # Timeout, check _running and continue
                continue
            except Exception as e:
                if not self._running:
                    # We're shutting down, exit cleanly
                    break
                logger.error(f"Error in discovery loop: {e}")
                if not reply_owed:
                    continue
                # Best effort: tell the waiting client something went wrong.
                try:
                    error_response = DiscoveryResponse(
                        status=DiscoveryStatus.ERROR, message=str(e)
                    )
                    self._socket.send(error_response.to_bytes())
                except Exception as send_err:
                    logger.debug(f"Failed to send error response: {send_err}")

    def _handle_request(self, request_bytes: bytes) -> DiscoveryResponse:
        """Parse a raw request and dispatch it to the matching handler.

        Args:
            request_bytes: Serialized ``DiscoveryRequest``.

        Returns:
            The handler's response, or an ERROR response when the request
            cannot be parsed or the command is unknown.
        """
        try:
            request = DiscoveryRequest.from_bytes(request_bytes)
        except Exception as e:
            return DiscoveryResponse(
                status=DiscoveryStatus.ERROR, message=f"Failed to parse request: {e}"
            )

        if request.command == DiscoveryCommand.REGISTER_TOPIC:
            return self._handle_register(request)
        elif request.command == DiscoveryCommand.UNREGISTER_TOPIC:
            return self._handle_unregister(request)
        elif request.command == DiscoveryCommand.LOOKUP_TOPIC:
            return self._handle_lookup(request)
        elif request.command == DiscoveryCommand.LIST_TOPICS:
            return self._handle_list()
        elif request.command == DiscoveryCommand.PING:
            return self._handle_ping()
        elif request.command == DiscoveryCommand.SHUTDOWN:
            return self._handle_shutdown()
        else:
            return DiscoveryResponse(
                status=DiscoveryStatus.ERROR,
                message=f"Unknown command: {request.command}",
            )

    def _handle_register(self, request: DiscoveryRequest) -> DiscoveryResponse:
        """Handle topic registration.

        Returns ERROR when ``topic_info`` is missing, ALREADY_EXISTS when a
        different publisher node already owns the topic name, and OK
        otherwise (re-registration by the same publisher overwrites the
        existing entry).
        """
        if not request.topic_info:
            return DiscoveryResponse(
                status=DiscoveryStatus.ERROR,
                message="Missing topic_info in register request",
            )

        topic_name = request.topic_info.name

        if topic_name in self._topics:
            # Allow re-registration from same publisher
            existing = self._topics[topic_name]
            if existing.publisher_node != request.topic_info.publisher_node:
                return DiscoveryResponse(
                    status=DiscoveryStatus.ALREADY_EXISTS,
                    message=f"Topic {topic_name} already registered by {existing.publisher_node}",
                )

        self._topics[topic_name] = request.topic_info

        logger.info("-" * 50)
        logger.info("REGISTER topic: %s", topic_name)
        logger.info("  Address:     %s", request.topic_info.address)
        logger.info("  Publisher:   %s", request.topic_info.publisher_node)
        logger.info("  Type:        %s", request.topic_info.message_type)
        logger.info("  Fingerprint: %d", request.topic_info.fingerprint)

        return DiscoveryResponse(
            status=DiscoveryStatus.OK, message=f"Registered topic: {topic_name}"
        )

    def _handle_unregister(self, request: DiscoveryRequest) -> DiscoveryResponse:
        """Handle topic unregistration.

        The topic name is taken from ``topic_name`` or, failing that, from
        ``topic_info.name``. Returns ERROR when neither is present,
        NOT_FOUND when the topic is not registered, and OK after removal.
        """
        topic_name = request.topic_name or (
            request.topic_info.name if request.topic_info else None
        )

        if not topic_name:
            return DiscoveryResponse(
                status=DiscoveryStatus.ERROR,
                message="Missing topic name in unregister request",
            )

        if topic_name not in self._topics:
            return DiscoveryResponse(
                status=DiscoveryStatus.NOT_FOUND,
                message=f"Topic {topic_name} not found",
            )

        del self._topics[topic_name]

        logger.info("-" * 50)
        logger.info("UNREGISTER topic: %s", topic_name)

        return DiscoveryResponse(
            status=DiscoveryStatus.OK, message=f"Unregistered topic: {topic_name}"
        )

    def _handle_lookup(self, request: DiscoveryRequest) -> DiscoveryResponse:
        """Handle topic lookup.

        Returns ERROR when ``topic_name`` is missing, OK with the stored
        ``TopicInfo`` when the topic is registered, and NOT_FOUND otherwise.
        """
        topic_name = request.topic_name

        if not topic_name:
            return DiscoveryResponse(
                status=DiscoveryStatus.ERROR,
                message="Missing topic_name in lookup request",
            )

        topic_info = self._topics.get(topic_name)

        logger.info("-" * 50)
        if topic_info:
            logger.info("LOOKUP topic: %s -> FOUND", topic_name)
            return DiscoveryResponse(status=DiscoveryStatus.OK, topic_info=topic_info)
        else:
            logger.info("LOOKUP topic: %s -> NOT FOUND", topic_name)
            return DiscoveryResponse(
                status=DiscoveryStatus.NOT_FOUND,
                message=f"Topic {topic_name} not found",
            )

    def _handle_list(self) -> DiscoveryResponse:
        """Handle list all topics: return every registered TopicInfo."""
        topics = list(self._topics.values())

        logger.info("-" * 50)
        logger.info("LIST topics: %d registered", len(topics))

        return DiscoveryResponse(status=DiscoveryStatus.OK, topics=topics)

    def _handle_ping(self) -> DiscoveryResponse:
        """Handle ping request — used by clients to verify daemon liveness."""
        return DiscoveryResponse(status=DiscoveryStatus.OK, message="pong")

    def _handle_shutdown(self) -> DiscoveryResponse:
        """Handle shutdown request.

        Clears the running flag so the event loop exits after this reply
        has been sent.
        """
        self._running = False
        logger.info("-" * 50)
        logger.info("SHUTDOWN command received")
        return DiscoveryResponse(status=DiscoveryStatus.OK, message="Shutting down")

    def _cleanup(self) -> None:
        """Release the socket and context and remove the IPC socket file.

        Every step is best-effort: failures are logged at debug level or
        suppressed so that the remaining steps still run.
        """
        if self._socket:
            try:
                self._socket.close()
            except Exception as e:
                logger.debug(f"Error closing socket: {e}")
            self._socket = None

        if self._context:
            try:
                self._context.term()
            except zmq.ZMQError as e:
                logger.debug(f"Error terminating context: {e}")
            self._context = None

        # Clean up IPC socket file
        if self.address.startswith("ipc://"):
            path = self.address[len("ipc://"):]
            if os.path.exists(path):
                with contextlib.suppress(Exception):
                    os.remove(path)

        logger.info("=" * 50)
        logger.info("DISCOVERY DAEMON STOPPED")
        logger.info("=" * 50)

    def stop(self) -> None:
        """Stop the discovery daemon.

        Only clears the running flag; the event loop notices it after the
        current request or receive timeout and then exits.
        """
        logger.info("Stopping discovery daemon")
        self._running = False
Functions
start
start() -> None

Start the discovery daemon.

Source code in src/cortex/discovery/daemon.py
def start(self) -> None:
    """Start the discovery daemon and serve until it is stopped.

    Blocks in the event loop until the running flag is cleared or a
    KeyboardInterrupt arrives. The socket and context are always released
    on exit, including when socket setup or ``bind`` fails.
    """
    logger.info(f"Starting discovery daemon at {self.address}")

    self._ensure_ipc_path()

    try:
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.REP)

        #! We do not set a high water mark on the socket.
        #! It is 1000 by default, which is reasonable for our use case.

        # Set socket options for responsiveness
        self._socket.setsockopt(zmq.RCVTIMEO, 1000)  # 1 second timeout
        self._socket.setsockopt(zmq.LINGER, 0)  # Immediate shutdown

        self._socket.bind(self.address)

        self._running = True

        logger.info("=" * 50)
        logger.info("DISCOVERY DAEMON STARTED")
        logger.info("  Address: %s", self.address)
        logger.info("=" * 50)

        self._run_loop()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
    finally:
        self._running = False
        self._cleanup()
stop
stop() -> None

Stop the discovery daemon.

Source code in src/cortex/discovery/daemon.py
def stop(self) -> None:
    """Stop the discovery daemon.

    Logs the request and clears the ``_running`` flag; nothing else is
    torn down here.
    """
    logger.info("Stopping discovery daemon")
    self._running = False

DiscoveryCommand

Bases: IntEnum

Commands for the discovery service.

Source code in src/cortex/discovery/protocol.py
class DiscoveryCommand(IntEnum):
    """Commands for the discovery service.

    Each member selects one handler in the daemon's request dispatch.
    """

    REGISTER_TOPIC = 1  # Add (or re-register) a topic
    UNREGISTER_TOPIC = 2  # Remove a topic
    LOOKUP_TOPIC = 3  # Fetch the TopicInfo for one topic name
    LIST_TOPICS = 4  # Fetch every registered TopicInfo
    PING = 5  # Liveness check; the daemon answers OK with "pong"
    SHUTDOWN = 99  # Ask the daemon to leave its event loop

DiscoveryStatus

Bases: IntEnum

Status codes for discovery responses.

Source code in src/cortex/discovery/protocol.py
class DiscoveryStatus(IntEnum):
    """Status codes for discovery responses."""

    OK = 0  # Request handled successfully
    NOT_FOUND = 1  # Lookup/unregister named a topic that is not registered
    ALREADY_EXISTS = 2  # Register hit a topic owned by another publisher node
    ERROR = 3  # Malformed, incomplete, or unknown request, or a handler failure

TopicInfo dataclass

Information about a registered topic.

Source code in src/cortex/discovery/protocol.py
@dataclass
class TopicInfo:
    """Record describing one registered topic."""

    name: str  # Topic name (e.g., "/camera/image")
    address: str  # ZMQ IPC address (e.g., "ipc:///tmp/cortex/topics/camera_image")
    message_type: str  # Message type name
    fingerprint: int  # 64-bit message fingerprint
    publisher_node: str  # Name of the publishing node

    def to_bytes(self) -> bytes:
        """Pack all five fields into a msgpack map, in declaration order."""
        field_names = (
            "name",
            "address",
            "message_type",
            "fingerprint",
            "publisher_node",
        )
        payload = {key: getattr(self, key) for key in field_names}
        return msgpack.packb(payload, use_bin_type=True)

    @classmethod
    def from_bytes(cls, data: bytes) -> "TopicInfo":
        """Rebuild a TopicInfo from a msgpack map.

        The decoded map's keys are passed to the constructor as keyword
        arguments.
        """
        decoded = msgpack.unpackb(data, raw=False)
        return cls(**decoded)
Functions
to_bytes
to_bytes() -> bytes

Serialize topic info to bytes.

Source code in src/cortex/discovery/protocol.py
def to_bytes(self) -> bytes:
    """Pack all five topic fields into a msgpack map, in declaration order."""
    field_names = (
        "name",
        "address",
        "message_type",
        "fingerprint",
        "publisher_node",
    )
    payload = {key: getattr(self, key) for key in field_names}
    return msgpack.packb(payload, use_bin_type=True)
from_bytes classmethod
from_bytes(data: bytes) -> TopicInfo

Deserialize topic info from bytes.

Source code in src/cortex/discovery/protocol.py
@classmethod
def from_bytes(cls, data: bytes) -> "TopicInfo":
    """Rebuild a TopicInfo from a msgpack map.

    The decoded map's keys are passed to the constructor as keyword
    arguments.
    """
    decoded = msgpack.unpackb(data, raw=False)
    return cls(**decoded)