SmartSpectra C++ SDK
Measure human vitals from video with SmartSpectra C++ SDK.
Loading...
Searching...
No Matches
file_ipc_stream_writer_impl.hpp
1// file_ipc_stream_writer_impl.hpp
2// Created by Greg on 9/26/2025.
3// Copyright (C) 2025 Presage Security, Inc.
4//
5// SPDX-License-Identifier: LGPL-3.0-or-later
6
7#pragma once
8
9// === standard library includes (if any) ===
10#include <iomanip>
11#include <sstream>
12#include <vector>
13#include <algorithm>
14#include <limits>
15// === third-party includes (if any) ===
16#include <absl/strings/str_cat.h>
17#include <physiology/modules/messages/status.h>
18// === local includes (if any) ===
19#include "file_ipc_stream_writer.hpp"
20
21namespace presage::smartspectra::file_ipc {
22
23template<container::settings::IntegrationMode TIntegrationMode>
24FileIpcStreamWriter<TIntegrationMode>::FileIpcStreamWriter(const FileIpcConfiguration& configuration)
25 : configuration_(configuration), base_directory_(configuration.base_directory) {
26 // Initialize TurboJPEG compressor
27 turbo_jpeg_compressor_ = tjInitCompress();
28 if (!turbo_jpeg_compressor_) {
29 LOG(ERROR) << "Failed to initialize TurboJPEG compressor";
30 }
31
32 auto status = Initialize();
33 if (!status.ok()) {
34 LOG(ERROR) << "Failed to initialize FileIpcStreamWriter: " << status.message();
35 }
36}
37
38template<container::settings::IntegrationMode TIntegrationMode>
39FileIpcStreamWriter<TIntegrationMode>::~FileIpcStreamWriter() {
40 Close();
41
42 // Clean up TurboJPEG compressor
43 if (turbo_jpeg_compressor_) {
44 tjDestroy(turbo_jpeg_compressor_);
45 turbo_jpeg_compressor_ = nullptr;
46 }
47}
48
49template<container::settings::IntegrationMode TIntegrationMode>
51 std::lock_guard<std::mutex> lock(write_mutex_);
52
53 // Create base directory if needed
54 if (configuration_.create_if_missing) {
55 std::error_code ec;
56 std::filesystem::create_directories(base_directory_, ec);
57 if (ec) {
58 return absl::InternalError(
59 absl::StrCat("Failed to create base directory ", base_directory_.string(), ": ", ec.message())
60 );
61 }
62 }
63
64 if (!std::filesystem::exists(base_directory_)) {
65 return absl::NotFoundError(absl::StrCat("Base directory does not exist: ", base_directory_.string()));
66 }
67
68 // Create frames directory
69 frames_dir_ = base_directory_ / "frames";
70 if (configuration_.create_if_missing) {
71 std::error_code ec;
72 std::filesystem::create_directories(frames_dir_, ec);
73 if (ec) {
74 return absl::InternalError(
75 absl::StrCat("Failed to create frames directory: ", ec.message())
76 );
77 }
78 }
79
80 // Open stream files
81 auto open_mode = configuration_.truncate_on_init
82 ? (std::ios::out | std::ios::trunc)
83 : (std::ios::out | std::ios::app);
84
85 core_metrics_stream_.open(base_directory_ / "core_metrics.jsonl", open_mode);
86 edge_metrics_stream_.open(base_directory_ / "edge_metrics.jsonl", open_mode);
87 status_stream_.open(base_directory_ / "status_codes.jsonl", open_mode);
88
89 if (!core_metrics_stream_.is_open() || !edge_metrics_stream_.is_open() || !status_stream_.is_open()) {
90 return absl::InternalError("Failed to open one or more output streams");
91 }
92
93 // Set buffer size if specified
94 if (configuration_.io_buffer_size > 0) {
95 // Create separate buffers for each stream to avoid memory corruption
96 core_metrics_buffer_ = std::vector<char>(configuration_.io_buffer_size);
97 edge_metrics_buffer_ = std::vector<char>(configuration_.io_buffer_size);
98 status_buffer_ = std::vector<char>(configuration_.io_buffer_size);
99
100 core_metrics_stream_.rdbuf()->pubsetbuf(core_metrics_buffer_.data(), core_metrics_buffer_.size());
101 edge_metrics_stream_.rdbuf()->pubsetbuf(edge_metrics_buffer_.data(), edge_metrics_buffer_.size());
102 status_stream_.rdbuf()->pubsetbuf(status_buffer_.data(), status_buffer_.size());
103 }
104
105 // Setup recording state files
106 recording_state_path_ = base_directory_ / "recording_state.json";
107 recording_command_path_ = base_directory_ / "recording_command.json";
108
109 // Initialize recording state
110 std::ofstream recording_state(recording_state_path_, std::ios::out | std::ios::trunc);
111 if (recording_state.is_open()) {
112 recording_state << R"({"recording":false,"timestamp":)"
113 << this->GetCurrentTimestampUs() << "}\n";
114 recording_state.close();
115 }
116
117 // Remove any stale command file
118 if (std::filesystem::exists(recording_command_path_)) {
119 std::error_code ec;
120 std::filesystem::remove(recording_command_path_, ec);
121 if (ec && configuration_.enable_debug_logging) {
122 LOG(WARNING) << "Failed to remove stale command file: " << ec.message();
123 }
124 }
125
126 is_operational_ = true;
127 return absl::OkStatus();
128}
129
130template<container::settings::IntegrationMode TIntegrationMode>
131absl::Status FileIpcStreamWriter<TIntegrationMode>::WriteStatus(const presage::physiology::StatusValue& status) {
132 if (!is_operational_) {
133 return absl::FailedPreconditionError("Writer is not operational");
134 }
135
136 std::lock_guard<std::mutex> lock(write_mutex_);
137 std::string payload = BuildEnvelope("status", BuildStatusPayload(status), status.timestamp());
138 return WriteJsonLine(status_stream_, payload);
139}
140
141template<container::settings::IntegrationMode TIntegrationMode>
143 const presage::physiology::MetricsBuffer& metrics,
144 int64_t timestamp_us
145) {
146 if (!is_operational_) {
147 return absl::FailedPreconditionError("Writer is not operational");
148 }
149
150 std::string json;
151 auto status = (google::protobuf::util::MessageToJsonString(metrics, &json, GetJsonOptions()));
152 if (!status.ok()) {
153 return absl::InternalError(
154 absl::StrCat("Failed to serialize MetricsBuffer: ", status.message().as_string())
155 );
156 }
157
158 std::lock_guard<std::mutex> lock(write_mutex_);
159 std::string payload = BuildEnvelope("core_metrics", json, timestamp_us);
160 return WriteJsonLine(core_metrics_stream_, payload);
161}
162
163template<container::settings::IntegrationMode TIntegrationMode>
165 const presage::physiology::Metrics& metrics, int64_t timestamp_us
166) {
167 if (!is_operational_) {
168 return absl::FailedPreconditionError("Writer is not operational");
169 }
170
171 std::string json;
172 auto status = google::protobuf::util::MessageToJsonString(metrics, &json, GetJsonOptions());
173 if (!status.ok()) {
174 return absl::InternalError(
175 absl::StrCat("Failed to serialize Metrics: ", status.message().as_string())
176 );
177 }
178
179 std::lock_guard<std::mutex> lock(write_mutex_);
180 std::string payload = BuildEnvelope("edge_metrics", json, timestamp_us);
181 return WriteJsonLine(edge_metrics_stream_, payload);
182}
183
184template<container::settings::IntegrationMode TIntegrationMode>
185absl::Status FileIpcStreamWriter<TIntegrationMode>::WriteFrame(const cv::Mat& frame, int64_t timestamp_us) {
186 if (!is_operational_) {
187 return absl::FailedPreconditionError("Writer is not operational");
188 }
189
190 if (frame.empty()) {
191 return absl::InvalidArgumentError("Frame is empty");
192 }
193
194 if (!turbo_jpeg_compressor_) {
195 return absl::FailedPreconditionError("TurboJPEG compressor not initialized");
196 }
197
198 // Use fixed paths like hud_frame_streamer - only maintain latest.jpg and latest.json
199 std::lock_guard<std::mutex> lock(frame_mutex_);
200
201 std::filesystem::path frame_path = frames_dir_ / "latest.jpg";
202 std::filesystem::path temp_frame_path = frames_dir_ / "latest.jpg.tmp";
203 std::filesystem::path metadata_path = frames_dir_ / "latest.json";
204 std::filesystem::path temp_metadata_path = frames_dir_ / "latest.json.tmp";
205
206 // Compress frame using TurboJPEG
207 std::vector<unsigned char> jpeg_data;
208 auto compress_status = CompressFrameWithTurboJpeg(frame, jpeg_data);
209 if (!compress_status.ok()) {
210 return compress_status;
211 }
212
213 // Write compressed frame to temporary file first (atomic write)
214 std::ofstream frame_stream(temp_frame_path, std::ios::binary | std::ios::trunc);
215 if (!frame_stream.is_open()) {
216 return absl::InternalError(absl::StrCat("Failed to open ", temp_frame_path.string(), " for writing"));
217 }
218
219 frame_stream.write(reinterpret_cast<const char*>(jpeg_data.data()), jpeg_data.size());
220 frame_stream.close();
221
222 if (!frame_stream) {
223 return absl::InternalError(absl::StrCat("Failed to write frame to ", temp_frame_path.string()));
224 }
225
226 // Atomically rename frame to final path
227 std::error_code ec;
228 // Remove existing file first to avoid issues on some filesystems
229 std::filesystem::remove(frame_path, ec);
230 ec.clear();
231 std::filesystem::rename(temp_frame_path, frame_path, ec);
232 if (ec) {
233 return absl::InternalError(absl::StrCat("Failed to rename frame file: ", ec.message()));
234 }
235
236 // Write metadata JSON file
237 std::ostringstream metadata;
238 metadata << '{'
239 << "\"timestamp\":" << timestamp_us << ','
240 << "\"width\":" << frame.cols << ','
241 << "\"height\":" << frame.rows << ','
242 << "\"channels\":" << frame.channels()
243 << '}';
244
245 std::ofstream metadata_stream(temp_metadata_path, std::ios::out | std::ios::trunc);
246 if (!metadata_stream.is_open()) {
247 // Log warning but don't fail - frame is already written
248 LOG(WARNING) << "Failed to open metadata file: " << temp_metadata_path;
249 return absl::OkStatus();
250 }
251
252 metadata_stream << metadata.str() << '\n';
253 metadata_stream.close();
254
255 if (!metadata_stream) {
256 LOG(WARNING) << "Failed to write metadata file: " << temp_metadata_path;
257 return absl::OkStatus();
258 }
259
260 // Atomically rename metadata to final path
261 std::filesystem::remove(metadata_path, ec);
262 ec.clear();
263 std::filesystem::rename(temp_metadata_path, metadata_path, ec);
264 if (ec) {
265 LOG(WARNING) << "Failed to rename metadata file: " << ec.message();
266 }
267
268 return absl::OkStatus();
269}
270
271template<container::settings::IntegrationMode TIntegrationMode>
273 is_operational_ = false;
274 std::lock_guard<std::mutex> lock(write_mutex_);
275
276 if (core_metrics_stream_.is_open()) {
277 core_metrics_stream_.flush();
278 core_metrics_stream_.close();
279 }
280 if (edge_metrics_stream_.is_open()) {
281 edge_metrics_stream_.flush();
282 edge_metrics_stream_.close();
283 }
284 if (status_stream_.is_open()) {
285 status_stream_.flush();
286 status_stream_.close();
287 }
288}
289
290template<container::settings::IntegrationMode TIntegrationMode>
292 return is_operational_;
293}
294
295template<container::settings::IntegrationMode TIntegrationMode>
297 if (!is_operational_) {
298 return absl::FailedPreconditionError("Writer is not operational");
299 }
300
301 std::lock_guard<std::mutex> lock(write_mutex_);
302 core_metrics_stream_.flush();
303 edge_metrics_stream_.flush();
304 status_stream_.flush();
305
306 if (core_metrics_stream_.fail() || edge_metrics_stream_.fail() || status_stream_.fail()) {
307 return absl::InternalError("Failed to flush one or more streams");
308 }
309
310 return absl::OkStatus();
311}
312
313template<container::settings::IntegrationMode TIntegrationMode>
314absl::Status FileIpcStreamWriter<TIntegrationMode>::WriteJsonLine(std::ofstream& stream, const std::string& json_data) {
315 if (!stream.is_open()) {
316 return absl::FailedPreconditionError("Stream is not open");
317 }
318
319 stream << json_data << '\n';
320 stream.flush();
321
322 if (stream.fail()) {
323 return absl::InternalError("Failed to write to stream");
324 }
325
326 return absl::OkStatus();
327}
328
329template<container::settings::IntegrationMode TIntegrationMode>
331 const std::string& type,
332 const std::string& json_payload,
333 int64_t timestamp_us
334) {
335 std::ostringstream envelope;
336 envelope << '{'
337 << "\"type\":\"" << type << "\","
338 << "\"timestamp\":" << timestamp_us << ','
339 << "\"payload\":" << json_payload
340 << '}';
341 return envelope.str();
342}
343
344template<container::settings::IntegrationMode TIntegrationMode>
345std::string FileIpcStreamWriter<TIntegrationMode>::BuildStatusPayload(const presage::physiology::StatusValue& status) {
346 std::ostringstream payload;
347 payload << '{'
348 << "\"value\":" << status.value() << ','
349 << "\"timestamp\":" << status.timestamp() << ','
350 << "\"description\":\""
351 << JsonEscape(presage::physiology::GetStatusDescription(status.value())) << "\","
352 << "\"hint\":\""
353 << JsonEscape(presage::physiology::GetStatusHint(status.value())) << "\""
354 << '}';
355 return payload.str();
356}
357
358template<container::settings::IntegrationMode TIntegrationMode>
359std::string FileIpcStreamWriter<TIntegrationMode>::JsonEscape(const std::string& value) {
360 std::string escaped;
361 escaped.reserve(value.size() * 2);
362 for (unsigned char ch : value) {
363 switch (ch) {
364 case '\\': escaped += "\\\\"; break;
365 case '\"': escaped += "\\\""; break;
366 case '\n': escaped += "\\n"; break;
367 case '\r': escaped += "\\r"; break;
368 case '\t': escaped += "\\t"; break;
369 case '\b': escaped += "\\b"; break;
370 case '\f': escaped += "\\f"; break;
371 default:
372 if (ch < 0x20) {
373 escaped += "\\u00";
374 escaped += "0123456789abcdef"[ch >> 4];
375 escaped += "0123456789abcdef"[ch & 0x0f];
376 } else {
377 escaped.push_back(static_cast<char>(ch));
378 }
379 break;
380 }
381 }
382 return escaped;
383}
384
385template<container::settings::IntegrationMode TIntegrationMode>
386const google::protobuf::util::JsonPrintOptions& FileIpcStreamWriter<TIntegrationMode>::GetJsonOptions() {
387 static google::protobuf::util::JsonPrintOptions options = []() {
388 google::protobuf::util::JsonPrintOptions opts;
389 opts.preserve_proto_field_names = true;
390 opts.add_whitespace = false;
391 opts.always_print_primitive_fields = false;
392 return opts;
393 }();
394 return options;
395}
396
397template<container::settings::IntegrationMode TIntegrationMode>
399 const cv::Mat& frame,
400 std::vector<unsigned char>& jpeg_data
401) {
402 if (!turbo_jpeg_compressor_) {
403 return absl::FailedPreconditionError("TurboJPEG compressor not initialized");
404 }
405
406 if (frame.depth() != CV_8U) {
407 return absl::InvalidArgumentError("Frame must have 8-bit depth for JPEG compression");
408 }
409
410 // Determine pixel format based on number of channels
411 int pixel_format = TJPF_BGR;
412 switch (frame.channels()) {
413 case 1:
414 pixel_format = TJPF_GRAY;
415 break;
416 case 3:
417 pixel_format = TJPF_BGR;
418 break;
419 case 4:
420 pixel_format = TJPF_BGRA;
421 break;
422 default:
423 return absl::InvalidArgumentError(
424 absl::StrCat("Unsupported number of channels: ", frame.channels())
425 );
426 }
427
428 // Prepare compression parameters
429 unsigned char* raw_jpeg = nullptr;
430 unsigned long jpeg_size = 0;
431
432 const int subsample = TJSAMP_420; // 4:2:0 subsampling for better compression
433 const int flags = TJFLAG_FASTUPSAMPLE | TJFLAG_FASTDCT; // Fast compression flags
434
435 // Check that frame stride doesn't exceed TurboJPEG limits
436 if (frame.step > static_cast<size_t>(std::numeric_limits<int>::max())) {
437 return absl::InvalidArgumentError("Frame stride exceeds TurboJPEG limits");
438 }
439 const int pitch = frame.step ? static_cast<int>(frame.step) : 0;
440
441 // Perform compression
442 int compress_result = tjCompress2(
443 turbo_jpeg_compressor_,
444 frame.data,
445 frame.cols,
446 pitch,
447 frame.rows,
448 pixel_format,
449 &raw_jpeg,
450 &jpeg_size,
451 subsample,
452 configuration_.jpeg_quality, // Use configured JPEG quality
453 flags
454 );
455
456 if (compress_result != 0) {
457 // Clean up if allocated
458 if (raw_jpeg) {
459 tjFree(raw_jpeg);
460 }
461
462 // Get error message from TurboJPEG
463 const char* error_msg = tjGetErrorStr2(turbo_jpeg_compressor_);
464 std::string error_text = error_msg ? error_msg : "unknown error";
465 return absl::InternalError(absl::StrCat("TurboJPEG compression failed: ", error_text));
466 }
467
468 // Copy compressed data to output vector
469 jpeg_data.clear();
470 jpeg_data.reserve(jpeg_size);
471 jpeg_data.assign(raw_jpeg, raw_jpeg + jpeg_size);
472
473 // Free TurboJPEG buffer
474 tjFree(raw_jpeg);
475
476 if (configuration_.enable_debug_logging) {
477 LOG(INFO) << "Compressed frame from " << (frame.total() * frame.elemSize())
478 << " bytes to " << jpeg_size << " bytes (quality="
479 << configuration_.jpeg_quality << ")";
480 }
481
482 return absl::OkStatus();
483}
484
485} // namespace presage::smartspectra::file_ipc
static std::string BuildStatusPayload(const presage::physiology::StatusValue &status)
Definition file_ipc_stream_writer_impl.hpp:345
absl::Status Flush() override
Definition file_ipc_stream_writer_impl.hpp:296
static const google::protobuf::util::JsonPrintOptions & GetJsonOptions()
Definition file_ipc_stream_writer_impl.hpp:386
void Close() override
Definition file_ipc_stream_writer_impl.hpp:272
absl::Status WriteJsonLine(std::ofstream &stream, const std::string &json_data)
Definition file_ipc_stream_writer_impl.hpp:314
absl::Status WriteCoreMetrics(const presage::physiology::MetricsBuffer &metrics, int64_t timestamp_us) override
Definition file_ipc_stream_writer_impl.hpp:142
absl::Status Initialize()
Definition file_ipc_stream_writer_impl.hpp:50
absl::Status WriteStatus(const presage::physiology::StatusValue &status) override
Definition file_ipc_stream_writer_impl.hpp:131
absl::Status CompressFrameWithTurboJpeg(const cv::Mat &frame, std::vector< unsigned char > &jpeg_data)
Definition file_ipc_stream_writer_impl.hpp:398
bool IsOperational() const override
Definition file_ipc_stream_writer_impl.hpp:291
static std::string BuildEnvelope(const std::string &type, const std::string &json_payload, int64_t timestamp_us)
Definition file_ipc_stream_writer_impl.hpp:330
absl::Status WriteEdgeMetrics(const presage::physiology::Metrics &metrics, int64_t timestamp_us) override
Definition file_ipc_stream_writer_impl.hpp:164
absl::Status WriteFrame(const cv::Mat &frame, int64_t timestamp_us) override
Definition file_ipc_stream_writer_impl.hpp:185
static std::string JsonEscape(const std::string &value)
Definition file_ipc_stream_writer_impl.hpp:359
Definition file_ipc_configuration.hpp:27