The Redis IPC backend publishes metrics and video frames to Redis pub/sub channels in real-time. This enables distributed systems integration, multiple concurrent consumers, and decoupled architectures where clients subscribe to data streams without direct server connections.
Overview
When using the Redis IPC backend, physiology_server publishes to five Redis channels:
- Core Metrics ({prefix}:core_metrics): Pulse, breathing, and other vitals
- Edge Metrics ({prefix}:edge_metrics): On-device metrics from Edge
- Status ({prefix}:status): System status updates
- Frames ({prefix}:frame): JPEG-compressed video frames (base64-encoded)
- Recording State ({prefix}:recording_state): Recording start/stop notifications
All messages use the same JSON envelope format as File IPC, ensuring consistency across backends.
Prerequisites
Redis Server
A running Redis server is required. Install Redis:
Ubuntu/Debian:
sudo apt update
sudo apt install redis-server
sudo systemctl start redis-server
macOS:
brew install redis
brew services start redis
Docker:
docker run -d -p 6379:6379 redis:latest
Python Redis Client (for consumers)
Configuration
Command Line Usage
Enable Redis IPC backend with direct camera input:
physiology_server \
--streaming_backend=redis \
--use_camera \
--redis_config_path=/path/to/config.json \
--also_log_to_stderr
Required Flags:
- --streaming_backend=redis: Select Redis IPC backend
- --use_camera: Enable direct camera input (required for alternative IPC backends)
Optional Flags:
- --redis_config_path: Path to JSON configuration file
- Individual configuration flags (see below)
Configuration File
Generate a default configuration file:
physiology_server --save_default_redis_ipc_config
This creates default_redis_ipc_config.json:
{
"host": "localhost",
"port": 6379,
"password": "",
"database": 0,
"key_prefix": "physiology",
"connection_timeout_ms": 5000,
"operation_timeout_ms": 1000,
"enable_debug_logging": false,
"max_bulk_string_bytes": 10485760,
"max_array_elements": 1000
}
Configuration Options
| Option | Type | Default | Description |
| host | string | "localhost" | Redis server hostname or IP |
| port | int | 6379 | Redis server port |
| password | string | "" | Redis authentication password (empty = no auth) |
| database | int | 0 | Redis database number (0-15) |
| key_prefix | string | "physiology" | Prefix for all Redis keys/channels |
| connection_timeout_ms | int | 5000 | Connection timeout in milliseconds |
| operation_timeout_ms | int | 1000 | Operation timeout in milliseconds |
| use_pubsub | bool | true | Use pub/sub mode (vs. key-value storage) |
| enable_debug_logging | bool | false | Enable verbose logging |
| max_bulk_string_bytes | int64 | 10485760 | Maximum message size (10 MB, security limit) |
| max_array_elements | int | 1000 | Maximum array elements (security limit) |
Command Line Overrides
Individual options can be overridden:
physiology_server \
--streaming_backend=redis \
--use_camera \
--redis_host=192.168.1.100 \
--redis_port=6379 \
--redis_key_prefix=myapp \
--redis_use_pubsub=true
Channel Names
Channels follow the pattern {prefix}:{type}:
| Channel | Description |
| physiology:core_metrics | Core vitals (default prefix) |
| physiology:edge_metrics | Edge metrics |
| physiology:status | Status updates |
| physiology:frame | Video frames |
| physiology:recording_state | Recording state changes |
Change the prefix with --redis_key_prefix:
--redis_key_prefix=app1 # Channels: app1:core_metrics, app1:edge_metrics, etc.
Message Format
JSON Envelope Structure
All messages use the same envelope format as File IPC:
{
"type": "core_metrics",
"timestamp": 1234567890123456,
"payload": { ... }
}
Here, "type" is always one of {"core_metrics", "edge_metrics", "status", "frame"}.
Core Metrics Example
{
"type": "core_metrics",
"timestamp": 1234567890123456,
"payload": {
"pulse": {
"strict": {"value": 72.5, "confidence": 0.95}
},
"breathing": {
"strict": {"value": 16.2, "confidence": 0.89}
}
}
}
Frame Example
{
"type": "frame",
"timestamp": 1234567890123459,
"payload": {
"data": "/9j/4AAQSkZJRgABAQEAYABgAAD...",
"width": 640,
"height": 480,
"format": "jpeg"
}
}
Frame data is base64-encoded JPEG.
Recording State Example
Published when recording starts or stops via the SetRecording gRPC endpoint:
{
"type": "recording_state",
"timestamp": 1234567890123460,
"payload": {
"recording": true
}
}
The recording field is a boolean indicating whether recording is active. This enables clients to:
- Detect when recording starts/stops
- Trigger session-specific file naming
- Synchronize data collection with recording state
- Implement automatic session management
Python Client Example
Basic Subscriber
Subscribe to metrics using redis-py:
import redis
import json
client = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = client.pubsub()
pubsub.subscribe('physiology:core_metrics', 'physiology:edge_metrics', 'physiology:recording_state')
for message in pubsub.listen():
if message['type'] == 'message':
envelope = json.loads(message['data'])
print(f"Received {envelope['type']} at {envelope['timestamp']}")
print(f"Payload: {envelope['payload']}")
Multi-Channel Subscriber
Handle all channel types:
import redis
import json
import base64
import numpy as np
import cv2
def handle_message(envelope):
"""Process received message based on type."""
msg_type = envelope['type']
timestamp = envelope['timestamp']
payload = envelope['payload']
if msg_type == 'core_metrics':
pulse = payload.get('pulse', {}).get('strict', {}).get('value')
print(f"Pulse: {pulse} BPM")
elif msg_type == 'edge_metrics':
eda = payload.get('eda')
print(f"EDA: {eda}")
elif msg_type == 'frame':
frame_data = base64.b64decode(payload['data'])
nparr = np.frombuffer(frame_data, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
print(f"Received frame: {payload['width']}x{payload['height']}")
elif msg_type == 'status':
print(f"Status: {payload.get('message')}")
elif msg_type == 'recording_state':
is_recording = payload.get('recording')
print(f"Recording state changed: {'started' if is_recording else 'stopped'}")
client = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = client.pubsub()
pubsub.subscribe(
'physiology:core_metrics',
'physiology:edge_metrics',
'physiology:status',
'physiology:frame',
'physiology:recording_state'
)
for message in pubsub.listen():
if message['type'] == 'message':
envelope = json.loads(message['data'])
handle_message(envelope)
Complete Example
A full-featured example is available in the samples directory:
- samples/alt_ipc_example_on_prem_client.py: Supports both File and Redis backends
- samples/redis_ipc_metrics_saving_client.py: Saves Redis metrics to files using SmartSpectra-IPC
Async Redis for Command Queue Processing
⚠️ Important for Production: When implementing command queue consumers (e.g., monitoring {prefix}:command_queue for UI commands), use redis.asyncio instead of blocking Redis operations.
Why async matters:
- Blocking operations (BRPOP with synchronous sockets) can cause UI commands to be delayed or require multiple button presses
- Async operations process commands instantly without blocking other tasks
- Critical for responsive UI interactions (Start/Stop Recording, etc.)
Example: Async Command Queue Consumer
import asyncio
import json
import redis.asyncio as redis
async def monitor_command_queue():
"""Monitor Redis command queue asynchronously."""
client = redis.Redis(
host='localhost',
port=6379,
decode_responses=True,
socket_connect_timeout=2.0,
socket_keepalive=True,
)
command_queue_key = 'physiology:metrics:command_queue'
try:
while True:
result = await client.brpop(command_queue_key, timeout=1)
if result is None:
continue
_, command_json = result
command = json.loads(command_json)
requested_recording = command.get('recording', False)
print(f"Received command: recording={requested_recording}")
finally:
await client.close()
asyncio.run(monitor_command_queue())
Benefits: Instant command processing, responsive UI interactions, no event loop blocking.
Reference: See samples/redis_ipc_metrics_saving_client.py for full implementation.
Authentication and Security
Password Authentication
Configure Redis with password:
# In redis.conf
requirepass your_secure_password
Update configuration:
{
"password": "your_secure_password"
}
Or via command line:
--redis_password=your_secure_password
Network Security
Bind to Specific Interface:
# In redis.conf - only allow localhost connections
bind 127.0.0.1
Use TLS/SSL (Redis 6+):
# In redis.conf
tls-port 6380
tls-cert-file /path/to/cert.pem
tls-key-file /path/to/key.pem
Message Size Limits
Security limits prevent denial-of-service attacks:
- max_bulk_string_bytes: Maximum message size (default: 10 MB)
- max_array_elements: Maximum array elements (default: 1000)
Adjust if needed for your use case:
--redis_max_bulk_string_bytes=20971520 # 20 MB
Performance Considerations
Pub/Sub vs. Key-Value
Pub/Sub Mode (recommended, default):
- Real-time streaming to multiple subscribers
- Messages not persisted
- Low latency
- Subscribers receive only messages published while connected
Key-Value Mode:
- Messages stored in Redis
- Supports historical data retrieval
- Higher memory usage
- Requires TTL management
Enable key-value mode:
Network Latency
- Local Redis: Sub-millisecond latency
- Remote Redis: Network latency applies
- Connection Pooling: Reused connections reduce overhead
Frame Rate
High frame rates increase Redis throughput:
| Frame Rate | Approx. Bandwidth | Recommendation |
| 30 FPS | ~10-20 MB/s | Suitable for local Redis |
| 60 FPS | ~20-40 MB/s | Use local or fast network |
| 120 FPS | ~40-80 MB/s | Local Redis strongly recommended |
Multiple Subscribers
Redis pub/sub scales to many subscribers:
- Each subscriber receives own copy of messages
- No impact on publisher performance
- Redis handles fan-out efficiently
Troubleshooting
Connection Errors
Symptom: Failed to connect to Redis or Connection refused
Solutions:
# Verify Redis is running
redis-cli ping # Should return "PONG"
# Check Redis port
netstat -an | grep 6379
# Test connection
redis-cli -h localhost -p 6379
# Check firewall
sudo ufw allow 6379/tcp
Authentication Failures
Symptom: NOAUTH Authentication required
Solutions:
- Verify password is correct in configuration
- Check requirepass in redis.conf
Test with redis-cli:
redis-cli -h localhost -p 6379 -a your_password
Timeout Errors
Symptom: Operation timed out
Solutions:
Increase timeout values:
--redis_connection_timeout_ms=10000
--redis_operation_timeout_ms=5000
- Check network latency: ping your-redis-host
- Verify Redis server is not overloaded
Missing Messages
Symptom: Subscriber doesn't receive messages
Solutions:
- Verify channel names match (including prefix)
- Ensure subscriber is connected before publisher starts
- Check Redis logs: tail -f /var/log/redis/redis-server.log
Test with redis-cli:
# In one terminal (subscribe)
redis-cli
SUBSCRIBE physiology:core_metrics
# In another terminal (publish test message)
redis-cli
PUBLISH physiology:core_metrics '{"test":"message"}'
High Memory Usage
Symptom: Redis memory grows continuously
Solutions:
Use Cases
Real-Time Dashboard
Multiple clients can subscribe to live metrics:
# Server
physiology_server --streaming_backend=redis --use_camera
# Dashboard 1 (web interface)
python web_dashboard.py --redis_host=localhost
# Dashboard 2 (mobile app)
./mobile_app --redis_host=localhost
# Analytics (background processor)
python analytics_worker.py --redis_host=localhost
Distributed System Integration
Decouple physiology server from consumers:
┌─────────────────────┐
│ physiology_server │
│ (publishes to │
│ Redis) │
└──────────┬──────────┘
│
▼
┌──────────────┐
│ Redis Server │
└──────┬───────┘
│
┌────┴────┬────────────┬───────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌──────┐ ┌──────────┐ ┌─────────┐
│ Web UI │ │ API │ │ Database │ │ Alerts │
│ │ │ │ │ Logger │ │ Service │
└────────┘ └──────┘ └──────────┘ └─────────┘
Cloud Deployment
Redis enables separation of processing and storage:
# On edge device (local processing)
physiology_server \
--streaming_backend=redis \
--redis_host=cloud.redis.host \
--redis_password=$REDIS_PASSWORD \
--use_camera
# On cloud (storage and analytics)
python cloud_processor.py --redis_host=localhost
Multi-Tenant Systems
Isolate tenants using key prefixes:
# Tenant A
physiology_server --streaming_backend=redis --redis_key_prefix=tenant_a
# Tenant B
physiology_server --streaming_backend=redis --redis_key_prefix=tenant_b
# Tenant A client subscribes to: tenant_a:core_metrics
# Tenant B client subscribes to: tenant_b:core_metrics
Advanced Topics
Pattern Subscriptions
Subscribe to multiple channels with wildcards:
pubsub.psubscribe('*:core_metrics')
pubsub.psubscribe('physiology:*')
Message Filtering
Filter messages on the client side:
for message in pubsub.listen():
if message['type'] == 'message':
envelope = json.loads(message['data'])
if envelope['type'] == 'core_metrics':
pulse = envelope['payload'].get('pulse', {})
confidence = pulse.get('strict', {}).get('confidence', 0)
if confidence >= 0.9:
process_pulse(pulse)
Monitoring and Metrics
Monitor Redis pub/sub activity:
# Active clients
redis-cli CLIENT LIST | grep pubsub
# Pub/sub channels
redis-cli PUBSUB CHANNELS
# Subscribers per channel
redis-cli PUBSUB NUMSUB physiology:core_metrics
# Monitor all commands (debugging)
redis-cli MONITOR
See Also