diff --git a/moxygen/MoQSession.cpp b/moxygen/MoQSession.cpp index 73bb6b3..a995a20 100644 --- a/moxygen/MoQSession.cpp +++ b/moxygen/MoQSession.cpp @@ -65,6 +65,18 @@ void MoQSession::start() { } } +void MoQSession::drain() { + XLOG(DBG1) << __func__ << " sess=" << this; + draining_ = true; + checkForCloseOnDrain(); +} + +void MoQSession::checkForCloseOnDrain() { + if (draining_ && fetches_.empty() && subTracks_.empty()) { + close(); + } +} + void MoQSession::close(folly::Optional error) { XLOG(DBG1) << __func__ << " sess=" << this; if (wt_) { @@ -274,6 +286,7 @@ folly::coro::Task MoQSession::readLoop( track->setAllDataReceived(); if (track->fetchOkReceived()) { fetches_.erase(*subscribeID); + checkForCloseOnDrain(); } } } @@ -448,6 +461,7 @@ void MoQSession::onSubscribeError(SubscribeError subErr) { subTracks_[trackAliasIt->second]->subscribeError(std::move(subErr)); subTracks_.erase(trackAliasIt->second); subIdToTrackAlias_.erase(trackAliasIt); + checkForCloseOnDrain(); } void MoQSession::onSubscribeDone(SubscribeDone subscribeDone) { @@ -461,7 +475,13 @@ void MoQSession::onSubscribeDone(SubscribeDone subscribeDone) { } // TODO: handle final object and status code + // TODO: there could still be objects in flight. Removing from maps now + // will prevent their delivery. I think the only way to handle this is with + // timeouts. subTracks_[trackAliasIt->second]->fin(); + subTracks_.erase(trackAliasIt->second); + subIdToTrackAlias_.erase(trackAliasIt); + checkForCloseOnDrain(); controlMessages_.enqueue(std::move(subscribeDone)); } @@ -545,6 +565,7 @@ void MoQSession::onFetchError(FetchError fetchError) { } fetchIt->second->fetchError(fetchError); fetches_.erase(fetchIt); + checkForCloseOnDrain(); } void MoQSession::onAnnounce(Announce ann) { @@ -746,9 +767,15 @@ MoQSession::TrackHandle::objects() { folly::makeGuard([func = __func__] { XLOG(DBG1) << "exit " << func; }); auto cancelToken = co_await folly::coro::co_current_cancellation_token; auto mergeToken = folly::CancellationToken::merge(cancelToken, cancelToken_); - while (!mergeToken.isCancellationRequested()) { - auto obj = co_await folly::coro::co_withCancellation( - mergeToken, newObjects_.dequeue()); + while (!cancelToken.isCancellationRequested()) { + auto optionalObj = newObjects_.try_dequeue(); + std::shared_ptr obj; + if (optionalObj) { + obj = *optionalObj; + } else { + obj = co_await folly::coro::co_withCancellation( + mergeToken, newObjects_.dequeue()); + } if (!obj) { XLOG(DBG3) << "Out of objects for trackHandle=" << this << " id=" << subscribeID_; @@ -825,6 +852,8 @@ void MoQSession::unsubscribe(Unsubscribe unsubscribe) { XLOG(ERR) << "writeUnsubscribe failed sess=" << this; return; } + // we rely on receiving subscribeDone after unsubscribe to remove from + // subTracks_ controlWriteEvent_.signal(); } diff --git a/moxygen/MoQSession.h b/moxygen/MoQSession.h index fa235ea..09c2b16 100644 --- a/moxygen/MoQSession.h +++ b/moxygen/MoQSession.h @@ -57,6 +57,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, ~MoQSession() override; void start(); + void drain(); void close(folly::Optional error = folly::none); folly::coro::Task setup(ClientSetup setup); @@ -272,14 +273,25 @@ class MoQSession : public MoQControlCodec::ControlCallback, co_return nullptr; } folly::IOBufQueue payloadBuf{folly::IOBufQueue::cacheChainLength()}; - while (true) { - auto buf = co_await folly::coro::co_withCancellation( - cancelToken, payloadQueue.dequeue()); + auto curCancelToken = + co_await folly::coro::co_current_cancellation_token; + auto mergeToken = + folly::CancellationToken::merge(curCancelToken, cancelToken); + while (!curCancelToken.isCancellationRequested()) { + std::unique_ptr buf; + auto optionalBuf = payloadQueue.try_dequeue(); + if (optionalBuf) { + buf = std::move(*optionalBuf); + } else { + buf = co_await folly::coro::co_withCancellation( + cancelToken, payloadQueue.dequeue()); + } if (!buf) { - co_return payloadBuf.move(); + break; } payloadBuf.append(std::move(buf)); } + co_return payloadBuf.move(); } }; @@ -431,6 +443,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, void onTrackStatus(TrackStatus trackStatus) override; void onGoaway(Goaway goaway) override; void onConnectionError(ErrorCode error) override; + void checkForCloseOnDrain(); folly::SemiFuture publishImpl( const ObjectHeader& objHeader, @@ -560,6 +573,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, folly::coro::Promise setupPromise_; folly::coro::Future setupFuture_; bool setupComplete_{false}; + bool draining_{false}; folly::CancellationSource cancellationSource_; // SubscribeID must be a unique monotonically increasing number that is