Skip to content

cortex.discovery.client

client

Discovery client for Cortex.

Provides a client interface to interact with the discovery daemon. Uses synchronous ZMQ since discovery is typically done once at startup.

Classes

DiscoveryClient

Synchronous REQ client for the Cortex discovery daemon.

Built around a single zmq.REQ socket. Because REQ sockets get stuck in a bad state after a missed reply (they block further sends), this client transparently closes and recreates the socket on every timeout via :meth:_reconnect.

Used internally by :class:cortex.core.publisher.Publisher at registration time and by :class:cortex.core.subscriber.Subscriber for topic lookup. User code rarely needs to instantiate this class directly.

Example
client = DiscoveryClient()
info = client.wait_for_topic("/camera/image", timeout=5.0)
print(info.address if info else "not found")
Source code in src/cortex/discovery/client.py
class DiscoveryClient:
    """Synchronous REQ client for the Cortex discovery daemon.

    Built around a single ``zmq.REQ`` socket. Because REQ sockets get stuck
    in a bad state after a missed reply (they block further sends), this
    client transparently closes and recreates the socket on every timeout
    via :meth:`_reconnect`.

    Used internally by :class:`cortex.core.publisher.Publisher` at registration
    time and by :class:`cortex.core.subscriber.Subscriber` for topic lookup.
    User code rarely needs to instantiate this class directly.

    The client is also a context manager; leaving the ``with`` block calls
    :meth:`close`.

    Example:
        ```python
        client = DiscoveryClient()
        info = client.wait_for_topic("/camera/image", timeout=5.0)
        print(info.address if info else "not found")
        ```
    """

    def __init__(
        self,
        discovery_address: str = DEFAULT_DISCOVERY_ADDRESS,
        timeout_ms: int = 5000,
        retries: int = 1,
    ):
        """
        Initialize the discovery client and connect to the daemon address.

        Args:
            discovery_address: Address of the discovery daemon
            timeout_ms: Send/receive timeout per attempt, in milliseconds
            retries: Total number of attempts made per request. Values
                below 1 are treated as 1 so a request is always tried once.

        Raises:
            Exception: Whatever the socket setup raises (for example a
                ``zmq.ZMQError`` for an invalid address). The ZMQ context is
                released before the error propagates.
        """
        self.discovery_address = discovery_address
        self.timeout_ms = timeout_ms
        self.retries = retries

        self._context: zmq.Context = zmq.Context()
        self._socket: zmq.Socket | None = None

        # Connect immediately. If that fails, release the context so a
        # failed construction does not leak ZMQ resources.
        try:
            self._connect()
        except Exception:
            self.close()
            raise

    def _connect(self) -> None:
        """Create, configure and connect a fresh REQ socket.

        The socket is only stored on the instance once it is fully set up;
        if configuration or ``connect`` fails it is closed and the error is
        re-raised.
        """
        sock = self._context.socket(zmq.REQ)
        try:
            sock.setsockopt(zmq.RCVTIMEO, self.timeout_ms)
            sock.setsockopt(zmq.SNDTIMEO, self.timeout_ms)
            sock.setsockopt(zmq.LINGER, 0)  # Immediate shutdown
            sock.connect(self.discovery_address)
        except Exception:
            with contextlib.suppress(Exception):
                sock.close()
            raise
        self._socket = sock

    def _reconnect(self) -> None:
        """Reconnect by closing and recreating the socket.

        This is needed because REQ sockets get stuck in a bad state
        after a timeout (waiting for reply that will never come).
        """
        if self._socket:
            with contextlib.suppress(Exception):
                self._socket.close()
        # Drop the stale reference first so a failing _connect() cannot
        # leave a closed socket behind on the instance.
        self._socket = None
        self._connect()

    def _send_request(self, request: DiscoveryRequest) -> DiscoveryResponse:
        """Send a request and wait for the response, retrying on ZMQ errors.

        Args:
            request: The request to serialize and send.

        Returns:
            The response decoded from the daemon's reply.

        Raises:
            TimeoutError: If the last attempt timed out.
            zmq.ZMQError: If the last attempt failed with another ZMQ error,
                or if the socket could not be (re)created.
        """
        # Always try at least once; range(0) used to fall through to
        # ``raise None``, which is itself a TypeError.
        attempts = max(1, self.retries)
        payload = request.to_bytes()
        last_error: Exception = RuntimeError("Discovery request was not attempted")

        for attempt in range(1, attempts + 1):
            if self._socket is None:
                # No usable socket (closed client or failed reconnect).
                self._connect()
            sock = self._socket
            try:
                sock.send(payload)
                return DiscoveryResponse.from_bytes(sock.recv())
            except zmq.Again:
                # Timeout - need to reconnect because REQ socket is now stuck
                last_error = TimeoutError(
                    f"Discovery request timed out after {self.timeout_ms}ms"
                )
                logger.warning(f"Request timeout, attempt {attempt}/{attempts}")
                self._reconnect()
            except zmq.ZMQError as e:
                # Other ZMQ error - remember it, reconnect, try again
                last_error = e
                logger.warning(f"ZMQ error: {e}, attempt {attempt}/{attempts}")
                self._reconnect()

        # Every attempt failed: surface the most recent error.
        raise last_error

    def register_topic(self, topic_info: TopicInfo) -> bool:
        """
        Register a topic with the discovery daemon.

        Args:
            topic_info: Information about the topic to register

        Returns:
            True if registration was successful, False if the daemon
            rejected it or the request failed (the failure is logged).
        """
        request = DiscoveryRequest(
            command=DiscoveryCommand.REGISTER_TOPIC, topic_info=topic_info
        )

        try:
            response = self._send_request(request)
            if response.status == DiscoveryStatus.OK:
                logger.info(f"Registered topic: {topic_info.name}")
                return True
            else:
                logger.error(f"Failed to register topic: {response.message}")
                return False
        except Exception as e:
            logger.error(f"Failed to register topic: {e}")
            return False

    def unregister_topic(self, topic_name: str) -> bool:
        """
        Unregister a topic from the discovery daemon.

        Args:
            topic_name: Name of the topic to unregister

        Returns:
            True if unregistration was successful, False if the daemon
            rejected it or the request failed (the failure is logged).
        """
        request = DiscoveryRequest(
            command=DiscoveryCommand.UNREGISTER_TOPIC, topic_name=topic_name
        )

        try:
            response = self._send_request(request)
            if response.status == DiscoveryStatus.OK:
                logger.info(f"Unregistered topic: {topic_name}")
                return True
            else:
                logger.warning(f"Failed to unregister topic: {response.message}")
                return False
        except Exception as e:
            logger.error(f"Failed to unregister topic: {e}")
            return False

    def lookup_topic(self, topic_name: str) -> TopicInfo | None:
        """
        Look up a topic by name.

        Args:
            topic_name: Name of the topic to look up

        Returns:
            TopicInfo if found, None otherwise (including when the request
            times out or fails; those cases are logged).
        """
        request = DiscoveryRequest(
            command=DiscoveryCommand.LOOKUP_TOPIC, topic_name=topic_name
        )

        try:
            response = self._send_request(request)
            if response.status == DiscoveryStatus.OK:
                return response.topic_info
            else:
                return None
        except TimeoutError:
            logger.error(
                f"Lookup timeout for topic: {topic_name}. Probably Discovery Daemon is not running."
            )
            return None
        except Exception as e:
            logger.error(f"Failed to lookup topic: {e}")
            return None

    def wait_for_topic(
        self,
        topic_name: str,
        timeout: float = 30.0,
        poll_interval: float = 0.5,
    ) -> TopicInfo | None:
        """
        Wait for a topic to become available (blocking).

        Args:
            topic_name: Name of the topic to wait for
            timeout: Maximum time to wait in seconds
            poll_interval: Time between lookup attempts in seconds

        Returns:
            TopicInfo if found within timeout, None otherwise
        """
        # Monotonic clock: wall-clock adjustments must not stretch or cut
        # the wait.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            topic_info = self.lookup_topic(topic_name)
            if topic_info:
                return topic_info
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            if remaining > 0:
                time.sleep(min(poll_interval, remaining))

        return None

    async def wait_for_topic_async(
        self,
        topic_name: str,
        timeout: float = 600.0,
        poll_interval: float = 0.5,
    ) -> TopicInfo | None:
        """
        Wait for a topic to become available (async polling).

        The pause between polls uses ``asyncio.sleep`` and yields to the
        event loop. Each individual lookup, however, is the synchronous
        :meth:`lookup_topic`: while the daemon does not answer it holds the
        event loop for up to ``timeout_ms`` per attempt.

        Args:
            topic_name: Name of the topic to wait for
            timeout: Maximum time to wait in seconds
            poll_interval: Time between lookup attempts in seconds

        Returns:
            TopicInfo if found within timeout, None otherwise
        """
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            topic_info = self.lookup_topic(topic_name)
            if topic_info:
                return topic_info
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            if remaining > 0:
                await asyncio.sleep(min(poll_interval, remaining))

        return None

    def ping(self) -> bool:
        """Check whether the discovery daemon is reachable.

        Returns:
            True if the daemon responded with OK within the configured
            timeout/retries, False otherwise.
        """
        request = DiscoveryRequest(command=DiscoveryCommand.PING)
        try:
            response = self._send_request(request)
        except Exception as e:
            logger.debug(f"Ping failed: {e}")
            return False
        return response.status == DiscoveryStatus.OK

    def list_topics(self) -> list[TopicInfo]:
        """
        List all registered topics.

        Returns:
            List of TopicInfo for all registered topics; an empty list if
            the daemon reports an error or the request fails.
        """
        request = DiscoveryRequest(command=DiscoveryCommand.LIST_TOPICS)

        try:
            response = self._send_request(request)
            if response.status == DiscoveryStatus.OK:
                return response.topics or []
            else:
                logger.warning(f"Failed to list topics: {response.message}")
                return []
        except Exception as e:
            logger.error(f"Failed to list topics: {e}")
            return []

    def close(self) -> None:
        """Close the socket and terminate the ZMQ context.

        Errors raised during shutdown are suppressed, so calling this more
        than once is harmless.
        """
        if self._socket:
            with contextlib.suppress(Exception):
                self._socket.close()
            self._socket = None

        with contextlib.suppress(Exception):
            self._context.term()

    def __enter__(self) -> "DiscoveryClient":
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the client when the ``with`` block exits."""
        self.close()
Functions
register_topic
register_topic(topic_info: TopicInfo) -> bool

Register a topic with the discovery daemon.

Parameters:

Name Type Description Default
topic_info TopicInfo

Information about the topic to register

required

Returns:

Type Description
bool

True if registration was successful

Source code in src/cortex/discovery/client.py
def register_topic(self, topic_info: TopicInfo) -> bool:
    """
    Announce a topic to the discovery daemon.

    Args:
        topic_info: Description of the topic being registered

    Returns:
        True when the daemon answers with an OK status; False when it
        answers with any other status or the request raises (both cases
        are logged as errors).
    """
    message = DiscoveryRequest(
        command=DiscoveryCommand.REGISTER_TOPIC, topic_info=topic_info
    )

    try:
        reply = self._send_request(message)
        accepted = reply.status == DiscoveryStatus.OK
        if not accepted:
            logger.error(f"Failed to register topic: {reply.message}")
            return False
        logger.info(f"Registered topic: {topic_info.name}")
        return True
    except Exception as exc:
        # Any failure (transport or otherwise) is reported as "not registered".
        logger.error(f"Failed to register topic: {exc}")
        return False
unregister_topic
unregister_topic(topic_name: str) -> bool

Unregister a topic from the discovery daemon.

Parameters:

Name Type Description Default
topic_name str

Name of the topic to unregister

required

Returns:

Type Description
bool

True if unregistration was successful

Source code in src/cortex/discovery/client.py
def unregister_topic(self, topic_name: str) -> bool:
    """
    Remove a topic from the discovery daemon's registry.

    Args:
        topic_name: Name of the topic to remove

    Returns:
        True when the daemon answers with an OK status; False when it
        answers with any other status (logged as a warning) or the request
        raises (logged as an error).
    """
    message = DiscoveryRequest(
        command=DiscoveryCommand.UNREGISTER_TOPIC, topic_name=topic_name
    )

    try:
        reply = self._send_request(message)
        removed = reply.status == DiscoveryStatus.OK
        if not removed:
            logger.warning(f"Failed to unregister topic: {reply.message}")
            return False
        logger.info(f"Unregistered topic: {topic_name}")
        return True
    except Exception as exc:
        # Any failure (transport or otherwise) is reported as "not removed".
        logger.error(f"Failed to unregister topic: {exc}")
        return False
lookup_topic
lookup_topic(topic_name: str) -> TopicInfo | None

Look up a topic by name.

Parameters:

Name Type Description Default
topic_name str

Name of the topic to look up

required

Returns:

Type Description
TopicInfo | None

TopicInfo if found, None otherwise

Source code in src/cortex/discovery/client.py
def lookup_topic(self, topic_name: str) -> TopicInfo | None:
    """
    Ask the discovery daemon for a topic by name.

    Args:
        topic_name: Name of the topic to resolve

    Returns:
        The response's TopicInfo when the daemon answers with an OK status;
        None for any other status, on timeout, or on any other error
        (timeouts and errors are logged).
    """
    query = DiscoveryRequest(
        command=DiscoveryCommand.LOOKUP_TOPIC, topic_name=topic_name
    )

    try:
        reply = self._send_request(query)
        found = reply.status == DiscoveryStatus.OK
        return reply.topic_info if found else None
    except TimeoutError:
        # Timeouts get their own message hinting at a missing daemon.
        logger.error(
            f"Lookup timeout for topic: {topic_name}. Probably Discovery Daemon is not running."
        )
    except Exception as exc:
        logger.error(f"Failed to lookup topic: {exc}")

    # Reached only after one of the failures above was logged.
    return None
wait_for_topic
wait_for_topic(
    topic_name: str,
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> TopicInfo | None

Wait for a topic to become available (blocking).

Parameters:

Name Type Description Default
topic_name str

Name of the topic to wait for

required
timeout float

Maximum time to wait in seconds

30.0
poll_interval float

Time between lookup attempts in seconds

0.5

Returns:

Type Description
TopicInfo | None

TopicInfo if found within timeout, None otherwise

Source code in src/cortex/discovery/client.py
def wait_for_topic(
    self,
    topic_name: str,
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> TopicInfo | None:
    """
    Wait for a topic to become available (blocking).

    Polls ``lookup_topic`` until it returns a result or the timeout
    elapses.

    Args:
        topic_name: Name of the topic to wait for
        timeout: Maximum time to wait in seconds
        poll_interval: Time between lookup attempts in seconds

    Returns:
        TopicInfo if found within timeout, None otherwise
    """
    # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        topic_info = self.lookup_topic(topic_name)
        if topic_info:
            return topic_info
        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if remaining > 0:
            time.sleep(min(poll_interval, remaining))

    return None
wait_for_topic_async async
wait_for_topic_async(
    topic_name: str,
    timeout: float = 600.0,
    poll_interval: float = 0.5,
) -> TopicInfo | None

Wait for a topic to become available (async, non-blocking).

Uses asyncio.sleep to avoid blocking the event loop.

Parameters:

Name Type Description Default
topic_name str

Name of the topic to wait for

required
timeout float

Maximum time to wait in seconds

600.0
poll_interval float

Time between lookup attempts in seconds

0.5

Returns:

Type Description
TopicInfo | None

TopicInfo if found within timeout, None otherwise

Source code in src/cortex/discovery/client.py
async def wait_for_topic_async(
    self,
    topic_name: str,
    timeout: float = 600.0,
    poll_interval: float = 0.5,
) -> TopicInfo | None:
    """
    Wait for a topic to become available (async polling).

    The pause between polls uses ``asyncio.sleep`` and yields to the event
    loop. Each individual lookup, however, goes through the synchronous
    ``lookup_topic`` and runs on the event loop until it returns.

    Args:
        topic_name: Name of the topic to wait for
        timeout: Maximum time to wait in seconds
        poll_interval: Time between lookup attempts in seconds

    Returns:
        TopicInfo if found within timeout, None otherwise
    """
    deadline = time.perf_counter() + timeout

    while time.perf_counter() < deadline:
        topic_info = self.lookup_topic(topic_name)
        if topic_info:
            return topic_info
        # Never sleep past the deadline.
        remaining = deadline - time.perf_counter()
        if remaining > 0:
            await asyncio.sleep(min(poll_interval, remaining))

    return None
ping
ping() -> bool

Check whether the discovery daemon is reachable.

Returns:

Type Description
bool

True if the daemon responded with OK within the configured timeout/retries, False otherwise.

Source code in src/cortex/discovery/client.py
def ping(self) -> bool:
    """Probe the discovery daemon with a PING request.

    Returns:
        True if the daemon's reply carries an OK status; False if the
        reply has another status or the request raised (the failure is
        logged at debug level).
    """
    probe = DiscoveryRequest(command=DiscoveryCommand.PING)
    try:
        reply = self._send_request(probe)
    except Exception as exc:
        logger.debug(f"Ping failed: {exc}")
        return False
    else:
        return reply.status == DiscoveryStatus.OK
list_topics
list_topics() -> list[TopicInfo]

List all registered topics.

Returns:

Type Description
list[TopicInfo]

List of TopicInfo for all registered topics

Source code in src/cortex/discovery/client.py
def list_topics(self) -> list[TopicInfo]:
    """
    Fetch every topic currently known to the discovery daemon.

    Returns:
        The topics from the daemon's reply (an empty list if the reply
        carries none). An empty list is also returned when the daemon
        answers with a non-OK status or the request raises; both cases
        are logged.
    """
    query = DiscoveryRequest(command=DiscoveryCommand.LIST_TOPICS)

    try:
        reply = self._send_request(query)
        succeeded = reply.status == DiscoveryStatus.OK
        if not succeeded:
            logger.warning(f"Failed to list topics: {reply.message}")
            return []
        # Normalise a missing/empty topic collection to a list.
        return reply.topics or []
    except Exception as exc:
        logger.error(f"Failed to list topics: {exc}")
        return []
close
close() -> None

Close the client connection.

Source code in src/cortex/discovery/client.py
def close(self) -> None:
    """Shut the client down: close the socket, then terminate the context.

    Exceptions raised by either step are suppressed, so shutdown is
    best-effort and never raises an ``Exception`` to the caller.
    """
    sock = self._socket
    if sock:
        with contextlib.suppress(Exception):
            sock.close()
        # Forget the socket even if closing it failed.
        self._socket = None

    # The context is terminated whether or not a socket was open.
    with contextlib.suppress(Exception):
        self._context.term()