14#include <grpcpp/grpcpp.h>
15#include <grpcpp/impl/codegen/server_callback.h>
17#include "queued_server_write_reactor.hpp"
19namespace presage::smartspectra::grpc_bindings {
21template <
typename TMessage>
22QueuedServerWriteReactor<TMessage>::QueuedServerWriteReactor() =
default;
24template <
typename TMessage>
25QueuedServerWriteReactor<TMessage>::~QueuedServerWriteReactor() =
default;
// Queues a copy of `message` on `pending_writes`.
//
// Visible behavior: holds `write_mutex_` for the rest of the scope, allocates a new
// TMessage owned by a std::shared_ptr, fills it via TMessage::CopyFrom(message), and
// pushes that pointer onto `pending_writes`. The queue therefore stores the newly
// allocated object rather than a pointer to the caller's `message`.
//
// NOTE(review): the remainder of this definition (anything after the push, and the
// closing brace) is not visible in this excerpt — confirm against the full file.
27template <
typename TMessage>
28void QueuedServerWriteReactor<TMessage>::EnqueueWrite(
const TMessage& message) {
// Guards the queue state below for the rest of this scope.
29 std::lock_guard<std::mutex> lock(write_mutex_);
// Allocate a separate message object and copy the caller's message into it.
30 auto message_pointer = std::make_shared<TMessage>();
31 message_pointer->CopyFrom(message);
32 pending_writes.push(message_pointer);
// Write-completion handler; `ok` carries the outcome of the write that just ended.
//
// Visible behavior: holds `write_mutex_` for the rest of the scope and clears
// `write_in_progress`. The excerpt also contains a call to Finish() with
// grpc::StatusCode::UNKNOWN and the message "Unexpected Failure writing message.".
//
// NOTE(review): the lines between clearing the flag and the Finish() call are not
// visible in this excerpt, so the branch structure cannot be read from here.
// Presumably Finish() is reached only when `ok` is false — confirm against the
// full file.
36template <
typename TMessage>
37void QueuedServerWriteReactor<TMessage>::OnWriteDone(
bool ok) {
// Guards the write-state flag below for the rest of this scope.
39 std::lock_guard<std::mutex> lock(write_mutex_);
// The write that triggered this callback is no longer outstanding.
40 write_in_progress =
false;
46 this->Finish(grpc::Status(grpc::StatusCode::UNKNOWN,
"Unexpected Failure writing message."));
50template <
typename TMessage>
51void QueuedServerWriteReactor<TMessage>::ProcessWriteQueue() {
52 if (!write_in_progress && !pending_writes.empty()) {
53 auto message_ptr = pending_writes.front();
55 write_in_progress =
true;
56 this->StartWrite(message_ptr.get());