
Discovery

Source: cortex.discovery.daemon, cortex.discovery.client, cortex.discovery.protocol

A single long-lived process that maps topic names to ZMQ endpoints. It stays off the data path: once a subscriber has an endpoint, messages flow directly from publisher to subscriber.

Moving parts

flowchart LR
    subgraph DP [discovery package]
        PR[protocol.py<br/>DiscoveryRequest /<br/>DiscoveryResponse /<br/>TopicInfo]
        DM[daemon.py<br/>DiscoveryDaemon<br/>ZMQ REP loop]
        CL[client.py<br/>DiscoveryClient<br/>ZMQ REQ wrapper]
    end

    CL -- msgpack REQ --> DM
    DM -- msgpack REP --> CL
    PR -.-> DM
    PR -.-> CL

Both sides agree on the wire format via protocol.py. The daemon runs a single-threaded REP loop; publishers/subscribers speak REQ.

Daemon

DiscoveryDaemon:

  • Binds zmq.REP at ipc:///tmp/cortex/discovery.sock by default.
  • Maintains _topics: dict[str, TopicInfo] — one publisher per topic.
  • RCVTIMEO=1000 (milliseconds), so recv() returns at least once a second and the loop can check _running, which gives a clean Ctrl-C. Requests are handled one at a time, so a slow client blocks the others.
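A minimal sketch of the loop described above, assuming pyzmq. ReplyLoop, run(), serve_zmq() and the handle callback are hypothetical names, not the daemon's actual methods. The socket and its timeout exception are passed into run() so the loop logic can be exercised without a live socket; serve_zmq() wires it to a real REP socket.

```python
import os
import signal
from typing import Any, Callable


class ReplyLoop:
    """Hypothetical sketch of the daemon's single-threaded REP loop."""

    def __init__(self, endpoint: str, handle: Callable[[bytes], bytes]) -> None:
        self.endpoint = endpoint
        self.handle = handle  # request bytes -> reply bytes; must never raise
        self._running = False

    def stop(self, *_: Any) -> None:
        # Signature also fits signal.signal(), which passes (signum, frame).
        self._running = False

    def run(self, sock: Any, timeout_exc: type[BaseException]) -> None:
        """Serve on an already-bound REP socket until stop() is called."""
        self._running = True
        while self._running:
            try:
                request = sock.recv()  # returns a request or times out after RCVTIMEO
            except timeout_exc:
                continue  # nothing arrived: loop round and re-check _running
            # REP strictly alternates recv/send, so every request gets a reply.
            sock.send(self.handle(request))

    def serve_zmq(self) -> None:
        import zmq  # pyzmq; imported here so run() can be tested without it

        path = self.endpoint.removeprefix("ipc://")
        is_ipc = path != self.endpoint
        if is_ipc:
            os.makedirs(os.path.dirname(path), exist_ok=True)
        sock = zmq.Context.instance().socket(zmq.REP)
        sock.setsockopt(zmq.RCVTIMEO, 1000)  # milliseconds
        sock.setsockopt(zmq.LINGER, 0)
        sock.bind(self.endpoint)  # Starting -> Running
        signal.signal(signal.SIGINT, self.stop)
        try:
            self.run(sock, zmq.Again)
        finally:  # Stopping: close the socket and remove the .sock file
            sock.close()
            if is_ipc and os.path.exists(path):
                os.unlink(path)
```

Usage would look like ReplyLoop("ipc:///tmp/cortex/discovery.sock", handle).serve_zmq(), where handle decodes a request and encodes a response. A handle that receives SHUTDOWN could call stop() before returning; the reply still goes out, because _running is only re-checked after send().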

State transitions

stateDiagram-v2
    [*] --> Starting
    Starting --> Running: bind OK
    Running --> Running: REGISTER → insert
    Running --> Running: LOOKUP → read
    Running --> Running: UNREGISTER → delete
    Running --> Running: LIST → snapshot
    Running --> Stopping: SIGINT / SHUTDOWN
    Stopping --> [*]: close socket, unlink .sock

Registry semantics

Case                                    Result
--------------------------------------  --------------------------------
New topic                               Insert → OK
Same topic, same publisher_node         Overwrite → OK (re-registration)
Same topic, different publisher_node    Reject → ALREADY_EXISTS
UNREGISTER of a missing topic           NOT_FOUND
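The table maps onto a small dict-backed registry. A minimal sketch, assuming the _topics dict from the daemon description; Registry and its method names are hypothetical, and DiscoveryStatus/TopicInfo here are local stand-ins mirroring the protocol names rather than imports from cortex.discovery.protocol.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Optional


class DiscoveryStatus(Enum):  # local stand-in for the protocol enum
    OK = "OK"
    NOT_FOUND = "NOT_FOUND"
    ALREADY_EXISTS = "ALREADY_EXISTS"
    ERROR = "ERROR"


@dataclass(frozen=True)
class TopicInfo:  # local stand-in for the protocol type
    name: str
    address: str
    message_type: str
    fingerprint: str
    publisher_node: str


class Registry:
    """Hypothetical sketch of the daemon's _topics table: one publisher per topic."""

    def __init__(self) -> None:
        self._topics: dict[str, TopicInfo] = {}

    def register(self, info: TopicInfo) -> DiscoveryStatus:
        current = self._topics.get(info.name)
        if current is not None and current.publisher_node != info.publisher_node:
            return DiscoveryStatus.ALREADY_EXISTS  # another node owns this topic
        self._topics[info.name] = info  # new topic, or same node re-registering
        return DiscoveryStatus.OK

    def unregister(self, name: str) -> DiscoveryStatus:
        if self._topics.pop(name, None) is None:
            return DiscoveryStatus.NOT_FOUND
        return DiscoveryStatus.OK

    def lookup(self, name: str) -> Optional[TopicInfo]:
        return self._topics.get(name)

    def snapshot(self) -> list[TopicInfo]:
        return list(self._topics.values())  # a copy, so LIST can't see later edits
```

Keying ownership on publisher_node rather than on address means a publisher that comes back on a different endpoint can overwrite its own entry, while a second node claiming the same topic is turned away.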

Client

DiscoveryClient is a thin REQ wrapper with one operational subtlety: a REQ socket whose receive timed out is stuck waiting for a reply that never came, and because REQ enforces strict send/receive alternation it refuses further sends. The client therefore closes and recreates the socket after every timeout (_reconnect); callers never see this.

REQ timeout recovery

flowchart TD
    S[send request] --> W[wait RCVTIMEO]
    W -->|reply| OK[return DiscoveryResponse]
    W -->|timeout| T[zmq.Again]
    T --> C[close REQ socket]
    C --> N[create fresh REQ<br/>same endpoint]
    N -->|attempts < retries| S
    N -->|exhausted| F[raise TimeoutError]
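The recovery flow above can be sketched as follows. ReconnectingReq, zmq_req_factory and the retries default are hypothetical, not DiscoveryClient's real API. The socket factory and timeout exception are injected so the retry logic runs without pyzmq; with pyzmq the timeout exception is zmq.Again.

```python
from typing import Any, Callable


class ReconnectingReq:
    """Hypothetical sketch of REQ timeout recovery.

    make_socket() must return a fresh, connected REQ socket whose recv()
    raises timeout_exc once RCVTIMEO expires.
    """

    def __init__(self, make_socket: Callable[[], Any],
                 timeout_exc: type[BaseException], retries: int = 3) -> None:
        self._make_socket = make_socket
        self._timeout_exc = timeout_exc
        self._retries = retries
        self._sock = make_socket()

    def _reconnect(self) -> None:
        # A REQ socket that missed its reply can't send again;
        # discard it and open a new one to the same endpoint.
        self._sock.close()
        self._sock = self._make_socket()

    def request(self, payload: bytes) -> bytes:
        for _ in range(self._retries):
            self._sock.send(payload)
            try:
                return self._sock.recv()
            except self._timeout_exc:
                self._reconnect()
        raise TimeoutError(f"no reply after {self._retries} attempts")


def zmq_req_factory(endpoint: str, timeout_ms: int = 1000) -> Callable[[], Any]:
    import zmq  # pyzmq, imported lazily so ReconnectingReq has no hard dependency

    ctx = zmq.Context.instance()

    def make() -> Any:
        sock = ctx.socket(zmq.REQ)
        sock.setsockopt(zmq.RCVTIMEO, timeout_ms)  # milliseconds
        sock.setsockopt(zmq.LINGER, 0)  # drop the unanswered request on close()
        sock.connect(endpoint)
        return sock

    return make
```

With pyzmq this would be used as ReconnectingReq(zmq_req_factory("ipc:///tmp/cortex/discovery.sock"), zmq.Again). Setting LINGER to 0 matters here: otherwise closing a socket that still holds an undelivered request can hang context termination.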

Polling helpers

Protocol

cortex.discovery.protocol:

Type               Members / fields
-----------------  -------------------------------------------------------------------
DiscoveryCommand   REGISTER_TOPIC / UNREGISTER_TOPIC / LOOKUP_TOPIC / LIST_TOPICS / SHUTDOWN
DiscoveryStatus    OK / NOT_FOUND / ALREADY_EXISTS / ERROR
TopicInfo          name, address, message_type, fingerprint, publisher_node
DiscoveryRequest   command + optional topic_info / topic_name
DiscoveryResponse  status, message, topic_info, topics

All payloads are msgpack. TopicInfo is nested as a packed sub-blob so responses stay flat.
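A sketch of the nested-blob encoding for a request, assuming msgpack.packb / msgpack.unpackb as the codec in real use; they are passed in as parameters only so the sketch runs without msgpack installed. Field names follow the table; the dataclass layout, to_bytes()/from_bytes(), and the outer map keys are assumptions, and DiscoveryResponse would presumably embed its topic_info the same way.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Callable, Optional

Packer = Callable[[Any], bytes]  # e.g. msgpack.packb
Unpacker = Callable[[bytes], Any]  # e.g. msgpack.unpackb


class DiscoveryCommand(str, Enum):
    REGISTER_TOPIC = "REGISTER_TOPIC"
    UNREGISTER_TOPIC = "UNREGISTER_TOPIC"
    LOOKUP_TOPIC = "LOOKUP_TOPIC"
    LIST_TOPICS = "LIST_TOPICS"
    SHUTDOWN = "SHUTDOWN"


@dataclass
class TopicInfo:
    name: str
    address: str
    message_type: str
    fingerprint: str
    publisher_node: str


@dataclass
class DiscoveryRequest:
    command: DiscoveryCommand
    topic_info: Optional[TopicInfo] = None
    topic_name: Optional[str] = None

    def to_bytes(self, packb: Packer) -> bytes:
        # Pack TopicInfo on its own and embed the result as a bytes value,
        # so the outer map holds only scalars and opaque blobs.
        blob = packb(asdict(self.topic_info)) if self.topic_info is not None else None
        return packb({"command": self.command.value,
                      "topic_info": blob,
                      "topic_name": self.topic_name})

    @classmethod
    def from_bytes(cls, data: bytes, unpackb: Unpacker) -> DiscoveryRequest:
        outer = unpackb(data)
        blob = outer.get("topic_info")
        info = TopicInfo(**unpackb(blob)) if blob is not None else None
        return cls(DiscoveryCommand(outer["command"]), info, outer.get("topic_name"))
```

With msgpack 1.x defaults (bytes packed as bin, strings unpacked as str), the call would be req.to_bytes(msgpack.packb) and DiscoveryRequest.from_bytes(raw, msgpack.unpackb).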

Limitations

  • One publisher per topic.
  • No heartbeats or leases — a crashed publisher leaves a stale entry.
  • Single-threaded REP — a slow client blocks others.
  • Daemon state is lost on restart; publishers don't auto-re-register.

See also