Skip to content

Commit

Permalink
Use Consumer interface for MoQSession publishing (#12)
Browse files Browse the repository at this point in the history
Summary:

This is a major rewrite of MoQSession using the new Consumers interfaces.  It's separated into the writes (this diff) and reads (next diff).

Note the relay -- which does writes and reads -- is slightly broken in this diff but fixed in the next, so don't read too much into it.

The previous "publish" API required the session to maintain a huge map of every currently open stream across the session and perform lookups into this map in order to do the writes.  It also had a number API error cases that could be eliminated by constraining the interface.

Now subscribeOK and fetchOK return a Consumer object which the publisher will use to pass track data according to those APIs.  No maps are required -- the publisher hangs onto the handle(s) it needs to publish.

MoQForwarder also had a major rewrite.  It conveniently now implements the TrackConsumer interface as well, so a publisher can trivially publish to one or N subscribers.

Differential Revision: D66881597
  • Loading branch information
afrind authored and facebook-github-bot committed Dec 19, 2024
1 parent 93fe21d commit b2b7d6b
Show file tree
Hide file tree
Showing 10 changed files with 1,628 additions and 884 deletions.
1,168 changes: 827 additions & 341 deletions moxygen/MoQSession.cpp

Large diffs are not rendered by default.

166 changes: 73 additions & 93 deletions moxygen/MoQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <folly/coro/Task.h>
#include <folly/coro/UnboundedQueue.h>
#include <folly/logging/xlog.h>
#include <moxygen/MoQConsumers.h>
#include "moxygen/util/TimedBaton.h"

#include <boost/variant.hpp>
Expand Down Expand Up @@ -71,7 +72,7 @@ class MoQSession : public MoQControlCodec::ControlCallback,
if (maxConcurrent > maxConcurrentSubscribes_) {
auto delta = maxConcurrent - maxConcurrentSubscribes_;
maxSubscribeID_ += delta;
sendMaxSubscribeID(/*signal=*/true);
sendMaxSubscribeID(/*signalWriteLoop=*/true);
}
}

Expand Down Expand Up @@ -191,9 +192,11 @@ class MoQSession : public MoQControlCodec::ControlCallback,
TrackHandle(
FullTrackName fullTrackName,
SubscribeID subscribeID,
folly::EventBase* evb,
folly::CancellationToken token)
: fullTrackName_(std::move(fullTrackName)),
subscribeID_(subscribeID),
evb_(evb),
cancelToken_(std::move(token)) {
auto contract = folly::coro::makePromiseContract<
folly::Expected<std::shared_ptr<TrackHandle>, SubscribeError>>();
Expand All @@ -217,10 +220,18 @@ class MoQSession : public MoQControlCodec::ControlCallback,
return subscribeID_;
}

void setNewObjectTimeout(std::chrono::milliseconds objectTimeout) {
objectTimeout_ = objectTimeout;
}

[[nodiscard]] folly::CancellationToken getCancelToken() const {
return cancelToken_;
}

void mergeReadCancelToken(folly::CancellationToken readToken) {
cancelToken_ = folly::CancellationToken::merge(cancelToken_, readToken);
}

void fin();

folly::coro::Task<
Expand Down Expand Up @@ -327,6 +338,7 @@ class MoQSession : public MoQControlCodec::ControlCallback,
private:
FullTrackName fullTrackName_;
SubscribeID subscribeID_;
folly::EventBase* evb_;
using SubscribeResult =
folly::Expected<std::shared_ptr<TrackHandle>, SubscribeError>;
folly::coro::Promise<SubscribeResult> promise_;
Expand All @@ -343,21 +355,21 @@ class MoQSession : public MoQControlCodec::ControlCallback,
GroupOrder groupOrder_;
folly::Optional<AbsoluteLocation> latest_;
folly::CancellationToken cancelToken_;
std::chrono::milliseconds objectTimeout_{std::chrono::hours(24)};
bool allDataReceived_{false};
};

folly::coro::Task<
folly::Expected<std::shared_ptr<TrackHandle>, SubscribeError>>
subscribe(SubscribeRequest sub);
void subscribeOk(SubscribeOk subOk);
std::shared_ptr<TrackConsumer> subscribeOk(SubscribeOk subOk);
void subscribeError(SubscribeError subErr);
void unsubscribe(Unsubscribe unsubscribe);
void subscribeDone(SubscribeDone subDone);
void subscribeUpdate(SubscribeUpdate subUpdate);

folly::coro::Task<folly::Expected<std::shared_ptr<TrackHandle>, FetchError>>
fetch(Fetch fetch);
void fetchOk(FetchOk fetchOk);
std::shared_ptr<FetchConsumer> fetchOk(FetchOk fetchOk);
void fetchError(FetchError fetchError);
void fetchCancel(FetchCancel fetchCancel);

Expand All @@ -371,23 +383,54 @@ class MoQSession : public MoQControlCodec::ControlCallback,
proxygen::WebTransport::ErrorCode errorCode;
};

// Publish this object.
folly::SemiFuture<folly::Unit> publish(
const ObjectHeader& objHeader,
SubscribeID subscribeID,
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
bool eom);
folly::SemiFuture<folly::Unit> publishStreamPerObject(
const ObjectHeader& objHeader,
SubscribeID subscribeID,
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
bool eom);
folly::SemiFuture<folly::Unit> publishStatus(
const ObjectHeader& objHeader,
SubscribeID subscribeID);
folly::Try<folly::Unit> closeFetchStream(SubscribeID subID);
class PublisherImpl {
public:
PublisherImpl(
MoQSession* session,
SubscribeID subscribeID,
Priority priority,
GroupOrder groupOrder)
: session_(session),
subscribeID_(subscribeID),
priority_(priority),
groupOrder_(groupOrder) {}
virtual ~PublisherImpl() = default;

SubscribeID subscribeID() const {
return subscribeID_;
}
uint8_t priority() const {
return priority_;
}
void setPriority(uint8_t priority) {
priority_ = priority;
}
void setGroupOrder(GroupOrder groupOrder) {
groupOrder_ = groupOrder;
}

virtual void reset(ResetStreamErrorCode error) = 0;

virtual void onStreamComplete(const ObjectHeader& finalHeader) = 0;

folly::Expected<folly::Unit, MoQPublishError> subscribeDone(
SubscribeDone subDone);

void fetchComplete();

protected:
proxygen::WebTransport* getWebTransport() const {
if (session_) {
return session_->wt_;
}
return nullptr;
}

MoQSession* session_{nullptr};
SubscribeID subscribeID_;
uint8_t priority_;
GroupOrder groupOrder_;
};

void onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) override;
void onNewBidiStream(proxygen::WebTransport::BidiStreamHandle bh) override;
Expand All @@ -398,13 +441,16 @@ class MoQSession : public MoQControlCodec::ControlCallback,
}

private:
void cleanup();

folly::coro::Task<void> controlWriteLoop(
proxygen::WebTransport::StreamWriteHandle* writeHandle);
folly::coro::Task<void> readLoop(
StreamType streamType,
proxygen::WebTransport::StreamReadHandle* readHandle);

std::shared_ptr<TrackHandle> getTrack(TrackIdentifier trackidentifier);
void subscribeDone(SubscribeDone subDone);

void onClientSetup(ClientSetup clientSetup) override;
void onServerSetup(ServerSetup setup) override;
Expand Down Expand Up @@ -445,72 +491,9 @@ class MoQSession : public MoQControlCodec::ControlCallback,
void onConnectionError(ErrorCode error) override;
void checkForCloseOnDrain();

folly::SemiFuture<folly::Unit> publishImpl(
const ObjectHeader& objHeader,
SubscribeID subscribeID,
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
bool eom,
bool streamPerObject);

uint64_t order(const ObjectHeader& objHeader, const SubscribeID subscribeID);

void retireSubscribeId(bool signal);
void sendMaxSubscribeID(bool signal);

struct PublishKey {
TrackIdentifier trackIdentifier;
uint64_t group;
uint64_t subgroup;
ForwardPreference pref;
uint64_t object;

bool operator==(const PublishKey& other) const {
if (trackIdentifier != other.trackIdentifier || pref != other.pref) {
return false;
}
if (pref == ForwardPreference::Datagram) {
return object == other.object;
} else if (pref == ForwardPreference::Subgroup) {
return group == other.group && subgroup == other.subgroup;
} else if (pref == ForwardPreference::Track) {
return true;
} else if (pref == ForwardPreference::Fetch) {
return true;
}
return false;
}

struct hash {
size_t operator()(const PublishKey& ook) const {
if (ook.pref == ForwardPreference::Datagram) {
return folly::hash::hash_combine(
TrackIdentifierHash{}(ook.trackIdentifier),
ook.group,
ook.object);
} else if (ook.pref == ForwardPreference::Subgroup) {
return folly::hash::hash_combine(
TrackIdentifierHash{}(ook.trackIdentifier),
ook.group,
ook.subgroup);
}
// Track or Fetch
return folly::hash::hash_combine(
TrackIdentifierHash{}(ook.trackIdentifier));
}
};
};

struct PublishData {
uint64_t streamID;
uint64_t group;
uint64_t subgroup;
uint64_t objectID;
folly::Optional<uint64_t> objectLength;
uint64_t offset;
bool streamPerObject;
bool cancelled{false};
};
void retireSubscribeId(bool signalWriteLoop);
void sendMaxSubscribeID(bool signalWriteLoop);
void fetchComplete(SubscribeID subscribeID);

// Get the max subscribe id from the setup params. If MAX_SUBSCRIBE_ID key is
// not present, we default to 0 as specified. 0 means that the peer MUST NOT
Expand Down Expand Up @@ -555,13 +538,10 @@ class MoQSession : public MoQControlCodec::ControlCallback,
TrackNamespace::hash>
pendingSubscribeAnnounces_;

struct PubTrack {
uint8_t priority;
GroupOrder groupOrder;
};
// Subscriber ID -> metadata about a publish track
folly::F14FastMap<SubscribeID, PubTrack, SubscribeID::hash> pubTracks_;
folly::F14FastMap<PublishKey, PublishData, PublishKey::hash> publishDataMap_;
folly::
F14FastMap<SubscribeID, std::shared_ptr<PublisherImpl>, SubscribeID::hash>
pubTracks_;
uint64_t nextTrackId_{0};
uint64_t closedSubscribes_{0};
// TODO: Make this value configurable. maxConcurrentSubscribes_ represents
Expand Down
Loading

0 comments on commit b2b7d6b

Please sign in to comment.