From c274bcbd0f6ae97ae3ad933e3b8071308103104e Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Thu, 12 Dec 2024 16:43:37 -0800 Subject: [PATCH] FETCH in date and text (#10) Summary: Replace SUBSCRIBE in the past in MoQDateServer with FETCH. Change MoQTextClient so that it will determine the live head and FETCH for any past data requested on the command line. Pull Request resolved: https://github.com/facebookexperimental/moxygen/pull/10 Reviewed By: sharmafb Differential Revision: D65532689 fbshipit-source-id: 5a27cd22ec665e7f1e3964b0270f806acf3bfaf7 --- moxygen/samples/date/MoQDateServer.cpp | 147 +++++++++++++---- moxygen/samples/text-client/MoQTextClient.cpp | 151 ++++++++++++------ 2 files changed, 223 insertions(+), 75 deletions(-) diff --git a/moxygen/samples/date/MoQDateServer.cpp b/moxygen/samples/date/MoQDateServer.cpp index 7e36771..3bcf7c0 100644 --- a/moxygen/samples/date/MoQDateServer.cpp +++ b/moxygen/samples/date/MoQDateServer.cpp @@ -87,6 +87,18 @@ class MoQDateServer : MoQServer { server_.unsubscribe(clientSession_, std::move(unsubscribe)); } + void operator()(Fetch fetch) const override { + XLOG(INFO) << "Fetch track ns=" << fetch.fullTrackName.trackNamespace + << " name=" << fetch.fullTrackName.trackName + << " subscribe id=" << fetch.subscribeID; + if (fetch.fullTrackName != server_.dateTrackName()) { + clientSession_->fetchError( + {fetch.subscribeID, 403, "unexpected fetch"}); + } else { + server_.onFetch(std::move(fetch), clientSession_); + } + } + void operator()(Goaway) const override { XLOG(INFO) << "Goaway"; } @@ -109,6 +121,11 @@ class MoQDateServer : MoQServer { AbsoluteLocation nowLoc( {uint64_t(in_time_t / 60), uint64_t(in_time_t % 60) + 1}); auto range = toSubscribeRange(subReq, nowLoc); + if (range.start < nowLoc) { + clientSession->subscribeError( + {subReq.subscribeID, 400, "start in the past, use FETCH"}); + return; + } clientSession->subscribeOk( {subReq.subscribeID, std::chrono::milliseconds(0), @@ -116,38 +133,84 @@ class MoQDateServer : MoQServer { GroupOrder::OldestFirst, subReq.groupOrder), nowLoc}); - bool done = catchup( - clientSession, subReq.subscribeID, subReq.trackAlias, range, nowLoc); - if (!done) { - forwarder_.setLatest(nowLoc); - forwarder_.addSubscriber( - {std::move(clientSession), - subReq.subscribeID, - subReq.trackAlias, - range}); + forwarder_.setLatest(nowLoc); + forwarder_.addSubscriber( + {std::move(clientSession), + subReq.subscribeID, + subReq.trackAlias, + range}); + } + + void onFetch(Fetch fetch, std::shared_ptr clientSession) { + if (fetch.end < fetch.start) { + clientSession->fetchError({fetch.subscribeID, 400, "No objects"}); + return; } + auto now = std::chrono::system_clock::now(); + auto in_time_t = std::chrono::system_clock::to_time_t(now); + AbsoluteLocation nowLoc( + {uint64_t(in_time_t / 60), uint64_t(in_time_t % 60) + 1}); + auto range = toSubscribeRange( + fetch.start, fetch.end, LocationType::AbsoluteRange, nowLoc); + if (range.start > nowLoc) { + clientSession->fetchError( + {fetch.subscribeID, 400, "fetch starts in future"}); + return; + } + range.end = std::min(range.end, nowLoc); + clientSession->fetchOk( + {fetch.subscribeID, + MoQSession::resolveGroupOrder( + GroupOrder::OldestFirst, fetch.groupOrder), + 0, // not end of track + nowLoc, + {}}); + + catchup(clientSession, fetch.subscribeID, range, nowLoc); } bool onSubscribeUpdate(const SubscribeUpdate& subscribeUpdate) { return forwarder_.updateSubscriber(subscribeUpdate); } - bool catchup( + void catchup( std::shared_ptr clientSession, SubscribeID subscribeID, - TrackAlias trackAlias, SubscribeRange range, AbsoluteLocation now) { - if (range.start >= now) { - return false; + if (range.start.object > 61) { + XLOG(ERR) << "Invalid start object"; + return; } - MoQForwarder forwarder(dateTrackName()); - forwarder.addSubscriber({clientSession, subscribeID, trackAlias, range}); time_t t = range.start.group * 60 + std::max(range.start.object, (uint64_t)1) - 1; + auto pubFn = [clientSession, subscribeID]( + ObjectHeader objHdr, + std::unique_ptr payload, + uint64_t payloadOffset, + bool eom, + bool) { + objHdr.trackIdentifier = subscribeID; + if (objHdr.status == ObjectStatus::NORMAL) { + XLOG(DBG1) << "Publish normal object trackIdentifier=" + << std::get(objHdr.trackIdentifier); + clientSession + ->publish( + objHdr, subscribeID, payloadOffset, std::move(payload), eom) + .via(clientSession->getEventBase()); + } else { + clientSession->publishStatus(objHdr, subscribeID); + } + }; while (range.start < now && range.start < range.end) { - auto n = - publishDate(forwarder, t, false, subscribeID, trackAlias, range.end); + auto n = publishDate( + pubFn, + t, + false, + subscribeID, + TrackAlias(0), + ForwardPreference::Fetch, + range.end); t++; // publishDate publishes two objects for obj = 0 range.start.object += n; @@ -156,18 +219,38 @@ class MoQDateServer : MoQServer { range.start.object = 0; } } - return forwarder.empty(); + // TODO - empty range may log an error? + clientSession->closeFetchStream(subscribeID); } folly::coro::Task publishDateLoop() { bool first = false; auto cancelToken = co_await folly::coro::co_current_cancellation_token; + auto pubFn = [this]( + ObjectHeader objHdr, + std::unique_ptr payload, + uint64_t payloadOffset, + bool eom, + bool streamPerObject) { + forwarder_.publish( + std::move(objHdr), + std::move(payload), + payloadOffset, + eom, + streamPerObject); + }; while (!cancelToken.isCancellationRequested()) { if (!forwarder_.empty()) { auto now = std::chrono::system_clock::now(); auto in_time_t = std::chrono::system_clock::to_time_t(now); - - publishDate(forwarder_, in_time_t, first, 0, 0, folly::none); + publishDate( + pubFn, + in_time_t, + first, + 0, + 0, + ForwardPreference::Subgroup, + folly::none); first = false; } co_await folly::coro::sleep(std::chrono::seconds(1)); @@ -175,11 +258,17 @@ class MoQDateServer : MoQServer { } size_t publishDate( - MoQForwarder& forwarder, + const std::function, + uint64_t, + bool, + bool)>& publishFn, time_t in_time_t, bool forceGroup, - SubscribeID subscribeID, + SubscribeID, TrackAlias trackAlias, + ForwardPreference pref, folly::Optional end) { size_t objectsPublished = 0; struct tm local_tm; @@ -196,15 +285,15 @@ class MoQDateServer : MoQServer { /*subgroup=*/0, /*object=*/0, /*priority*/ 0, - ForwardPreference::Subgroup, + pref, ObjectStatus::NORMAL, folly::none}); - forwarder.publish( + publishFn( objHeader, folly::IOBuf::copyBuffer(ss.str()), 0, true, - streamPerObject_); + !end && streamPerObject_); objectsPublished++; } if (!end || nowLoc < *end) { @@ -216,20 +305,20 @@ class MoQDateServer : MoQServer { subgroup, nowLoc.object, /*priority=*/0, - ForwardPreference::Subgroup, + pref, ObjectStatus::NORMAL, folly::none}); - forwarder.publish( + publishFn( objHeader, folly::IOBuf::copyBuffer(secBuf), 0, true, - streamPerObject_ && nowLoc.object < 60); + !end && streamPerObject_ && nowLoc.object < 60); objectsPublished++; if (nowLoc.object == 60) { objHeader.status = ObjectStatus::END_OF_GROUP; objHeader.id++; - forwarder.publish(std::move(objHeader), nullptr); + publishFn(std::move(objHeader), nullptr, 0, true, false); } } return objectsPublished; diff --git a/moxygen/samples/text-client/MoQTextClient.cpp b/moxygen/samples/text-client/MoQTextClient.cpp index bab245c..b171aa9 100644 --- a/moxygen/samples/text-client/MoQTextClient.cpp +++ b/moxygen/samples/text-client/MoQTextClient.cpp @@ -6,6 +6,7 @@ #include #include "moxygen/MoQClient.h" +#include "moxygen/MoQLocation.h" #include #include @@ -22,10 +23,49 @@ DEFINE_string(eo, "", "End object, leave blank for entire group"); DEFINE_int32(connect_timeout, 1000, "Connect timeout (ms)"); DEFINE_int32(transaction_timeout, 120, "Transaction timeout (s)"); DEFINE_bool(quic_transport, false, "Use raw QUIC transport"); +DEFINE_bool(fetch, false, "Use fetch rather than subscribe"); namespace { using namespace moxygen; +struct SubParams { + LocationType locType; + folly::Optional start; + folly::Optional end; +}; + +SubParams flags2params() { + SubParams result; + std::string soStr(FLAGS_so); + if (FLAGS_sg.empty()) { + if (soStr.empty()) { + result.locType = LocationType::LatestObject; + return result; + } else if (auto so = folly::to(soStr) > 0) { + XLOG(ERR) << "Invalid: sg blank, so=" << so; + exit(1); + } else { + result.locType = LocationType::LatestGroup; + return result; + } + } else if (soStr.empty()) { + soStr = std::string("0"); + } + result.start.emplace( + folly::to(FLAGS_sg), folly::to(soStr)); + if (FLAGS_eg.empty()) { + result.locType = LocationType::AbsoluteStart; + return result; + } else { + result.locType = LocationType::AbsoluteRange; + result.end.emplace( + folly::to(FLAGS_eg), + (FLAGS_eo.empty() ? 0 : folly::to(FLAGS_eo) + 1)); + return result; + } + return result; +} + class MoQTextClient { public: MoQTextClient(folly::EventBase* evb, proxygen::URL url, FullTrackName ftn) @@ -48,15 +88,79 @@ class MoQTextClient { auto exec = co_await folly::coro::co_current_executor; controlReadLoop().scheduleOn(exec).start(); - auto track = co_await moqClient_.moqSession_->subscribe(std::move(sub)); + SubParams subParams{sub.locType, sub.start, sub.end}; + sub.locType = LocationType::LatestObject; + sub.start = folly::none; + sub.end = folly::none; + auto track = co_await moqClient_.moqSession_->subscribe(sub); if (track.hasValue()) { subscribeID_ = track.value()->subscribeID(); + XLOG(DBG1) << "subscribeID=" << subscribeID_; + auto latest = track.value()->latest(); + if (latest) { + XLOG(INFO) << "Latest={" << latest->group << ", " << latest->object + << "}"; + } + if (subParams.start && latest) { + // There was a specific start and the track has started + auto range = toSubscribeRange( + subParams.start, subParams.end, subParams.locType, *latest); + if (range.start <= *latest) { + AbsoluteLocation fetchEnd = *latest; + // The start was before latest, need to FETCH + if (range.end < *latest) { + // The end is before latest, UNSUBSCRIBE + XLOG(DBG1) << "end={" << range.end.group << "," + << range.end.object << "} before latest, unsubscribe"; + moqClient_.moqSession_->unsubscribe({subscribeID_}); + fetchEnd = range.end; + if (fetchEnd.object == 0) { + fetchEnd.group--; + } + } + if (FLAGS_fetch) { + XLOG(DBG1) << "FETCH start={" << range.start.group << "," + << range.start.object << "} end={" << fetchEnd.group + << "," << fetchEnd.object << "}"; + auto fetchTrack = co_await moqClient_.moqSession_->fetch( + {SubscribeID(0), + sub.fullTrackName, + sub.priority, + sub.groupOrder, + range.start, + fetchEnd, + {}}); + if (fetchTrack.hasError()) { + XLOG(ERR) << "Fetch failed err=" << fetchTrack.error().errorCode + << " reason=" << fetchTrack.error().reasonPhrase; + } else { + XLOG(DBG1) << "subscribeID=" + << fetchTrack.value()->subscribeID(); + readTrack(std::move(fetchTrack.value())) + .scheduleOn(exec) + .start(); + } + } + } // else we started from current or no content - nothing to FETCH + if (subParams.end && (!latest || range.end > *latest)) { + // The end is set but after latest, SUBSCRIBE_UPDATE for the end + XLOG(DBG1) << "Setting subscribe end={" << range.end.group << "," + << range.end.object << "} before latest, update"; + moqClient_.moqSession_->subscribeUpdate( + {subscribeID_, + latest.value_or(AbsoluteLocation{0, 0}), + range.end, + sub.priority, + sub.params}); + } + } co_await readTrack(std::move(track.value())); } else { XLOG(INFO) << "SubscribeError id=" << track.error().subscribeID << " code=" << track.error().errorCode << " reason=" << track.error().reasonPhrase; } + moqClient_.moqSession_->drain(); } catch (const std::exception& ex) { XLOG(ERR) << ex.what(); co_return; @@ -89,7 +193,6 @@ class MoQTextClient { void operator()(SubscribeDone) const override { XLOG(INFO) << "SubscribeDone"; - client_.moqClient_.moqSession_->close(); } virtual void operator()(Goaway) const override { @@ -114,10 +217,6 @@ class MoQTextClient { folly::coro::Task readTrack( std::shared_ptr track) { XLOG(INFO) << __func__; - if (auto latest = track->latest()) { - XLOG(INFO) << "Latest={" << latest->group << ", " << latest->object - << "}"; - } auto g = folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; }); // TODO: check track.value()->getCancelToken() @@ -135,46 +234,6 @@ class MoQTextClient { } // namespace using namespace moxygen; -namespace { - -struct SubParams { - LocationType locType; - folly::Optional start; - folly::Optional end; -}; - -SubParams flags2params() { - SubParams result; - std::string soStr(FLAGS_so); - if (FLAGS_sg.empty()) { - if (soStr.empty()) { - result.locType = LocationType::LatestObject; - return result; - } else if (auto so = folly::to(soStr) > 0) { - XLOG(ERR) << "Invalid: sg blank, so=" << so; - exit(1); - } else { - result.locType = LocationType::LatestGroup; - return result; - } - } else if (soStr.empty()) { - soStr = std::string("0"); - } - result.start.emplace( - folly::to(FLAGS_sg), folly::to(soStr)); - if (FLAGS_eg.empty()) { - result.locType = LocationType::AbsoluteStart; - return result; - } else { - result.locType = LocationType::AbsoluteRange; - result.end.emplace( - folly::to(FLAGS_eg), - (FLAGS_eo.empty() ? 0 : folly::to(FLAGS_eo) + 1)); - return result; - } - return result; -} -} // namespace int main(int argc, char* argv[]) { folly::Init init(&argc, &argv, false);