Discovery protocol¶
A msgpack-over-REQ/REP protocol. Not on the data path — once a subscriber has the endpoint, messages flow publisher → subscriber directly.
Commands¶
| Command | Payload required | Returns |
|---|---|---|
REGISTER_TOPIC (1) |
TopicInfo |
OK / ALREADY_EXISTS |
UNREGISTER_TOPIC (2) |
topic_name or TopicInfo.name |
OK / NOT_FOUND |
LOOKUP_TOPIC (3) |
topic_name |
OK + TopicInfo / NOT_FOUND |
LIST_TOPICS (4) |
— | OK + list[TopicInfo] |
SHUTDOWN (99) |
— | OK; daemon exits |
Status codes: OK=0, NOT_FOUND=1, ALREADY_EXISTS=2, ERROR=3.
TopicInfo payload¶
@dataclass
class TopicInfo:
name: str # "/camera/image"
address: str # "ipc:///tmp/cortex/topics/cam__camera_image.sock"
message_type: str # "ImageMessage"
fingerprint: int # 64-bit class fingerprint
publisher_node: str # "cam"
Publisher register flow¶
sequenceDiagram
autonumber
participant P as Publisher
participant D as Daemon REP
P->>P: bind PUB socket on ipc:///tmp/cortex/topics/<node>__<topic>.sock
P->>D: REQ → DiscoveryRequest(REGISTER_TOPIC, TopicInfo{...})
D->>D: if topic_name absent: insert, else compare publisher_node
alt new
D-->>P: OK "Registered topic: /x"
else same publisher re-registering
D-->>P: OK (overwrite)
else different publisher, same topic
D-->>P: ALREADY_EXISTS
end
Subscriber lookup flow¶
sequenceDiagram
autonumber
participant S as Subscriber
participant D as Daemon REP
participant P as Publisher
S->>D: REQ → LOOKUP_TOPIC("/x")
alt present
D-->>S: OK + TopicInfo
S->>P: SUB connect + SUBSCRIBE "/x"
else missing
D-->>S: NOT_FOUND
Note over S: if wait_for_topic:<br/>poll every 500 ms until timeout
S->>D: retry LOOKUP_TOPIC
end
wait_for_topic_async runs the retry loop with asyncio.sleep so the event loop keeps spinning.
REQ-socket recovery¶
A ZMQ REQ socket gets stuck after a missed reply. The client detects zmq.Again on timeout and rebuilds the socket:
flowchart TD
A[send request] -->|timeout| B[REQ socket stuck]
B --> C[close socket]
C --> D[recreate socket<br/>same endpoint]
D --> E[retry up to retries]
See DiscoveryClient._reconnect.
Fencepost in retries default
retries=1 today executes the loop exactly once — i.e. no retry. Bump to
retries=3 in client-side code if you need resilience.
Failure modes & how Cortex handles them¶
| Scenario | Behavior |
|---|---|
| Daemon not running when publisher starts | Register fails; publisher still publishes, but no subscriber can find it. |
| Daemon restarts | All state lost; publishers must re-register. Current design has no auto-re-register. |
| Publisher crashes | Registry keeps stale TopicInfo until someone UNREGISTERs. |
| Two publishers, same topic | Second registration rejected with ALREADY_EXISTS. |
| Subscriber looks up before publisher | NOT_FOUND; caller may wait_for_topic to poll. |