Skip to content

cortex.core

core

Core module for the Cortex framework.

Attributes

SyncMessageCallback module-attribute

# Type alias for synchronous subscriber callbacks: called with the decoded
# message and its header, returns nothing. Per the surrounding docs, it is
# invoked on the receive thread and must not return a coroutine.
SyncMessageCallback = Callable[
    [Message, MessageHeader], None
]

A blocking callback invoked on the receive thread — must not return a coroutine.

Classes

AsyncExecutor

Bases: BaseExecutor

Runs an async callable in a tight loop, yielding to the event loop.

Used by :class:cortex.core.subscriber.Subscriber to drive its receive → decode → dispatch loop. Exceptions are logged and the loop continues; only :class:asyncio.CancelledError stops it.

Example
async def process_data():
    data = await get_data()
    await handle(data)

executor = AsyncExecutor(process_data)
await executor.run()
Source code in src/cortex/core/executor.py
class AsyncExecutor(BaseExecutor):
    """Executor that awaits its callback back-to-back, as fast as possible.

    Each pass awaits ``self.func`` and then performs a zero-length sleep so
    other tasks on the event loop get a turn. Ordinary exceptions are logged
    and the loop keeps going; only :class:`asyncio.CancelledError` ends it.
    Used by :class:`cortex.core.subscriber.Subscriber` for its
    receive → decode → dispatch loop.

    Example:
        ```python
        async def process_data():
            data = await get_data()
            await handle(data)

        executor = AsyncExecutor(process_data)
        await executor.run()
        ```
    """

    async def _run_impl(self, *args, **kwargs) -> None:
        """Tight loop: await the callback, yield, repeat until stopped."""
        while self._running:
            try:
                await self.func(*args, **kwargs)
                await asyncio.sleep(0)  # Yield to event loop
            except asyncio.CancelledError:
                # Cancellation is the one signal that ends the loop.
                break
            except Exception as e:
                logger.error(f"Error in AsyncExecutor: {e}")
                await asyncio.sleep(0)

BaseExecutor

Bases: ABC

Abstract base class for async executors.

Provides a common interface for starting, stopping, and running async callback functions.

Source code in src/cortex/core/executor.py
class BaseExecutor(ABC):
    """Common lifecycle shell for async executors.

    Concrete subclasses implement :meth:`_run_impl`; this base supplies the
    shared running-flag handling plus a :meth:`run` wrapper that guarantees
    the executor is marked stopped when the loop exits, even on error.
    """

    def __init__(self, func: AsyncCallback):
        """Store the async callback and begin in the stopped state.

        Args:
            func: Async function to execute
        """
        self.func = func
        self._running = False

    def start(self) -> None:
        """Mark the executor as running."""
        self._running = True

    def stop(self) -> None:
        """Mark the executor as stopped."""
        self._running = False

    @property
    def running(self) -> bool:
        """Whether the executor is currently running."""
        return self._running

    async def run(self, *args, **kwargs) -> None:
        """Start, delegate to ``_run_impl``, and always stop on exit."""
        self.start()
        try:
            await self._run_impl(*args, **kwargs)
        finally:
            self.stop()

    @abstractmethod
    async def _run_impl(self, *args, **kwargs) -> None:
        """Subclass hook containing the actual run loop."""
        ...
Attributes
running property
running: bool

Check if the executor is running.

Functions
start
start() -> None

Start the executor.

Source code in src/cortex/core/executor.py
def start(self) -> None:
    """Mark the executor as running so the run loop may proceed."""
    self._running = True
stop
stop() -> None

Stop the executor.

Source code in src/cortex/core/executor.py
def stop(self) -> None:
    """Mark the executor as stopped; the run loop exits on its next check."""
    self._running = False
run async
run(*args, **kwargs) -> None

Run the executor.

Source code in src/cortex/core/executor.py
async def run(self, *args, **kwargs) -> None:
    """Drive ``_run_impl``, guaranteeing ``stop()`` runs on exit.

    All positional and keyword arguments are forwarded to the
    implementation loop unchanged.
    """
    self.start()
    try:
        await self._run_impl(*args, **kwargs)
    finally:
        self.stop()

RateExecutor

Bases: BaseExecutor

Runs an async callable at a target rate in Hz.

Uses time.perf_counter for scheduling. If a callback overruns the nominal period, next_exec_time stays on the fixed grid (only + interval per invocation); the loop then sleeps 0 until the clock catches up, so missed ticks are not skipped. This matches the historical neurosim ZMQNODE constant-rate executor behavior and is appropriate for simulation stepping.

Example
async def my_callback():
    print("tick")

executor = RateExecutor(my_callback, rate_hz=10.0)
await executor.run()
Source code in src/cortex/core/executor.py
class RateExecutor(BaseExecutor):
    """Runs an async callable at a target rate in Hz.

    Uses ``time.perf_counter`` for scheduling. If a callback overruns the
    nominal period, ``next_exec_time`` stays on the fixed grid (only
    ``+ interval`` per invocation); the loop then sleeps 0 until the clock
    catches up, so **missed ticks are not skipped**. This matches the
    historical neurosim ``ZMQNODE`` constant-rate executor behavior and is
    appropriate for simulation stepping.

    Example:
        ```python
        async def my_callback():
            print("tick")

        executor = RateExecutor(my_callback, rate_hz=10.0)
        await executor.run()
        ```
    """

    def __init__(self, func: AsyncCallback, rate_hz: float):
        """
        Initialize constant rate executor.

        Args:
            func: Async function to execute
            rate_hz: Target execution rate in Hz; must be positive.

        Raises:
            ValueError: If ``rate_hz`` is not positive (a zero rate would
                previously surface as a bare ``ZeroDivisionError`` and a
                negative rate as a nonsensical negative interval).
        """
        super().__init__(func)
        if rate_hz <= 0:
            raise ValueError(f"rate_hz must be positive, got {rate_hz}")
        self._rate_hz = rate_hz
        self.interval = 1.0 / rate_hz

    async def _run_impl(self, *args, **kwargs) -> None:
        """
        Run a function on a fixed ``perf_counter`` grid at ``rate_hz``.

        When the callback is slow, ticks are not skipped: ``next_exec_time``
        advances by one interval per invocation and the loop yields until
        the clock catches up (zero-length sleeps while behind).
        """
        next_exec_time = time.perf_counter()

        while self._running:
            try:
                current_time = time.perf_counter()

                if current_time >= next_exec_time:
                    await self.func(*args, **kwargs)
                    next_exec_time += self.interval
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in RateExecutor: {e}")

            # A single sleep both paces the loop and yields to the event
            # loop (sleep(0) when behind schedule). The previous extra
            # sleep(0) before this line was redundant — it forced a second
            # event-loop pass on every iteration for no scheduling benefit.
            await asyncio.sleep(max(0, next_exec_time - time.perf_counter()))

Node

User-facing composition unit that owns publishers, subscribers, and timers.

A node bundles a shared :class:zmq.asyncio.Context, a collection of :class:cortex.core.publisher.Publisher and :class:cortex.core.subscriber.Subscriber instances created through it, and any number of periodic timer callbacks.

:meth:run starts every subscriber receive loop and every timer as asyncio tasks and gathers them until cancelled. Use as an async context manager so that :meth:close runs on exit and cleans up sockets, tasks, and the shared ZMQ context.

Example
class CameraNode(Node):
    def __init__(self):
        super().__init__("camera_node")
        self.pub = self.create_publisher("/camera/image", ImageMessage)
        self.create_timer(1 / 30, self.publish_image)

    async def publish_image(self):
        self.pub.publish(ImageMessage(data=capture_image()))

async def main():
    async with CameraNode() as node:
        await node.run()
Source code in src/cortex/core/node.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
class Node:
    """User-facing composition unit that owns publishers, subscribers, and timers.

    A node bundles a shared :class:`zmq.asyncio.Context`, a collection of
    :class:`cortex.core.publisher.Publisher` and
    :class:`cortex.core.subscriber.Subscriber` instances created through it,
    and any number of periodic timer callbacks.

    :meth:`run` starts every subscriber receive loop and every timer as
    asyncio tasks and ``gather``s them until cancelled. Use as an async
    context manager so that :meth:`close` runs on exit and cleans up
    sockets, tasks, and the shared ZMQ context.

    Example:
        ```python
        class CameraNode(Node):
            def __init__(self):
                super().__init__("camera_node")
                self.pub = self.create_publisher("/camera/image", ImageMessage)
                self.create_timer(1 / 30, self.publish_image)

            async def publish_image(self):
                self.pub.publish(ImageMessage(data=capture_image()))

        async def main():
            async with CameraNode() as node:
                await node.run()
        ```
    """

    def __init__(
        self,
        name: str,
        discovery_address: str = DEFAULT_DISCOVERY_ADDRESS,
    ):
        """
        Initialize the node.

        Args:
            name: Unique name for this node
            discovery_address: Address of the discovery daemon
        """
        self.name = name
        self.discovery_address = discovery_address

        # ZMQ async context
        self._context = zmq.asyncio.Context()

        # Publishers and subscribers (async and sync share one keyed dict)
        self._publishers: dict[str, Publisher] = {}
        self._subscribers: dict[str, Subscriber | ThreadedSubscriber] = {}

        # Timer executors: (period, callback, RateExecutor)
        self._timers: list[tuple[float, AsyncCallback, RateExecutor]] = []

        # Async subscribers with callbacks need their receive loop scheduled
        # as an asyncio task; sync subscribers run on their own OS thread and
        # are tracked separately so close() can join them deterministically.
        self._active_subscribers: list[Subscriber] = []
        self._sync_subscribers: list[ThreadedSubscriber] = []

        # Independent zmq contexts created for sync-mode publishers; we
        # own their lifecycle and term them on close().
        self._owned_pub_contexts: list[zmq.Context] = []

        # Sync-side worker threads spawned via ``spawn_thread``. They share
        # one ``threading.Event`` for shutdown so ``stop()`` can signal all
        # of them at once and ``close()`` can join them deterministically.
        self._sync_stop_event = threading.Event()
        self._spawned_threads: list[threading.Thread] = []

        # Tasks
        self._tasks: list[asyncio.Task] = []

        # State
        self._running = False
        self._stop_event: asyncio.Event | None = None

        logger.info(f"Created node: {name}")

    def create_publisher(
        self,
        topic_name: str,
        message_type: type[Message],
        queue_size: int = 10,
        mode: PublisherMode = "async",
    ) -> Publisher:
        """
        Create a publisher for a topic.

        Args:
            topic_name: Name of the topic.
            message_type: Type of messages to publish.
            queue_size: Output queue size.
            mode: ``'async'`` (default) shares the node's
                :class:`zmq.asyncio.Context` (with a sync shadow). ``'sync'``
                gives the publisher its own independent
                :class:`zmq.Context` so ``publish()`` does not bounce
                through asyncio's IO threads — recommended for control-loop
                publishers calling ``publish()`` from a non-asyncio thread.
                Note that :class:`zmq.PUB` sockets are not thread-safe;
                only call ``publish()`` from one thread per Publisher.

        Returns:
            Publisher instance
        """
        # Idempotent per topic: a second call returns the first publisher
        # and ignores any differing arguments (only a warning is logged).
        if topic_name in self._publishers:
            logger.warning(f"Publisher for {topic_name} already exists")
            return self._publishers[topic_name]

        if mode == "async":
            pub_context = self._context
        elif mode == "sync":
            pub_context = zmq.Context()
            self._owned_pub_contexts.append(pub_context)
        else:
            raise ValueError(f"Unknown publisher mode: {mode!r}")

        pub = Publisher(
            topic_name=topic_name,
            message_type=message_type,
            node_name=self.name,
            discovery_address=self.discovery_address,
            queue_size=queue_size,
            context=pub_context,
        )

        self._publishers[topic_name] = pub
        logger.info("Created %s publisher for %s", mode, topic_name)

        return pub

    def create_subscriber(
        self,
        topic_name: str,
        message_type: type[Message],
        callback: MessageCallback | SyncMessageCallback | None = None,
        queue_size: int = 10,
        wait_for_topic: bool = True,
        topic_timeout: float = 30.0,
        mode: SubscriberMode = "async",
        strict_fingerprint: bool | None = None,
        cpu_affinity: list[int] | None = None,
        sched_priority: int | None = None,
    ) -> Subscriber | ThreadedSubscriber:
        """
        Create a subscriber for a topic.

        Args:
            topic_name: Name of the topic.
            message_type: Type of messages expected.
            callback: Function to call when messages arrive. ``mode='async'``
                expects an async callback; ``mode='sync'`` expects a plain
                synchronous callable and rejects coroutine functions.
            queue_size: Input queue size (ignored when ``conflate=True`` in
                sync mode).
            wait_for_topic: Whether to wait for the topic to be available.
            topic_timeout: Timeout for waiting for topic, in seconds.
            mode: ``'async'`` (default) routes through asyncio. ``'sync'``
                runs a dedicated OS thread with synchronous zmq + Poller —
                use for control loops needing tight p99 latency. In sync
                mode the default ``queue_size`` of ``1`` gives latest-wins
                semantics suitable for control commands.
            strict_fingerprint: When True, a fingerprint mismatch between
                the topic and ``message_type`` raises ``MessageFingerprintError``
                instead of logging a warning. Default behavior is mode-
                dependent: ``True`` in sync mode, ``False`` in async mode
                (kept lax for backward compatibility). Pass ``True``
                explicitly on async control topics where silent type
                confusion would corrupt downstream state.
            cpu_affinity: Sync mode only. Pin the receive thread to the
                given CPU set (Linux only; ignored elsewhere).
            sched_priority: Sync mode only. Run the receive thread under
                ``SCHED_FIFO`` at the given priority (Linux only; requires
                ``CAP_SYS_NICE``). Failure is logged and the thread falls
                back to the default scheduler.

        Returns:
            ``Subscriber`` for ``mode='async'``, ``ThreadedSubscriber`` for
            ``mode='sync'``.
        """
        # Idempotent per topic, like create_publisher: a repeat call returns
        # the existing subscriber regardless of mode or arguments.
        if topic_name in self._subscribers:
            logger.warning(f"Subscriber for {topic_name} already exists")
            return self._subscribers[topic_name]

        if mode == "async":
            # Async default: lax (logs and continues) for compatibility.
            # Callers opt into strict via strict_fingerprint=True.
            async_strict = False if strict_fingerprint is None else strict_fingerprint
            sub: Subscriber | ThreadedSubscriber = Subscriber(
                topic_name=topic_name,
                message_type=message_type,
                callback=callback,
                node_name=self.name,
                discovery_address=self.discovery_address,
                queue_size=queue_size,
                wait_for_topic=wait_for_topic,
                topic_timeout=topic_timeout,
                context=self._context,
                strict_fingerprint=async_strict,
            )
            if callback is not None:
                self._active_subscribers.append(sub)
        elif mode == "sync":
            if callback is None:
                raise ValueError("Sync subscribers require a callback")
            if strict_fingerprint is False:
                # Allow callers to relax sync mode if they really mean it,
                # but the ThreadedSubscriber currently hard-codes strict.
                # Surface the override expectation as a clear log line so
                # the future relaxation is discoverable.
                logger.info(
                    "strict_fingerprint=False ignored for sync subscriber "
                    "%s; sync mode is always strict.",
                    topic_name,
                )
            sub = ThreadedSubscriber(
                topic_name=topic_name,
                message_type=message_type,
                callback=callback,  # type: ignore[arg-type]
                node_name=self.name,
                discovery_address=self.discovery_address,
                queue_size=queue_size,
                wait_for_topic=wait_for_topic,
                topic_timeout=topic_timeout,
                cpu_affinity=cpu_affinity,
                sched_priority=sched_priority,
            )
            self._sync_subscribers.append(sub)
        else:
            raise ValueError(f"Unknown subscriber mode: {mode!r}")

        self._subscribers[topic_name] = sub
        logger.info("Created %s subscriber for %s", mode, topic_name)
        return sub

    @property
    def stop_event(self) -> threading.Event:
        """Shared ``threading.Event`` set when the node is stopping.

        Sync code that opts into the node's lifecycle (publisher threads,
        I/O loops, anything spawned via :meth:`spawn_thread`) should poll
        ``node.stop_event.is_set()`` and exit promptly when it goes True.
        Async code should not need this — it gets cancellation through the
        normal asyncio task lifecycle.
        """
        return self._sync_stop_event

    def spawn_thread(
        self,
        target: Callable[..., None],
        *args,
        name: str | None = None,
        **kwargs,
    ) -> threading.Thread:
        """Start an OS thread owned by this node.

        ``target`` is invoked as ``target(stop_event, *args, **kwargs)`` —
        the first positional argument is always the node's shared
        ``threading.Event``. The thread is started immediately, registered
        for ``run()`` keepalive (so the asyncio side won't fall through),
        and joined deterministically by :meth:`close`.

        This is the canonical way to drive sync-mode publishers, custom
        polling loops, or any blocking I/O the node should manage.

        Args:
            target: The thread body. Must accept the stop event as its
                first positional arg.
            *args: Forwarded to ``target`` after the stop event.
            name: Thread name; defaults to ``"<node-name>-thread-<n>"``.
            **kwargs: Forwarded to ``target``.

        Returns:
            The :class:`threading.Thread` instance, already running.

        Example:
            ```python
            def control_loop(stop, pub, rate_hz):
                interval = 1.0 / rate_hz
                next_t = time.perf_counter()
                while not stop.is_set():
                    ...
                    pub.publish(WheelCommand(...))
                    next_t += interval
                    time.sleep(max(0, next_t - time.perf_counter()))

            pub = node.create_publisher(..., mode="sync")
            node.spawn_thread(control_loop, pub, 1000.0)
            await node.run()  # blocks until Ctrl+C; close() joins the thread
            ```
        """
        thread_name = name or f"{self.name}-thread-{len(self._spawned_threads)}"
        stop = self._sync_stop_event

        def _runner() -> None:
            # Crash containment: an exception in a worker thread is logged
            # (with traceback) rather than killing the interpreter's thread
            # silently; the node keeps running.
            try:
                target(stop, *args, **kwargs)
            except Exception:
                logger.exception("Spawned thread %s crashed", thread_name)

        # daemon=False: close()/close_sync() join these threads explicitly,
        # so interpreter shutdown waits for a clean exit.
        thread = threading.Thread(target=_runner, name=thread_name, daemon=False)
        thread.start()
        self._spawned_threads.append(thread)
        logger.info("Spawned thread %s", thread_name)
        return thread

    def create_timer(
        self,
        period: float,
        callback: AsyncCallback,
    ) -> None:
        """
        Create a periodic timer.

        Args:
            period: Timer period in seconds. Must be non-zero — a zero
                period raises ``ZeroDivisionError`` in the conversion below.
            callback: Async function to call on each timer tick
        """
        rate_hz = 1.0 / period
        executor = RateExecutor(callback, rate_hz=rate_hz)
        self._timers.append((period, callback, executor))

        logger.debug(f"Created timer with period {period}s ({rate_hz} Hz)")

    async def run(self) -> None:
        """
        Run the node, processing messages and timers.

        This is the main async entry point for the node. Sync subscribers
        are started on their own OS threads and run independently of the
        asyncio event loop.
        """
        self._running = True

        # Start sync subscribers first — they don't depend on the loop and
        # we want them receiving as early as possible.
        for sub in self._sync_subscribers:
            sub.start()

        # Start all timer executors
        for _period, _callback, executor in self._timers:
            self._tasks.append(asyncio.create_task(executor.run()))

        # Start all async subscriber receive loops
        for sub in self._active_subscribers:
            self._tasks.append(asyncio.create_task(sub.run()))

        # If the node has no async work but does have sync work to manage
        # (sync subscribers and/or threads spawned via spawn_thread), keep
        # run() alive so the asyncio side does not fall through and trip
        # the finally block. Released by stop() / close().
        has_sync_work = bool(self._sync_subscribers) or bool(self._spawned_threads)
        if not self._tasks and has_sync_work:
            self._stop_event = asyncio.Event()
            self._tasks.append(asyncio.create_task(self._stop_event.wait()))

        logger.info(
            "Node %s running with %d async tasks, %d sync threads",
            self.name,
            len(self._tasks),
            len(self._sync_subscribers),
        )

        try:
            # return_exceptions=True means a failing task does not abort the
            # gather: its exception is collected into the result list, so
            # task failures surface only via each task's own logging.
            await asyncio.gather(*self._tasks, return_exceptions=True)
        except asyncio.CancelledError:
            logger.info(f"Node {self.name} cancelled")
        finally:
            self._running = False
            # Stop all executors
            for _period, _callback, executor in self._timers:
                executor.stop()
            for sub in self._active_subscribers:
                sub.stop()
            for sub in self._sync_subscribers:
                sub.stop()

    def stop(self) -> None:
        """Stop the node."""
        logger.info(f"Stopping node {self.name}")
        self._running = False

        # Stop all executors
        for _period, _callback, executor in self._timers:
            executor.stop()
        for sub in self._active_subscribers:
            sub.stop()
        for sub in self._sync_subscribers:
            sub.stop()

        # Signal all spawned sync threads to wind down.
        self._sync_stop_event.set()

        # Release the keepalive task (if any) so run() can return cleanly.
        if self._stop_event is not None and not self._stop_event.is_set():
            self._stop_event.set()

        # Cancel all tasks
        for task in self._tasks:
            if not task.done():
                task.cancel()

    async def close(self) -> None:
        """Close the node and release all resources."""
        logger.info(f"Closing node {self.name}")

        self.stop()

        # Wait for tasks to complete
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()

        # Close all publishers
        for pub in self._publishers.values():
            pub.close()
        self._publishers.clear()

        # Close all subscribers (joins sync receive threads)
        for sub in self._subscribers.values():
            sub.close()
        self._subscribers.clear()

        # Join spawned sync worker threads — stop() already set the event.
        for thread in self._spawned_threads:
            thread.join(timeout=2.0)
            if thread.is_alive():
                logger.warning(
                    "Spawned thread %s did not exit within 2.0s", thread.name
                )
        self._spawned_threads.clear()

        self._timers.clear()
        self._active_subscribers.clear()
        self._sync_subscribers.clear()

        # Terminate ZMQ contexts: shared async first, then any sync contexts
        # created for sync-mode publishers.
        # NOTE(review): unlike close_sync(), this term() is not wrapped in
        # try/except — presumably all sockets are closed above so it cannot
        # block or raise here; confirm.
        self._context.term()
        for ctx in self._owned_pub_contexts:
            try:
                ctx.term()
            except Exception as exc:
                logger.debug("Error terming sync publisher context: %s", exc)
        self._owned_pub_contexts.clear()

        logger.info(f"Node {self.name} closed")

    def get_publisher(self, topic_name: str) -> Publisher | None:
        """Get a publisher by topic name."""
        return self._publishers.get(topic_name)

    def get_subscriber(self, topic_name: str) -> Subscriber | ThreadedSubscriber | None:
        """Get a subscriber by topic name (async or sync-mode)."""
        return self._subscribers.get(topic_name)

    @property
    def publishers(self) -> list[str]:
        """Get list of publisher topic names."""
        return list(self._publishers.keys())

    @property
    def subscribers(self) -> list[str]:
        """Get list of subscriber topic names."""
        return list(self._subscribers.keys())

    @property
    def is_running(self) -> bool:
        """Check if the node is running."""
        return self._running

    # ------------------------------------------------------------------
    # Sync entry points — for nodes that only own sync work
    # ------------------------------------------------------------------

    def _has_async_work(self) -> bool:
        """True if the node has anything that needs an asyncio loop."""
        return bool(self._timers) or bool(self._active_subscribers)

    def spin(self, timeout: float | None = None) -> None:
        """Block the calling thread until the node is stopped.

        Sync counterpart to :meth:`run`. Use this when the node owns only
        sync work — sync subscribers, threads spawned via
        :meth:`spawn_thread`, or nothing more than a publisher driven from
        the calling thread itself. No asyncio loop is created.

        Raises ``RuntimeError`` if the node has async timers or async
        subscribers, since those need :meth:`run` to be scheduled. ``Ctrl+C``
        is delivered as :class:`KeyboardInterrupt` and propagates so the
        caller can decide whether to swallow it.

        Args:
            timeout: Optional cap (seconds) on how long to block. ``None``
                means "wait forever, until :meth:`stop` is called".

        Example:
            ```python
            node = Node("controller")
            pub = node.create_publisher("/cmd", WheelCommand, mode="sync")
            node.spawn_thread(control_loop, pub, 1000.0)
            try:
                node.spin()              # blocks until Ctrl+C
            except KeyboardInterrupt:
                pass
            finally:
                node.close_sync()
            ```
        """
        if self._has_async_work():
            raise RuntimeError(
                "Node.spin() does not start an asyncio loop, but this node "
                "has async timers/subscribers. Use `await node.run()` instead, "
                "or remove the async work."
            )

        self._running = True
        for sub in self._sync_subscribers:
            sub.start()

        logger.info(
            "Node %s spinning with %d sync subscribers and %d threads",
            self.name,
            len(self._sync_subscribers),
            len(self._spawned_threads),
        )
        try:
            # ``Event.wait`` is interruptible by Ctrl+C on the main thread.
            self._sync_stop_event.wait(timeout=timeout)
        finally:
            self._running = False
            for sub in self._sync_subscribers:
                sub.stop()

    def close_sync(self) -> None:
        """Sync counterpart to :meth:`close`.

        Tears down sockets, joins spawned threads, and terms zmq contexts
        without ever entering an asyncio loop. Safe to call from a plain
        ``def main()`` — including from inside ``__exit__`` when the node
        is used as a regular ``with`` context manager.

        Will refuse to run if the node has async timers/subscribers; for
        those, use ``await node.close()``.
        """
        if self._has_async_work():
            raise RuntimeError(
                "Node.close_sync() cannot tear down async timers/subscribers. "
                "Use `await node.close()` instead."
            )

        logger.info("Closing node %s (sync)", self.name)

        # Signal everyone, then synchronously join.
        self._sync_stop_event.set()
        for sub in self._sync_subscribers:
            sub.stop()
        self._running = False

        # Close publishers and (sync) subscribers.
        for pub in self._publishers.values():
            pub.close()
        self._publishers.clear()
        for sub in self._subscribers.values():
            sub.close()
        self._subscribers.clear()

        # Join spawned worker threads.
        for thread in self._spawned_threads:
            thread.join(timeout=2.0)
            if thread.is_alive():
                logger.warning(
                    "Spawned thread %s did not exit within 2.0s", thread.name
                )
        self._spawned_threads.clear()

        self._sync_subscribers.clear()

        # Term zmq contexts. The shared async context is never used by a
        # purely-sync node, but term it anyway so leaks don't accumulate.
        try:
            self._context.term()
        except Exception as exc:
            logger.debug("Error terming async context: %s", exc)
        for ctx in self._owned_pub_contexts:
            try:
                ctx.term()
            except Exception as exc:
                logger.debug("Error terming sync publisher context: %s", exc)
        self._owned_pub_contexts.clear()

        logger.info("Node %s closed", self.name)

    # ------------------------------------------------------------------
    # Context managers
    # ------------------------------------------------------------------

    def __enter__(self) -> "Node":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close_sync()

    async def __aenter__(self) -> "Node":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()
Attributes
stop_event property
stop_event: Event

Shared threading.Event set when the node is stopping.

Sync code that opts into the node's lifecycle (publisher threads, I/O loops, anything spawned via :meth:spawn_thread) should poll node.stop_event.is_set() and exit promptly when it goes True. Async code should not need this — it gets cancellation through the normal asyncio task lifecycle.

publishers property
publishers: list[str]

Get list of publisher topic names.

subscribers property
subscribers: list[str]

Get list of subscriber topic names.

is_running property
is_running: bool

Check if the node is running.

Functions
create_publisher
create_publisher(
    topic_name: str,
    message_type: type[Message],
    queue_size: int = 10,
    mode: PublisherMode = "async",
) -> Publisher

Create a publisher for a topic.

Parameters:

Name Type Description Default
topic_name str

Name of the topic.

required
message_type type[Message]

Type of messages to publish.

required
queue_size int

Output queue size.

10
mode PublisherMode

'async' (default) shares the node's :class:zmq.asyncio.Context (with a sync shadow). 'sync' gives the publisher its own independent :class:zmq.Context so publish() does not bounce through asyncio's IO threads — recommended for control-loop publishers calling publish() from a non-asyncio thread. Note that :class:zmq.PUB sockets are not thread-safe; only call publish() from one thread per Publisher.

'async'

Returns:

Type Description
Publisher

Publisher instance

Source code in src/cortex/core/node.py
def create_publisher(
    self,
    topic_name: str,
    message_type: type[Message],
    queue_size: int = 10,
    mode: PublisherMode = "async",
) -> Publisher:
    """Create (or return the existing) publisher for ``topic_name``.

    Args:
        topic_name: Name of the topic.
        message_type: Type of messages to publish.
        queue_size: Output queue size (ZMQ send high-water mark).
        mode: ``'async'`` (default) shares the node's
            :class:`zmq.asyncio.Context` (with a sync shadow). ``'sync'``
            gives the publisher its own independent :class:`zmq.Context`
            so ``publish()`` does not bounce through asyncio's IO
            threads — recommended for control-loop publishers calling
            ``publish()`` from a non-asyncio thread. :class:`zmq.PUB`
            sockets are not thread-safe; only call ``publish()`` from
            one thread per Publisher.

    Returns:
        Publisher instance (the existing one if the topic is already
        registered on this node).

    Raises:
        ValueError: If ``mode`` is neither ``'async'`` nor ``'sync'``.
    """
    existing = self._publishers.get(topic_name)
    if existing is not None:
        logger.warning(f"Publisher for {topic_name} already exists")
        return existing

    if mode == "sync":
        # Dedicated sync context, tracked so close() can term it later.
        pub_context = zmq.Context()
        self._owned_pub_contexts.append(pub_context)
    elif mode == "async":
        pub_context = self._context
    else:
        raise ValueError(f"Unknown publisher mode: {mode!r}")

    publisher = Publisher(
        topic_name=topic_name,
        message_type=message_type,
        node_name=self.name,
        discovery_address=self.discovery_address,
        queue_size=queue_size,
        context=pub_context,
    )

    self._publishers[topic_name] = publisher
    logger.info("Created %s publisher for %s", mode, topic_name)

    return publisher
create_subscriber
create_subscriber(
    topic_name: str,
    message_type: type[Message],
    callback: MessageCallback
    | SyncMessageCallback
    | None = None,
    queue_size: int = 10,
    wait_for_topic: bool = True,
    topic_timeout: float = 30.0,
    mode: SubscriberMode = "async",
    strict_fingerprint: bool | None = None,
    cpu_affinity: list[int] | None = None,
    sched_priority: int | None = None,
) -> Subscriber | ThreadedSubscriber

Create a subscriber for a topic.

Parameters:

Name Type Description Default
topic_name str

Name of the topic.

required
message_type type[Message]

Type of messages expected.

required
callback MessageCallback | SyncMessageCallback | None

Function to call when messages arrive. mode='async' expects an async callback; mode='sync' expects a plain synchronous callable and rejects coroutine functions.

None
queue_size int

Input queue size (ignored when conflate=True in sync mode).

10
wait_for_topic bool

Whether to wait for the topic to be available.

True
topic_timeout float

Timeout for waiting for topic, in seconds.

30.0
mode SubscriberMode

'async' (default) routes through asyncio. 'sync' runs a dedicated OS thread with synchronous zmq + Poller — use for control loops needing tight p99 latency. In sync mode the default queue_size of 1 gives latest-wins semantics suitable for control commands.

'async'
strict_fingerprint bool | None

When True, a fingerprint mismatch between the topic and message_type raises MessageFingerprintError instead of logging a warning. Default behavior is mode- dependent: True in sync mode, False in async mode (kept lax for backward compatibility). Pass True explicitly on async control topics where silent type confusion would corrupt downstream state.

None
cpu_affinity list[int] | None

Sync mode only. Pin the receive thread to the given CPU set (Linux only; ignored elsewhere).

None
sched_priority int | None

Sync mode only. Run the receive thread under SCHED_FIFO at the given priority (Linux only; requires CAP_SYS_NICE). Failure is logged and the thread falls back to the default scheduler.

None

Returns:

Type Description
Subscriber | ThreadedSubscriber

Subscriber for mode='async', ThreadedSubscriber for

Subscriber | ThreadedSubscriber

mode='sync'.

Source code in src/cortex/core/node.py
def create_subscriber(
    self,
    topic_name: str,
    message_type: type[Message],
    callback: MessageCallback | SyncMessageCallback | None = None,
    queue_size: int = 10,
    wait_for_topic: bool = True,
    topic_timeout: float = 30.0,
    mode: SubscriberMode = "async",
    strict_fingerprint: bool | None = None,
    cpu_affinity: list[int] | None = None,
    sched_priority: int | None = None,
) -> Subscriber | ThreadedSubscriber:
    """Create (or return the existing) subscriber for ``topic_name``.

    Args:
        topic_name: Name of the topic.
        message_type: Type of messages expected.
        callback: Invoked per message. ``mode='async'`` expects an async
            callback; ``mode='sync'`` expects a plain synchronous
            callable and rejects coroutine functions.
        queue_size: Input queue size (ignored when ``conflate=True`` in
            sync mode).
        wait_for_topic: Whether to wait for the topic to be available.
        topic_timeout: Timeout for waiting for topic, in seconds.
        mode: ``'async'`` (default) routes through asyncio. ``'sync'``
            runs a dedicated OS thread with synchronous zmq + Poller —
            use for control loops needing tight p99 latency.
        strict_fingerprint: When True, a fingerprint mismatch between
            the topic and ``message_type`` raises
            ``MessageFingerprintError`` instead of logging a warning.
            ``None`` picks the mode default: strict in sync mode, lax in
            async mode (kept for backward compatibility).
        cpu_affinity: Sync mode only. Pin the receive thread to the
            given CPU set (Linux only; ignored elsewhere).
        sched_priority: Sync mode only. Run the receive thread under
            ``SCHED_FIFO`` at the given priority (Linux only; requires
            ``CAP_SYS_NICE``; falls back to the default scheduler on
            failure).

    Returns:
        ``Subscriber`` for ``mode='async'``, ``ThreadedSubscriber`` for
        ``mode='sync'``.

    Raises:
        ValueError: If ``mode`` is unknown, or ``mode='sync'`` with no
            callback.
    """
    existing = self._subscribers.get(topic_name)
    if existing is not None:
        logger.warning(f"Subscriber for {topic_name} already exists")
        return existing

    subscriber: Subscriber | ThreadedSubscriber
    if mode == "async":
        # Async default stays lax (warn-and-continue) for backward
        # compatibility; callers opt into strict explicitly.
        effective_strict = (
            False if strict_fingerprint is None else strict_fingerprint
        )
        subscriber = Subscriber(
            topic_name=topic_name,
            message_type=message_type,
            callback=callback,
            node_name=self.name,
            discovery_address=self.discovery_address,
            queue_size=queue_size,
            wait_for_topic=wait_for_topic,
            topic_timeout=topic_timeout,
            context=self._context,
            strict_fingerprint=effective_strict,
        )
        if callback is not None:
            self._active_subscribers.append(subscriber)
    elif mode == "sync":
        if callback is None:
            raise ValueError("Sync subscribers require a callback")
        if strict_fingerprint is False:
            # ThreadedSubscriber is hard-coded strict today; log the
            # ignored relaxation so the future override is discoverable.
            logger.info(
                "strict_fingerprint=False ignored for sync subscriber "
                "%s; sync mode is always strict.",
                topic_name,
            )
        subscriber = ThreadedSubscriber(
            topic_name=topic_name,
            message_type=message_type,
            callback=callback,  # type: ignore[arg-type]
            node_name=self.name,
            discovery_address=self.discovery_address,
            queue_size=queue_size,
            wait_for_topic=wait_for_topic,
            topic_timeout=topic_timeout,
            cpu_affinity=cpu_affinity,
            sched_priority=sched_priority,
        )
        self._sync_subscribers.append(subscriber)
    else:
        raise ValueError(f"Unknown subscriber mode: {mode!r}")

    self._subscribers[topic_name] = subscriber
    logger.info("Created %s subscriber for %s", mode, topic_name)
    return subscriber
spawn_thread
spawn_thread(
    target: Callable[..., None],
    *args,
    name: str | None = None,
    **kwargs,
) -> threading.Thread

Start an OS thread owned by this node.

target is invoked as target(stop_event, *args, **kwargs) — the first positional argument is always the node's shared threading.Event. The thread is started immediately, registered for run() keepalive (so the asyncio side won't fall through), and joined deterministically by :meth:close.

This is the canonical way to drive sync-mode publishers, custom polling loops, or any blocking I/O the node should manage.

Parameters:

Name Type Description Default
target Callable[..., None]

The thread body. Must accept the stop event as its first positional arg.

required
*args

Forwarded to target after the stop event.

()
name str | None

Thread name; defaults to "<node-name>-thread-<n>".

None
**kwargs

Forwarded to target.

{}

Returns:

Name Type Description
The Thread

The :class:`threading.Thread` instance, already running.

Example
def control_loop(stop, pub, rate_hz):
    interval = 1.0 / rate_hz
    next_t = time.perf_counter()
    while not stop.is_set():
        ...
        pub.publish(WheelCommand(...))
        next_t += interval
        time.sleep(max(0, next_t - time.perf_counter()))

pub = node.create_publisher(..., mode="sync")
node.spawn_thread(control_loop, pub, 1000.0)
await node.run()  # blocks until Ctrl+C; close() joins the thread
Source code in src/cortex/core/node.py
def spawn_thread(
    self,
    target: Callable[..., None],
    *args,
    name: str | None = None,
    **kwargs,
) -> threading.Thread:
    """Start and register an OS thread owned by this node.

    The thread body is invoked as ``target(stop_event, *args, **kwargs)``
    — the node's shared ``threading.Event`` is always the first
    positional argument. The thread is started immediately, counts
    toward ``run()`` keepalive (so the asyncio side won't fall through),
    and is joined deterministically by :meth:`close`.

    This is the canonical way to drive sync-mode publishers, custom
    polling loops, or any blocking I/O the node should manage.

    Args:
        target: The thread body. Must accept the stop event as its
            first positional arg.
        *args: Forwarded to ``target`` after the stop event.
        name: Thread name; defaults to ``"<node-name>-thread-<n>"``.
        **kwargs: Forwarded to ``target``.

    Returns:
        The already-running :class:`threading.Thread` instance.

    Example:
        ```python
        def control_loop(stop, pub, rate_hz):
            interval = 1.0 / rate_hz
            next_t = time.perf_counter()
            while not stop.is_set():
                ...
                pub.publish(WheelCommand(...))
                next_t += interval
                time.sleep(max(0, next_t - time.perf_counter()))

        pub = node.create_publisher(..., mode="sync")
        node.spawn_thread(control_loop, pub, 1000.0)
        await node.run()  # blocks until Ctrl+C; close() joins the thread
        ```
    """
    # Default name indexes by the pre-append count, so threads are
    # numbered 0..n-1 per node.
    label = name or f"{self.name}-thread-{len(self._spawned_threads)}"
    stop_event = self._sync_stop_event

    def _body() -> None:
        # Crash containment: a raising thread body must not kill the
        # process silently — log it with traceback instead.
        try:
            target(stop_event, *args, **kwargs)
        except Exception:
            logger.exception("Spawned thread %s crashed", label)

    worker = threading.Thread(target=_body, name=label, daemon=False)
    worker.start()
    self._spawned_threads.append(worker)
    logger.info("Spawned thread %s", label)
    return worker
create_timer
create_timer(
    period: float, callback: AsyncCallback
) -> None

Create a periodic timer.

Parameters:

Name Type Description Default
period float

Timer period in seconds

required
callback AsyncCallback

Async function to call on each timer tick

required
Source code in src/cortex/core/node.py
def create_timer(
    self,
    period: float,
    callback: AsyncCallback,
) -> None:
    """
    Create a periodic timer.

    Args:
        period: Timer period in seconds
        callback: Async function to call on each timer tick
    """
    rate_hz = 1.0 / period
    executor = RateExecutor(callback, rate_hz=rate_hz)
    self._timers.append((period, callback, executor))

    logger.debug(f"Created timer with period {period}s ({rate_hz} Hz)")
run async
run() -> None

Run the node, processing messages and timers.

This is the main async entry point for the node. Sync subscribers are started on their own OS threads and run independently of the asyncio event loop.

Source code in src/cortex/core/node.py
async def run(self) -> None:
    """Drive the node: timers, async subscribers, and sync threads.

    Main async entry point. Sync subscribers run on their own OS
    threads, independent of the asyncio event loop. Returns when all
    async tasks finish or the node is stopped/cancelled.
    """
    self._running = True

    # Sync receive threads come up first — they don't depend on the
    # loop and should be consuming as early as possible.
    for threaded_sub in self._sync_subscribers:
        threaded_sub.start()

    # Schedule every timer executor, then every async receive loop.
    self._tasks.extend(
        asyncio.create_task(executor.run()) for *_, executor in self._timers
    )
    self._tasks.extend(
        asyncio.create_task(async_sub.run())
        for async_sub in self._active_subscribers
    )

    # A node with only sync work still needs run() to block; park a
    # keepalive task on an Event that stop()/close() will set.
    has_sync_work = bool(self._sync_subscribers or self._spawned_threads)
    if not self._tasks and has_sync_work:
        self._stop_event = asyncio.Event()
        self._tasks.append(asyncio.create_task(self._stop_event.wait()))

    logger.info(
        "Node %s running with %d async tasks, %d sync threads",
        self.name,
        len(self._tasks),
        len(self._sync_subscribers),
    )

    try:
        await asyncio.gather(*self._tasks, return_exceptions=True)
    except asyncio.CancelledError:
        logger.info(f"Node {self.name} cancelled")
    finally:
        self._running = False
        # Halt executors and both subscriber flavors on the way out.
        for *_, executor in self._timers:
            executor.stop()
        for async_sub in self._active_subscribers:
            async_sub.stop()
        for threaded_sub in self._sync_subscribers:
            threaded_sub.stop()
stop
stop() -> None

Stop the node.

Source code in src/cortex/core/node.py
def stop(self) -> None:
    """Signal every executor, subscriber, task, and sync thread to stop."""
    logger.info(f"Stopping node {self.name}")
    self._running = False

    # Halt timer executors first, then both subscriber flavors.
    for *_, executor in self._timers:
        executor.stop()
    for subscriber in (*self._active_subscribers, *self._sync_subscribers):
        subscriber.stop()

    # Wind down every thread spawned via spawn_thread().
    self._sync_stop_event.set()

    # Unblock the keepalive waiter (if any) so run() can return cleanly.
    keepalive = self._stop_event
    if keepalive is not None and not keepalive.is_set():
        keepalive.set()

    # Cancel whatever async tasks are still pending.
    for task in self._tasks:
        if not task.done():
            task.cancel()
close async
close() -> None

Close the node and release all resources.

Source code in src/cortex/core/node.py
async def close(self) -> None:
    """Close the node and release all resources."""
    logger.info(f"Closing node {self.name}")

    self.stop()

    # Drain pending tasks; exceptions are collected, never raised here.
    if self._tasks:
        await asyncio.gather(*self._tasks, return_exceptions=True)
    self._tasks.clear()

    for publisher in self._publishers.values():
        publisher.close()
    self._publishers.clear()

    # Closing a sync subscriber also joins its receive thread.
    for subscriber in self._subscribers.values():
        subscriber.close()
    self._subscribers.clear()

    # stop() already set the sync stop event — just join the workers.
    for worker in self._spawned_threads:
        worker.join(timeout=2.0)
        if worker.is_alive():
            logger.warning(
                "Spawned thread %s did not exit within 2.0s", worker.name
            )
    self._spawned_threads.clear()

    self._timers.clear()
    self._active_subscribers.clear()
    self._sync_subscribers.clear()

    # Terminate ZMQ contexts: the shared async context first, then any
    # contexts owned by sync-mode publishers.
    self._context.term()
    for sync_ctx in self._owned_pub_contexts:
        try:
            sync_ctx.term()
        except Exception as exc:
            logger.debug("Error terming sync publisher context: %s", exc)
    self._owned_pub_contexts.clear()

    logger.info(f"Node {self.name} closed")
get_publisher
get_publisher(topic_name: str) -> Publisher | None

Get a publisher by topic name.

Source code in src/cortex/core/node.py
def get_publisher(self, topic_name: str) -> Publisher | None:
    """Get a publisher by topic name."""
    return self._publishers.get(topic_name)
get_subscriber
get_subscriber(topic_name: str) -> Subscriber | None

Get a subscriber by topic name.

Source code in src/cortex/core/node.py
def get_subscriber(self, topic_name: str) -> Subscriber | None:
    """Get a subscriber by topic name."""
    return self._subscribers.get(topic_name)
spin
spin(timeout: float | None = None) -> None

Block the calling thread until the node is stopped.

Sync counterpart to :meth:run. Use this when the node owns only sync work — sync subscribers, threads spawned via :meth:spawn_thread, or nothing more than a publisher driven from the calling thread itself. No asyncio loop is created.

Raises RuntimeError if the node has async timers or async subscribers, since those need :meth:run to be scheduled. Ctrl+C is delivered as :class:KeyboardInterrupt and propagates so the caller can decide whether to swallow it.

Parameters:

Name Type Description Default
timeout float | None

Optional cap (seconds) on how long to block. None means "wait forever, until :meth:stop is called".

None
Example
node = Node("controller")
pub = node.create_publisher("/cmd", WheelCommand, mode="sync")
node.spawn_thread(control_loop, pub, 1000.0)
try:
    node.spin()              # blocks until Ctrl+C
except KeyboardInterrupt:
    pass
finally:
    node.close_sync()
Source code in src/cortex/core/node.py
def spin(self, timeout: float | None = None) -> None:
    """Block the calling thread until the node is stopped.

    Sync counterpart to :meth:`run`, for nodes that own only sync work
    — sync subscribers, :meth:`spawn_thread` threads, or a publisher
    driven from the calling thread itself. No asyncio loop is created.
    ``Ctrl+C`` arrives as :class:`KeyboardInterrupt` and propagates so
    the caller decides whether to swallow it.

    Args:
        timeout: Optional cap (seconds) on how long to block. ``None``
            means "wait forever, until :meth:`stop` is called".

    Raises:
        RuntimeError: If the node has async timers or async
            subscribers — those require :meth:`run` to be scheduled.

    Example:
        ```python
        node = Node("controller")
        pub = node.create_publisher("/cmd", WheelCommand, mode="sync")
        node.spawn_thread(control_loop, pub, 1000.0)
        try:
            node.spin()              # blocks until Ctrl+C
        except KeyboardInterrupt:
            pass
        finally:
            node.close_sync()
        ```
    """
    if self._has_async_work():
        raise RuntimeError(
            "Node.spin() does not start an asyncio loop, but this node "
            "has async timers/subscribers. Use `await node.run()` instead, "
            "or remove the async work."
        )

    self._running = True
    for threaded_sub in self._sync_subscribers:
        threaded_sub.start()

    logger.info(
        "Node %s spinning with %d sync subscribers and %d threads",
        self.name,
        len(self._sync_subscribers),
        len(self._spawned_threads),
    )
    try:
        # Event.wait on the main thread stays interruptible by Ctrl+C.
        self._sync_stop_event.wait(timeout=timeout)
    finally:
        self._running = False
        for threaded_sub in self._sync_subscribers:
            threaded_sub.stop()
close_sync
close_sync() -> None

Sync counterpart to :meth:close.

Tears down sockets, joins spawned threads, and terms zmq contexts without ever entering an asyncio loop. Safe to call from a plain def main() — including from inside __exit__ when the node is used as a regular with context manager.

Will refuse to run if the node has async timers/subscribers; for those, use await node.close().

Source code in src/cortex/core/node.py
def close_sync(self) -> None:
    """Sync counterpart to :meth:`close` — no asyncio loop is entered.

    Tears down sockets, joins spawned threads, and terms zmq contexts.
    Safe to call from a plain ``def main()`` — including from inside
    ``__exit__`` when the node is used as a regular ``with`` context
    manager.

    Raises:
        RuntimeError: If the node has async timers/subscribers; those
            need ``await node.close()``.
    """
    if self._has_async_work():
        raise RuntimeError(
            "Node.close_sync() cannot tear down async timers/subscribers. "
            "Use `await node.close()` instead."
        )

    logger.info("Closing node %s (sync)", self.name)

    # Broadcast shutdown, then join everything deterministically.
    self._sync_stop_event.set()
    for threaded_sub in self._sync_subscribers:
        threaded_sub.stop()
    self._running = False

    for publisher in self._publishers.values():
        publisher.close()
    self._publishers.clear()
    for subscriber in self._subscribers.values():
        subscriber.close()
    self._subscribers.clear()

    for worker in self._spawned_threads:
        worker.join(timeout=2.0)
        if worker.is_alive():
            logger.warning(
                "Spawned thread %s did not exit within 2.0s", worker.name
            )
    self._spawned_threads.clear()

    self._sync_subscribers.clear()

    # A purely-sync node never uses the shared async context, but term
    # it anyway so leaks don't accumulate.
    try:
        self._context.term()
    except Exception as exc:
        logger.debug("Error terming async context: %s", exc)
    for sync_ctx in self._owned_pub_contexts:
        try:
            sync_ctx.term()
        except Exception as exc:
            logger.debug("Error terming sync publisher context: %s", exc)
    self._owned_pub_contexts.clear()

    logger.info("Node %s closed", self.name)

Publisher

Sends typed messages on a topic over a ZMQ PUB socket.

On construction the publisher binds its own IPC socket, registers the (topic, address, fingerprint) triple with the discovery daemon, and becomes ready. :meth:publish is synchronous and non-blocking by default — if the send queue is full the message is dropped and False is returned.

Always create via :meth:Node.create_publisher; that path shares the node's async context and tracks the publisher for clean shutdown.

Note

zmq.PUB sockets are not thread-safe. Do not call :meth:publish concurrently from multiple threads or tasks on the same :class:Publisher instance.

Example
async with Node("camera_node") as node:
    pub = node.create_publisher("/camera/image", ImageMessage)
    pub.publish(ImageMessage(data=image_array))
    await node.run()
Source code in src/cortex/core/publisher.py
class Publisher:
    """Sends typed messages on a topic over a ZMQ PUB socket.

    On construction the publisher binds its own IPC socket, registers the
    ``(topic, address, fingerprint)`` triple with the discovery daemon, and
    becomes ready. :meth:`publish` is synchronous and non-blocking by default
    — if the send queue is full the message is dropped and ``False`` is
    returned.

    Always create via :meth:`Node.create_publisher`; that path shares the
    node's async context and tracks the publisher for clean shutdown.

    Note:
        ``zmq.PUB`` sockets are **not thread-safe**. Do not call
        :meth:`publish` concurrently from multiple threads or tasks on the
        same :class:`Publisher` instance.

    Example:
        ```python
        async with Node("camera_node") as node:
            pub = node.create_publisher("/camera/image", ImageMessage)
            pub.publish(ImageMessage(data=image_array))
            await node.run()
        ```
    """

    def __init__(
        self,
        topic_name: str,
        message_type: type[Message],
        node_name: str = "anonymous",
        discovery_address: str = DEFAULT_DISCOVERY_ADDRESS,
        queue_size: int = 10,
        auto_register: bool = True,
        context: zmq.asyncio.Context | zmq.Context | None = None,
    ):
        """
        Initialize the publisher.

        Args:
            topic_name: Name of the topic to publish on (e.g., "/camera/image")
            message_type: Type of message to publish
            node_name: Name of the node creating this publisher
            discovery_address: Address of the discovery daemon
            queue_size: High-water mark for outgoing messages
            auto_register: Whether to automatically register with discovery daemon
            context: Shared ZMQ async context or sync context (optional)
        """
        self.topic_name = topic_name
        self.message_type = message_type
        self.node_name = node_name
        self.discovery_address = discovery_address
        self.queue_size = queue_size

        # Generate IPC address for this topic
        self.address = generate_ipc_address(topic_name, node_name)
        # Pre-encoded topic frame — sent as the first multipart frame so
        # subscribers can filter by prefix.
        self._topic_bytes = topic_name.encode("utf-8")

        # ZMQ setup - context provided by Node
        # if context is async context, convert to sync context
        # NOTE(review): zmq.Context(async_ctx) is assumed to shadow the async
        # context's underlying C context — confirm against the pinned pyzmq
        # version's Context constructor semantics.
        self._context: zmq.asyncio.Context | zmq.Context = context or zmq.Context()
        if isinstance(self._context, zmq.asyncio.Context):
            self._context: zmq.Context = zmq.Context(
                self._context
            )  # publishers are sync
        self._socket: zmq.Socket | None = None

        # Discovery client
        self._discovery_client: DiscoveryClient | None = None
        self._registered = False

        # Statistics
        self._publish_count = 0
        self._last_publish_time: float | None = None

        # Per-publisher monotonic sequence counter. Subscribers infer drops
        # by tracking gaps in this number per ``(publisher_node, fingerprint)``
        # pair, so it must be one-counter-per-publisher rather than the
        # class-level counter that used to live on ``Message``.
        self._sequence: int = 0

        # Initialize
        self._setup_socket()
        if auto_register:
            self._register_with_discovery()

    def _setup_socket(self) -> None:
        """Set up the ZMQ publisher socket."""
        # Ensure the IPC directory exists and remove stale socket file
        # (a file left behind by an unclean shutdown would break bind()).
        path = self.address[6:]  # Remove "ipc://" prefix
        dir_path = os.path.dirname(path)
        os.makedirs(dir_path, exist_ok=True)
        if os.path.exists(path):
            os.remove(path)

        self._socket = self._context.socket(zmq.PUB)

        # Set high-water mark (queue size)
        self._socket.setsockopt(zmq.SNDHWM, self.queue_size)

        # Set linger to 0 for immediate shutdown (close all sockets before context.term)
        self._socket.setsockopt(zmq.LINGER, 0)

        # Bind to the address
        self._socket.bind(self.address)

        logger.debug(f"Publisher socket bound to {self.address}")

    def _register_with_discovery(self) -> None:
        """Register this publisher with the discovery daemon.

        Best-effort: a missing/unreachable daemon is logged as a warning,
        never raised — the publisher still works for already-connected
        subscribers.
        """
        try:
            self._discovery_client = DiscoveryClient(
                discovery_address=self.discovery_address
            )

            topic_info = TopicInfo(
                name=self.topic_name,
                address=self.address,
                message_type=self.message_type.__name__,
                fingerprint=self.message_type.fingerprint(),
                publisher_node=self.node_name,
            )

            if self._discovery_client.register_topic(topic_info):
                self._registered = True
                logger.info(f"Registered topic {self.topic_name} with discovery daemon")
            else:
                logger.warning(f"Failed to register topic {self.topic_name}")
        except Exception as e:
            logger.warning(f"Could not connect to discovery daemon: {e}")

    def publish(self, message: Message, flags: int = zmq.NOBLOCK) -> bool:
        """Serialize and send ``message`` on this topic.

        Uses the frame-aware transport path so large NumPy / PyTorch buffers
        ride as separate ZMQ frames (zero-copy handoff).

        Args:
            message: Instance whose class matches :attr:`message_type`.
            flags: ZMQ send flags. Default :data:`zmq.NOBLOCK` — drop on
                high-water-mark rather than block the caller.

        Returns:
            ``True`` if ZMQ accepted the message; ``False`` if the queue was
            full (``zmq.Again``) or another send error was logged.

        Raises:
            TypeError: If ``type(message)`` does not match :attr:`message_type`.
        """
        if not isinstance(message, self.message_type):
            raise TypeError(
                f"Expected {self.message_type.__name__}, got {type(message).__name__}"
            )

        try:
            # Send with topic name as first frame for filtering.
            # Message payload uses frame-aware transport to keep large buffers
            # out of the metadata blob. Sequence numbers come from this
            # publisher (not the class-level fallback) so receivers can
            # detect drops per-source.
            # NOTE: the sequence advances even when the send below is dropped
            # (zmq.Again), so receiver gap-detection also covers sender-side
            # drops.
            sequence = self._sequence
            self._sequence += 1
            self._socket.send_multipart(
                [self._topic_bytes, *message.to_frames(sequence=sequence)],
                flags=flags,
            )

            self._publish_count += 1
            self._last_publish_time = time.time()

            return True
        except zmq.Again:
            # Would block - queue full
            return False
        except Exception as e:
            logger.error(f"Failed to publish message: {e}")
            return False

    @property
    def is_registered(self) -> bool:
        """Check if publisher is registered with discovery daemon."""
        return self._registered

    @property
    def publish_count(self) -> int:
        """Get the number of messages published."""
        return self._publish_count

    @property
    def last_publish_time(self) -> float | None:
        """Get the timestamp of the last published message."""
        return self._last_publish_time

    def close(self) -> None:
        """Close the publisher and unregister from discovery."""
        logger.info(f"Closing publisher for {self.topic_name}")

        # Unregister from discovery (best effort - daemon may be gone)
        if self._discovery_client and self._registered:
            with contextlib.suppress(Exception):
                self._discovery_client.unregister_topic(self.topic_name)
            with contextlib.suppress(Exception):
                self._discovery_client.close()
            self._discovery_client = None

        self._registered = False

        # Close socket
        if self._socket:
            self._socket.close()
            self._socket = None

        # Clean up IPC socket file
        # NOTE(review): `assert` statements are stripped under `python -O`;
        # the slice below then runs unguarded. Consider an explicit check.
        assert self.address.startswith("ipc://"), (
            "CRITICAL: ADDRESS ALWAYS STARTS WITH ipc:// -- UNLESS MANUALLY CHANGED"
        )
        path = self.address[6:]  # Remove "ipc://" prefix
        if os.path.exists(path):
            with contextlib.suppress(Exception):
                os.remove(path)
Attributes
is_registered property
is_registered: bool

Check if publisher is registered with discovery daemon.

publish_count property
publish_count: int

Get the number of messages published.

last_publish_time property
last_publish_time: float | None

Get the timestamp of the last published message.

Functions
publish
publish(message: Message, flags: int = zmq.NOBLOCK) -> bool

Serialize and send message on this topic.

Uses the frame-aware transport path so large NumPy / PyTorch buffers ride as separate ZMQ frames (zero-copy handoff).

Parameters:

Name Type Description Default
message Message

Instance whose class matches :attr:message_type.

required
flags int

ZMQ send flags. Default :data:zmq.NOBLOCK — drop on high-water-mark rather than block the caller.

NOBLOCK

Returns:

Type Description
bool

True if ZMQ accepted the message; False if the queue was full (zmq.Again) or another send error was logged.

Raises:

Type Description
TypeError

If type(message) does not match :attr:message_type.

Source code in src/cortex/core/publisher.py
def publish(self, message: Message, flags: int = zmq.NOBLOCK) -> bool:
    """Serialize ``message`` and send it on this topic.

    The payload rides the frame-aware transport path, so large NumPy /
    PyTorch buffers travel as separate ZMQ frames (zero-copy handoff).

    Args:
        message: Instance whose class matches :attr:`message_type`.
        flags: ZMQ send flags. Defaults to :data:`zmq.NOBLOCK`, so a full
            high-water-mark queue drops the message rather than blocking
            the caller.

    Returns:
        ``True`` if ZMQ accepted the message; ``False`` if the queue was
        full (``zmq.Again``) or another send error was logged.

    Raises:
        TypeError: If ``type(message)`` does not match :attr:`message_type`.
    """
    if not isinstance(message, self.message_type):
        raise TypeError(
            f"Expected {self.message_type.__name__}, got {type(message).__name__}"
        )

    try:
        # Sequence numbers come from this publisher instance (not the
        # class-level fallback) so receivers can detect drops per-source.
        # The topic name goes out as the first frame for SUB-side filtering.
        seq = self._sequence
        self._sequence += 1
        payload = [self._topic_bytes, *message.to_frames(sequence=seq)]
        self._socket.send_multipart(payload, flags=flags)
    except zmq.Again:
        # Would block — the send queue is full; caller asked for NOBLOCK.
        return False
    except Exception as e:
        logger.error(f"Failed to publish message: {e}")
        return False
    else:
        self._publish_count += 1
        self._last_publish_time = time.time()
        return True
close
close() -> None

Close the publisher and unregister from discovery.

Source code in src/cortex/core/publisher.py
def close(self) -> None:
    """Close the publisher and unregister from discovery.

    Best-effort teardown: discovery errors are suppressed (the daemon may
    already be gone), the socket is closed, and — for IPC endpoints —
    the socket file is removed so stale endpoints don't accumulate.
    """
    logger.info(f"Closing publisher for {self.topic_name}")

    # Unregister from discovery (best effort - daemon may be gone)
    if self._discovery_client and self._registered:
        with contextlib.suppress(Exception):
            self._discovery_client.unregister_topic(self.topic_name)
        with contextlib.suppress(Exception):
            self._discovery_client.close()
        self._discovery_client = None

    self._registered = False

    # Close socket
    if self._socket:
        self._socket.close()
        self._socket = None

    # Clean up the IPC socket file. Addresses are normally "ipc://..."
    # unless manually overridden; a non-IPC address (e.g. "tcp://...")
    # has no filesystem endpoint, so we skip cleanup instead of asserting.
    # (The previous ``assert`` was stripped under ``-O`` and would crash
    # close() for manually-configured non-IPC addresses.)
    if self.address.startswith("ipc://"):
        path = self.address[6:]  # Remove "ipc://" prefix
        if os.path.exists(path):
            with contextlib.suppress(Exception):
                os.remove(path)

Subscriber

Bases: SubscriberBase

Async subscriber: receives typed messages on a topic from a ZMQ SUB socket.

On construction, the subscriber performs a non-blocking lookup against the discovery daemon. If the topic already has a publisher it connects immediately; otherwise it defers and retries with an async polling wait inside :meth:run.

When constructed with a callback the subscriber drives its own receive loop (one task, one callback at a time — see :class:cortex.core.executor.AsyncExecutor). Without a callback the subscriber is passive and the caller polls via :meth:receive.

Always create via :meth:Node.create_subscriber.

Source code in src/cortex/core/subscriber.py
class Subscriber(SubscriberBase):
    """Async subscriber: receives typed messages on a topic from a ZMQ SUB socket.

    On construction, the subscriber performs a non-blocking lookup against
    the discovery daemon. If the topic already has a publisher it connects
    immediately; otherwise it defers and retries with an async polling
    wait inside :meth:`run`.

    When constructed with a ``callback`` the subscriber drives its own
    receive loop (one task, one callback at a time — see
    :class:`cortex.core.executor.AsyncExecutor`). Without a callback the
    subscriber is passive and the caller polls via :meth:`receive`.

    Note that :meth:`start` / :meth:`stop` only act once :meth:`run` has
    created the internal executor; before that they are no-ops.

    Always create via :meth:`Node.create_subscriber`.
    """

    def __init__(
        self,
        topic_name: str,
        message_type: type[Message],
        callback: MessageCallback | None = None,
        node_name: str = "anonymous",
        discovery_address: str = DEFAULT_DISCOVERY_ADDRESS,
        queue_size: int = 10,
        wait_for_topic: bool = True,
        topic_timeout: float = 600.0,
        context: zmq.asyncio.Context | None = None,
        strict_fingerprint: bool = False,
    ):
        super().__init__(
            topic_name=topic_name,
            message_type=message_type,
            node_name=node_name,
            discovery_address=discovery_address,
            topic_timeout=topic_timeout,
            wait_for_topic=wait_for_topic,
            strict_fingerprint=strict_fingerprint,
        )
        self._callback = callback
        self.queue_size = queue_size

        self._context: zmq.asyncio.Context = context or zmq.asyncio.Context()
        self._socket: zmq.asyncio.Socket | None = None

        # Compatibility shim: legacy code reads ``_last_receive_time`` directly.
        self._last_receive_time: float | None = None

        self._executor: AsyncExecutor | None = None

        # Try non-blocking connect (will succeed if topic already exists)
        if self._lookup_nonblocking():
            self._setup_socket(self._topic_info.address)
            self._connected = True
            logger.info(
                "Connected to topic %s at %s", self.topic_name, self._topic_info.address
            )
        else:
            logger.warning(
                "Topic %s not found yet, will retry in run()", self.topic_name
            )

    async def _async_connect(self) -> bool:
        """Async wait for the topic and connect once available.

        Returns ``True`` once connected. Propagates
        :class:`MessageFingerprintError` when strict fingerprint checking
        rejects the topic's advertised type.
        """
        if self._connected:
            return True
        try:
            if self._wait_for_topic:
                logger.info("Waiting for topic %s...", self.topic_name)
                self._topic_info = await self._discovery_client.wait_for_topic_async(
                    self.topic_name, timeout=self.topic_timeout
                )
            else:
                self._topic_info = self._discovery_client.lookup_topic(self.topic_name)
        except Exception as exc:
            logger.error("Failed to connect to topic: %s", exc)
            return False

        if self._topic_info is None:
            return False
        # Raises MessageFingerprintError in strict mode; lax mode only logs
        # a warning and continues. (A previous no-op ``try/except ... raise``
        # wrapper around this call was removed — behavior is unchanged.)
        self._validate_fingerprint(self._topic_info)

        self._setup_socket(self._topic_info.address)
        self._connected = True
        logger.info(
            "Connected to topic %s at %s", self.topic_name, self._topic_info.address
        )
        return True

    def _setup_socket(self, address: str) -> None:
        """Create the SUB socket, set HWM/topic filter, and connect."""
        self._socket = self._context.socket(zmq.SUB)
        # RCVHWM bounds buffered messages; oldest are dropped on overflow.
        self._socket.setsockopt(zmq.RCVHWM, self.queue_size)
        self._socket.setsockopt(zmq.LINGER, 0)
        self._socket.setsockopt_string(zmq.SUBSCRIBE, self.topic_name)
        self._socket.connect(address)
        logger.debug("Subscriber socket connected to %s", address)

    async def receive(self) -> tuple[Message, MessageHeader] | None:
        """Receive and decode one message; ``None`` if not connected,
        undecodable, or on a (logged) receive error."""
        if not self._connected or self._socket is None:
            return None

        try:
            with tracing.stage("async.recv_multipart"):
                frames = await self._socket.recv_multipart(copy=False)

            with tracing.stage("async.decode"):
                decoded = decode_frames(self.message_type, frames)
            if decoded is None:
                return None
            message, header = decoded

            update_stats_for_header(self.stats, header, perf_counter_ns())
            self._last_receive_time = time.time()
            return message, header

        except asyncio.CancelledError:
            # Cancellation must propagate so the executor/task can stop.
            raise
        except Exception as exc:
            logger.error("Failed to receive message: %s", exc)
            return None

    async def _receive_and_callback(self) -> Any:
        """One executor iteration: receive, then dispatch to the callback."""
        result = await self.receive()
        if result is None:
            return None
        message, header = result
        with tracing.stage("async.callback"):
            return await self._callback(message, header)

    def start(self) -> None:
        """Resume the executor loop (no-op until :meth:`run` created it)."""
        if self._executor:
            self._executor.start()

    def stop(self) -> None:
        """Pause the executor loop (no-op until :meth:`run` created it)."""
        if self._executor:
            self._executor.stop()

    @property
    def running(self) -> bool:
        """``True`` while the executor-driven receive loop is active."""
        return self._executor.running if self._executor else False

    async def run(self) -> None:
        """Connect (waiting if needed) and drive the callback loop until stopped."""
        if self._callback is None:
            logger.warning("No callback set for subscriber %s", self.topic_name)
            return

        if not self._connected and not await self._async_connect():
            logger.error("Failed to connect subscriber for %s", self.topic_name)
            return

        logger.info("Subscriber for %s running", self.topic_name)
        self._executor = AsyncExecutor(self._receive_and_callback)
        await self._executor.run()
        logger.info("Subscriber for %s stopped", self.topic_name)

    @property
    def last_receive_time(self) -> float | None:
        """Wall-clock timestamp of the last successful receive, or ``None``."""
        return self._last_receive_time

    def close(self) -> None:
        """Stop the executor, close discovery, and tear down the socket."""
        logger.info("Closing subscriber for %s", self.topic_name)
        if self._executor:
            self._executor.stop()
            self._executor = None

        self._close_discovery()

        if self._socket:
            with contextlib.suppress(Exception):
                self._socket.close()
            self._socket = None

        self._connected = False

MessageFingerprintError

Bases: RuntimeError

Raised when an incoming topic's fingerprint doesn't match the expected type.

Source code in src/cortex/core/subscriber_base.py
class MessageFingerprintError(RuntimeError):
    """The fingerprint advertised for a topic disagrees with the subscriber's expected message type."""

SubscriberBase

Discovery + connection scaffolding shared by all subscriber implementations.

Subclasses are responsible only for the I/O loop. They set :attr:_topic_info via :meth:_lookup_blocking (or the async variant used by the asyncio subscriber) and then open whatever socket they prefer against :attr:_topic_info.address.

Source code in src/cortex/core/subscriber_base.py
class SubscriberBase:
    """Discovery + connection scaffolding shared by all subscriber implementations.

    Subclasses own only the I/O loop: they populate :attr:`_topic_info`
    through :meth:`_lookup_blocking` (or the async variant used by the
    asyncio subscriber), then open whatever socket they prefer against
    :attr:`_topic_info.address`.
    """

    def __init__(
        self,
        topic_name: str,
        message_type: type[Message],
        node_name: str = "anonymous",
        discovery_address: str = DEFAULT_DISCOVERY_ADDRESS,
        topic_timeout: float = 600.0,
        wait_for_topic: bool = True,
        strict_fingerprint: bool = False,
    ):
        self.topic_name = topic_name
        self.message_type = message_type
        self.node_name = node_name
        self.discovery_address = discovery_address
        self.topic_timeout = topic_timeout
        self._wait_for_topic = wait_for_topic
        self._strict_fingerprint = strict_fingerprint

        # Populated by the lookup helpers; None means "not discovered yet".
        self._topic_info: TopicInfo | None = None
        self._connected = False
        self._discovery_client: DiscoveryClient | None = DiscoveryClient(
            discovery_address=self.discovery_address
        )
        self.stats = SubscriberStats()

    # ------------------------------------------------------------------ discovery

    def _validate_fingerprint(self, info: TopicInfo) -> None:
        """Refuse or warn on type mismatch.

        Strict mode raises; lax mode preserves historical
        warning-and-continue behavior (kept until callers opt in).
        """
        expected = self.message_type.fingerprint()
        if info.fingerprint != expected:
            msg = (
                f"Message type mismatch for {self.topic_name}: "
                f"expected {self.message_type.__name__} (fp={expected:#018x}), "
                f"got {info.message_type} (fp={info.fingerprint:#018x})"
            )
            if self._strict_fingerprint:
                raise MessageFingerprintError(msg)
            logger.warning(msg)

    def _lookup_nonblocking(self) -> bool:
        """One-shot lookup. Returns True on success."""
        try:
            info = self._discovery_client.lookup_topic(self.topic_name)
        except Exception as exc:
            logger.error("Failed to lookup topic: %s", exc)
            return False
        self._topic_info = info
        if info is None:
            return False
        self._validate_fingerprint(info)
        return True

    def _lookup_blocking(self, poll_interval: float = 0.5) -> bool:
        """Block-and-poll for the topic up to :attr:`topic_timeout`."""
        try:
            info = self._discovery_client.wait_for_topic(
                self.topic_name,
                timeout=self.topic_timeout,
                poll_interval=poll_interval,
            )
        except Exception as exc:
            logger.error("Failed to wait for topic: %s", exc)
            return False
        self._topic_info = info
        if info is None:
            return False
        self._validate_fingerprint(info)
        return True

    # ------------------------------------------------------------------ properties

    @property
    def is_connected(self) -> bool:
        """Whether a socket has been connected to the topic's publisher."""
        return self._connected

    @property
    def topic_info(self) -> TopicInfo | None:
        """Discovery record for the topic, or ``None`` before lookup succeeds."""
        return self._topic_info

    @property
    def receive_count(self) -> int:
        """Number of messages received so far."""
        return self.stats.received

    @property
    def dropped_count(self) -> int:
        """Estimated number of dropped messages (from stats)."""
        return self.stats.dropped_estimated

    # ------------------------------------------------------------------ shutdown

    def _close_discovery(self) -> None:
        """Release the discovery client (best effort, idempotent)."""
        client = self._discovery_client
        if client is None:
            return
        try:
            client.close()
        except Exception as exc:  # best-effort
            logger.debug("Discovery close error: %s", exc)
        self._discovery_client = None

SubscriberStats dataclass

Per-subscriber counters; updated by the receive loop.

Source code in src/cortex/core/subscriber_base.py
@dataclass
class SubscriberStats:
    """Per-subscriber counters; updated by the receive loop."""

    # Messages successfully received and decoded.
    received: int = 0
    # Estimated drops — presumably inferred from sequence-number gaps by
    # update_stats_for_header; confirm against that helper.
    dropped_estimated: int = 0
    # perf_counter_ns() timestamp of the most recent receive, or None if
    # nothing has been received yet.
    last_recv_perf_ns: int | None = None
    # Last sequence number seen per publisher (keys presumably identify the
    # publishing source; verify against update_stats_for_header).
    last_sequence_by_publisher: dict[int, int] = field(default_factory=dict)

ThreadedSubscriber

Bases: SubscriberBase

Synchronous SUB-side receive loop running on a dedicated OS thread.

Lifecycle:

  • Construction blocks on a discovery lookup (with optional wait), opens a fresh sync zmq.Context, and connects the SUB socket. Construction does not start the worker thread.
  • :meth:start spins up the thread; the thread blocks in poller.poll(timeout_ms) between messages so shutdown is prompt.
  • :meth:stop signals the thread and joins it (with a 1 s default grace period); :meth:close calls :meth:stop and tears down zmq.

The class is reentrant-safe in the trivial sense that start / stop / close are idempotent. zmq.SUB itself is single- threaded; do not call :meth:receive from another thread while the worker is running.

Source code in src/cortex/core/sync_subscriber.py
class ThreadedSubscriber(SubscriberBase):
    """Synchronous SUB-side receive loop running on a dedicated OS thread.

    Lifecycle:

    * Construction blocks on a discovery lookup (with optional wait), opens
      a fresh sync ``zmq.Context``, and connects the SUB socket.
      Construction does **not** start the worker thread.
    * :meth:`start` spins up the thread; the thread blocks in
      ``poller.poll(timeout_ms)`` between messages so shutdown is prompt.
    * :meth:`stop` signals the thread and joins it (with a 1 s default
      grace period); :meth:`close` calls :meth:`stop` and tears down zmq.

    The class is reentrant-safe in the trivial sense that ``start`` /
    ``stop`` / ``close`` are idempotent. ``zmq.SUB`` itself is single-
    threaded; do not call :meth:`receive` from another thread while the
    worker is running.
    """

    _POLL_TIMEOUT_MS = 50  # bound on shutdown latency
    _JOIN_TIMEOUT_S = 1.0

    def __init__(
        self,
        topic_name: str,
        message_type: type[Message],
        callback: SyncMessageCallback,
        node_name: str = "anonymous",
        discovery_address: str = DEFAULT_DISCOVERY_ADDRESS,
        queue_size: int = 1,
        wait_for_topic: bool = True,
        topic_timeout: float = 30.0,
        cpu_affinity: list[int] | None = None,
        sched_priority: int | None = None,
    ):
        # Strict fingerprint by default in sync mode: callers picked sync
        # for predictability, so silent type-confusion is unacceptable.
        super().__init__(
            topic_name=topic_name,
            message_type=message_type,
            node_name=node_name,
            discovery_address=discovery_address,
            topic_timeout=topic_timeout,
            wait_for_topic=wait_for_topic,
            strict_fingerprint=True,
        )

        # Reject coroutine callbacks up front — the worker thread would
        # otherwise silently discard the unawaited coroutine objects.
        if inspect.iscoroutinefunction(callback):
            raise TypeError(
                "ThreadedSubscriber requires a *synchronous* callback. "
                "Pass an async callback through Node.create_subscriber(mode='async') "
                "instead."
            )

        self._callback = callback
        self._queue_size = queue_size
        self._cpu_affinity = cpu_affinity
        self._sched_priority = sched_priority

        # This subscriber owns its own synchronous zmq context (terminated
        # in close()), independent of any asyncio context elsewhere.
        self._context = zmq.Context()
        self._socket: zmq.Socket | None = None
        self._poller: zmq.Poller | None = None

        self._thread: threading.Thread | None = None
        self._stop_event = threading.Event()
        self._started = False

        # Resolve the topic and open the socket up front, so construction
        # failures surface to the caller (not the worker thread).
        if not self._lookup_blocking():
            raise TimeoutError(
                f"Topic {self.topic_name} not registered with discovery within "
                f"{self.topic_timeout}s"
            )
        self._setup_socket(self._topic_info.address)
        self._connected = True

        # Platform tuning hint — presumably advice about low-latency OS
        # settings; informational only (confirm in low_latency_advisory).
        advisory = low_latency_advisory()
        if advisory:
            logger.info(advisory)

    # ------------------------------------------------------------------ socket

    def _setup_socket(self, address: str) -> None:
        """Create, configure, and connect the SUB socket plus its poller."""
        sock = self._context.socket(zmq.SUB)
        # RCVHWM gives "drop old, keep newest" semantics on overflow — the
        # right default for control topics. We deliberately do NOT use
        # ZMQ_CONFLATE; it is incompatible with multipart messages and
        # would silently strip every frame except the last.
        sock.setsockopt(zmq.RCVHWM, max(self._queue_size, 1))
        sock.setsockopt(zmq.LINGER, 0)
        sock.setsockopt_string(zmq.SUBSCRIBE, self.topic_name)
        sock.connect(address)
        self._socket = sock

        poller = zmq.Poller()
        poller.register(sock, zmq.POLLIN)
        self._poller = poller

    # ------------------------------------------------------------------ thread

    def start(self) -> None:
        """Spin up the receive thread (idempotent)."""
        if self._started:
            return
        if not self._connected:
            raise RuntimeError(
                f"Subscriber {self.topic_name} is not connected; cannot start"
            )
        self._stop_event.clear()
        # Non-daemon thread: process shutdown waits for a clean stop().
        self._thread = threading.Thread(
            target=self._run,
            name=f"cortex-sub-{self.topic_name}",
            daemon=False,
        )
        self._thread.start()
        self._started = True

    def stop(self, timeout: float | None = None) -> None:
        """Signal the worker and join it (idempotent)."""
        if not self._started:
            return
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout if timeout is not None else self._JOIN_TIMEOUT_S)
            if self._thread.is_alive():
                # Grace period expired; the handle is dropped below anyway.
                logger.warning(
                    "ThreadedSubscriber for %s did not stop within %.1fs",
                    self.topic_name,
                    timeout if timeout is not None else self._JOIN_TIMEOUT_S,
                )
        self._thread = None
        self._started = False

    @property
    def running(self) -> bool:
        """True while the worker thread exists and is alive."""
        return self._started and self._thread is not None and self._thread.is_alive()

    # ------------------------------------------------------------------ loop

    def _apply_thread_tuning(self) -> None:
        """Apply CPU affinity and (if requested) real-time scheduling.

        Both are best-effort: we log a warning and continue on any failure
        (missing capability, non-Linux platform, EPERM). The receive loop
        works without either knob — they only buy lower jitter.
        """
        if self._cpu_affinity is not None:
            # getattr() probe: os.sched_setaffinity does not exist on all
            # platforms (e.g. Windows/macOS builds of CPython).
            sched_setaffinity = getattr(os, "sched_setaffinity", None)
            if sched_setaffinity is None:
                logger.warning(
                    "CPU affinity requested but not supported on this platform"
                )
            else:
                try:
                    sched_setaffinity(0, set(self._cpu_affinity))
                    logger.info(
                        "Pinned receive thread to CPUs %s",
                        sorted(self._cpu_affinity),
                    )
                except OSError as exc:
                    logger.warning("Failed to set CPU affinity: %s", exc)

        if self._sched_priority is not None:
            sched_setscheduler = getattr(os, "sched_setscheduler", None)
            sched_param_cls = getattr(os, "sched_param", None)
            sched_fifo = getattr(os, "SCHED_FIFO", None)
            if (
                sched_setscheduler is None
                or sched_param_cls is None
                or sched_fifo is None
            ):
                logger.warning(
                    "SCHED_FIFO requested but not supported on this platform"
                )
                return
            try:
                sched_setscheduler(0, sched_fifo, sched_param_cls(self._sched_priority))
                logger.info(
                    "Receive thread set to SCHED_FIFO at priority %d",
                    self._sched_priority,
                )
            except (OSError, PermissionError) as exc:
                # Most common failure mode: missing CAP_SYS_NICE. Don't bail —
                # the receive loop still works on the default scheduler.
                logger.warning(
                    "Failed to set SCHED_FIFO priority %d (need CAP_SYS_NICE): %s",
                    self._sched_priority,
                    exc,
                )

    def _run(self) -> None:
        """Worker thread entry point."""
        self._apply_thread_tuning()
        if is_free_threaded():
            logger.debug(
                "Sync subscriber %s on free-threaded interpreter (no GIL contention)",
                self.topic_name,
            )

        # Invariant: _setup_socket() ran in __init__ and start() requires
        # _connected, so both must be present here.
        sock = self._socket
        poller = self._poller
        assert sock is not None and poller is not None

        timeout_ms = self._POLL_TIMEOUT_MS
        try:
            while not self._stop_event.is_set():
                # Bounded poll keeps the stop_event check responsive.
                events = dict(poller.poll(timeout=timeout_ms))
                if sock not in events:
                    continue

                with tracing.stage("sync.recv_multipart"):
                    try:
                        frames = sock.recv_multipart(copy=False, flags=zmq.NOBLOCK)
                    except zmq.Again:
                        # Poll reported readable but the message vanished;
                        # just go around again.
                        continue

                with tracing.stage("sync.decode"):
                    decoded = decode_frames(self.message_type, frames)
                if decoded is None:
                    continue
                message, header = decoded

                update_stats_for_header(self.stats, header, perf_counter_ns())

                with tracing.stage("sync.callback"):
                    try:
                        self._callback(message, header)
                    except Exception as exc:
                        # Don't kill the receive thread on a user error.
                        logger.exception(
                            "Callback raised on topic %s: %s", self.topic_name, exc
                        )
        except Exception:
            logger.exception(
                "ThreadedSubscriber receive loop crashed for %s", self.topic_name
            )

    # ------------------------------------------------------------------ shutdown

    def close(self) -> None:
        """Stop the worker and tear down zmq state (idempotent)."""
        logger.info("Closing sync subscriber for %s", self.topic_name)
        self.stop()

        self._close_discovery()

        if self._socket is not None:
            with contextlib.suppress(Exception):
                self._socket.close()
            self._socket = None
        self._poller = None

        with contextlib.suppress(Exception):
            self._context.term()
        self._connected = False

    # ------------------------------------------------------------------ stats

    @property
    def is_running(self) -> bool:
        """Alias for :attr:`running`."""
        return self.running
Functions
start
start() -> None

Spin up the receive thread (idempotent).

Source code in src/cortex/core/sync_subscriber.py
def start(self) -> None:
    """Launch the worker thread; calling it again while running is a no-op."""
    if self._started:
        return  # idempotent

    if not self._connected:
        raise RuntimeError(
            f"Subscriber {self.topic_name} is not connected; cannot start"
        )

    self._stop_event.clear()
    worker = threading.Thread(
        target=self._run,
        name=f"cortex-sub-{self.topic_name}",
        daemon=False,
    )
    self._thread = worker
    worker.start()
    self._started = True
stop
stop(timeout: float | None = None) -> None

Signal the worker and join it (idempotent).

Source code in src/cortex/core/sync_subscriber.py
def stop(self, timeout: float | None = None) -> None:
    """Signal the worker and join it (idempotent)."""
    if not self._started:
        return
    self._stop_event.set()
    if self._thread is not None:
        self._thread.join(timeout if timeout is not None else self._JOIN_TIMEOUT_S)
        if self._thread.is_alive():
            logger.warning(
                "ThreadedSubscriber for %s did not stop within %.1fs",
                self.topic_name,
                timeout if timeout is not None else self._JOIN_TIMEOUT_S,
            )
    self._thread = None
    self._started = False
close
close() -> None

Stop the worker and tear down zmq state (idempotent).

Source code in src/cortex/core/sync_subscriber.py
def close(self) -> None:
    """Tear down worker thread, discovery client, and zmq state (idempotent)."""
    logger.info("Closing sync subscriber for %s", self.topic_name)
    self.stop()
    self._close_discovery()

    sock = self._socket
    if sock is not None:
        self._socket = None
        with contextlib.suppress(Exception):
            sock.close()
    self._poller = None

    # Context termination is best-effort; close() must never propagate.
    with contextlib.suppress(Exception):
        self._context.term()
    self._connected = False