diff --git a/moxygen/samples/date/MoQDateServer.cpp b/moxygen/samples/date/MoQDateServer.cpp index 59887be..629e831 100644 --- a/moxygen/samples/date/MoQDateServer.cpp +++ b/moxygen/samples/date/MoQDateServer.cpp @@ -21,15 +21,17 @@ DEFINE_int32(relay_transaction_timeout, 120, "Transaction timeout (s)"); DEFINE_string(cert, "", "Cert path"); DEFINE_string(key, "", "Key path"); DEFINE_int32(port, 9667, "Server Port"); +DEFINE_bool(stream_per_group, false, "Use one stream for each group"); namespace { using namespace moxygen; class MoQDateServer : MoQServer { public: - explicit MoQDateServer(folly::EventBase* evb) + explicit MoQDateServer(folly::EventBase* evb, ForwardPreference pref) : MoQServer(FLAGS_port, FLAGS_cert, FLAGS_key, "/moq-date"), - forwarder_(dateTrackName()) { + forwarder_(dateTrackName()), + pref_(pref) { if (!FLAGS_relay_url.empty()) { proxygen::URL url(FLAGS_relay_url); if (!url.isValid() || !url.hasHost()) { @@ -141,11 +143,13 @@ class MoQDateServer : MoQServer { if (range.start >= now) { return false; } + MoQForwarder forwarder(dateTrackName()); + forwarder.addSubscriber({clientSession, subscribeID, trackAlias, range}); time_t t = range.start.group * 60 + std::max(range.start.object, (uint64_t)1) - 1; while (range.start < now && range.start < range.end) { - auto n = publishDate( - t, false, clientSession, subscribeID, trackAlias, range.end); + auto n = + publishDate(forwarder, t, false, subscribeID, trackAlias, range.end); t++; // publishDate publishes two objects for obj = 0 range.start.object += n; @@ -154,15 +158,7 @@ class MoQDateServer : MoQServer { range.start.object = 0; } } - if (range.end <= now) { - clientSession->subscribeDone( - {subscribeID, - SubscribeDoneStatusCode::SUBSCRIPTION_ENDED, - "", - range.start}); - return true; - } - return false; + return forwarder.empty(); } folly::coro::Task publishDateLoop() { @@ -173,7 +169,7 @@ class MoQDateServer : MoQServer { auto now = std::chrono::system_clock::now(); auto in_time_t = std::chrono::system_clock::to_time_t(now); - publishDate(in_time_t, first, nullptr, 0, 0, folly::none); + publishDate(forwarder_, in_time_t, first, 0, 0, folly::none); first = false; } co_await folly::coro::sleep(std::chrono::seconds(1)); @@ -181,9 +177,9 @@ class MoQDateServer : MoQServer { } size_t publishDate( + MoQForwarder& forwarder, time_t in_time_t, bool forceGroup, - const std::shared_ptr& session, uint64_t subscribeID, uint64_t trackAlias, folly::Optional end) { @@ -197,72 +193,39 @@ class MoQDateServer : MoQServer { {uint64_t(in_time_t / 60), uint64_t(lt->tm_sec + 1)}); if (lt->tm_sec == 0 || forceGroup) { ObjectHeader objHeader( - {0, - 0, + {subscribeID, + trackAlias, nowLoc.group, 0, 0, - ForwardPreference::Object, + pref_, ObjectStatus::NORMAL, folly::none}); - if (session) { - publishObjectToSession( - session, - subscribeID, - trackAlias, - std::move(objHeader), - folly::IOBuf::copyBuffer(ss.str())); - } else { - forwarder_.publish( - std::move(objHeader), folly::IOBuf::copyBuffer(ss.str())); - } + forwarder.publish(objHeader, folly::IOBuf::copyBuffer(ss.str())); objectsPublished++; } if (!end || nowLoc < *end) { auto secBuf = folly::to(lt->tm_sec); ObjectHeader objHeader( - {0, - 0, + {subscribeID, + trackAlias, nowLoc.group, nowLoc.object, 0, - ForwardPreference::Object, + pref_, ObjectStatus::NORMAL, folly::none}); - if (session) { - publishObjectToSession( - session, - subscribeID, - trackAlias, - std::move(objHeader), - folly::IOBuf::copyBuffer(secBuf)); - } else { - forwarder_.publish( - std::move(objHeader), folly::IOBuf::copyBuffer(secBuf)); - } + forwarder.publish(std::move(objHeader), folly::IOBuf::copyBuffer(secBuf)); objectsPublished++; + if (nowLoc.object == 60) { + objHeader.status = ObjectStatus::END_OF_GROUP; + objHeader.id++; + forwarder.publish(std::move(objHeader), nullptr); + } } return objectsPublished; } - void publishObjectToSession( - const std::shared_ptr& session, - uint64_t subscribeID, - uint64_t trackAlias, - ObjectHeader inObjHeader, - std::unique_ptr payload) { - session->getEventBase()->runImmediatelyOrRunInEventBaseThread( - [session, - subscribeID, - trackAlias, - objHeader = std::move(inObjHeader), - buf = payload->clone()]() mutable { - objHeader.subscribeID = subscribeID; - objHeader.trackAlias = trackAlias; - session->publish(std::move(objHeader), 0, std::move(buf), true); - }); - } - void unsubscribe( std::shared_ptr session, Unsubscribe unsubscribe) { @@ -275,17 +238,21 @@ class MoQDateServer : MoQServer { } private: - [[nodiscard]] FullTrackName dateTrackName() const { + static FullTrackName dateTrackName() { return FullTrackName({"moq-date", "/date"}); } MoQForwarder forwarder_; std::unique_ptr relayClient_; + ForwardPreference pref_{ForwardPreference::Group}; }; } // namespace int main(int argc, char* argv[]) { folly::Init init(&argc, &argv, true); folly::EventBase evb; - MoQDateServer moqDateServer(&evb); + MoQDateServer moqDateServer( + &evb, + FLAGS_stream_per_group ? ForwardPreference::Group + : ForwardPreference::Object); moqDateServer.publishDateLoop().scheduleOn(&evb).start(); evb.loopForever(); return 0;