SmartSpectra C++ SDK
Measure human vitals from video with SmartSpectra C++ SDK.
Loading...
Searching...
No Matches
physiology_service_impl.hpp
1//
2// Created by greg on 5/17/24.
3// Copyright (c) 2024 Presage Technologies
4//
5
6// === standard library includes (if any) ===
7#include <filesystem>
8#include <iomanip>
9#include <mutex>
10#include <future>
11#include <optional>
12// === third-party includes (if any) ===
13#include <grpc/grpc.h>
14#include <grpcpp/grpcpp.h>
15#include <mediapipe/framework/port/status_macros.h>
16#include <mediapipe/framework/port/opencv_core_inc.h>
17#include <physiology/graph/physiology_core_service.grpc.pb.h>
18#include <physiology/graph/physiology_core_service.pb.h>
19#include <physiology/modules/rpc.h>
20#include <physiology/modules/messages/status.h>
21#include <physiology/modules/filesystem.h>
22#include <physiology/modules/configuration.h>
23#include <reproc++/drain.hpp>
24#include <absl/log/log_sink_registry.h>
25// === local includes (if any) ===
26#include <smartspectra/container/settings.hpp>
27#include <smartspectra/video_source/factory.hpp>
28#include <mediapipe/framework/port/opencv_imgproc_inc.h>
29#include "physiology_service.hpp"
30#include "smartspectra_on_prem_config.hpp"
31#include "reactors/log_stream_reactor.hpp"
32#include "redis_ipc/redis_ipc_stream_writer.hpp"
33
// Short file-local aliases for the project namespaces referenced below.
34namespace spectra = presage::physiology;
35namespace cs = presage::smartspectra::container::settings;
36namespace rpc = presage::rpc;
37namespace vs = presage::smartspectra::video_source;
38
39namespace presage::smartspectra::grpc_bindings {
40
41
/**
 * Constructs the service.
 *
 * Stores the configuration arguments in members, assembles the container settings that
 * BuildAndStartContainer later uses to create the container, and registers this object's log sinks
 * with both Abseil and glog. No container is created here (the `container` member starts out null).
 *
 * Local input (the frame-feeding thread) is considered enabled when either `use_camera` is true or
 * `input_video_path` is non-empty.
 */
42template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
43PhysiologyServiceImpl<TIntegrationMode>::PhysiologyServiceImpl(
44 vs::InputTransformMode input_transform_mode,
45 double preprocessed_data_buffer_duration_s,
46 std::optional<bool> enable_phasic_bp,
47 std::optional<bool> enable_eda,
48 std::optional<bool> use_full_range_face_detection,
49 std::optional<bool> use_full_pose_landmarks,
50 std::optional<bool> enable_pose_landmark_segmentation,
51 std::optional<bool> enable_micromotion,
52 bool use_camera,
53 int camera_device_index,
54 const std::string& input_video_path,
55 const std::string& input_video_time_path,
56 int interframe_delay_ms,
57 bool start_with_recording_on,
58 bool exit_on_input_video_completion,
59 uint16_t physiology_core_port_number,
60 bool physiology_core_manual_control,
61 int physiology_core_connection_timeout_ms,
62 const std::string& venv_directory,
63 const LogSettings& log_settings,
64 const int log_buffer_size,
65 int64_t test_physiology_core_seed,
66 std::unique_ptr<ipc::IpcStreamWriter<TIntegrationMode>> ipc_stream_writer,
67 std::optional<std::string> video_output_directory,
68 bool enable_camera_tuning,
69 video_source::CameraTunerSettings tuner_settings,
70 std::chrono::seconds camera_tuning_cooldown_seconds,
71 bool camera_tuning_recheck_on_recovery
72) : physiology_core_port_number(physiology_core_port_number),
73 physiology_core_manual_control(physiology_core_manual_control),
74 test_physiology_core_seed(test_physiology_core_seed),
75 physiology_core_connection_timeout_ms(physiology_core_connection_timeout_ms),
76 venv_directory(venv_directory),
77 log_settings(log_settings),
 // All four log sinks below write into the same log buffer.
78 log_buffer(log_buffer_size),
79 absl_log_sink(this->log_buffer),
80 glog_sink(this->log_buffer),
81 core_stdout_log_sink(this->log_buffer, this->core_log_thread_shutdown),
82 core_stderr_log_sink(this->log_buffer, this->core_log_thread_shutdown),
83 container(nullptr),
 // Positional aggregate initialization: the order of the entries below must match the declaration
 // order of the settings type's fields.
84 container_settings(
85 {
86 video_source::VideoSourceSettings{},
 // NOTE(review): the listing jumps from line 86 to line 88 here, so one initializer may be missing
 // from this view — confirm against the original file before editing this list.
88 /*headless=*/ false,
89 /*interframe_delay_ms=*/ interframe_delay_ms,
90 /*start_with_recording_on=*/ start_with_recording_on,
91 /*start_time_offset_ms=*/ 0,
92 /*scale_input=*/ true,
93 /*binary_graph=*/ true,
94 enable_phasic_bp,
95 enable_eda,
96 /*enable_dense_facemesh_points=*/false,
97 use_full_range_face_detection,
98 use_full_pose_landmarks,
99 enable_pose_landmark_segmentation,
100 enable_micromotion,
101 /*enable_edge_metrics=*/true,
102 /*print_graph_contents=*/false,
103 /*log_transfer_timing_info=*/log_settings.log_edge_core_transfer_times,
104 video_output_directory,
105 /*verbosity_level=*/ 0,
106 cs::ContinuousSettings{
107 preprocessed_data_buffer_duration_s
108 },
109 cs::GrpcSettings{
110 physiology_core_port_number
111 }
112 }
113 ),
114 input_transform_mode(input_transform_mode),
115 use_camera(use_camera),
116 input_video_path(input_video_path),
117 input_video_time_path(input_video_time_path),
118 start_with_recording_on(start_with_recording_on),
119 exit_on_input_video_completion(exit_on_input_video_completion),
120 local_input_enabled(use_camera || !input_video_path.empty()),
121 camera_device_index(camera_device_index),
122 ipc_stream_writer(std::move(ipc_stream_writer)),
123 tuner_settings_(tuner_settings),
124 camera_tuning_cooldown_duration(camera_tuning_cooldown_seconds),
125 camera_tuning_recheck_on_recovery_(camera_tuning_recheck_on_recovery) {
126 // Enable overlay rendering by default for on-prem service
 // The overlay flag is overwritten with enable_camera_tuning; BuildAndStartContainer also uses this
 // same flag to decide whether a CameraTuner is created at all.
127 this->tuner_settings_.render_calibrating_overlay = enable_camera_tuning;
128
 // Register the sinks with both logging frameworks; the destructor removes them again.
129 absl::AddLogSink(&this->absl_log_sink);
130 google::AddLogSink(&this->glog_sink);
131}
132
133
134template<container::settings::IntegrationMode TIntegrationMode>
135PhysiologyServiceImpl<TIntegrationMode>::~PhysiologyServiceImpl() {
136 // Attempt graceful shutdown of container/core to avoid dangling threads.
137 if (this->container != nullptr) {
138 absl::Status container_stop_status = this->StopAndDestroyContainer();
139 if (!container_stop_status.ok()) {
140 LOG(WARNING) << "Failed to stop preprocessing container during destruction: "
141 << container_stop_status.message();
142 }
143 }
144 if (this->physiology_core_started && !this->physiology_core_manual_control) {
145 absl::Status core_stop_status = this->StopCoreServer();
146 if (!core_stop_status.ok()) {
147 LOG(WARNING) << "Failed to stop Physiology Core during destruction: "
148 << core_stop_status.message();
149 }
150 this->physiology_core_started = false;
151 }
152 if (this->core_log_thread.joinable()) {
153 this->core_log_thread.join();
154 }
155
156 // Stop local input thread (camera or video file) before tearing down
157 if (this->local_input_enabled) {
158 {
159 std::lock_guard<std::mutex> lk(this->local_input_thread_mutex);
160 this->local_input_thread_shutdown = true;
161 }
162 if (this->local_input_thread.joinable()) {
163 this->local_input_thread.join();
164 }
165 this->local_input_source.reset();
166 }
167 // Shutdown the log buffer to release any waiting threads
168 this->log_buffer.Shutdown();
169
170 absl::RemoveLogSink(&this->absl_log_sink);
171 google::RemoveLogSink(&this->glog_sink);
172}
173
174template<container::settings::IntegrationMode TIntegrationMode>
175absl::Status PhysiologyServiceImpl<TIntegrationMode>::AutoStartIfRequested() {
176 if (!this->start_with_recording_on) {
177 return absl::OkStatus();
178 }
179 if (this->physiology_core_manual_control) {
180 LOG(INFO) << "Skipping auto-start: manual Physiology Core control enabled.";
181 return absl::OkStatus();
182 }
183 if (!this->physiology_core_started) {
184 LOG(INFO) << "Auto-start requested; launching Physiology Core.";
185 MP_RETURN_IF_ERROR(this->StartCoreServer());
186 MP_RETURN_IF_ERROR(this->StartCoreClient());
187 this->physiology_core_started = true;
188 LOG(INFO) << "Physiology Core auto-start complete.";
189 }
190 MP_RETURN_IF_ERROR(this->BuildAndStartContainer());
191 LOG(INFO) << "Edge graph initialized; recording set to "
192 << (this->recording_auto_started ? "on." : "off.");
193 return absl::OkStatus();
194}
195
196template<container::settings::IntegrationMode TIntegrationMode>
197void PhysiologyServiceImpl<TIntegrationMode>::SetOnInputVideoCompletionCallback(std::function<void()> callback) {
198 std::lock_guard<std::mutex> lock(this->input_video_completion_callback_mutex);
199 this->input_video_completion_callback = std::move(callback);
200}
201
202template<container::settings::IntegrationMode TIntegrationMode>
203absl::Status PhysiologyServiceImpl<TIntegrationMode>::PublishRecordingStateToRedis(bool recording) {
204 if (this->ipc_stream_writer == nullptr) {
205 return absl::OkStatus();
206 }
207 auto* redis_writer =
208 dynamic_cast<redis_ipc::RedisIpcStreamWriter<TIntegrationMode>*>(this->ipc_stream_writer.get());
209 if (redis_writer == nullptr) {
210 return absl::OkStatus();
211 }
212 return redis_writer->PublishRecordingState(recording);
213}
214
215template<typename TFunction>
216inline grpc::ServerUnaryReactor* ExecuteAsyncWithDefaultReactor(
217 grpc::CallbackServerContext* context,
218 TFunction&& function
219) {
220 auto reactor = context->DefaultReactor();
221 grpc::Status status = std::forward<TFunction>(function)();
222 reactor->Finish(status);
223 return reactor;
224}
225
226// NOTE: needs explicit template specialization w/ GCC, can't rely on template-from-function-argument inference
227template<container::settings::IntegrationMode TIntegrationMode>
228template<typename TProviderWithContainer>
229TProviderWithContainer* PhysiologyServiceImpl<TIntegrationMode>::GenericHookToProvider(
230 std::shared_ptr<TProviderWithContainer>& provider
231) {
232 // Disable gRPC streams when using IPC writer
233 if (this->ipc_stream_writer != nullptr) {
234 // Create a dummy provider that immediately finishes with an error
235 auto error_provider = std::make_shared<TProviderWithContainer>(nullptr);
236 error_provider->Finish(grpc::Status(grpc::StatusCode::UNAVAILABLE,
237 "gRPC streaming disabled: using alternative IPC backend"));
238 return error_provider.get();
239 }
240
241 std::unique_lock<std::mutex> lock(this->preprocessing_construction_mutex);
242 this->preprocessing_construction_condition.wait(lock, [this] { return this->container != nullptr; });
243 if (provider != nullptr) {
244 if (provider->GetContainer() != this->container.get()) {
245 // Assuming new gRPC connection, discard the old reactor safely and build new one.
246 provider->Finish(grpc::Status::OK);
247 provider->WaitUntilDone();
248 provider.reset();
249 provider = std::make_shared<TProviderWithContainer>(this->container);
250 }
251 } else {
252 provider = std::make_shared<TProviderWithContainer>(this->container);
253 }
254 return provider.get();
255}
256
257template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
258grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::StartCore(
259 grpc::CallbackServerContext* context,
260 const ::google::protobuf::Empty* request,
261 google::protobuf::Empty* response
262) {
263 return ExecuteAsyncWithDefaultReactor(context, [this]() -> grpc::Status {
264 if (this->physiology_core_manual_control) {
265 const char* message = "Skipping start of Physiology Core Service: manual control enabled.";
266 LOG(INFO) << message;
267 } else {
268 if (this->physiology_core_started) {
269 return {::grpc::StatusCode::ALREADY_EXISTS, "Physiology core is already running."};
270 }
271 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(StartCoreServer());
272 }
273 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(StartCoreClient());
274 this->physiology_core_started = true;
275 return ::grpc::Status::OK;
276 });
277}
278
279
/**
 * Creates the container if none exists yet, then initializes it, starts its graph if it is not running,
 * optionally auto-starts recording, wakes threads waiting for a container, and (when local input is
 * enabled and no input thread is joinable) launches the thread that feeds camera / video-file frames
 * into the container.
 *
 * Callback registration (status, dropped-frame and telemetry callbacks) only happens on the call that
 * actually creates the container. preprocessing_construction_mutex is held for the whole call.
 *
 * @return absl::OkStatus() on success; otherwise the first error propagated by the IPC writer's
 *         SwapContainer or by one of the *_RETURN_IF_ERROR / MP_ASSIGN_OR_RETURN macros.
 *
 * NOTE(review): this function returns absl::Status but uses PHYSIOLOGY_GRPC_RETURN_IF_ERROR, whose name
 * suggests a gRPC-oriented macro — confirm that what it returns converts to absl::Status as intended.
 */
280template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
281absl::Status PhysiologyServiceImpl<TIntegrationMode>::BuildAndStartContainer() {
282 std::unique_lock<std::mutex> lock(this->preprocessing_construction_mutex);
283 if (this->container == nullptr) {
284 this->container = std::make_shared<BackgroundContainer>(this->container_settings);
 // Two mutually exclusive output paths: gRPC stream reactors (no IPC writer) or the IPC writer.
285 if (this->ipc_stream_writer == nullptr) {
286 // Swap container for gRPC stream reactors
287 if (this->frame_with_timestamp_recorder != nullptr) {
288 this->frame_with_timestamp_recorder->SwapContainer(this->container);
289 }
290 if (this->status_code_provider != nullptr) {
291 this->status_code_provider->SwapContainer(this->container);
292 }
293 if (this->metrics_provider != nullptr) {
294 this->metrics_provider->SwapContainer(this->container);
295 }
296 if (this->edge_metrics_provider != nullptr) {
297 this->edge_metrics_provider->SwapContainer(this->container);
298 }
299
300 } else {
301 // Swap container for alternative IPC stream writer
302 auto swap_status = this->ipc_stream_writer->SwapContainer(this->container);
303 if (!swap_status.ok()) {
304 return swap_status;
305 }
306
307 // Prepare camera tuner before registering status callbacks (camera only)
308 if (this->use_camera) {
 // Build the camera source here (1280x720 capture) unless one is already held from an earlier run.
309 if (!this->local_input_source) {
310 vs::VideoSourceSettings vs_settings{};
311 vs_settings.device_index = this->camera_device_index;
312 vs_settings.capture_height_px = 720;
313 vs_settings.capture_width_px = 1280;
314 vs_settings.input_transform_mode = this->input_transform_mode;
315 MP_ASSIGN_OR_RETURN(auto source, vs::BuildVideoSource(vs_settings));
316 this->local_input_source = std::move(source);
317 }
318
 // render_calibrating_overlay is set from enable_camera_tuning in the constructor, so it acts as
 // the "tuning requested" switch here.
319 if (this->tuner_settings_.render_calibrating_overlay && this->camera_tuner == nullptr) {
320 this->camera_tuner = std::make_shared<video_source::CameraTuner>(this->tuner_settings_);
321
322 auto status = this->camera_tuner->SetVideoSource(this->local_input_source);
323 if (status.ok()) {
 // From now on frames are read through the source handed back by the tuner.
324 this->local_input_source = this->camera_tuner->GetVideoSource();
325 this->camera_tuning_enabled = true;
326 LOG(INFO) << "Camera tuner created and video source attached for IPC mode.";
327 } else {
 // Tuning is optional: on failure the tuner is dropped and the plain source keeps being used.
328 LOG(WARNING) << "Camera tuning disabled: " << status.message();
329 this->camera_tuner.reset();
330 this->camera_tuning_enabled = false;
331 }
332 }
333 }
334
 // Raw pointer captured by the callbacks below; the writer is owned by this object.
335 auto* ipc_writer_ptr = this->ipc_stream_writer.get();
336
337 // Set up status callbacks for camera tuning and IPC writer
338 if (this->use_camera && this->camera_tuning_enabled.load()) {
339 auto* tuner_ptr = this->camera_tuner.get();
340
341 // OnStatusCode: called on EVERY status update for camera tuning
342 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->container->SetOnStatusCode(
343 [this, tuner_ptr, ipc_writer_ptr](presage::physiology::StatusValue status_value) {
 // While tuning: feed the status to the tuner. Otherwise: flush any status held back during tuning.
344 if (tuner_ptr && tuner_ptr->IsTuning()) {
345 auto status_code = static_cast<presage::physiology::StatusCode>(status_value.value());
346 auto tuning_status = tuner_ptr->ProcessFrame(status_code, status_value.timestamp());
347 if (!tuning_status.ok()) {
348 LOG(WARNING) << "Camera tuner ProcessFrame failed: " << tuning_status.message();
349 }
350 } else if (ipc_writer_ptr != nullptr) {
351 auto flush_status = this->FlushPendingStatusAfterTuning(ipc_writer_ptr);
352 if (!flush_status.ok()) {
353 return flush_status;
354 }
355 }
356 return absl::OkStatus();
357 }
358 ));
359 LOG(INFO) << "Camera tuning status monitoring enabled.";
360 }
361
362 // OnStatusChange: called only when status changes, for IPC writer
363 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->container->SetOnStatusChange(
364 [this, ipc_writer_ptr](presage::physiology::StatusValue status_value) {
 // Record the newest status and its timestamp; the previous status is passed to the tuning handler.
365 auto new_status = static_cast<presage::physiology::StatusCode>(status_value.value());
366 auto previous_status = this->latest_status_code_.exchange(new_status);
367 this->latest_status_change_timestamp_us_.store(status_value.timestamp());
368
369 // Handle tuning logic (may trigger tuning based on status)
370 auto tuning_status = this->HandleStatusForTuning(status_value, previous_status);
371 if (!tuning_status.ok()) {
372 return tuning_status;
373 }
374
 // Not tuning (or no tuner): flush anything held back, then write the status unchanged.
375 if (!this->camera_tuner || !this->camera_tuner->IsTuning()) {
376 auto flush_status = this->FlushPendingStatusAfterTuning(ipc_writer_ptr);
377 if (!flush_status.ok()) {
378 return flush_status;
379 }
380 return ipc_writer_ptr->WriteStatus(status_value);
381 }
382
 // Tuning in progress: an OK status is stored as pending and CAMERA_TUNING_IN_PROGRESS is written
 // in its place; any other status is written through as-is.
383 if (new_status == presage::physiology::StatusCode::OK) {
384 presage::physiology::StatusValue masked_status(status_value);
385 {
386 std::lock_guard<std::mutex> lock(this->pending_status_mutex_);
387 this->pending_status_after_tuning_ = status_value;
388 }
389 masked_status.set_value(presage::physiology::StatusCode::CAMERA_TUNING_IN_PROGRESS);
390 return ipc_writer_ptr->WriteStatus(masked_status);
391 }
392
393 return ipc_writer_ptr->WriteStatus(status_value);
394 }
395 ));
396 }
 // Optional diagnostics, registered for both output paths.
397 if (this->log_settings.log_dropped_frames) {
398 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->container->SetOnFrameSentThrough(
399 [](bool frame_sent_through, int64_t input_timestamp) {
400 if (!frame_sent_through) {
401 LOG(INFO) << "Dropped frame with capture timestamp " << input_timestamp << " microseconds.";
402 }
403 return absl::OkStatus();
404 }
405 ));
406 }
407 if (this->log_settings.log_throughput_and_latency) {
 // input_timestamp is accepted but not used by this callback.
408 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->container->SetOnCorePerformanceTelemetry([](
409 double effective_core_fps,
410 double effective_core_latency,
411 int64_t input_timestamp
412 ) {
413 LOG(INFO) << "Effective system throughput: " << std::fixed << std::setprecision(3)
414 << effective_core_fps
415 << " FPS/HZ, system latency: " << std::fixed << std::setprecision(3)
416 << effective_core_latency << " s [window: 3s for both]";
417 return absl::OkStatus();
418 }));
419 }
420 }
 // Everything from here on runs on every call, including when the container already existed.
421 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(container->Initialize());
422 if (!this->container->GraphIsRunning()) {
423 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(container->StartGraph());
424 }
 // Auto-start recording at most once per container lifetime; a failure here is logged, not returned.
425 if (this->start_with_recording_on && !this->recording_auto_started) {
426 auto recording_status = this->container->SetRecording(true);
427 if (!recording_status.ok()) {
428 LOG(WARNING) << "Failed to auto-start recording: " << recording_status.message();
429 } else {
430 auto publish_status = PublishRecordingStateToRedis(/*recording=*/true);
431 if (!publish_status.ok()) {
432 LOG(WARNING) << "Failed to publish auto-start recording state: " << publish_status.message();
433 }
434 this->recording_auto_started = true;
435 this->is_recording.store(true);
436 }
437 }
438 // notify any awaiting threads in async calls that we have a container available
439 this->preprocessing_construction_condition.notify_all();
440
441 // If configured, start local ingestion thread (camera or video file) after container is ready
442 if (this->local_input_enabled) {
443 std::lock_guard<std::mutex> local_lock(this->local_input_thread_mutex);
 // A thread that is still joinable (running, or finished but not yet joined) is never replaced here.
444 if (!this->local_input_thread.joinable()) {
 // The source may already exist: created in the camera branch above or kept from an earlier run.
445 if (!this->local_input_source) {
446 vs::VideoSourceSettings vs_settings{};
447 vs_settings.input_transform_mode = this->input_transform_mode;
 // A non-empty video path takes precedence over the camera.
448 if (!this->input_video_path.empty()) {
449 vs_settings.input_video_path = this->input_video_path;
450 vs_settings.input_video_time_path = this->input_video_time_path;
451 } else {
452 vs_settings.device_index = this->camera_device_index;
453 vs_settings.capture_height_px = 720;
454 vs_settings.capture_width_px = 1280;
455 }
456 MP_ASSIGN_OR_RETURN(auto source, vs::BuildVideoSource(vs_settings));
457 this->local_input_source = std::move(source);
458 }
459 this->local_input_thread_shutdown = false;
 // The thread captures `this`; it is joined in StopAndDestroyContainer and in the destructor.
460 this->local_input_thread = std::thread([this]() {
461 if (this->use_camera && this->camera_tuning_enabled.load() && this->camera_tuner != nullptr) {
462 LOG(INFO) << "Starting initial camera tuning...";
463 auto start_status = this->camera_tuner->StartTuning();
464 if (start_status.ok()) {
465 this->is_tuning_camera.store(true);
466 LOG(INFO) << "Initial camera tuning started";
467 {
468 std::lock_guard<std::mutex> lock(this->camera_tuning_mutex);
469 this->last_camera_tuning_time = std::chrono::steady_clock::now();
470 }
471 } else {
 // Tuning failure does not stop ingestion; the error is only remembered.
472 LOG(WARNING) << "Failed to start initial camera tuning: " << start_status.message();
473 {
474 std::lock_guard<std::mutex> lock(this->camera_error_mutex_);
475 this->last_camera_error_ = start_status;
476 }
477 }
478 }
479
 // NOTE(review): previous_frame_ts_us is never read in the visible code.
480 int64_t previous_frame_ts_us = 0;
 // The fixed inter-frame sleep applies only to non-camera input with a positive configured delay.
481 const bool fixed_delay_enabled = !this->use_camera && this->container_settings.interframe_delay_ms > 0;
482
483 while (!this->local_input_thread_shutdown.load()) {
484 // Ensure container is valid and running
 // Take a shared_ptr copy under the mutex so this iteration keeps its container alive.
485 std::shared_ptr<BackgroundContainer> local_container;
486 {
487 std::unique_lock<std::mutex> c_lock(this->preprocessing_construction_mutex);
488 if (this->container == nullptr || !this->container->GraphIsRunning()) {
489 // Avoid tight spin if container not ready
490 c_lock.unlock();
491 std::this_thread::sleep_for(std::chrono::milliseconds(10));
492 continue;
493 }
494 local_container = this->container;
495 }
496
497 cv::Mat frame_bgr;
498 (*this->local_input_source) >> frame_bgr;
 // Empty frame: with a video file this is treated as end of input (stop recording, optionally signal
 // completion, end the loop); otherwise the loop retries after a short sleep.
499 if (frame_bgr.empty()) {
500 this->recording_auto_started = false;
501 if (!this->input_video_path.empty()) {
502 LOG(INFO) << "Input video source exhausted; stopping recording.";
503 absl::Status stop_status = local_container->SetRecording(false);
504 if (!stop_status.ok()) {
505 LOG(WARNING) << "Failed to stop recording after replay finished: "
506 << stop_status.message();
507 } else {
508 auto publish_status = PublishRecordingStateToRedis(/*recording=*/false);
509 if (!publish_status.ok()) {
510 LOG(WARNING) << "Failed to publish recording stop after replay: "
511 << publish_status.message();
512 }
513 this->is_recording.store(false);
514 }
515 if (this->exit_on_input_video_completion) {
 // Copy the callback under its mutex, then invoke the copy without holding the lock.
516 std::function<void()> callback_copy;
517 {
518 std::lock_guard<std::mutex> cb_lock(this->input_video_completion_callback_mutex);
519 callback_copy = this->input_video_completion_callback;
520 }
521 if (callback_copy) {
522 LOG(INFO) << "Input video playback complete; signaling shutdown.";
523 callback_copy();
524 }
525 }
526 this->local_input_thread_shutdown = true;
527 break;
528 }
529
530 std::this_thread::sleep_for(std::chrono::milliseconds(5));
531 continue;
532 }
533
534 if (fixed_delay_enabled) {
535 std::this_thread::sleep_for(std::chrono::milliseconds(this->container_settings.interframe_delay_ms));
536 }
537
538 int64_t ts_us = this->local_input_source->GetFrameTimestamp();
539
 // The source delivers BGR; the frame is converted to RGB before it is passed on.
540 cv::Mat frame_rgb;
541 cv::cvtColor(frame_bgr, frame_rgb, cv::COLOR_BGR2RGB);
542
543 // Check if tuning completed
544 if (this->use_camera && this->camera_tuner != nullptr) {
 // Detect the transition "we believed tuning was running" -> "tuner reports it is not" and log the outcome.
545 if (this->is_tuning_camera.load() && !this->camera_tuner->IsTuning()) {
546 this->is_tuning_camera.store(false);
547 auto stage = this->camera_tuner->GetCurrentStage();
548 if (stage == vs::TuningStage::COMPLETE) {
549 LOG(INFO) << "Camera tuning completed successfully";
550 } else if (stage == vs::TuningStage::FAILED) {
551 LOG(WARNING) << "Camera tuning failed";
552 } else {
553 LOG(INFO) << "Camera tuning stopped";
554 }
555 }
556
557 // Apply tuning overlay if active
558 frame_rgb = this->camera_tuner->ProcessFrameForDisplay(frame_rgb);
559 }
560
561 absl::Status s = local_container->AddFrameWithTimestamp(frame_rgb, ts_us);
562 if (!s.ok()) {
563 // Minor backoff to avoid hot loop on errors
564 std::this_thread::sleep_for(std::chrono::milliseconds(2));
565 }
566 }
567 });
568 }
569 }
570 return absl::OkStatus();
571}
572
573template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
574grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::StartEdgeGraph(
575 grpc::CallbackServerContext* context,
576 const ::google::protobuf::Empty* request,
577 ::google::protobuf::Empty* response
578) {
579 return ExecuteAsyncWithDefaultReactor(context, [this]() -> grpc::Status {
580 if (this->physiology_core_started || this->physiology_core_manual_control) {
581 return ::rpc::abslStatusToGrpc(BuildAndStartContainer());
582 }
583 return {::grpc::StatusCode::FAILED_PRECONDITION, "Physiology Core is not running, unable to start graph."};
584 });
585}
586
// Thin wrapper: forwards its three arguments unchanged to StartEdgeGraph and returns its result.
// NOTE(review): the listing jumps from line 585 to 590 and from 590 to 592, so the line carrying the
// return type and qualified function name (and any preceding comment) is not visible here — confirm
// against the original file.
590template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
592 grpc::CallbackServerContext* context,
593 const ::google::protobuf::Empty* request,
594 google::protobuf::Empty* response
595) {
596 return this->StartEdgeGraph(context, request, response);
597}
598
599/*
600 * @deprecated
601 */
602template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
603grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::IsPreprocessingRunning(
604 grpc::CallbackServerContext* context,
605 const ::google::protobuf::Empty* request,
606 ::google::protobuf::BoolValue* response
607) {
608 return this->IsEdgeGraphRunning(context, request, response);
609}
610
611template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
612grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::WaitUntilGraphIsIdle(
613 grpc::CallbackServerContext* context,
614 const ::google::protobuf::Empty* request,
615 google::protobuf::Empty* response
616) {
617 return ExecuteAsyncWithDefaultReactor(context, [this]() -> grpc::Status {
618 return rpc::abslStatusToGrpc(container->WaitUntilGraphIsIdle());
619 });
620}
621
622template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
623grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::SetRecording(
624 grpc::CallbackServerContext* context,
625 const google::protobuf::BoolValue* request,
626 google::protobuf::Empty* response
627) {
628 return ExecuteAsyncWithDefaultReactor(context, [this, request]() -> grpc::Status {
629 if (this->container == nullptr) {
630 return {
631 grpc::StatusCode::FAILED_PRECONDITION,
632 "Physiology Preprocessing is not running, unable to start or stop recording."
633 };
634 }
635
636 bool recording = request->value();
637 bool was_recording = this->is_recording.load();
638
639 // If tuning is active and trying to start recording, defer the request
640 if (recording && this->is_tuning_camera.load()) {
641 LOG(INFO) << "Ignoring recording start request - camera tuning in progress";
642 // Return success but don't actually start recording
643 return grpc::Status::OK;
644 }
645
646 // Update recording state
647 this->is_recording.store(recording);
648
649 // Handle tuning state based on recording transition
650 if (this->camera_tuning_enabled.load() && this->camera_tuner != nullptr) {
651 if (recording && !was_recording) {
652 // Started recording - stop any ongoing tuning (shouldn't happen due to check above, but be safe)
653 if (this->is_tuning_camera.load()) {
654 LOG(INFO) << "Recording started - interrupting tuning";
655 // CameraTuner lacks an explicit stop API; clearing the flag prevents further processing.
656 this->is_tuning_camera.store(false);
657 }
658 } else if (!recording && was_recording) {
659 // Stopped recording - enable re-tuning after cooldown
660 LOG(INFO) << "Recording stopped - tuning will check status after cooldown";
661 std::lock_guard<std::mutex> lock(this->camera_tuning_mutex);
662 this->last_camera_tuning_time = std::chrono::steady_clock::now();
663 }
664 }
665
666 auto status = container->SetRecording(recording);
667
668 // Publish recording state to Redis IPC if using Redis backend
669 if (status.ok()) {
670 auto publish_status = PublishRecordingStateToRedis(recording);
671 if (!publish_status.ok()) {
672 LOG(WARNING) << "Failed to publish recording state: " << publish_status.message();
673 }
674 }
675
676 return rpc::abslStatusToGrpc(status);
677 });
678}
679
680template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
681grpc::ServerReadReactor<presage::smartspectra::FrameWithTimestamp>*
682PhysiologyServiceImpl<TIntegrationMode>::AddFrameWithTimestamp(
683 grpc::CallbackServerContext* context,
684 google::protobuf::Empty* response
685) {
686 // Disable gRPC frame streaming when using IPC writer
687 if (this->ipc_stream_writer != nullptr) {
688 auto error_recorder = std::make_shared<TFrameWithTimestampRecorder>(nullptr, this->input_transform_mode);
689 error_recorder->Finish(grpc::Status(grpc::StatusCode::UNAVAILABLE,
690 "gRPC frame streaming disabled: using alternative IPC backend"));
691 return error_recorder.get();
692 }
693
694 std::unique_lock<std::mutex> lock(this->preprocessing_construction_mutex);
695 this->preprocessing_construction_condition.wait(lock, [this] { return this->container != nullptr; });
696 if (this->frame_with_timestamp_recorder != nullptr) {
697 if (this->frame_with_timestamp_recorder->GetContainer() != this->container.get()) {
698 // Assuming new gRPC connection, discard the old reactor safely and build new one.
699 this->frame_with_timestamp_recorder->Finish(grpc::Status::OK);
700 this->frame_with_timestamp_recorder->WaitUntilDone();
701 this->frame_with_timestamp_recorder.reset();
702 this->frame_with_timestamp_recorder = std::make_shared<TFrameWithTimestampRecorder>(this->container,
703 this->input_transform_mode);
704 }
705 } else {
706 this->frame_with_timestamp_recorder = std::make_shared<TFrameWithTimestampRecorder>(this->container,
707 this->input_transform_mode);
708 }
709 return this->frame_with_timestamp_recorder.get();
710}
711
712inline cv::Mat BuildOpenCVMatFromProtoBuf(const smartspectra::Mat& mat_protobuf) {
713 cv::Mat mat_cv(mat_protobuf.rows(), mat_protobuf.cols(), mat_protobuf.element_type());
714 size_t data_bytesize = mat_cv.rows * mat_cv.cols * mat_cv.elemSize();
715 auto data_protobuf = const_cast<char*>(mat_protobuf.data().data());
716 std::copy(reinterpret_cast<unsigned char*>(data_protobuf),
717 reinterpret_cast<unsigned char*>(data_protobuf + data_bytesize),
718 mat_cv.data);
719 return mat_cv;
720}
721
722template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
723grpc::ServerWriteReactor<physiology::StatusValue>* PhysiologyServiceImpl<TIntegrationMode>::GetStatusCode(
724 grpc::CallbackServerContext* context,
725 const ::google::protobuf::Empty* request
726) {
727 return this->GenericHookToProvider<TStatusCodeProvider>(this->status_code_provider);
728}
729
730template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
731grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::GetStatusDescription(
732 grpc::CallbackServerContext* context,
733 const physiology::StatusValue* request,
734 google::protobuf::StringValue* response
735) {
736 return ExecuteAsyncWithDefaultReactor(
737 context,
738 [this, &response, &request]() -> grpc::Status {
739 response->set_value(spectra::GetStatusDescription(request->value()));
740 return ::grpc::Status::OK;
741 }
742 );
743}
744
745template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
746grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::GetStatusHint(
747 grpc::CallbackServerContext* context,
748 const physiology::StatusValue* request,
749 google::protobuf::StringValue* response
750) {
751 return ExecuteAsyncWithDefaultReactor(
752 context,
753 [this, &response, &request]() -> grpc::Status {
754 response->set_value(spectra::GetStatusHint(request->value()));
755 return ::grpc::Status::OK;
756 }
757 );
758}
759
760template<container::settings::IntegrationMode TIntegrationMode>
761grpc::ServerWriteReactor<presage::physiology::MetricsBuffer>*
762PhysiologyServiceImpl<TIntegrationMode>::GetCoreMetrics(
763 grpc::CallbackServerContext* context,
764 const google::protobuf::Empty* request
765) {
766 return this->GenericHookToProvider<TMetricsProvider>(this->metrics_provider);
767}
768
769template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
770grpc::ServerWriteReactor<presage::physiology::MetricsBuffer>*
771PhysiologyServiceImpl<TIntegrationMode>::GetMetrics(
772 grpc::CallbackServerContext* context,
773 const ::google::protobuf::Empty* request
774) {
775 return this->GetCoreMetrics(context, request);
776}
777
778template<container::settings::IntegrationMode TIntegrationMode>
779grpc::ServerWriteReactor<presage::physiology::Metrics>* PhysiologyServiceImpl<TIntegrationMode>::GetEdgeMetrics(
780 grpc::CallbackServerContext* context,
781 const google::protobuf::Empty* request
782) {
783 return this->GenericHookToProvider<TEdgeMetricsProvider>(this->edge_metrics_provider);
784}
785
786template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
787grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::StopCore(
788 grpc::CallbackServerContext* context,
789 const ::google::protobuf::Empty* request,
790 google::protobuf::Empty* response
791) {
792 return ExecuteAsyncWithDefaultReactor(context, [this]() -> grpc::Status {
793 if (this->container != nullptr) {
794 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->StopAndDestroyContainer());
795 }
796 absl::Status status = absl::OkStatus();
797 if (this->physiology_core_manual_control) {
798 const char* message = "Skipping shutdown of Physiology Core Service: manual control enabled.";
799 LOG(INFO) << message;
800 } else {
801 if (this->physiology_core_started) {
802 status = StopCoreServer();
803 if (status.ok()) {
804 this->physiology_core_started = false;
805 }
806 } else {
807 LOG(INFO) << "No need to shut down Physiology Core Service: process not started.";
808 status = absl::OkStatus();
809 }
810 if (this->core_stub != nullptr) {
811 this->core_stub.reset();
812 }
813 }
814 return rpc::abslStatusToGrpc(status);
815 });
816}
817
818template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
819absl::Status PhysiologyServiceImpl<TIntegrationMode>::StopAndDestroyContainer() {
820 // Stop local input thread before shutting down the container (if enabled)
821 if (this->local_input_enabled) {
822 std::lock_guard<std::mutex> local_lock(this->local_input_thread_mutex);
823 this->local_input_thread_shutdown = true;
824 if (this->local_input_thread.joinable()) {
825 this->local_input_thread.join();
826 }
827 // keep local_input_source around to allow restart without re-probing device or re-opening file
828 }
829 if (this->container != nullptr) {
830 const bool graph_was_running = this->container->GraphIsRunning();
831 if (graph_was_running) {
832 absl::Status idle_status = container->WaitUntilGraphIsIdle();
833 if (!idle_status.ok() && !absl::IsFailedPrecondition(idle_status)) {
834 LOG(WARNING) << "WaitUntilGraphIsIdle failed prior to shutdown: " << idle_status.message();
835 }
836 }
837 absl::Status stop_status = container->StopGraph();
838 if (!stop_status.ok()) {
839 LOG(WARNING) << "StopGraph failed during shutdown: " << stop_status.message();
840 }
841 }
842 this->container.reset();
843 this->recording_auto_started = false;
844 return absl::OkStatus();
845}
846
852template<container::settings::IntegrationMode TIntegrationMode>
854 grpc::CallbackServerContext* context,
855 const ::google::protobuf::Empty* request,
856 ::google::protobuf::Empty* response
857) {
858 return ExecuteAsyncWithDefaultReactor(
859 context,
860 [this]() -> grpc::Status {
861 return rpc::abslStatusToGrpc(this->StopAndDestroyContainer());
862 }
863 );
864}
865
872template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
874 grpc::CallbackServerContext* context,
875 const ::google::protobuf::Empty* request,
876 google::protobuf::Empty* response
877) {
878 return this->StopEdgeGraph(context, request, response);
879}
880
881template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
882grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::ResetProcessing(
883 grpc::CallbackServerContext* context,
884 const ::google::protobuf::Empty* request,
885 google::protobuf::Empty* response
886) {
887 return ExecuteAsyncWithDefaultReactor(
888 context,
889 [this, &request, &response]() -> grpc::Status {
890 if (!physiology_core_manual_control && !this->physiology_core_started) {
891 return {::grpc::StatusCode::FAILED_PRECONDITION,
892 "Physiology Core has not been started, unable to clean test buffer."};
893 }
894
895 if (this->container == nullptr) {
896 return {::grpc::StatusCode::FAILED_PRECONDITION,
897 "Physiology Preprocessing has not been started, unable to clean test buffer."};
898 }
899
900 // stop the container if it is running
901 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(StopAndDestroyContainer());
902
903 grpc::ClientContext core_context;
904 // forward request to Physiology Core
905 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(core_stub->ResetProcessing(&core_context, *request, response));
906
907 // restart the container
908 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(BuildAndStartContainer());
909 return ::grpc::Status::OK;
910 }
911 );
912
913}
914
915template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
916grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::CheckHealth(
917 grpc::CallbackServerContext* context,
918 const ::google::protobuf::Empty* request,
919 ::google::protobuf::StringValue* response
920) {
921 return ExecuteAsyncWithDefaultReactor(context, [response]() {
922 response->set_value("Service is healthy");
923 return ::grpc::Status::OK;
924 });
925}
926
927template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
928grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::IsPhysiologyCoreUp(
929 grpc::CallbackServerContext* context,
930 const ::google::protobuf::Empty* request,
931 ::google::protobuf::BoolValue* response
932) {
933 return ExecuteAsyncWithDefaultReactor(context, [this, response]() {
934 response->set_value(this->physiology_core_started);
935 return ::grpc::Status::OK;
936 });
937}
938
939template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
940grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::IsEdgeGraphRunning(
941 grpc::CallbackServerContext* context,
942 const ::google::protobuf::Empty* request,
943 ::google::protobuf::BoolValue* response
944) {
945 return ExecuteAsyncWithDefaultReactor(context, [this, response]() {
946 response->set_value(this->container != nullptr && this->container->GraphIsRunning());
947 return ::grpc::Status::OK;
948 });
949}
950
951// region ======================= Physiology Core Start/Stop ===========================================================
952template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
953absl::Status PhysiologyServiceImpl<TIntegrationMode>::StartCoreClient() {
954 channel = grpc::CreateChannel("0.0.0.0:" + std::to_string(this->physiology_core_port_number),
955 grpc::InsecureChannelCredentials());
956 int retry_interval_ms = 50;
957 int remaining_time = this->physiology_core_connection_timeout_ms;
958 while (channel->GetState(true) != GRPC_CHANNEL_READY && remaining_time > 0) {
959 channel->WaitForConnected(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
960 gpr_time_from_millis(retry_interval_ms, GPR_TIMESPAN)));
961 remaining_time -= retry_interval_ms;
962 }
963 if (channel->GetState(true) != GRPC_CHANNEL_READY) {
964 channel.reset();
965 return absl::InternalError(
966 "Failed to connect to Physiology Core gRPC Server within " +
967 std::to_string(this->physiology_core_connection_timeout_ms) + " milliseconds, aborting."
968 );
969 }
970 this->core_stub = presage::physiology::PhysiologyCore::NewStub(channel);
971 return absl::OkStatus();
972}
973
974template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
975absl::Status PhysiologyServiceImpl<TIntegrationMode>::StartCoreServer() {
976 if (!this->venv_verified) {
977 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->VerifyVenv());
978 this->venv_verified = true;
979 }
980 const std::filesystem::path core_server_script = "grpc_core_server.py";
981 const std::filesystem::path local_core_path = PHYSIOLOGY_CORE_DIRECTORY;
982 const std::filesystem::path local_core_server_absolute_path = local_core_path / core_server_script;
983
984 bool mock_setting = false;
985
986 if (!std::filesystem::is_regular_file(local_core_server_absolute_path)) {
987 LOG(INFO) << "Local Physiology Core gRPC Server not found at \""
988 + local_core_server_absolute_path.string()
989 + "\". Assuming development setting and checking for the example physio core server instead...";
990
991 const std::filesystem::path mock_core_server_script = "mock_grpc_core_server.py";
992 const std::filesystem::path local_tests_path = SMARTSPECTRA_ON_PREM_TESTS_DIRECTORY;
993 const std::filesystem::path local_mock_core_server_absolute_path = local_tests_path / mock_core_server_script;
994 if (!std::filesystem::is_regular_file(local_mock_core_server_absolute_path)) {
995 return absl::NotFoundError("Mock Physiology Core gRPC Server not found at \""
996 + local_mock_core_server_absolute_path.string()
997 + "\".");
998 }
999 mock_setting = true;
1000
1001 }
1002
1003 std::string port_string = std::to_string(this->physiology_core_port_number);
1004 std::string random_seed_str = std::to_string(this->test_physiology_core_seed);
1005
1006 std::vector<std::string> string_arguments;
1007 std::vector<const char*> arguments;
1008 if (this->venv_directory.empty()) {
1009 // try running locally w/o Core install
1010 string_arguments = {
1011 "python",
1012 core_server_script,
1013 "--port",
1014 port_string
1015 };
1016 if (mock_setting) {
1017 string_arguments.emplace_back("--seed");
1018 string_arguments.push_back(random_seed_str);
1019 } else {
1020 string_arguments.emplace_back("--headless");
1021 if (this->log_settings.log_edge_core_transfer_times) {
1022 string_arguments.emplace_back("--log_transfer_timing_info");
1023 }
1024 }
1025 std::map<std::string, std::string> variables;
1026 if (!std::string(PHYSIOLOGY_CORE_DIRECTORY).empty()) {
1027 this->physiology_core_process_options.working_directory = PHYSIOLOGY_CORE_DIRECTORY;
1028 variables["PYTHONPATH"] = PHYSIOLOGY_CORE_DIRECTORY;
1029 } else {
1030 variables["PYTHONPATH"] = ".";
1031 }
1032 this->physiology_core_process_options.env.extra = reproc::env(variables);
1033 } else {
1034 std::filesystem::path venv_activate_script = this->venv_directory / "bin" / "activate";
1035 std::string command = "source " + venv_activate_script.string()
1036 + " && python " + local_core_server_absolute_path.string()
1037 + " --port " + port_string + " --headless";
1038 if (this->log_settings.log_edge_core_transfer_times) {
1039 command += " --log_transfer_timing_info";
1040 }
1041 string_arguments = {
1042 "bash",
1043 "-c",
1044 command,
1045 };
1046 }
1047 for (const auto& argument: string_arguments) {
1048 arguments.push_back(argument.c_str());
1049 }
1050 // Ensure the arguments end with a nullptr
1051 arguments.push_back(nullptr);
1052
1053 reproc::stop_actions stop = {
1054 {reproc::stop::noop, reproc::milliseconds(0)},
1055 {reproc::stop::terminate, reproc::milliseconds(5000)},
1056 {reproc::stop::kill, reproc::milliseconds(2000)}
1057 };
1058
1059 this->physiology_core_process_options.stop = stop;
1060 this->physiology_core_process_options.redirect.err.type = reproc::redirect::pipe;
1061 this->physiology_core_process_options.redirect.out.type = reproc::redirect::pipe;
1062 this->physiology_core_process_options.redirect.parent = true;
1063 this->physiology_core_process_options.redirect.discard = false;
1064
1065 std::error_code ec = this->physiology_core_process.start(arguments.data(), this->physiology_core_process_options);
1066 if (ec) {
1067 std::string additional_debugging_info;
1068 if (ec.message().find("No such file") != std::string::npos) {
1069 std::string ps_working_directory = std::filesystem::current_path().string();
1070 std::string files_in_ps_working_directory =
1071 presage::filesystem::GetListOfFilesInDirectoryAsString(ps_working_directory);
1072 std::string pc_working_directory_debugging_info;
1073 std::string physiology_core_directory(PHYSIOLOGY_CORE_DIRECTORY);
1074 if (this->venv_directory.empty() && !physiology_core_directory.empty()) {
1075 pc_working_directory_debugging_info =
1076 "Physiology Core working directory: " + physiology_core_directory + ".\n";
1077 std::string files_in_pc_working_directory =
1078 presage::filesystem::GetListOfFilesInDirectoryAsString(PHYSIOLOGY_CORE_DIRECTORY);
1079 if (files_in_pc_working_directory.empty()) {
1080 pc_working_directory_debugging_info += "Nothing found in Physiology Core working directory.";
1081 } else {
1082 pc_working_directory_debugging_info += "Files in Physiology Core working directory:\n"
1083 + files_in_pc_working_directory;
1084 }
1085 }
1086 additional_debugging_info = +"\nPhysiology Service working directory: " + ps_working_directory +
1087 ".\nFiles in working directory:\n" + files_in_ps_working_directory +
1088 pc_working_directory_debugging_info;
1089 }
1090 return absl::InternalError(
1091 "Failed to start Physiology Core gRPC server: " +
1092 ec.message() + "." + additional_debugging_info
1093 );
1094 }
1095
1096 this->core_log_thread_shutdown = false;
1097 this->core_log_thread = std::thread([this]() {
1098 std::error_code ec =
1099 reproc::drain(
1100 /*process=*/this->physiology_core_process,
1101 /*out=*/this->core_stdout_log_sink,
1102 /*err=*/this->core_stderr_log_sink
1103 );
1104 if (ec) {
1105 if (ec == std::errc::operation_canceled) {
1106 // everything is fine.
1107 return;
1108 }
1109 LOG(ERROR) << "Error draining process output: " << ec.message();
1110 }
1111 });
1112
1113 return absl::OkStatus();
1114}
1115
1116template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1117absl::Status PhysiologyServiceImpl<TIntegrationMode>::StopCoreServer() {
1118 this->core_log_thread_shutdown = true;
1119
1120 // make core server try to log something, so it knows to stop the logging thread
1121
1122 bool have_connection_to_core_server = true;
1123 if (this->core_stub != nullptr) {
1124 grpc::ClientContext core_context;
1125 physiology::BlueTooth request;
1126 google::protobuf::Empty response;
1127 this->core_stub->IssueBlueTooth(&core_context, request, &response);
1128 if (this->core_log_thread.joinable()) {
1129 this->core_log_thread.join();
1130 }
1131 } else {
1132 have_connection_to_core_server = false;
1133 }
1134
1135 std::error_code ec;
1136 int status_code;
1137
1138 // Try to stop the core server regardless of whether streams were shut down. A shutdown w/ a non-zero exit code
1139 // is still better than none at all.
1140 std::tie(status_code, ec) = this->physiology_core_process.stop(this->physiology_core_process_options.stop);
1141
1142 this->physiology_core_process = reproc::process();
1143 if (ec) {
1144 if (!have_connection_to_core_server) {
1145 return absl::InternalError("Physiology Core gRPC server connection could not be established, "
1146 "was not able to stop the Core server 100% gracefully, "
1147 "but it should be stopped.");
1148 } else {
1149 return absl::InternalError(
1150 "Failed to gracefully shut down Physiology Core server. Error: " + ec.message() +
1151 ". Status code: " + std::to_string(status_code) + "."
1152 );
1153 }
1154 } else {
1155 return absl::OkStatus();
1156 }
1157}
1158
1159
1160template<container::settings::IntegrationMode TIntegrationMode>
1161absl::Status PhysiologyServiceImpl<TIntegrationMode>::VerifyVenv() {
1162 if (this->venv_directory.empty()) {
1163 LOG(INFO) << "No venv directory specified, skipping venv activation.";
1164 return absl::OkStatus();
1165 }
1166 std::string usage_message =
1167 "Please ensure that you have a Python 3.11 virtual environment configured inside the root of the "
1168 "physiology repository, that Physiology-Core package installed on that, and that you are running "
1169 "physiology_server from the Physiology repository's root directory.";
1170 std::filesystem::path venv_activate_script = this->venv_directory / "bin" / "activate";
1171 if (!std::filesystem::exists(this->venv_directory)) {
1172 return absl::NotFoundError("Venv directory not found at \"" + this->venv_directory.string() +
1173 "\". " + usage_message);
1174 } else if (!std::filesystem::is_directory(this->venv_directory)) {
1175 return absl::NotFoundError("\"" + this->venv_directory.string() + "\" is not a directory. " + usage_message);
1176 } else if (!std::filesystem::is_regular_file(venv_activate_script)) {
1177 return absl::NotFoundError("Venv activate script not found at \"" + venv_activate_script.string() + "\". " +
1178 usage_message);
1179 }
1180 return absl::OkStatus();
1181}
1182
1183// endregion ===========================================================================================================
1184
1185template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1186grpc::ServerWriteReactor<::presage::smartspectra::LogEntry>* PhysiologyServiceImpl<TIntegrationMode>::StreamLogs(
1187 grpc::CallbackServerContext* context,
1188 const google::protobuf::Empty* request
1189) {
1190 return new LogStreamReactor(log_buffer);
1191}
1192
1193template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1194bool PhysiologyServiceImpl<TIntegrationMode>::StatusRequiresImmediateRetune(
1195 presage::physiology::StatusCode status
1196) {
1197 using presage::physiology::StatusCode;
1198 switch (status) {
1199 case StatusCode::IMAGE_TOO_DARK:
1200 case StatusCode::IMAGE_TOO_BRIGHT:
1201 return true;
1202 default:
1203 return false;
1204 }
1205}
1206
1207template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1208bool PhysiologyServiceImpl<TIntegrationMode>::StatusShouldRetuneAfterRecovery(
1209 presage::physiology::StatusCode status
1210) {
1211 using presage::physiology::StatusCode;
1212 switch (status) {
1213 case StatusCode::IMAGE_TOO_DARK:
1214 case StatusCode::IMAGE_TOO_BRIGHT:
1215 case StatusCode::NO_FACES_FOUND:
1216 return true;
1217 default:
1218 return false;
1219 }
1220}
1221
1222template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1223absl::Status PhysiologyServiceImpl<TIntegrationMode>::HandleStatusForTuning(
1224 const presage::physiology::StatusValue& status_output,
1225 presage::physiology::StatusCode previous_status_code
1226) {
1227 // Only process if tuning is enabled
1228 if (!this->camera_tuning_enabled.load() || this->camera_tuner == nullptr) {
1229 return absl::OkStatus();
1230 }
1231
1232 // Don't trigger tuning if already tuning
1233 if (this->is_tuning_camera.load()) {
1234 return absl::OkStatus();
1235 }
1236
1237 // Don't trigger tuning if recording is active
1238 if (this->is_recording.load()) {
1239 return absl::OkStatus();
1240 }
1241
1242 const auto current_status = static_cast<presage::physiology::StatusCode>(status_output.value());
1243 const bool status_requires_immediate_retune = StatusRequiresImmediateRetune(current_status);
1244 const bool recovered_from_issue =
1245 this->camera_tuning_recheck_on_recovery_ &&
1246 current_status == physiology::StatusCode::OK &&
1247 StatusShouldRetuneAfterRecovery(previous_status_code);
1248
1249 // Nothing to do if status is OK and no recovery-trigger requested.
1250 if (!status_requires_immediate_retune && !recovered_from_issue) {
1251 return absl::OkStatus();
1252 }
1253
1254 // Check cooldown timer
1255 {
1256 std::lock_guard<std::mutex> lock(this->camera_tuning_mutex);
1257 auto now = std::chrono::steady_clock::now();
1258 auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
1259 now - this->last_camera_tuning_time
1260 );
1261
1262 if (elapsed < this->camera_tuning_cooldown_duration) {
1263 // Still in cooldown period
1264 return absl::OkStatus();
1265 }
1266
1267 // Cooldown expired, trigger tuning
1268 this->last_camera_tuning_time = now;
1269 }
1270
1271 // Trigger tuning by starting the tuning process
1272 auto start_status = this->camera_tuner->StartTuning();
1273 if (!start_status.ok()) {
1274 LOG(WARNING) << "Failed to start camera tuning: " << start_status.message();
1275 // Store error for potential inspection
1276 {
1277 std::lock_guard<std::mutex> lock(this->camera_error_mutex_);
1278 this->last_camera_error_ = start_status;
1279 }
1280 return start_status;
1281 }
1282
1283 this->is_tuning_camera.store(true);
1284 if (status_requires_immediate_retune) {
1285 LOG(INFO) << "Status code " << spectra::GetStatusDescription(current_status)
1286 << " triggered camera tuning";
1287 } else {
1288 LOG(INFO) << "Status recovered (" << spectra::GetStatusDescription(previous_status_code)
1289 << " -> " << spectra::GetStatusDescription(current_status)
1290 << "); re-triggering camera tuning to verify conditions";
1291 }
1292
1293 return absl::OkStatus();
1294}
1295
1296template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1297absl::Status PhysiologyServiceImpl<TIntegrationMode>::FlushPendingStatusAfterTuning(
1298 ipc::IpcStreamWriter<TIntegrationMode>* writer
1299) {
1300 if (writer == nullptr) {
1301 return absl::OkStatus();
1302 }
1303
1304 std::optional<presage::physiology::StatusValue> pending_status;
1305 {
1306 std::lock_guard<std::mutex> lock(this->pending_status_mutex_);
1307 if (!this->pending_status_after_tuning_.has_value()) {
1308 return absl::OkStatus();
1309 }
1310 pending_status = this->pending_status_after_tuning_;
1311 this->pending_status_after_tuning_.reset();
1312 }
1313
1314 return writer->WriteStatus(*pending_status);
1315}
1316
1317} // namespace presage::smartspectra::grpc_bindings
grpc::ServerUnaryReactor * StartPreprocessing(grpc::CallbackServerContext *context, const ::google::protobuf::Empty *request, ::google::protobuf::Empty *response) override
Definition physiology_service_impl.hpp:591
grpc::ServerUnaryReactor * StopPreprocessing(grpc::CallbackServerContext *context, const ::google::protobuf::Empty *request, ::google::protobuf::Empty *response) override
Definition physiology_service_impl.hpp:873
grpc::ServerUnaryReactor * StopEdgeGraph(grpc::CallbackServerContext *context, const ::google::protobuf::Empty *request, ::google::protobuf::Empty *response) override
Definition physiology_service_impl.hpp:853
absl::StatusOr< std::shared_ptr< VideoSource > > BuildVideoSource(const VideoSourceSettings &settings)
Factory helper for constructing the appropriate VideoSource implementation based on the provided sett...
Definition factory.cpp:20
InputTransformMode
Transformation applied to frames prior to processing.
Definition input_transform.hpp:19
Definition background_container.cpp:10
Definition physiology_service.hpp:41
Configuration options for constructing a VideoSource.
Definition settings.hpp:23