SmartSpectra C++ SDK
Measure human vitals from video with SmartSpectra C++ SDK.
Loading...
Searching...
No Matches
Redis IPC Backend

The Redis IPC backend publishes metrics and video frames to Redis pub/sub channels in real-time. This enables distributed systems integration, multiple concurrent consumers, and decoupled architectures where clients subscribe to data streams without direct server connections.

Overview

When using the Redis IPC backend, physiology_server publishes to five Redis channels:

  • Core Metrics ({prefix}:core_metrics): Pulse, breathing, and other vitals
  • Edge Metrics ({prefix}:edge_metrics): On-device metrics from Edge
  • Status ({prefix}:status): System status updates
  • Frames ({prefix}:frame): JPEG-compressed video frames (base64-encoded)
  • Recording State ({prefix}:recording_state): Recording start/stop notifications

All messages use the same JSON envelope format as File IPC, ensuring consistency across backends.

Prerequisites

Redis Server

A running Redis server is required. Install Redis:

Ubuntu/Debian:

sudo apt update
sudo apt install redis-server
sudo systemctl start redis-server

macOS:

brew install redis
brew services start redis

Docker:

docker run -d -p 6379:6379 redis:latest

Python Redis Client (for consumers)

pip install redis

Configuration

Command Line Usage

Enable Redis IPC backend with direct camera input:

physiology_server \
--streaming_backend=redis \
--use_camera \
--redis_config_path=/path/to/config.json \
--also_log_to_stderr

Required Flags:

  • --streaming_backend=redis: Select Redis IPC backend
  • --use_camera: Enable direct camera input (required for alternative IPC backends)

Optional Flags:

  • --redis_config_path: Path to JSON configuration file
  • Individual configuration flags (see below)

Configuration File

Generate a default configuration file:

physiology_server --save_default_redis_ipc_config

This creates default_redis_ipc_config.json:

{
"host": "localhost",
"port": 6379,
"password": "",
"database": 0,
"key_prefix": "physiology",
"connection_timeout_ms": 5000,
"operation_timeout_ms": 1000,
"enable_debug_logging": false,
"max_bulk_string_bytes": 10485760,
"max_array_elements": 1000
}

Configuration Options

Option Type Default Description
host string "localhost" Redis server hostname or IP
port int 6379 Redis server port
password string "" Redis authentication password (empty = no auth)
database int 0 Redis database number (0-15)
key_prefix string "physiology" Prefix for all Redis keys/channels
connection_timeout_ms int 5000 Connection timeout in milliseconds
operation_timeout_ms int 1000 Operation timeout in milliseconds
use_pubsub bool true Use pub/sub mode (vs. key-value storage)
enable_debug_logging bool false Enable verbose logging
max_bulk_string_bytes int64 10485760 Maximum message size (10 MB, security limit)
max_array_elements int 1000 Maximum array elements (security limit)

Command Line Overrides

Individual options can be overridden:

physiology_server \
--streaming_backend=redis \
--use_camera \
--redis_host=192.168.1.100 \
--redis_port=6379 \
--redis_key_prefix=myapp \
--redis_use_pubsub=true

Channel Names

Channels follow the pattern {prefix}:{type}:

Channel Description
physiology:core_metrics Core vitals (default prefix)
physiology:edge_metrics Edge metrics
physiology:status Status updates
physiology:frame Video frames
physiology:recording_state Recording state changes

Change the prefix with --redis_key_prefix:

--redis_key_prefix=app1 # Channels: app1:core_metrics, app1:edge_metrics, etc.

Message Format

JSON Envelope Structure

All messages use the same envelope format as File IPC:

{
"type": "core_metrics",
"timestamp": 1234567890123456,
"payload": { ... }
}

Here, "type" is always one of {"core_metrics", "edge_metrics", "status", "frame"}.

Core Metrics Example

{
"type": "core_metrics",
"timestamp": 1234567890123456,
"payload": {
"pulse": {
"strict": {"value": 72.5, "confidence": 0.95}
},
"breathing": {
"strict": {"value": 16.2, "confidence": 0.89}
}
}
}

Frame Example

{
"type": "frame",
"timestamp": 1234567890123459,
"payload": {
"data": "/9j/4AAQSkZJRgABAQEAYABgAAD...",
"width": 640,
"height": 480,
"format": "jpeg"
}
}

Frame data is base64-encoded JPEG.

Recording State Example

Published when recording starts or stops via the SetRecording gRPC endpoint:

{
"type": "recording_state",
"timestamp": 1234567890123460,
"payload": {
"recording": true
}
}

The recording field is a boolean indicating whether recording is active. This enables clients to:

  • Detect when recording starts/stops
  • Trigger session-specific file naming
  • Synchronize data collection with recording state
  • Implement automatic session management

Python Client Example

Basic Subscriber

Subscribe to metrics using redis-py:

import redis
import json
# Connect to Redis
client = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = client.pubsub()
# Subscribe to channels
pubsub.subscribe('physiology:core_metrics', 'physiology:edge_metrics', 'physiology:recording_state')
# Listen for messages
for message in pubsub.listen():
if message['type'] == 'message':
envelope = json.loads(message['data'])
print(f"Received {envelope['type']} at {envelope['timestamp']}")
print(f"Payload: {envelope['payload']}")

Multi-Channel Subscriber

Handle all channel types:

import redis
import json
import base64
import numpy as np
import cv2
def handle_message(envelope):
"""Process received message based on type."""
msg_type = envelope['type']
timestamp = envelope['timestamp']
payload = envelope['payload']
if msg_type == 'core_metrics':
pulse = payload.get('pulse', {}).get('strict', {}).get('value')
print(f"Pulse: {pulse} BPM")
elif msg_type == 'edge_metrics':
eda = payload.get('eda')
print(f"EDA: {eda}")
elif msg_type == 'frame':
# Decode base64 JPEG frame
frame_data = base64.b64decode(payload['data'])
nparr = np.frombuffer(frame_data, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
print(f"Received frame: {payload['width']}x{payload['height']}")
elif msg_type == 'status':
print(f"Status: {payload.get('message')}")
elif msg_type == 'recording_state':
is_recording = payload.get('recording')
print(f"Recording state changed: {'started' if is_recording else 'stopped'}")
# Connect and subscribe
client = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = client.pubsub()
pubsub.subscribe(
'physiology:core_metrics',
'physiology:edge_metrics',
'physiology:status',
'physiology:frame',
'physiology:recording_state'
)
# Process messages
for message in pubsub.listen():
if message['type'] == 'message':
envelope = json.loads(message['data'])
handle_message(envelope)

Complete Example

A full-featured example is available in the samples directory:

  • samples/alt_ipc_example_on_prem_client.py: Supports both File and Redis backends
  • samples/redis_ipc_metrics_saving_client.py: Saves Redis metrics to files using SmartSpectra-IPC

Async Redis for Command Queue Processing

⚠️ Important for Production: When implementing command queue consumers (e.g., monitoring {prefix}:command_queue for UI commands), use redis.asyncio instead of blocking Redis operations.

Why async matters:

  • Blocking operations (BRPOP with synchronous sockets) can cause UI commands to be delayed or require multiple button presses
  • Async operations process commands instantly without blocking other tasks
  • Critical for responsive UI interactions (Start/Stop Recording, etc.)

Example: Async Command Queue Consumer

import asyncio
import json
import redis.asyncio as redis
async def monitor_command_queue():
"""Monitor Redis command queue asynchronously."""
# Create async Redis connection
client = redis.Redis(
host='localhost',
port=6379,
decode_responses=True,
socket_connect_timeout=2.0,
socket_keepalive=True,
)
command_queue_key = 'physiology:metrics:command_queue'
try:
while True:
# BRPOP with timeout - truly async, doesn't block event loop
result = await client.brpop(command_queue_key, timeout=1)
if result is None:
# Timeout, continue
continue
# result is (key, value)
_, command_json = result
command = json.loads(command_json)
# Process command
requested_recording = command.get('recording', False)
print(f"Received command: recording={requested_recording}")
# Forward to physiology_server via gRPC or other mechanism
# await forward_command(requested_recording)
finally:
await client.close()
# Run the async function
asyncio.run(monitor_command_queue())

Benefits: Instant command processing, responsive UI interactions, no event loop blocking.

Reference: See samples/redis_ipc_metrics_saving_client.py for full implementation.

Authentication and Security

Password Authentication

Configure Redis with password:

# In redis.conf
requirepass your_secure_password

Update configuration:

{
"password": "your_secure_password"
}

Or via command line:

--redis_password=your_secure_password

Network Security

Bind to Specific Interface:

# In redis.conf - only allow localhost connections
bind 127.0.0.1

Use TLS/SSL (Redis 6+):

# In redis.conf
tls-port 6380
tls-cert-file /path/to/cert.pem
tls-key-file /path/to/key.pem

Message Size Limits

Security limits prevent denial-of-service attacks:

  • max_bulk_string_bytes: Maximum message size (default: 10 MB)
  • max_array_elements: Maximum array elements (default: 1000)

Adjust if needed for your use case:

--redis_max_bulk_string_bytes=20971520 # 20 MB

Performance Considerations

Pub/Sub vs. Key-Value

Pub/Sub Mode (recommended, default):

  • Real-time streaming to multiple subscribers
  • Messages not persisted
  • Low latency
  • Subscribers receive only messages published while connected

Key-Value Mode:

  • Messages stored in Redis
  • Supports historical data retrieval
  • Higher memory usage
  • Requires TTL management

Enable key-value mode:

--redis_use_pubsub=false

Network Latency

  • Local Redis: Sub-millisecond latency
  • Remote Redis: Network latency applies
  • Connection Pooling: Reused connections reduce overhead

Frame Rate

High frame rates increase Redis throughput:

Frame Rate Approx. Bandwidth Recommendation
30 FPS ~10-20 MB/s Suitable for local Redis
60 FPS ~20-40 MB/s Use local or fast network
120 FPS ~40-80 MB/s Local Redis strongly recommended

Multiple Subscribers

Redis pub/sub scales to many subscribers:

  • Each subscriber receives own copy of messages
  • No impact on publisher performance
  • Redis handles fan-out efficiently

Troubleshooting

Connection Errors

Symptom: Failed to connect to Redis or Connection refused

Solutions:

# Verify Redis is running
redis-cli ping # Should return "PONG"
# Check Redis port
netstat -an | grep 6379
# Test connection
redis-cli -h localhost -p 6379
# Check firewall
sudo ufw allow 6379/tcp

Authentication Failures

Symptom: NOAUTH Authentication required

Solutions:

  • Verify password is correct in configuration
  • Check requirepass in redis.conf
  • Test with redis-cli:

    redis-cli -h localhost -p 6379 -a your_password

Timeout Errors

Symptom: Operation timed out

Solutions:

  • Increase timeout values:

    --redis_connection_timeout_ms=10000
    --redis_operation_timeout_ms=5000
  • Check network latency: ping your-redis-host
  • Verify Redis server is not overloaded

Missing Messages

Symptom: Subscriber doesn't receive messages

Solutions:

  • Verify channel names match (including prefix)
  • Ensure subscriber is connected before publisher starts
  • Check Redis logs: tail -f /var/log/redis/redis-server.log
  • Test with redis-cli:

    # In one terminal (subscribe)
    redis-cli
    SUBSCRIBE physiology:core_metrics
    # In another terminal (publish test message)
    redis-cli
    PUBLISH physiology:core_metrics '{"test":"message"}'

High Memory Usage

Symptom: Redis memory grows continuously

Solutions:

  • Use pub/sub mode (default) - doesn't persist messages
  • If using key-value mode, configure TTL:

    redis-cli
    CONFIG SET maxmemory-policy allkeys-lru
    CONFIG SET maxmemory 2gb
  • Monitor memory: redis-cli INFO memory

Use Cases

Real-Time Dashboard

Multiple clients can subscribe to live metrics:

# Server
physiology_server --streaming_backend=redis --use_camera
# Dashboard 1 (web interface)
python web_dashboard.py --redis_host=localhost
# Dashboard 2 (mobile app)
./mobile_app --redis_host=localhost
# Analytics (background processor)
python analytics_worker.py --redis_host=localhost

Distributed System Integration

Decouple physiology server from consumers:

┌─────────────────────┐
│ physiology_server │
│ (publishes to │
│ Redis) │
└──────────┬──────────┘
┌──────────────┐
│ Redis Server │
└──────┬───────┘
┌────┴────┬────────────┬───────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌──────┐ ┌──────────┐ ┌─────────┐
│ Web UI │ │ API │ │ Database │ │ Alerts │
│ │ │ │ │ Logger │ │ Service │
└────────┘ └──────┘ └──────────┘ └─────────┘

Cloud Deployment

Redis enables separation of processing and storage:

# On edge device (local processing)
physiology_server \
--streaming_backend=redis \
--redis_host=cloud.redis.host \
--redis_password=$REDIS_PASSWORD \
--use_camera
# On cloud (storage and analytics)
python cloud_processor.py --redis_host=localhost

Multi-Tenant Systems

Isolate tenants using key prefixes:

# Tenant A
physiology_server --streaming_backend=redis --redis_key_prefix=tenant_a
# Tenant B
physiology_server --streaming_backend=redis --redis_key_prefix=tenant_b
# Tenant A client subscribes to: tenant_a:core_metrics
# Tenant B client subscribes to: tenant_b:core_metrics

Advanced Topics

Pattern Subscriptions

Subscribe to multiple channels with wildcards:

# Subscribe to all metrics from any prefix
pubsub.psubscribe('*:core_metrics')
# Subscribe to all channels for a prefix
pubsub.psubscribe('physiology:*')

Message Filtering

Filter messages on the client side:

for message in pubsub.listen():
if message['type'] == 'message':
envelope = json.loads(message['data'])
# Only process high-confidence pulse readings
if envelope['type'] == 'core_metrics':
pulse = envelope['payload'].get('pulse', {})
confidence = pulse.get('strict', {}).get('confidence', 0)
if confidence >= 0.9:
process_pulse(pulse)

Monitoring and Metrics

Monitor Redis pub/sub activity:

# Active clients
redis-cli CLIENT LIST | grep pubsub
# Pub/sub channels
redis-cli PUBSUB CHANNELS
# Subscribers per channel
redis-cli PUBSUB NUMSUB physiology:core_metrics
# Monitor all commands (debugging)
redis-cli MONITOR

See Also