SmartSpectra C++ SDK
Measure human vitals from video with SmartSpectra C++ SDK.
Loading...
Searching...
No Matches
redis_ipc_stream_writer_impl.hpp
1// redis_ipc_stream_writer_impl.hpp
2// Created by Greg on 1/30/25.
3// Copyright (C) 2025 Presage Security, Inc.
4//
5// SPDX-License-Identifier: LGPL-3.0-or-later
6
7#pragma once
8
9#include "redis_ipc_stream_writer.hpp"
10
11#include <sstream>
12#include <glog/logging.h>
13#include <absl/strings/escaping.h>
14#include <google/protobuf/util/json_util.h>
15#include <mediapipe/framework/port/ret_check.h>
16#include "../json_envelope.hpp"
17#include "../turbojpeg_compressor.hpp"
18
19namespace presage::smartspectra::redis_ipc {
20
21template<container::settings::IntegrationMode TIntegrationMode>
22RedisIpcStreamWriter<TIntegrationMode>::RedisIpcStreamWriter(const RedisIpcConfig& configuration)
23 : configuration_(configuration) {
24 redis_client_.SetResponseLimits(
25 configuration_.max_bulk_string_bytes,
26 configuration_.max_array_elements
27 );
28 auto status = Initialize();
29 if (!status.ok()) {
30 LOG(ERROR) << "Failed to initialize RedisIpcStreamWriter: " << status.message();
31 }
32}
33
34template<container::settings::IntegrationMode TIntegrationMode>
35RedisIpcStreamWriter<TIntegrationMode>::~RedisIpcStreamWriter() {
36 Close();
37}
38
39template<container::settings::IntegrationMode TIntegrationMode>
41 std::lock_guard<std::mutex> lock(write_mutex_);
42
43 // Connect to Redis
44 if (!redis_client_.Connect(
45 configuration_.host,
46 configuration_.port,
47 configuration_.connection_timeout
48 )) {
49 return absl::InternalError(
50 "Failed to connect to Redis: " + redis_client_.last_error()
51 );
52 }
53
54 // Clear recording commands queue
55 if (!redis_client_.Del(BuildKey(configuration_.keys.command_queue))) {
56 if (configuration_.enable_debug_logging) {
57 LOG(WARNING) << "Failed to clear Redis recording commands: "
58 << redis_client_.last_error();
59 }
60 }
61
62 // Initialize recording state
63 const std::string initial_state = R"({"recording":false,"timestamp":)" +
64 std::to_string(this->GetCurrentTimestampUs()) + "}";
65
66 if (!redis_client_.Set(BuildKey(configuration_.keys.recording_state), initial_state)) {
67 if (configuration_.enable_debug_logging) {
68 LOG(WARNING) << "Failed to set initial Redis recording state: "
69 << redis_client_.last_error();
70 }
71 }
72
73 // Publish initial state
74 if (!redis_client_.Publish(BuildChannel("recording_state"), initial_state)) {
75 if (configuration_.enable_debug_logging) {
76 LOG(WARNING) << "Failed to publish initial recording state: "
77 << redis_client_.last_error();
78 }
79 }
80
81 is_operational_ = true;
82 return absl::OkStatus();
83}
84
85template<container::settings::IntegrationMode TIntegrationMode>
87 const presage::physiology::StatusValue& status
88) {
89 if (!is_operational_) {
90 return absl::FailedPreconditionError("Writer is not operational");
91 }
92
93 std::lock_guard<std::mutex> lock(write_mutex_);
94
95 const std::string payload = BuildJsonEnvelope(
96 "status",
97 BuildStatusPayload(status),
98 status.timestamp()
99 );
100
101 if (!redis_client_.Publish(BuildChannel(configuration_.channels.status), payload)) {
102 if (publish_error_count_++ < kMaxErrorLogs) {
103 LOG(WARNING) << "Failed to publish status to Redis: "
104 << redis_client_.last_error();
105 }
106 }
107
108 return absl::OkStatus();
109}
110
111template<container::settings::IntegrationMode TIntegrationMode>
113 const presage::physiology::MetricsBuffer& metrics,
114 int64_t timestamp_us
115) {
116 if (!is_operational_) {
117 return absl::FailedPreconditionError("Writer is not operational");
118 }
119
120 std::string json;
121 auto status = google::protobuf::util::MessageToJsonString(metrics, &json, GetJsonOptions());
122 if (!status.ok()) {
123 return absl::InternalError("Failed to serialize MetricsBuffer: " + status.message().as_string());
124 }
125
126 std::lock_guard<std::mutex> lock(write_mutex_);
127
128 const std::string payload = BuildJsonEnvelope("core_metrics", json, timestamp_us);
129
130 if (!redis_client_.Publish(BuildChannel(configuration_.channels.core_metrics), payload)) {
131 if (publish_error_count_++ < kMaxErrorLogs) {
132 LOG(WARNING) << "Failed to publish core metrics to Redis: "
133 << redis_client_.last_error();
134 }
135 }
136
137 return absl::OkStatus();
138}
139
140template<container::settings::IntegrationMode TIntegrationMode>
142 const presage::physiology::Metrics& metrics, int64_t timestamp_us
143) {
144 if (!is_operational_) {
145 return absl::FailedPreconditionError("Writer is not operational");
146 }
147
148 std::string json;
149 auto status = google::protobuf::util::MessageToJsonString(metrics, &json, GetJsonOptions());
150 if (!status.ok()) {
151 return absl::InternalError("Failed to serialize Metrics: " + status.message().as_string());
152 }
153
154 std::lock_guard<std::mutex> lock(write_mutex_);
155
156 const std::string payload = BuildJsonEnvelope("edge_metrics", json, timestamp_us);
157
158 if (!redis_client_.Publish(BuildChannel(configuration_.channels.edge_metrics), payload)) {
159 if (publish_error_count_++ < kMaxErrorLogs) {
160 LOG(WARNING) << "Failed to publish edge metrics to Redis: "
161 << redis_client_.last_error();
162 }
163 }
164
165 return absl::OkStatus();
166}
167
168template<container::settings::IntegrationMode TIntegrationMode>
170 const cv::Mat& frame,
171 int64_t timestamp_us
172) {
173 if (!is_operational_) {
174 return absl::FailedPreconditionError("Writer is not operational");
175 }
176
177 if (frame.empty()) {
178 return absl::InvalidArgumentError("Frame is empty");
179 }
180
181 // Create compressor for this frame
182 MP_ASSIGN_OR_RETURN(auto compressor, compression::TurboJpegCompressor::Create());
183
184 // Compress frame
185 std::vector<unsigned char> jpeg_data;
186 MP_RETURN_IF_ERROR(compressor.Compress(frame, 90, jpeg_data));
187
188 // Base64 encode
189 std::string base64;
190 absl::Base64Escape(absl::string_view(
191 reinterpret_cast<const char*>(jpeg_data.data()),
192 jpeg_data.size()
193 ), &base64);
194
195 std::lock_guard<std::mutex> lock(write_mutex_);
196
197 // Build payload with metadata
198 std::ostringstream payload;
199 payload << '{'
200 << "\"timestamp\":" << timestamp_us << ','
201 << "\"width\":" << frame.cols << ','
202 << "\"height\":" << frame.rows << ','
203 << "\"channels\":" << frame.channels() << ','
204 << "\"jpeg_base64\":\"" << base64 << "\""
205 << '}';
206
207 const std::string payload_str = payload.str();
208
209 // Publish to channel for real-time streaming
210 if (!redis_client_.Publish(BuildChannel(configuration_.channels.hud_frame), payload_str)) {
211 if (publish_error_count_++ < kMaxErrorLogs) {
212 LOG(WARNING) << "Failed to publish frame to Redis: "
213 << redis_client_.last_error();
214 }
215 }
216
217 return absl::OkStatus();
218}
219
220template<container::settings::IntegrationMode TIntegrationMode>
222 is_operational_ = false;
223 std::lock_guard<std::mutex> lock(write_mutex_);
224 redis_client_.Close();
225}
226
227template<container::settings::IntegrationMode TIntegrationMode>
229 return is_operational_;
230}
231
232template<container::settings::IntegrationMode TIntegrationMode>
234 // Redis pub/sub is fire-and-forget, no buffering to flush
235 return absl::OkStatus();
236}
237
238template<container::settings::IntegrationMode TIntegrationMode>
240 if (!is_operational_) {
241 return absl::FailedPreconditionError("Writer is not operational");
242 }
243
244 std::lock_guard<std::mutex> lock(write_mutex_);
245
246 // Build recording state JSON
247 const std::string state = R"({"recording":)" + std::string(recording ? "true" : "false") +
248 R"(,"timestamp":)" + std::to_string(this->GetCurrentTimestampUs()) + "}";
249
250 // Publish to recording_state channel
251 if (!redis_client_.Publish(BuildChannel("recording_state"), state)) {
252 if (configuration_.enable_debug_logging) {
253 LOG(WARNING) << "Failed to publish recording state: " << redis_client_.last_error();
254 }
255 return absl::InternalError("Failed to publish recording state: " + redis_client_.last_error());
256 }
257
258 // Also update recording_state key for persistence
259 if (!redis_client_.Set(BuildKey(configuration_.keys.recording_state), state)) {
260 if (configuration_.enable_debug_logging) {
261 LOG(WARNING) << "Failed to set recording state key: " << redis_client_.last_error();
262 }
263 }
264
265 return absl::OkStatus();
266}
267
268template<container::settings::IntegrationMode TIntegrationMode>
269std::string RedisIpcStreamWriter<TIntegrationMode>::BuildKey(const std::string& suffix) const {
270 if (configuration_.key_prefix.empty()) {
271 return suffix;
272 }
273 return configuration_.key_prefix + ":" + suffix;
274}
275
276template<container::settings::IntegrationMode TIntegrationMode>
277std::string RedisIpcStreamWriter<TIntegrationMode>::BuildChannel(const std::string& suffix) const {
278 return BuildKey(suffix);
279}
280
281} // namespace presage::smartspectra::redis_ipc
static absl::StatusOr< TurboJpegCompressor > Create()
Definition turbojpeg_compressor.cpp:14
absl::Status WriteCoreMetrics(const presage::physiology::MetricsBuffer &metrics, int64_t timestamp_us) override
Definition redis_ipc_stream_writer_impl.hpp:112
absl::Status Flush() override
Definition redis_ipc_stream_writer_impl.hpp:233
absl::Status PublishRecordingState(bool recording)
Definition redis_ipc_stream_writer_impl.hpp:239
std::string BuildKey(const std::string &suffix) const
Definition redis_ipc_stream_writer_impl.hpp:269
absl::Status WriteFrame(const cv::Mat &frame, int64_t timestamp_us) override
Definition redis_ipc_stream_writer_impl.hpp:169
absl::Status WriteEdgeMetrics(const presage::physiology::Metrics &metrics, int64_t timestamp_us) override
Definition redis_ipc_stream_writer_impl.hpp:141
std::string BuildChannel(const std::string &suffix) const
Definition redis_ipc_stream_writer_impl.hpp:277
void Close() override
Definition redis_ipc_stream_writer_impl.hpp:221
bool IsOperational() const override
Definition redis_ipc_stream_writer_impl.hpp:228
absl::Status WriteStatus(const presage::physiology::StatusValue &status) override
Definition redis_ipc_stream_writer_impl.hpp:86
absl::Status Initialize()
Definition redis_ipc_stream_writer_impl.hpp:40
Definition redis_ipc_configuration.hpp:26