9#include "redis_ipc_stream_writer.hpp"
12#include <glog/logging.h>
13#include <absl/strings/escaping.h>
14#include <google/protobuf/util/json_util.h>
15#include <mediapipe/framework/port/ret_check.h>
16#include "../json_envelope.hpp"
17#include "../turbojpeg_compressor.hpp"
19namespace presage::smartspectra::redis_ipc {
21template<container::settings::IntegrationMode TIntegrationMode>
22RedisIpcStreamWriter<TIntegrationMode>::RedisIpcStreamWriter(
const RedisIpcConfig& configuration)
23 : configuration_(configuration) {
24 redis_client_.SetResponseLimits(
25 configuration_.max_bulk_string_bytes,
26 configuration_.max_array_elements
28 auto status = Initialize();
30 LOG(ERROR) <<
"Failed to initialize RedisIpcStreamWriter: " << status.message();
34template<container::settings::IntegrationMode TIntegrationMode>
35RedisIpcStreamWriter<TIntegrationMode>::~RedisIpcStreamWriter() {
39template<container::settings::IntegrationMode TIntegrationMode>
41 std::lock_guard<std::mutex> lock(write_mutex_);
44 if (!redis_client_.Connect(
47 configuration_.connection_timeout
49 return absl::InternalError(
50 "Failed to connect to Redis: " + redis_client_.last_error()
55 if (!redis_client_.Del(
BuildKey(configuration_.keys.command_queue))) {
56 if (configuration_.enable_debug_logging) {
57 LOG(WARNING) <<
"Failed to clear Redis recording commands: "
58 << redis_client_.last_error();
63 const std::string initial_state = R
"({"recording":false,"timestamp":)" +
64 std::to_string(this->GetCurrentTimestampUs()) + "}";
66 if (!redis_client_.Set(
BuildKey(configuration_.keys.recording_state), initial_state)) {
67 if (configuration_.enable_debug_logging) {
68 LOG(WARNING) <<
"Failed to set initial Redis recording state: "
69 << redis_client_.last_error();
74 if (!redis_client_.Publish(
BuildChannel(
"recording_state"), initial_state)) {
75 if (configuration_.enable_debug_logging) {
76 LOG(WARNING) <<
"Failed to publish initial recording state: "
77 << redis_client_.last_error();
81 is_operational_ =
true;
82 return absl::OkStatus();
85template<container::settings::IntegrationMode TIntegrationMode>
87 const presage::physiology::StatusValue& status
89 if (!is_operational_) {
90 return absl::FailedPreconditionError(
"Writer is not operational");
93 std::lock_guard<std::mutex> lock(write_mutex_);
95 const std::string payload = BuildJsonEnvelope(
97 BuildStatusPayload(status),
101 if (!redis_client_.Publish(
BuildChannel(configuration_.channels.status), payload)) {
102 if (publish_error_count_++ < kMaxErrorLogs) {
103 LOG(WARNING) <<
"Failed to publish status to Redis: "
104 << redis_client_.last_error();
108 return absl::OkStatus();
111template<container::settings::IntegrationMode TIntegrationMode>
113 const presage::physiology::MetricsBuffer& metrics,
116 if (!is_operational_) {
117 return absl::FailedPreconditionError(
"Writer is not operational");
121 auto status = google::protobuf::util::MessageToJsonString(metrics, &json, GetJsonOptions());
123 return absl::InternalError(
"Failed to serialize MetricsBuffer: " + status.message().as_string());
126 std::lock_guard<std::mutex> lock(write_mutex_);
128 const std::string payload = BuildJsonEnvelope(
"core_metrics", json, timestamp_us);
130 if (!redis_client_.Publish(
BuildChannel(configuration_.channels.core_metrics), payload)) {
131 if (publish_error_count_++ < kMaxErrorLogs) {
132 LOG(WARNING) <<
"Failed to publish core metrics to Redis: "
133 << redis_client_.last_error();
137 return absl::OkStatus();
140template<container::settings::IntegrationMode TIntegrationMode>
142 const presage::physiology::Metrics& metrics, int64_t timestamp_us
144 if (!is_operational_) {
145 return absl::FailedPreconditionError(
"Writer is not operational");
149 auto status = google::protobuf::util::MessageToJsonString(metrics, &json, GetJsonOptions());
151 return absl::InternalError(
"Failed to serialize Metrics: " + status.message().as_string());
154 std::lock_guard<std::mutex> lock(write_mutex_);
156 const std::string payload = BuildJsonEnvelope(
"edge_metrics", json, timestamp_us);
158 if (!redis_client_.Publish(
BuildChannel(configuration_.channels.edge_metrics), payload)) {
159 if (publish_error_count_++ < kMaxErrorLogs) {
160 LOG(WARNING) <<
"Failed to publish edge metrics to Redis: "
161 << redis_client_.last_error();
165 return absl::OkStatus();
168template<container::settings::IntegrationMode TIntegrationMode>
170 const cv::Mat& frame,
173 if (!is_operational_) {
174 return absl::FailedPreconditionError(
"Writer is not operational");
178 return absl::InvalidArgumentError(
"Frame is empty");
185 std::vector<unsigned char> jpeg_data;
186 MP_RETURN_IF_ERROR(compressor.Compress(frame, 90, jpeg_data));
190 absl::Base64Escape(absl::string_view(
191 reinterpret_cast<const char*
>(jpeg_data.data()),
195 std::lock_guard<std::mutex> lock(write_mutex_);
198 std::ostringstream payload;
200 <<
"\"timestamp\":" << timestamp_us <<
','
201 <<
"\"width\":" << frame.cols <<
','
202 <<
"\"height\":" << frame.rows <<
','
203 <<
"\"channels\":" << frame.channels() <<
','
204 <<
"\"jpeg_base64\":\"" << base64 <<
"\""
207 const std::string payload_str = payload.str();
210 if (!redis_client_.Publish(
BuildChannel(configuration_.channels.hud_frame), payload_str)) {
211 if (publish_error_count_++ < kMaxErrorLogs) {
212 LOG(WARNING) <<
"Failed to publish frame to Redis: "
213 << redis_client_.last_error();
217 return absl::OkStatus();
220template<container::settings::IntegrationMode TIntegrationMode>
222 is_operational_ =
false;
223 std::lock_guard<std::mutex> lock(write_mutex_);
224 redis_client_.Close();
227template<container::settings::IntegrationMode TIntegrationMode>
229 return is_operational_;
232template<container::settings::IntegrationMode TIntegrationMode>
235 return absl::OkStatus();
238template<container::settings::IntegrationMode TIntegrationMode>
240 if (!is_operational_) {
241 return absl::FailedPreconditionError(
"Writer is not operational");
244 std::lock_guard<std::mutex> lock(write_mutex_);
247 const std::string state = R
"({"recording":)" + std::string(recording ? "true" :
"false") +
248 R
"(,"timestamp":)" + std::to_string(this->GetCurrentTimestampUs()) + "}";
251 if (!redis_client_.Publish(
BuildChannel(
"recording_state"), state)) {
252 if (configuration_.enable_debug_logging) {
253 LOG(WARNING) <<
"Failed to publish recording state: " << redis_client_.last_error();
255 return absl::InternalError(
"Failed to publish recording state: " + redis_client_.last_error());
259 if (!redis_client_.Set(
BuildKey(configuration_.keys.recording_state), state)) {
260 if (configuration_.enable_debug_logging) {
261 LOG(WARNING) <<
"Failed to set recording state key: " << redis_client_.last_error();
265 return absl::OkStatus();
268template<container::settings::IntegrationMode TIntegrationMode>
270 if (configuration_.key_prefix.empty()) {
273 return configuration_.key_prefix +
":" + suffix;
276template<container::settings::IntegrationMode TIntegrationMode>
static absl::StatusOr< TurboJpegCompressor > Create()
Definition turbojpeg_compressor.cpp:14
absl::Status WriteCoreMetrics(const presage::physiology::MetricsBuffer &metrics, int64_t timestamp_us) override
Definition redis_ipc_stream_writer_impl.hpp:112
absl::Status Flush() override
Definition redis_ipc_stream_writer_impl.hpp:233
absl::Status PublishRecordingState(bool recording)
Definition redis_ipc_stream_writer_impl.hpp:239
std::string BuildKey(const std::string &suffix) const
Definition redis_ipc_stream_writer_impl.hpp:269
absl::Status WriteFrame(const cv::Mat &frame, int64_t timestamp_us) override
Definition redis_ipc_stream_writer_impl.hpp:169
absl::Status WriteEdgeMetrics(const presage::physiology::Metrics &metrics, int64_t timestamp_us) override
Definition redis_ipc_stream_writer_impl.hpp:141
std::string BuildChannel(const std::string &suffix) const
Definition redis_ipc_stream_writer_impl.hpp:277
void Close() override
Definition redis_ipc_stream_writer_impl.hpp:221
bool IsOperational() const override
Definition redis_ipc_stream_writer_impl.hpp:228
absl::Status WriteStatus(const presage::physiology::StatusValue &status) override
Definition redis_ipc_stream_writer_impl.hpp:86
absl::Status Initialize()
Definition redis_ipc_stream_writer_impl.hpp:40
Definition redis_ipc_configuration.hpp:26