Shared subscriber primitives.
The async subscriber (cortex.core.subscriber.Subscriber) and the
threaded subscriber (cortex.core.sync_subscriber.ThreadedSubscriber)
diverge in how they pull frames off the wire — one awaits a
zmq.asyncio socket, the other blocks an OS thread on a
zmq.Poller. Everything around that — discovery lookup, type
fingerprint validation, frame decoding, stats, and per-publisher
sequence-gap detection — is identical and lives here.
This module owns no zmq sockets. It is pure dataflow + bookkeeping.
Classes
MessageFingerprintError
Bases: RuntimeError
Raised when an incoming topic's fingerprint doesn't match the expected type.
Source code in src/cortex/core/subscriber_base.py
class MessageFingerprintError(RuntimeError):
    """Incoming topic advertised a type fingerprint other than the expected one."""
SubscriberStats
dataclass
Per-subscriber counters; updated by the receive loop.
Source code in src/cortex/core/subscriber_base.py
| @dataclass
class SubscriberStats:
"""Per-subscriber counters; updated by the receive loop."""
received: int = 0
dropped_estimated: int = 0
last_recv_perf_ns: int | None = None
last_sequence_by_publisher: dict[int, int] = field(default_factory=dict)
|
SubscriberBase
Discovery + connection scaffolding shared by all subscriber implementations.
Subclasses are responsible only for the I/O loop. They set
:attr:_topic_info via :meth:_lookup_blocking (or the async variant
used by the asyncio subscriber) and then open whatever socket they
prefer against :attr:_topic_info.address.
Source code in src/cortex/core/subscriber_base.py
class SubscriberBase:
    """Discovery + connection scaffolding shared by all subscriber implementations.

    Subclasses are responsible only for the I/O loop. They set
    :attr:`_topic_info` via :meth:`_lookup_blocking` (or the async variant
    used by the asyncio subscriber) and then open whatever socket they
    prefer against :attr:`_topic_info.address`.
    """

    def __init__(
        self,
        topic_name: str,
        message_type: type[Message],
        node_name: str = "anonymous",
        discovery_address: str = DEFAULT_DISCOVERY_ADDRESS,
        topic_timeout: float = 600.0,
        wait_for_topic: bool = True,
        strict_fingerprint: bool = False,
    ):
        """Store configuration and open a discovery client.

        Args:
            topic_name: Topic to subscribe to.
            message_type: Expected message class; its fingerprint is
                validated against what discovery reports for the topic.
            node_name: Human-readable name for this subscriber.
            discovery_address: Endpoint of the discovery service.
            topic_timeout: Max seconds :meth:`_lookup_blocking` waits.
            wait_for_topic: Hint for subclasses on whether to block for
                the topic to appear.
            strict_fingerprint: Raise :class:`MessageFingerprintError` on
                type mismatch instead of warning and continuing.
        """
        self.topic_name = topic_name
        self.message_type = message_type
        self.node_name = node_name
        self.discovery_address = discovery_address
        self.topic_timeout = topic_timeout
        self._wait_for_topic = wait_for_topic
        self._strict_fingerprint = strict_fingerprint
        self._topic_info: TopicInfo | None = None
        self._connected = False
        # Set to None by _close_discovery(); lookups guard against that.
        self._discovery_client: DiscoveryClient | None = DiscoveryClient(
            discovery_address=self.discovery_address
        )
        self.stats = SubscriberStats()

    # ------------------------------------------------------------------ discovery
    def _validate_fingerprint(self, info: TopicInfo) -> None:
        """Refuse or warn on type mismatch.

        Strict mode raises; lax mode preserves historical
        warning-and-continue behavior (kept until callers opt in).
        """
        expected = self.message_type.fingerprint()
        if info.fingerprint == expected:
            return
        msg = (
            f"Message type mismatch for {self.topic_name}: "
            f"expected {self.message_type.__name__} (fp={expected:#018x}), "
            f"got {info.message_type} (fp={info.fingerprint:#018x})"
        )
        if self._strict_fingerprint:
            raise MessageFingerprintError(msg)
        logger.warning(msg)

    def _accept_lookup_result(self) -> bool:
        """Shared tail of both lookup paths: require a hit, then validate it.

        Called outside the lookup ``try`` blocks so a strict-mode
        :class:`MessageFingerprintError` propagates to the caller rather
        than being swallowed as a lookup failure.
        """
        if self._topic_info is None:
            return False
        self._validate_fingerprint(self._topic_info)
        return True

    def _lookup_nonblocking(self) -> bool:
        """One-shot lookup. Returns True on success."""
        if self._discovery_client is None:
            # Previously this fell through to an AttributeError that was
            # caught and logged misleadingly; fail explicitly instead.
            logger.error("Failed to lookup topic: %s", "discovery client is closed")
            return False
        try:
            self._topic_info = self._discovery_client.lookup_topic(self.topic_name)
        except Exception as exc:
            logger.error("Failed to lookup topic: %s", exc)
            return False
        return self._accept_lookup_result()

    def _lookup_blocking(self, poll_interval: float = 0.5) -> bool:
        """Block-and-poll for the topic up to :attr:`topic_timeout`."""
        if self._discovery_client is None:
            logger.error("Failed to wait for topic: %s", "discovery client is closed")
            return False
        try:
            self._topic_info = self._discovery_client.wait_for_topic(
                self.topic_name,
                timeout=self.topic_timeout,
                poll_interval=poll_interval,
            )
        except Exception as exc:
            logger.error("Failed to wait for topic: %s", exc)
            return False
        return self._accept_lookup_result()

    # ------------------------------------------------------------------ properties
    @property
    def is_connected(self) -> bool:
        # Maintained by the subclass I/O loop.
        return self._connected

    @property
    def topic_info(self) -> TopicInfo | None:
        return self._topic_info

    @property
    def receive_count(self) -> int:
        return self.stats.received

    @property
    def dropped_count(self) -> int:
        return self.stats.dropped_estimated

    # ------------------------------------------------------------------ shutdown
    def _close_discovery(self) -> None:
        """Release the discovery client; safe to call more than once."""
        if self._discovery_client is not None:
            try:
                self._discovery_client.close()
            except Exception as exc:  # best-effort
                logger.debug("Discovery close error: %s", exc)
            self._discovery_client = None
Functions
decode_frames
decode_frames(
message_type: type[Message], frames: list[object]
) -> tuple[Message, MessageHeader] | None
Decode [topic, header, metadata, *buffers] into a typed message.
Returns None and logs a warning on malformed input rather than
raising — the receive loop should not die on a single bad frame.
Source code in src/cortex/core/subscriber_base.py
def decode_frames(
    message_type: type[Message], frames: list[object]
) -> tuple[Message, MessageHeader] | None:
    """Decode ``[topic, header, metadata, *buffers]`` into a typed message.

    Returns ``None`` and logs a warning on malformed input rather than
    raising — the receive loop should not die on a single bad frame.
    """
    # Need at least [topic, payload]; anything shorter is malformed.
    if len(frames) < 2:
        logger.warning("Unexpected frame count: %d", len(frames))
        return None
    payload = frames[1:]
    try:
        if len(payload) != 1:
            # Multi-frame layout: header/metadata/buffers decoded as a set.
            return message_type.from_frames(payload)
        # Single-frame layout: the whole message arrived as one blob.
        # zmq.Frame exposes .buffer; borrow it zero-copy when present.
        frame = payload[0]
        raw = memoryview(frame.buffer) if hasattr(frame, "buffer") else frame
        return message_type.from_bytes(raw)
    except Exception as exc:
        logger.error("Decode failed: %s", exc)
        return None
update_stats_for_header
update_stats_for_header(
    stats: SubscriberStats,
    header: MessageHeader,
    now_perf_ns: int,
) -> int
Bump receive counters and infer dropped messages from sequence gaps.
Each Subscriber connects to exactly one topic, and each topic has a
single publisher (today), so keying by fingerprint is effectively
keying by (publisher, type). When multi-publisher fan-in lands we
will extend the key to (publisher_node, fingerprint).
Returns the number of dropped messages inferred from this header.
Source code in src/cortex/core/subscriber_base.py
def update_stats_for_header(
    stats: SubscriberStats, header: MessageHeader, now_perf_ns: int
) -> int:
    """Bump receive counters and infer dropped messages from sequence gaps.

    Each ``Subscriber`` connects to exactly one topic, and each topic has a
    single publisher (today), so keying by ``fingerprint`` is effectively
    keying by ``(publisher, type)``. When multi-publisher fan-in lands we
    will extend the key to ``(publisher_node, fingerprint)``.

    Returns the number of dropped messages inferred from this header.
    """
    stats.received += 1
    stats.last_recv_perf_ns = now_perf_ns
    key = header.fingerprint
    previous = stats.last_sequence_by_publisher.get(key)
    # Record the new high-water mark before deciding on the gap.
    stats.last_sequence_by_publisher[key] = header.sequence
    if previous is None:
        # First message from this publisher: nothing to compare against.
        return 0
    missed = header.sequence - previous - 1
    if missed <= 0:
        # Consecutive, duplicate, or out-of-order — nothing counted as dropped.
        return 0
    stats.dropped_estimated += missed
    return missed