16#include <absl/strings/str_cat.h>
17#include <physiology/modules/messages/status.h>
19#include "file_ipc_stream_writer.hpp"
21namespace presage::smartspectra::file_ipc {
23template<container::settings::IntegrationMode TIntegrationMode>
24FileIpcStreamWriter<TIntegrationMode>::FileIpcStreamWriter(
const FileIpcConfiguration& configuration)
25 : configuration_(configuration), base_directory_(configuration.base_directory) {
27 turbo_jpeg_compressor_ = tjInitCompress();
28 if (!turbo_jpeg_compressor_) {
29 LOG(ERROR) <<
"Failed to initialize TurboJPEG compressor";
32 auto status = Initialize();
34 LOG(ERROR) <<
"Failed to initialize FileIpcStreamWriter: " << status.message();
// Destructor: when a TurboJPEG compressor handle exists it is released with
// tjDestroy() and the member is reset to nullptr.
// NOTE(review): this listing jumps from embedded line 39 to 43 and drops the
// closing braces, so whatever sits on lines 40-42 (possibly a Close() call —
// confirm against the real file) is not visible here.
38template<container::settings::IntegrationMode TIntegrationMode>
39FileIpcStreamWriter<TIntegrationMode>::~FileIpcStreamWriter() {
43 if (turbo_jpeg_compressor_) {
44 tjDestroy(turbo_jpeg_compressor_);
45 turbo_jpeg_compressor_ =
nullptr;
49template<container::settings::IntegrationMode TIntegrationMode>
51 std::lock_guard<std::mutex> lock(write_mutex_);
54 if (configuration_.create_if_missing) {
56 std::filesystem::create_directories(base_directory_, ec);
58 return absl::InternalError(
59 absl::StrCat(
"Failed to create base directory ", base_directory_.string(),
": ", ec.message())
64 if (!std::filesystem::exists(base_directory_)) {
65 return absl::NotFoundError(absl::StrCat(
"Base directory does not exist: ", base_directory_.string()));
69 frames_dir_ = base_directory_ /
"frames";
70 if (configuration_.create_if_missing) {
72 std::filesystem::create_directories(frames_dir_, ec);
74 return absl::InternalError(
75 absl::StrCat(
"Failed to create frames directory: ", ec.message())
81 auto open_mode = configuration_.truncate_on_init
82 ? (std::ios::out | std::ios::trunc)
83 : (std::ios::out | std::ios::app);
85 core_metrics_stream_.open(base_directory_ /
"core_metrics.jsonl", open_mode);
86 edge_metrics_stream_.open(base_directory_ /
"edge_metrics.jsonl", open_mode);
87 status_stream_.open(base_directory_ /
"status_codes.jsonl", open_mode);
89 if (!core_metrics_stream_.is_open() || !edge_metrics_stream_.is_open() || !status_stream_.is_open()) {
90 return absl::InternalError(
"Failed to open one or more output streams");
94 if (configuration_.io_buffer_size > 0) {
96 core_metrics_buffer_ = std::vector<char>(configuration_.io_buffer_size);
97 edge_metrics_buffer_ = std::vector<char>(configuration_.io_buffer_size);
98 status_buffer_ = std::vector<char>(configuration_.io_buffer_size);
100 core_metrics_stream_.rdbuf()->pubsetbuf(core_metrics_buffer_.data(), core_metrics_buffer_.size());
101 edge_metrics_stream_.rdbuf()->pubsetbuf(edge_metrics_buffer_.data(), edge_metrics_buffer_.size());
102 status_stream_.rdbuf()->pubsetbuf(status_buffer_.data(), status_buffer_.size());
106 recording_state_path_ = base_directory_ /
"recording_state.json";
107 recording_command_path_ = base_directory_ /
"recording_command.json";
110 std::ofstream recording_state(recording_state_path_, std::ios::out | std::ios::trunc);
111 if (recording_state.is_open()) {
112 recording_state << R
"({"recording":false,"timestamp":)"
113 << this->GetCurrentTimestampUs() << "}\n";
114 recording_state.close();
118 if (std::filesystem::exists(recording_command_path_)) {
120 std::filesystem::remove(recording_command_path_, ec);
121 if (ec && configuration_.enable_debug_logging) {
122 LOG(WARNING) <<
"Failed to remove stale command file: " << ec.message();
126 is_operational_ =
true;
127 return absl::OkStatus();
// WriteStatus: rejects the call with FailedPreconditionError while the writer
// is not operational, otherwise takes write_mutex_ before producing output.
// NOTE(review): the signature line (131) and the statements that follow the
// lock (137-138) are missing from this listing; the Doxygen residue at the end
// of the file gives the signature as
// `absl::Status WriteStatus(const presage::physiology::StatusValue& status)`.
130template<container::settings::IntegrationMode TIntegrationMode>
132 if (!is_operational_) {
133 return absl::FailedPreconditionError(
"Writer is not operational");
136 std::lock_guard<std::mutex> lock(write_mutex_);
// WriteCoreMetrics(metrics, timestamp_us): serializes a MetricsBuffer to JSON
// and wraps it in a "core_metrics" envelope.
// Returns FailedPreconditionError when the writer is not operational and
// InternalError when protobuf-to-JSON serialization fails.
// NOTE(review): the remaining signature lines, the `json` declaration, the
// status check and the final write statement (line 160) are missing from this
// listing.
141template<container::settings::IntegrationMode TIntegrationMode>
143 const presage::physiology::MetricsBuffer& metrics,
// Refuse to write unless the writer is currently operational.
146 if (!is_operational_) {
147 return absl::FailedPreconditionError(
"Writer is not operational");
// Serialize with the shared JSON print options; this runs before
// write_mutex_ is taken.
151 auto status = (google::protobuf::util::MessageToJsonString(metrics, &json,
GetJsonOptions()));
153 return absl::InternalError(
154 absl::StrCat(
"Failed to serialize MetricsBuffer: ", status.message().as_string())
// The envelope is built while holding the write lock.
158 std::lock_guard<std::mutex> lock(write_mutex_);
159 std::string payload =
BuildEnvelope(
"core_metrics", json, timestamp_us);
// WriteEdgeMetrics(metrics, timestamp_us): serializes a Metrics message to JSON
// and wraps it in an "edge_metrics" envelope.
// Returns FailedPreconditionError when the writer is not operational and
// InternalError when protobuf-to-JSON serialization fails.
// NOTE(review): the first signature line, the `json` declaration, the status
// check and the final write statement (line 181) are missing from this listing.
163template<container::settings::IntegrationMode TIntegrationMode>
165 const presage::physiology::Metrics& metrics, int64_t timestamp_us
// Refuse to write unless the writer is currently operational.
167 if (!is_operational_) {
168 return absl::FailedPreconditionError(
"Writer is not operational");
// Serialize with the shared JSON print options; this runs before
// write_mutex_ is taken.
172 auto status = google::protobuf::util::MessageToJsonString(metrics, &json,
GetJsonOptions());
174 return absl::InternalError(
175 absl::StrCat(
"Failed to serialize Metrics: ", status.message().as_string())
// The envelope is built while holding the write lock.
179 std::lock_guard<std::mutex> lock(write_mutex_);
180 std::string payload =
BuildEnvelope(
"edge_metrics", json, timestamp_us);
// WriteFrame(frame, timestamp_us): publishes the newest frame as
// frames_dir_/latest.jpg together with frames_dir_/latest.json, which carries
// the timestamp, width, height and channel count.
// Both files are written to a ".tmp" sibling first and then renamed into place.
// Errors on the JPEG path are returned (FailedPrecondition / InvalidArgument /
// Internal); errors on the metadata path are only logged and the function still
// returns OkStatus.
// NOTE(review): the signature (line 185), the frame-empty check (190), the call
// that fills `jpeg_data` and defines `compress_status` (208, presumably
// CompressFrameWithTurboJpeg), the `ec` declaration and several `if` heads are
// missing from this listing.
184template<container::settings::IntegrationMode TIntegrationMode>
186 if (!is_operational_) {
187 return absl::FailedPreconditionError(
"Writer is not operational");
191 return absl::InvalidArgumentError(
"Frame is empty");
194 if (!turbo_jpeg_compressor_) {
195 return absl::FailedPreconditionError(
"TurboJPEG compressor not initialized");
// Frame output is serialized by its own mutex, separate from write_mutex_.
199 std::lock_guard<std::mutex> lock(frame_mutex_);
// Final and temporary paths for the image and its metadata.
201 std::filesystem::path frame_path = frames_dir_ /
"latest.jpg";
202 std::filesystem::path temp_frame_path = frames_dir_ /
"latest.jpg.tmp";
203 std::filesystem::path metadata_path = frames_dir_ /
"latest.json";
204 std::filesystem::path temp_metadata_path = frames_dir_ /
"latest.json.tmp";
// Holds the compressed JPEG bytes.
207 std::vector<unsigned char> jpeg_data;
209 if (!compress_status.ok()) {
210 return compress_status;
// Write the JPEG bytes to the temporary file.
214 std::ofstream frame_stream(temp_frame_path, std::ios::binary | std::ios::trunc);
215 if (!frame_stream.is_open()) {
216 return absl::InternalError(absl::StrCat(
"Failed to open ", temp_frame_path.string(),
" for writing"));
219 frame_stream.write(
reinterpret_cast<const char*
>(jpeg_data.data()), jpeg_data.size());
220 frame_stream.close();
223 return absl::InternalError(absl::StrCat(
"Failed to write frame to ", temp_frame_path.string()));
// Swap the temporary file into place.
// NOTE(review): std::filesystem::rename already replaces an existing regular
// file at the destination, so the preceding remove() looks unnecessary and
// leaves a window in which latest.jpg does not exist for readers — confirm
// and consider dropping it.
229 std::filesystem::remove(frame_path, ec);
231 std::filesystem::rename(temp_frame_path, frame_path, ec);
233 return absl::InternalError(absl::StrCat(
"Failed to rename frame file: ", ec.message()));
// Build the metadata JSON (the surrounding brace literals are on lines not
// shown in this listing).
237 std::ostringstream metadata;
239 <<
"\"timestamp\":" << timestamp_us <<
','
240 <<
"\"width\":" << frame.cols <<
','
241 <<
"\"height\":" << frame.rows <<
','
242 <<
"\"channels\":" << frame.channels()
// From here on failures are non-fatal: the frame itself is already published.
245 std::ofstream metadata_stream(temp_metadata_path, std::ios::out | std::ios::trunc);
246 if (!metadata_stream.is_open()) {
248 LOG(WARNING) <<
"Failed to open metadata file: " << temp_metadata_path;
249 return absl::OkStatus();
252 metadata_stream << metadata.str() <<
'\n';
253 metadata_stream.close();
255 if (!metadata_stream) {
256 LOG(WARNING) <<
"Failed to write metadata file: " << temp_metadata_path;
257 return absl::OkStatus();
// Same remove-then-rename sequence as for the image (see the note above).
261 std::filesystem::remove(metadata_path, ec);
263 std::filesystem::rename(temp_metadata_path, metadata_path, ec);
265 LOG(WARNING) <<
"Failed to rename metadata file: " << ec.message();
268 return absl::OkStatus();
271template<container::settings::IntegrationMode TIntegrationMode>
273 is_operational_ =
false;
274 std::lock_guard<std::mutex> lock(write_mutex_);
276 if (core_metrics_stream_.is_open()) {
277 core_metrics_stream_.flush();
278 core_metrics_stream_.close();
280 if (edge_metrics_stream_.is_open()) {
281 edge_metrics_stream_.flush();
282 edge_metrics_stream_.close();
284 if (status_stream_.is_open()) {
285 status_stream_.flush();
286 status_stream_.close();
290template<container::settings::IntegrationMode TIntegrationMode>
292 return is_operational_;
295template<container::settings::IntegrationMode TIntegrationMode>
297 if (!is_operational_) {
298 return absl::FailedPreconditionError(
"Writer is not operational");
301 std::lock_guard<std::mutex> lock(write_mutex_);
302 core_metrics_stream_.flush();
303 edge_metrics_stream_.flush();
304 status_stream_.flush();
306 if (core_metrics_stream_.fail() || edge_metrics_stream_.fail() || status_stream_.fail()) {
307 return absl::InternalError(
"Failed to flush one or more streams");
310 return absl::OkStatus();
// WriteJsonLine(stream, json_data): appends `json_data` followed by a newline
// to `stream`.
// Returns FailedPreconditionError when the stream is not open, InternalError
// ("Failed to write to stream") on the failure branch, OkStatus otherwise.
// NOTE(review): the signature (line 314) and lines 320-322 — which hold the
// failure condition and possibly a flush — are missing from this listing.
313template<container::settings::IntegrationMode TIntegrationMode>
315 if (!stream.is_open()) {
316 return absl::FailedPreconditionError(
"Stream is not open");
319 stream << json_data <<
'\n';
323 return absl::InternalError(
"Failed to write to stream");
326 return absl::OkStatus();
// BuildEnvelope(type, json_payload, timestamp_us): builds the single-line JSON
// record with the fields "type", "timestamp" and "payload", in that order.
// `type` is inserted between quotes without escaping and `json_payload` is
// inserted verbatim, so callers must pass a JSON-safe type name and an already
// serialized JSON value.
// NOTE(review): the signature head (330), the timestamp parameter (333) and the
// lines carrying the opening/closing brace literals (336, 340) are missing
// from this listing.
329template<container::settings::IntegrationMode TIntegrationMode>
331 const std::string& type,
332 const std::string& json_payload,
335 std::ostringstream envelope;
337 <<
"\"type\":\"" << type <<
"\","
338 <<
"\"timestamp\":" << timestamp_us <<
','
339 <<
"\"payload\":" << json_payload
341 return envelope.str();
// BuildStatusPayload(status): renders a StatusValue as a JSON object string
// with "value", "timestamp" and "description", followed by a second escaped
// string taken from GetStatusHint().
// The description and hint texts are passed through JsonEscape before being
// placed between quotes.
// NOTE(review): the signature (345), the opening/closing brace literals and the
// key literal that precedes the hint (line 352) are missing from this listing.
344template<container::settings::IntegrationMode TIntegrationMode>
346 std::ostringstream payload;
348 <<
"\"value\":" << status.value() <<
','
349 <<
"\"timestamp\":" << status.timestamp() <<
','
350 <<
"\"description\":\""
351 <<
JsonEscape(presage::physiology::GetStatusDescription(status.value())) <<
"\","
353 <<
JsonEscape(presage::physiology::GetStatusHint(status.value())) <<
"\""
355 return payload.str();
// JsonEscape(value): returns a copy of `value` that is safe to place inside a
// JSON string literal.
// Backslash, double quote, \n, \r, \t, \b and \f become their two-character
// escapes. One further branch appends two lowercase hex digits of the byte;
// all remaining bytes are copied unchanged.
// NOTE(review): the signature (359), the `escaped` declaration, the switch head
// and the lines holding the condition and prefix of the hex branch (371-373,
// presumably "\u00" for control characters below 0x20 — confirm) are missing
// from this listing.
358template<container::settings::IntegrationMode TIntegrationMode>
// Reserve room for the worst common case of every character doubling.
361 escaped.reserve(value.size() * 2);
// Iterating as unsigned char keeps the shift and mask below well defined for
// bytes >= 0x80.
362 for (
unsigned char ch : value) {
364 case '\\': escaped +=
"\\\\";
break;
365 case '\"': escaped +=
"\\\"";
break;
366 case '\n': escaped +=
"\\n";
break;
367 case '\r': escaped +=
"\\r";
break;
368 case '\t': escaped +=
"\\t";
break;
369 case '\b': escaped +=
"\\b";
break;
370 case '\f': escaped +=
"\\f";
break;
// High nibble, then low nibble, as lowercase hex digits.
374 escaped +=
"0123456789abcdef"[ch >> 4];
375 escaped +=
"0123456789abcdef"[ch & 0x0f];
// Anything else is emitted as-is.
377 escaped.push_back(
static_cast<char>(ch));
385template<container::settings::IntegrationMode TIntegrationMode>
387 static google::protobuf::util::JsonPrintOptions options = []() {
388 google::protobuf::util::JsonPrintOptions opts;
389 opts.preserve_proto_field_names =
true;
390 opts.add_whitespace =
false;
391 opts.always_print_primitive_fields =
false;
// CompressFrameWithTurboJpeg(frame, jpeg_data): encodes `frame` as JPEG with
// tjCompress2 at configuration_.jpeg_quality and stores the bytes in
// `jpeg_data`.
// Returns FailedPreconditionError without a compressor handle,
// InvalidArgumentError for non-8-bit frames, unsupported channel counts or an
// oversized stride, InternalError when TurboJPEG reports a failure, OkStatus
// otherwise.
// NOTE(review): the signature head, the `case` labels of the channel switch,
// most tjCompress2 arguments and the lines around the error/cleanup paths
// (457-462, 472-473) are missing from this listing — confirm that `raw_jpeg`
// is released with tjFree on both the success and failure paths.
397template<container::settings::IntegrationMode TIntegrationMode>
399 const cv::Mat& frame,
400 std::vector<unsigned char>& jpeg_data
402 if (!turbo_jpeg_compressor_) {
403 return absl::FailedPreconditionError(
"TurboJPEG compressor not initialized");
406 if (frame.depth() != CV_8U) {
407 return absl::InvalidArgumentError(
"Frame must have 8-bit depth for JPEG compression");
// Pick the TurboJPEG pixel format from the channel count (GRAY / BGR / BGRA —
// presumably for 1 / 3 / 4 channels; the case labels are not shown).
411 int pixel_format = TJPF_BGR;
412 switch (frame.channels()) {
414 pixel_format = TJPF_GRAY;
417 pixel_format = TJPF_BGR;
420 pixel_format = TJPF_BGRA;
423 return absl::InvalidArgumentError(
424 absl::StrCat(
"Unsupported number of channels: ", frame.channels())
// Output buffer pointer and size that receive the encoder's result.
429 unsigned char* raw_jpeg =
nullptr;
430 unsigned long jpeg_size = 0;
// 4:2:0 chroma subsampling with the fast upsample / fast DCT flags.
432 const int subsample = TJSAMP_420;
433 const int flags = TJFLAG_FASTUPSAMPLE | TJFLAG_FASTDCT;
// tjCompress2 takes the pitch as int, so reject strides that do not fit.
436 if (frame.step >
static_cast<size_t>(std::numeric_limits<int>::max())) {
437 return absl::InvalidArgumentError(
"Frame stride exceeds TurboJPEG limits");
439 const int pitch = frame.step ?
static_cast<int>(frame.step) : 0;
442 int compress_result = tjCompress2(
443 turbo_jpeg_compressor_,
452 configuration_.jpeg_quality,
// Non-zero result means failure; report TurboJPEG's own error text.
456 if (compress_result != 0) {
463 const char* error_msg = tjGetErrorStr2(turbo_jpeg_compressor_);
464 std::string error_text = error_msg ? error_msg :
"unknown error";
465 return absl::InternalError(absl::StrCat(
"TurboJPEG compression failed: ", error_text));
// Copy the encoded bytes into the caller's vector.
470 jpeg_data.reserve(jpeg_size);
471 jpeg_data.assign(raw_jpeg, raw_jpeg + jpeg_size);
// Optional size report, enabled by configuration_.enable_debug_logging.
476 if (configuration_.enable_debug_logging) {
477 LOG(INFO) <<
"Compressed frame from " << (frame.total() * frame.elemSize())
478 <<
" bytes to " << jpeg_size <<
" bytes (quality="
479 << configuration_.jpeg_quality <<
")";
482 return absl::OkStatus();
static std::string BuildStatusPayload(const presage::physiology::StatusValue &status)
Definition file_ipc_stream_writer_impl.hpp:345
absl::Status Flush() override
Definition file_ipc_stream_writer_impl.hpp:296
static const google::protobuf::util::JsonPrintOptions & GetJsonOptions()
Definition file_ipc_stream_writer_impl.hpp:386
void Close() override
Definition file_ipc_stream_writer_impl.hpp:272
absl::Status WriteJsonLine(std::ofstream &stream, const std::string &json_data)
Definition file_ipc_stream_writer_impl.hpp:314
absl::Status WriteCoreMetrics(const presage::physiology::MetricsBuffer &metrics, int64_t timestamp_us) override
Definition file_ipc_stream_writer_impl.hpp:142
absl::Status Initialize()
Definition file_ipc_stream_writer_impl.hpp:50
absl::Status WriteStatus(const presage::physiology::StatusValue &status) override
Definition file_ipc_stream_writer_impl.hpp:131
absl::Status CompressFrameWithTurboJpeg(const cv::Mat &frame, std::vector< unsigned char > &jpeg_data)
Definition file_ipc_stream_writer_impl.hpp:398
bool IsOperational() const override
Definition file_ipc_stream_writer_impl.hpp:291
static std::string BuildEnvelope(const std::string &type, const std::string &json_payload, int64_t timestamp_us)
Definition file_ipc_stream_writer_impl.hpp:330
absl::Status WriteEdgeMetrics(const presage::physiology::Metrics &metrics, int64_t timestamp_us) override
Definition file_ipc_stream_writer_impl.hpp:164
absl::Status WriteFrame(const cv::Mat &frame, int64_t timestamp_us) override
Definition file_ipc_stream_writer_impl.hpp:185
static std::string JsonEscape(const std::string &value)
Definition file_ipc_stream_writer_impl.hpp:359
Definition file_ipc_configuration.hpp:27