SmartSpectra-IPC Python Package

The SmartSpectra-IPC package (smartspectra-ipc) provides Python utilities for working with alternative IPC backends in OnPrem deployments. The package is distributed as a wheel file with OnPrem releases and focuses on simplifying metrics collection, aggregation, and analysis workflows.

Overview

The package is designed for extensibility. Current functionality centers on Redis metrics collection via the MetricsCollector class, with plans to add File IPC utilities, data transformation pipelines, and integrations with analysis frameworks in future releases.

Package Structure:

smartspectra.ipc/
├── MetricsCollector # High-level metrics collection and aggregation
├── MetricsCollectorConfig # Configuration dataclass
├── RespReader # Low-level Redis RESP protocol reader
├── encode_command() # RESP command encoding utility
└── normalize() # Response normalization utility
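
All five names are importable directly from smartspectra.ipc, as the examples later on this page do:

from smartspectra.ipc import (
    MetricsCollector,
    MetricsCollectorConfig,
    RespReader,
    encode_command,
    normalize,
)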

Installation

The smartspectra-ipc wheel is included in OnPrem package distributions:

pip install smartspectra_ipc-1.7.0-py3-none-any.whl

Dependencies:

  • Python 3.11+
  • No external dependencies (uses stdlib only)
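
After installing, a quick smoke test (the import path matches the examples on this page) verifies the install:

python -c "from smartspectra.ipc import MetricsCollector; print('ok')"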

MetricsCollector Class

The MetricsCollector class provides high-level abstractions for collecting metrics from Redis pub/sub channels published by physiology_server when using the Redis IPC backend.

Key Features

Automatic Channel Management:

  • Subscribes to core and edge metrics channels based on key prefix
  • Automatically subscribes to recording_state channel for server state updates
  • Handles Redis connection and RESP protocol details
  • No manual Redis client configuration needed
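
For example, with the default key prefix the collector exposes the fully qualified channel names as attributes (the same attributes used in the basic usage example below):

from smartspectra.ipc import MetricsCollector, MetricsCollectorConfig

config = MetricsCollectorConfig(redis_key_prefix="physiology")
collector = MetricsCollector(config)
print(collector.core_channel)       # "physiology:core_metrics"
print(collector.edge_channel)       # "physiology:edge_metrics"
print(collector.recording_channel)  # "physiology:recording_state"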

Recording State Management:

  • Automatically detects recording start/stop via recording_state channel
  • Generates session-specific file paths when recording starts
  • Supports custom session IDs via set_session_id() method
  • Falls back to datetime-based session IDs if not set

Per-Session File Naming:

  • Creates session-specific files: metrics_{session_id}.jsonl and metrics_{session_id}.json
  • Conditional file rotation: only rotates if session changes
  • Same session ID appends to existing files (no rotation)
  • Falls back to base config paths if no session_id set

Data Aggregation:

  • Converts streaming data (array-of-structs) to analysis format (struct-of-arrays); a sketch follows this list
  • Aggregates all time-series into unified data structure
  • Handles timestamp resets and wraps automatically
  • Deduplicates and sorts samples by timestamp
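
A minimal sketch of the array-of-structs to struct-of-arrays conversion, using hypothetical sample dictionaries (the collector's internal representation may differ):

# Streaming form: one struct per sample (array-of-structs)
samples = [
    {"t": 1234567.0, "value": 72.5},
    {"t": 1234568.0, "value": 72.3},
]

# Analysis form: one array per field (struct-of-arrays)
series = {
    "timestamps": [s["t"] for s in samples],
    "values": [s["value"] for s in samples],
}
# series == {"timestamps": [1234567.0, 1234568.0], "values": [72.5, 72.3]}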

Dual Output:

  • Continuous JSONL: Streams all metric envelopes to file as received
  • Final JSON Summary: Writes aggregated struct-of-arrays format when recording stops

Periodic Snapshots:

  • Automatic snapshots every 30 seconds during recording
  • Recording start/stop triggers immediate snapshot writes
  • Real-time access to aggregated data via get_current_snapshot()

Session Tracking:

  • Records start/end times with millisecond precision
  • Automatic or custom session IDs for organizing collection runs
  • Metadata includes sample counts, series counts, and duration

Configuration

MetricsCollectorConfig is a Python dataclass with the following fields:

Field                   Type  Default       Description
redis_host              str   "localhost"   Redis server hostname
redis_port              int   6379          Redis server port
redis_key_prefix        str   "physiology"  Key prefix for channel names
output_jsonl_path       Path  None          Base path for JSONL output (session-specific files created during recording)
output_json_path        Path  None          Base path for JSON summary (session-specific files created during recording)
max_samples_per_series  int   0             Max samples per series (0 = unlimited)
append_mode             bool  False         Append to existing files instead of truncating
whitespace              bool  False         Include whitespace in JSON output

Dataclass Benefits:

As a dataclass, MetricsCollectorConfig supports:

  • Automatic __init__, __repr__, and __eq__ generation
  • Type hints for all fields
  • Default values without boilerplate
  • Easy serialization/deserialization with dataclasses.asdict() and dataclasses.replace()
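
For example, dataclasses.replace() derives a modified copy of an existing configuration without mutating the original (the hostname here is illustrative):

from dataclasses import replace
from smartspectra.ipc import MetricsCollectorConfig

base = MetricsCollectorConfig(redis_host="localhost")
staging = replace(base, redis_host="staging.example.com", redis_port=6380)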

Saving and Loading from Disk:

import json
from dataclasses import asdict
from pathlib import Path

from smartspectra.ipc import MetricsCollectorConfig

# Save configuration (session IDs are set on the collector, not in the config)
config = MetricsCollectorConfig(
    redis_host="192.168.1.100",
    redis_key_prefix="physiology"
)
with open("collector_config.json", "w") as f:
    json.dump(asdict(config), f, indent=2)

# Load configuration
with open("collector_config.json", "r") as f:
    config_dict = json.load(f)

# Convert Path strings back to Path objects if needed
if config_dict.get("output_jsonl_path"):
    config_dict["output_jsonl_path"] = Path(config_dict["output_jsonl_path"])
if config_dict.get("output_json_path"):
    config_dict["output_json_path"] = Path(config_dict["output_json_path"])

loaded_config = MetricsCollectorConfig(**config_dict)

Basic Usage

Complete Example from redis_ipc_metrics_saving_client.py:

from smartspectra.ipc import MetricsCollector, MetricsCollectorConfig
from smartspectra.ipc import RespReader, encode_command, normalize
import socket
from pathlib import Path

# Create configuration (note: session_id is set on the collector, not in the config)
config = MetricsCollectorConfig(
    redis_host="localhost",
    redis_port=6379,
    redis_key_prefix="physiology",
    output_jsonl_path=Path("./metrics.jsonl"),
    output_json_path=Path("./metrics_summary.json")
)

# Create and start collector
collector = MetricsCollector(config)
collector.start()  # Opens JSONL file for writing

# Connect to Redis and subscribe
with socket.create_connection((config.redis_host, config.redis_port)) as sock:
    sock.settimeout(0.5)
    reader = RespReader(sock)

    # Send PING
    sock.sendall(encode_command("PING"))
    reply = reader.read()  # Should be "PONG"

    # Subscribe to channels (including recording_state)
    core_channel = collector.core_channel            # "physiology:core_metrics"
    edge_channel = collector.edge_channel            # "physiology:edge_metrics"
    recording_channel = collector.recording_channel  # "physiology:recording_state"
    sock.sendall(encode_command("SUBSCRIBE", core_channel, edge_channel, recording_channel))

    # Process messages until interrupted (Ctrl+C)
    try:
        while True:
            reply = reader.read()
            if reply is None:
                continue
            reply = normalize(reply)
            if isinstance(reply, list) and reply:
                kind = reply[0]
                if kind == "message" and len(reply) >= 3:
                    payload_raw = reply[2]
                    if isinstance(payload_raw, bytes):
                        payload_str = payload_raw.decode('utf-8')
                    else:
                        payload_str = str(payload_raw)
                    # Feed envelope to collector
                    collector.process_envelope(payload_str)
    except KeyboardInterrupt:
        pass

# Stop collection and write final summary
collector.stop()  # Closes JSONL file, writes JSON summary

Real-Time Snapshots

The get_current_snapshot() method returns the current aggregated state without stopping collection.

Automatic Periodic Snapshots:

The collector automatically writes snapshot files during recording:

  • Every 30 seconds during active recording
  • When recording starts
  • When recording stops
  • Snapshot files use temp naming: {base_name}_snapshot_temp.json

Manual Snapshots:

Call get_current_snapshot() at any time for:

  • Real-time monitoring dashboards
  • Progress tracking during long collections
  • Intermediate checkpoints
  • Live data visualization

Example:

import time
from smartspectra.ipc import MetricsCollector, MetricsCollectorConfig

# Configure and start
config = MetricsCollectorConfig(
    redis_host="localhost",
    redis_port=6379,
    redis_key_prefix="physiology"
)
collector = MetricsCollector(config)
collector.start()

# ... Redis message processing loop in background ...

# Get periodic snapshots
collecting = True  # Placeholder flag; flip to False (e.g. from another thread) to stop polling
while collecting:
    snapshot = collector.get_current_snapshot()

    # Access metadata
    meta = snapshot['meta']
    print(f"Session: {meta.get('session_id', 'N/A')}")
    print(f"Duration: {meta['duration_seconds']:.1f}s")
    print(f"Total samples: {meta['total_samples']}")
    print(f"Series count: {meta['series_count']}")
    print(f"JSONL envelopes: {meta['jsonl_envelopes_written']}")

    # Access aggregated series data
    series = snapshot['series']

    # Example: Get pulse data from core metrics
    core_channel = "physiology:core_metrics"
    if core_channel in series:
        pulse_series = series[core_channel].get('pulse.strict')
        if pulse_series:
            timestamps = pulse_series['timestamps']  # List of floats (microseconds)
            values = pulse_series['values']          # List of floats (BPM)
            relative = pulse_series['relative_us']   # Original timestamps preserved from source
            if values:
                latest_bpm = values[-1]
                print(f"Latest pulse: {latest_bpm:.1f} BPM")
                print(f"Sample count: {len(values)}")

    time.sleep(5)  # Update every 5 seconds

# Finalize
collector.stop()

Snapshot Structure:

{
  "meta": {
    "started_at_ms": 1672531200000,
    "started_at_iso": "2023-01-01T00:00:00",
    "updated_at_ms": 1672532800000,           # Last update timestamp
    "updated_at_iso": "2023-01-01T00:30:00",
    "stopped_at_ms": 1672534800000,           # Only if stopped
    "stopped_at_iso": "2023-01-01T01:00:00",  # Only if stopped
    "duration_seconds": 3600.0,
    "session_id": "experiment_001",
    "redis_host": "localhost",
    "redis_port": 6379,
    "redis_prefix": "physiology",
    "jsonl_envelopes_written": 18000,
    "series_count": 12,
    "total_samples": 180000
  },
  "series": {
    "physiology:core_metrics": {
      "pulse.strict": {
        "timestamps": [1234567.0, 1234568.0, ...],   # Absolute timestamps (μs)
        "values": [72.5, 72.3, ...],                 # Metric values
        "relative_us": [1234567.0, 1234568.0, ...]   # Original timestamps
      },
      "breathing.strict": {
        "timestamps": [...],
        "values": [...],
        "relative_us": [...]
      }
    },
    "physiology:edge_metrics": {
      "eda": {
        "timestamps": [...],
        "values": [...],
        "relative_us": [...]
      },
      "upper_breathing_trace": {
        "timestamps": [...],
        "values": [...],
        "relative_us": [...]
      }
    }
  }
}

Output Formats

JSONL Output (Continuous)

Each line is a complete JSON envelope as received from Redis:

{"type":"core_metrics","timestamp":1234567890,"payload":{"pulse":{"strict":{"value":72.5,"confidence":0.95}}}}
{"type":"edge_metrics","timestamp":1234567891,"payload":{"eda":0.42,"upper_breathing_trace":1.23}}
{"type":"core_metrics","timestamp":1234567892,"payload":{"pulse":{"strict":{"value":72.3,"confidence":0.96}}}}

Use Cases:

  • Streaming analysis pipelines
  • Event replay
  • Debugging
  • Audit trails
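
A minimal streaming consumer, assuming only that each line is a self-contained envelope as shown above:

import json

with open("metrics.jsonl") as f:
    for line in f:
        if not line.strip():
            continue
        envelope = json.loads(line)
        if envelope["type"] == "core_metrics":
            strict = envelope["payload"].get("pulse", {}).get("strict")
            if strict:
                print(envelope["timestamp"], strict["value"], strict["confidence"])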

JSON Summary (Final)

Written when collector.stop() is called; contains the aggregated struct-of-arrays format suitable for analysis:

{
  "meta": {
    "started_at_ms": 1672531200000,
    "started_at_iso": "2023-01-01T00:00:00",
    "updated_at_ms": 1672534800000,
    "updated_at_iso": "2023-01-01T01:00:00",
    "stopped_at_ms": 1672534800000,
    "stopped_at_iso": "2023-01-01T01:00:00",
    "duration_seconds": 3600.0,
    "session_id": "experiment_001",
    "series_count": 8,
    "total_samples": 180000
  },
  "series": {
    "physiology:core_metrics": {
      "pulse.strict": {
        "timestamps": [1234567.0, 1234568.0, ...],
        "values": [72.5, 72.3, ...],
        "relative_us": [1234567.0, 1234568.0, ...]
      }
    }
  }
}

Use Cases:

  • NumPy/Pandas integration: pd.DataFrame(series_data)
  • Statistical analysis
  • Visualization (matplotlib, plotly)
  • Machine learning feature extraction

Advanced Features

Timestamp Handling

The collector automatically handles:

Timestamp Resets:

  • Detects when a timestamp decreases by more than 1 second
  • Applies an offset to maintain a monotonic timeline (see the sketch after these lists)
  • Preserves original timestamps in the relative_us field

Deduplication:

  • Ignores samples with duplicate/earlier timestamps
  • Maintains strict temporal ordering

Wrapping:

  • Handles 32-bit timestamp overflows
  • Continues sequence across wrap boundaries
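
The sketch below illustrates the reset-handling idea only; it is not the collector's actual implementation. When a timestamp jumps backward by more than one second, an offset is added so the stored timeline stays monotonic, while the raw value is preserved in relative_us:

def add_sample(series, raw_ts_us, value, state):
    """Illustrative reset handling; 'state' carries the running offset."""
    RESET_THRESHOLD_US = 1_000_000  # 1 second
    last = state.get("last_raw")
    if last is not None and raw_ts_us < last - RESET_THRESHOLD_US:
        # Timeline reset detected: bump the offset so corrected timestamps
        # continue just past the previous sample (nominal 1 us gap).
        state["offset"] = state.get("offset", 0.0) + (last - raw_ts_us) + 1.0
    state["last_raw"] = raw_ts_us

    corrected = raw_ts_us + state.get("offset", 0.0)
    series["timestamps"].append(corrected)   # Monotonic, offset-corrected
    series["values"].append(value)
    series["relative_us"].append(raw_ts_us)  # Original timestamp preserved

series = {"timestamps": [], "values": [], "relative_us": []}
state = {}
# The third sample resets the source clock; the corrected timeline stays monotonic
for ts, v in [(100.0, 72.5), (1_200_000.0, 72.3), (50.0, 72.4)]:
    add_sample(series, ts, v, state)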

Sample Limiting

Limit memory usage for long-running collections:

config = MetricsCollectorConfig(
    max_samples_per_series=10000  # Keep last 10K samples per series
)

When the limit is reached, the oldest samples are discarded first (FIFO), as illustrated below.
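
The discard behavior is analogous to a bounded deque, where appending to a full deque drops the oldest entry:

from collections import deque

values = deque(maxlen=3)
for v in [1, 2, 3, 4, 5]:
    values.append(v)
print(list(values))  # [3, 4, 5], oldest samples discarded first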

Append Mode

Continue previous collection sessions:

config = MetricsCollectorConfig(
    output_jsonl_path=Path("./metrics.jsonl"),
    append_mode=True  # Append instead of truncate
)

Useful for:

  • Multi-day collections
  • Resuming after crashes
  • Continuous monitoring

Session Management

The MetricsCollector provides automatic session management with per-session file naming.

Setting Session ID:

Use the set_session_id() method to set a custom session ID before or during recording:

from datetime import datetime
from pathlib import Path

from smartspectra.ipc import MetricsCollector, MetricsCollectorConfig

# Create collector with base file paths
config = MetricsCollectorConfig(
    redis_host="localhost",
    redis_port=6379,
    output_jsonl_path=Path("./metrics.jsonl"),
    output_json_path=Path("./metrics_summary.json")
)
collector = MetricsCollector(config)

# Option 1: Set custom session ID
collector.set_session_id("experiment_001")

# Option 2: Auto-generate datetime-based session ID
session_id = datetime.now().strftime("run_%Y%m%d_%H%M%S")
collector.set_session_id(session_id)

# Option 3: Don't set session_id - the collector auto-generates one when recording starts
# (datetime-based, e.g. "20251006_205619")

Per-Session File Naming:

When a session ID is set (manually or auto-generated), the collector creates session-specific files:

  • metrics_{session_id}.jsonl - Continuous JSONL stream
  • metrics_{session_id}.json - Final aggregated summary

Examples:

  • metrics_experiment_001.jsonl
  • metrics_run_20251006_143022.jsonl
  • metrics_20251006_205619.jsonl (auto-generated)

File Rotation Behavior:

The collector uses conditional file rotation:

  • Same session ID: Appends to existing files (no rotation)
    • Useful for resuming interrupted sessions
    • Logs: "Continuing with same session_id"
  • Different session ID: Rotates to new files
    • Creates new files with new session ID
    • Logs: "Rotating files for new session"
  • No session ID: Falls back to base config paths
    • Uses output_jsonl_path and output_json_path as-is
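
A rough illustration of these rules in terms of set_session_id() calls (assuming output_jsonl_path=Path("./metrics.jsonl"); exact rotation timing follows the recording-state events described below):

collector.set_session_id("run_A")  # Session files become metrics_run_A.jsonl / metrics_run_A.json
collector.set_session_id("run_A")  # Same ID again: no rotation, files are appended to
collector.set_session_id("run_B")  # Different ID: subsequent writes rotate to metrics_run_B.*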

Recording State Integration:

The collector automatically subscribes to the recording_state channel and:

  • Generates session ID when recording starts (if not set)
  • Creates session-specific files when recording starts
  • Writes final JSON summary when recording stops

Integration Examples

With Pandas

import pandas as pd
import json

# Load JSON summary
with open("metrics_summary.json") as f:
    data = json.load(f)

# Convert pulse series to DataFrame
pulse_data = data['series']['physiology:core_metrics']['pulse.strict']
df = pd.DataFrame({
    'timestamp_us': pulse_data['timestamps'],
    'pulse_bpm': pulse_data['values']
})

# Convert to datetime index
df['datetime'] = pd.to_datetime(df['timestamp_us'], unit='us')
df.set_index('datetime', inplace=True)

# Analyze
print(df['pulse_bpm'].describe())
df['pulse_bpm'].plot()

With NumPy

import numpy as np
import json

with open("metrics_summary.json") as f:
    data = json.load(f)

# Extract arrays
pulse_data = data['series']['physiology:core_metrics']['pulse.strict']
timestamps = np.array(pulse_data['timestamps'])
values = np.array(pulse_data['values'])

# Compute statistics
mean_bpm = np.mean(values)
std_bpm = np.std(values)
print(f"Pulse: {mean_bpm:.1f} ± {std_bpm:.1f} BPM")

See Also

  • Redis IPC Backend - Redis IPC backend configuration
  • SmartSpectra OnPrem Architecture - System architecture overview
  • samples/redis_ipc_metrics_saving_client.py - Complete usage example
  • samples/plot_metrics.py - Visualization example using JSON summary