14#include <grpcpp/grpcpp.h>
15#include <mediapipe/framework/port/status_macros.h>
16#include <mediapipe/framework/port/opencv_core_inc.h>
17#include <physiology/graph/physiology_core_service.grpc.pb.h>
18#include <physiology/graph/physiology_core_service.pb.h>
19#include <physiology/modules/rpc.h>
20#include <physiology/modules/messages/status.h>
21#include <physiology/modules/filesystem.h>
22#include <physiology/modules/configuration.h>
23#include <reproc++/drain.hpp>
24#include <absl/log/log_sink_registry.h>
26#include <smartspectra/container/settings.hpp>
27#include <smartspectra/video_source/factory.hpp>
28#include <mediapipe/framework/port/opencv_imgproc_inc.h>
29#include "physiology_service.hpp"
30#include "smartspectra_on_prem_config.hpp"
31#include "reactors/log_stream_reactor.hpp"
32#include "redis_ipc/redis_ipc_stream_writer.hpp"
34namespace spectra = presage::physiology;
35namespace cs = presage::smartspectra::container::settings;
36namespace rpc = presage::rpc;
37namespace vs = presage::smartspectra::video_source;
39namespace presage::smartspectra::grpc_bindings {
// Constructor: stores the core-connection, input and camera-tuning settings in members,
// builds the log sinks on top of the shared log buffer and registers the Abseil and glog sinks.
// NOTE(review): several lines of the parameter list and of the member-initializer list are not
// visible in this view (for example the declarations of `log_settings`, `use_camera` and
// `input_transform_mode`, which are referenced below), so the comments cover visible lines only.
42template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
43PhysiologyServiceImpl<TIntegrationMode>::PhysiologyServiceImpl(
45 double preprocessed_data_buffer_duration_s,
46 std::optional<bool> enable_phasic_bp,
47 std::optional<bool> enable_eda,
48 std::optional<bool> use_full_range_face_detection,
49 std::optional<bool> use_full_pose_landmarks,
50 std::optional<bool> enable_pose_landmark_segmentation,
51 std::optional<bool> enable_micromotion,
53 int camera_device_index,
54 const std::string& input_video_path,
55 const std::string& input_video_time_path,
56 int interframe_delay_ms,
57 bool start_with_recording_on,
58 bool exit_on_input_video_completion,
59 uint16_t physiology_core_port_number,
60 bool physiology_core_manual_control,
61 int physiology_core_connection_timeout_ms,
62 const std::string& venv_directory,
64 const int log_buffer_size,
65 int64_t test_physiology_core_seed,
66 std::unique_ptr<ipc::IpcStreamWriter<TIntegrationMode>> ipc_stream_writer,
67 std::optional<std::string> video_output_directory,
68 bool enable_camera_tuning,
69 video_source::CameraTunerSettings tuner_settings,
70 std::chrono::seconds camera_tuning_cooldown_seconds,
71 bool camera_tuning_recheck_on_recovery
72) : physiology_core_port_number(physiology_core_port_number),
73 physiology_core_manual_control(physiology_core_manual_control),
74 test_physiology_core_seed(test_physiology_core_seed),
75 physiology_core_connection_timeout_ms(physiology_core_connection_timeout_ms),
76 venv_directory(venv_directory),
77 log_settings(log_settings),
78 log_buffer(log_buffer_size),
// All four sinks below are constructed on the same log_buffer.
79 absl_log_sink(this->log_buffer),
80 glog_sink(this->log_buffer),
81 core_stdout_log_sink(this->log_buffer, this->core_log_thread_shutdown),
82 core_stderr_log_sink(this->log_buffer, this->core_log_thread_shutdown),
// NOTE(review): the following arguments presumably populate the container settings member;
// the enclosing initializer lines are not visible here, so confirm against the full file.
86 video_source::VideoSourceSettings{},
90 start_with_recording_on,
97 use_full_range_face_detection,
98 use_full_pose_landmarks,
99 enable_pose_landmark_segmentation,
103 log_settings.log_edge_core_transfer_times,
104 video_output_directory,
106 cs::ContinuousSettings{
107 preprocessed_data_buffer_duration_s
110 physiology_core_port_number
114 input_transform_mode(input_transform_mode),
115 use_camera(use_camera),
116 input_video_path(input_video_path),
117 input_video_time_path(input_video_time_path),
118 start_with_recording_on(start_with_recording_on),
119 exit_on_input_video_completion(exit_on_input_video_completion),
// Local input is enabled when a camera is used or a video path was supplied.
120 local_input_enabled(use_camera || !input_video_path.empty()),
121 camera_device_index(camera_device_index),
122 ipc_stream_writer(std::move(ipc_stream_writer)),
123 tuner_settings_(tuner_settings),
124 camera_tuning_cooldown_duration(camera_tuning_cooldown_seconds),
125 camera_tuning_recheck_on_recovery_(camera_tuning_recheck_on_recovery) {
// The overlay flag of the stored tuner settings is overwritten with enable_camera_tuning.
127 this->tuner_settings_.render_calibrating_overlay = enable_camera_tuning;
// Register both sinks; the destructor removes them again.
129 absl::AddLogSink(&this->absl_log_sink);
130 google::AddLogSink(&this->glog_sink);
// Destructor: stops the preprocessing container and the Physiology Core (when this service
// started it and manual control is off), joins the worker threads, shuts the log buffer down
// and unregisters the log sinks. Stop failures are logged as warnings.
// NOTE(review): closing-brace lines are not visible in this view, so the exact nesting of the
// statements below should be confirmed against the full file.
134template<container::settings::IntegrationMode TIntegrationMode>
135PhysiologyServiceImpl<TIntegrationMode>::~PhysiologyServiceImpl() {
// Tear down the container if one exists.
137 if (this->container !=
nullptr) {
138 absl::Status container_stop_status = this->StopAndDestroyContainer();
139 if (!container_stop_status.ok()) {
140 LOG(WARNING) <<
"Failed to stop preprocessing container during destruction: "
141 << container_stop_status.message();
// Only stop a core that is marked as started and is not under manual control.
144 if (this->physiology_core_started && !this->physiology_core_manual_control) {
145 absl::Status core_stop_status = this->StopCoreServer();
146 if (!core_stop_status.ok()) {
147 LOG(WARNING) <<
"Failed to stop Physiology Core during destruction: "
148 << core_stop_status.message();
150 this->physiology_core_started =
false;
152 if (this->core_log_thread.joinable()) {
153 this->core_log_thread.join();
// Ask the local input thread to stop, join it, then release the input source.
157 if (this->local_input_enabled) {
159 std::lock_guard<std::mutex> lk(this->local_input_thread_mutex);
160 this->local_input_thread_shutdown =
true;
162 if (this->local_input_thread.joinable()) {
163 this->local_input_thread.join();
165 this->local_input_source.reset();
// The log buffer is shut down before the sinks registered in the constructor are removed.
168 this->log_buffer.Shutdown();
170 absl::RemoveLogSink(&this->absl_log_sink);
171 google::RemoveLogSink(&this->glog_sink);
174template<container::settings::IntegrationMode TIntegrationMode>
175absl::Status PhysiologyServiceImpl<TIntegrationMode>::AutoStartIfRequested() {
176 if (!this->start_with_recording_on) {
177 return absl::OkStatus();
179 if (this->physiology_core_manual_control) {
180 LOG(INFO) <<
"Skipping auto-start: manual Physiology Core control enabled.";
181 return absl::OkStatus();
183 if (!this->physiology_core_started) {
184 LOG(INFO) <<
"Auto-start requested; launching Physiology Core.";
185 MP_RETURN_IF_ERROR(this->StartCoreServer());
186 MP_RETURN_IF_ERROR(this->StartCoreClient());
187 this->physiology_core_started =
true;
188 LOG(INFO) <<
"Physiology Core auto-start complete.";
190 MP_RETURN_IF_ERROR(this->BuildAndStartContainer());
191 LOG(INFO) <<
"Edge graph initialized; recording set to "
192 << (this->recording_auto_started ?
"on." :
"off.");
193 return absl::OkStatus();
196template<container::settings::IntegrationMode TIntegrationMode>
197void PhysiologyServiceImpl<TIntegrationMode>::SetOnInputVideoCompletionCallback(std::function<
void()> callback) {
198 std::lock_guard<std::mutex> lock(this->input_video_completion_callback_mutex);
199 this->input_video_completion_callback = std::move(callback);
202template<container::settings::IntegrationMode TIntegrationMode>
203absl::Status PhysiologyServiceImpl<TIntegrationMode>::PublishRecordingStateToRedis(
bool recording) {
204 if (this->ipc_stream_writer ==
nullptr) {
205 return absl::OkStatus();
208 dynamic_cast<redis_ipc::RedisIpcStreamWriter<TIntegrationMode>*
>(this->ipc_stream_writer.get());
209 if (redis_writer ==
nullptr) {
210 return absl::OkStatus();
212 return redis_writer->PublishRecordingState(recording);
215template<
typename TFunction>
216inline grpc::ServerUnaryReactor* ExecuteAsyncWithDefaultReactor(
217 grpc::CallbackServerContext* context,
220 auto reactor = context->DefaultReactor();
221 grpc::Status status = std::forward<TFunction>(function)();
222 reactor->Finish(status);
227template<container::settings::IntegrationMode TIntegrationMode>
228template<
typename TProv
iderWithContainer>
229TProviderWithContainer* PhysiologyServiceImpl<TIntegrationMode>::GenericHookToProvider(
230 std::shared_ptr<TProviderWithContainer>& provider
233 if (this->ipc_stream_writer !=
nullptr) {
235 auto error_provider = std::make_shared<TProviderWithContainer>(
nullptr);
236 error_provider->Finish(grpc::Status(grpc::StatusCode::UNAVAILABLE,
237 "gRPC streaming disabled: using alternative IPC backend"));
238 return error_provider.get();
241 std::unique_lock<std::mutex> lock(this->preprocessing_construction_mutex);
242 this->preprocessing_construction_condition.wait(lock, [
this] {
return this->container !=
nullptr; });
243 if (provider !=
nullptr) {
244 if (provider->GetContainer() != this->container.get()) {
246 provider->Finish(grpc::Status::OK);
247 provider->WaitUntilDone();
249 provider = std::make_shared<TProviderWithContainer>(this->container);
252 provider = std::make_shared<TProviderWithContainer>(this->container);
254 return provider.get();
// Unary RPC handler that starts the Physiology Core: launches the core server, connects the
// client, and marks the core as started. The work runs inside the callable passed to
// ExecuteAsyncWithDefaultReactor, whose returned status finishes the RPC.
// `request` and `response` are not used by the visible code.
257template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
258grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::StartCore(
259 grpc::CallbackServerContext* context,
260 const ::google::protobuf::Empty* request,
261 google::protobuf::Empty* response
263 return ExecuteAsyncWithDefaultReactor(context, [
this]() -> grpc::Status {
// NOTE(review): the line that follows the log statement of this manual-control branch is not
// visible in this view; confirm that the branch returns before the start sequence below.
264 if (this->physiology_core_manual_control) {
265 const char* message =
"Skipping start of Physiology Core Service: manual control enabled.";
266 LOG(INFO) << message;
// A second start request while the core is marked as started is rejected with ALREADY_EXISTS.
268 if (this->physiology_core_started) {
269 return {::grpc::StatusCode::ALREADY_EXISTS,
"Physiology core is already running."};
// The started flag is only set after both the server and the client started without error.
271 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(StartCoreServer());
273 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(StartCoreClient());
274 this->physiology_core_started =
true;
275 return ::grpc::Status::OK;
// Builds the BackgroundContainer (when none exists), connects it to the gRPC providers or to the
// IPC stream writer, prepares the camera source and camera tuner, installs status and
// diagnostics callbacks, initializes and starts the graph, optionally switches recording on,
// wakes RPC handlers waiting for a container, and starts the local input thread.
// NOTE(review): many lines of this function (closing braces, else-branches, some declarations
// and statements) are not visible in this view. The comments below describe visible lines only;
// where the nesting matters it is marked as an assumption to confirm against the full file.
280template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
281absl::Status PhysiologyServiceImpl<TIntegrationMode>::BuildAndStartContainer() {
// Same mutex the streaming RPC handlers lock while they wait for a container to appear.
282 std::unique_lock<std::mutex> lock(this->preprocessing_construction_mutex);
283 if (this->container ==
nullptr) {
284 this->container = std::make_shared<BackgroundContainer>(this->container_settings);
// Without an IPC writer, gRPC providers that already exist are re-pointed at the new container.
285 if (this->ipc_stream_writer ==
nullptr) {
287 if (this->frame_with_timestamp_recorder !=
nullptr) {
288 this->frame_with_timestamp_recorder->SwapContainer(this->container);
290 if (this->status_code_provider !=
nullptr) {
291 this->status_code_provider->SwapContainer(this->container);
293 if (this->metrics_provider !=
nullptr) {
294 this->metrics_provider->SwapContainer(this->container);
296 if (this->edge_metrics_provider !=
nullptr) {
297 this->edge_metrics_provider->SwapContainer(this->container);
// NOTE(review): presumably the else-branch of the IPC-writer check above (branch header not
// visible): the IPC writer is handed the new container instead.
302 auto swap_status = this->ipc_stream_writer->SwapContainer(this->container);
303 if (!swap_status.ok()) {
// Camera input: when no local source exists yet, configure one for the configured device with a
// requested capture size of 1280x720.
308 if (this->use_camera) {
309 if (!this->local_input_source) {
311 vs_settings.device_index = this->camera_device_index;
312 vs_settings.capture_height_px = 720;
313 vs_settings.capture_width_px = 1280;
314 vs_settings.input_transform_mode = this->input_transform_mode;
316 this->local_input_source = std::move(source);
// Create the camera tuner once, and only when render_calibrating_overlay is set.
319 if (this->tuner_settings_.render_calibrating_overlay && this->camera_tuner ==
nullptr) {
320 this->camera_tuner = std::make_shared<video_source::CameraTuner>(this->tuner_settings_);
322 auto status = this->camera_tuner->SetVideoSource(this->local_input_source);
// From here on frames are read through the source handed back by the tuner.
324 this->local_input_source = this->camera_tuner->GetVideoSource();
325 this->camera_tuning_enabled =
true;
326 LOG(INFO) <<
"Camera tuner created and video source attached for IPC mode.";
// NOTE(review): presumably the failure branch of SetVideoSource (branch header not visible):
// the tuner is dropped and tuning is disabled.
328 LOG(WARNING) <<
"Camera tuning disabled: " << status.message();
329 this->camera_tuner.reset();
330 this->camera_tuning_enabled =
false;
// Raw writer pointer captured by the callbacks below; the first callback checks it for null.
335 auto* ipc_writer_ptr = this->ipc_stream_writer.get();
338 if (this->use_camera && this->camera_tuning_enabled.load()) {
339 auto* tuner_ptr = this->camera_tuner.get();
// Per-status-code callback: while the tuner is tuning, each status code is fed to the tuner;
// otherwise, if an IPC writer exists, any status held back during tuning is flushed.
342 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->container->SetOnStatusCode(
343 [
this, tuner_ptr, ipc_writer_ptr](presage::physiology::StatusValue status_value) {
344 if (tuner_ptr && tuner_ptr->IsTuning()) {
345 auto status_code = static_cast<presage::physiology::StatusCode>(status_value.value());
346 auto tuning_status = tuner_ptr->ProcessFrame(status_code, status_value.timestamp());
347 if (!tuning_status.ok()) {
348 LOG(WARNING) <<
"Camera tuner ProcessFrame failed: " << tuning_status.message();
350 }
else if (ipc_writer_ptr !=
nullptr) {
351 auto flush_status = this->FlushPendingStatusAfterTuning(ipc_writer_ptr);
352 if (!flush_status.ok()) {
356 return absl::OkStatus();
359 LOG(INFO) <<
"Camera tuning status monitoring enabled.";
// Status-change callback.
// NOTE(review): ipc_writer_ptr is dereferenced below without a null check in the visible lines,
// unlike the callback above; confirm that an enclosing guard (not visible here) ensures a writer.
363 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->container->SetOnStatusChange(
364 [
this, ipc_writer_ptr](presage::physiology::StatusValue status_value) {
// Remember the newest status code (keeping the previous one) and its timestamp.
365 auto new_status = static_cast<presage::physiology::StatusCode>(status_value.value());
366 auto previous_status = this->latest_status_code_.exchange(new_status);
367 this->latest_status_change_timestamp_us_.store(status_value.timestamp());
370 auto tuning_status = this->HandleStatusForTuning(status_value, previous_status);
371 if (!tuning_status.ok()) {
372 return tuning_status;
// No tuner, or tuner not tuning: flush any held-back status, then forward this one unchanged.
375 if (!this->camera_tuner || !this->camera_tuner->IsTuning()) {
376 auto flush_status = this->FlushPendingStatusAfterTuning(ipc_writer_ptr);
377 if (!flush_status.ok()) {
380 return ipc_writer_ptr->WriteStatus(status_value);
// Tuning and the new status is OK: keep the real status for later and write a copy whose value
// is CAMERA_TUNING_IN_PROGRESS instead.
383 if (new_status == presage::physiology::StatusCode::OK) {
384 presage::physiology::StatusValue masked_status(status_value);
386 std::lock_guard<std::mutex> lock(this->pending_status_mutex_);
387 this->pending_status_after_tuning_ = status_value;
389 masked_status.set_value(presage::physiology::StatusCode::CAMERA_TUNING_IN_PROGRESS);
390 return ipc_writer_ptr->WriteStatus(masked_status);
// Remaining case: the status is written unchanged.
393 return ipc_writer_ptr->WriteStatus(status_value);
// Optional diagnostics: log every frame that was not sent through.
397 if (this->log_settings.log_dropped_frames) {
398 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->container->SetOnFrameSentThrough(
399 [](
bool frame_sent_through, int64_t input_timestamp) {
400 if (!frame_sent_through) {
401 LOG(INFO) <<
"Dropped frame with capture timestamp " << input_timestamp <<
" microseconds.";
403 return absl::OkStatus();
// Optional diagnostics: log throughput and latency values passed to the telemetry callback.
407 if (this->log_settings.log_throughput_and_latency) {
408 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->container->SetOnCorePerformanceTelemetry([](
409 double effective_core_fps,
410 double effective_core_latency,
411 int64_t input_timestamp
413 LOG(INFO) <<
"Effective system throughput: " << std::fixed << std::setprecision(3)
414 << effective_core_fps
415 <<
" FPS/HZ, system latency: " << std::fixed << std::setprecision(3)
416 << effective_core_latency <<
" s [window: 3s for both]";
417 return absl::OkStatus();
// Initialize the container and start the graph unless it already reports running.
421 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(container->Initialize());
422 if (!this->container->GraphIsRunning()) {
423 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(container->StartGraph());
// Switch recording on automatically when requested and not done yet; failures are only logged.
425 if (this->start_with_recording_on && !this->recording_auto_started) {
426 auto recording_status = this->container->SetRecording(
true);
427 if (!recording_status.ok()) {
428 LOG(WARNING) <<
"Failed to auto-start recording: " << recording_status.message();
430 auto publish_status = PublishRecordingStateToRedis(
true);
431 if (!publish_status.ok()) {
432 LOG(WARNING) <<
"Failed to publish auto-start recording state: " << publish_status.message();
434 this->recording_auto_started =
true;
435 this->is_recording.store(
true);
// Wake RPC handlers blocked on preprocessing_construction_condition.
439 this->preprocessing_construction_condition.notify_all();
// Local input (camera or video file): start the reader thread if it is not already running.
442 if (this->local_input_enabled) {
443 std::lock_guard<std::mutex> local_lock(this->local_input_thread_mutex);
444 if (!this->local_input_thread.joinable()) {
// Configure the input source when none exists yet: the video path (and time path) when a path
// was given; the camera device index and a 1280x720 capture size are set in the lines after it.
445 if (!this->local_input_source) {
447 vs_settings.input_transform_mode = this->input_transform_mode;
448 if (!this->input_video_path.empty()) {
449 vs_settings.input_video_path = this->input_video_path;
450 vs_settings.input_video_time_path = this->input_video_time_path;
452 vs_settings.device_index = this->camera_device_index;
453 vs_settings.capture_height_px = 720;
454 vs_settings.capture_width_px = 1280;
457 this->local_input_source = std::move(source);
459 this->local_input_thread_shutdown =
false;
// Reader thread body. It captures `this`; the destructor and StopAndDestroyContainer join it.
460 this->local_input_thread = std::thread([
this]() {
// Start an initial tuning pass when camera tuning is enabled and a tuner exists.
461 if (this->use_camera && this->camera_tuning_enabled.load() && this->camera_tuner !=
nullptr) {
462 LOG(INFO) <<
"Starting initial camera tuning...";
463 auto start_status = this->camera_tuner->StartTuning();
464 if (start_status.ok()) {
465 this->is_tuning_camera.store(
true);
466 LOG(INFO) <<
"Initial camera tuning started";
468 std::lock_guard<std::mutex> lock(this->camera_tuning_mutex);
469 this->last_camera_tuning_time = std::chrono::steady_clock::now();
// The start failure is logged and stored in last_camera_error_.
472 LOG(WARNING) <<
"Failed to start initial camera tuning: " << start_status.message();
474 std::lock_guard<std::mutex> lock(this->camera_error_mutex_);
475 this->last_camera_error_ = start_status;
480 int64_t previous_frame_ts_us = 0;
// A fixed inter-frame delay is only used for non-camera input with a positive configured delay.
481 const bool fixed_delay_enabled = !this->use_camera && this->container_settings.interframe_delay_ms > 0;
483 while (!this->local_input_thread_shutdown.load()) {
// Copy the container pointer under the construction lock; back off 10 ms while there is no
// container or its graph is not running.
485 std::shared_ptr<BackgroundContainer> local_container;
487 std::unique_lock<std::mutex> c_lock(this->preprocessing_construction_mutex);
488 if (this->container ==
nullptr || !this->container->GraphIsRunning()) {
491 std::this_thread::sleep_for(std::chrono::milliseconds(10));
494 local_container = this->container;
// Read the next frame from the input source.
498 (*this->local_input_source) >> frame_bgr;
// Empty frame: clear the auto-start flag; for video-file input, stop recording, publish the new
// state and, when exit_on_input_video_completion is set, request this thread to shut down.
499 if (frame_bgr.empty()) {
500 this->recording_auto_started =
false;
501 if (!this->input_video_path.empty()) {
502 LOG(INFO) <<
"Input video source exhausted; stopping recording.";
503 absl::Status stop_status = local_container->SetRecording(
false);
504 if (!stop_status.ok()) {
505 LOG(WARNING) <<
"Failed to stop recording after replay finished: "
506 << stop_status.message();
508 auto publish_status = PublishRecordingStateToRedis(
false);
509 if (!publish_status.ok()) {
510 LOG(WARNING) <<
"Failed to publish recording stop after replay: "
511 << publish_status.message();
513 this->is_recording.store(
false);
// The completion callback is copied while its mutex is held.
515 if (this->exit_on_input_video_completion) {
516 std::function<void()> callback_copy;
518 std::lock_guard<std::mutex> cb_lock(this->input_video_completion_callback_mutex);
519 callback_copy = this->input_video_completion_callback;
522 LOG(INFO) <<
"Input video playback complete; signaling shutdown.";
526 this->local_input_thread_shutdown =
true;
530 std::this_thread::sleep_for(std::chrono::milliseconds(5));
534 if (fixed_delay_enabled) {
535 std::this_thread::sleep_for(std::chrono::milliseconds(this->container_settings.interframe_delay_ms));
538 int64_t ts_us = this->local_input_source->GetFrameTimestamp();
// Convert BGR to RGB; the RGB frame is what gets passed to the container below.
541 cv::cvtColor(frame_bgr, frame_rgb, cv::COLOR_BGR2RGB);
// Detect the end of a tuning run (flag still set, tuner no longer tuning) and log how it ended.
544 if (this->use_camera && this->camera_tuner !=
nullptr) {
545 if (this->is_tuning_camera.load() && !this->camera_tuner->IsTuning()) {
546 this->is_tuning_camera.store(
false);
547 auto stage = this->camera_tuner->GetCurrentStage();
548 if (stage == vs::TuningStage::COMPLETE) {
549 LOG(INFO) <<
"Camera tuning completed successfully";
550 }
else if (stage == vs::TuningStage::FAILED) {
551 LOG(WARNING) <<
"Camera tuning failed";
553 LOG(INFO) <<
"Camera tuning stopped";
// The frame is replaced by the result of the tuner's ProcessFrameForDisplay.
558 frame_rgb = this->camera_tuner->ProcessFrameForDisplay(frame_rgb);
561 absl::Status s = local_container->AddFrameWithTimestamp(frame_rgb, ts_us);
564 std::this_thread::sleep_for(std::chrono::milliseconds(2));
570 return absl::OkStatus();
573template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
574grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::StartEdgeGraph(
575 grpc::CallbackServerContext* context,
576 const ::google::protobuf::Empty* request,
577 ::google::protobuf::Empty* response
579 return ExecuteAsyncWithDefaultReactor(context, [
this]() -> grpc::Status {
580 if (this->physiology_core_started || this->physiology_core_manual_control) {
581 return ::rpc::abslStatusToGrpc(BuildAndStartContainer());
583 return {::grpc::StatusCode::FAILED_PRECONDITION,
"Physiology Core is not running, unable to start graph."};
// Unary RPC handler that forwards its three arguments unchanged to StartEdgeGraph and returns
// that handler's reactor.
// NOTE(review): the line carrying this method's return type and name is not visible in this
// view; confirm the method name against the full file.
590template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
592 grpc::CallbackServerContext* context,
593 const ::google::protobuf::Empty* request,
594 google::protobuf::Empty* response
596 return this->StartEdgeGraph(context, request, response);
602template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
603grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::IsPreprocessingRunning(
604 grpc::CallbackServerContext* context,
605 const ::google::protobuf::Empty* request,
606 ::google::protobuf::BoolValue* response
608 return this->IsEdgeGraphRunning(context, request, response);
611template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
612grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::WaitUntilGraphIsIdle(
613 grpc::CallbackServerContext* context,
614 const ::google::protobuf::Empty* request,
615 google::protobuf::Empty* response
617 return ExecuteAsyncWithDefaultReactor(context, [
this]() -> grpc::Status {
618 return rpc::abslStatusToGrpc(
container->WaitUntilGraphIsIdle());
// Unary RPC handler that switches recording on or off on the container, keeps the
// camera-tuning bookkeeping in step, and publishes the new state over Redis when available.
// The RPC finishes with the converted status of the container's SetRecording call.
// NOTE(review): several lines of this handler are not visible in this view; the comments
// describe visible lines only.
622template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
623grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::SetRecording(
624 grpc::CallbackServerContext* context,
625 const google::protobuf::BoolValue* request,
626 google::protobuf::Empty* response
628 return ExecuteAsyncWithDefaultReactor(context, [
this, request]() -> grpc::Status {
// Without a container there is nothing to record on: FAILED_PRECONDITION.
629 if (this->container ==
nullptr) {
631 grpc::StatusCode::FAILED_PRECONDITION,
632 "Physiology Preprocessing is not running, unable to start or stop recording."
636 bool recording = request->value();
637 bool was_recording = this->is_recording.load();
// A start request that arrives while the camera is being tuned is answered with OK but ignored.
640 if (recording && this->is_tuning_camera.load()) {
641 LOG(INFO) <<
"Ignoring recording start request - camera tuning in progress";
643 return grpc::Status::OK;
647 this->is_recording.store(recording);
650 if (this->camera_tuning_enabled.load() && this->camera_tuner !=
nullptr) {
651 if (recording && !was_recording) {
// NOTE(review): given the early return above, this inner branch is only reachable if tuning
// starts between the two is_tuning_camera reads; confirm whether it is still needed.
653 if (this->is_tuning_camera.load()) {
654 LOG(INFO) <<
"Recording started - interrupting tuning";
656 this->is_tuning_camera.store(
false);
658 }
else if (!recording && was_recording) {
// Recording was switched off: last_camera_tuning_time is reset to now under camera_tuning_mutex.
660 LOG(INFO) <<
"Recording stopped - tuning will check status after cooldown";
661 std::lock_guard<std::mutex> lock(this->camera_tuning_mutex);
662 this->last_camera_tuning_time = std::chrono::steady_clock::now();
666 auto status = container->SetRecording(recording);
// A failed publish is only logged; the RPC result is the container status.
670 auto publish_status = PublishRecordingStateToRedis(recording);
671 if (!publish_status.ok()) {
672 LOG(WARNING) <<
"Failed to publish recording state: " << publish_status.message();
676 return rpc::abslStatusToGrpc(status);
680template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
681grpc::ServerReadReactor<presage::smartspectra::FrameWithTimestamp>*
682PhysiologyServiceImpl<TIntegrationMode>::AddFrameWithTimestamp(
683 grpc::CallbackServerContext* context,
684 google::protobuf::Empty* response
687 if (this->ipc_stream_writer !=
nullptr) {
688 auto error_recorder = std::make_shared<TFrameWithTimestampRecorder>(
nullptr, this->input_transform_mode);
689 error_recorder->Finish(grpc::Status(grpc::StatusCode::UNAVAILABLE,
690 "gRPC frame streaming disabled: using alternative IPC backend"));
691 return error_recorder.get();
694 std::unique_lock<std::mutex> lock(this->preprocessing_construction_mutex);
695 this->preprocessing_construction_condition.wait(lock, [
this] {
return this->container !=
nullptr; });
696 if (this->frame_with_timestamp_recorder !=
nullptr) {
697 if (this->frame_with_timestamp_recorder->GetContainer() != this->container.get()) {
699 this->frame_with_timestamp_recorder->Finish(grpc::Status::OK);
700 this->frame_with_timestamp_recorder->WaitUntilDone();
701 this->frame_with_timestamp_recorder.reset();
702 this->frame_with_timestamp_recorder = std::make_shared<TFrameWithTimestampRecorder>(this->container,
703 this->input_transform_mode);
706 this->frame_with_timestamp_recorder = std::make_shared<TFrameWithTimestampRecorder>(this->container,
707 this->input_transform_mode);
709 return this->frame_with_timestamp_recorder.get();
712inline cv::Mat BuildOpenCVMatFromProtoBuf(
const smartspectra::Mat& mat_protobuf) {
713 cv::Mat mat_cv(mat_protobuf.rows(), mat_protobuf.cols(), mat_protobuf.element_type());
714 size_t data_bytesize = mat_cv.rows * mat_cv.cols * mat_cv.elemSize();
715 auto data_protobuf =
const_cast<char*
>(mat_protobuf.data().data());
716 std::copy(
reinterpret_cast<unsigned char*
>(data_protobuf),
717 reinterpret_cast<unsigned char*
>(data_protobuf + data_bytesize),
722template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
723grpc::ServerWriteReactor<physiology::StatusValue>* PhysiologyServiceImpl<TIntegrationMode>::GetStatusCode(
724 grpc::CallbackServerContext* context,
725 const ::google::protobuf::Empty* request
727 return this->GenericHookToProvider<TStatusCodeProvider>(this->status_code_provider);
730template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
731grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::GetStatusDescription(
732 grpc::CallbackServerContext* context,
733 const physiology::StatusValue* request,
734 google::protobuf::StringValue* response
736 return ExecuteAsyncWithDefaultReactor(
738 [
this, &response, &request]() -> grpc::Status {
739 response->set_value(spectra::GetStatusDescription(request->value()));
740 return ::grpc::Status::OK;
745template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
746grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::GetStatusHint(
747 grpc::CallbackServerContext* context,
748 const physiology::StatusValue* request,
749 google::protobuf::StringValue* response
751 return ExecuteAsyncWithDefaultReactor(
753 [
this, &response, &request]() -> grpc::Status {
754 response->set_value(spectra::GetStatusHint(request->value()));
755 return ::grpc::Status::OK;
760template<container::settings::IntegrationMode TIntegrationMode>
761grpc::ServerWriteReactor<presage::physiology::MetricsBuffer>*
762PhysiologyServiceImpl<TIntegrationMode>::GetCoreMetrics(
763 grpc::CallbackServerContext* context,
764 const google::protobuf::Empty* request
766 return this->GenericHookToProvider<TMetricsProvider>(this->metrics_provider);
769template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
770grpc::ServerWriteReactor<presage::physiology::MetricsBuffer>*
771PhysiologyServiceImpl<TIntegrationMode>::GetMetrics(
772 grpc::CallbackServerContext* context,
773 const ::google::protobuf::Empty* request
775 return this->GetCoreMetrics(context, request);
778template<container::settings::IntegrationMode TIntegrationMode>
779grpc::ServerWriteReactor<presage::physiology::Metrics>* PhysiologyServiceImpl<TIntegrationMode>::GetEdgeMetrics(
780 grpc::CallbackServerContext* context,
781 const google::protobuf::Empty* request
783 return this->GenericHookToProvider<TEdgeMetricsProvider>(this->edge_metrics_provider);
// Unary RPC handler that shuts the service's processing down: the container is stopped and
// destroyed first (if one exists), then the Physiology Core server is stopped when it is marked
// as started, and the client stub is released. The RPC finishes with the converted `status`.
// NOTE(review): branch headers and closing braces between the statements below are not visible
// in this view; confirm the exact nesting (in particular which statements are skipped under
// manual control) against the full file.
786template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
787grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::StopCore(
788 grpc::CallbackServerContext* context,
789 const ::google::protobuf::Empty* request,
790 google::protobuf::Empty* response
792 return ExecuteAsyncWithDefaultReactor(context, [
this]() -> grpc::Status {
// A container stop error ends the RPC early with that error.
793 if (this->container !=
nullptr) {
794 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->StopAndDestroyContainer());
796 absl::Status status = absl::OkStatus();
797 if (this->physiology_core_manual_control) {
798 const char* message =
"Skipping shutdown of Physiology Core Service: manual control enabled.";
799 LOG(INFO) << message;
// The result of StopCoreServer becomes the RPC status; the started flag is cleared afterwards.
801 if (this->physiology_core_started) {
802 status = StopCoreServer();
804 this->physiology_core_started =
false;
807 LOG(INFO) <<
"No need to shut down Physiology Core Service: process not started.";
808 status = absl::OkStatus();
// Release the client stub if one exists.
810 if (this->core_stub !=
nullptr) {
811 this->core_stub.reset();
814 return rpc::abslStatusToGrpc(status);
// Stops the local input thread, waits for the graph to become idle and stops it (failures are
// logged as warnings), releases the container and clears recording_auto_started.
// The only return statement visible in this view returns OK.
// NOTE(review): closing-brace lines are not visible in this view, so the exact nesting of the
// graph-stop statements should be confirmed against the full file.
818template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
819absl::Status PhysiologyServiceImpl<TIntegrationMode>::StopAndDestroyContainer() {
// Ask the local input thread to stop and join it while holding local_input_thread_mutex.
821 if (this->local_input_enabled) {
822 std::lock_guard<std::mutex> local_lock(this->local_input_thread_mutex);
823 this->local_input_thread_shutdown =
true;
824 if (this->local_input_thread.joinable()) {
825 this->local_input_thread.join();
829 if (this->container !=
nullptr) {
830 const bool graph_was_running = this->container->GraphIsRunning();
831 if (graph_was_running) {
// A FailedPrecondition result from the idle wait is not logged; other failures are.
832 absl::Status idle_status = container->WaitUntilGraphIsIdle();
833 if (!idle_status.ok() && !absl::IsFailedPrecondition(idle_status)) {
834 LOG(WARNING) <<
"WaitUntilGraphIsIdle failed prior to shutdown: " << idle_status.message();
837 absl::Status stop_status = container->StopGraph();
838 if (!stop_status.ok()) {
839 LOG(WARNING) <<
"StopGraph failed during shutdown: " << stop_status.message();
// Drop this service's reference to the container and allow a later auto-start of recording.
842 this->container.reset();
843 this->recording_auto_started =
false;
844 return absl::OkStatus();
// Unary RPC handler that stops and destroys the container and finishes the RPC with the
// converted status of StopAndDestroyContainer.
// NOTE(review): the line carrying this method's return type and name is not visible in this
// view; confirm the method name against the full file.
852template<container::settings::IntegrationMode TIntegrationMode>
854 grpc::CallbackServerContext* context,
855 const ::google::protobuf::Empty* request,
856 ::google::protobuf::Empty* response
858 return ExecuteAsyncWithDefaultReactor(
860 [
this]() -> grpc::Status {
861 return rpc::abslStatusToGrpc(this->StopAndDestroyContainer());
// Handler taking a callback server context, an Empty request and an Empty response.
// NOTE(review): neither the line carrying this method's return type and name nor its body is
// visible in this view; consult the full file before changing it.
872template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
874 grpc::CallbackServerContext* context,
875 const ::google::protobuf::Empty* request,
876 google::protobuf::Empty* response
// Unary RPC handler that resets processing: the container is stopped and destroyed, the
// Physiology Core is asked to reset through the client stub, and the container is rebuilt and
// restarted. Any failing step ends the RPC with that step's error.
// NOTE(review): some lines between the statements below are not visible in this view.
881template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
882grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::ResetProcessing(
883 grpc::CallbackServerContext* context,
884 const ::google::protobuf::Empty* request,
885 google::protobuf::Empty* response
887 return ExecuteAsyncWithDefaultReactor(
// request and response are captured by reference; this relies on ExecuteAsyncWithDefaultReactor
// invoking the callable before this handler returns.
889 [
this, &request, &response]() -> grpc::Status {
// Preconditions: a core that is started (or manually controlled) and an existing container.
890 if (!physiology_core_manual_control && !this->physiology_core_started) {
891 return {::grpc::StatusCode::FAILED_PRECONDITION,
892 "Physiology Core has not been started, unable to clean test buffer."};
895 if (this->container ==
nullptr) {
896 return {::grpc::StatusCode::FAILED_PRECONDITION,
897 "Physiology Preprocessing has not been started, unable to clean test buffer."};
901 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(StopAndDestroyContainer());
903 grpc::ClientContext core_context;
// NOTE(review): core_stub is dereferenced without a null check in the visible lines; under
// manual control the first guard passes even if this service never connected a stub. Confirm
// that core_stub is always set on this path.
905 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(core_stub->ResetProcessing(&core_context, *request, response));
908 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(BuildAndStartContainer());
909 return ::grpc::Status::OK;
915template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
916grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::CheckHealth(
917 grpc::CallbackServerContext* context,
918 const ::google::protobuf::Empty* request,
919 ::google::protobuf::StringValue* response
921 return ExecuteAsyncWithDefaultReactor(context, [response]() {
922 response->set_value(
"Service is healthy");
923 return ::grpc::Status::OK;
927template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
928grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::IsPhysiologyCoreUp(
929 grpc::CallbackServerContext* context,
930 const ::google::protobuf::Empty* request,
931 ::google::protobuf::BoolValue* response
933 return ExecuteAsyncWithDefaultReactor(context, [
this, response]() {
934 response->set_value(this->physiology_core_started);
935 return ::grpc::Status::OK;
939template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
940grpc::ServerUnaryReactor* PhysiologyServiceImpl<TIntegrationMode>::IsEdgeGraphRunning(
941 grpc::CallbackServerContext* context,
942 const ::google::protobuf::Empty* request,
943 ::google::protobuf::BoolValue* response
945 return ExecuteAsyncWithDefaultReactor(context, [
this, response]() {
946 response->set_value(this->container !=
nullptr && this->container->GraphIsRunning());
947 return ::grpc::Status::OK;
// Connects the gRPC client to the Physiology Core: creates an insecure channel to
// "0.0.0.0:<physiology_core_port_number>", waits for it to become ready and creates the
// PhysiologyCore stub. Returns an Internal error when the channel is not ready after the
// configured connection timeout, OK otherwise.
// NOTE(review): one line between the final readiness check and the error return is not visible
// in this view.
952template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
953absl::Status PhysiologyServiceImpl<TIntegrationMode>::StartCoreClient() {
954 channel = grpc::CreateChannel(
"0.0.0.0:" + std::to_string(this->physiology_core_port_number),
955 grpc::InsecureChannelCredentials());
// Wait in 50 ms slices. The remaining budget is reduced by the slice length on every iteration,
// independent of how long WaitForConnected actually took.
956 int retry_interval_ms = 50;
957 int remaining_time = this->physiology_core_connection_timeout_ms;
958 while (channel->GetState(
true) != GRPC_CHANNEL_READY && remaining_time > 0) {
959 channel->WaitForConnected(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
960 gpr_time_from_millis(retry_interval_ms, GPR_TIMESPAN)));
961 remaining_time -= retry_interval_ms;
// Final check after the loop: still not ready means the connection attempt failed.
963 if (channel->GetState(
true) != GRPC_CHANNEL_READY) {
965 return absl::InternalError(
966 "Failed to connect to Physiology Core gRPC Server within " +
967 std::to_string(this->physiology_core_connection_timeout_ms) +
" milliseconds, aborting."
970 this->core_stub = presage::physiology::PhysiologyCore::NewStub(channel);
971 return absl::OkStatus();
974template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
975absl::Status PhysiologyServiceImpl<TIntegrationMode>::StartCoreServer() {
976 if (!this->venv_verified) {
977 PHYSIOLOGY_GRPC_RETURN_IF_ERROR(this->VerifyVenv());
978 this->venv_verified =
true;
980 const std::filesystem::path core_server_script =
"grpc_core_server.py";
981 const std::filesystem::path local_core_path = PHYSIOLOGY_CORE_DIRECTORY;
982 const std::filesystem::path local_core_server_absolute_path = local_core_path / core_server_script;
984 bool mock_setting =
false;
986 if (!std::filesystem::is_regular_file(local_core_server_absolute_path)) {
987 LOG(INFO) <<
"Local Physiology Core gRPC Server not found at \""
988 + local_core_server_absolute_path.string()
989 +
"\". Assuming development setting and checking for the example physio core server instead...";
991 const std::filesystem::path mock_core_server_script =
"mock_grpc_core_server.py";
992 const std::filesystem::path local_tests_path = SMARTSPECTRA_ON_PREM_TESTS_DIRECTORY;
993 const std::filesystem::path local_mock_core_server_absolute_path = local_tests_path / mock_core_server_script;
994 if (!std::filesystem::is_regular_file(local_mock_core_server_absolute_path)) {
995 return absl::NotFoundError(
"Mock Physiology Core gRPC Server not found at \""
996 + local_mock_core_server_absolute_path.string()
1003 std::string port_string = std::to_string(this->physiology_core_port_number);
1004 std::string random_seed_str = std::to_string(this->test_physiology_core_seed);
1006 std::vector<std::string> string_arguments;
1007 std::vector<const char*> arguments;
1008 if (this->venv_directory.empty()) {
1010 string_arguments = {
1017 string_arguments.emplace_back(
"--seed");
1018 string_arguments.push_back(random_seed_str);
1020 string_arguments.emplace_back(
"--headless");
1021 if (this->log_settings.log_edge_core_transfer_times) {
1022 string_arguments.emplace_back(
"--log_transfer_timing_info");
1025 std::map<std::string, std::string> variables;
1026 if (!std::string(PHYSIOLOGY_CORE_DIRECTORY).empty()) {
1027 this->physiology_core_process_options.working_directory = PHYSIOLOGY_CORE_DIRECTORY;
1028 variables[
"PYTHONPATH"] = PHYSIOLOGY_CORE_DIRECTORY;
1030 variables[
"PYTHONPATH"] =
".";
1032 this->physiology_core_process_options.env.extra = reproc::env(variables);
1034 std::filesystem::path venv_activate_script = this->venv_directory /
"bin" /
"activate";
1035 std::string command =
"source " + venv_activate_script.string()
1036 +
" && python " + local_core_server_absolute_path.string()
1037 +
" --port " + port_string +
" --headless";
1038 if (this->log_settings.log_edge_core_transfer_times) {
1039 command +=
" --log_transfer_timing_info";
1041 string_arguments = {
1047 for (
const auto& argument: string_arguments) {
1048 arguments.push_back(argument.c_str());
1051 arguments.push_back(
nullptr);
1053 reproc::stop_actions stop = {
1054 {reproc::stop::noop, reproc::milliseconds(0)},
1055 {reproc::stop::terminate, reproc::milliseconds(5000)},
1056 {reproc::stop::kill, reproc::milliseconds(2000)}
1059 this->physiology_core_process_options.stop = stop;
1060 this->physiology_core_process_options.redirect.err.type = reproc::redirect::pipe;
1061 this->physiology_core_process_options.redirect.out.type = reproc::redirect::pipe;
1062 this->physiology_core_process_options.redirect.parent =
true;
1063 this->physiology_core_process_options.redirect.discard =
false;
1065 std::error_code ec = this->physiology_core_process.start(arguments.data(), this->physiology_core_process_options);
1067 std::string additional_debugging_info;
1068 if (ec.message().find(
"No such file") != std::string::npos) {
1069 std::string ps_working_directory = std::filesystem::current_path().string();
1070 std::string files_in_ps_working_directory =
1071 presage::filesystem::GetListOfFilesInDirectoryAsString(ps_working_directory);
1072 std::string pc_working_directory_debugging_info;
1073 std::string physiology_core_directory(PHYSIOLOGY_CORE_DIRECTORY);
1074 if (this->venv_directory.empty() && !physiology_core_directory.empty()) {
1075 pc_working_directory_debugging_info =
1076 "Physiology Core working directory: " + physiology_core_directory +
".\n";
1077 std::string files_in_pc_working_directory =
1078 presage::filesystem::GetListOfFilesInDirectoryAsString(PHYSIOLOGY_CORE_DIRECTORY);
1079 if (files_in_pc_working_directory.empty()) {
1080 pc_working_directory_debugging_info +=
"Nothing found in Physiology Core working directory.";
1082 pc_working_directory_debugging_info +=
"Files in Physiology Core working directory:\n"
1083 + files_in_pc_working_directory;
1086 additional_debugging_info = +
"\nPhysiology Service working directory: " + ps_working_directory +
1087 ".\nFiles in working directory:\n" + files_in_ps_working_directory +
1088 pc_working_directory_debugging_info;
1090 return absl::InternalError(
1091 "Failed to start Physiology Core gRPC server: " +
1092 ec.message() +
"." + additional_debugging_info
1096 this->core_log_thread_shutdown =
false;
1097 this->core_log_thread = std::thread([
this]() {
1098 std::error_code ec =
1100 this->physiology_core_process,
1101 this->core_stdout_log_sink,
1102 this->core_stderr_log_sink
1105 if (ec == std::errc::operation_canceled) {
1109 LOG(ERROR) <<
"Error draining process output: " << ec.message();
1113 return absl::OkStatus();
/**
 * @brief Stops the Physiology Core server child process and reports how cleanly it went down.
 *
 * Visible steps: raise `core_log_thread_shutdown`; if a `core_stub` exists, send it an
 * `IssueBlueTooth` RPC and join `core_log_thread`; run the configured reproc stop actions on
 * `physiology_core_process`; replace the process handle with a fresh `reproc::process`.
 *
 * @return `absl::InternalError` when there was no `core_stub` to talk to, `absl::InternalError`
 *         carrying the reproc error message and status code when stopping failed, and
 *         `absl::OkStatus()` otherwise.
 *
 * NOTE(review): this listing omits several original lines (1119-1121, 1130-1131, 1133-1139,
 * 1141, 1143, 1148, 1152-1154), including the declarations of `status_code` / `ec` and the
 * condition guarding the second error return, so the exact nesting of the two error returns
 * must be confirmed against the real file.
 */
1116template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1117absl::Status PhysiologyServiceImpl<TIntegrationMode>::StopCoreServer() {
// Raise the flag that is cleared right before core_log_thread is launched.
1118 this->core_log_thread_shutdown =
true;
1122 bool have_connection_to_core_server =
true;
1123 if (this->core_stub !=
nullptr) {
1124 grpc::ClientContext core_context;
1125 physiology::BlueTooth request;
1126 google::protobuf::Empty response;
// NOTE(review): presumably this RPC asks the core server to shut itself down - confirm.
// The grpc::Status returned by the call is not inspected on this line.
1127 this->core_stub->IssueBlueTooth(&core_context, request, &response);
// Wait for the log-draining thread to finish before stopping the process.
1128 if (this->core_log_thread.joinable()) {
1129 this->core_log_thread.join();
// NOTE(review): looks like the no-stub branch; no join of core_log_thread is visible on it.
1132 have_connection_to_core_server =
false;
// Apply the stop actions configured in physiology_core_process_options.stop.
1140 std::tie(status_code, ec) = this->physiology_core_process.stop(this->physiology_core_process_options.stop);
// Swap in a default-constructed process object in place of the stopped one.
1142 this->physiology_core_process = reproc::process();
1144 if (!have_connection_to_core_server) {
1145 return absl::InternalError(
"Physiology Core gRPC server connection could not be established, "
1146 "was not able to stop the Core server 100% gracefully, "
1147 "but it should be stopped.");
1149 return absl::InternalError(
1150 "Failed to gracefully shut down Physiology Core server. Error: " + ec.message() +
1151 ". Status code: " + std::to_string(status_code) +
"."
1155 return absl::OkStatus();
1160template<container::settings::IntegrationMode TIntegrationMode>
1161absl::Status PhysiologyServiceImpl<TIntegrationMode>::VerifyVenv() {
1162 if (this->venv_directory.empty()) {
1163 LOG(INFO) <<
"No venv directory specified, skipping venv activation.";
1164 return absl::OkStatus();
1166 std::string usage_message =
1167 "Please ensure that you have a Python 3.11 virtual environment configured inside the root of the "
1168 "physiology repository, that Physiology-Core package installed on that, and that you are running "
1169 "physiology_server from the Physiology repository's root directory.";
1170 std::filesystem::path venv_activate_script = this->venv_directory /
"bin" /
"activate";
1171 if (!std::filesystem::exists(this->venv_directory)) {
1172 return absl::NotFoundError(
"Venv directory not found at \"" + this->venv_directory.string() +
1173 "\". " + usage_message);
1174 }
else if (!std::filesystem::is_directory(this->venv_directory)) {
1175 return absl::NotFoundError(
"\"" + this->venv_directory.string() +
"\" is not a directory. " + usage_message);
1176 }
else if (!std::filesystem::is_regular_file(venv_activate_script)) {
1177 return absl::NotFoundError(
"Venv activate script not found at \"" + venv_activate_script.string() +
"\". " +
1180 return absl::OkStatus();
1185template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1186grpc::ServerWriteReactor<::presage::smartspectra::LogEntry>* PhysiologyServiceImpl<TIntegrationMode>::StreamLogs(
1187 grpc::CallbackServerContext* context,
1188 const google::protobuf::Empty* request
1190 return new LogStreamReactor(log_buffer);
/**
 * @brief Classifies a status code for the camera-tuning logic in HandleStatusForTuning.
 *
 * The visible case labels single out `IMAGE_TOO_DARK` and `IMAGE_TOO_BRIGHT`.
 *
 * @param status Status code to classify.
 * @return NOTE(review): the `switch` header and the `return` statements are missing from this
 *         listing; presumably `true` for the two listed codes and `false` for everything
 *         else - confirm against the real file.
 */
1193template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1194bool PhysiologyServiceImpl<TIntegrationMode>::StatusRequiresImmediateRetune(
1195 presage::physiology::StatusCode status
1197 using presage::physiology::StatusCode;
// Exposure-related codes handled by this function.
1199 case StatusCode::IMAGE_TOO_DARK:
1200 case StatusCode::IMAGE_TOO_BRIGHT:
/**
 * @brief Classifies a previous status code for the recovery check in HandleStatusForTuning.
 *
 * The visible case labels single out `IMAGE_TOO_DARK`, `IMAGE_TOO_BRIGHT` and `NO_FACES_FOUND`.
 *
 * @param status Status code to classify; HandleStatusForTuning passes the previous status code.
 * @return NOTE(review): the `switch` header and the `return` statements are missing from this
 *         listing; presumably `true` for the three listed codes and `false` for everything
 *         else - confirm against the real file.
 */
1207template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1208bool PhysiologyServiceImpl<TIntegrationMode>::StatusShouldRetuneAfterRecovery(
1209 presage::physiology::StatusCode status
1211 using presage::physiology::StatusCode;
// Same exposure codes as StatusRequiresImmediateRetune, plus NO_FACES_FOUND.
1213 case StatusCode::IMAGE_TOO_DARK:
1214 case StatusCode::IMAGE_TOO_BRIGHT:
1215 case StatusCode::NO_FACES_FOUND:
/**
 * @brief Decides whether the latest status should start a camera tuning run and, if so, starts it.
 *
 * The call is a no-op returning OK when camera tuning is disabled or no tuner is set, when a
 * tuning run is already in progress, or while recording. Tuning is requested when either
 * `StatusRequiresImmediateRetune(current)` holds, or `camera_tuning_recheck_on_recovery_` is
 * set, the current status is `OK`, and `StatusShouldRetuneAfterRecovery(previous_status_code)`
 * holds. Requests arriving sooner than `camera_tuning_cooldown_duration` after
 * `last_camera_tuning_time` are dropped.
 *
 * @param status_output        Status message; its `value()` is cast to `StatusCode`.
 * @param previous_status_code Status code supplied by the caller as the one preceding
 *                             `status_output`.
 * @return `absl::OkStatus()` on every no-op path and after tuning was started; the failing
 *         status returned by `camera_tuner->StartTuning()` otherwise.
 *
 * NOTE(review): this listing omits a number of original lines (closing braces, comments and
 * possibly statements), so the extent of the `camera_tuning_mutex` lock scope relative to the
 * `StartTuning()` call cannot be determined here - confirm against the real file.
 */
1222template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1223absl::Status PhysiologyServiceImpl<TIntegrationMode>::HandleStatusForTuning(
1224 const presage::physiology::StatusValue& status_output,
1225 presage::physiology::StatusCode previous_status_code
// Guard: tuning disabled or no tuner available.
1228 if (!this->camera_tuning_enabled.load() || this->camera_tuner ==
nullptr) {
1229 return absl::OkStatus();
// Guard: a tuning run is already in progress.
1233 if (this->is_tuning_camera.load()) {
1234 return absl::OkStatus();
// Guard: no tuning is started while recording.
1238 if (this->is_recording.load()) {
1239 return absl::OkStatus();
1242 const auto current_status =
static_cast<presage::physiology::StatusCode
>(status_output.value());
1243 const bool status_requires_immediate_retune = StatusRequiresImmediateRetune(current_status);
// Recovery trigger: status is OK again after a previous code that warrants a re-check.
1244 const bool recovered_from_issue =
1245 this->camera_tuning_recheck_on_recovery_ &&
1246 current_status == physiology::StatusCode::OK &&
1247 StatusShouldRetuneAfterRecovery(previous_status_code);
1250 if (!status_requires_immediate_retune && !recovered_from_issue) {
1251 return absl::OkStatus();
// Cooldown bookkeeping is done while holding camera_tuning_mutex.
1256 std::lock_guard<std::mutex> lock(this->camera_tuning_mutex);
1257 auto now = std::chrono::steady_clock::now();
1258 auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
1259 now - this->last_camera_tuning_time
// Still inside the cooldown window: skip this request.
1262 if (elapsed < this->camera_tuning_cooldown_duration) {
1264 return absl::OkStatus();
// The timestamp is updated before StartTuning() is attempted.
1268 this->last_camera_tuning_time = now;
1272 auto start_status = this->camera_tuner->StartTuning();
1273 if (!start_status.ok()) {
1274 LOG(WARNING) <<
"Failed to start camera tuning: " << start_status.message();
// Record the failure in last_camera_error_ under its own mutex.
1277 std::lock_guard<std::mutex> lock(this->camera_error_mutex_);
1278 this->last_camera_error_ = start_status;
1280 return start_status;
// Tuning started: mark it in progress and log which trigger fired.
1283 this->is_tuning_camera.store(
true);
1284 if (status_requires_immediate_retune) {
1285 LOG(INFO) <<
"Status code " << spectra::GetStatusDescription(current_status)
1286 <<
" triggered camera tuning";
1288 LOG(INFO) <<
"Status recovered (" << spectra::GetStatusDescription(previous_status_code)
1289 <<
" -> " << spectra::GetStatusDescription(current_status)
1290 <<
"); re-triggering camera tuning to verify conditions";
1293 return absl::OkStatus();
1296template<smartspectra::container::settings::IntegrationMode TIntegrationMode>
1297absl::Status PhysiologyServiceImpl<TIntegrationMode>::FlushPendingStatusAfterTuning(
1298 ipc::IpcStreamWriter<TIntegrationMode>* writer
1300 if (writer ==
nullptr) {
1301 return absl::OkStatus();
1304 std::optional<presage::physiology::StatusValue> pending_status;
1306 std::lock_guard<std::mutex> lock(this->pending_status_mutex_);
1307 if (!this->pending_status_after_tuning_.has_value()) {
1308 return absl::OkStatus();
1310 pending_status = this->pending_status_after_tuning_;
1311 this->pending_status_after_tuning_.reset();
1314 return writer->WriteStatus(*pending_status);
grpc::ServerUnaryReactor * StartPreprocessing(grpc::CallbackServerContext *context, const ::google::protobuf::Empty *request, ::google::protobuf::Empty *response) override
Definition physiology_service_impl.hpp:591
grpc::ServerUnaryReactor * StopPreprocessing(grpc::CallbackServerContext *context, const ::google::protobuf::Empty *request, ::google::protobuf::Empty *response) override
Definition physiology_service_impl.hpp:873
grpc::ServerUnaryReactor * StopEdgeGraph(grpc::CallbackServerContext *context, const ::google::protobuf::Empty *request, ::google::protobuf::Empty *response) override
Definition physiology_service_impl.hpp:853
absl::StatusOr< std::shared_ptr< VideoSource > > BuildVideoSource(const VideoSourceSettings &settings)
Factory helper for constructing the appropriate VideoSource implementation based on the provided sett...
Definition factory.cpp:20
InputTransformMode
Transformation applied to frames prior to processing.
Definition input_transform.hpp:19
Definition background_container.cpp:10
Definition settings.hpp:103
Definition physiology_service.hpp:41
Configuration options for constructing a VideoSource.
Definition settings.hpp:23