HORUS Python Bindings
A production-ready Python API for the HORUS robotics framework. It pairs a simple interface with the advanced features needed for professional robotics applications.
Why HORUS Python?
- Zero Boilerplate: Working node in 10 lines
- Flexible API: Functional style or class inheritance - your choice
- Production Performance: ~500ns latency (same shared memory as Rust)
- Per-Node Rate Control: Different nodes at different frequencies (100Hz sensor, 10Hz logger)
- Automatic Timestamps: Built-in message timing and staleness detection
- Typed Messages: Optional type-safe messages from Rust
- Multiprocess Support: Process isolation and multi-language nodes
- Pythonic: Feels like native Python, not wrapped C++
- Rich Ecosystem: Use NumPy, OpenCV, scikit-learn, etc.
Quick Start
Installation
Automatic (Recommended)
Python bindings are automatically installed when you run the HORUS installer:
# From HORUS root directory
./install.sh
The installer will detect Python 3.9+ and automatically build and install the bindings.
Manual Installation
If you prefer to install manually or need to rebuild:
# Install maturin (Python/Rust build tool)
# Option A: Via Cargo (recommended for Ubuntu 24.04+)
cargo install maturin
# Option B: Via pip (if not blocked by PEP 668)
# pip install maturin
# Build and install from source
cd horus_py
maturin develop --release
Requirements:
- Python 3.9+
- Rust 1.70+
- Linux (for shared memory support)
Minimal Example
import horus

def process(node):
    node.send("output", "Hello HORUS!")

node = horus.Node(pubs="output", tick=process, rate=1)
horus.run(node, duration=3)
This minimal example demonstrates functional-style node creation without class boilerplate.
Core API
Creating a Node
node = horus.Node(
    name="my_node",              # Optional: auto-generated if not provided
    pubs=["topic1", "topic2"],   # Topics to publish to
    subs=["input1", "input2"],   # Topics to subscribe to
    tick=my_function,            # Function called repeatedly
    rate=30,                     # Hz (default: 30)
    init=setup_fn,               # Optional: called once at start
    shutdown=cleanup_fn          # Optional: called once at end
)
Parameters:
- name (str, optional): Node name (auto-generated if omitted)
- pubs (str | list[str], optional): Topics to publish to
- subs (str | list[str], optional): Topics to subscribe to
- tick (callable): Function called each cycle
- rate (int, optional): Execution rate in Hz (default: 30)
- init (callable, optional): Setup function
- shutdown (callable, optional): Cleanup function
Alternative: Class-Based Inheritance
For those who prefer OOP, you can inherit from horus.Node:
import horus

class SensorNode(horus.Node):
    def __init__(self):
        super().__init__(
            name="sensor",
            pubs=["temperature"],
            rate=10
        )

    def tick(self, info=None):
        # Override tick method
        self.send("temperature", 25.0)

    def init(self, info=None):
        # Optional: override init
        print("Sensor initialized!")

    def shutdown(self, info=None):
        # Optional: override shutdown
        print("Sensor shutting down!")

# Use it
sensor = SensorNode()
horus.run(sensor)
Both patterns work! Use functional style for simplicity or class inheritance for complex nodes with state.
Node Functions
Your tick function receives the node as a parameter:
def my_tick(node):
    # Check for messages
    if node.has_msg("input"):
        data = node.get("input")  # Get one message

    # Get all messages
    all_msgs = node.get_all("input")

    # Send messages
    node.send("output", {"value": 42})
Node Methods:
- node.send(topic, data) - Publish message
- node.get(topic) - Get one message (returns None if empty)
- node.get_all(topic) - Get all messages as list
- node.has_msg(topic) - Check if messages available
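To exercise a tick function without a running HORUS system, a small stand-in object that mimics the four methods above is often enough. The sketch below is only an illustration: `FakeNode` and its `inject` helper are hypothetical names that are not part of the `horus` package, and it assumes `get()` consumes messages in FIFO order.

```python
from collections import defaultdict, deque


class FakeNode:
    """Hypothetical stand-in for a HORUS node; NOT part of the horus package."""

    def __init__(self):
        self.inbox = defaultdict(deque)  # topic -> queued incoming messages
        self.sent = defaultdict(list)    # topic -> messages published by the tick

    def inject(self, topic, data):
        """Test helper: queue a message as if another node had published it."""
        self.inbox[topic].append(data)

    def has_msg(self, topic):
        return len(self.inbox[topic]) > 0

    def get(self, topic):
        # Assumed semantics: return the oldest message, or None if empty.
        return self.inbox[topic].popleft() if self.inbox[topic] else None

    def get_all(self, topic):
        msgs = list(self.inbox[topic])
        self.inbox[topic].clear()
        return msgs

    def send(self, topic, data):
        self.sent[topic].append(data)


def double_tick(node):
    """Example tick: doubles one value from "input" per call."""
    if node.has_msg("input"):
        node.send("output", node.get("input") * 2)


fake = FakeNode()
fake.inject("input", 21)
double_tick(fake)
print(fake.sent["output"])
```

Because the tick function only touches `has_msg`, `get`, and `send`, the same function can later be passed unchanged as `tick=` to a real node.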
Running Nodes
# Single node
horus.run(node)
# Multiple nodes
horus.run(node1, node2, node3, duration=10)
Examples
1. Simple Publisher
import horus
def publish_temperature(node):
node.send("temperature", 25.5)
sensor = horus.Node(
name="temp_sensor",
pubs="temperature",
tick=publish_temperature,
rate=1 # 1 Hz
)
horus.run(sensor, duration=10)
2. Subscriber
import horus
def display_temperature(node):
if node.has_msg("temperature"):
temp = node.get("temperature")
print(f"Temperature: {temp}°C")
display = horus.Node(
name="display",
subs="temperature",
tick=display_temperature
)
horus.run(display)
3. Pub/Sub Pipeline
import horus
def publish(node):
node.send("raw", 42.0)
def process(node):
if node.has_msg("raw"):
data = node.get("raw")
result = data * 2.0
node.send("processed", result)
def display(node):
if node.has_msg("processed"):
value = node.get("processed")
print(f"Result: {value}")
# Create pipeline
publisher = horus.Node("publisher", pubs="raw", tick=publish, rate=1)
processor = horus.Node("processor", subs="raw", pubs="processed", tick=process)
displayer = horus.Node("display", subs="processed", tick=display)
# Run all together
horus.run(publisher, processor, displayer, duration=5)
4. Using Lambda Functions
import horus
# Producer (inline)
producer = horus.Node(
pubs="numbers",
tick=lambda n: n.send("numbers", 42),
rate=1
)
# Transformer (inline)
doubler = horus.Node(
subs="numbers",
pubs="doubled",
tick=lambda n: n.send("doubled", n.get("numbers") * 2) if n.has_msg("numbers") else None
)
horus.run(producer, doubler, duration=5)
5. Multi-Topic Robot Controller
import horus
def robot_controller(node):
    # Read from multiple sensors
    lidar_data = None
    camera_data = None
    if node.has_msg("lidar"):
        lidar_data = node.get("lidar")
    if node.has_msg("camera"):
        camera_data = node.get("camera")

    # Compute commands; explicit None checks avoid truthiness surprises
    # with empty or array-like payloads
    if lidar_data is not None and camera_data is not None:
        cmd = compute_navigation(lidar_data, camera_data)
        node.send("motors", cmd)
        node.send("status", "navigating")
robot = horus.Node(
name="robot_controller",
subs=["lidar", "camera"],
pubs=["motors", "status"],
tick=robot_controller,
rate=50 # 50Hz control loop
)
6. Lifecycle Management
import horus
class Context:
def __init__(self):
self.count = 0
self.file = None
ctx = Context()
def init_handler(node):
print("Starting up!")
ctx.file = open("data.txt", "w")
def tick_handler(node):
ctx.count += 1
data = f"Tick {ctx.count}"
node.send("data", data)
ctx.file.write(data + "\n")
def shutdown_handler(node):
print(f"Processed {ctx.count} messages")
ctx.file.close()
node = horus.Node(
pubs="data",
init=init_handler,
tick=tick_handler,
shutdown=shutdown_handler,
rate=10
)
horus.run(node, duration=5)
Advanced Features (Production-Ready)
HORUS Python includes advanced features that match or exceed ROS2 capabilities while maintaining simplicity.
Priority-Based Execution
Lower priority numbers execute first, ensuring correct data flow:
scheduler = horus.Scheduler()
scheduler.add(sensor, priority=0) # Executes FIRST
scheduler.add(controller, priority=1) # Executes SECOND
scheduler.add(actuator, priority=2) # Executes THIRD
scheduler.run()
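The ordering rule can be pictured with plain Python. This is a sketch of the idea only, not the HORUS scheduler's implementation; `execution_order` is a hypothetical helper, and the tie-breaking behaviour (insertion order) is an assumption of the sketch.

```python
def execution_order(nodes):
    """Return node names in the order a priority scheduler would tick them.

    `nodes` is a list of (name, priority) pairs; lower numbers run first.
    sorted() is stable, so equal priorities keep their insertion order
    (an assumption of this sketch, not a documented HORUS guarantee).
    """
    return [name for name, _ in sorted(nodes, key=lambda item: item[1])]


registered = [("actuator", 2), ("sensor", 0), ("controller", 1)]
print(execution_order(registered))
```

Registering the actuator first does not matter: within each cycle the sensor still runs before the controller, so each stage sees data produced earlier in the same cycle.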
Automatic Timestamps
All messages automatically get microsecond-precision timestamps:
import horus
import time
def control_tick(node):
if node.has_msg("sensor_data"):
# Check message age
age = node.get_message_age("sensor_data")
if age > 0.1: # More than 100ms old
node.log_warning(f"Stale data: {age*1000:.1f}ms old")
return
# Or use built-in staleness detection
if node.is_stale("sensor_data", max_age=0.1):
return # Skip stale data
# Get message with timestamp
msg, timestamp = node.get_with_timestamp("sensor_data")
latency = time.time() - timestamp
print(f"Latency: {latency*1000:.1f}ms")
# Process fresh data
data = node.get("sensor_data")
process(data)
Timestamp Methods:
- node.get_message_age(topic) - Get age of next message in seconds
- node.is_stale(topic, max_age) - Check if message is too old
- node.get_timestamp(topic) - Peek at timestamp without consuming
- node.get_with_timestamp(topic) - Get message and timestamp together
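The staleness check amounts to comparing a message's age against a threshold. The sketch below shows that arithmetic in plain Python. It assumes timestamps are Unix seconds comparable to `time.time()`, as the latency calculation in the example above suggests; `message_age` and `is_stale` here are free-standing illustrative functions, not the node methods themselves.

```python
import time


def message_age(timestamp, now=None):
    """Age in seconds of a message stamped with `timestamp` (Unix seconds)."""
    now = time.time() if now is None else now
    return now - timestamp


def is_stale(timestamp, max_age, now=None):
    """True when the message is older than `max_age` seconds."""
    return message_age(timestamp, now) > max_age


stamp = 1000.00
print(is_stale(stamp, max_age=0.1, now=1000.05))  # message is about 50 ms old
print(is_stale(stamp, max_age=0.1, now=1000.25))  # message is about 250 ms old
```

Passing `now` explicitly keeps the check deterministic in tests; in a live tick you would leave it unset.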
Multiprocess Execution
Run Python nodes in separate processes for isolation and multi-language support:
# Run multiple Python files as separate processes
horus run node1.py node2.py node3.py
# Mix Python and Rust nodes
horus run sensor.rs controller.py visualizer.py
# Mix Rust and Python
horus run lidar_driver.rs planner.py motor_control.rs
All nodes in the same horus run session automatically communicate via shared memory!
Example - Distributed System:
# sensor_node.py
import horus
def sensor_tick(node):
data = read_lidar() # Your sensor code
node.send("lidar_data", data)
sensor = horus.Node(name="lidar", pubs="lidar_data", tick=sensor_tick)
horus.run(sensor)
# controller_node.py
import horus
def control_tick(node):
if node.has_msg("lidar_data"):
data = node.get("lidar_data")
cmd = compute_control(data)
node.send("motor_cmd", cmd)
controller = horus.Node(
name="controller",
subs="lidar_data",
pubs="motor_cmd",
tick=control_tick
)
horus.run(controller)
# Run both in separate processes
horus run sensor_node.py controller_node.py
Benefits:
- Process isolation: One crash doesn't kill everything
- Multi-language: Mix Python and Rust nodes in the same application
- Parallel execution: True multicore utilization
- Zero configuration: Shared memory IPC automatically set up
Complete Example: All Features Together
import horus
import time
def sensor_tick(node):
"""High-frequency sensor (100Hz)"""
imu = {"accel_x": 1.0, "accel_y": 0.0, "accel_z": 9.8}
node.send("imu_data", imu)
age = node.get_message_age("imu_data")
node.log_info(f"Published IMU (age: {age:.3f}s)")
def control_tick(node):
"""Medium-frequency control (50Hz)"""
if node.has_msg("imu_data"):
# Check for stale data
if node.is_stale("imu_data", max_age=0.05):
node.log_warning("Stale IMU data!")
return
imu = node.get("imu_data")
cmd = {"linear": 1.0, "angular": 0.0}
node.send("cmd_vel", cmd)
def logger_tick(node):
"""Low-frequency logging (10Hz)"""
if node.has_msg("cmd_vel"):
msg, timestamp = node.get_with_timestamp("cmd_vel")
latency = (time.time() - timestamp) * 1000
node.log_info(f"Command latency: {latency:.1f}ms")
# Create nodes
sensor = horus.Node(name="imu", pubs="imu_data", tick=sensor_tick)
controller = horus.Node(name="ctrl", subs="imu_data", pubs="cmd_vel", tick=control_tick)
logger = horus.Node(name="log", subs="cmd_vel", tick=logger_tick)
# Configure with different rates and priorities
scheduler = horus.Scheduler()
scheduler.add(sensor, priority=0, logging=True, rate_hz=100.0)
scheduler.add(controller, priority=1, logging=False, rate_hz=50.0)
scheduler.add(logger, priority=2, logging=True, rate_hz=10.0)
scheduler.run(duration=5.0)
# Check statistics
stats = scheduler.get_node_stats("imu")
print(f"Sensor: {stats['total_ticks']} ticks in 5 seconds")
Network Communication
HORUS Python supports network communication for distributed multi-machine systems. Hub, Link, and Router all work transparently over the network.
Hub Network Endpoints
Add an endpoint parameter to communicate over the network:
from horus import Hub, CmdVel
# Local (shared memory) - default
local_hub = Hub(CmdVel)
# Network (UDP direct)
network_hub = Hub(CmdVel, endpoint="[email protected]:8000")
# Router (TCP broker for WAN/NAT traversal)
router_hub = Hub(CmdVel, endpoint="cmdvel@router")
Endpoint Syntax:
- "topic" - Local shared memory (~500ns latency)
- "topic@host:port" - Direct UDP (<50μs latency)
- "topic@router" - Router broker (auto-discovery on localhost:7777)
- "[email protected]:7777" - Router broker at specific address
Hub Methods
Core Operations:
| Method | Description |
|---|---|
| hub.send(msg) | Send a single message |
| hub.recv() | Receive one message (returns None if empty) |
| hub.topic() | Get the topic name |
| hub.transport_type() | Get transport type ("shm", "udp", "router") |
Batch Operations (High-Performance):
| Method | Description |
|---|---|
| hub.send_many(msgs) | Send multiple messages at once, returns count sent |
| hub.recv_many(n) | Receive up to n messages in batch |
| hub.drain() | Receive all available messages (up to 1000) |
| hub.view() | Zero-copy raw bytes (GenericMessage only) |
Batch Example:
from horus import Hub, CmdVel
# === Send multiple messages at once ===
hub = Hub(CmdVel)
commands = [
CmdVel(linear=1.0, angular=0.0),
CmdVel(linear=1.5, angular=0.2),
CmdVel(linear=0.0, angular=0.5),
]
sent = hub.send_many(commands)
print(f"Sent {sent}/{len(commands)} messages")
# === Receive up to N messages ===
messages = hub.recv_many(10) # Get up to 10 messages
for msg in messages:
print(f"linear={msg.linear}, angular={msg.angular}")
# === Drain all available messages ===
all_msgs = hub.drain() # Get everything pending
print(f"Drained {len(all_msgs)} messages")
Zero-Copy Raw Access (Generic Hubs Only):
from horus import Hub
import msgpack # pip install msgpack
# For custom/large data where you control serialization
hub = Hub("sensor_data") # Generic (untyped) topic
# view() returns raw MessagePack bytes without JSON overhead
if raw := hub.view():
# Parse with your own logic (msgpack, struct, numpy, etc.)
data = msgpack.unpackb(raw)
print(f"Raw data: {data}")
# Note: view() raises TypeError for typed hubs (CmdVel, etc.)
# because they're already optimized for direct field access
Performance Tips:
- Use send_many() when you have multiple messages queued (1 GIL release vs N)
- Use drain() when processing bursts of data
- Use view() for custom serialization of large blobs (avoids JSON round-trip)
- Batch operations release the GIL once for all messages, not per-message
Generic Hub Methods
When you create a Hub with a string topic (instead of a typed class), you get a generic hub that accepts any serializable data:
from horus import Hub
# Generic hub (string topic = dynamic typing)
hub = Hub("my.topic")
# Typed hub (class = static typing, better performance)
typed_hub = Hub(CmdVel)
Generic Hub Methods:
| Method | Description |
|---|---|
| send(obj) | Send any JSON-serializable object (dict, list, str, int, float) |
| recv() | Receive and deserialize to Python object |
| send_bytes(data) | Send raw bytes (up to 4KB) |
| view() | Receive raw bytes without deserialization |
| send_numpy(arr) | Send NumPy array as bytes |
| send_with_metadata(data, meta) | Send bytes with string metadata |
| recv_with_metadata() | Receive bytes and metadata tuple |
| is_generic | Property: True if this is a generic hub |
Example: JSON-Serializable Objects
from horus import Hub
pub = Hub("sensor.data")
sub = Hub("sensor.data")
# Send dict, list, or any JSON-serializable data
pub.send({"temperature": 25.5, "humidity": 60.0})
pub.send([1.0, 2.0, 3.0, 4.0])
pub.send("status: OK")
# Receive (returns Python object)
msg = sub.recv() # {"temperature": 25.5, "humidity": 60.0}
Example: Raw Bytes
from horus import Hub
pub = Hub("binary.data")
sub = Hub("binary.data")
# Send raw bytes (custom binary protocol)
pub.send_bytes(b'\x01\x02\x03\x04')
# Receive raw bytes (no deserialization)
raw = sub.view() # b'\x01\x02\x03\x04'
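A custom binary protocol of this kind is commonly built with the standard `struct` module. The sketch below is one possible layout, not a format HORUS defines: the field choice, `encode_reading`, and `decode_reading` are hypothetical, and the 4KB limit from the table above is assumed to mean 4096 bytes.

```python
import struct

MAX_PAYLOAD = 4096  # assumed byte limit for send_bytes() ("up to 4KB")
READING = struct.Struct("<Hdf")  # little-endian: sensor id, timestamp, value


def encode_reading(sensor_id, timestamp, value):
    """Pack one reading into a fixed-size binary payload."""
    payload = READING.pack(sensor_id, timestamp, value)
    if len(payload) > MAX_PAYLOAD:
        raise ValueError("payload exceeds the assumed 4KB limit")
    return payload


def decode_reading(payload):
    """Unpack a payload produced by encode_reading()."""
    sensor_id, timestamp, value = READING.unpack(payload)
    return {"sensor_id": sensor_id, "timestamp": timestamp, "value": value}


raw = encode_reading(7, 1700000000.5, 25.5)
print(len(raw), decode_reading(raw))
```

The resulting bytes object is what you would hand to `send_bytes()`, and the bytes returned by `view()` on the receiving side would go through `decode_reading()`.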
Example: NumPy Arrays
from horus import Hub
import numpy as np
pub = Hub("numpy.data")
sub = Hub("numpy.data")
# Send NumPy array
arr = np.array([1.0, 2.0, 3.0], dtype=np.float32)
pub.send_numpy(arr)
# Receive and reconstruct
raw = sub.view()
received = np.frombuffer(raw, dtype=np.float32)
Example: Bytes with Metadata
from horus import Hub
import json
pub = Hub("tagged.data")
sub = Hub("tagged.data")
# Send with type hint metadata
data = json.dumps({"x": 1.0, "y": 2.0}).encode()
pub.send_with_metadata(data, "json")
# Receive with metadata
payload, metadata = sub.recv_with_metadata()
if metadata == "json":
obj = json.loads(payload)
Performance Comparison:
| Method | Latency | Use Case |
|---|---|---|
| Typed (Hub(CmdVel)) | ~3μs | Production, high-frequency |
| send() / recv() | ~50μs | Dynamic schemas, prototyping |
| send_bytes() / view() | ~10μs | Custom binary protocols |
| send_numpy() | ~15μs | Array data |
Link Class (Point-to-Point)
Link provides high-performance point-to-point (SPSC) communication - ~30% faster than Hub for single producer/consumer patterns.
Key Architecture: Link uses a single-slot design (not a ring buffer):
- Always stores exactly one message (the latest value)
- New messages overwrite old unread messages
- Perfect for sensors where only the latest reading matters
- send() never blocks; it always succeeds immediately
from horus import Link, CmdVel
# === MACHINE 1 (Producer) ===
# Connects to consumer at 192.168.1.20:9000
output = Link.producer(CmdVel, "[email protected]:9000")
output.send(CmdVel(linear=1.0, angular=0.5))
# === MACHINE 2 (Consumer) ===
# Listens on port 9000
input_link = Link.consumer(CmdVel, "[email protected]:9000")
if msg := input_link.recv():
print(f"Received: {msg}")
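The single-slot behaviour described above (latest value wins, unread messages are overwritten) can be modelled in a few lines of plain Python. This is a conceptual sketch only: `LatestValueSlot` is a hypothetical class, it uses a lock for simplicity rather than whatever mechanism Link uses over shared memory, and it assumes `recv()` hands out each message at most once.

```python
import threading


class LatestValueSlot:
    """Single-slot mailbox: keeps only the most recent message."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = None
        self._has_value = False

    def send(self, msg):
        # Overwrites any unread message; never waits for the consumer.
        with self._lock:
            self._value = msg
            self._has_value = True

    def recv(self):
        # Assumed semantics: return the latest message once, then None.
        with self._lock:
            if not self._has_value:
                return None
            self._has_value = False
            return self._value


slot = LatestValueSlot()
slot.send({"linear": 1.0})
slot.send({"linear": 2.0})  # overwrites the unread first message
print(slot.recv())
print(slot.recv())
```

The first message is never observed by the consumer, which is the intended trade-off for sensor-style data where only the newest reading matters.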
Link Methods:
| Method | Description |
|---|---|
| Link.producer(type, endpoint) | Create producer (connects to consumer) |
| Link.consumer(type, endpoint) | Create consumer (listens for producer) |
| link.send(msg) | Send a message |
| link.recv() | Receive a message (returns None if empty) |
| link.topic() | Get the topic name |
Local Link (Shared Memory):
# No "@" in endpoint = local shared memory
producer = Link.producer(CmdVel, "motor_commands")
consumer = Link.consumer(CmdVel, "motor_commands")
producer.send(CmdVel(linear=1.0, angular=0.0))
msg = consumer.recv() # ~248ns latency
Router Client (WAN/NAT Traversal)
For communication across networks, through NAT, or for large-scale deployments, use the Router:
from horus import RouterClient, Hub, CmdVel
# Create router client for explicit connection management
router = RouterClient("192.168.1.100", 7777)
# Build endpoints through the router
cmd_endpoint = router.endpoint("cmdvel") # Returns "cmdvel@192.168.1.100:7777"
pose_endpoint = router.endpoint("pose")
# Use endpoints with Hub
hub = Hub(CmdVel, endpoint=cmd_endpoint)
# Router properties
print(f"Address: {router.address}") # "192.168.1.100:7777"
print(f"Connected: {router.is_connected}") # True
print(f"Topics: {router.topics}") # ["cmdvel", "pose"]
print(f"Uptime: {router.uptime_seconds}s")
Helper Functions:
from horus import default_router_endpoint, router_endpoint
# Default router (localhost:7777)
ep1 = default_router_endpoint("cmdvel") # "cmdvel@router"
# Custom router address
ep2 = router_endpoint("cmdvel", "192.168.1.100", 7777) # "cmdvel@192.168.1.100:7777"
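Endpoint strings are plain text in the `topic`, `topic@router`, or `topic@host:port` forms, so they are easy to build and inspect by hand. The sketch below mirrors what the helpers' return values look like and adds a small parser; all three functions are hypothetical stand-ins written for illustration, not the `horus` implementations.

```python
def router_endpoint_sketch(topic, host, port):
    """Build "topic@host:port" (hypothetical stand-in for router_endpoint)."""
    return f"{topic}@{host}:{port}"


def default_router_endpoint_sketch(topic):
    """Build "topic@router" (hypothetical stand-in for default_router_endpoint)."""
    return f"{topic}@router"


def parse_endpoint(endpoint):
    """Split an endpoint string into (topic, host, port).

    host and port are None for local topics; host is "router" with
    port None for the default-router form.
    """
    topic, sep, target = endpoint.partition("@")
    if not sep:
        return topic, None, None        # "topic": local shared memory
    if target == "router":
        return topic, "router", None    # "topic@router": default router
    host, _, port = target.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected topic@host:port, got {endpoint!r}")
    return topic, host, int(port)


print(parse_endpoint("cmdvel"))
print(parse_endpoint(default_router_endpoint_sketch("cmdvel")))
print(parse_endpoint(router_endpoint_sketch("cmdvel", "192.168.1.100", 7777)))
```

A parser like this can be handy for validating configuration files before any Hub is created.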
Router Server (for testing):
from horus import RouterServer
# Start a local router (for development/testing)
server = RouterServer(port=7777)
server.start()
# For production, use CLI instead:
# $ horus router start --port 7777
When to Use What
| Transport | Latency | Use Case |
|---|---|---|
| Hub (local) | ~500ns | Same-machine pub/sub |
| Link (local) | ~248ns | Same-machine point-to-point (30% faster) |
| Hub (network) | <50μs | Multi-machine pub/sub on LAN |
| Link (network) | 5-15μs | Multi-machine point-to-point (3-10x faster than Hub) |
| Router | 10-50ms | WAN, NAT traversal, cloud deployments |
Multi-Machine Example
# === ROBOT (192.168.1.50) ===
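from horus import Hub, Link, CmdVel, LaserScan
# ImuData, DroneStatus, and Command are assumed to be typed message classes
# defined or imported elsewhere in your application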
# Local: Critical flight control (ultra-fast)
imu_link = Link.consumer(ImuData, "imu") # 248ns
# Network: Telemetry to ground station
telemetry = Hub(DroneStatus, endpoint="[email protected]:8000")
# Network: Commands from ground station
commands = Hub(Command, endpoint="[email protected]:8001")
# === GROUND STATION (192.168.1.100) ===
from horus import Hub
# Receive telemetry from robot
telemetry_sub = Hub(DroneStatus, endpoint="[email protected]:8000")
# Send commands to robot
command_pub = Hub(Command, endpoint="[email protected]:8001")
Integration with Python Ecosystem
NumPy Integration
import horus
import numpy as np
def process_array(node):
    if node.has_msg("raw_data"):
        data = node.get("raw_data")
        # Convert to NumPy array
        arr = np.array(data)
        # Process with NumPy
        spectrum = np.fft.fft(arr)
        # FFT output is complex; publish magnitudes so the payload stays serializable
        node.send("fft_result", np.abs(spectrum).tolist())
processor = horus.Node(
subs="raw_data",
pubs="fft_result",
tick=process_array
)
OpenCV Integration
import horus
import cv2
import numpy as np
def process_image(node):
if node.has_msg("camera"):
img_data = node.get("camera")
# Convert to OpenCV format
img = np.array(img_data, dtype=np.uint8).reshape((480, 640, 3))
# Apply OpenCV processing
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 150)
# Publish result
node.send("edges", edges.flatten().tolist())
vision = horus.Node(
subs="camera",
pubs="edges",
tick=process_image,
rate=30
)
scikit-learn Integration
import horus
from sklearn.linear_model import LinearRegression
import numpy as np
model = LinearRegression()
def train_model(node):
if node.has_msg("training_data"):
data = node.get("training_data")
X = np.array(data['features'])
y = np.array(data['labels'])
# Train model
model.fit(X, y)
score = model.score(X, y)
node.send("model_score", score)
trainer = horus.Node(
subs="training_data",
pubs="model_score",
tick=train_model
)
Advanced Patterns
State Management
import horus
class RobotState:
def __init__(self):
self.position = {"x": 0.0, "y": 0.0}
self.velocity = 0.0
self.last_update = 0
state = RobotState()
def update_state(node):
if node.has_msg("velocity"):
state.velocity = node.get("velocity")
if node.has_msg("position"):
state.position = node.get("position")
# Publish combined state
node.send("robot_state", {
"pos": state.position,
"vel": state.velocity
})
state_manager = horus.Node(
subs=["velocity", "position"],
pubs="robot_state",
tick=update_state
)
Rate Limiting
import horus
import time
class RateLimiter:
def __init__(self, min_interval):
self.min_interval = min_interval
self.last_send = 0
limiter = RateLimiter(min_interval=0.1) # 100ms minimum
def rate_limited_publish(node):
current_time = time.time()
if current_time - limiter.last_send >= limiter.min_interval:
node.send("output", "data")
limiter.last_send = current_time
node = horus.Node(
pubs="output",
tick=rate_limited_publish,
rate=100 # Node runs at 100Hz, but publishes at max 10Hz
)
Error Handling
import horus
def safe_processing(node):
try:
if node.has_msg("input"):
data = node.get("input")
result = risky_operation(data)
node.send("output", result)
except Exception as e:
node.send("errors", str(e))
print(f"Error: {e}")
processor = horus.Node(
subs="input",
pubs=["output", "errors"],
tick=safe_processing
)
Performance Tips
1. Use Per-Node Rate Control
# NEW: Use scheduler with per-node rates for optimal performance
scheduler = horus.Scheduler()
# High-frequency sensor (100Hz)
scheduler.add(sensor, priority=0, rate_hz=100.0)
# Medium-frequency control (50Hz)
scheduler.add(controller, priority=1, rate_hz=50.0)
# Low-frequency logging (10Hz)
scheduler.add(logger, priority=2, rate_hz=10.0)
scheduler.run()
# Monitor performance with get_node_stats()
stats = scheduler.get_node_stats("sensor")
print(f"Sensor executed {stats['total_ticks']} ticks")
2. Monitor Message Staleness
def control_tick(node):
# Skip stale data to maintain real-time performance
if node.is_stale("sensor_data", max_age=0.1):
node.log_warning("Skipping stale sensor data")
return
# Process fresh data only
data = node.get("sensor_data")
process(data)
3. Use Dicts for Messages
# Send messages as Python dicts (automatically serialized to JSON)
cmd = {"linear": 1.5, "angular": 0.8}
node.send("cmd_vel", cmd)
# Check message age using node method
if node.get_message_age("cmd_vel") > 0.1:
print("Message is stale")
4. Batch Processing
# === Node API (for tick functions) ===
def batch_processor(node):
# Process all available messages at once
messages = node.get_all("input")
if messages:
results = [process(msg) for msg in messages]
for result in results:
node.send("output", result)
# === Hub API (direct usage) ===
from horus import Hub, CmdVel
hub = Hub(CmdVel)
# Receive in batches (1 GIL release for N messages)
messages = hub.recv_many(100) # Up to 100 messages
all_msgs = hub.drain() # All available messages
# Send in batches (1 GIL release for N messages)
commands = [CmdVel(linear=v, angular=0) for v in range(10)]
sent = hub.send_many(commands) # Returns count sent
5. Keep tick() Fast
# GOOD: Fast tick
def good_tick(node):
if node.has_msg("input"):
data = node.get("input")
result = quick_operation(data)
node.send("output", result)
# BAD: Slow tick
def bad_tick(node):
time.sleep(1) # Don't block!
data = requests.get("http://api.example.com") # Don't do I/O!
6. Offload Heavy Processing
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def heavy_processing_node(node):
if node.has_msg("input"):
data = node.get("input")
# Offload to thread pool
future = executor.submit(expensive_operation, data)
# Don't block - check result later or use callback
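One way to "check the result later" is to keep the pending futures and poll them on each tick with `Future.done()`, which never blocks. The sketch below shows that pattern with the standard library only; `OffloadWorker` and `square` are hypothetical names, and publishing the finished results is left to your tick function.

```python
from concurrent.futures import ThreadPoolExecutor


class OffloadWorker:
    """Tracks background jobs so a tick function never waits on them."""

    def __init__(self, max_workers=4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending = []

    def submit(self, fn, *args):
        self.pending.append(self.executor.submit(fn, *args))

    def poll(self):
        """Return results of finished jobs; keep unfinished ones pending."""
        finished, still_running = [], []
        for future in self.pending:
            # done() is checked once per future so no job is lost in between
            (finished if future.done() else still_running).append(future)
        self.pending = still_running
        # result() re-raises any exception the job raised
        return [future.result() for future in finished]


def square(x):
    return x * x


worker = OffloadWorker()
worker.submit(square, 3)
worker.executor.shutdown(wait=True)  # demo only: wait so the output is deterministic
print(worker.poll())
```

In a tick function you would call `submit()` when new input arrives and `poll()` once per cycle, sending whatever results are ready and skipping the rest until a later tick.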
7. Use Multiprocess for CPU-Intensive Tasks
# Isolate heavy processing in separate processes
horus run sensor.py heavy_vision.py light_controller.py
# Each node gets its own CPU core
Development
Building from Source
# Debug build (fast compile, slow runtime)
cd horus_py
maturin develop
# Release build (slow compile, fast runtime)
maturin develop --release
# Build wheel for distribution
maturin build --release
Running Tests
# Install test dependencies
pip install pytest
# Run all tests
pytest tests/
# Run specific feature tests
horus run tests/test_rate_control.py # Phase 1: Per-node rates
horus run tests/test_timestamps.py # Phase 2: Timestamps
horus run tests/test_typed_messages.py # Phase 3: Typed messages
# With coverage
pytest --cov=horus tests/
# Test multiprocess execution (Phase 4)
horus run tests/multiprocess_publisher.py tests/multiprocess_subscriber.py
Mock Mode
HORUS Python includes a mock mode for testing without Rust bindings:
# If Rust bindings aren't available, automatically falls back to mock
# You'll see: "Warning: Rust bindings not available. Running in mock mode."
# Use for unit testing Python logic without HORUS running
Debugging Tips
# Enable logging for specific nodes
scheduler = horus.Scheduler()
scheduler.add(my_node, priority=0, logging=True) # Enable logs
# Check node statistics
stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}, Errors: {stats['errors_count']}")
# Monitor message timestamps
msg, timestamp = node.get_with_timestamp("topic")
age = time.time() - timestamp
print(f"Message age: {age*1000:.1f}ms")
Interoperability
With Rust Nodes
Important: For cross-language communication, use typed hubs by passing a message type to Hub().
Cross-Language with Typed Hubs
# Python node with typed hub
from horus import Hub, CmdVel
cmd_hub = Hub(CmdVel) # Typed hub
cmd_hub.send(CmdVel(linear=1.0, angular=0.5))
// Rust node receives
use horus::prelude::*; // Provides cmd_vel::CmdVel
let hub = Hub::<CmdVel>::new("cmdvel")?;
if let Some(cmd) = hub.recv(&mut None) {
println!("Got: linear={}, angular={}", cmd.linear, cmd.angular);
}
Generic Hub (String Topics)
# Generic Hub - for custom topics
from horus import Hub
hub = Hub("my_topic") # Pass string for generic hub
hub.send({"linear": 1.0, "angular": 0.5}) # Uses JSON serialization
Typed hubs: Use Hub(CmdVel), Hub(Pose2D) for cross-language communication.
See Python Message Library for details.
Common Patterns
Producer-Consumer
# Producer
producer = horus.Node(
pubs="queue",
tick=lambda n: n.send("queue", generate_work())
)
# Consumer
consumer = horus.Node(
subs="queue",
tick=lambda n: process_work(n.get("queue")) if n.has_msg("queue") else None
)
horus.run(producer, consumer)
Request-Response
def request_node(node):
node.send("requests", {"id": 1, "query": "data"})
def response_node(node):
if node.has_msg("requests"):
req = node.get("requests")
response = handle_request(req)
node.send("responses", response)
req = horus.Node(pubs="requests", tick=request_node)
res = horus.Node(subs="requests", pubs="responses", tick=response_node)
Periodic Tasks
import time
class PeriodicTask:
def __init__(self, interval):
self.interval = interval
self.last_run = 0
task = PeriodicTask(interval=5.0) # Every 5 seconds
def periodic_tick(node):
current = time.time()
if current - task.last_run >= task.interval:
node.send("periodic", "task_executed")
task.last_run = current
node = horus.Node(pubs="periodic", tick=periodic_tick, rate=10)
Troubleshooting
Import Errors
# If you see: ModuleNotFoundError: No module named 'horus'
# Rebuild and install:
cd horus_py
maturin develop --release
Slow Performance
# Use release build (not debug)
maturin develop --release
# Check tick rate isn't too high
node = horus.Node(tick=fn, rate=30) # 30Hz is reasonable
Memory Issues
# Avoid accumulating data in closures
# BAD:
all_data = []
def bad_tick(node):
all_data.append(node.get("input")) # Memory leak!
# GOOD:
def good_tick(node):
data = node.get("input")
process_and_discard(data) # Process immediately
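When a tick genuinely needs some history (for smoothing, say), a bounded buffer keeps memory constant. The sketch below uses `collections.deque` with `maxlen`; the buffer size of 100 and the `record` helper are illustrative choices, not HORUS settings.

```python
from collections import deque

history = deque(maxlen=100)  # keeps only the 100 most recent samples


def record(sample):
    """Append a sample; the oldest one is dropped automatically when full."""
    history.append(sample)
    return sum(history) / len(history)  # moving average over the window


for value in range(1000):
    average = record(value)

print(len(history), average)
```

After 1000 samples the buffer still holds only the last 100, so the tick's memory use does not grow with run time.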
Monitor Integration and Logging
Current Limitations
Python nodes currently do NOT appear in the HORUS monitor logs.
The Python bindings pass None for the NodeInfo context parameter, which bypasses the logging system:
# Internal implementation (simplified)
hub.send(msg, None) # No context = no logging
hub.recv(&mut None) # No context = no logging
What this means:
- Python nodes communicate via shared memory
- All message passing functionality works
- Pub/sub events don't appear in monitor logs
- Cannot monitor Python node messages in real-time via monitor
Scheduler Logging Flag
The scheduler.add(node, priority, logging=True) flag currently enables Python-level logging (via print statements), not monitor integration:
scheduler = horus.Scheduler()
scheduler.add(sensor, priority=0, logging=True) # Enables console logging
scheduler.add(controller, priority=1, logging=False) # No console output
Console logging output:
[INFO] sensor: Published to topic 'lidar_data'
[INFO] sensor: Received from topic 'commands'
Monitoring Python Nodes
Since Python nodes don't integrate with the monitor logging system, use these alternatives:
- Node-level logging methods:
def tick(node):
node.log_info("Processing sensor data")
node.log_warning("Sensor reading is stale")
node.log_error("Failed to process data")
node.log_debug("Debug information")
# These print to console, not monitor
- Manual topic monitoring:
def tick(node):
    if node.has_msg("input"):
        data = node.get("input")
        print(f"[{node.name}] Received: {data}")
        result = process(data)  # process() is a placeholder for your own logic
        node.send("output", result)
        print(f"[{node.name}] Published: {result}")
- Node statistics:
scheduler = horus.Scheduler()
scheduler.add(node, priority=0, logging=True)
scheduler.run(duration=10)
# Get stats after running
stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}")
print(f"Errors: {stats['errors_count']}")
Future Improvements
Monitor integration for Python nodes is planned for a future release. This will require:
- Passing NodeInfo context through Python bindings
- Implementing LogSummary for Python message types
- Enabling monitor to read Python node logs from /dev/shm/horus_logs
See Also
- Network Communication - Detailed network guide (Rust-focused, concepts apply to Python)
- Examples - More code examples
- Core Concepts - Understanding HORUS architecture
- Monitor - Real-time monitoring and visualization
- Python Message Library - Typed message classes
- Multi-Language Support - Cross-language communication
- Performance - Optimization guide
Remember: With HORUS Python, you focus on what your robot does, not how the framework works!