HORUS Python Bindings

A production-ready Python API for the HORUS robotics framework, combining simplicity with advanced features for professional robotics applications.

Why HORUS Python?

  • Zero Boilerplate: Working node in 10 lines
  • Flexible API: Functional style or class inheritance - your choice
  • Production Performance: ~500ns latency (same shared memory as Rust)
  • Per-Node Rate Control: Different nodes at different frequencies (100Hz sensor, 10Hz logger)
  • Automatic Timestamps: Built-in message timing and staleness detection
  • Typed Messages: Optional type-safe messages from Rust
  • Multiprocess Support: Process isolation and multi-language nodes
  • Pythonic: Feels like native Python, not wrapped C++
  • Rich Ecosystem: Use NumPy, OpenCV, scikit-learn, etc.

Quick Start

Installation

Automatic (Recommended)

Python bindings are automatically installed when you run the HORUS installer:

# From HORUS root directory
./install.sh

The installer will detect Python 3.9+ and automatically build and install the bindings.

Manual Installation

If you prefer to install manually or need to rebuild:

# Install maturin (Python/Rust build tool)
# Option A: Via Cargo (recommended for Ubuntu 24.04+)
cargo install maturin

# Option B: Via pip (if not blocked by PEP 668)
# pip install maturin

# Build and install from source
cd horus_py
maturin develop --release

Requirements:

  • Python 3.9+
  • Rust 1.70+
  • Linux (for shared memory support)

Minimal Example

import horus

def process(node):
    node.send("output", "Hello HORUS!")

node = horus.Node(pubs="output", tick=process, rate=1)
horus.run(node, duration=3)

This minimal example demonstrates functional-style node creation without class boilerplate.


Core API

Creating a Node

node = horus.Node(
    name="my_node",            # Optional: auto-generated if not provided
    pubs=["topic1", "topic2"], # Topics to publish to
    subs=["input1", "input2"], # Topics to subscribe to
    tick=my_function,          # Function called repeatedly
    rate=30,                   # Hz (default: 30)
    init=setup_fn,             # Optional: called once at start
    shutdown=cleanup_fn        # Optional: called once at end
)

Parameters:

  • name (str, optional): Node name (auto-generated if omitted)
  • pubs (str | list[str], optional): Topics to publish to
  • subs (str | list[str], optional): Topics to subscribe from
  • tick (callable): Function called each cycle
  • rate (int, optional): Execution rate in Hz (default: 30)
  • init (callable, optional): Setup function
  • shutdown (callable, optional): Cleanup function

Alternative: Class-Based Inheritance

For those who prefer OOP, you can inherit from horus.Node:

import horus

class SensorNode(horus.Node):
    def __init__(self):
        super().__init__(
            name="sensor",
            pubs=["temperature"],
            rate=10
        )

    def tick(self, info=None):
        # Override tick method
        self.send("temperature", 25.0)

    def init(self, info=None):
        # Optional: override init
        print("Sensor initialized!")

    def shutdown(self, info=None):
        # Optional: override shutdown
        print("Sensor shutting down!")

# Use it
sensor = SensorNode()
horus.run(sensor)

Both patterns work! Use functional style for simplicity or class inheritance for complex nodes with state.

Node Functions

Your tick function receives the node as a parameter:

def my_tick(node):
    # Check for messages
    if node.has_msg("input"):
        data = node.get("input")  # Get one message

    # Get all messages
    all_msgs = node.get_all("input")

    # Send messages
    node.send("output", {"value": 42})

Node Methods:

  • node.send(topic, data) - Publish message
  • node.get(topic) - Get one message (returns None if empty)
  • node.get_all(topic) - Get all messages as list
  • node.has_msg(topic) - Check if messages available
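
Because get() returns None when the queue is empty, the has_msg()/get() pair can be collapsed into a single check; a small sketch of that pattern (topic names are illustrative):

def compact_tick(node):
    data = node.get("input")   # None when no message is waiting
    if data is not None:
        node.send("output", data)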

Running Nodes

# Single node
horus.run(node)

# Multiple nodes
horus.run(node1, node2, node3, duration=10)

Examples

1. Simple Publisher

import horus

def publish_temperature(node):
    node.send("temperature", 25.5)

sensor = horus.Node(
    name="temp_sensor",
    pubs="temperature",
    tick=publish_temperature,
    rate=1  # 1 Hz
)

horus.run(sensor, duration=10)

2. Subscriber

import horus

def display_temperature(node):
    if node.has_msg("temperature"):
        temp = node.get("temperature")
        print(f"Temperature: {temp}°C")

display = horus.Node(
    name="display",
    subs="temperature",
    tick=display_temperature
)

horus.run(display)

3. Pub/Sub Pipeline

import horus

def publish(node):
    node.send("raw", 42.0)

def process(node):
    if node.has_msg("raw"):
        data = node.get("raw")
        result = data * 2.0
        node.send("processed", result)

def display(node):
    if node.has_msg("processed"):
        value = node.get("processed")
        print(f"Result: {value}")

# Create pipeline
publisher = horus.Node("publisher", pubs="raw", tick=publish, rate=1)
processor = horus.Node("processor", subs="raw", pubs="processed", tick=process)
displayer = horus.Node("display", subs="processed", tick=display)

# Run all together
horus.run(publisher, processor, displayer, duration=5)

4. Using Lambda Functions

import horus

# Producer (inline)
producer = horus.Node(
    pubs="numbers",
    tick=lambda n: n.send("numbers", 42),
    rate=1
)

# Transformer (inline)
doubler = horus.Node(
    subs="numbers",
    pubs="doubled",
    tick=lambda n: n.send("doubled", n.get("numbers") * 2) if n.has_msg("numbers") else None
)

horus.run(producer, doubler, duration=5)

5. Multi-Topic Robot Controller

import horus

def robot_controller(node):
    # Read from multiple sensors
    lidar_data = None
    camera_data = None

    if node.has_msg("lidar"):
        lidar_data = node.get("lidar")

    if node.has_msg("camera"):
        camera_data = node.get("camera")

    # Compute commands
    if lidar_data and camera_data:
        cmd = compute_navigation(lidar_data, camera_data)
        node.send("motors", cmd)
        node.send("status", "navigating")

robot = horus.Node(
    name="robot_controller",
    subs=["lidar", "camera"],
    pubs=["motors", "status"],
    tick=robot_controller,
    rate=50  # 50Hz control loop
)

6. Lifecycle Management

import horus

class Context:
    def __init__(self):
        self.count = 0
        self.file = None

ctx = Context()

def init_handler(node):
    print("Starting up!")
    ctx.file = open("data.txt", "w")

def tick_handler(node):
    ctx.count += 1
    data = f"Tick {ctx.count}"
    node.send("data", data)
    ctx.file.write(data + "\n")

def shutdown_handler(node):
    print(f"Processed {ctx.count} messages")
    ctx.file.close()

node = horus.Node(
    pubs="data",
    init=init_handler,
    tick=tick_handler,
    shutdown=shutdown_handler,
    rate=10
)

horus.run(node, duration=5)

Advanced Features (Production-Ready)

HORUS Python includes advanced features that match or exceed ROS2 capabilities while maintaining simplicity.

Priority-Based Execution

Lower priority numbers execute first, ensuring correct data flow:

scheduler = horus.Scheduler()
scheduler.add(sensor, priority=0)     # Executes FIRST
scheduler.add(controller, priority=1) # Executes SECOND
scheduler.add(actuator, priority=2)   # Executes THIRD
scheduler.run()

Automatic Timestamps

All messages automatically get microsecond-precision timestamps:

import horus
import time

def control_tick(node):
    if node.has_msg("sensor_data"):
        # Check message age
        age = node.get_message_age("sensor_data")
        if age > 0.1:  # More than 100ms old
            node.log_warning(f"Stale data: {age*1000:.1f}ms old")
            return

        # Or use built-in staleness detection
        if node.is_stale("sensor_data", max_age=0.1):
            return  # Skip stale data

        # Get message with timestamp
        msg, timestamp = node.get_with_timestamp("sensor_data")
        latency = time.time() - timestamp
        print(f"Latency: {latency*1000:.1f}ms")

        # Process fresh data
        data = node.get("sensor_data")
        process(data)

Timestamp Methods:

  • node.get_message_age(topic) - Get age of next message in seconds
  • node.is_stale(topic, max_age) - Check if message is too old
  • node.get_timestamp(topic) - Peek at timestamp without consuming
  • node.get_with_timestamp(topic) - Get message and timestamp together
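
The example above exercises get_message_age, is_stale, and get_with_timestamp; the remaining method, get_timestamp, peeks at the timestamp without consuming the message. A short sketch, assuming it returns None when nothing is queued (mirroring get()):

import time

def peek_tick(node):
    ts = node.get_timestamp("sensor_data")   # peek only; the message stays queued
    if ts is not None and time.time() - ts < 0.1:
        data = node.get("sensor_data")       # consume only if it is fresh
        node.send("processed", data)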

Multiprocess Execution

Run Python nodes in separate processes for isolation and multi-language support:

# Run multiple Python files as separate processes
horus run node1.py node2.py node3.py

# Mix Python and Rust nodes
horus run sensor.rs controller.py visualizer.py

# Mix Rust and Python
horus run lidar_driver.rs planner.py motor_control.rs

All nodes in the same horus run session automatically communicate via shared memory!

Example - Distributed System:

# sensor_node.py
import horus

def sensor_tick(node):
    data = read_lidar()  # Your sensor code
    node.send("lidar_data", data)

sensor = horus.Node(name="lidar", pubs="lidar_data", tick=sensor_tick)
horus.run(sensor)

# controller_node.py
import horus

def control_tick(node):
    if node.has_msg("lidar_data"):
        data = node.get("lidar_data")
        cmd = compute_control(data)
        node.send("motor_cmd", cmd)

controller = horus.Node(
    name="controller",
    subs="lidar_data",
    pubs="motor_cmd",
    tick=control_tick
)
horus.run(controller)

# Run both in separate processes
horus run sensor_node.py controller_node.py

Benefits:

  • Process isolation: One crash doesn't kill everything
  • Multi-language: Mix Python and Rust nodes in the same application
  • Parallel execution: True multicore utilization
  • Zero configuration: Shared memory IPC automatically set up

Complete Example: All Features Together

import horus
import time

def sensor_tick(node):
    """High-frequency sensor (100Hz)"""
    imu = {"accel_x": 1.0, "accel_y": 0.0, "accel_z": 9.8}
    node.send("imu_data", imu)
    age = node.get_message_age("imu_data")
    node.log_info(f"Published IMU (age: {age:.3f}s)")

def control_tick(node):
    """Medium-frequency control (50Hz)"""
    if node.has_msg("imu_data"):
        # Check for stale data
        if node.is_stale("imu_data", max_age=0.05):
            node.log_warning("Stale IMU data!")
            return

        imu = node.get("imu_data")
        cmd = {"linear": 1.0, "angular": 0.0}
        node.send("cmd_vel", cmd)

def logger_tick(node):
    """Low-frequency logging (10Hz)"""
    if node.has_msg("cmd_vel"):
        msg, timestamp = node.get_with_timestamp("cmd_vel")
        latency = (time.time() - timestamp) * 1000
        node.log_info(f"Command latency: {latency:.1f}ms")

# Create nodes
sensor = horus.Node(name="imu", pubs="imu_data", tick=sensor_tick)
controller = horus.Node(name="ctrl", subs="imu_data", pubs="cmd_vel", tick=control_tick)
logger = horus.Node(name="log", subs="cmd_vel", tick=logger_tick)

# Configure with different rates and priorities
scheduler = horus.Scheduler()
scheduler.add(sensor, priority=0, logging=True, rate_hz=100.0)
scheduler.add(controller, priority=1, logging=False, rate_hz=50.0)
scheduler.add(logger, priority=2, logging=True, rate_hz=10.0)

scheduler.run(duration=5.0)

# Check statistics
stats = scheduler.get_node_stats("imu")
print(f"Sensor: {stats['total_ticks']} ticks in 5 seconds")

Network Communication

HORUS Python supports network communication for distributed multi-machine systems. Hub, Link, and Router all work transparently over the network.

Hub Network Endpoints

Add an endpoint parameter to communicate over the network:

from horus import Hub, CmdVel

# Local (shared memory) - default
local_hub = Hub(CmdVel)

# Network (UDP direct)
network_hub = Hub(CmdVel, endpoint="cmdvel@192.168.1.100:8000")

# Router (TCP broker for WAN/NAT traversal)
router_hub = Hub(CmdVel, endpoint="cmdvel@router")

Endpoint Syntax:

  • "topic" - Local shared memory (~500ns latency)
  • "topic@host:port" - Direct UDP (<50μs latency)
  • "topic@router" - Router broker (auto-discovery on localhost:7777)
  • "[email protected]:7777" - Router broker at specific address

Hub Methods

Core Operations:

Method                 Description
hub.send(msg)          Send a single message
hub.recv()             Receive one message (returns None if empty)
hub.topic()            Get the topic name
hub.transport_type()   Get transport type ("shm", "udp", "router")
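
A minimal sketch of the core operations with the CmdVel type from the earlier examples; the "shm" comment assumes a local hub, as in the batch example below:

from horus import Hub, CmdVel

hub = Hub(CmdVel)             # local shared-memory hub by default
print(hub.topic())            # topic name for this hub
print(hub.transport_type())   # "shm" for a local hub

hub.send(CmdVel(linear=1.0, angular=0.0))
msg = hub.recv()              # returns None if nothing is queued
if msg is not None:
    print(msg.linear, msg.angular)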

Batch Operations (High-Performance):

Method                 Description
hub.send_many(msgs)    Send multiple messages at once, returns count sent
hub.recv_many(n)       Receive up to n messages in batch
hub.drain()            Receive all available messages (up to 1000)
hub.view()             Zero-copy raw bytes (GenericMessage only)

Batch Example:

from horus import Hub, CmdVel

# === Send multiple messages at once ===
hub = Hub(CmdVel)
commands = [
    CmdVel(linear=1.0, angular=0.0),
    CmdVel(linear=1.5, angular=0.2),
    CmdVel(linear=0.0, angular=0.5),
]
sent = hub.send_many(commands)
print(f"Sent {sent}/{len(commands)} messages")

# === Receive up to N messages ===
messages = hub.recv_many(10)  # Get up to 10 messages
for msg in messages:
    print(f"linear={msg.linear}, angular={msg.angular}")

# === Drain all available messages ===
all_msgs = hub.drain()  # Get everything pending
print(f"Drained {len(all_msgs)} messages")

Zero-Copy Raw Access (Generic Hubs Only):

from horus import Hub
import msgpack  # pip install msgpack

# For custom/large data where you control serialization
hub = Hub("sensor_data")  # Generic (untyped) topic

# view() returns raw MessagePack bytes without JSON overhead
if raw := hub.view():
    # Parse with your own logic (msgpack, struct, numpy, etc.)
    data = msgpack.unpackb(raw)
    print(f"Raw data: {data}")

# Note: view() raises TypeError for typed hubs (CmdVel, etc.)
# because they're already optimized for direct field access

Performance Tips:

  • Use send_many() when you have multiple messages queued (1 GIL release vs N)
  • Use drain() when processing bursts of data
  • Use view() for custom serialization of large blobs (avoids JSON round-trip)
  • Batch operations release the GIL once for all messages, not per-message

Generic Hub Methods

When you create a Hub with a string topic (instead of a typed class), you get a generic hub that accepts any serializable data:

from horus import Hub, CmdVel

# Generic hub (string topic = dynamic typing)
hub = Hub("my.topic")

# Typed hub (class = static typing, better performance)
typed_hub = Hub(CmdVel)

Generic Hub Methods:

Method                           Description
send(obj)                        Send any JSON-serializable object (dict, list, str, int, float)
recv()                           Receive and deserialize to Python object
send_bytes(data)                 Send raw bytes (up to 4KB)
view()                           Receive raw bytes without deserialization
send_numpy(arr)                  Send NumPy array as bytes
send_with_metadata(data, meta)   Send bytes with string metadata
recv_with_metadata()             Receive bytes and metadata tuple
is_generic                       Property: True if this is a generic hub

Example: JSON-Serializable Objects

from horus import Hub

pub = Hub("sensor.data")
sub = Hub("sensor.data")

# Send dict, list, or any JSON-serializable data
pub.send({"temperature": 25.5, "humidity": 60.0})
pub.send([1.0, 2.0, 3.0, 4.0])
pub.send("status: OK")

# Receive (returns Python object)
msg = sub.recv()  # {"temperature": 25.5, "humidity": 60.0}

Example: Raw Bytes

from horus import Hub

pub = Hub("binary.data")
sub = Hub("binary.data")

# Send raw bytes (custom binary protocol)
pub.send_bytes(b'\x01\x02\x03\x04')

# Receive raw bytes (no deserialization)
raw = sub.view()  # b'\x01\x02\x03\x04'

Example: NumPy Arrays

from horus import Hub
import numpy as np

pub = Hub("numpy.data")
sub = Hub("numpy.data")

# Send NumPy array
arr = np.array([1.0, 2.0, 3.0], dtype=np.float32)
pub.send_numpy(arr)

# Receive and reconstruct
raw = sub.view()
received = np.frombuffer(raw, dtype=np.float32)

Example: Bytes with Metadata

from horus import Hub
import json

pub = Hub("tagged.data")
sub = Hub("tagged.data")

# Send with type hint metadata
data = json.dumps({"x": 1.0, "y": 2.0}).encode()
pub.send_with_metadata(data, "json")

# Receive with metadata
payload, metadata = sub.recv_with_metadata()
if metadata == "json":
    obj = json.loads(payload)

Performance Comparison:

Method                  Latency   Use Case
Typed (Hub(CmdVel))     ~3μs      Production, high-frequency
send() / recv()         ~50μs     Dynamic schemas, prototyping
send_bytes() / view()   ~10μs     Custom binary protocols
send_numpy()            ~15μs     Array data

Link (Point-to-Point)

Link provides high-performance point-to-point (SPSC) communication - ~30% faster than Hub for single producer/consumer patterns.

Key Architecture: Link uses a single-slot design (not a ring buffer):

  • Always stores exactly one message (the latest value)
  • New messages overwrite old unread messages
  • Perfect for sensors where only the latest reading matters
  • send() never blocks - it always succeeds immediately

from horus import Link, CmdVel

# === MACHINE 1 (Producer) ===
# Connects to consumer at 192.168.1.20:9000
output = Link.producer(CmdVel, "cmdvel@192.168.1.20:9000")
output.send(CmdVel(linear=1.0, angular=0.5))

# === MACHINE 2 (Consumer) ===
# Listens on port 9000
input_link = Link.consumer(CmdVel, "cmdvel@192.168.1.20:9000")
if msg := input_link.recv():
    print(f"Received: {msg}")

Link Methods:

Method                          Description
Link.producer(type, endpoint)   Create producer (connects to consumer)
Link.consumer(type, endpoint)   Create consumer (listens for producer)
link.send(msg)                  Send a message
link.recv()                     Receive a message (returns None if empty)
link.topic()                    Get the topic name

Local Link (Shared Memory):

# No "@" in endpoint = local shared memory
producer = Link.producer(CmdVel, "motor_commands")
consumer = Link.consumer(CmdVel, "motor_commands")

producer.send(CmdVel(linear=1.0, angular=0.0))
msg = consumer.recv()  # ~248ns latency
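
The single-slot behavior described above can be seen directly: if the producer sends twice before the consumer reads, only the latest value survives. A small sketch (the topic name is illustrative):

from horus import Link, CmdVel

producer = Link.producer(CmdVel, "latest_cmd")
consumer = Link.consumer(CmdVel, "latest_cmd")

producer.send(CmdVel(linear=0.5, angular=0.0))
producer.send(CmdVel(linear=1.0, angular=0.0))  # overwrites the unread 0.5

msg = consumer.recv()
print(msg.linear)  # 1.0 - only the latest message is kept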

Router Client (WAN/NAT Traversal)

For communication across networks, through NAT, or for large-scale deployments, use the Router:

from horus import RouterClient, Hub, CmdVel

# Create router client for explicit connection management
router = RouterClient("192.168.1.100", 7777)

# Build endpoints through the router
cmd_endpoint = router.endpoint("cmdvel")  # Returns "cmdvel@192.168.1.100:7777"
pose_endpoint = router.endpoint("pose")

# Use endpoints with Hub
hub = Hub(CmdVel, endpoint=cmd_endpoint)

# Router properties
print(f"Address: {router.address}")        # "192.168.1.100:7777"
print(f"Connected: {router.is_connected}") # True
print(f"Topics: {router.topics}")          # ["cmdvel", "pose"]
print(f"Uptime: {router.uptime_seconds}s")

Helper Functions:

from horus import default_router_endpoint, router_endpoint

# Default router (localhost:7777)
ep1 = default_router_endpoint("cmdvel")  # "cmdvel@router"

# Custom router address
ep2 = router_endpoint("cmdvel", "192.168.1.100", 7777)  # "cmdvel@192.168.1.100:7777"

Router Server (for testing):

from horus import RouterServer

# Start a local router (for development/testing)
server = RouterServer(port=7777)
server.start()

# For production, use CLI instead:
# $ horus router start --port 7777

When to Use What

Transport        Latency   Use Case
Hub (local)      ~500ns    Same-machine pub/sub
Link (local)     ~248ns    Same-machine point-to-point (30% faster)
Hub (network)    <50μs     Multi-machine pub/sub on LAN
Link (network)   5-15μs    Multi-machine point-to-point (3-10x faster than Hub)
Router           10-50ms   WAN, NAT traversal, cloud deployments

Multi-Machine Example

# === ROBOT (192.168.1.50) ===
from horus import Hub, Link, ImuData, DroneStatus, Command

# Local: Critical flight control (ultra-fast)
imu_link = Link.consumer(ImuData, "imu")  # 248ns

# Network: Telemetry to ground station
telemetry = Hub(DroneStatus, endpoint="telemetry@192.168.1.100:8000")

# Network: Commands from ground station
commands = Hub(Command, endpoint="command@192.168.1.50:8001")


# === GROUND STATION (192.168.1.100) ===
from horus import Hub, DroneStatus, Command

# Receive telemetry from robot
telemetry_sub = Hub(DroneStatus, endpoint="telemetry@192.168.1.100:8000")

# Send commands to robot
command_pub = Hub(Command, endpoint="command@192.168.1.50:8001")

Integration with Python Ecosystem

NumPy Integration

import horus
import numpy as np

def process_array(node):
    if node.has_msg("raw_data"):
        data = node.get("raw_data")
        # Convert to NumPy array
        arr = np.array(data)
        # Process with NumPy
        result = np.fft.fft(arr)
        node.send("fft_result", result.tolist())

processor = horus.Node(
    subs="raw_data",
    pubs="fft_result",
    tick=process_array
)

OpenCV Integration

import horus
import cv2
import numpy as np

def process_image(node):
    if node.has_msg("camera"):
        img_data = node.get("camera")
        # Convert to OpenCV format
        img = np.array(img_data, dtype=np.uint8).reshape((480, 640, 3))

        # Apply OpenCV processing
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)

        # Publish result
        node.send("edges", edges.flatten().tolist())

vision = horus.Node(
    subs="camera",
    pubs="edges",
    tick=process_image,
    rate=30
)

scikit-learn Integration

import horus
from sklearn.linear_model import LinearRegression
import numpy as np

model = LinearRegression()

def train_model(node):
    if node.has_msg("training_data"):
        data = node.get("training_data")
        X = np.array(data['features'])
        y = np.array(data['labels'])

        # Train model
        model.fit(X, y)
        score = model.score(X, y)

        node.send("model_score", score)

trainer = horus.Node(
    subs="training_data",
    pubs="model_score",
    tick=train_model
)

Advanced Patterns

State Management

import horus

class RobotState:
    def __init__(self):
        self.position = {"x": 0.0, "y": 0.0}
        self.velocity = 0.0
        self.last_update = 0

state = RobotState()

def update_state(node):
    if node.has_msg("velocity"):
        state.velocity = node.get("velocity")

    if node.has_msg("position"):
        state.position = node.get("position")

    # Publish combined state
    node.send("robot_state", {
        "pos": state.position,
        "vel": state.velocity
    })

state_manager = horus.Node(
    subs=["velocity", "position"],
    pubs="robot_state",
    tick=update_state
)

Rate Limiting

import horus
import time

class RateLimiter:
    def __init__(self, min_interval):
        self.min_interval = min_interval
        self.last_send = 0

limiter = RateLimiter(min_interval=0.1)  # 100ms minimum

def rate_limited_publish(node):
    current_time = time.time()

    if current_time - limiter.last_send >= limiter.min_interval:
        node.send("output", "data")
        limiter.last_send = current_time

node = horus.Node(
    pubs="output",
    tick=rate_limited_publish,
    rate=100  # Node runs at 100Hz, but publishes at max 10Hz
)

Error Handling

import horus

def safe_processing(node):
    try:
        if node.has_msg("input"):
            data = node.get("input")
            result = risky_operation(data)
            node.send("output", result)
    except Exception as e:
        node.send("errors", str(e))
        print(f"Error: {e}")

processor = horus.Node(
    subs="input",
    pubs=["output", "errors"],
    tick=safe_processing
)

Performance Tips

1. Use Per-Node Rate Control

# Use the scheduler with per-node rates for optimal performance
scheduler = horus.Scheduler()

# High-frequency sensor (100Hz)
scheduler.add(sensor, priority=0, rate_hz=100.0)

# Medium-frequency control (50Hz)
scheduler.add(controller, priority=1, rate_hz=50.0)

# Low-frequency logging (10Hz)
scheduler.add(logger, priority=2, rate_hz=10.0)

scheduler.run()

# Monitor performance with get_node_stats()
stats = scheduler.get_node_stats("sensor")
print(f"Sensor executed {stats['total_ticks']} ticks")

2. Monitor Message Staleness

def control_tick(node):
    # Skip stale data to maintain real-time performance
    if node.is_stale("sensor_data", max_age=0.1):
        node.log_warning("Skipping stale sensor data")
        return

    # Process fresh data only
    data = node.get("sensor_data")
    process(data)

3. Use Dicts for Messages

# Send messages as Python dicts (automatically serialized to JSON)
cmd = {"linear": 1.5, "angular": 0.8}
node.send("cmd_vel", cmd)

# Check message age using node method
if node.get_message_age("cmd_vel") > 0.1:
    print("Message is stale")

4. Batch Processing

# === Node API (for tick functions) ===
def batch_processor(node):
    # Process all available messages at once
    messages = node.get_all("input")
    if messages:
        results = [process(msg) for msg in messages]
        for result in results:
            node.send("output", result)

# === Hub API (direct usage) ===
from horus import Hub, CmdVel

hub = Hub(CmdVel)

# Receive in batches (1 GIL release for N messages)
messages = hub.recv_many(100)  # Up to 100 messages
all_msgs = hub.drain()          # All available messages

# Send in batches (1 GIL release for N messages)
commands = [CmdVel(linear=v, angular=0) for v in range(10)]
sent = hub.send_many(commands)  # Returns count sent

5. Keep tick() Fast

# GOOD: Fast tick
def good_tick(node):
    if node.has_msg("input"):
        data = node.get("input")
        result = quick_operation(data)
        node.send("output", result)

# BAD: Slow tick
def bad_tick(node):
    time.sleep(1)  # Don't block!
    data = requests.get("http://api.example.com")  # Don't do I/O!

6. Offload Heavy Processing

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def heavy_processing_node(node):
    if node.has_msg("input"):
        data = node.get("input")
        # Offload to thread pool
        future = executor.submit(expensive_operation, data)
        # Don't block - check result later or use callback
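
One way to collect those results without blocking the tick is to keep the pending futures and publish whatever has finished on a later cycle; a sketch under the same assumptions (expensive_operation and the topic names are placeholders):

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
pending = []

def heavy_processing_node(node):
    # Submit new work without blocking the tick
    if node.has_msg("input"):
        pending.append(executor.submit(expensive_operation, node.get("input")))

    # Publish results that completed since the last tick
    for future in [f for f in pending if f.done()]:
        node.send("output", future.result())
        pending.remove(future)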

7. Use Multiprocess for CPU-Intensive Tasks

# Isolate heavy processing in separate processes
horus run sensor.py heavy_vision.py light_controller.py

# Each node gets its own CPU core

Development

Building from Source

# Debug build (fast compile, slow runtime)
cd horus_py
maturin develop

# Release build (slow compile, fast runtime)
maturin develop --release

# Build wheel for distribution
maturin build --release

Running Tests

# Install test dependencies
pip install pytest

# Run all tests
pytest tests/

# Run specific feature tests
horus run tests/test_rate_control.py    # Phase 1: Per-node rates
horus run tests/test_timestamps.py      # Phase 2: Timestamps
horus run tests/test_typed_messages.py  # Phase 3: Typed messages

# With coverage
pytest --cov=horus tests/

# Test multiprocess execution (Phase 4)
horus run tests/multiprocess_publisher.py tests/multiprocess_subscriber.py

Mock Mode

HORUS Python includes a mock mode for testing without Rust bindings:

# If Rust bindings aren't available, automatically falls back to mock
# You'll see: "Warning: Rust bindings not available. Running in mock mode."

# Use for unit testing Python logic without HORUS running
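
One way to unit-test your tick logic without the runtime at all is to drive the tick with a small stub that mimics the Node methods it uses; the FakeNode class below is illustrative test scaffolding, not part of the HORUS API:

class FakeNode:
    """Minimal stand-in for horus.Node, for unit-testing tick logic."""
    def __init__(self, inbox=None):
        self.inbox = dict(inbox or {})   # topic -> list of queued messages
        self.sent = []                   # (topic, data) pairs captured for assertions

    def has_msg(self, topic):
        return bool(self.inbox.get(topic))

    def get(self, topic):
        msgs = self.inbox.get(topic)
        return msgs.pop(0) if msgs else None

    def send(self, topic, data):
        self.sent.append((topic, data))

def process(node):
    # Tick under test (same shape as the pub/sub pipeline example)
    if node.has_msg("raw"):
        node.send("processed", node.get("raw") * 2.0)

def test_doubler_tick():
    node = FakeNode({"raw": [21.0]})
    process(node)
    assert node.sent == [("processed", 42.0)]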

Debugging Tips

# Enable logging for specific nodes
scheduler = horus.Scheduler()
scheduler.add(my_node, priority=0, logging=True)  # Enable logs

# Check node statistics
stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}, Errors: {stats['errors_count']}")

# Monitor message timestamps
msg, timestamp = node.get_with_timestamp("topic")
age = time.time() - timestamp
print(f"Message age: {age*1000:.1f}ms")

Interoperability

With Rust Nodes

Important: For cross-language communication, use typed hubs by passing a message type to Hub().

Cross-Language with Typed Hubs

# Python node with typed hub
from horus import Hub, CmdVel

cmd_hub = Hub(CmdVel)  # Typed hub
cmd_hub.send(CmdVel(linear=1.0, angular=0.5))

// Rust node receives
use horus::prelude::*; // Provides CmdVel

let hub = Hub::<CmdVel>::new("cmdvel")?;
if let Some(cmd) = hub.recv(&mut None) {
    println!("Got: linear={}, angular={}", cmd.linear, cmd.angular);
}

Generic Hub (String Topics)

# Generic Hub - for custom topics
from horus import Hub

hub = Hub("my_topic")  # Pass string for generic hub
hub.send({"linear": 1.0, "angular": 0.5})  # Uses JSON serialization

Typed hubs: Use Hub(CmdVel), Hub(Pose2D) for cross-language communication. See Python Message Library for details.


Common Patterns

Producer-Consumer

# Producer
producer = horus.Node(
    pubs="queue",
    tick=lambda n: n.send("queue", generate_work())
)

# Consumer
consumer = horus.Node(
    subs="queue",
    tick=lambda n: process_work(n.get("queue")) if n.has_msg("queue") else None
)

horus.run(producer, consumer)

Request-Response

def request_node(node):
    node.send("requests", {"id": 1, "query": "data"})

def response_node(node):
    if node.has_msg("requests"):
        req = node.get("requests")
        response = handle_request(req)
        node.send("responses", response)

req = horus.Node(pubs="requests", tick=request_node)
res = horus.Node(subs="requests", pubs="responses", tick=response_node)

Periodic Tasks

import time

class PeriodicTask:
    def __init__(self, interval):
        self.interval = interval
        self.last_run = 0

task = PeriodicTask(interval=5.0)  # Every 5 seconds

def periodic_tick(node):
    current = time.time()
    if current - task.last_run >= task.interval:
        node.send("periodic", "task_executed")
        task.last_run = current

node = horus.Node(pubs="periodic", tick=periodic_tick, rate=10)

Troubleshooting

Import Errors

# If you see: ModuleNotFoundError: No module named 'horus'
# Rebuild and install:
cd horus_py
maturin develop --release

Slow Performance

# Use release build (not debug)
maturin develop --release

# Check tick rate isn't too high
node = horus.Node(tick=fn, rate=30)  # 30Hz is reasonable

Memory Issues

# Avoid accumulating data in closures
# BAD:
all_data = []
def bad_tick(node):
    all_data.append(node.get("input"))  # Memory leak!

# GOOD:
def good_tick(node):
    data = node.get("input")
    process_and_discard(data)  # Process immediately

Monitor Integration and Logging

Current Limitations

Python nodes currently do NOT appear in the HORUS monitor logs.

The Python bindings pass None for the NodeInfo context parameter, which bypasses the logging system:

# Internal implementation (simplified)
hub.send(msg, None)      # Python binding: no NodeInfo context = no logging
hub.recv(&mut None)      # Underlying Rust call: no NodeInfo context = no logging

What this means:

  • Python nodes still communicate via shared memory
  • All message-passing functionality works
  • However, pub/sub events don't appear in the monitor logs
  • Python node messages cannot be monitored in real time via the monitor

Scheduler Logging Flag

The scheduler.add(node, priority, logging=True) flag currently enables Python-level logging (via print statements), not monitor integration:

scheduler = horus.Scheduler()
scheduler.add(sensor, priority=0, logging=True)  # Enables console logging
scheduler.add(controller, priority=1, logging=False)  # No console output

Console logging output:

[INFO] sensor: Published to topic 'lidar_data'
[INFO] sensor: Received from topic 'commands'

Monitoring Python Nodes

Since Python nodes don't integrate with the monitor logging system, use these alternatives:

  1. Node-level logging methods:
def tick(node):
    node.log_info("Processing sensor data")
    node.log_warning("Sensor reading is stale")
    node.log_error("Failed to process data")
    node.log_debug("Debug information")

# These print to console, not monitor
  2. Manual topic monitoring:
def tick(node):
    if node.has_msg("input"):
        data = node.get("input")
        print(f"[{node.name}] Received: {data}")
        result = process(data)
        node.send("output", result)
        print(f"[{node.name}] Published: {result}")
  3. Node statistics:
scheduler = horus.Scheduler()
scheduler.add(node, priority=0, logging=True)
scheduler.run(duration=10)

# Get stats after running
stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}")
print(f"Errors: {stats['errors_count']}")

Future Improvements

Monitor integration for Python nodes is planned for a future release. This will require:

  • Passing NodeInfo context through Python bindings
  • Implementing LogSummary for Python message types
  • Enabling monitor to read Python node logs from /dev/shm/horus_logs


Remember: With HORUS Python, you focus on what your robot does, not how the framework works!