SmartSpectra C++ SDK
Measure human vitals from video with SmartSpectra C++ SDK.
Loading...
Searching...
No Matches
smartspectra.ipc.metrics_collector.MetricsCollector Class Reference

Public Member Functions

 __init__ (self, MetricsCollectorConfig config)
str core_channel (self)
str edge_channel (self)
str recording_state_channel (self)
None start (self)
None stop (self)
None set_session_id (self, str session_id)
Optional[str] set_recording_state (self, bool recording)
None write_snapshot_temp (self)
bool maybe_write_periodic_snapshot (self)
None process_envelope (self, str envelope_json)
Dict[str, Any] get_current_snapshot (self)

Public Attributes

 config = config
 started_at_ms = int(time.time() * 1000)
Optional[int] stopped_at_ms = None
Optional[str] current_session_id = None
bool recording = False
Optional[int] session_start_ms = None

Protected Member Functions

Tuple[Optional[Path], Optional[Path]] _get_session_paths (self, str session_id)
None _rotate_files_for_session (self)
None _aggregate_envelope (self, Dict[str, Any] envelope)
None _store_sample (self, str channel, str series, float timestamp, float value, Dict[str, Any] raw_sample)
None _write_aggregated_json (self)

Protected Attributes

Optional[Any] _jsonl_file = None
int _jsonl_count = 0
int _maxlen = config.max_samples_per_series if config.max_samples_per_series > 0 else None
dict _series_state = {}
dict _series_data = {}
Optional[Path] _current_jsonl_path = None
Optional[Path] _current_json_path = None
float _last_snapshot_time = 0.0
int _envelope_count = 0

Detailed Description

Collects metrics from Redis pub/sub and aggregates them.

Member Function Documentation

◆ _aggregate_envelope()

None smartspectra.ipc.metrics_collector.MetricsCollector._aggregate_envelope ( self,
Dict[str, Any] envelope )
protected
Aggregate envelope data into struct-of-arrays format.

◆ _get_session_paths()

Tuple[Optional[Path], Optional[Path]] smartspectra.ipc.metrics_collector.MetricsCollector._get_session_paths ( self,
str session_id )
protected
Generate session-specific file paths.

Args:
    session_id: Session ID to append to filenames

Returns:
    Tuple of (jsonl_path, json_path) with session_id appended to stems

◆ _rotate_files_for_session()

None smartspectra.ipc.metrics_collector.MetricsCollector._rotate_files_for_session ( self)
protected
Rotate JSONL file for new recording session.

Closes current JSONL file (if open) and opens a new one with session_id in filename.
Updates _current_jsonl_path and _current_json_path.

If the session_id hasn't changed (same file paths), appends to existing files instead
of rotating, and logs this behavior.

◆ _store_sample()

None smartspectra.ipc.metrics_collector.MetricsCollector._store_sample ( self,
str channel,
str series,
float timestamp,
float value,
Dict[str, Any] raw_sample )
protected
Store a single sample in struct-of-arrays format.

◆ _write_aggregated_json()

None smartspectra.ipc.metrics_collector.MetricsCollector._write_aggregated_json ( self)
protected
Write the final aggregated JSON summary file.

◆ core_channel()

str smartspectra.ipc.metrics_collector.MetricsCollector.core_channel ( self)
Get the Redis channel name for core metrics.

◆ edge_channel()

str smartspectra.ipc.metrics_collector.MetricsCollector.edge_channel ( self)
Get the Redis channel name for edge metrics.

◆ get_current_snapshot()

Dict[str, Any] smartspectra.ipc.metrics_collector.MetricsCollector.get_current_snapshot ( self)
Get current snapshot of aggregated metrics.

Returns:
    Dictionary containing 'meta' and 'series' keys in the same format
    as the JSON summary file.

◆ maybe_write_periodic_snapshot()

bool smartspectra.ipc.metrics_collector.MetricsCollector.maybe_write_periodic_snapshot ( self)
Check if periodic snapshot should be written and write if needed.

Returns:
    True if snapshot was written, False otherwise

◆ process_envelope()

None smartspectra.ipc.metrics_collector.MetricsCollector.process_envelope ( self,
str envelope_json )
Process a single metrics envelope (either core or edge).

Args:
    envelope_json: JSON string containing the metrics envelope

◆ recording_state_channel()

str smartspectra.ipc.metrics_collector.MetricsCollector.recording_state_channel ( self)
Get the Redis channel name for recording state.

◆ set_recording_state()

Optional[str] smartspectra.ipc.metrics_collector.MetricsCollector.set_recording_state ( self,
bool recording )
Set recording state and manage sessions.

This method should be called when the recording state changes, typically in response
to updates from the Redis recording_state channel (published by the physiology server).
This ensures the collector's session management reflects the actual server state.

Note: Call set_session_id() before calling this method with recording=True to set
a custom session ID. If no session ID is set, one will be auto-generated.

Args:
    recording: True to start recording, False to stop

Returns:
    Event type string for logging: 'recording_start', 'recording_stop', or None if no change

◆ set_session_id()

None smartspectra.ipc.metrics_collector.MetricsCollector.set_session_id ( self,
str session_id )
Set the session ID for the current or next recording session.

Args:
    session_id: Session ID to use for tracking this recording session

◆ start()

None smartspectra.ipc.metrics_collector.MetricsCollector.start ( self)
Initialize JSONL file for writing.

Note: Files will be rotated with session_id suffix when recording starts.
Until then, uses base config paths.

◆ stop()

None smartspectra.ipc.metrics_collector.MetricsCollector.stop ( self)
Close JSONL file and write final aggregated JSON summary.

◆ write_snapshot_temp()

None smartspectra.ipc.metrics_collector.MetricsCollector.write_snapshot_temp ( self)
Write current aggregated metrics to temporary summary file.

The documentation for this class was generated from the following file:
  • on_prem/python/smartspectra/ipc/metrics_collector.py