# Multi-node system

Walk-through of `examples/multi_node_system.py`: a sensor → processor → monitor pipeline with custom messages and multiple pub/subs.
## Topology

```mermaid
flowchart LR
    subgraph Sensors [Sensor nodes]
        S1["sensor_lidar<br/>10 Hz"]
        S2["sensor_camera<br/>10 Hz"]
    end
    Proc[processor]
    Mon[monitor]
    S1 -- "/sensor/lidar/raw" --> Proc
    S2 -- "/sensor/camera/raw" --> Proc
    Proc -- "/processed/data" --> Mon
    Mon -- "/system/status<br/>1 Hz" --> World((world))
```
Four nodes share one asyncio event loop via `asyncio.gather`. Cortex IPC works the same whether nodes run in one process or in separate ones: data still rides ZMQ sockets either way.
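The one-process, one-loop pattern can be sketched with plain asyncio (no Cortex APIs involved; `Ticker` is a hypothetical stand-in for a node's `run()` loop):

```python
import asyncio

class Ticker:
    """Stand-in for a node: a periodic coroutine on the shared loop."""
    def __init__(self, name: str, period: float, ticks: int):
        self.name, self.period, self.ticks = name, period, ticks
        self.fired = 0

    async def run(self):
        for _ in range(self.ticks):
            await asyncio.sleep(self.period)
            self.fired += 1

async def main():
    nodes = [Ticker("a", 0.01, 5), Ticker("b", 0.02, 3)]
    # One event loop drives every node's run() concurrently.
    await asyncio.gather(*(n.run() for n in nodes))
    return [n.fired for n in nodes]

print(asyncio.run(main()))  # → [5, 3]
```

Both tickers interleave on the same loop, just as the four Cortex nodes do below.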
## Message schema

Three dataclasses, shared by every node so the schema fingerprints match:

```python
@dataclass
class SensorReading(Message):
    sensor_id: str
    timestamp: float
    values: np.ndarray
    temperature: float


@dataclass
class ProcessedData(Message):
    source_sensor: str
    timestamp: float
    filtered_values: np.ndarray
    statistics: dict  # {mean, std, min, max}


@dataclass
class SystemStatus(Message):
    timestamp: float
    num_sensors: int
    processing_rate_hz: float
    total_messages: int
```
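Why sharing matters: a fingerprint is derived from the message schema, so any drift between two copies of a dataclass breaks matching. A minimal illustration — the `fingerprint` helper and its hashing scheme here are assumptions for the sketch, not Cortex's actual algorithm:

```python
import hashlib
from dataclasses import dataclass, fields

def fingerprint(cls) -> str:
    """Hypothetical: hash the (name, type) signature of a dataclass."""
    sig = ",".join(f"{f.name}:{f.type}" for f in fields(cls))
    return hashlib.sha256(sig.encode()).hexdigest()[:8]

@dataclass
class ReadingV1:
    sensor_id: str
    timestamp: float

@dataclass
class ReadingV2:  # one renamed field => different fingerprint
    sensor_id: str
    stamp: float

assert fingerprint(ReadingV1) == fingerprint(ReadingV1)  # deterministic
assert fingerprint(ReadingV1) != fingerprint(ReadingV2)  # drift detected
```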
## Sensor node

```python
class SensorNode(Node):
    def __init__(self, sensor_id: str, publish_rate: float = 10.0):
        super().__init__(f"sensor_{sensor_id}")
        self.sensor_id = sensor_id  # referenced in _publish_reading
        self.reading_pub = self.create_publisher(
            f"/sensor/{sensor_id}/raw", SensorReading
        )
        self.create_timer(1.0 / publish_rate, self._publish_reading)

    async def _publish_reading(self):
        t = time.time()
        values = np.sin(np.linspace(0, 2 * np.pi, 100) + t) + 0.1 * np.random.randn(100)
        self.reading_pub.publish(SensorReading(
            sensor_id=self.sensor_id,
            timestamp=t,
            values=values.astype("f4"),
            temperature=25.0 + 0.5 * np.random.randn(),
        ))
```
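The `.astype("f4")` cast is worth noting: it halves each reading's payload relative to NumPy's default float64. Standalone:

```python
import numpy as np

values = np.sin(np.linspace(0, 2 * np.pi, 100))  # default dtype: float64
assert values.nbytes == 800

wire = values.astype("f4")  # cast to float32 before publishing
assert wire.nbytes == 400   # half the bytes per message on the wire
```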
## Processor

Subscribes to every sensor, filters, republishes:

```python
class ProcessorNode(Node):
    def __init__(self, sensor_ids: list[str]):
        super().__init__("processor")
        for sid in sensor_ids:
            self.create_subscriber(
                f"/sensor/{sid}/raw", SensorReading, callback=self._on_reading
            )
        self.processed_pub = self.create_publisher("/processed/data", ProcessedData)

    async def _on_reading(self, msg: SensorReading, header: MessageHeader):
        # 5-tap moving average over the raw samples.
        filtered = np.convolve(msg.values, np.ones(5) / 5, mode="same")
        self.processed_pub.publish(ProcessedData(
            source_sensor=msg.sensor_id,
            timestamp=msg.timestamp,
            filtered_values=filtered.astype("f4"),
            statistics={
                "mean": float(filtered.mean()),
                "std": float(filtered.std()),
                "min": float(filtered.min()),
                "max": float(filtered.max()),
            },
        ))
```
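The filter is a 5-tap moving average; on a noisy sine like the sensor's, it pulls the signal measurably closer to the clean waveform:

```python
import numpy as np

rng = np.random.default_rng(0)
clean = np.sin(np.linspace(0, 2 * np.pi, 100))
noisy = clean + 0.1 * rng.standard_normal(100)

# Same 5-tap moving average the processor applies.
filtered = np.convolve(noisy, np.ones(5) / 5, mode="same")

# Averaging 5 samples shrinks the noise floor, so the mean absolute
# error against the clean sine drops.
assert np.abs(filtered - clean).mean() < np.abs(noisy - clean).mean()
```

`mode="same"` keeps the output at 100 samples, zero-padding at the edges, so downstream shapes never change.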
## Monitor

Tracks throughput, emits status at 1 Hz:

```python
class MonitorNode(Node):
    def __init__(self):
        super().__init__("monitor")
        self.create_subscriber("/processed/data", ProcessedData, callback=self._on_processed)
        self.status_pub = self.create_publisher("/system/status", SystemStatus)
        self.create_timer(1.0, self._publish_status)
```
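The callback bodies aren't shown in the excerpt. The bookkeeping they need is just two counters; a self-contained sketch of one way to keep them (the `RateCounter` helper and its method names are assumptions, not the example's actual code):

```python
import time

class RateCounter:
    """Hypothetical throughput bookkeeping a monitor might keep."""
    def __init__(self):
        self.total = 0    # messages since startup -> SystemStatus.total_messages
        self.window = 0   # messages since the last status tick
        self.last = time.monotonic()

    def on_processed(self):
        """Called once per ProcessedData message."""
        self.total += 1
        self.window += 1

    def tick(self):
        """Called by the 1 Hz timer; returns (total, rate_hz)."""
        now = time.monotonic()
        rate = self.window / max(now - self.last, 1e-9)  # guard zero elapsed
        self.window, self.last = 0, now
        return self.total, rate

rc = RateCounter()
for _ in range(20):
    rc.on_processed()
total, rate = rc.tick()
assert total == 20 and rate > 0
```

Resetting `window` on every tick gives a per-second rate rather than a lifetime average.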
## Run the graph

```python
async def main():
    sensor_nodes = [SensorNode(sid, publish_rate=10.0) for sid in ["lidar", "camera"]]
    processor_node = ProcessorNode(["lidar", "camera"])
    monitor_node = MonitorNode()
    all_nodes = [*sensor_nodes, processor_node, monitor_node]

    await asyncio.sleep(1.0)  # let topics register and subscribers connect
    try:
        await asyncio.gather(*[n.run() for n in all_nodes])
    finally:
        for n in all_nodes:
            await n.close()
```
## Timeline

```mermaid
sequenceDiagram
    participant L as lidar
    participant C as camera
    participant P as processor
    participant M as monitor
    par at 10 Hz
        L->>P: SensorReading
    and
        C->>P: SensorReading
    end
    P->>M: ProcessedData (per reading)
    Note over M: counts per second
    M->>M: publish SystemStatus (1 Hz)
```
## Run it

Run `python examples/multi_node_system.py`. Expected output (numbers vary from run to run):

```
[processor] Sensor lidar: mean=0.012, std=0.708
[processor] Sensor camera: mean=-0.034, std=0.711
[monitor] System status: 192 messages, 19.2 Hz processing rate
```

With two sensors publishing at 10 Hz each, the processor sees roughly 20 readings per second; the reported 19.2 Hz is that nominal rate minus timer jitter.