From 23c1d6e52e819a824efdcabe635a2d3b13fde250 Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Sat, 14 Dec 2024 09:21:43 -0800 Subject: [PATCH] Consumers - new moq API Summary: The MoQSession APIs were really just hacks to get things started and haven't kept up with draft changes. This is the beginning of a rewrite. The interfaces in MoQConsumers.h will be both the publish and subscribe side API. The session will return a TrackConsumer object from subscribeOK and a FetchConsumer from fetchOK. The caller will provide a TrackConsumer to subscribe and a FetchConsumer to fetch that will be used as callbacks. The existing read and write paths for non-control data in MoQSession will be replaced. Reviewed By: sharmafb Differential Revision: D66881590 fbshipit-source-id: 5f280c8bff269c7991a0f60ced9b659abb4ff6af --- moxygen/MoQConsumers.h | 261 +++++++++++++++++++++++++++++++++++++++++ moxygen/MoQFramer.h | 10 ++ moxygen/test/Mocks.h | 101 ++++++++++++++++ 3 files changed, 372 insertions(+) create mode 100644 moxygen/MoQConsumers.h diff --git a/moxygen/MoQConsumers.h b/moxygen/MoQConsumers.h new file mode 100644 index 0000000..22836a3 --- /dev/null +++ b/moxygen/MoQConsumers.h @@ -0,0 +1,261 @@ +#pragma once + +#include +#include + +namespace moxygen { + +// MoQ Consumers +// +// These interfaces are used both for writing and reading track data. +// +// A publisher will acquire a consumer object from a session and invoke the +// methods. The implementation "consumes" the data by serializing it on the +// wire. +// +// A subscriber will provide a consumer object to a producer (the session or +// another producer) and that producer will invoke the methods as data becomes +// available. + +struct MoQPublishError { + // Do not add additional codes unless you know what you are doing + enum Code { + API_ERROR = 1, // Semantic error (APIs called out of order) + WRITE_ERROR = 2, // The underlying write failed + CANCELLED = 3, // The subgroup was/should be reset + TOO_FAR_BEHIND = 5, // Subscriber exceeded buffer limit (subscribe only) + BLOCKED = 4, // Consumer cannot accept more data (fetch only), + // or out of stream credit (subscribe and fetch) + }; + // Do not add additional codes unless you know what you are doing + + Code code; + std::string msg; + + explicit MoQPublishError(Code inCode) : code(inCode) {} + + MoQPublishError(Code inCode, std::string inMsg) + : code(inCode), msg(std::move(inMsg)) {} + + std::string describe() const { + return folly::to( + "error=", folly::to_underlying(code), " msg=", msg); + } + + const char* what() const noexcept { + return msg.c_str(); + } +}; + +enum class ObjectPublishStatus { IN_PROGRESS, DONE }; + +// Interface for Publishing and Receiving objects on a subgroup +class SubgroupConsumer { + public: + virtual ~SubgroupConsumer() = default; + + // SubgroupConsumer enforces API semantics. + // + // It’s an error to deliver any object or status if the object ID isn’t + // strictly larger than the last delivered one on this subgroup. + // + // When using beginObject/objectPayload to deliver a streaming object, the + // total number of bytes delivered must equal the value in the length in + // beginObject. + // + // It's an error to begin delivering any object/status or close the + // stream in the middle of a streaming object. + + // Deliver the next object on this subgroup. + virtual folly::Expected + object(uint64_t objectID, Payload payload, bool finSubgroup = false) = 0; + + // Deliver Object Status=ObjectNotExists as the given object. + virtual folly::Expected objectNotExists( + uint64_t objectID, + bool finSubgroup = false) = 0; + + // Advance the reliable offset of the subgroup stream to the + // current offset. + virtual void checkpoint() {} + + // Begin delivering the next object in this subgroup. + virtual folly::Expected + beginObject(uint64_t objectID, uint64_t length, Payload initialPayload) = 0; + + // Deliver the next chunk of data in the current object. The return value is + // IN_PROGRESS if the object is not yet complete, DONE if the payload exactly + // matched the remaining expected size. + virtual folly::Expected objectPayload( + Payload payload, + bool finSubgroup = false) = 0; + + // Deliver Object Status=EndOfGroup for the given object ID. This implies + // endOfSubgroup. + virtual folly::Expected endOfGroup( + uint64_t endOfGroupObjectID) = 0; + + // Deliver Object Status=EndOfTrackAndGroup for the given object ID. This + // implies endOfSubgroup. + virtual folly::Expected endOfTrackAndGroup( + uint64_t endOfTrackObjectID) = 0; + + // Inform the consumer the subgroup is complete. If the consumer is writing, + // this closes the underlying transport stream. This can only be called if + // the publisher knows the entire subgroup has been delivered. + virtual folly::Expected endOfSubgroup() = 0; + + // Inform the consumer that the subgroup terminates with an error. If the + // consumer is writing, this resets the transport stream with the given error + // code. The stream will be reliably delivered up to the last checkpoint(). + virtual void reset(ResetStreamErrorCode error) = 0; +}; + +// Interface for Publishing and Receiving Subscriptions +// +// Note that for now, both the stream interface and datagram interface coexist +// even though the specification disallows mixing and matching those right now. +// +// A note on subscription flow control: +// +// For consumers that are writing, publishing data on a track consumes resources +// which are freed as the subscribers receive and acknowledge data. The +// underlying session will terminate a subscription that exceeds publisher side +// resource limits. +class TrackConsumer { + public: + virtual ~TrackConsumer() = default; + + // Begin delivering a new subgroup in the specified group. If the consumer is + // writing, this Can fail with MoQPublishError::BLOCKED when out of stream + // credit. + virtual folly::Expected, MoQPublishError> + beginSubgroup(uint64_t groupID, uint64_t subgroupID, Priority priority) = 0; + + // Wait for additional stream credit. + virtual folly::Expected, MoQPublishError> + awaitStreamCredit() = 0; + + // Deliver a single-object or object status subgroup. header.length must + // equal payload length, or be 0 for non-NORMAL status. Can fail with + // MoQPublishError::BLOCKED when out of stream credit. + virtual folly::Expected objectStream( + const ObjectHeader& header, + Payload payload) = 0; + + // Deliver a datagram in this track. This can be dropped by the sender or + // receiver if resources are low. + virtual folly::Expected datagram( + const ObjectHeader& header, + Payload payload) = 0; + + // Deliver Object Status=GroupNotExists for the specified group. If the + // consumer is writing, this consumes an entire transport stream. Can fail + // with MoQPublishError::BLOCKED when out of stream credit. + virtual folly::Expected + groupNotExists(uint64_t groupID, uint64_t subgroup, Priority pri) = 0; + + // Inform the consumer that the publisher will not open any new subgroups or + // send any new datagrams for this track. + virtual folly::Expected subscribeDone( + SubscribeDone subDone) = 0; +}; + +// Interface for Publishing and Receiving objects for a Fetch +// +// A note on Fetch flow control: +// +// Fetches are not intended to be consumed in real-time and can apply +// backpressure to the publisher. The API is similar to write(2), and a +// returned MoQPublishError with code=BLOCKED signals the publisher to stop +// invoking methods. +// +// Publishers can use awaitReadyToConsume to determine when it is ok to resume. +// +// When used as a read interface, the application can return BLOCKED to initiate +// backpressure, but the library may still have some already read data that +// will be parsed and additional APIs may be invoked. +// +class FetchConsumer { + public: + virtual ~FetchConsumer() = default; + + // FetchConsumer enforces API semantics. + // + // It’s an error to deliver any object or status if the object ID isn’t + // strictly larger than the last delivered one on this group. It's an + // error to deliver a group smaller than the previously delivered group. + // + // When using beginObject/objectPayload to deliver a streaming object, + // the total number of bytes delivered must equal the value in the length + // in beginObject. + // + // It's an error to begin delivering any object or status, or close in the + // middle of a streaming object. + + // Deliver the next object in this FETCH response. + virtual folly::Expected object( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + Payload payload, + bool finFetch) = 0; + + // Deliver Object Status=ObjectNotExists for the given object. + virtual folly::Expected objectNotExists( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + bool finFetch = false) = 0; + + // Deliver Object Status=ObjectNotExists for the givenobject. + virtual folly::Expected groupNotExists( + uint64_t groupID, + uint64_t subgroupID, + bool finFetch = false) = 0; + + // Advance the reliable offset of the fetch stream to the current offset. + virtual void checkpoint() {} + + // Begin delivering the next object in this subgroup. + virtual folly::Expected beginObject( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + uint64_t length, + Payload initialPayload) = 0; + + virtual folly::Expected objectPayload( + Payload payload, + bool finSubgroup = false) = 0; + + // Deliver Object Status=EndOfGroup for the given object ID. + virtual folly::Expected endOfGroup( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + bool finFetch = false) = 0; + + // Deliver Object Status=EndOfTrackAndGroup for the given object ID. This + // implies endOfFetch. + virtual folly::Expected endOfTrackAndGroup( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID) = 0; + + // Inform the consumer the fetch is complete. If the consumer is writing, + // this closes the underlying transport stream. This can only be called if + // the publisher knows the entire fetch has been delivered. + virtual folly::Expected endOfFetch() = 0; + + // Inform the consumer that the fetch terminates with an error. If the + // consumer is writing, this resets the transport stream with the given error + // code. The stream will be reliably delivered up to the last checkpoint(). + virtual void reset(ResetStreamErrorCode error) = 0; + + // Wait for the fetch to become writable + virtual folly::Expected, MoQPublishError> + awaitReadyToConsume() = 0; +}; + +} // namespace moxygen diff --git a/moxygen/MoQFramer.h b/moxygen/MoQFramer.h index 8b7c980..807f397 100644 --- a/moxygen/MoQFramer.h +++ b/moxygen/MoQFramer.h @@ -24,6 +24,9 @@ const size_t kMaxNamespaceLength = 32; //////// Types //////// +using Payload = std::unique_ptr; +using Priority = uint8_t; + enum class ErrorCode : uint32_t { UNKNOWN = 0, PARSE_ERROR = 1, @@ -78,6 +81,13 @@ enum class FetchErrorCode : uint32_t { TIMEOUT = 4, }; +enum class ResetStreamErrorCode : uint32_t { + INTERNAL_ERROR = 0, + DELIVERY_TIMEOUT = 1, + SESSION_CLOSED = 2, + CANCELLED = 3, // received UNSUBSCRIBE / FETCH_CANCEL / STOP_SENDING +}; + using WriteResult = folly::Expected; enum class FrameType : uint64_t { diff --git a/moxygen/test/Mocks.h b/moxygen/test/Mocks.h index 5b145db..fc0b464 100644 --- a/moxygen/test/Mocks.h +++ b/moxygen/test/Mocks.h @@ -2,6 +2,7 @@ #include #include +#include namespace moxygen { @@ -57,4 +58,104 @@ class MockMoQCodecCallback : public MoQControlCodec::ControlCallback, MOCK_METHOD(void, onGoaway, (Goaway goaway)); MOCK_METHOD(void, onConnectionError, (ErrorCode error)); }; + +class MockTrackConsumer : public TrackConsumer { + public: + MOCK_METHOD( + (folly::Expected, MoQPublishError>), + beginSubgroup, + (uint64_t groupID, uint64_t subgroupID, Priority priority), + (override)); + + MOCK_METHOD( + (folly::Expected, MoQPublishError>), + awaitStreamCredit, + (), + (override)); + + MOCK_METHOD( + (folly::Expected), + objectStream, + (const ObjectHeader& header, Payload payload), + (override)); + + MOCK_METHOD( + (folly::Expected), + groupNotExists, + (uint64_t groupID, uint64_t subgroupID, Priority priority), + (override)); + + MOCK_METHOD( + (folly::Expected), + datagram, + (const ObjectHeader& header, Payload payload), + (override)); + + MOCK_METHOD( + (folly::Expected), + subscribeDone, + (SubscribeDone), + (override)); +}; + +class MockFetchConsumer : public FetchConsumer { + public: + MOCK_METHOD( + (folly::Expected), + object, + (uint64_t, uint64_t, uint64_t, Payload, bool), + (override)); + + MOCK_METHOD( + (folly::Expected), + objectNotExists, + (uint64_t, uint64_t, uint64_t, bool), + (override)); + + MOCK_METHOD(void, checkpoint, (), (override)); + + MOCK_METHOD( + (folly::Expected), + beginObject, + (uint64_t, uint64_t, uint64_t, uint64_t, Payload), + (override)); + + MOCK_METHOD( + (folly::Expected), + objectPayload, + (Payload, bool), + (override)); + + MOCK_METHOD( + (folly::Expected), + groupNotExists, + (uint64_t groupID, uint64_t subgroupID, bool finFetch), + (override)); + + MOCK_METHOD( + (folly::Expected), + endOfGroup, + (uint64_t, uint64_t, uint64_t, bool), + (override)); + + MOCK_METHOD( + (folly::Expected), + endOfTrackAndGroup, + (uint64_t, uint64_t, uint64_t), + (override)); + + MOCK_METHOD( + (folly::Expected), + endOfFetch, + (), + (override)); + + MOCK_METHOD(void, reset, (ResetStreamErrorCode), (override)); + + MOCK_METHOD( + (folly::Expected, MoQPublishError>), + awaitReadyToConsume, + (), + (override)); +}; } // namespace moxygen