From b21fed2355af9afd33527df02a41e5dd0bfa1115 Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Thu, 19 Dec 2024 08:58:44 -0800 Subject: [PATCH] Use Consumer interface for MoQSession publishing (#12) Summary: This is a major rewrite of MoQSession using the new Consumers interfaces. It's separated into the writes (this diff) and reads (next diff). Note the relay -- which does writes and reads -- is slightly broken in this diff but fixed in the next, so don't read too much into it. The previous "publish" API required the session to maintain a huge map of every currently open stream across the session and perform lookups into this map in order to do the writes. It also had a number API error cases that could be eliminated by constraining the interface. Now subscribeOK and fetchOK return a Consumer object which the publisher will use to pass track data according to those APIs. No maps are required -- the publisher hangs onto the handle(s) it needs to publish. MoQForwarder also had a major rewrite. It conveniently now implements the TrackConsumer interface as well, so a publisher can trivially publish to one or N subscribers. Differential Revision: D66881597 --- moxygen/MoQSession.cpp | 1168 ++++++++++++----- moxygen/MoQSession.h | 166 ++- moxygen/relay/MoQForwarder.h | 538 ++++++-- moxygen/relay/MoQRelay.cpp | 56 +- moxygen/samples/chat/MoQChatClient.cpp | 33 +- moxygen/samples/chat/MoQChatClient.h | 1 + moxygen/samples/date/MoQDateServer.cpp | 274 ++-- .../MoQFlvStreamerClient.cpp | 138 +- moxygen/samples/text-client/MoQTextClient.cpp | 2 +- moxygen/test/MoQSessionTest.cpp | 136 +- 10 files changed, 1628 insertions(+), 884 deletions(-) diff --git a/moxygen/MoQSession.cpp b/moxygen/MoQSession.cpp index 0777674..8d391e2 100644 --- a/moxygen/MoQSession.cpp +++ b/moxygen/MoQSession.cpp @@ -12,30 +12,742 @@ #include namespace { +using namespace moxygen; constexpr std::chrono::seconds kSetupTimeout(5); + +constexpr uint32_t IdMask = 0x1FFFFF; +uint64_t groupOrder(GroupOrder groupOrder, uint64_t group) { + uint32_t truncGroup = static_cast(group) & IdMask; + return groupOrder == GroupOrder::OldestFirst ? truncGroup + : (IdMask - truncGroup); } +uint32_t objOrder(uint64_t objId) { + return static_cast(objId) & IdMask; +} + +uint64_t order( + uint64_t group, + uint64_t object, + uint8_t priority, + uint8_t pubPri, + GroupOrder pubGroupOrder) { + // 6 reserved bits | 58 bit order + // 6 reserved | 8 sub pri | 8 pub pri | 21 group order | 21 obj order + return ( + (uint64_t(pubPri) << 50) | (uint64_t(priority) << 42) | + (groupOrder(pubGroupOrder, group) << 21) | objOrder(object)); +} + +// Helper classes for publishing + +// StreamPublisherImpl is for publishing to a single stream, either a Subgroup +// or a Fetch response. It's of course illegal to mix-and-match the APIs, but +// the object is only handed to the application as either a SubgroupConsumer +// or a FetchConsumer +class StreamPublisherImpl : public SubgroupConsumer, + public FetchConsumer, + public folly::CancellationCallback { + public: + StreamPublisherImpl() = delete; + + // Fetch constructor + StreamPublisherImpl( + MoQSession::PublisherImpl* publisher, + proxygen::WebTransport::StreamWriteHandle* writeHandle); + + // Subscribe constructor + StreamPublisherImpl( + MoQSession::PublisherImpl* publisher, + proxygen::WebTransport::StreamWriteHandle* writeHandle, + TrackAlias alias, + uint64_t groupID, + uint64_t subgroupID); + + proxygen::WebTransport::StreamWriteHandle* getWriteHandle() const { + return writeHandle_; + } + + // SubgroupConsumer overrides + folly::Expected + object(uint64_t objectID, Payload payload, bool finSubgroup = false) override; + folly::Expected objectNotExists( + uint64_t objectID, + bool finSubgroup = false) override; + folly::Expected beginObject( + uint64_t objectId, + uint64_t length, + Payload initialPayload) override; + folly::Expected objectPayload( + Payload payload, + bool finSubgroup = false) override; + folly::Expected endOfGroup( + uint64_t endOfGroupObjectId) override; + folly::Expected endOfTrackAndGroup( + uint64_t endOfTrackObjectId) override; + folly::Expected endOfSubgroup() override; + void reset(ResetStreamErrorCode error) override; + + // FetchConsumer overrides + folly::Expected object( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + Payload payload, + bool finFetch) override { + if (!setGroupAndSubgroup(groupID, subgroupID)) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "Group moved back")); + } + header_.status = ObjectStatus::NORMAL; + return object(objectID, std::move(payload), finFetch); + } + + folly::Expected objectNotExists( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + bool finFetch = false) override { + if (!setGroupAndSubgroup(groupID, subgroupID)) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "Group moved back")); + } + return objectNotExists(objectID, finFetch); + } + + folly::Expected groupNotExists( + uint64_t groupID, + uint64_t subgroupID, + bool finFetch) override { + if (!setGroupAndSubgroup(groupID, subgroupID)) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "Group moved back")); + } + return publishStatus(0, ObjectStatus::GROUP_NOT_EXIST, finFetch); + } + + folly::Expected beginObject( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + uint64_t length, + Payload initialPayload) override { + if (!setGroupAndSubgroup(groupID, subgroupID)) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "Group moved back")); + } + header_.status = ObjectStatus::NORMAL; + return beginObject(objectID, length, std::move(initialPayload)); + } + + folly::Expected endOfGroup( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + bool finFetch) override { + if (!setGroupAndSubgroup(groupID, subgroupID)) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "Group moved back")); + } + return publishStatus(objectID, ObjectStatus::END_OF_GROUP, finFetch); + } + folly::Expected endOfTrackAndGroup( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID) override { + if (!setGroupAndSubgroup(groupID, subgroupID)) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "Group moved back")); + } + return endOfTrackAndGroup(objectID); + } + folly::Expected endOfFetch() override { + if (!writeHandle_) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::CANCELLED, "Fetch cancelled")); + } + return endOfSubgroup(); + } + + folly::Expected, MoQPublishError> + awaitReadyToConsume() override; + + folly::Expected + publishStatus(uint64_t objectID, ObjectStatus status, bool finSubgroup); + + private: + bool setGroupAndSubgroup(uint64_t groupID, uint64_t subgroupID) { + if (groupID < header_.group) { + return false; + } else if (groupID > header_.group) { + // Fetch group advanced, reset expected object + header_.id = std::numeric_limits::max(); + } + header_.group = groupID; + header_.subgroup = subgroupID; + return true; + } + + folly::Expected validatePublish( + uint64_t objectID); + folly::Expected + validateObjectPublishAndUpdateState(folly::IOBuf* payload, bool finSubgroup); + folly::Expected writeCurrentObject( + uint64_t objectID, + uint64_t length, + Payload payload, + bool finSubgroup); + folly::Expected writeToStream(bool finSubgroup); + + void onStreamComplete(); + + MoQSession::PublisherImpl* publisher_{nullptr}; + proxygen::WebTransport::StreamWriteHandle* writeHandle_{nullptr}; + ObjectHeader header_; + folly::Optional currentLengthRemaining_; + folly::IOBufQueue writeBuf_{folly::IOBufQueue::cacheChainLength()}; +}; + +class TrackPublisherImpl : public MoQSession::PublisherImpl, + public TrackConsumer { + public: + TrackPublisherImpl() = delete; + TrackPublisherImpl( + MoQSession* session, + SubscribeID subscribeID, + TrackAlias trackAlias, + Priority priority, + GroupOrder groupOrder) + : PublisherImpl(session, subscribeID, priority, groupOrder), + trackAlias_(trackAlias) {} + + // PublisherImpl overrides + void onStreamComplete(const ObjectHeader& finalHeader) override; + + void reset(ResetStreamErrorCode) override { + // TBD: reset all subgroups_? Currently called from cleanup() + } + + // TrackConsumer overrides + folly::Expected, MoQPublishError> + beginSubgroup(uint64_t groupID, uint64_t subgroupID, Priority priority) + override; + + folly::Expected, MoQPublishError> + awaitStreamCredit() override; + + folly::Expected objectStream( + const ObjectHeader& header, + Payload payload) override; + + folly::Expected + groupNotExists(uint64_t groupID, uint64_t subgroup, Priority pri) override; + + folly::Expected datagram( + const ObjectHeader& header, + Payload payload) override; + + folly::Expected subscribeDone( + SubscribeDone subDone) override; + + private: + TrackAlias trackAlias_; + folly::F14FastMap< + std::pair, + std::shared_ptr> + subgroups_; +}; + +class FetchPublisherImpl : public MoQSession::PublisherImpl { + public: + FetchPublisherImpl( + MoQSession* session, + SubscribeID subscribeID, + Priority priority, + GroupOrder groupOrder) + : PublisherImpl(session, subscribeID, priority, groupOrder) {} + + folly::Expected, MoQPublishError> beginFetch( + GroupOrder groupOrder); + + void reset(ResetStreamErrorCode error) override { + if (streamPublisher_) { + streamPublisher_->reset(error); + } + } + + void onStreamComplete(const ObjectHeader&) override { + streamPublisher_.reset(); + PublisherImpl::fetchComplete(); + } + + private: + std::shared_ptr streamPublisher_; +}; + +// TrackPublisherImpl + +folly::Expected, MoQPublishError> +TrackPublisherImpl::beginSubgroup( + uint64_t groupID, + uint64_t subgroupID, + Priority priority) { + auto wt = getWebTransport(); + if (!wt) { + XLOG(ERR) << "Trying to publish after subscribeDone"; + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Publish after subscribeDone")); + } + auto stream = wt->createUniStream(); + if (!stream) { + // failed to create a stream + // TODO: can it fail for non-stream credit reasons? Session closing should + // be handled above. + XLOG(ERR) << "Failed to create uni stream tp=" << this; + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::BLOCKED, "Failed to create uni stream.")); + } + XLOG(DBG4) << "New stream created, id: " << stream.value()->getID() + << " tp=" << this; + stream.value()->setPriority( + 1, order(groupID, subgroupID, priority, priority_, groupOrder_), false); + auto subgroupPublisher = std::make_shared( + this, *stream, trackAlias_, groupID, subgroupID); + // TODO: these are currently unused, but the intent might be to reset + // open subgroups automatically from some path? + subgroups_[{groupID, subgroupID}] = subgroupPublisher; + return subgroupPublisher; +} + +folly::Expected, MoQPublishError> +TrackPublisherImpl::awaitStreamCredit() { + auto wt = getWebTransport(); + if (!wt) { + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "awaitStreamCredit after subscribeDone")); + } + return wt->awaitUniStreamCredit(); +} + +void TrackPublisherImpl::onStreamComplete(const ObjectHeader& finalHeader) { + subgroups_.erase({finalHeader.group, finalHeader.subgroup}); +} + +folly::Expected TrackPublisherImpl::objectStream( + const ObjectHeader& objHeader, + Payload payload) { + XCHECK(objHeader.status == ObjectStatus::NORMAL || !payload); + auto subgroup = + beginSubgroup(objHeader.group, objHeader.subgroup, objHeader.priority); + if (subgroup.hasError()) { + return folly::makeUnexpected(std::move(subgroup.error())); + } + switch (objHeader.status) { + case ObjectStatus::NORMAL: + return subgroup.value()->object(objHeader.id, std::move(payload), true); + case ObjectStatus::OBJECT_NOT_EXIST: + return subgroup.value()->objectNotExists(objHeader.id, true); + case ObjectStatus::GROUP_NOT_EXIST: { + auto& subgroupPublisherImpl = + static_cast(*subgroup.value()); + return subgroupPublisherImpl.publishStatus( + objHeader.id, objHeader.status, true); + } + case ObjectStatus::END_OF_GROUP: + return subgroup.value()->endOfGroup(objHeader.id); + case ObjectStatus::END_OF_TRACK_AND_GROUP: + return subgroup.value()->endOfTrackAndGroup(objHeader.id); + case ObjectStatus::END_OF_SUBGROUP: + return subgroup.value()->endOfSubgroup(); + } +} + +folly::Expected +TrackPublisherImpl::groupNotExists( + uint64_t groupID, + uint64_t subgroupID, + Priority priority) { + return objectStream( + {trackAlias_, + groupID, + subgroupID, + 0, + priority, + ForwardPreference::Subgroup, + ObjectStatus::GROUP_NOT_EXIST, + 0}, + nullptr); +} + +folly::Expected TrackPublisherImpl::datagram( + const ObjectHeader& header, + Payload payload) { + XCHECK(header.forwardPreference == ForwardPreference::Datagram); + auto wt = getWebTransport(); + if (!wt) { + XLOG(ERR) << "Trying to publish after subscribeDone"; + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Publish after subscribeDone")); + } + folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; + XCHECK(header.length); + (void)writeObject( + writeBuf, + ObjectHeader{ + trackAlias_, + header.group, + header.id, + header.id, + header.priority, + header.forwardPreference, + header.status, + *header.length}, + std::move(payload)); + // TODO: set priority when WT has an API for that + auto res = wt->sendDatagram(writeBuf.move()); + if (res.hasError()) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::WRITE_ERROR, "sendDatagram failed")); + } + return folly::unit; +} + +folly::Expected TrackPublisherImpl::subscribeDone( + SubscribeDone subDone) { + subDone.subscribeID = subscribeID_; + return PublisherImpl::subscribeDone(std::move(subDone)); +} + +// FetchPublisherImpl + +folly::Expected, MoQPublishError> +FetchPublisherImpl::beginFetch(GroupOrder groupOrder) { + auto wt = getWebTransport(); + if (!wt) { + XLOG(ERR) << "Trying to publish after fetchCancel"; + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Publish after fetchCancel")); + } + + auto stream = wt->createUniStream(); + if (!stream) { + // failed to create a stream + XLOG(ERR) << "Failed to create uni stream tp=" << this; + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::BLOCKED, "Failed to create uni stream.")); + } + XLOG(DBG4) << "New stream created, id: " << stream.value()->getID() + << " tp=" << this; + setGroupOrder(groupOrder); + stream.value()->setPriority(1, order(0, 0, 0, priority_, groupOrder_), false); + streamPublisher_ = std::make_shared(this, *stream); + return streamPublisher_; +} + +// StreamPublisherImpl + +StreamPublisherImpl::StreamPublisherImpl( + MoQSession::PublisherImpl* publisher, + proxygen::WebTransport::StreamWriteHandle* writeHandle) + : CancellationCallback( + writeHandle->getCancelToken(), + [this] { + if (writeHandle_) { + auto code = writeHandle_->stopSendingErrorCode(); + XLOG(DBG1) << "Peer requested write termination code=" + << (code ? folly::to(*code) + : std::string("none")); + reset(ResetStreamErrorCode::CANCELLED); + } + }), + publisher_(publisher), + writeHandle_(writeHandle), + header_{ + publisher->subscribeID(), + 0, + 0, + std::numeric_limits::max(), + 0, + ForwardPreference::Fetch, + ObjectStatus::NORMAL, + folly::none} { + (void)writeStreamHeader(writeBuf_, header_); +} + +StreamPublisherImpl::StreamPublisherImpl( + MoQSession::PublisherImpl* publisher, + proxygen::WebTransport::StreamWriteHandle* writeHandle, + TrackAlias alias, + uint64_t groupID, + uint64_t subgroupID) + : StreamPublisherImpl(publisher, writeHandle) { + writeBuf_.move(); + header_.trackIdentifier = alias; + header_.forwardPreference = ForwardPreference::Subgroup; + setGroupAndSubgroup(groupID, subgroupID); + (void)writeStreamHeader(writeBuf_, header_); +} + +// Private methods + +void StreamPublisherImpl::onStreamComplete() { + XCHECK_EQ(writeHandle_, nullptr); + publisher_->onStreamComplete(header_); +} + +folly::Expected +StreamPublisherImpl::validatePublish(uint64_t objectID) { + if (currentLengthRemaining_) { + XLOG(ERR) << "Still publishing previous object sgp=" << this; + reset(ResetStreamErrorCode::INTERNAL_ERROR); + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Previous object incomplete")); + } + if (header_.id != std::numeric_limits::max() && + objectID <= header_.id) { + XLOG(ERR) << "Object ID not advancing header_.id=" << header_.id + << " objectID=" << objectID << " sgp=" << this; + reset(ResetStreamErrorCode::INTERNAL_ERROR); + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Object ID not advancing in subgroup")); + } + if (!writeHandle_) { + XLOG(ERR) << "Write after subgroup complete sgp=" << this; + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "Subgroup reset")); + } + return folly::unit; +} + +folly::Expected +StreamPublisherImpl::writeCurrentObject( + uint64_t objectID, + uint64_t length, + Payload payload, + bool finSubgroup) { + header_.id = objectID; + header_.length = length; + (void)writeObject(writeBuf_, header_, std::move(payload)); + return writeToStream(finSubgroup); +} + +folly::Expected +StreamPublisherImpl::writeToStream(bool finSubgroup) { + auto writeHandle = writeHandle_; + if (finSubgroup) { + writeHandle_ = nullptr; + } + auto writeRes = writeHandle->writeStreamData(writeBuf_.move(), finSubgroup); + if (writeRes.hasValue()) { + if (finSubgroup) { + onStreamComplete(); + } + return folly::unit; + } + XLOG(ERR) << "write error=" << uint64_t(writeRes.error()); + reset(ResetStreamErrorCode::INTERNAL_ERROR); + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::WRITE_ERROR, "write error")); +} + +folly::Expected +StreamPublisherImpl::validateObjectPublishAndUpdateState( + folly::IOBuf* payload, + bool finSubgroup) { + auto length = payload ? payload->computeChainDataLength() : 0; + if (!currentLengthRemaining_) { + XLOG(ERR) << "Not publishing object sgp=" << this; + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "Not publishing object")); + } + if (length > *currentLengthRemaining_) { + XLOG(ERR) << "Length=" << length + << " exceeds remaining=" << *currentLengthRemaining_ + << " sgp=" << this; + reset(ResetStreamErrorCode::INTERNAL_ERROR); + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Length exceeds remaining in object")); + } + *currentLengthRemaining_ -= length; + if (*currentLengthRemaining_ == 0) { + currentLengthRemaining_.reset(); + return ObjectPublishStatus::DONE; + } else if (finSubgroup) { + XLOG(ERR) << "finSubgroup with length remaining=" + << *currentLengthRemaining_ << " sgp=" << this; + reset(ResetStreamErrorCode::INTERNAL_ERROR); + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "finSubgroup with open object")); + } + return ObjectPublishStatus::IN_PROGRESS; +} + +// Interface Methods + +folly::Expected StreamPublisherImpl::object( + uint64_t objectID, + Payload payload, + bool finSubgroup) { + auto validateRes = validatePublish(objectID); + if (!validateRes) { + return validateRes; + } + auto length = payload ? payload->computeChainDataLength() : 0; + return writeCurrentObject(objectID, length, std::move(payload), finSubgroup); +} + +folly::Expected +StreamPublisherImpl::objectNotExists(uint64_t objectID, bool finSubgroup) { + return publishStatus(objectID, ObjectStatus::OBJECT_NOT_EXIST, finSubgroup); +} + +folly::Expected StreamPublisherImpl::beginObject( + uint64_t objectID, + uint64_t length, + Payload initialPayload) { + auto validateRes = validatePublish(objectID); + if (!validateRes) { + return validateRes; + } + currentLengthRemaining_ = length; + auto validateObjectPublishRes = validateObjectPublishAndUpdateState( + initialPayload.get(), + /*finSubgroup=*/false); + if (!validateObjectPublishRes) { + return folly::makeUnexpected(validateObjectPublishRes.error()); + } + return writeCurrentObject( + objectID, length, std::move(initialPayload), /*finSubgroup=*/false); +} + +folly::Expected +StreamPublisherImpl::objectPayload(Payload payload, bool finSubgroup) { + auto validateObjectPublishRes = + validateObjectPublishAndUpdateState(payload.get(), finSubgroup); + if (!validateObjectPublishRes) { + return validateObjectPublishRes; + } + writeBuf_.append(std::move(payload)); + auto writeRes = writeToStream(finSubgroup); + if (writeRes.hasValue()) { + return validateObjectPublishRes.value(); + } else { + return folly::makeUnexpected(writeRes.error()); + } +} + +folly::Expected StreamPublisherImpl::endOfGroup( + uint64_t endOfGroupObjectId) { + return publishStatus(endOfGroupObjectId, ObjectStatus::END_OF_GROUP, true); +} + +folly::Expected +StreamPublisherImpl::endOfTrackAndGroup(uint64_t endOfTrackObjectId) { + return publishStatus( + endOfTrackObjectId, ObjectStatus::END_OF_TRACK_AND_GROUP, true); +} + +folly::Expected +StreamPublisherImpl::publishStatus( + uint64_t objectID, + ObjectStatus status, + bool finSubgroup) { + auto validateRes = validatePublish(objectID); + if (!validateRes) { + return validateRes; + } + header_.status = status; + return writeCurrentObject(objectID, 0, nullptr, finSubgroup); +} + +folly::Expected +StreamPublisherImpl::endOfSubgroup() { + if (currentLengthRemaining_) { + XLOG(ERR) << "Still publishing previous object sgp=" << this; + reset(ResetStreamErrorCode::INTERNAL_ERROR); + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Previous object incomplete")); + } + if (!writeBuf_.empty()) { + XLOG(WARN) << "No objects published on subgroup=" << header_; + } + return writeToStream(true); +} + +void StreamPublisherImpl::reset(ResetStreamErrorCode error) { + if (!writeBuf_.empty()) { + // TODO: stream header is pending, reliable reset? + XLOG(WARN) << "Stream header pending on subgroup=" << header_; + } + if (writeHandle_) { + auto writeHandle = writeHandle_; + writeHandle_ = nullptr; + writeHandle->resetStream(uint32_t(error)); + } else { + // Can happen on STOP_SENDING + XLOG(ERR) << "reset with no write handle: sgp=" << this; + } + onStreamComplete(); +} + +folly::Expected, MoQPublishError> +StreamPublisherImpl::awaitReadyToConsume() { + if (!writeHandle_) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::CANCELLED, "Fetch cancelled")); + } + auto writableFuture = writeHandle_->awaitWritable(); + if (!writableFuture) { + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::WRITE_ERROR, "awaitWritable failed")); + } + return std::move(writableFuture.value()); +} + +} // namespace + namespace moxygen { +using folly::coro::co_awaitTry; +using folly::coro::co_error; + MoQSession::~MoQSession() { - XLOG(DBG1) << "requestCancellation from dtor sess=" << this; - cancellationSource_.requestCancellation(); + cleanup(); +} + +void MoQSession::cleanup() { + // TODO: Are these loops safe since they may (should?) delete elements + for (auto& pubTrack : pubTracks_) { + pubTrack.second->reset(ResetStreamErrorCode::SESSION_CLOSED); + } + pubTracks_.clear(); for (auto& subTrack : subTracks_) { subTrack.second->subscribeError( {/*TrackHandle fills in subId*/ 0, 500, "session closed", folly::none}); } + subTracks_.clear(); for (auto& fetch : fetches_) { + // TODO: there needs to be a way to queue an error in TrackHandle, both + // from here, when close races the FETCH stream, and from readLoop + // where we get a reset. fetch.second->fetchError( {/*TrackHandle fills in subId*/ 0, 500, "session closed"}); } + fetches_.clear(); for (auto& pendingAnn : pendingAnnounce_) { pendingAnn.second.setValue(folly::makeUnexpected( AnnounceError({pendingAnn.first, 500, "session closed"}))); } + pendingAnnounce_.clear(); for (auto& pendingSn : pendingSubscribeAnnounces_) { pendingSn.second.setValue(folly::makeUnexpected( SubscribeAnnouncesError({pendingSn.first, 500, "session closed"}))); } + pendingSubscribeAnnounces_.clear(); + if (!cancellationSource_.isCancellationRequested()) { + XLOG(DBG1) << "requestCancellation from cleanup sess=" << this; + cancellationSource_.requestCancellation(); + } } void MoQSession::start() { @@ -86,23 +798,7 @@ void MoQSession::close(folly::Optional error) { auto wt = wt_; wt_ = nullptr; - for (auto& subTrack : subTracks_) { - subTrack.second->subscribeError( - {/*TrackHandle fills in subId*/ 0, - 500, - "session closed", - folly::none}); - } - subTracks_.clear(); - - for (auto& fetch : fetches_) { - fetch.second->fetchError( - {/*TrackHandle fillsin subId*/ 0, 500, "session closed"}); - // TODO: there needs to be a way to queue an error in TrackHandle, both - // from here, when close races the FETCH stream, and from readLoop - // where we get a reset. - } - fetches_.clear(); + cleanup(); wt->closeSession( error.has_value() @@ -269,6 +965,8 @@ folly::coro::Task MoQSession::readLoop( // TODO: disallow OBJECT on control streams and non-object on non-control bool fin = false; auto token = co_await folly::coro::co_current_cancellation_token; + std::shared_ptr track; + folly::CancellationSource cs; while (!fin && !token.isCancellationRequested()) { auto streamData = co_await folly::coro::co_awaitTry( readHandle->readStreamData().via(evb_)); @@ -276,28 +974,32 @@ folly::coro::Task MoQSession::readLoop( XLOG(ERR) << streamData.exception().what() << " id=" << streamId << " sess=" << this; // TODO: possibly erase fetch - co_return; + cs.requestCancellation(); + break; } else { if (streamData->data || streamData->fin) { codec->onIngress(std::move(streamData->data), streamData->fin); } - fin = streamData->fin; - XLOG_IF(DBG3, fin) << "End of stream id=" << streamId << " sess=" << this; - if (fin && objCodec) { + if (!track && objCodec) { + // TODO: this might not be set auto trackId = objCodec->getTrackIdentifier(); if (auto subscribeID = std::get_if(&trackId)) { // it's fetch - auto track = getTrack(trackId); - if (track) { - track->fin(); - track->setAllDataReceived(); - if (track->fetchOkReceived()) { - fetches_.erase(*subscribeID); - checkForCloseOnDrain(); - } - } + track = getTrack(trackId); + track->mergeReadCancelToken( + folly::CancellationToken::merge(cs.getToken(), token)); } } + fin = streamData->fin; + XLOG_IF(DBG3, fin) << "End of stream id=" << streamId << " sess=" << this; + } + } + if (track) { + track->fin(); + track->setAllDataReceived(); + if (track->fetchOkReceived()) { + fetches_.erase(track->subscribeID()); + checkForCloseOnDrain(); } } } @@ -423,7 +1125,16 @@ void MoQSession::onSubscribe(SubscribeRequest subscribeRequest) { subscribeError({subscribeRequest.subscribeID, 400, "dup sub ID"}); return; } - pubTracks_[subscribeRequest.subscribeID].priority = subscribeRequest.priority; + // TODO: Check for duplicate alias + auto trackPublisher = std::make_shared( + this, + subscribeRequest.subscribeID, + subscribeRequest.trackAlias, + subscribeRequest.priority, + subscribeRequest.groupOrder); + pubTracks_.emplace(subscribeID, std::move(trackPublisher)); + // TODO: there should be a timeout for the application to call + // subscribeOK/Error controlMessages_.enqueue(std::move(subscribeRequest)); } @@ -439,7 +1150,7 @@ void MoQSession::onSubscribeUpdate(SubscribeUpdate subscribeUpdate) { return; } - it->second.priority = subscribeUpdate.priority; + it->second->setPriority(subscribeUpdate.priority); // TODO: update priority of tracks in flight controlMessages_.enqueue(std::move(subscribeUpdate)); } @@ -547,29 +1258,26 @@ void MoQSession::onFetch(Fetch fetch) { fetchError({fetch.subscribeID, 400, "dup sub ID"}); return; } - pubTracks_[fetch.subscribeID].priority = fetch.priority; - pubTracks_[fetch.subscribeID].groupOrder = fetch.groupOrder; + auto fetchPublisher = std::make_shared( + this, fetch.subscribeID, fetch.priority, fetch.groupOrder); + pubTracks_.emplace(fetch.subscribeID, std::move(fetchPublisher)); controlMessages_.enqueue(std::move(fetch)); } void MoQSession::onFetchCancel(FetchCancel fetchCancel) { XLOG(DBG1) << __func__ << " sess=" << this; - PublishKey publishKey( - {fetchCancel.subscribeID, 0, 0, ForwardPreference::Fetch, 0}); - auto pubDataIt = publishDataMap_.find(publishKey); - if (pubDataIt == publishDataMap_.end()) { + auto pubTrackIt = pubTracks_.find(fetchCancel.subscribeID); + if (pubTrackIt == pubTracks_.end()) { XLOG(DBG4) << "No publish key for fetch id=" << fetchCancel.subscribeID << " sess=" << this; - // Either the Fetch stream has already closed, or isn't open yet. + // The Fetch stream has already closed, or never existed // If it's already closed, a no-op is fine. - // If it isn't open yet, we could: - // a) enqueue a callback the publisher has to handle - // b) add a placeholder - this is annoying // See: https://github.com/moq-wg/moq-transport/issues/630 } else { - wt_->resetStream(pubDataIt->second.streamID, 0); + // It's possible the fetch stream hasn't opened yet if the application + // hasn't made it to fetchOK. + pubTrackIt->second->reset(ResetStreamErrorCode::CANCELLED); retireSubscribeId(/*signal=*/true); - pubDataIt->second.cancelled = true; } } @@ -800,6 +1508,7 @@ MoQSession::TrackHandle::objects() { folly::makeGuard([func = __func__] { XLOG(DBG1) << "exit " << func; }); auto cancelToken = co_await folly::coro::co_current_cancellation_token; auto mergeToken = folly::CancellationToken::merge(cancelToken, cancelToken_); + folly::EventBaseThreadTimekeeper tk(*evb_); while (!cancelToken.isCancellationRequested()) { auto optionalObj = newObjects_.try_dequeue(); std::shared_ptr obj; @@ -807,7 +1516,8 @@ MoQSession::TrackHandle::objects() { obj = *optionalObj; } else { obj = co_await folly::coro::co_withCancellation( - mergeToken, newObjects_.dequeue()); + mergeToken, + folly::coro::timeout(newObjects_.dequeue(), objectTimeout_, &tk)); } if (!obj) { XLOG(DBG3) << "Out of objects for trackHandle=" << this @@ -846,7 +1556,7 @@ MoQSession::subscribe(SubscribeRequest sub) { std::piecewise_construct, std::forward_as_tuple(trackAlias), std::forward_as_tuple(std::make_shared( - fullTrackName, subID, cancellationSource_.getToken()))); + fullTrackName, subID, evb_, cancellationSource_.getToken()))); auto trackHandle = subTrack.first->second; auto res2 = co_await trackHandle->ready(); @@ -855,20 +1565,32 @@ MoQSession::subscribe(SubscribeRequest sub) { co_return res2; } -void MoQSession::subscribeOk(SubscribeOk subOk) { +std::shared_ptr MoQSession::subscribeOk(SubscribeOk subOk) { XLOG(DBG1) << __func__ << " sess=" << this; auto it = pubTracks_.find(subOk.subscribeID); if (it == pubTracks_.end()) { XLOG(ERR) << "Invalid Subscribe OK, id=" << subOk.subscribeID; - return; - } - it->second.groupOrder = subOk.groupOrder; + return nullptr; + } + auto trackPublisher = + std::dynamic_pointer_cast(it->second); + if (!trackPublisher) { + XLOG(ERR) << "subscribe ID maps to a fetch, not a subscribe, id=" + << subOk.subscribeID; + subscribeError( + {subOk.subscribeID, + folly::to_underlying(FetchErrorCode::INTERNAL_ERROR), + ""}); + return nullptr; + } + trackPublisher->setGroupOrder(subOk.groupOrder); auto res = writeSubscribeOk(controlWriteBuf_, subOk); if (!res) { XLOG(ERR) << "writeSubscribeOk failed sess=" << this; - return; + return nullptr; } controlWriteEvent_.signal(); + return std::static_pointer_cast(trackPublisher); } void MoQSession::subscribeError(SubscribeError subErr) { @@ -919,6 +1641,14 @@ void MoQSession::unsubscribe(Unsubscribe unsubscribe) { controlWriteEvent_.signal(); } +folly::Expected +MoQSession::PublisherImpl::subscribeDone(SubscribeDone subscribeDone) { + auto session = session_; + session_ = nullptr; + session->subscribeDone(std::move(subscribeDone)); + return folly::unit; +} + void MoQSession::subscribeDone(SubscribeDone subDone) { XLOG(DBG1) << __func__ << " sess=" << this; auto it = pubTracks_.find(subDone.subscribeID); @@ -963,6 +1693,24 @@ void MoQSession::sendMaxSubscribeID(bool signal) { } } +void MoQSession::PublisherImpl::fetchComplete() { + auto session = session_; + session_ = nullptr; + session->fetchComplete(subscribeID_); +} + +void MoQSession::fetchComplete(SubscribeID subscribeID) { + XLOG(DBG1) << __func__ << " sess=" << this; + auto it = pubTracks_.find(subscribeID); + if (it == pubTracks_.end()) { + XLOG(ERR) << "fetchComplete for invalid id=" << subscribeID + << " sess=" << this; + return; + } + pubTracks_.erase(it); + retireSubscribeId(/*signal=*/true); +} + void MoQSession::subscribeUpdate(SubscribeUpdate subUpdate) { XLOG(DBG1) << __func__ << " sess=" << this; auto trackAliasIt = subIdToTrackAlias_.find(subUpdate.subscribeID); @@ -1013,27 +1761,49 @@ MoQSession::fetch(Fetch fetch) { std::piecewise_construct, std::forward_as_tuple(subID), std::forward_as_tuple(std::make_shared( - fullTrackName, subID, cancellationSource_.getToken()))); + fullTrackName, subID, evb_, cancellationSource_.getToken()))); auto trackHandle = subTrack.first->second; + trackHandle->setNewObjectTimeout(std::chrono::seconds(2)); auto res = co_await trackHandle->fetchReady(); XLOG(DBG1) << __func__ << " fetchReady trackHandle=" << trackHandle; co_return res; } -void MoQSession::fetchOk(FetchOk fetchOk) { +std::shared_ptr MoQSession::fetchOk(FetchOk fetchOk) { XLOG(DBG1) << __func__ << " sess=" << this; auto it = pubTracks_.find(fetchOk.subscribeID); if (it == pubTracks_.end()) { XLOG(ERR) << "Invalid Fetch OK, id=" << fetchOk.subscribeID; - return; + return nullptr; + } + auto fetchPublisher = dynamic_cast(it->second.get()); + if (!fetchPublisher) { + XLOG(ERR) << "subscribe ID maps to a subscribe, not a fetch, id=" + << fetchOk.subscribeID; + fetchError( + {fetchOk.subscribeID, + folly::to_underlying(FetchErrorCode::INTERNAL_ERROR), + ""}); + return nullptr; + } + auto fetchConsumer = fetchPublisher->beginFetch(fetchOk.groupOrder); + if (!fetchConsumer) { + XLOG(ERR) << "beginFetch Failed, id=" << fetchOk.subscribeID; + fetchError( + {fetchOk.subscribeID, + folly::to_underlying(FetchErrorCode::INTERNAL_ERROR), + ""}); + return nullptr; } + auto res = writeFetchOk(controlWriteBuf_, fetchOk); if (!res) { XLOG(ERR) << "writeFetchOk failed sess=" << this; - return; + return nullptr; } controlWriteEvent_.signal(); + return *fetchConsumer; } void MoQSession::fetchError(FetchError fetchErr) { @@ -1068,290 +1838,6 @@ void MoQSession::fetchCancel(FetchCancel fetchCan) { controlWriteEvent_.signal(); } -namespace { -constexpr uint32_t IdMask = 0x1FFFFF; -uint64_t groupOrder(GroupOrder groupOrder, uint64_t group) { - uint32_t truncGroup = static_cast(group) & IdMask; - return groupOrder == GroupOrder::OldestFirst ? truncGroup - : (IdMask - truncGroup); -} - -uint32_t objOrder(uint64_t objId) { - return static_cast(objId) & IdMask; -} -} // namespace - -uint64_t MoQSession::order( - const ObjectHeader& objHeader, - const SubscribeID subscribeID) { - PubTrack pubTrack{ - std::numeric_limits::max(), GroupOrder::OldestFirst}; - auto pubTrackIt = pubTracks_.find(subscribeID); - if (pubTrackIt != pubTracks_.end()) { - pubTrack = pubTrackIt->second; - } - // 6 reserved bits | 58 bit order - // 6 reserved | 8 sub pri | 8 pub pri | 21 group order | 21 obj order - return ( - (uint64_t(pubTrack.priority) << 50) | - (uint64_t(objHeader.priority) << 42) | - (groupOrder(pubTrack.groupOrder, objHeader.group) << 21) | - objOrder(objHeader.id)); -} - -folly::SemiFuture MoQSession::publish( - const ObjectHeader& objHeader, - SubscribeID subscribeID, - uint64_t payloadOffset, - std::unique_ptr payload, - bool eom) { - XCHECK_EQ(objHeader.status, ObjectStatus::NORMAL); - return publishImpl( - objHeader, subscribeID, payloadOffset, std::move(payload), eom, false); -} - -folly::SemiFuture MoQSession::publishStreamPerObject( - const ObjectHeader& objHeader, - SubscribeID subscribeID, - uint64_t payloadOffset, - std::unique_ptr payload, - bool eom) { - XCHECK_EQ(objHeader.status, ObjectStatus::NORMAL); - XCHECK_EQ(objHeader.forwardPreference, ForwardPreference::Subgroup); - return publishImpl( - objHeader, subscribeID, payloadOffset, std::move(payload), eom, true); -} - -folly::SemiFuture MoQSession::publishStatus( - const ObjectHeader& objHeader, - SubscribeID subscribeID) { - XCHECK_NE(objHeader.status, ObjectStatus::NORMAL); - return publishImpl(objHeader, subscribeID, 0, nullptr, true, false); -} - -folly::Try MoQSession::closeFetchStream(SubscribeID subID) { - PublishKey publishKey{ - TrackIdentifier(subID), 0, 0, ForwardPreference::Fetch, 0}; - auto pubDataIt = publishDataMap_.find(publishKey); - if (pubDataIt == publishDataMap_.end()) { - XLOG(ERR) << "Invalid subscribeID to closeFetchStream=" << subID.value; - return folly::makeTryWith( - [] { throw std::runtime_error("Invalid subscribeID"); }); - } - if (pubDataIt->second.objectLength && *pubDataIt->second.objectLength > 0) { - XLOG(ERR) << "Non-zero length remaining in previous obj id=" << subID.value; - close(); - return folly::makeTryWith( - [] { throw std::runtime_error("Premature closeFetchStream"); }); - } - auto streamID = pubDataIt->second.streamID; - auto cancelled = pubDataIt->second.cancelled; - publishDataMap_.erase(pubDataIt); - if (!cancelled) { - XLOG(DBG1) << "Closing fetch stream=" << streamID; - retireSubscribeId(/*signal=*/true); - auto writeRes = wt_->writeStreamData(streamID, nullptr, true); - if (!writeRes) { - XLOG(ERR) << "Failed to close fetch stream sess=" << this - << " error=" << static_cast(writeRes.error()); - // Probably got a STOP_SENDING, which is fine - } - } - return folly::Try(folly::unit); -} - -folly::SemiFuture MoQSession::publishImpl( - const ObjectHeader& objHeader, - SubscribeID subscribeID, - uint64_t payloadOffset, - std::unique_ptr payload, - bool eom, - bool streamPerObject) { - XLOG(DBG1) << __func__ << " " << objHeader << " sess=" << this; - // TODO: Should there be verification that subscribeID / trackAlias are - // valid, current subscriptions, or make the peer do that? - folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()}; - bool sendAsDatagram = - objHeader.forwardPreference == ForwardPreference::Datagram; - - PublishKey publishKey( - {objHeader.trackIdentifier, - objHeader.group, - objHeader.subgroup, - objHeader.forwardPreference, - objHeader.id}); - auto pubDataIt = publishDataMap_.find(publishKey); - if (pubDataIt == publishDataMap_.end()) { - XLOG(DBG4) << "New publish key, existing map size=" - << publishDataMap_.size() << " sess=" << this; - // New publishing key - - // payloadOffset can be > 0 here if wt_->createUniStream() FAILS, that can - // happen if the subscriber closes session abruptly, then: - // - We do not add this publishKey to publishDataMap_ - // - Next portion of the object calls this function again with payloadOffset - // > 0 - if (payloadOffset != 0) { - XLOG(WARN) << __func__ << " Can't start publishing in the middle. " - << "Disgregard data for this new obj with payloadOffset = " - << payloadOffset << " sess=" << this; - return folly::makeSemiFuture(folly::exception_wrapper( - std::runtime_error("Can't start publishing in the middle."))); - } - - // Create a new stream (except for datagram) - proxygen::WebTransport::StreamWriteHandle* stream = nullptr; - if (!sendAsDatagram) { - auto res = wt_->createUniStream(); - if (!res) { - // failed to create a stream - XLOG(ERR) << "Failed to create uni stream sess=" << this; - return folly::makeSemiFuture(folly::exception_wrapper( - std::runtime_error("Failed to create uni stream."))); - } - stream = *res; - XLOG(DBG4) << "New stream created, id: " << stream->getID() - << " sess=" << this; - stream->setPriority(1, order(objHeader, subscribeID), false); - } - - // Add publishing key - auto res = publishDataMap_.emplace( - std::piecewise_construct, - std::forward_as_tuple(publishKey), - std::forward_as_tuple(PublishData( - {((stream) ? stream->getID() - : std::numeric_limits::max()), - objHeader.group, - objHeader.subgroup, - objHeader.id, - objHeader.length, - 0, - streamPerObject}))); - pubDataIt = res.first; - // Serialize multi-object stream header - if (objHeader.forwardPreference == ForwardPreference::Track || - objHeader.forwardPreference == ForwardPreference::Fetch || - objHeader.forwardPreference == ForwardPreference::Subgroup) { - writeStreamHeader(writeBuf, objHeader); - } - } else { - XLOG(DBG4) << "Found open pub data sess=" << this; - } - if (pubDataIt->second.cancelled) { - XLOG(DBG2) << "Peer has cancelled stream=" << pubDataIt->second.streamID - << " sess=" << this; - // caller MUST NOT call publish again on this stream - publishDataMap_.erase(pubDataIt); - return folly::makeSemiFuture( - folly::exception_wrapper(std::runtime_error("cancelled"))); - } - // TODO: Missing offset checks - uint64_t payloadLength = payload ? payload->computeChainDataLength() : 0; - if (payloadOffset == 0) { - // new object - // validate group and object are moving in the right direction - bool multiObject = false; - if (objHeader.forwardPreference == ForwardPreference::Track || - objHeader.forwardPreference == ForwardPreference::Fetch) { - if (objHeader.group < pubDataIt->second.group) { - XLOG(ERR) << "Decreasing group in Track sess=" << this; - return folly::makeSemiFuture(folly::exception_wrapper( - std::runtime_error("Decreasing group in Track."))); - } - if (objHeader.group == pubDataIt->second.group) { - if (objHeader.id < pubDataIt->second.objectID || - (objHeader.id == pubDataIt->second.objectID && - pubDataIt->second.offset != 0)) { - XLOG(ERR) << "obj id must increase within group sess=" << this; - return folly::makeSemiFuture(folly::exception_wrapper( - std::runtime_error("obj id must increase within group."))); - } - } - multiObject = true; - } else if (objHeader.forwardPreference == ForwardPreference::Subgroup) { - if (objHeader.status != ObjectStatus::END_OF_SUBGROUP && - ((objHeader.id < pubDataIt->second.objectID || - (objHeader.id == pubDataIt->second.objectID && - pubDataIt->second.offset != 0)))) { - XLOG(ERR) << "obj id must increase within subgroup sess=" << this; - return folly::makeSemiFuture(folly::exception_wrapper( - std::runtime_error("obj id must increase within subgroup."))); - } - multiObject = true; - } - pubDataIt->second.group = objHeader.group; - pubDataIt->second.objectID = objHeader.id; - auto objCopy = objHeader; - if (multiObject && !objCopy.length) { - if (eom) { - objCopy.length = payloadLength; - } else { - XLOG(ERR) << "Multi object streams require length sess=" << this; - } - } - writeObject(writeBuf, objCopy, nullptr); - } - if (pubDataIt->second.objectLength && - *pubDataIt->second.objectLength < payloadLength) { - XLOG(ERR) << "Object length exceeds header length sess=" << this; - return folly::makeSemiFuture(folly::exception_wrapper( - std::runtime_error("Object length exceeds header length."))); - } - writeBuf.append(std::move(payload)); - if (sendAsDatagram) { - wt_->sendDatagram(writeBuf.move()); - publishDataMap_.erase(pubDataIt); - return folly::makeSemiFuture(); - } else { - bool streamEOM = - (objHeader.status == ObjectStatus::END_OF_GROUP || - objHeader.status == ObjectStatus::END_OF_SUBGROUP || - objHeader.status == ObjectStatus::END_OF_TRACK_AND_GROUP); - // TODO: verify that pubDataIt->second.objectLength is empty or 0 - if (eom && pubDataIt->second.streamPerObject) { - writeObject( - writeBuf, - ObjectHeader( - {objHeader.trackIdentifier, - objHeader.group, - objHeader.subgroup, - objHeader.id, - objHeader.priority, - ForwardPreference::Subgroup, - ObjectStatus::END_OF_SUBGROUP, - 0}), - nullptr); - streamEOM = true; - } - XLOG_IF(DBG1, streamEOM) << "End of stream sess=" << this; - auto writeRes = wt_->writeStreamData( - pubDataIt->second.streamID, writeBuf.move(), streamEOM); - if (!writeRes) { - XLOG(ERR) << "Failed to write stream data. sess=" << this - << " error=" << static_cast(writeRes.error()); - // TODO: remove stream from publishDataMap_? - return folly::makeSemiFuture( - folly::exception_wrapper(WebTransportException( - writeRes.error(), "Failed to write stream data."))); - } - if (streamEOM) { - publishDataMap_.erase(pubDataIt); - } else { - if (eom) { - pubDataIt->second.offset = 0; - pubDataIt->second.objectLength.reset(); - } else { - pubDataIt->second.offset += payloadLength; - if (pubDataIt->second.objectLength) { - *pubDataIt->second.objectLength -= payloadLength; - } - } - } - return folly::makeSemiFuture(); - } -} - void MoQSession::onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) { XLOG(DBG1) << __func__ << " sess=" << this; if (!setupComplete_) { diff --git a/moxygen/MoQSession.h b/moxygen/MoQSession.h index 09c2b16..a2c15bb 100644 --- a/moxygen/MoQSession.h +++ b/moxygen/MoQSession.h @@ -16,6 +16,7 @@ #include #include #include +#include #include "moxygen/util/TimedBaton.h" #include @@ -71,7 +72,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, if (maxConcurrent > maxConcurrentSubscribes_) { auto delta = maxConcurrent - maxConcurrentSubscribes_; maxSubscribeID_ += delta; - sendMaxSubscribeID(/*signal=*/true); + sendMaxSubscribeID(/*signalWriteLoop=*/true); } } @@ -191,9 +192,11 @@ class MoQSession : public MoQControlCodec::ControlCallback, TrackHandle( FullTrackName fullTrackName, SubscribeID subscribeID, + folly::EventBase* evb, folly::CancellationToken token) : fullTrackName_(std::move(fullTrackName)), subscribeID_(subscribeID), + evb_(evb), cancelToken_(std::move(token)) { auto contract = folly::coro::makePromiseContract< folly::Expected, SubscribeError>>(); @@ -217,10 +220,18 @@ class MoQSession : public MoQControlCodec::ControlCallback, return subscribeID_; } + void setNewObjectTimeout(std::chrono::milliseconds objectTimeout) { + objectTimeout_ = objectTimeout; + } + [[nodiscard]] folly::CancellationToken getCancelToken() const { return cancelToken_; } + void mergeReadCancelToken(folly::CancellationToken readToken) { + cancelToken_ = folly::CancellationToken::merge(cancelToken_, readToken); + } + void fin(); folly::coro::Task< @@ -327,6 +338,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, private: FullTrackName fullTrackName_; SubscribeID subscribeID_; + folly::EventBase* evb_; using SubscribeResult = folly::Expected, SubscribeError>; folly::coro::Promise promise_; @@ -343,21 +355,21 @@ class MoQSession : public MoQControlCodec::ControlCallback, GroupOrder groupOrder_; folly::Optional latest_; folly::CancellationToken cancelToken_; + std::chrono::milliseconds objectTimeout_{std::chrono::hours(24)}; bool allDataReceived_{false}; }; folly::coro::Task< folly::Expected, SubscribeError>> subscribe(SubscribeRequest sub); - void subscribeOk(SubscribeOk subOk); + std::shared_ptr subscribeOk(SubscribeOk subOk); void subscribeError(SubscribeError subErr); void unsubscribe(Unsubscribe unsubscribe); - void subscribeDone(SubscribeDone subDone); void subscribeUpdate(SubscribeUpdate subUpdate); folly::coro::Task, FetchError>> fetch(Fetch fetch); - void fetchOk(FetchOk fetchOk); + std::shared_ptr fetchOk(FetchOk fetchOk); void fetchError(FetchError fetchError); void fetchCancel(FetchCancel fetchCancel); @@ -371,23 +383,54 @@ class MoQSession : public MoQControlCodec::ControlCallback, proxygen::WebTransport::ErrorCode errorCode; }; - // Publish this object. - folly::SemiFuture publish( - const ObjectHeader& objHeader, - SubscribeID subscribeID, - uint64_t payloadOffset, - std::unique_ptr payload, - bool eom); - folly::SemiFuture publishStreamPerObject( - const ObjectHeader& objHeader, - SubscribeID subscribeID, - uint64_t payloadOffset, - std::unique_ptr payload, - bool eom); - folly::SemiFuture publishStatus( - const ObjectHeader& objHeader, - SubscribeID subscribeID); - folly::Try closeFetchStream(SubscribeID subID); + class PublisherImpl { + public: + PublisherImpl( + MoQSession* session, + SubscribeID subscribeID, + Priority priority, + GroupOrder groupOrder) + : session_(session), + subscribeID_(subscribeID), + priority_(priority), + groupOrder_(groupOrder) {} + virtual ~PublisherImpl() = default; + + SubscribeID subscribeID() const { + return subscribeID_; + } + uint8_t priority() const { + return priority_; + } + void setPriority(uint8_t priority) { + priority_ = priority; + } + void setGroupOrder(GroupOrder groupOrder) { + groupOrder_ = groupOrder; + } + + virtual void reset(ResetStreamErrorCode error) = 0; + + virtual void onStreamComplete(const ObjectHeader& finalHeader) = 0; + + folly::Expected subscribeDone( + SubscribeDone subDone); + + void fetchComplete(); + + protected: + proxygen::WebTransport* getWebTransport() const { + if (session_) { + return session_->wt_; + } + return nullptr; + } + + MoQSession* session_{nullptr}; + SubscribeID subscribeID_; + uint8_t priority_; + GroupOrder groupOrder_; + }; void onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) override; void onNewBidiStream(proxygen::WebTransport::BidiStreamHandle bh) override; @@ -398,6 +441,8 @@ class MoQSession : public MoQControlCodec::ControlCallback, } private: + void cleanup(); + folly::coro::Task controlWriteLoop( proxygen::WebTransport::StreamWriteHandle* writeHandle); folly::coro::Task readLoop( @@ -405,6 +450,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, proxygen::WebTransport::StreamReadHandle* readHandle); std::shared_ptr getTrack(TrackIdentifier trackidentifier); + void subscribeDone(SubscribeDone subDone); void onClientSetup(ClientSetup clientSetup) override; void onServerSetup(ServerSetup setup) override; @@ -445,72 +491,9 @@ class MoQSession : public MoQControlCodec::ControlCallback, void onConnectionError(ErrorCode error) override; void checkForCloseOnDrain(); - folly::SemiFuture publishImpl( - const ObjectHeader& objHeader, - SubscribeID subscribeID, - uint64_t payloadOffset, - std::unique_ptr payload, - bool eom, - bool streamPerObject); - - uint64_t order(const ObjectHeader& objHeader, const SubscribeID subscribeID); - - void retireSubscribeId(bool signal); - void sendMaxSubscribeID(bool signal); - - struct PublishKey { - TrackIdentifier trackIdentifier; - uint64_t group; - uint64_t subgroup; - ForwardPreference pref; - uint64_t object; - - bool operator==(const PublishKey& other) const { - if (trackIdentifier != other.trackIdentifier || pref != other.pref) { - return false; - } - if (pref == ForwardPreference::Datagram) { - return object == other.object; - } else if (pref == ForwardPreference::Subgroup) { - return group == other.group && subgroup == other.subgroup; - } else if (pref == ForwardPreference::Track) { - return true; - } else if (pref == ForwardPreference::Fetch) { - return true; - } - return false; - } - - struct hash { - size_t operator()(const PublishKey& ook) const { - if (ook.pref == ForwardPreference::Datagram) { - return folly::hash::hash_combine( - TrackIdentifierHash{}(ook.trackIdentifier), - ook.group, - ook.object); - } else if (ook.pref == ForwardPreference::Subgroup) { - return folly::hash::hash_combine( - TrackIdentifierHash{}(ook.trackIdentifier), - ook.group, - ook.subgroup); - } - // Track or Fetch - return folly::hash::hash_combine( - TrackIdentifierHash{}(ook.trackIdentifier)); - } - }; - }; - - struct PublishData { - uint64_t streamID; - uint64_t group; - uint64_t subgroup; - uint64_t objectID; - folly::Optional objectLength; - uint64_t offset; - bool streamPerObject; - bool cancelled{false}; - }; + void retireSubscribeId(bool signalWriteLoop); + void sendMaxSubscribeID(bool signalWriteLoop); + void fetchComplete(SubscribeID subscribeID); // Get the max subscribe id from the setup params. If MAX_SUBSCRIBE_ID key is // not present, we default to 0 as specified. 0 means that the peer MUST NOT @@ -555,13 +538,10 @@ class MoQSession : public MoQControlCodec::ControlCallback, TrackNamespace::hash> pendingSubscribeAnnounces_; - struct PubTrack { - uint8_t priority; - GroupOrder groupOrder; - }; // Subscriber ID -> metadata about a publish track - folly::F14FastMap pubTracks_; - folly::F14FastMap publishDataMap_; + folly:: + F14FastMap, SubscribeID::hash> + pubTracks_; uint64_t nextTrackId_{0}; uint64_t closedSubscribes_{0}; // TODO: Make this value configurable. maxConcurrentSubscribes_ represents diff --git a/moxygen/relay/MoQForwarder.h b/moxygen/relay/MoQForwarder.h index 6a9cd54..2c8c61d 100644 --- a/moxygen/relay/MoQForwarder.h +++ b/moxygen/relay/MoQForwarder.h @@ -15,7 +15,7 @@ namespace moxygen { -class MoQForwarder { +class MoQForwarder : public TrackConsumer { public: explicit MoQForwarder( FullTrackName ftn, @@ -42,166 +42,448 @@ class MoQForwarder { finAfterEnd_ = finAfterEnd; } + struct SubgroupIdentifier { + uint64_t group; + uint64_t subgroup; + struct hash { + size_t operator()(const SubgroupIdentifier& id) const { + return folly::hash::hash_combine(id.group, id.subgroup); + } + }; + bool operator==(const SubgroupIdentifier& other) const { + return group == other.group && subgroup == other.subgroup; + } + }; + class SubgroupForwarder; struct Subscriber { std::shared_ptr session; SubscribeID subscribeID; TrackAlias trackAlias; SubscribeRange range; - - struct hash { - std::uint64_t operator()(const Subscriber& subscriber) const { - return folly::hash::hash_combine( - subscriber.session.get(), - subscriber.subscribeID.value, - subscriber.trackAlias.value, - subscriber.range.start.group, - subscriber.range.start.object, - subscriber.range.end.group, - subscriber.range.end.object); - } - }; - bool operator==(const Subscriber& other) const { - return session == other.session && subscribeID == other.subscribeID && - trackAlias == other.trackAlias && - (range.start <=> other.range.start) == - std::strong_ordering::equivalent && - (range.end <=> other.range.end) == std::strong_ordering::equivalent; - } + std::shared_ptr trackConsumer; + // Stores the SubgroupConsumer for this subscriber for all currently + // publishing subgroups. Having this state here makes it easy to remove + // a Subscriber and all open subgroups. + folly::F14FastMap< + SubgroupIdentifier, + std::shared_ptr, + SubgroupIdentifier::hash> + subgroups; }; [[nodiscard]] bool empty() const { return subscribers_.empty(); } - void addSubscriber(Subscriber sub) { - subscribers_.emplace(std::move(sub)); - } - - void addSubscriber( - std::shared_ptr session, - SubscribeID subscribeID, - TrackAlias trackAlias, - const SubscribeRequest& sub) { - subscribers_.emplace(Subscriber( - {std::move(session), - subscribeID, - trackAlias, - toSubscribeRange(sub, latest_)})); + void addSubscriber(std::shared_ptr sub) { + auto session = sub->session.get(); + subscribers_.emplace(session, std::move(sub)); } - bool updateSubscriber(const SubscribeUpdate& subscribeUpdate) { - folly::F14NodeSet::iterator it = - subscribers_.begin(); - for (; it != subscribers_.end();) { - if (subscribeUpdate.subscribeID == it->subscribeID) { - break; - } + bool updateSubscriber( + const std::shared_ptr session, + const SubscribeUpdate& subscribeUpdate) { + auto subIt = subscribers_.find(session.get()); + if (subIt == subscribers_.end()) { + return false; } - if (it == subscribers_.end()) { - // subscribeID not found + auto& subscriber = subIt->second; + if (subscribeUpdate.subscribeID != subscriber->subscribeID) { + XLOG(ERR) << "Invalid subscribeID"; return false; } - // Not implemented: Validation about subscriptions - Subscriber subscriber = *it; - subscribers_.erase(it); - subscriber.range.start = subscribeUpdate.start; - subscriber.range.end = subscribeUpdate.end; - subscribers_.emplace(std::move(subscriber)); + // TODO: Validate update subscription range conforms to SUBSCRIBE_UPDATE + // rules + subscriber->range.start = subscribeUpdate.start; + subscriber->range.end = subscribeUpdate.end; return true; } + void removeSession(const std::shared_ptr& session) { + removeSession( + session, + {SubscribeID(0), + SubscribeDoneStatusCode::GOING_AWAY, + "byebyebye", + latest_}); + } + void removeSession( const std::shared_ptr& session, - folly::Optional subID = folly::none) { - // The same session could have multiple subscriptions, remove all of them - // TODO: This shouldn't need to be a linear search - for (auto it = subscribers_.begin(); it != subscribers_.end();) { - if (it->session.get() == session.get() && - (!subID || *subID == it->subscribeID)) { - if (subID) { - it->session->subscribeDone( - {*subID, - SubscribeDoneStatusCode::UNSUBSCRIBED, - "byebyebye", - latest_}); - } // else assume the session went away ungracefully - XLOG(DBG1) << "Removing session from forwarder"; - it = subscribers_.erase(it); - } else { - it++; - } + SubscribeDone subDone) { + auto subIt = subscribers_.find(session.get()); + if (subIt == subscribers_.end()) { + // ? + XLOG(ERR) << "Session not found"; + return; } + subDone.subscribeID = subIt->second->subscribeID; + subscribeDone(*subIt->second, subDone); + subscribers_.erase(subIt); XLOG(DBG1) << "subscribers_.size()=" << subscribers_.size(); } - void publish( - ObjectHeader objHeader, - std::unique_ptr payload, - uint64_t payloadOffset = 0, - bool eom = true, - bool streamPerObject = false) { - AbsoluteLocation now{objHeader.group, objHeader.id}; + void subscribeDone(Subscriber& subscriber, SubscribeDone subDone) { + // TODO: Resetting subgroups here is too aggressive + XLOG(DBG1) << "Resetting open subgroups for subscriber=" << &subscriber; + for (auto& subgroup : subscriber.subgroups) { + subgroup.second->reset(ResetStreamErrorCode::CANCELLED); + } + subscriber.trackConsumer->subscribeDone(subDone); + } + + void forEachSubscriber( + std::function&)> fn) { + for (auto subscriberIt = subscribers_.begin(); + subscriberIt != subscribers_.end();) { + const auto& sub = subscriberIt->second; + subscriberIt++; + fn(sub); + } + } + + void updateLatest(uint64_t group, uint64_t object = 0) { + AbsoluteLocation now{group, object}; if (!latest_ || now > *latest_) { latest_ = now; } - for (auto it = subscribers_.begin(); it != subscribers_.end();) { - auto& sub = *it; - if (sub.range.start > now) { - // future subscriber - it++; - continue; - } - auto evb = sub.session->getEventBase(); - if (sub.range.end < now) { - // subscription over - if (finAfterEnd_) { - evb->runImmediatelyOrRunInEventBaseThread([session = sub.session, - subId = sub.subscribeID, - now, - trackName = - fullTrackName_] { - session->subscribeDone( - {subId, SubscribeDoneStatusCode::SUBSCRIPTION_ENDED, "", now}); - }); - } - it = subscribers_.erase(it); + } + + bool checkRange(const Subscriber& sub) { + XCHECK(latest_); + if (*latest_ < sub.range.start) { + // future + return false; + } else if (*latest_ > sub.range.end) { + // now past, send subscribeDone + // TOOD: maybe this is too early for a relay. + removeSession( + sub.session, + {sub.subscribeID, + SubscribeDoneStatusCode::SUBSCRIPTION_ENDED, + "", + sub.range.end}); + return false; + } + return true; + } + + void removeSession(const Subscriber& sub, const MoQPublishError& err) { + removeSession( + sub.session, + {sub.subscribeID, + SubscribeDoneStatusCode::INTERNAL_ERROR, + err.what(), + sub.range.end}); + } + + folly::Expected, MoQPublishError> + beginSubgroup(uint64_t groupID, uint64_t subgroupID, Priority priority) + override { + updateLatest(groupID, 0); + auto subgroupForwarder = std::make_shared( + *this, groupID, subgroupID, priority); + SubgroupIdentifier subgroupIdentifier(groupID, subgroupID); + forEachSubscriber([=](const std::shared_ptr& sub) { + if (!checkRange(*sub)) { + return; + } + auto res = + sub->trackConsumer->beginSubgroup(groupID, subgroupID, priority); + if (res.hasError()) { + removeSession(*sub, res.error()); } else { - evb->runImmediatelyOrRunInEventBaseThread( - [session = sub.session, - subId = sub.subscribeID, - trackAlias = sub.trackAlias, - objHeader, - payloadOffset, - buf = (payload) ? payload->clone() : nullptr, - eom, - streamPerObject]() mutable { - objHeader.trackIdentifier = trackAlias; - if (objHeader.status != ObjectStatus::NORMAL) { - session->publishStatus(objHeader, subId); - } else if (streamPerObject) { - session->publishStreamPerObject( - objHeader, subId, payloadOffset, std::move(buf), eom); - } else { - session->publish( - objHeader, subId, payloadOffset, std::move(buf), eom); - } - }); - it++; - } - } - } - - void error(SubscribeDoneStatusCode errorCode, std::string reasonPhrase) { - for (auto sub : subscribers_) { - sub.session->subscribeDone( - {sub.subscribeID, errorCode, reasonPhrase, latest_}); - } - subscribers_.clear(); + sub->subgroups[subgroupIdentifier] = res.value(); + } + }); + subgroups_.emplace(subgroupIdentifier, subgroupForwarder); + return subgroupForwarder; + } + + folly::Expected, MoQPublishError> + awaitStreamCredit() override { + return folly::makeSemiFuture(); + } + + folly::Expected objectStream( + const ObjectHeader& header, + Payload payload) override { + updateLatest(header.group, header.id); + forEachSubscriber([&](const std::shared_ptr& sub) { + if (!checkRange(*sub)) { + return; + } + sub->trackConsumer->objectStream(header, maybeClone(payload)) + .onError([this, sub](const auto& err) { removeSession(*sub, err); }); + }); + return folly::unit; + } + + folly::Expected + groupNotExists(uint64_t groupID, uint64_t subgroup, Priority pri) override { + updateLatest(groupID, 0); + forEachSubscriber([&](const std::shared_ptr& sub) { + if (!checkRange(*sub)) { + return; + } + sub->trackConsumer->groupNotExists(groupID, subgroup, pri) + .onError([this, sub](const auto& err) { removeSession(*sub, err); }); + }); + return folly::unit; + } + + folly::Expected datagram( + const ObjectHeader& header, + Payload payload) override { + updateLatest(header.group, header.id); + forEachSubscriber([&](const std::shared_ptr& sub) { + if (!checkRange(*sub)) { + return; + } + sub->trackConsumer->datagram(header, maybeClone(payload)) + .onError([this, sub](const auto& err) { removeSession(*sub, err); }); + }); + return folly::unit; + } + + folly::Expected subscribeDone( + SubscribeDone subDone) override { + forEachSubscriber([&](const std::shared_ptr& sub) { + removeSession(sub->session, subDone); + }); + return folly::unit; } + class SubgroupForwarder : public SubgroupConsumer { + folly::Optional currentObjectLength_; + MoQForwarder& forwarder_; + SubgroupIdentifier identifier_; + Priority priority_; + + void forEachSubscriberSubgroup( + std::function& sub, + const std::shared_ptr&)> fn) { + forwarder_.forEachSubscriber([&](const std::shared_ptr& sub) { + if (forwarder_.latest_ && forwarder_.checkRange(*sub)) { + auto subgroupConsumerIt = sub->subgroups.find(identifier_); + if (subgroupConsumerIt == sub->subgroups.end()) { + auto res = sub->trackConsumer->beginSubgroup( + identifier_.group, identifier_.subgroup, priority_); + if (res.hasError()) { + forwarder_.removeSession(*sub, res.error()); + } else { + auto emplaceRes = + sub->subgroups.emplace(identifier_, res.value()); + subgroupConsumerIt = emplaceRes.first; + } + } + fn(sub, subgroupConsumerIt->second); + } + }); + } + + public: + SubgroupForwarder( + MoQForwarder& forwarder, + uint64_t group, + uint64_t subgroup, + Priority priority) + : forwarder_(forwarder), + identifier_{group, subgroup}, + priority_(priority) {} + + folly::Expected + object(uint64_t objectID, Payload payload, bool finSubgroup) override { + if (currentObjectLength_) { + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Still publishing previous object")); + } + forwarder_.updateLatest(identifier_.group, objectID); + forEachSubscriberSubgroup( + [&](const std::shared_ptr& sub, + const std::shared_ptr& subgroupConsumer) { + subgroupConsumer->object(objectID, maybeClone(payload), finSubgroup) + .onError([this, sub](const auto& err) { + forwarder_.removeSession(*sub, err); + }); + if (finSubgroup) { + sub->subgroups.erase(identifier_); + } + }); + if (finSubgroup) { + forwarder_.subgroups_.erase(identifier_); + } + return folly::unit; + } + + folly::Expected objectNotExists( + uint64_t objectID, + bool finSubgroup = false) override { + if (currentObjectLength_) { + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Still publishing previous object")); + } + forwarder_.updateLatest(identifier_.group, objectID); + forEachSubscriberSubgroup( + [&](const std::shared_ptr& sub, + const std::shared_ptr& subgroupConsumer) { + subgroupConsumer->objectNotExists(objectID, finSubgroup) + .onError([this, sub](const auto& err) { + forwarder_.removeSession(*sub, err); + }); + if (finSubgroup) { + sub->subgroups.erase(identifier_); + } + }); + if (finSubgroup) { + forwarder_.subgroups_.erase(identifier_); + } + return folly::unit; + } + + folly::Expected beginObject( + uint64_t objectID, + uint64_t length, + Payload initialPayload) override { + // TODO: use a shared class for object publish state validation + forwarder_.updateLatest(identifier_.group, objectID); + if (currentObjectLength_) { + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Still publishing previous object")); + } + auto payloadLength = + (initialPayload) ? initialPayload->computeChainDataLength() : 0; + if (length > payloadLength) { + currentObjectLength_ = length - payloadLength; + } + forEachSubscriberSubgroup( + [&](const std::shared_ptr& sub, + const std::shared_ptr& subgroupConsumer) { + subgroupConsumer + ->beginObject(objectID, length, maybeClone(initialPayload)) + .onError([this, sub](const auto& err) { + forwarder_.removeSession(*sub, err); + }); + }); + return folly::unit; + } + + folly::Expected endOfGroup( + uint64_t endOfGroupObjectID) override { + if (currentObjectLength_) { + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Still publishing previous object")); + } + forwarder_.updateLatest(identifier_.group, endOfGroupObjectID); + forEachSubscriberSubgroup( + [&](const std::shared_ptr& sub, + const std::shared_ptr& subgroupConsumer) { + subgroupConsumer->endOfGroup(endOfGroupObjectID) + .onError([this, sub](const auto& err) { + forwarder_.removeSession(*sub, err); + }); + sub->subgroups.erase(identifier_); + }); + forwarder_.subgroups_.erase(identifier_); + return folly::unit; + } + + folly::Expected endOfTrackAndGroup( + uint64_t endOfTrackObjectID) override { + if (currentObjectLength_) { + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Still publishing previous object")); + } + forwarder_.updateLatest(identifier_.group, endOfTrackObjectID); + forEachSubscriberSubgroup( + [&](const std::shared_ptr& sub, + const std::shared_ptr& subgroupConsumer) { + subgroupConsumer->endOfTrackAndGroup(endOfTrackObjectID) + .onError([this, sub](const auto& err) { + forwarder_.removeSession(*sub, err); + }); + sub->subgroups.erase(identifier_); + }); + forwarder_.subgroups_.erase(identifier_); + return folly::unit; + } + + folly::Expected endOfSubgroup() override { + if (currentObjectLength_) { + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Still publishing previous object")); + } + forEachSubscriberSubgroup( + [&](const std::shared_ptr& sub, + const std::shared_ptr& subgroupConsumer) { + subgroupConsumer->endOfSubgroup().onError( + [this, sub](const auto& err) { + forwarder_.removeSession(*sub, err); + }); + sub->subgroups.erase(identifier_); + }); + forwarder_.subgroups_.erase(identifier_); + return folly::unit; + } + + void reset(ResetStreamErrorCode error) override { + forEachSubscriberSubgroup( + [&](const std::shared_ptr& sub, + const std::shared_ptr& subgroupConsumer) { + subgroupConsumer->reset(error); + sub->subgroups.erase(identifier_); + }); + forwarder_.subgroups_.erase(identifier_); + } + + folly::Expected objectPayload( + Payload payload, + bool finSubgroup = false) override { + if (!currentObjectLength_) { + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Haven't started publishing object")); + } + auto payloadLength = (payload) ? payload->computeChainDataLength() : 0; + if (payloadLength > *currentObjectLength_) { + return folly::makeUnexpected(MoQPublishError( + MoQPublishError::API_ERROR, "Payload exceeded length")); + } + *currentObjectLength_ -= payloadLength; + forEachSubscriberSubgroup( + [&](const std::shared_ptr& sub, + const std::shared_ptr& subgroupConsumer) { + subgroupConsumer->objectPayload(maybeClone(payload), finSubgroup) + .onError([this, sub](const auto& err) { + forwarder_.removeSession(*sub, err); + }); + if (finSubgroup) { + sub->subgroups.erase(identifier_); + } + }); + if (*currentObjectLength_ == 0) { + currentObjectLength_.reset(); + if (finSubgroup) { + forwarder_.subgroups_.erase(identifier_); + } + return ObjectPublishStatus::DONE; + } + return ObjectPublishStatus::IN_PROGRESS; + } + }; + private: + static Payload maybeClone(const Payload& payload) { + return payload ? payload->clone() : nullptr; + } + FullTrackName fullTrackName_; - folly::F14NodeSet subscribers_; + folly::F14FastMap> subscribers_; + folly::F14FastMap< + SubgroupIdentifier, + std::shared_ptr, + SubgroupIdentifier::hash> + subgroups_; GroupOrder groupOrder_{GroupOrder::OldestFirst}; folly::Optional latest_; bool finAfterEnd_{true}; diff --git a/moxygen/relay/MoQRelay.cpp b/moxygen/relay/MoQRelay.cpp index add6a35..dffdcbb 100644 --- a/moxygen/relay/MoQRelay.cpp +++ b/moxygen/relay/MoQRelay.cpp @@ -204,25 +204,47 @@ folly::coro::Task MoQRelay::onSubscribe( } // Add to subscribers list auto sessionPtr = session.get(); - forwarder->addSubscriber( - std::move(session), subReq.subscribeID, subReq.trackAlias, subReq); - sessionPtr->subscribeOk( + auto trackPublisher = sessionPtr->subscribeOk( {subReq.subscribeID, std::chrono::milliseconds(0), MoQSession::resolveGroupOrder(forwarder->groupOrder(), subGroupOrder), forwarder->latest()}); + if (trackPublisher) { + auto subscriber = std::make_shared( + std::move(session), + subReq.subscribeID, + subReq.trackAlias, + toSubscribeRange(subReq, forwarder->latest()), + std::move(trackPublisher)); + forwarder->addSubscriber(std::move(subscriber)); + } else { + XLOG(ERR) << "Downstream subscribeOK failed"; + // TODO: unsubsubscribe upstream + } } folly::coro::Task MoQRelay::forwardTrack( std::shared_ptr track, - std::shared_ptr fowarder) { + std::shared_ptr forwarder) { while (auto obj = co_await track->objects().next()) { XLOG(DBG1) << __func__ << " new object t=" << obj.value()->fullTrackName << " g=" << obj.value()->header.group << " o=" << obj.value()->header.id; folly::IOBufQueue payloadBuf{folly::IOBufQueue::cacheChainLength()}; - uint64_t payloadOffset = 0; bool eom = false; + // TODO: this is wrong - we're publishing each object in it's own subgroup + // stream now + auto res = forwarder->beginSubgroup( + obj.value()->header.group, + obj.value()->header.subgroup, + obj.value()->header.priority); + if (!res) { + XLOG(ERR) << "Failed to begin forwarding subgroup"; + // TODO: error + } + auto subgroupPub = std::move(res.value()); + subgroupPub->beginObject( + obj.value()->header.id, *obj.value()->header.length, nullptr); while (!eom) { auto payload = co_await obj.value()->payloadQueue.dequeue(); if (payload) { @@ -235,17 +257,15 @@ folly::coro::Task MoQRelay::forwardTrack( eom = true; } auto payloadLength = payloadBuf.chainLength(); - if (eom || payloadOffset + payloadLength > 1280) { - fowarder->publish( - obj.value()->header, payloadBuf.move(), payloadOffset, eom); - payloadOffset += payloadLength; + if (eom || payloadLength > 1280) { + subgroupPub->objectPayload(payloadBuf.move(), eom); } else { XLOG(DBG1) << __func__ - << " Not publishing yet payloadOffset=" << payloadOffset - << " payloadLength=" << payloadLength + << " Not publishing yet payloadLength=" << payloadLength << " eom=" << uint64_t(eom); } } + subgroupPub.reset(); } } @@ -257,7 +277,12 @@ void MoQRelay::onUnsubscribe( for (auto subscriptionIt = subscriptions_.begin(); subscriptionIt != subscriptions_.end();) { auto& subscription = subscriptionIt->second; - subscription.forwarder->removeSession(session, unsub.subscribeID); + subscription.forwarder->removeSession( + session, + {subscription.subscribeID, + SubscribeDoneStatusCode::UNSUBSCRIBED, + "", + subscription.forwarder->latest()}); if (subscription.forwarder->empty()) { XLOG(INFO) << "Removed last subscriber for " << subscriptionIt->first; subscription.cancellationSource.requestCancellation(); @@ -312,8 +337,11 @@ void MoQRelay::removeSession(const std::shared_ptr& session) { subscriptionIt != subscriptions_.end();) { auto& subscription = subscriptionIt->second; if (subscription.upstream.get() == session.get()) { - subscription.forwarder->error( - SubscribeDoneStatusCode::SUBSCRIPTION_ENDED, "upstream disconnect"); + subscription.forwarder->subscribeDone( + {SubscribeID(0), + SubscribeDoneStatusCode::SUBSCRIPTION_ENDED, + "upstream disconnect", + subscription.forwarder->latest()}); subscription.cancellationSource.requestCancellation(); } else { subscription.forwarder->removeSession(session); diff --git a/moxygen/samples/chat/MoQChatClient.cpp b/moxygen/samples/chat/MoQChatClient.cpp index 9d9fcb5..227bec0 100644 --- a/moxygen/samples/chat/MoQChatClient.cpp +++ b/moxygen/samples/chat/MoQChatClient.cpp @@ -104,7 +104,7 @@ folly::coro::Task MoQChatClient::controlReadLoop() { if (client_.nextGroup_ > 0) { latest.emplace(client_.nextGroup_ - 1, 0); } - client_.moqClient_.moqSession_->subscribeOk( + client_.publisher_ = client_.moqClient_.moqSession_->subscribeOk( {subscribeReq.subscribeID, std::chrono::milliseconds(0), MoQSession::resolveGroupOrder( @@ -124,6 +124,14 @@ folly::coro::Task MoQChatClient::controlReadLoop() { unsubscribe.subscribeID == *client_.chatSubscribeID_) { client_.chatSubscribeID_.reset(); client_.chatTrackAlias_.reset(); + if (client_.publisher_) { + client_.publisher_->subscribeDone( + {unsubscribe.subscribeID, + SubscribeDoneStatusCode::UNSUBSCRIBED, + "", + folly::none}); + client_.publisher_.reset(); + } } } @@ -157,18 +165,17 @@ void MoQChatClient::publishLoop() { moqClient_.moqSession_->close(); moqClient_.moqSession_.reset(); } else if (chatSubscribeID_) { - moqClient_.moqSession_->publishStreamPerObject( - {*chatTrackAlias_, - nextGroup_++, - /*subgroup=*/0, - /*id=*/0, - /*pri=*/0, - ForwardPreference::Subgroup, - ObjectStatus::NORMAL}, - *chatSubscribeID_, - 0, - folly::IOBuf::copyBuffer(input), - true); + if (publisher_) { + publisher_->objectStream( + {*chatTrackAlias_, + nextGroup_++, + /*subgroup=*/0, + /*id=*/0, + /*pri=*/0, + ForwardPreference::Subgroup, + ObjectStatus::NORMAL}, + folly::IOBuf::copyBuffer(input)); + } } }); if (input == "/leave") { diff --git a/moxygen/samples/chat/MoQChatClient.h b/moxygen/samples/chat/MoQChatClient.h index c752b5e..2594478 100644 --- a/moxygen/samples/chat/MoQChatClient.h +++ b/moxygen/samples/chat/MoQChatClient.h @@ -51,6 +51,7 @@ class MoQChatClient { MoQClient moqClient_; folly::Optional chatSubscribeID_; folly::Optional chatTrackAlias_; + std::shared_ptr publisher_; uint64_t nextGroup_{0}; struct UserTrack { std::string deviceId; diff --git a/moxygen/samples/date/MoQDateServer.cpp b/moxygen/samples/date/MoQDateServer.cpp index 3bcf7c0..75efae6 100644 --- a/moxygen/samples/date/MoQDateServer.cpp +++ b/moxygen/samples/date/MoQDateServer.cpp @@ -49,6 +49,7 @@ class MoQDateServer : MoQServer { std::chrono::seconds(FLAGS_relay_transaction_timeout)) .scheduleOn(evb) .start(); + publishDateLoop().scheduleOn(evb).start(); } } @@ -76,7 +77,7 @@ class MoQDateServer : MoQServer { void operator()(SubscribeUpdate subscribeUpdate) const override { XLOG(INFO) << "SubscribeUpdate id=" << subscribeUpdate.subscribeID; - if (!server_.onSubscribeUpdate(subscribeUpdate)) { + if (!server_.onSubscribeUpdate(clientSession_, subscribeUpdate)) { clientSession_->subscribeError( {subscribeUpdate.subscribeID, 403, "unexpected subscribe update"}); } @@ -109,6 +110,11 @@ class MoQDateServer : MoQServer { std::unique_ptr makeControlVisitor( std::shared_ptr clientSession) override { + if (!loopRunning_) { + // start date loop on first server connect + loopRunning_ = true; + publishDateLoop().scheduleOn(clientSession->getEventBase()).start(); + } return std::make_unique( *this, std::move(clientSession)); } @@ -126,7 +132,7 @@ class MoQDateServer : MoQServer { {subReq.subscribeID, 400, "start in the past, use FETCH"}); return; } - clientSession->subscribeOk( + auto trackPublisher = clientSession->subscribeOk( {subReq.subscribeID, std::chrono::milliseconds(0), MoQSession::resolveGroupOrder( @@ -134,11 +140,12 @@ class MoQDateServer : MoQServer { nowLoc}); forwarder_.setLatest(nowLoc); - forwarder_.addSubscriber( - {std::move(clientSession), - subReq.subscribeID, - subReq.trackAlias, - range}); + forwarder_.addSubscriber(std::make_shared( + std::move(clientSession), + subReq.subscribeID, + subReq.trackAlias, + range, + std::move(trackPublisher))); } void onFetch(Fetch fetch, std::shared_ptr clientSession) { @@ -158,7 +165,7 @@ class MoQDateServer : MoQServer { return; } range.end = std::min(range.end, nowLoc); - clientSession->fetchOk( + auto fetchPub = clientSession->fetchOk( {fetch.subscribeID, MoQSession::resolveGroupOrder( GroupOrder::OldestFirst, fetch.groupOrder), @@ -166,168 +173,167 @@ class MoQDateServer : MoQServer { nowLoc, {}}); - catchup(clientSession, fetch.subscribeID, range, nowLoc); + catchup(fetchPub, range, nowLoc) + .scheduleOn(clientSession->getEventBase()) + .start(); + } + + bool onSubscribeUpdate( + const std::shared_ptr& session, + const SubscribeUpdate& subscribeUpdate) { + return forwarder_.updateSubscriber(session, subscribeUpdate); + } + + Payload minutePayload(uint64_t group) { + time_t in_time_t = group * 60; + struct tm local_tm; + auto lt = ::localtime_r(&in_time_t, &local_tm); + std::stringstream ss; + ss << std::put_time(lt, "%Y-%m-%d %H:%M:"); + XLOG(DBG1) << ss.str() << lt->tm_sec; + return folly::IOBuf::copyBuffer(ss.str()); } - bool onSubscribeUpdate(const SubscribeUpdate& subscribeUpdate) { - return forwarder_.updateSubscriber(subscribeUpdate); + Payload secondPayload(uint64_t object) { + XCHECK_GT(object, 0); + auto secBuf = folly::to(object - 1); + XLOG(DBG1) << (object - 1); + return folly::IOBuf::copyBuffer(secBuf); } - void catchup( - std::shared_ptr clientSession, - SubscribeID subscribeID, + folly::coro::Task catchup( + std::shared_ptr fetchPub, SubscribeRange range, AbsoluteLocation now) { if (range.start.object > 61) { XLOG(ERR) << "Invalid start object"; - return; + co_return; } - time_t t = - range.start.group * 60 + std::max(range.start.object, (uint64_t)1) - 1; - auto pubFn = [clientSession, subscribeID]( - ObjectHeader objHdr, - std::unique_ptr payload, - uint64_t payloadOffset, - bool eom, - bool) { - objHdr.trackIdentifier = subscribeID; - if (objHdr.status == ObjectStatus::NORMAL) { - XLOG(DBG1) << "Publish normal object trackIdentifier=" - << std::get(objHdr.trackIdentifier); - clientSession - ->publish( - objHdr, subscribeID, payloadOffset, std::move(payload), eom) - .via(clientSession->getEventBase()); + while (range.start < now && range.start < range.end) { + uint64_t subgroup = streamPerObject_ ? range.start.object : 0; + folly::Expected res{folly::unit}; + if (range.start.object == 0) { + res = fetchPub->object( + range.start.group, + subgroup, + range.start.object, + minutePayload(range.start.group), + false); + } else if (range.start.object <= 60) { + res = fetchPub->object( + range.start.group, + subgroup, + range.start.object, + secondPayload(range.start.object), + false); } else { - clientSession->publishStatus(objHdr, subscribeID); + res = fetchPub->endOfGroup( + range.start.group, + subgroup, + range.start.object, + /*finFetch=*/false); } - }; - while (range.start < now && range.start < range.end) { - auto n = publishDate( - pubFn, - t, - false, - subscribeID, - TrackAlias(0), - ForwardPreference::Fetch, - range.end); - t++; - // publishDate publishes two objects for obj = 0 - range.start.object += n; - if (range.start.object > 60) { + if (!res) { + XLOG(ERR) << "catchup error: " << res.error().what(); + if (res.error().code == MoQPublishError::BLOCKED) { + XLOG(DBG1) << "Fetch blocked, waiting"; + auto awaitRes = fetchPub->awaitReadyToConsume(); + if (!awaitRes) { + XLOG(ERR) << "awaitReadyToConsume error: " + << awaitRes.error().what(); + fetchPub->reset(ResetStreamErrorCode::INTERNAL_ERROR); + co_return; + } + co_await std::move(awaitRes.value()); + } else { + fetchPub->reset(ResetStreamErrorCode::INTERNAL_ERROR); + co_return; + } + } + range.start.object++; + if (range.start.object > 61) { range.start.group++; range.start.object = 0; } } // TODO - empty range may log an error? - clientSession->closeFetchStream(subscribeID); + fetchPub->endOfFetch(); } folly::coro::Task publishDateLoop() { bool first = false; auto cancelToken = co_await folly::coro::co_current_cancellation_token; - auto pubFn = [this]( - ObjectHeader objHdr, - std::unique_ptr payload, - uint64_t payloadOffset, - bool eom, - bool streamPerObject) { - forwarder_.publish( - std::move(objHdr), - std::move(payload), - payloadOffset, - eom, - streamPerObject); - }; + std::shared_ptr subgroupPublisher; while (!cancelToken.isCancellationRequested()) { if (!forwarder_.empty()) { auto now = std::chrono::system_clock::now(); auto in_time_t = std::chrono::system_clock::to_time_t(now); - publishDate( - pubFn, - in_time_t, - first, - 0, - 0, - ForwardPreference::Subgroup, - folly::none); - first = false; + if (streamPerObject_) { + publishDate(uint64_t(in_time_t / 60), uint64_t(in_time_t % 60)); + } else { + subgroupPublisher = publishDate( + subgroupPublisher, + uint64_t(in_time_t / 60), + uint64_t(in_time_t % 60)); + } } co_await folly::coro::sleep(std::chrono::seconds(1)); } } - size_t publishDate( - const std::function, - uint64_t, - bool, - bool)>& publishFn, - time_t in_time_t, - bool forceGroup, - SubscribeID, - TrackAlias trackAlias, - ForwardPreference pref, - folly::Optional end) { - size_t objectsPublished = 0; - struct tm local_tm; - auto lt = ::localtime_r(&in_time_t, &local_tm); - std::stringstream ss; - ss << std::put_time(lt, "%Y-%m-%d %H:%M:"); - XLOG(DBG1) << ss.str() << lt->tm_sec; - AbsoluteLocation nowLoc( - {uint64_t(in_time_t / 60), uint64_t(lt->tm_sec + 1)}); - if (lt->tm_sec == 0 || forceGroup) { - ObjectHeader objHeader( - {trackAlias, - nowLoc.group, - /*subgroup=*/0, - /*object=*/0, - /*priority*/ 0, - pref, - ObjectStatus::NORMAL, - folly::none}); - publishFn( - objHeader, - folly::IOBuf::copyBuffer(ss.str()), - 0, - true, - !end && streamPerObject_); - objectsPublished++; + std::shared_ptr publishDate( + std::shared_ptr subgroupPublisher, + uint64_t group, + uint64_t second) { + uint64_t subgroup = 0; + uint64_t object = second; + if (!subgroupPublisher) { + subgroupPublisher = + forwarder_.beginSubgroup(group, subgroup, /*priority=*/0).value(); } - if (!end || nowLoc < *end) { - auto secBuf = folly::to(lt->tm_sec); - uint64_t subgroup = streamPerObject_ ? nowLoc.object + 1 : 0; - ObjectHeader objHeader( - {trackAlias, - nowLoc.group, - subgroup, - nowLoc.object, - /*priority=*/0, - pref, - ObjectStatus::NORMAL, - folly::none}); - publishFn( - objHeader, - folly::IOBuf::copyBuffer(secBuf), - 0, - true, - !end && streamPerObject_ && nowLoc.object < 60); - objectsPublished++; - if (nowLoc.object == 60) { - objHeader.status = ObjectStatus::END_OF_GROUP; - objHeader.id++; - publishFn(std::move(objHeader), nullptr, 0, true, false); - } + if (object == 0) { + subgroupPublisher->object(0, minutePayload(group), false); + } + object++; + subgroupPublisher->object(object, secondPayload(object), false); + if (object >= 60) { + object++; + subgroupPublisher->endOfGroup(object); + subgroupPublisher.reset(); + } + return subgroupPublisher; + } + + void publishDate(uint64_t group, uint64_t second) { + uint64_t subgroup = second; + uint64_t object = second; + ObjectHeader header{ + TrackAlias(0), + group, + subgroup, + object, + /*priority=*/0, + ForwardPreference::Subgroup, + ObjectStatus::NORMAL, + folly::none}; + if (second == 0) { + forwarder_.objectStream(header, minutePayload(group)); + } + header.subgroup++; + header.id++; + forwarder_.objectStream(header, secondPayload(header.id)); + if (header.id >= 60) { + header.subgroup++; + header.id++; + header.status = ObjectStatus::END_OF_GROUP; + forwarder_.objectStream(header, nullptr); } - return objectsPublished; } void unsubscribe( std::shared_ptr session, Unsubscribe unsubscribe) { - forwarder_.removeSession(session, unsubscribe.subscribeID); + forwarder_.removeSession(session); } void terminateClientSession(std::shared_ptr session) override { @@ -342,13 +348,13 @@ class MoQDateServer : MoQServer { MoQForwarder forwarder_; std::unique_ptr relayClient_; bool streamPerObject_{false}; + bool loopRunning_{false}; }; } // namespace int main(int argc, char* argv[]) { folly::Init init(&argc, &argv, true); folly::EventBase evb; MoQDateServer moqDateServer(&evb, FLAGS_stream_per_object); - moqDateServer.publishDateLoop().scheduleOn(&evb).start(); evb.loopForever(); return 0; } diff --git a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp index 9be749f..8e156c8 100644 --- a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp +++ b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp @@ -107,92 +107,32 @@ class MoQFlvStreamerClient { << ", to subID-TrackAlias: " << sub.second.subscribeID << "-" << sub.second.trackAlias; - if (sub.second.fullTrackName == fullVideoTrackName_) { + if (sub.second.fullTrackName == fullVideoTrackName_ && videoPub_) { if (item->isEOF && MoQFlvStreamerClient::isAnyElementSent(latestVideo_)) { // EOF detected and an some video element was sent, close group - ObjectHeader objHeaderEndOfGroup = { - sub.second.trackAlias, - latestVideo_.group, - 0, // subgroup per group - latestVideo_.object++, - VIDEO_STREAM_PRIORITY, - ForwardPreference::Subgroup, - ObjectStatus::END_OF_GROUP}; - - XLOG(DBG1) << "Closing group because EOF. objHeader: " - << objHeaderEndOfGroup; moqClient_.getEventBase()->runInEventBaseThread( - [this, - objHeaderEndOfGroup, - subscribeID(sub.second.subscribeID)] { - moqClient_.moqSession_->publishStatus( - objHeaderEndOfGroup, subscribeID); + [this, subscribeID(sub.second.subscribeID)] { + latestVideo_.object++, + XLOG(DBG1) << "Closing group because EOF. objId=" + << latestVideo_.object; + if (videoSgPub_) { + videoSgPub_->endOfGroup(latestVideo_.object); + videoSgPub_.reset(); + } }); } if (item->type == FlvSequentialReader::MediaType::VIDEO) { // Video - if (item->isIdr && - MoQFlvStreamerClient::isAnyElementSent(latestVideo_)) { - // Close group - ObjectHeader objHeaderEndOfGroup = { - sub.second.trackAlias, - latestVideo_.group, - 0, // subgroup per group - latestVideo_.object++, - VIDEO_STREAM_PRIORITY, - ForwardPreference::Subgroup, - ObjectStatus::END_OF_GROUP}; - - XLOG(DBG1) << "Closing group because IDR. objHeader: " - << objHeaderEndOfGroup; - moqClient_.getEventBase()->runInEventBaseThread( - [this, - objHeaderEndOfGroup, - subscribeID(sub.second.subscribeID)] { - moqClient_.moqSession_->publishStatus( - objHeaderEndOfGroup, subscribeID); - }); - - // Start new group - latestVideo_.group++; - latestVideo_.object = 0; - } moqClient_.getEventBase()->runInEventBaseThread( - [this, - item, - latestVideo(latestVideo_), - trackAlias(sub.second.trackAlias), - subscribeID(sub.second.subscribeID)] { - auto objPayload = encodeToMoQMi(item); - if (!objPayload) { - XLOG(ERR) << "Failed to encode video frame"; - } else { - ObjectHeader objHeader = ObjectHeader{ - trackAlias, - latestVideo.group, - 0, // subgroup per group - latestVideo.object, - VIDEO_STREAM_PRIORITY, - ForwardPreference::Subgroup, - ObjectStatus::NORMAL, - objPayload->computeChainDataLength()}; - - XLOG(DBG1) << "Sending video frame" << objHeader - << ", payload size: " - << objPayload->computeChainDataLength(); - - moqClient_.moqSession_->publish( - objHeader, subscribeID, 0, std::move(objPayload), true); - } + [this, item, subscribeID(sub.second.subscribeID)] { + publishVideoItem(subscribeID, std::move(item)); }); - - latestVideo_.object++; } } if (sub.second.fullTrackName == fullAudioTrackName_ && - item->type == FlvSequentialReader::MediaType::AUDIO) { + item->type == FlvSequentialReader::MediaType::AUDIO && audioPub_) { // Audio if (item->data) { // Send audio data in a thread (stream per object) @@ -214,8 +154,7 @@ class MoQFlvStreamerClient { XLOG(DBG1) << "Sending audio frame" << objHeader << ", payload size: " << objPayload->computeChainDataLength(); - moqClient_.moqSession_->publishStreamPerObject( - objHeader, subscribeID, 0, std::move(objPayload), true); + audioPub_->objectStream(objHeader, std::move(objPayload)); } }); } @@ -229,6 +168,45 @@ class MoQFlvStreamerClient { co_return; } + void publishVideoItem( + SubscribeID subscribeID, + std::shared_ptr item) { + if (item->isIdr && MoQFlvStreamerClient::isAnyElementSent(latestVideo_) && + videoSgPub_) { + // Close group + latestVideo_.object++, + XLOG(DBG1) << "Closing group because IDR. objHeader: " + << latestVideo_.object; + videoSgPub_->endOfGroup(latestVideo_.object); + videoSgPub_.reset(); + + // Start new group + latestVideo_.group++; + latestVideo_.object = 0; + } + if (!videoSgPub_) { + auto res = videoPub_->beginSubgroup( + latestVideo_.group, 0, VIDEO_STREAM_PRIORITY); + if (!res) { + XLOG(FATAL) << "Error creating subgroup"; + } + videoSgPub_ = std::move(res.value()); + } + auto objPayload = encodeToMoQMi(item); + if (!objPayload) { + XLOG(ERR) << "Failed to encode video frame"; + } else { + XLOG(DBG1) << "Sending video frame={" << latestVideo_.group << "," + << latestVideo_.object + << "}, payload size: " << objPayload->computeChainDataLength(); + videoSgPub_->object( + latestVideo_.object, + std::move(objPayload), + /*finSubgroup=*/false); + } + latestVideo_.object++; + } + folly::coro::Task controlReadLoop() { class ControlVisitor : public MoQSession::ControlVisitor { public: @@ -245,8 +223,10 @@ class MoQFlvStreamerClient { XLOG(INFO) << "SubscribeRequest"; AbsoluteLocation latest_; // Track not available + bool isAudio = true; if (subscribeReq.fullTrackName == client_.fullVideoTrackName_) { latest_ = client_.latestVideo_; + isAudio = false; } else if (subscribeReq.fullTrackName == client_.fullAudioTrackName_) { latest_ = client_.latestAudio_; } else { @@ -266,13 +246,18 @@ class MoQFlvStreamerClient { client_.subscriptions_[subscribeReq.subscribeID.value] = subscribeReq; XLOG(INFO) << "Subscribed " << subscribeReq.subscribeID; - client_.moqClient_.moqSession_->subscribeOk( + auto trackPub = client_.moqClient_.moqSession_->subscribeOk( {subscribeReq.subscribeID, std::chrono::milliseconds(0), MoQSession::resolveGroupOrder( GroupOrder::OldestFirst, subscribeReq.groupOrder), latest_, {}}); + if (isAudio) { + client_.audioPub_ = std::move(trackPub); + } else { + client_.videoPub_ = std::move(trackPub); + } return; } @@ -360,6 +345,9 @@ class MoQFlvStreamerClient { AbsoluteLocation latestAudio_{0, 0}; std::map subscriptions_; + std::shared_ptr audioPub_; + std::shared_ptr videoPub_; + std::shared_ptr videoSgPub_; }; } // namespace diff --git a/moxygen/samples/text-client/MoQTextClient.cpp b/moxygen/samples/text-client/MoQTextClient.cpp index b171aa9..57042f7 100644 --- a/moxygen/samples/text-client/MoQTextClient.cpp +++ b/moxygen/samples/text-client/MoQTextClient.cpp @@ -195,7 +195,7 @@ class MoQTextClient { XLOG(INFO) << "SubscribeDone"; } - virtual void operator()(Goaway) const override { + void operator()(Goaway) const override { XLOG(INFO) << "Goaway"; client_.moqClient_.moqSession_->unsubscribe({client_.subscribeID_}); } diff --git a/moxygen/test/MoQSessionTest.cpp b/moxygen/test/MoQSessionTest.cpp index a5e3def..fceb755 100644 --- a/moxygen/test/MoQSessionTest.cpp +++ b/moxygen/test/MoQSessionTest.cpp @@ -234,26 +234,18 @@ TEST_F(MoQSessionTest, Fetch) { EXPECT_EQ( fetch.fullTrackName, FullTrackName({TrackNamespace{{"foo"}}, "bar"})); - serverSession_->fetchOk( + auto fetchPub = serverSession_->fetchOk( {fetch.subscribeID, GroupOrder::OldestFirst, /*endOfTrack=*/0, AbsoluteLocation{100, 100}, {}}); - serverSession_->publish( - {fetch.subscribeID, - fetch.start.group, - /*subgroup=*/0, - fetch.start.object, - /*priority=*/0, - ForwardPreference::Fetch, - ObjectStatus::NORMAL, - 100}, - fetch.subscribeID, - 0, + fetchPub->object( + fetch.start.group, + /*subgroupID=*/0, + fetch.start.object, moxygen::test::makeBuf(100), - true); - serverSession_->closeFetchStream(fetch.subscribeID); + /*finFetch=*/true); })); f(clientSession_).scheduleOn(&eventBase_).start(); eventBase_.loop(); @@ -261,8 +253,10 @@ TEST_F(MoQSessionTest, Fetch) { TEST_F(MoQSessionTest, FetchCleanupFromStreamFin) { setupMoQSession(); + std::shared_ptr fetchPub; auto f = [](std::shared_ptr session, - std::shared_ptr serverSession) mutable + std::shared_ptr serverSession, + std::shared_ptr& fetchPub) mutable -> folly::coro::Task { auto handle = co_await session->fetch( {SubscribeID(0), @@ -274,20 +268,13 @@ TEST_F(MoQSessionTest, FetchCleanupFromStreamFin) { {}}); EXPECT_TRUE(handle.hasValue()); // publish here now we know FETCH_OK has been received at client - serverSession->publish( - {handle.value()->subscribeID(), - /*group=*/0, - /*subgroup=*/0, - /*object=*/0, - /*priority=*/0, - ForwardPreference::Fetch, - ObjectStatus::NORMAL, - 100}, - handle.value()->subscribeID(), - 0, + XCHECK(fetchPub); + fetchPub->object( + /*groupID=*/0, + /*subgroupID=*/0, + /*objectID=*/0, moxygen::test::makeBuf(100), - true); - serverSession->closeFetchStream(handle.value()->subscribeID()); + /*finFetch=*/true); auto obj = co_await handle.value()->objects().next(); EXPECT_NE(obj.value(), nullptr); @@ -301,18 +288,18 @@ TEST_F(MoQSessionTest, FetchCleanupFromStreamFin) { session->close(); }; EXPECT_CALL(serverControl, onFetch(testing::_)) - .WillOnce(testing::Invoke([this](Fetch fetch) { + .WillOnce(testing::Invoke([this, &fetchPub](Fetch fetch) { EXPECT_EQ( fetch.fullTrackName, FullTrackName({TrackNamespace{{"foo"}}, "bar"})); - serverSession_->fetchOk( + fetchPub = serverSession_->fetchOk( {fetch.subscribeID, GroupOrder::OldestFirst, /*endOfTrack=*/0, AbsoluteLocation{100, 100}, {}}); })); - f(clientSession_, serverSession_).scheduleOn(&eventBase_).start(); + f(clientSession_, serverSession_, fetchPub).scheduleOn(&eventBase_).start(); eventBase_.loop(); } @@ -340,8 +327,10 @@ TEST_F(MoQSessionTest, FetchError) { TEST_F(MoQSessionTest, FetchCancel) { setupMoQSession(); + std::shared_ptr fetchPub; auto f = [](std::shared_ptr clientSession, - std::shared_ptr serverSession) mutable + std::shared_ptr serverSession, + std::shared_ptr& fetchPub) mutable -> folly::coro::Task { auto handle = co_await clientSession->fetch( {SubscribeID(0), @@ -357,50 +346,37 @@ TEST_F(MoQSessionTest, FetchCancel) { co_await folly::coro::co_reschedule_on_current_executor; co_await folly::coro::co_reschedule_on_current_executor; co_await folly::coro::co_reschedule_on_current_executor; - auto res = co_await folly::coro::co_awaitTry(serverSession->publish( - {subscribeID, - /*group=*/0, - /*subgroup=*/0, - /*object=*/1, - /*priority=*/0, - ForwardPreference::Fetch, - ObjectStatus::NORMAL, - 100}, - subscribeID, - 0, + XCHECK(fetchPub); + auto res = fetchPub->object( + /*groupID=*/0, + /*subgroupID=*/0, + /*objectID=*/1, moxygen::test::makeBuf(100), - true)); + /*finFetch=*/true); // publish after fetchCancel fails - EXPECT_TRUE(res.hasException()); + EXPECT_TRUE(res.hasError()); clientSession->close(); }; EXPECT_CALL(serverControl, onFetch(testing::_)) - .WillOnce(testing::Invoke([this](Fetch fetch) { + .WillOnce(testing::Invoke([this, &fetchPub](Fetch fetch) { EXPECT_EQ( fetch.fullTrackName, FullTrackName({TrackNamespace{{"foo"}}, "bar"})); - serverSession_->fetchOk( + fetchPub = serverSession_->fetchOk( {fetch.subscribeID, GroupOrder::OldestFirst, /*endOfTrack=*/0, AbsoluteLocation{100, 100}, {}}); - serverSession_->publish( - {fetch.subscribeID, - fetch.start.group, - /*subgroup=*/0, - fetch.start.object, - /*priority=*/0, - ForwardPreference::Fetch, - ObjectStatus::NORMAL, - 100}, - fetch.subscribeID, - 0, + fetchPub->object( + fetch.start.group, + /*subgroupID=*/0, + fetch.start.object, moxygen::test::makeBuf(100), true); // published 1 object })); - f(clientSession_, serverSession_).scheduleOn(&eventBase_).start(); + f(clientSession_, serverSession_, fetchPub).scheduleOn(&eventBase_).start(); eventBase_.loop(); } @@ -450,47 +426,37 @@ TEST_F(MoQSessionTest, FetchBadLength) { AbsoluteLocation{0, 0}, AbsoluteLocation{0, 1}, {}}); - // onSessionEnd races fetchOk, and the client gets a fetchError - EXPECT_FALSE(handle.hasValue()); + EXPECT_TRUE(handle.hasValue()); + // FETCH_OK comes but the FETCH stream is reset and we timeout waiting + // for a new object. + EXPECT_THROW( + co_await handle.value()->objects().next(), folly::FutureTimeout); + session->close(); }; EXPECT_CALL(serverControl, onFetch(testing::_)) .WillOnce(testing::Invoke([this](Fetch fetch) { EXPECT_EQ( fetch.fullTrackName, FullTrackName({TrackNamespace{{"foo"}}, "bar"})); - serverSession_->fetchOk( + auto fetchPub = serverSession_->fetchOk( {fetch.subscribeID, GroupOrder::OldestFirst, /*endOfTrack=*/0, AbsoluteLocation{100, 100}, {}}); - serverSession_->publish( - {fetch.subscribeID, - fetch.start.group, - /*subgroup=*/0, - fetch.start.object, - /*priority=*/0, - ForwardPreference::Fetch, - ObjectStatus::NORMAL, - 100}, - fetch.subscribeID, - 0, - moxygen::test::makeBuf(10), - false); - auto res = serverSession_->closeFetchStream(fetch.subscribeID); - EXPECT_TRUE(res.hasException()); + auto objPub = fetchPub->beginObject( + fetch.start.group, + /*subgroupID=*/0, + fetch.start.object, + 100, + moxygen::test::makeBuf(10)); + fetchPub->endOfFetch(); // this should close the session too })); f(clientSession_).scheduleOn(&eventBase_).start(); eventBase_.loop(); } -TEST_F(MoQSessionTest, BadFetchClose) { - setupMoQSession(); - auto res = clientSession_->closeFetchStream(SubscribeID(1000)); - EXPECT_TRUE(res.hasException()); -} - TEST_F(MoQSessionTest, FetchOverLimit) { setupMoQSession(); auto f = [](std::shared_ptr session) mutable @@ -676,13 +642,13 @@ TEST_F(MoQSessionTest, MaxSubscribeID) { {sub.subscribeID, 400, "bad", folly::none}); })) .WillOnce(testing::Invoke([this](auto sub) { - serverSession_->subscribeOk( + auto pub = serverSession_->subscribeOk( {sub.subscribeID, std::chrono::milliseconds(0), GroupOrder::OldestFirst, folly::none, {}}); - serverSession_->subscribeDone( + pub->subscribeDone( {sub.subscribeID, SubscribeDoneStatusCode::TRACK_ENDED, "end of track",