From 815a29d82d37a8435f97da35e879bf8a2038c50b Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Mon, 16 Dec 2024 13:42:18 -0800 Subject: [PATCH] ObjectReceiver callback that inserts objects into a queue Summary: This is an adapter that is close to the old MoQSession read interface. Reviewed By: sharmafb Differential Revision: D67245323 fbshipit-source-id: 5eff879d005e2883f95250e3f3aa73777d99efbf --- moxygen/QueueCallback.h | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 moxygen/QueueCallback.h diff --git a/moxygen/QueueCallback.h b/moxygen/QueueCallback.h new file mode 100644 index 0000000..3549006 --- /dev/null +++ b/moxygen/QueueCallback.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include + +namespace moxygen { + +class QueueCallback : public ObjectReceiverCallback { + public: + struct Object { + moxygen::ObjectHeader header; + moxygen::Payload payload; + }; + folly::coro::UnboundedQueue> queue; + + FlowControlState onObject(const ObjectHeader& objHeader, Payload payload) + override { + queue.enqueue(Object({objHeader, std::move(payload)})); + return FlowControlState::UNBLOCKED; + } + void onObjectStatus(const ObjectHeader& hdr) override { + queue.enqueue(Object({hdr, nullptr})); + } + void onEndOfStream() override { + queue.enqueue(folly::makeUnexpected(folly::unit)); + } + void onError(ResetStreamErrorCode) override { + queue.enqueue(folly::makeUnexpected(folly::unit)); + } + void onSubscribeDone(SubscribeDone) override { + queue.enqueue(folly::makeUnexpected(folly::unit)); + } +}; +} // namespace moxygen