The SmartSpectra-IPC package (smartspectra-ipc) provides Python utilities for working with alternative IPC backends in OnPrem deployments. The package is distributed as a wheel file with OnPrem releases and focuses on simplifying metrics collection, aggregation, and analysis workflows.
Overview
The package is designed for extensibility. Current functionality centers on Redis metrics collection via the MetricsCollector class, with plans to add File IPC utilities, data transformation pipelines, and integrations with analysis frameworks in future releases.
Package Structure:
smartspectra.ipc/
├── MetricsCollector # High-level metrics collection and aggregation
├── MetricsCollectorConfig # Configuration dataclass
├── RespReader # Low-level Redis RESP protocol reader
├── encode_command() # RESP command encoding utility
└── normalize() # Response normalization utility
Installation
The smartspectra-ipc wheel is included in OnPrem package distributions:
pip install smartspectra_ipc-1.7.0-py3-none-any.whl
Dependencies:
- Python 3.11+
- No external dependencies (uses stdlib only)
MetricsCollector Class
The MetricsCollector class provides high-level abstractions for collecting metrics from Redis pub/sub channels published by physiology_server when using the Redis IPC backend.
Key Features
Automatic Channel Management:
- Subscribes to core and edge metrics channels based on key prefix
- Automatically subscribes to recording_state channel for server state updates
- Handles Redis connection and RESP protocol details
- No manual Redis client configuration needed
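The channel names appear to follow a `{prefix}:{name}` pattern. The sketch below shows how the three channels can be derived from the key prefix; the core/edge names match the keys shown in the snapshot structure later on this page, while the exact recording_state channel name is an assumption.

```python
# Illustrative sketch of channel-name derivation from the key prefix.
# "core"/"edge" names match the documented snapshot keys; the
# recording channel name is an assumption.
def derive_channels(prefix: str) -> dict:
    return {
        "core": f"{prefix}:core_metrics",
        "edge": f"{prefix}:edge_metrics",
        "recording": f"{prefix}:recording_state",
    }

channels = derive_channels("physiology")
# e.g. channels["core"] == "physiology:core_metrics"
```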
Recording State Management:
- Automatically detects recording start/stop via recording_state channel
- Generates session-specific file paths when recording starts
- Supports custom session IDs via set_session_id() method
- Falls back to datetime-based session IDs if not set
Per-Session File Naming:
- Creates session-specific files: metrics_{session_id}.jsonl and metrics_{session_id}.json
- Conditional file rotation: only rotates if session changes
- Same session ID appends to existing files (no rotation)
- Falls back to base config paths if no session_id set
Data Aggregation:
- Converts streaming data (array-of-structs) to analysis format (struct-of-arrays)
- Aggregates all time-series into unified data structure
- Handles timestamp resets and wraps automatically
- Deduplicates and sorts samples by timestamp
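The array-of-structs to struct-of-arrays conversion can be pictured as follows. This is an illustrative sketch, not the package's implementation; the sample field names (`ts`, `value`) are hypothetical stand-ins for the envelope structure shown under "JSONL Output".

```python
# Sketch: convert per-sample records (array-of-structs) into parallel
# arrays (struct-of-arrays), sorted by timestamp with duplicates dropped.
def aggregate(samples: list) -> dict:
    seen = set()
    out = {"timestamps": [], "values": []}
    for s in sorted(samples, key=lambda s: s["ts"]):
        if s["ts"] in seen:
            continue  # duplicate timestamp: keep the first occurrence
        seen.add(s["ts"])
        out["timestamps"].append(s["ts"])
        out["values"].append(s["value"])
    return out

series = aggregate([
    {"ts": 2.0, "value": 72.3},
    {"ts": 1.0, "value": 72.5},
    {"ts": 2.0, "value": 72.3},  # duplicate, dropped
])
# series == {"timestamps": [1.0, 2.0], "values": [72.5, 72.3]}
```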
Dual Output:
- Continuous JSONL: Streams all metric envelopes to file as received
- Final JSON Summary: Writes aggregated struct-of-arrays format when recording stops
Periodic Snapshots:
- Automatic snapshots every 30 seconds during recording
- Recording start/stop triggers immediate snapshot writes
- Real-time access to aggregated data via get_current_snapshot()
Session Tracking:
- Records start/end times with millisecond precision
- Automatic or custom session IDs for organizing collection runs
- Metadata includes sample counts, series counts, and duration
Configuration
MetricsCollectorConfig is a Python dataclass with the following fields:
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| redis_host | str | "localhost" | Redis server hostname |
| redis_port | int | 6379 | Redis server port |
| redis_key_prefix | str | "physiology" | Key prefix for channel names |
| output_jsonl_path | Path | None | Base path for JSONL output (session-specific files created during recording) |
| output_json_path | Path | None | Base path for JSON summary (session-specific files created during recording) |
| max_samples_per_series | int | 0 | Max samples per series (0 = unlimited) |
| append_mode | bool | False | Append to existing files vs. truncate |
| whitespace | bool | False | Include whitespace in JSON output |
Dataclass Benefits:
As a dataclass, MetricsCollectorConfig supports:
- Automatic __init__, __repr__, and __eq__ generation
- Type hints for all fields
- Default values without boilerplate
- Easy serialization/deserialization with dataclasses.asdict() and dataclasses.replace()
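The `dataclasses.replace()` helper is handy for deriving variant configs without mutating a shared base. The demo below uses a minimal stand-in class with a subset of the documented fields so it runs standalone; the real class ships in smartspectra.ipc.

```python
from dataclasses import dataclass, replace

# Stand-in with a subset of the documented fields, only to demonstrate
# the replace() pattern; use the real MetricsCollectorConfig in practice.
@dataclass
class MetricsCollectorConfig:
    redis_host: str = "localhost"
    redis_port: int = 6379

base = MetricsCollectorConfig()
remote = replace(base, redis_host="192.168.1.100")
# base is unchanged; remote differs only in redis_host
```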
Saving and Loading from Disk:
import json
from dataclasses import asdict
from pathlib import Path

from smartspectra.ipc import MetricsCollectorConfig

# Build a config and persist it as JSON.
# default=str serializes Path fields as plain strings.
config = MetricsCollectorConfig(
    redis_host="192.168.1.100",
    output_jsonl_path=Path("./metrics.jsonl")
)
with open("collector_config.json", "w") as f:
    json.dump(asdict(config), f, indent=2, default=str)

# Load it back, restoring Path fields from strings
with open("collector_config.json", "r") as f:
    config_dict = json.load(f)
if config_dict.get("output_jsonl_path"):
    config_dict["output_jsonl_path"] = Path(config_dict["output_jsonl_path"])
if config_dict.get("output_json_path"):
    config_dict["output_json_path"] = Path(config_dict["output_json_path"])
loaded_config = MetricsCollectorConfig(**config_dict)
Basic Usage
Complete Example (adapted from redis_ipc_metrics_saving_client.py):
import socket
from pathlib import Path

from smartspectra.ipc import (
    MetricsCollector,
    MetricsCollectorConfig,
    RespReader,
    encode_command,
    normalize,
)

config = MetricsCollectorConfig(
    redis_host="localhost",
    redis_port=6379,
    redis_key_prefix="physiology",
    output_jsonl_path=Path("./metrics.jsonl"),
    output_json_path=Path("./metrics_summary.json")
)
collector = MetricsCollector(config)
collector.start()

try:
    with socket.create_connection((config.redis_host, config.redis_port)) as sock:
        sock.settimeout(0.5)
        reader = RespReader(sock)

        # Verify connectivity before subscribing
        sock.sendall(encode_command("PING"))
        reply = reader.read()

        # Channel names are derived from the configured key prefix
        core_channel = collector.core_channel
        edge_channel = collector.edge_channel
        recording_channel = collector.recording_channel
        sock.sendall(encode_command("SUBSCRIBE", core_channel, edge_channel, recording_channel))

        while True:
            reply = reader.read()
            if reply is None:
                continue
            reply = normalize(reply)
            if isinstance(reply, list) and reply:
                kind = reply[0]
                if kind == "message" and len(reply) >= 3:
                    payload_raw = reply[2]
                    if isinstance(payload_raw, bytes):
                        payload_str = payload_raw.decode("utf-8")
                    else:
                        payload_str = str(payload_raw)
                    collector.process_envelope(payload_str)
except KeyboardInterrupt:
    pass
finally:
    # Ensure the final JSON summary is written even on Ctrl-C
    collector.stop()
Real-Time Snapshots
The get_current_snapshot() method returns the current aggregated state without stopping collection.
Automatic Periodic Snapshots:
The collector automatically writes snapshot files during recording:
- Every 30 seconds during active recording
- When recording starts
- When recording stops
- Snapshot files use temp naming: {base_name}_snapshot_temp.json
Manual Snapshots:
Call get_current_snapshot() at any time for:
- Real-time monitoring dashboards
- Progress tracking during long collections
- Intermediate checkpoints
- Live data visualization
Example:
import time
from pathlib import Path

from smartspectra.ipc import MetricsCollector, MetricsCollectorConfig

config = MetricsCollectorConfig(
    redis_host="localhost",
    redis_port=6379,
    redis_key_prefix="physiology"
)
collector = MetricsCollector(config)
collector.start()

collecting = True  # toggled by your application's shutdown logic
while collecting:
    snapshot = collector.get_current_snapshot()

    meta = snapshot['meta']
    print(f"Session: {meta.get('session_id', 'N/A')}")
    print(f"Duration: {meta['duration_seconds']:.1f}s")
    print(f"Total samples: {meta['total_samples']}")
    print(f"Series count: {meta['series_count']}")
    print(f"JSONL envelopes: {meta['jsonl_envelopes_written']}")

    series = snapshot['series']
    core_channel = "physiology:core_metrics"
    if core_channel in series:
        pulse_series = series[core_channel].get('pulse.strict')
        if pulse_series:
            timestamps = pulse_series['timestamps']
            values = pulse_series['values']
            relative = pulse_series['relative_us']
            if values:
                latest_bpm = values[-1]
                print(f"Latest pulse: {latest_bpm:.1f} BPM")
                print(f"Sample count: {len(values)}")
    time.sleep(5)

collector.stop()
Snapshot Structure:
{
  "meta": {
    "started_at_ms": 1672531200000,
    "started_at_iso": "2023-01-01T00:00:00",
    "updated_at_ms": 1672532800000,
    "updated_at_iso": "2023-01-01T00:30:00",
    "stopped_at_ms": 1672534800000,
    "stopped_at_iso": "2023-01-01T01:00:00",
    "duration_seconds": 3600.0,
    "session_id": "experiment_001",
    "redis_host": "localhost",
    "redis_port": 6379,
    "redis_prefix": "physiology",
    "jsonl_envelopes_written": 18000,
    "series_count": 12,
    "total_samples": 180000
  },
  "series": {
    "physiology:core_metrics": {
      "pulse.strict": {
        "timestamps": [1234567.0, 1234568.0, ...],
        "values": [72.5, 72.3, ...],
        "relative_us": [1234567.0, 1234568.0, ...]
      },
      "breathing.strict": {
        "timestamps": [...],
        "values": [...],
        "relative_us": [...]
      }
    },
    "physiology:edge_metrics": {
      "eda": {
        "timestamps": [...],
        "values": [...],
        "relative_us": [...]
      },
      "upper_breathing_trace": {
        "timestamps": [...],
        "values": [...],
        "relative_us": [...]
      }
    }
  }
}
Output Formats
JSONL Output (Continuous)
Each line is a complete JSON envelope as received from Redis:
{"type":"core_metrics","timestamp":1234567890,"payload":{"pulse":{"strict":{"value":72.5,"confidence":0.95}}}}
{"type":"edge_metrics","timestamp":1234567891,"payload":{"eda":0.42,"upper_breathing_trace":1.23}}
{"type":"core_metrics","timestamp":1234567892,"payload":{"pulse":{"strict":{"value":72.3,"confidence":0.96}}}}
Use Cases:
- Streaming analysis pipelines
- Event replay
- Debugging
- Audit trails
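Because every line is a complete envelope, a JSONL capture can be replayed by streaming the file line by line. A minimal sketch (the helper name `iter_envelopes` is ours; the demo writes a throwaway file so it runs standalone):

```python
import json
import tempfile
from pathlib import Path

# Stream one JSON envelope per non-empty line of a JSONL capture.
def iter_envelopes(path: Path):
    with path.open() as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

# Demo with a throwaway file containing two envelopes like those above.
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as f:
    f.write('{"type":"core_metrics","timestamp":1234567890,"payload":{}}\n')
    f.write('{"type":"edge_metrics","timestamp":1234567891,"payload":{}}\n')
    demo_path = Path(f.name)

types = [env["type"] for env in iter_envelopes(demo_path)]
# types == ["core_metrics", "edge_metrics"]
```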
JSON Summary (Final)
Written on collector.stop(), contains aggregated struct-of-arrays format suitable for analysis:
{
  "meta": {
    "started_at_ms": 1672531200000,
    "started_at_iso": "2023-01-01T00:00:00",
    "updated_at_ms": 1672534800000,
    "updated_at_iso": "2023-01-01T01:00:00",
    "stopped_at_ms": 1672534800000,
    "stopped_at_iso": "2023-01-01T01:00:00",
    "duration_seconds": 3600.0,
    "session_id": "experiment_001",
    "series_count": 8,
    "total_samples": 180000
  },
  "series": {
    "physiology:core_metrics": {
      "pulse.strict": {
        "timestamps": [1234567.0, 1234568.0, ...],
        "values": [72.5, 72.3, ...],
        "relative_us": [1234567.0, 1234568.0, ...]
      }
    }
  }
}
Use Cases:
- NumPy/Pandas integration: pd.DataFrame(series_data)
- Statistical analysis
- Visualization (matplotlib, plotly)
- Machine learning feature extraction
Advanced Features
Timestamp Handling
The collector automatically handles:
Timestamp Resets:
- Detects when timestamp decreases by > 1 second
- Applies offset to maintain monotonic timeline
- Preserves original timestamps in relative_us field
Deduplication:
- Ignores samples with duplicate/earlier timestamps
- Maintains strict temporal ordering
Wrapping:
- Handles 32-bit timestamp overflows
- Continues sequence across wrap boundaries
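The reset handling described above can be sketched as follows. This is an assumption-laden illustration, not the package's code: it applies a running offset whenever the raw timestamp drops by more than the documented 1-second threshold (shown here in microseconds, matching the `relative_us` field), so the aggregated timeline stays monotonic.

```python
# Sketch: keep an aggregated timeline monotonic across source-clock resets.
RESET_THRESHOLD_US = 1_000_000  # 1 second, assuming microsecond timestamps

def monotonicize(raw_us: list) -> list:
    offset = 0.0
    prev = None
    out = []
    for t in raw_us:
        if prev is not None and t < prev - RESET_THRESHOLD_US:
            # Clock reset detected: shift this and later samples forward
            offset += prev - t
        out.append(t + offset)
        prev = t
    return out

adjusted = monotonicize([1_000_000.0, 2_000_000.0, 0.0, 1_000_000.0])
# adjusted == [1000000.0, 2000000.0, 2000000.0, 3000000.0]
```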
Sample Limiting
Limit memory usage for long-running collections:
config = MetricsCollectorConfig(
    max_samples_per_series=10000
)
When limit is reached, oldest samples are discarded (FIFO).
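One way to picture the FIFO behavior (illustrative only, not the actual implementation) is a bounded deque that silently drops the oldest sample on overflow:

```python
from collections import deque

# A deque with maxlen=3 models max_samples_per_series=3:
# appending a fourth sample evicts the oldest one.
values = deque(maxlen=3)
for v in [72.5, 72.3, 72.8, 73.1]:
    values.append(v)
# list(values) == [72.3, 72.8, 73.1]
```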
Append Mode
Continue previous collection sessions:
config = MetricsCollectorConfig(
    output_jsonl_path=Path("./metrics.jsonl"),
    append_mode=True
)
Useful for:
- Multi-day collections
- Resuming after crashes
- Continuous monitoring
Session Management
The MetricsCollector provides automatic session management with per-session file naming.
Setting Session ID:
Use the set_session_id() method to set a custom session ID before or during recording:
from datetime import datetime
from pathlib import Path

from smartspectra.ipc import MetricsCollector, MetricsCollectorConfig

config = MetricsCollectorConfig(
    redis_host="localhost",
    redis_port=6379,
    output_jsonl_path=Path("./metrics.jsonl"),
    output_json_path=Path("./metrics_summary.json")
)
collector = MetricsCollector(config)

# Option 1: fixed, human-readable session ID
collector.set_session_id("experiment_001")

# Option 2: timestamp-based session ID
session_id = datetime.now().strftime("run_%Y%m%d_%H%M%S")
collector.set_session_id(session_id)
Per-Session File Naming:
When a session ID is set (manually or auto-generated), the collector creates session-specific files:
- metrics_{session_id}.jsonl - Continuous JSONL stream
- metrics_{session_id}.json - Final aggregated summary
Examples:
- metrics_experiment_001.jsonl
- metrics_run_20251006_143022.jsonl
- metrics_20251006_205619.jsonl (auto-generated)
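Judging by the examples above, the session-specific path is derived from the base path by inserting the session ID before the extension. A sketch of that naming scheme (ours, inferred from the documented filenames):

```python
from pathlib import Path

# Derive a per-session file path from a base path and session ID,
# matching the documented "metrics_{session_id}.jsonl" pattern.
def session_path(base: Path, session_id: str) -> Path:
    return base.with_name(f"{base.stem}_{session_id}{base.suffix}")

p = session_path(Path("./metrics.jsonl"), "experiment_001")
# p.name == "metrics_experiment_001.jsonl"
```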
File Rotation Behavior:
The collector uses conditional file rotation:
- Same session ID: appends to existing files (no rotation)
  - Useful for resuming interrupted sessions
  - Logs: "Continuing with same session_id"
- Different session ID: rotates to new files
  - Creates new files with the new session ID
  - Logs: "Rotating files for new session"
- No session ID: falls back to base config paths
  - Uses output_jsonl_path and output_json_path as-is
Recording State Integration:
The collector automatically subscribes to the recording_state channel and:
- Generates session ID when recording starts (if not set)
- Creates session-specific files when recording starts
- Writes final JSON summary when recording stops
Integration Examples
With Pandas
import pandas as pd
import json

with open("metrics_summary.json") as f:
    data = json.load(f)

pulse_data = data['series']['physiology:core_metrics']['pulse.strict']
df = pd.DataFrame({
    'timestamp_us': pulse_data['timestamps'],
    'pulse_bpm': pulse_data['values']
})
df['datetime'] = pd.to_datetime(df['timestamp_us'], unit='us')
df.set_index('datetime', inplace=True)
print(df['pulse_bpm'].describe())
df['pulse_bpm'].plot()
With NumPy
import numpy as np
import json

with open("metrics_summary.json") as f:
    data = json.load(f)

pulse_data = data['series']['physiology:core_metrics']['pulse.strict']
timestamps = np.array(pulse_data['timestamps'])
values = np.array(pulse_data['values'])
mean_bpm = np.mean(values)
std_bpm = np.std(values)
print(f"Pulse: {mean_bpm:.1f} ± {std_bpm:.1f} BPM")
See Also
- Redis IPC Backend - Redis IPC backend configuration
- SmartSpectra OnPrem Architecture - System architecture overview
- samples/redis_ipc_metrics_saving_client.py - Complete usage example
- samples/plot_metrics.py - Visualization example using JSON summary