SmartSpectra C++ SDK
Measure human vitals from video with SmartSpectra C++ SDK.
Loading...
Searching...
No Matches
redis_ipc_stream_writer.hpp
1// redis_ipc_stream_writer.hpp
2// Created by Greg on 1/30/25.
3// Copyright (C) 2025 Presage Security, Inc.
4//
5// SPDX-License-Identifier: LGPL-3.0-or-later
6
7#pragma once
8
9#include <atomic>
10#include <memory>
11#include <mutex>
12#include <string>
13#include <vector>
14
15#include "../ipc_stream_writer.hpp"
16#include "redis_ipc_configuration.hpp"
17#include "redis_client.hpp"
18
19namespace presage::smartspectra::redis_ipc {
20
25template<container::settings::IntegrationMode TIntegrationMode>
26class RedisIpcStreamWriter : public ipc::IpcStreamWriter<TIntegrationMode> {
27public:
29 using typename Base::BackgroundContainer;
30
31 explicit RedisIpcStreamWriter(const RedisIpcConfig& configuration);
32 virtual ~RedisIpcStreamWriter();
33
34 // Implement base interface
35 absl::Status WriteStatus(const presage::physiology::StatusValue& status) override;
36 absl::Status WriteCoreMetrics(const presage::physiology::MetricsBuffer& metrics, int64_t timestamp_us) override;
37 absl::Status WriteEdgeMetrics(const presage::physiology::Metrics& metrics, int64_t timestamp_us) override;
38 absl::Status WriteFrame(const cv::Mat& frame, int64_t timestamp_us) override;
39 void Close() override;
40 bool IsOperational() const override;
41 absl::Status Flush() override;
42
48 absl::Status PublishRecordingState(bool recording);
49
50protected:
54 absl::Status Initialize();
55
59 std::string BuildKey(const std::string& suffix) const;
60
64 std::string BuildChannel(const std::string& suffix) const;
65
66private:
67
68
69
70 RedisIpcConfig configuration_;
71 RedisClient redis_client_;
72
73 // Thread safety
74 mutable std::mutex write_mutex_;
75 std::atomic<bool> is_operational_{false};
76
77 // Error tracking with rate limiting
78 std::atomic<int> publish_error_count_{0};
79 static constexpr int kMaxErrorLogs = 5;
80};
81
82} // namespace presage::smartspectra::redis_ipc
Definition ipc_stream_writer.hpp:46
Definition redis_client.hpp:22
absl::Status WriteCoreMetrics(const presage::physiology::MetricsBuffer &metrics, int64_t timestamp_us) override
Definition redis_ipc_stream_writer_impl.hpp:112
absl::Status Flush() override
Definition redis_ipc_stream_writer_impl.hpp:233
absl::Status PublishRecordingState(bool recording)
Definition redis_ipc_stream_writer_impl.hpp:239
std::string BuildKey(const std::string &suffix) const
Definition redis_ipc_stream_writer_impl.hpp:269
absl::Status WriteFrame(const cv::Mat &frame, int64_t timestamp_us) override
Definition redis_ipc_stream_writer_impl.hpp:169
absl::Status WriteEdgeMetrics(const presage::physiology::Metrics &metrics, int64_t timestamp_us) override
Definition redis_ipc_stream_writer_impl.hpp:141
std::string BuildChannel(const std::string &suffix) const
Definition redis_ipc_stream_writer_impl.hpp:277
void Close() override
Definition redis_ipc_stream_writer_impl.hpp:221
bool IsOperational() const override
Definition redis_ipc_stream_writer_impl.hpp:228
absl::Status WriteStatus(const presage::physiology::StatusValue &status) override
Definition redis_ipc_stream_writer_impl.hpp:86
absl::Status Initialize()
Definition redis_ipc_stream_writer_impl.hpp:40
Definition redis_ipc_configuration.hpp:26