SmartSpectra C++ SDK
Measure human vitals from video with SmartSpectra C++ SDK.
Loading...
Searching...
No Matches
ipc_stream_writer.hpp
1
2// ipc_stream_writer.hpp
3// Created by greg on 1/26/25.
4// Copyright (C) 2025 Presage Security, Inc.
5//
6// This program is free software; you can redistribute it and/or
7// modify it under the terms of the GNU Lesser General Public
8// License as published by the Free Software Foundation; either
9// version 3 of the License, or (at your option) any later version.
10//
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14// Lesser General Public License for more details.
15//
16// You should have received a copy of the GNU Lesser General Public License
17// along with this program; if not, write to the Free Software Foundation,
18// Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20
21#pragma once
22
23#include <memory>
24#include <chrono>
25#include <absl/status/status.h>
26#include <mediapipe/framework/port/status_macros.h>
27#include <physiology/modules/messages/status.pb.h>
28#include <physiology/modules/messages/metrics.pb.h>
29#include <opencv2/core/mat.hpp>
30#include <smartspectra/container/background_container.hpp>
31#include <glog/logging.h>
32
33namespace presage::smartspectra::ipc {
34
45template<container::settings::IntegrationMode TIntegrationMode>
47public:
48 using BackgroundContainer =
50 presage::platform_independence::DeviceType::Cpu,
51 smartspectra::container::settings::OperationMode::Continuous,
52 TIntegrationMode
53 >;
54
/// Virtual destructor so that concrete writers are destroyed correctly when
/// deleted through a pointer to this base class.
55 virtual ~IpcStreamWriter() = default;
56
64 virtual absl::Status SwapContainer(std::shared_ptr<BackgroundContainer> new_container) {
65 container_ = new_container;
66 if (container_) {
67 MP_RETURN_IF_ERROR(RegisterCallbacks());
68 MP_RETURN_IF_ERROR(SendInitialStatus());
69 }
70 return absl::OkStatus();
71 }
72
77 const BackgroundContainer* GetContainer() const {
78 return container_.get();
79 }
80
/// Writes a single status value to the stream (implemented by each backend).
/// Called from the status-change callback installed by RegisterCallbacks() and
/// from SendInitialStatus(); the returned status is handed back to that caller.
86 virtual absl::Status WriteStatus(const presage::physiology::StatusValue& status) = 0;
87
/// Writes a core metrics buffer to the stream (implemented by each backend).
/// Called from the core-metrics callback installed by RegisterCallbacks(), which
/// forwards the container-supplied metrics and timestamp unchanged.
/// NOTE(review): timestamp_us is presumably microseconds, going by its name - confirm.
94 virtual absl::Status WriteCoreMetrics(
95 const presage::physiology::MetricsBuffer& metrics,
96 int64_t timestamp_us
97 ) = 0;
98
/// Writes edge metrics to the stream (implemented by each backend).
/// Called from the edge-metrics callback installed by RegisterCallbacks(), which
/// forwards the container-supplied metrics and timestamp unchanged.
/// NOTE(review): timestamp_us is presumably microseconds, going by its name - confirm.
104 virtual absl::Status WriteEdgeMetrics(const presage::physiology::Metrics& metrics, int64_t timestamp_us) = 0;
105
/// Writes one video frame to the stream (implemented by each backend).
/// Called from the video-output callback installed by RegisterCallbacks(), which
/// forwards the container-supplied frame and timestamp unchanged.
/// NOTE(review): timestamp_us is presumably microseconds, going by its name - confirm.
112 virtual absl::Status WriteFrame(const cv::Mat& frame, int64_t timestamp_us) = 0;
113
/// Closes the writer (implemented by each backend).
/// NOTE(review): presumably releases the backend's resources and makes
/// IsOperational() return false afterwards - confirm against the implementations.
118 virtual void Close() = 0;
119
/// Reports whether the writer can currently be used (implemented by each backend).
/// RegisterCallbacks() and SendInitialStatus() both return OkStatus without doing
/// any work while this returns false.
124 virtual bool IsOperational() const = 0;
125
/// Flushes the writer (implemented by each backend).
/// NOTE(review): presumably forces any buffered output to the underlying
/// transport - confirm against the implementations.
131 virtual absl::Status Flush() = 0;
132
133protected:
139 virtual absl::Status RegisterCallbacks() {
140 if (!container_ || !IsOperational()) {
141 return absl::OkStatus();
142 }
143
144 // Register status callback
145 MP_RETURN_IF_ERROR(container_->SetOnStatusChange(
146 [this](presage::physiology::StatusValue status_value) {
147 return WriteStatus(status_value);
148 }
149 ));
150
151
152 // Register core metrics callback
153 MP_RETURN_IF_ERROR(container_->SetOnCoreMetricsOutput(
154 [this](presage::physiology::MetricsBuffer metrics, int64_t timestamp_us) {
155 return WriteCoreMetrics(metrics, timestamp_us);
156 }
157 ));
158
159 // Register edge metrics callback
160 MP_RETURN_IF_ERROR(container_->SetOnEdgeMetricsOutput(
161 [this](presage::physiology::Metrics metrics, int64_t timestamp_us) {
162 return WriteEdgeMetrics(metrics, timestamp_us);
163 }
164 ));
165
166 // Register video output callback for camera capture
167 MP_RETURN_IF_ERROR(container_->SetOnVideoOutput(
168 [this](cv::Mat& frame, int64_t timestamp_us) {
169 return WriteFrame(frame, timestamp_us);
170 }
171 ));
172
173 return absl::OkStatus();
174 }
175
181 virtual absl::Status SendInitialStatus() {
182 if (!container_ || !IsOperational()) {
183 return absl::OkStatus();
184 }
185
186 auto status_code = container_->GetStatusCode();
187 auto current_time = std::chrono::system_clock::now().time_since_epoch();
188 int64_t timestamp = std::chrono::duration_cast<std::chrono::microseconds>(current_time).count();
189
190 physiology::StatusValue status_value;
191 status_value.set_value(status_code);
192 status_value.set_timestamp(timestamp);
193
194 return WriteStatus(status_value);
195 }
196
197 // Protected member accessible to derived classes
198 std::shared_ptr<BackgroundContainer> container_;
199
/**
 * @brief Returns the current wall-clock time in microseconds.
 *
 * Static because it reads no instance state; this lets const member
 * functions of derived writers call it as well.
 *
 * @return Microseconds elapsed since the std::chrono::system_clock epoch.
 */
static int64_t GetCurrentTimestampUs() {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<std::chrono::microseconds>(since_epoch).count();
}
204
205};
206
211// template<container::settings::IntegrationMode TIntegrationMode>
212// std::unique_ptr<IpcStreamWriter<TIntegrationMode>> CreateIpcStreamWriter(
213// StreamBackend backend,
214// const FileIpcConfig& file_config,
215// const RedisIpcConfig& redis_config
216// );
217
218} // namespace presage::smartspectra::ipc
Container for background thread processing.
Definition background_container.hpp:40
Definition ipc_stream_writer.hpp:46
virtual absl::Status SendInitialStatus()
Definition ipc_stream_writer.hpp:181
virtual absl::Status WriteCoreMetrics(const presage::physiology::MetricsBuffer &metrics, int64_t timestamp_us)=0
virtual absl::Status WriteStatus(const presage::physiology::StatusValue &status)=0
const BackgroundContainer * GetContainer() const
Definition ipc_stream_writer.hpp:77
virtual absl::Status RegisterCallbacks()
Definition ipc_stream_writer.hpp:139
virtual absl::Status WriteFrame(const cv::Mat &frame, int64_t timestamp_us)=0
virtual absl::Status WriteEdgeMetrics(const presage::physiology::Metrics &metrics, int64_t timestamp_us)=0
virtual absl::Status SwapContainer(std::shared_ptr< BackgroundContainer > new_container)
Definition ipc_stream_writer.hpp:64