Skip to content

Commit

Permalink
Add MoQSession::drain API (#9)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #9

This will close the session when all fetches and subscribes are complete.  This required changing the objects() and payload() read loops a bit.  Since the sources are shared pointers, the callees can continue reading from them after the session is gone.  We use try_dequeue() to drain the queue after the session's token has been cancelled.

Reviewed By: NEUDitao

Differential Revision: D65532678

fbshipit-source-id: e7bf4023cd0cf61bb77910da92523133c41d86c1
  • Loading branch information
afrind authored and facebook-github-bot committed Dec 10, 2024
1 parent 656ad23 commit 26e5bad
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
35 changes: 32 additions & 3 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ void MoQSession::start() {
}
}

void MoQSession::drain() {
XLOG(DBG1) << __func__ << " sess=" << this;
draining_ = true;
checkForCloseOnDrain();
}

void MoQSession::checkForCloseOnDrain() {
if (draining_ && fetches_.empty() && subTracks_.empty()) {
close();
}
}

void MoQSession::close(folly::Optional<SessionCloseErrorCode> error) {
XLOG(DBG1) << __func__ << " sess=" << this;
if (wt_) {
Expand Down Expand Up @@ -274,6 +286,7 @@ folly::coro::Task<void> MoQSession::readLoop(
track->setAllDataReceived();
if (track->fetchOkReceived()) {
fetches_.erase(*subscribeID);
checkForCloseOnDrain();
}
}
}
Expand Down Expand Up @@ -448,6 +461,7 @@ void MoQSession::onSubscribeError(SubscribeError subErr) {
subTracks_[trackAliasIt->second]->subscribeError(std::move(subErr));
subTracks_.erase(trackAliasIt->second);
subIdToTrackAlias_.erase(trackAliasIt);
checkForCloseOnDrain();
}

void MoQSession::onSubscribeDone(SubscribeDone subscribeDone) {
Expand All @@ -461,7 +475,13 @@ void MoQSession::onSubscribeDone(SubscribeDone subscribeDone) {
}

// TODO: handle final object and status code
// TODO: there could still be objects in flight. Removing from maps now
// will prevent their delivery. I think the only way to handle this is with
// timeouts.
subTracks_[trackAliasIt->second]->fin();
subTracks_.erase(trackAliasIt->second);
subIdToTrackAlias_.erase(trackAliasIt);
checkForCloseOnDrain();
controlMessages_.enqueue(std::move(subscribeDone));
}

Expand Down Expand Up @@ -545,6 +565,7 @@ void MoQSession::onFetchError(FetchError fetchError) {
}
fetchIt->second->fetchError(fetchError);
fetches_.erase(fetchIt);
checkForCloseOnDrain();
}

void MoQSession::onAnnounce(Announce ann) {
Expand Down Expand Up @@ -746,9 +767,15 @@ MoQSession::TrackHandle::objects() {
folly::makeGuard([func = __func__] { XLOG(DBG1) << "exit " << func; });
auto cancelToken = co_await folly::coro::co_current_cancellation_token;
auto mergeToken = folly::CancellationToken::merge(cancelToken, cancelToken_);
while (!mergeToken.isCancellationRequested()) {
auto obj = co_await folly::coro::co_withCancellation(
mergeToken, newObjects_.dequeue());
while (!cancelToken.isCancellationRequested()) {
auto optionalObj = newObjects_.try_dequeue();
std::shared_ptr<ObjectSource> obj;
if (optionalObj) {
obj = *optionalObj;
} else {
obj = co_await folly::coro::co_withCancellation(
mergeToken, newObjects_.dequeue());
}
if (!obj) {
XLOG(DBG3) << "Out of objects for trackHandle=" << this
<< " id=" << subscribeID_;
Expand Down Expand Up @@ -825,6 +852,8 @@ void MoQSession::unsubscribe(Unsubscribe unsubscribe) {
XLOG(ERR) << "writeUnsubscribe failed sess=" << this;
return;
}
// we rely on receiving subscribeDone after unsubscribe to remove from
// subTracks_
controlWriteEvent_.signal();
}

Expand Down
22 changes: 18 additions & 4 deletions moxygen/MoQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class MoQSession : public MoQControlCodec::ControlCallback,
~MoQSession() override;

void start();
void drain();
void close(folly::Optional<SessionCloseErrorCode> error = folly::none);

folly::coro::Task<ServerSetup> setup(ClientSetup setup);
Expand Down Expand Up @@ -272,14 +273,25 @@ class MoQSession : public MoQControlCodec::ControlCallback,
co_return nullptr;
}
folly::IOBufQueue payloadBuf{folly::IOBufQueue::cacheChainLength()};
while (true) {
auto buf = co_await folly::coro::co_withCancellation(
cancelToken, payloadQueue.dequeue());
auto curCancelToken =
co_await folly::coro::co_current_cancellation_token;
auto mergeToken =
folly::CancellationToken::merge(curCancelToken, cancelToken);
while (!curCancelToken.isCancellationRequested()) {
std::unique_ptr<folly::IOBuf> buf;
auto optionalBuf = payloadQueue.try_dequeue();
if (optionalBuf) {
buf = std::move(*optionalBuf);
} else {
buf = co_await folly::coro::co_withCancellation(
cancelToken, payloadQueue.dequeue());
}
if (!buf) {
co_return payloadBuf.move();
break;
}
payloadBuf.append(std::move(buf));
}
co_return payloadBuf.move();
}
};

Expand Down Expand Up @@ -431,6 +443,7 @@ class MoQSession : public MoQControlCodec::ControlCallback,
void onTrackStatus(TrackStatus trackStatus) override;
void onGoaway(Goaway goaway) override;
void onConnectionError(ErrorCode error) override;
void checkForCloseOnDrain();

folly::SemiFuture<folly::Unit> publishImpl(
const ObjectHeader& objHeader,
Expand Down Expand Up @@ -560,6 +573,7 @@ class MoQSession : public MoQControlCodec::ControlCallback,
folly::coro::Promise<ServerSetup> setupPromise_;
folly::coro::Future<ServerSetup> setupFuture_;
bool setupComplete_{false};
bool draining_{false};
folly::CancellationSource cancellationSource_;

// SubscribeID must be a unique monotonically increasing number that is
Expand Down

0 comments on commit 26e5bad

Please sign in to comment.