72 if (!this->initialized) {
73 return absl::FailedPreconditionError(
"Container not initialized.");
76 this->operation_context.Reset();
79 MP_RETURN_IF_ERROR(CheckCallbackNotNull(
"OnStatusChange", this->OnStatusChange));
80 MP_RETURN_IF_ERROR(CheckCallbackNotNull(
"OnStatusCode", this->OnStatusCode));
81 MP_RETURN_IF_ERROR(this->graph.ObserveOutputStream(
82 pe::graph::output_streams::kStatusCode,
83 [
this](
const mediapipe::Packet& status_packet) -> absl::Status {
84 if (!status_packet.IsEmpty()) {
85 physiology::StatusValue status = status_packet.Get<physiology::StatusValue>();
88 MP_RETURN_IF_ERROR(this->OnStatusCode(status));
91 if (status.value() != this->previous_status_code) {
92 this->previous_status_code = status.value();
93 return this->OnStatusChange(status);
96 return absl::OkStatus();
101 MP_RETURN_IF_ERROR(CheckCallbackNotNull(
"OnCoreMetricsOutput", this->OnCoreMetricsOutput));
102 MP_RETURN_IF_ERROR(this->graph.ObserveOutputStream(
103 physiology::edge::graph::output_streams::kMetricsBuffer,
104 [
this](
const mediapipe::Packet& output_packet) -> absl::Status {
105 if (!output_packet.IsEmpty()) {
106 auto metrics_buffer = output_packet.Get<physiology::MetricsBuffer>();
107 auto timestamp = output_packet.Timestamp();
108 MP_RETURN_IF_ERROR(this->ComputeCorePerformanceTelemetry(metrics_buffer));
109 return this->OnCoreMetricsOutput(metrics_buffer, timestamp.Value());
111 return absl::OkStatus();
118 if (TOperationMode == settings::OperationMode::Continuous) {
119 if (this->settings.enable_edge_metrics) {
120 MP_RETURN_IF_ERROR(CheckCallbackNotNull(
"OnEdgeMetricsOutput", this->OnEdgeMetricsOutput));
121 MP_RETURN_IF_ERROR(this->graph.ObserveOutputStream(
122 physiology::edge::graph::output_streams::kEdgeMetrics,
123 [
this](
const mediapipe::Packet& output_packet) {
124 if (!output_packet.IsEmpty()) {
125 auto metrics = output_packet.Get<physiology::Metrics>();
126 auto timestamp = output_packet.Timestamp();
127 return this->OnEdgeMetricsOutput(metrics, timestamp.Value());
129 return absl::OkStatus();
135 MP_RETURN_IF_ERROR(CheckCallbackNotNull(
"OnVideoOutput", this->OnVideoOutput));
136 MP_RETURN_IF_ERROR(this->graph.ObserveOutputStream(
137 physiology::edge::graph::output_streams::kOutputVideo,
138 [
this](
const mediapipe::Packet& output_video_packet) -> absl::Status {
139 if (!output_video_packet.IsEmpty()) {
140 cv::Mat output_frame_rgb;
141 MP_RETURN_IF_ERROR(it::GetFrameFromPacket<TDeviceType>(output_frame_rgb,
142 this->device_context,
143 output_video_packet));
145 cv::cvtColor(output_frame_rgb, this->output_frame_bgr, cv::COLOR_RGB2BGR);
146 auto timestamp = output_video_packet.Timestamp();
147 return this->OnVideoOutput(this->output_frame_bgr, timestamp.Value());
149 return absl::OkStatus();
153 MP_RETURN_IF_ERROR(CheckCallbackNotNull(
"OnFrameSentThrough", this->OnFrameSentThrough));
154 MP_RETURN_IF_ERROR(this->graph.ObserveOutputStream(
155 pe::graph::output_streams::kFrameSentThrough,
156 [
this](
const mediapipe::Packet& output_packet) {
157 if (!output_packet.IsEmpty()) {
158 bool frame_sent_through = output_packet.Get<bool>();
159 auto timestamp = output_packet.Timestamp();
160 return this->OnFrameSentThrough(frame_sent_through, timestamp.Value());
162 return absl::OkStatus();
166 MP_RETURN_IF_ERROR(this->graph.StartRun({}));
167 MP_RETURN_IF_ERROR(this->graph.WaitUntilIdle());
168 this->running =
true;
169 return absl::OkStatus();
210 if (!this->initialized) {
211 return absl::FailedPreconditionError(
"Container not initialized.");
213 if (!this->running) {
214 return absl::FailedPreconditionError(
"Graph not started.");
217 auto input_frame = absl::make_unique<mediapipe::ImageFrame>(
218 mediapipe::ImageFormat::SRGB, frame_rgb.cols, frame_rgb.rows,
219 mediapipe::ImageFrame::kDefaultAlignmentBoundary
221 cv::Mat input_frame_mat = mediapipe::formats::MatView(input_frame.get());
223 frame_rgb.copyTo(input_frame_mat);
225 auto frame_timestamp = mediapipe::Timestamp(frame_timestamp_μs);
230 .AddPacketToInputStream(
231 pe::graph::input_streams::kRecording,
232 mediapipe::MakePacket<bool>(this->recording).At(frame_timestamp)
237 it::FeedFrameToGraph(std::move(input_frame), this->graph, this->device_context, frame_timestamp_μs,
238 pe::graph::input_streams::kInputVideo)
240 return absl::OkStatus();