Skip to content

Node & Executors

Source: cortex.core.node, cortex.core.executor

A Node owns a ZMQ async context, a set of publishers/subscribers, and a list of timers. Executors are the loops that drive timers and subscriber receive paths.

Responsibilities

flowchart TB
    subgraph NodeResp [Node]
        CTX[shared zmq.asyncio.Context]
        PUBS[Publishers dict]
        SUBS[Subscribers dict]
        TIMERS[Timers list]
    end

    NodeResp -- create_publisher --> P[Publisher]
    NodeResp -- create_subscriber --> S[Subscriber]
    NodeResp -- create_timer --> RE[RateExecutor]
    NodeResp -- run / close --> Lifecycle

    P -. uses .-> CTX
    S -. uses .-> CTX

One node ≈ one process. You can run several in the same process via asyncio.gather([n.run() for n in nodes]) (examples/multi_node_system.py) — but they share the event loop, so a slow callback in one blocks the others.

Lifecycle

stateDiagram-v2
    [*] --> Constructed: Node(name)
    Constructed --> Configured: create_publisher/subscriber/timer
    Configured --> Running: await node.run()
    Running --> Running: timers fire, callbacks dispatch
    Running --> Stopping: node.stop() or cancel
    Stopping --> Closed: await node.close()
    Closed --> [*]: context terminated

node.run() spawns one asyncio task per timer and one per callback-bearing subscriber, then asyncio.gathers them.

node.close() stops executors, cancels tasks, closes all sockets, and terms the ZMQ context. Idempotent.

async with Node("my_node") as node:
    node.create_publisher("/x", IntMessage)
    node.create_subscriber("/y", IntMessage, callback=on_y)
    await node.run()   # blocks until cancelled
# __aexit__ calls close() automatically

Executors

Two subclasses of BaseExecutor:

classDiagram
    class BaseExecutor {
        <<abstract>>
        +func: AsyncCallback
        +start()
        +stop()
        +run(*args, **kwargs)
        #_run_impl()*
    }
    class AsyncExecutor {
        +_run_impl()
    }
    class RateExecutor {
        +rate_hz: float
        +interval: float
        +_run_impl()
    }
    BaseExecutor <|-- AsyncExecutor
    BaseExecutor <|-- RateExecutor

AsyncExecutor

Tight loop: await func(); await asyncio.sleep(0). Used by Subscriber.run for receive-dispatch.

RateExecutor

Fixed-grid timer at rate_hz. next_exec_time is initialized once, then advances by exactly one interval per callback invocation — never reset to "now."

flowchart TD
    Start[next = perf_counter] --> Loop{running?}
    Loop -- no --> End
    Loop -- yes --> Now[now = perf_counter]
    Now --> Due{now >= next?}
    Due -- yes --> Call[await func]
    Call --> Advance[next += interval]
    Advance --> Wait
    Due -- no --> Wait[await sleep max 0, next - now]
    Wait --> Loop

Missed ticks are not skipped. If a 100 Hz callback overruns by 20 ms, the next two ticks fire back-to-back with zero-length sleeps until the clock catches up. The grid is preserved; no tick is silently dropped.

Timer usage

node.create_timer(1.0 / 30, self.publish_frame)   # 30 Hz
node.create_timer(1.0, self.log_stats)            # 1 Hz

Plain async functions, no decorator. They share the event loop with subscriber callbacks — same head-of-line caveat.

Shared ZMQ context

Every publisher/subscriber created through a node reuses the node's zmq.asyncio.Context. Socket creation is cheap, io threads are shared, terminating the context shuts everything down. Don't create your own context inside callbacks.

Minimal complete node

from dataclasses import dataclass
import numpy as np
import cortex
from cortex import Node, Message
from cortex.messages.base import MessageHeader


@dataclass
class Ping(Message):
    payload: np.ndarray
    counter: int


class Echo(Node):
    def __init__(self):
        super().__init__("echo")
        self.pub = self.create_publisher("/pong", Ping)
        self.create_subscriber("/ping", Ping, callback=self.on_ping)
        self._n = 0

    async def on_ping(self, msg: Ping, header: MessageHeader):
        self._n += 1
        self.pub.publish(Ping(payload=msg.payload, counter=self._n))


async def main():
    async with Echo() as node:
        await node.run()


if __name__ == "__main__":
    cortex.run(main())

See also