SmartSpectra C++ SDK
Measure human vitals from video with SmartSpectra C++ SDK.
Loading...
Searching...
No Matches
physiology_service.hpp
1// physiology_service.hpp
2// Created by Greg on 5/17/24.
3// Copyright (C) 2024 Presage Security, Inc.
4//
5// SPDX-License-Identifier: LGPL-3.0-or-later
6
7#pragma once
8// === standard library includes (if any) ===
9#include <chrono>
10#include <thread>
11#include <atomic>
12#include <string>
13#include <functional>
14// === third-party includes (if any) ===
15#include <reproc++/reproc.hpp>
16#include <physiology/graph/physiology_core_service.grpc.pb.h>
17#include <physiology/graph/physiology_core_service.pb.h>
18// === local includes (if any) ===
19#include <smartspectra/cpp/physiology_service.grpc.pb.h>
20#include "log/log_buffer.hpp"
21#include "log/absl_log_sink.hpp"
22#include "log/glog_sink.hpp"
23#include "log/core_log_sink.hpp"
24//TODO: remove relative includes from public headers -- might require moving on_prem into <smartspectra/root>/cpp/smartspectra/
25// the one below doesn't resolve w/ <> most likely due to the current header (physiology_service.hpp) being in a
26// header file set with BASE_DIR at parent of smartspectra/cpp (see CMake file)
27#include "smartspectra/cpp/physiology_service.pb.h"
28#include "reactors/core_metrics_provider.hpp"
29#include "reactors/edge_metrics_provider.hpp"
30#include "reactors/status_code_provider.hpp"
31#include "reactors/frame_with_timestamp_recorder.hpp"
32#include "ipc_stream_writer.hpp"
33#include <smartspectra/container/background_container.hpp>
34#include <smartspectra/container/settings.hpp>
35#include <smartspectra/video_source/video_source.hpp>
36#include <smartspectra/video_source/camera_tuner.hpp>
37
38
39namespace presage::smartspectra::grpc_bindings {
40
/**
 * Logging switches handed to the physiology service (constructor parameter `log_settings`).
 *
 * Plain aggregate: every field has a default, so `LogSettings{}` yields verbosity 0 with all
 * optional logging categories switched off.
 *
 * NOTE(review): the opening `struct LogSettings {` line was missing from this listing and has
 * been reconstructed from the type's uses further down the file -- confirm against the
 * original header.
 */
struct LogSettings {
    /// Verbosity level; defaults to 0. (Which sinks consume it is not visible here.)
    int verbosity_level = 0;
    /// Toggle for logging dropped frames; off by default.
    bool log_dropped_frames = false;
    /// Toggle for logging throughput and latency; off by default.
    bool log_throughput_and_latency = false;
    /// Toggle for logging edge/core transfer times; off by default.
    bool log_edge_core_transfer_times = false;
};
47
48template<container::settings::IntegrationMode TIntegrationMode>
49class PhysiologyServiceImpl final : public Physiology::CallbackService {
50public:
51 explicit PhysiologyServiceImpl(
52 video_source::InputTransformMode input_transform_mode = video_source::InputTransformMode::None,
53 double preprocessed_data_buffer_duration_s = 0.2,
54 std::optional<bool> enable_phasic_bp = std::nullopt,
55 std::optional<bool> enable_eda = std::nullopt,
56 std::optional<bool> use_full_range_face_detection = std::nullopt,
57 std::optional<bool> use_full_pose_landmarks = std::nullopt,
58 std::optional<bool> enable_pose_landmark_segmentation = std::nullopt,
59 std::optional<bool> enable_micromotion = std::nullopt,
60 bool use_camera = false,
61 int camera_device_index = 0,
62 const std::string& input_video_path = "",
63 const std::string& input_video_time_path = "",
64 int interframe_delay_ms = 20,
65 bool start_with_recording_on = false,
66 bool exit_on_input_video_completion = false,
67 uint16_t physiology_core_port_number = 50052,
68 bool physiology_core_manual_control = false,
69 int physiology_core_connection_timeout_ms = 50000,
70 const std::string& venv_directory = "",
71 const LogSettings& log_settings = {},
72 const int log_buffer_size = 1000,
73 int64_t test_physiology_core_seed = -1,
74 std::unique_ptr<ipc::IpcStreamWriter<TIntegrationMode>> ipc_stream_writer = nullptr,
75 std::optional<std::string> video_output_directory = std::nullopt,
76 bool enable_camera_tuning = true,
77 video_source::CameraTunerSettings tuner_settings = {},
78 std::chrono::seconds camera_tuning_cooldown_seconds = std::chrono::seconds(10),
79 bool camera_tuning_recheck_on_recovery = false
80 );
81
82 ~PhysiologyServiceImpl() override;
83
84 using BackgroundContainer =
86 presage::platform_independence::DeviceType::Cpu,
87 smartspectra::container::settings::OperationMode::Continuous,
88 TIntegrationMode
89 >;
90
91 using TFrameWithTimestampRecorder = FrameWithTimestampRecorder<TIntegrationMode>;
92 using TStatusCodeProvider = StatusCodeProvider<TIntegrationMode>;
93 using TMetricsProvider = CoreMetricsProvider<TIntegrationMode>;
94 using TEdgeMetricsProvider = EdgeMetricsProvider<TIntegrationMode>;
95
96 using BackgroundContainerSettings = typename BackgroundContainer::SettingsType;
97
98 grpc::ServerUnaryReactor* StartCore(
99 grpc::CallbackServerContext* context,
100 const ::google::protobuf::Empty* request,
101 ::google::protobuf::Empty* response
102 ) override;
103
104 grpc::ServerUnaryReactor* StartEdgeGraph(
105 grpc::CallbackServerContext* context,
106 const ::google::protobuf::Empty* request,
107 ::google::protobuf::Empty* response
108 ) override;
109
110 grpc::ServerUnaryReactor* StartPreprocessing(
111 grpc::CallbackServerContext* context,
112 const ::google::protobuf::Empty* request,
113 ::google::protobuf::Empty* response
114 ) override;
115
116 grpc::ServerUnaryReactor* WaitUntilGraphIsIdle(
117 grpc::CallbackServerContext* context,
118 const ::google::protobuf::Empty* request,
119 ::google::protobuf::Empty* response
120 ) override;
121
122 grpc::ServerUnaryReactor* SetRecording(
123 grpc::CallbackServerContext* context,
124 const ::google::protobuf::BoolValue* request,
125 ::google::protobuf::Empty* response
126 ) override;
127
128 grpc::ServerReadReactor<presage::smartspectra::FrameWithTimestamp>* AddFrameWithTimestamp(
129 grpc::CallbackServerContext* context,
130 google::protobuf::Empty* response
131 ) override;
132
133 grpc::ServerWriteReactor<physiology::StatusValue>* GetStatusCode(
134 grpc::CallbackServerContext* context,
135 const google::protobuf::Empty* request
136 ) override;
137
138 grpc::ServerUnaryReactor* GetStatusDescription(
139 grpc::CallbackServerContext* context,
140 const physiology::StatusValue* request,
141 google::protobuf::StringValue* response
142 ) override;
143
144 grpc::ServerUnaryReactor* GetStatusHint(
145 grpc::CallbackServerContext* context,
146 const physiology::StatusValue* request,
147 google::protobuf::StringValue* response
148 ) override;
149
150 grpc::ServerWriteReactor<presage::physiology::MetricsBuffer>* GetCoreMetrics(
151 grpc::CallbackServerContext* context,
152 const google::protobuf::Empty* request
153 ) override;
154
155 grpc::ServerWriteReactor<presage::physiology::MetricsBuffer>* GetMetrics(
156 grpc::CallbackServerContext* context,
157 const google::protobuf::Empty* request
158 ) override;
159
160 grpc::ServerWriteReactor<presage::physiology::Metrics>* GetEdgeMetrics(
161 grpc::CallbackServerContext* context,
162 const google::protobuf::Empty* request
163 ) override;
164
165 grpc::ServerUnaryReactor* StopCore(
166 grpc::CallbackServerContext* context,
167 const ::google::protobuf::Empty* request,
168 ::google::protobuf::Empty* response
169 ) override;
170
171 grpc::ServerUnaryReactor* StopEdgeGraph(
172 grpc::CallbackServerContext* context,
173 const ::google::protobuf::Empty* request,
174 ::google::protobuf::Empty* response
175 ) override;
176
177 grpc::ServerUnaryReactor* StopPreprocessing(
178 grpc::CallbackServerContext* context,
179 const ::google::protobuf::Empty* request,
180 ::google::protobuf::Empty* response
181 ) override;
182
183 grpc::ServerUnaryReactor* ResetProcessing(
184 grpc::CallbackServerContext* context,
185 const ::google::protobuf::Empty* request,
186 ::google::protobuf::Empty* response
187 ) override;
188
189 grpc::ServerUnaryReactor* CheckHealth(
190 grpc::CallbackServerContext* context,
191 const ::google::protobuf::Empty* request,
192 ::google::protobuf::StringValue* response
193 ) override;
194
195 grpc::ServerUnaryReactor* IsPhysiologyCoreUp(
196 grpc::CallbackServerContext* context,
197 const ::google::protobuf::Empty* request,
198 ::google::protobuf::BoolValue* response
199 ) override;
200
201 grpc::ServerUnaryReactor* IsEdgeGraphRunning(
202 grpc::CallbackServerContext* context,
203 const ::google::protobuf::Empty* request,
204 ::google::protobuf::BoolValue* response
205 ) override;
206
207 grpc::ServerUnaryReactor* IsPreprocessingRunning(
208 grpc::CallbackServerContext* context,
209 const ::google::protobuf::Empty* request,
210 ::google::protobuf::BoolValue* response
211 ) override;
212
213 grpc::ServerWriteReactor<::presage::smartspectra::LogEntry>* StreamLogs(
214 grpc::CallbackServerContext* context,
215 const google::protobuf::Empty* request
216 ) override;
217
218 absl::Status AutoStartIfRequested();
219 void SetOnInputVideoCompletionCallback(std::function<void()> callback);
220
221private:
222 template<typename TProviderWithContainer>
223 TProviderWithContainer* GenericHookToProvider(std::shared_ptr<TProviderWithContainer>& provider);
224
225 absl::Status StartCoreServer();
226
227 absl::Status StartCoreClient();
228
229 absl::Status StopCoreServer();
230
231 absl::Status BuildAndStartContainer();
232
233 absl::Status StopAndDestroyContainer();
234
235 absl::Status VerifyVenv();
236 absl::Status PublishRecordingStateToRedis(bool recording);
237
238 absl::Status HandleStatusForTuning(
239 const presage::physiology::StatusValue& status_output,
240 presage::physiology::StatusCode previous_status_code
241 );
242
243 absl::Status FlushPendingStatusAfterTuning(ipc::IpcStreamWriter<TIntegrationMode>* writer);
244
245 // Restrict re-tuning to the subset of status codes that camera tuning can address.
246 static bool StatusRequiresImmediateRetune(presage::physiology::StatusCode status);
247 static bool StatusShouldRetuneAfterRecovery(presage::physiology::StatusCode status);
248
249
250 // parameters
251 const uint16_t physiology_core_port_number;
252 const bool physiology_core_manual_control;
253 const int64_t test_physiology_core_seed;
254 const int physiology_core_connection_timeout_ms;
255
256 // state
257 std::shared_ptr<BackgroundContainer> container;
258 std::shared_ptr<TFrameWithTimestampRecorder> frame_with_timestamp_recorder;
259 std::shared_ptr<TStatusCodeProvider> status_code_provider;
260 std::shared_ptr<TMetricsProvider> metrics_provider;
261 std::shared_ptr<TEdgeMetricsProvider> edge_metrics_provider;
262 std::mutex preprocessing_construction_mutex;
263 std::condition_variable preprocessing_construction_condition;
264 video_source::InputTransformMode input_transform_mode;
265
266 const BackgroundContainerSettings container_settings;
267 reproc::options physiology_core_process_options;
268 reproc::process physiology_core_process;
269 bool physiology_core_started = false;
270 std::shared_ptr<grpc::Channel> channel;
271 std::unique_ptr<presage::physiology::PhysiologyCore::Stub> core_stub = nullptr;
272 std::filesystem::path venv_directory;
273 bool venv_verified = false;
274
275 LogSettings log_settings;
276 LogBuffer log_buffer;
277 AbslLogSink absl_log_sink;
278 GlogSink glog_sink;
279
280 CoreLogSink core_stdout_log_sink;
281 CoreLogSink core_stderr_log_sink;
282 std::thread core_log_thread;
283 std::atomic<bool> core_log_thread_shutdown = false;
284
285 // Camera/video ingestion thread (optional)
286 const bool use_camera;
287 const std::string input_video_path;
288 const std::string input_video_time_path;
289 const bool local_input_enabled;
290 const bool start_with_recording_on;
291 const bool exit_on_input_video_completion;
292 bool recording_auto_started = false;
293 const int camera_device_index;
294 std::mutex local_input_thread_mutex;
295 std::thread local_input_thread;
296 std::atomic<bool> local_input_thread_shutdown = false;
297 std::shared_ptr<video_source::VideoSource> local_input_source;
298 std::function<void()> input_video_completion_callback;
299 std::mutex input_video_completion_callback_mutex;
300
301 // IPC stream writer (optional, alternative to gRPC streams)
302 std::unique_ptr<ipc::IpcStreamWriter<TIntegrationMode>> ipc_stream_writer;
303
304 // Camera tuning (optional, for IPC modes with camera)
305 std::shared_ptr<video_source::CameraTuner> camera_tuner;
306 video_source::CameraTunerSettings tuner_settings_;
307 std::atomic<bool> camera_tuning_enabled = false;
308 std::atomic<bool> is_recording = false;
309 std::atomic<bool> is_tuning_camera = false;
310 std::chrono::steady_clock::time_point last_camera_tuning_time;
311 std::chrono::seconds camera_tuning_cooldown_duration;
312 const bool camera_tuning_recheck_on_recovery_;
313 std::mutex camera_tuning_mutex;
314 std::atomic<int64_t> latest_status_change_timestamp_us_{0};
315 std::atomic<presage::physiology::StatusCode> latest_status_code_{presage::physiology::StatusCode::PROCESSING_NOT_STARTED};
316 std::mutex pending_status_mutex_;
317 std::optional<presage::physiology::StatusValue> pending_status_after_tuning_;
318
319 // Camera thread error tracking
320 std::optional<absl::Status> last_camera_error_;
321 std::mutex camera_error_mutex_;
322};
323
324typedef PhysiologyServiceImpl<smartspectra::container::settings::IntegrationMode::Grpc> PhysiologyServiceImpl_GrpcBackend;
325} // namespace presage::smartspectra::grpc_bindings
Container for background thread processing.
Definition background_container.hpp:40
Definition core_metrics_provider.hpp:40
Definition edge_metrics_provider.hpp:25
Definition frame_with_timestamp_recorder.hpp:38
grpc::ServerUnaryReactor * StartPreprocessing(grpc::CallbackServerContext *context, const ::google::protobuf::Empty *request, ::google::protobuf::Empty *response) override
Definition physiology_service_impl.hpp:591
grpc::ServerUnaryReactor * StopPreprocessing(grpc::CallbackServerContext *context, const ::google::protobuf::Empty *request, ::google::protobuf::Empty *response) override
Definition physiology_service_impl.hpp:873
grpc::ServerUnaryReactor * StopEdgeGraph(grpc::CallbackServerContext *context, const ::google::protobuf::Empty *request, ::google::protobuf::Empty *response) override
Definition physiology_service_impl.hpp:853
Definition status_code_provider.hpp:41
Definition ipc_stream_writer.hpp:46
InputTransformMode
Transformation applied to frames prior to processing.
Definition input_transform.hpp:19
Definition physiology_service.hpp:41
Configuration parameters for the camera tuning process.
Definition camera_tuner_settings.hpp:14