24#include <mediapipe/framework/output_stream_poller.h>
25#include <physiology/modules/messages/status.h>
26#include <google/protobuf/message.h>
29namespace presage::smartspectra::container::packet_helpers {
33template <typename T, typename = std::enable_if_t<std::is_base_of<google::protobuf::Message, T>::value>>
34std::ostream& operator<<(std::ostream& os,
const T& proto) {
35 return os << proto.DebugString();
39template <
typename A,
typename B>
40inline std::ostream& operator<<(std::ostream& os,
const std::pair<A, B>& p) {
41 os <<
"(" << p.first <<
", " << p.second <<
")";
46template<
typename TPacketContentsType,
typename TReportPredicate,
bool TPr
intTimestamp = false>
47inline absl::Status GetPacketContentsIfAny(
48 TPacketContentsType& contents,
49 bool& nonempty_packet_received,
50 mediapipe::OutputStreamPoller& poller,
51 const char* stream_name,
52 mediapipe::Timestamp& timestamp,
53 TReportPredicate&& report_if
55 nonempty_packet_received =
false;
56 if (poller.QueueSize() > 0) {
57 mediapipe::Packet packet;
58 if (!poller.Next(&packet)) {
59 return absl::UnknownError(
60 "Failed to get packet from output stream " + std::string(stream_name) +
".");
62 if (!packet.IsEmpty()) {
63 nonempty_packet_received =
true;
64 contents = packet.Get<TPacketContentsType>();
65 std::string extra_information =
"";
66 timestamp = packet.Timestamp();
67 if (TPrintTimestamp) {
68 extra_information =
" (timestamp: " + std::to_string(timestamp.Value()) +
")";
71 LOG(INFO) <<
"Got " + std::string(stream_name) +
" packet: " << contents << extra_information;
76 return absl::OkStatus();
80template<
typename TPacketContentsType,
typename TReportPredicate,
bool TPr
intTimestamp = false>
81inline absl::Status GetPacketContentsIfAny(
82 TPacketContentsType& contents,
83 bool& nonempty_packet_received,
84 mediapipe::OutputStreamPoller& poller,
85 const char* stream_name,
86 TReportPredicate&& report_if
88 mediapipe::Timestamp timestamp;
89 return GetPacketContentsIfAny<TPacketContentsType, TReportPredicate, TPrintTimestamp>(
90 contents, nonempty_packet_received, poller, stream_name, timestamp, std::forward<TReportPredicate>(report_if)
95template<
typename TPacketContentsType>
96inline absl::Status GetPacketContentsIfAny(
97 TPacketContentsType& contents,
98 bool& nonempty_packet_received,
99 mediapipe::OutputStreamPoller& poller,
100 const char* string_name,
101 mediapipe::Timestamp& timestamp,
102 bool report_on_package_retrieval
104 return GetPacketContentsIfAny(
105 contents, nonempty_packet_received, poller, string_name, timestamp, [&report_on_package_retrieval]() {
106 return report_on_package_retrieval;
112template<
typename TPacketContentsType>
113inline absl::Status GetPacketContentsIfAny(
114 TPacketContentsType& contents,
115 bool& nonempty_packet_received,
116 mediapipe::OutputStreamPoller& poller,
117 const char* string_name,
118 bool report_on_package_retrieval
120 mediapipe::Timestamp timestamp;
121 return GetPacketContentsIfAny(
122 contents, nonempty_packet_received, poller, string_name, timestamp, [&report_on_package_retrieval]() {
123 return report_on_package_retrieval;