diff --git a/moxygen/MoQCodec.cpp b/moxygen/MoQCodec.cpp index d8c3818..f8620bd 100644 --- a/moxygen/MoQCodec.cpp +++ b/moxygen/MoQCodec.cpp @@ -128,7 +128,10 @@ void MoQObjectStreamCodec::onIngress( case StreamType::STREAM_HEADER_SUBGROUP: parseState_ = ParseState::OBJECT_STREAM; break; - // CONTROL doesn't have a wire type yet. + case StreamType::FETCH_HEADER: + parseState_ = ParseState::FETCH_HEADER; + break; + // CONTROL doesn't have a wire type yet. default: XLOG(DBG4) << "Stream not allowed: 0x" << std::setfill('0') << std::setw(sizeof(uint64_t) * 2) << std::hex @@ -154,6 +157,19 @@ void MoQObjectStreamCodec::onIngress( } break; } + case ParseState::FETCH_HEADER: { + auto newCursor = cursor; + auto res = parseFetchHeader(newCursor); + if (res.hasError()) { + XLOG(DBG6) << __func__ << " " << uint32_t(res.error()); + connError_ = res.error(); + break; + } + curObjectHeader_.trackIdentifier = SubscribeID(res.value()); + parseState_ = ParseState::MULTI_OBJECT_HEADER; + cursor = newCursor; + break; + } case ParseState::OBJECT_STREAM: { auto newCursor = cursor; auto res = parseStreamHeader(newCursor, streamType_); diff --git a/moxygen/MoQCodec.h b/moxygen/MoQCodec.h index 07f8090..8c954bc 100644 --- a/moxygen/MoQCodec.h +++ b/moxygen/MoQCodec.h @@ -141,6 +141,7 @@ class MoQObjectStreamCodec : public MoQCodec { public: ~ObjectCallback() override = default; + virtual void onFetchHeader(uint64_t subscribeID) = 0; virtual void onObjectHeader(ObjectHeader objectHeader) = 0; virtual void onObjectPayload( @@ -168,6 +169,7 @@ class MoQObjectStreamCodec : public MoQCodec { STREAM_HEADER_TYPE, DATAGRAM, OBJECT_STREAM, + FETCH_HEADER, MULTI_OBJECT_HEADER, OBJECT_PAYLOAD, // OBJECT_PAYLOAD_NO_LENGTH diff --git a/moxygen/MoQFramer.cpp b/moxygen/MoQFramer.cpp index 707db96..489ad1b 100644 --- a/moxygen/MoQFramer.cpp +++ b/moxygen/MoQFramer.cpp @@ -188,6 +188,15 @@ folly::Expected parseServerSetup( return serverSetup; } +folly::Expected parseFetchHeader( + folly::io::Cursor& cursor) noexcept { + auto subscribeID = quic::decodeQuicInteger(cursor); + if (!subscribeID) { + return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); + } + return subscribeID->first; +} + folly::Expected parseObjectHeader( folly::io::Cursor& cursor, size_t length) noexcept { @@ -287,10 +296,13 @@ folly::Expected parseMultiObjectHeader( const ObjectHeader& headerTemplate) noexcept { DCHECK( streamType == StreamType::STREAM_HEADER_TRACK || - streamType == StreamType::STREAM_HEADER_SUBGROUP); + streamType == StreamType::STREAM_HEADER_SUBGROUP || + streamType == StreamType::FETCH_HEADER); + // TODO get rid of this auto length = cursor.totalLength(); ObjectHeader objectHeader = headerTemplate; - if (streamType == StreamType::STREAM_HEADER_TRACK) { + if (streamType == StreamType::STREAM_HEADER_TRACK || + streamType == StreamType::FETCH_HEADER) { auto group = quic::decodeQuicInteger(cursor, length); if (!group) { return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); @@ -301,12 +313,28 @@ folly::Expected parseMultiObjectHeader( } else { objectHeader.forwardPreference = ForwardPreference::Subgroup; } + if (streamType == StreamType::FETCH_HEADER) { + objectHeader.forwardPreference = ForwardPreference::Fetch; + auto subgroup = quic::decodeQuicInteger(cursor, length); + if (!subgroup) { + return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); + } + length -= subgroup->second; + objectHeader.subgroup = subgroup->first; + } auto id = quic::decodeQuicInteger(cursor, length); if (!id) { return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); } length -= id->second; objectHeader.id = id->first; + if (streamType == StreamType::FETCH_HEADER) { + if (length < 2) { + return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); + } + objectHeader.priority = cursor.readBE(); + length--; + } auto payloadLength = quic::decodeQuicInteger(cursor, length); if (!payloadLength) { return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW); @@ -1153,6 +1181,9 @@ WriteResult writeStreamHeader( folly::to_underlying(StreamType::STREAM_HEADER_SUBGROUP), size, error); + } else if (objectHeader.forwardPreference == ForwardPreference::Fetch) { + writeVarint( + writeBuf, folly::to_underlying(StreamType::FETCH_HEADER), size, error); } else { LOG(FATAL) << "Unsupported forward preference to stream header"; } @@ -1161,8 +1192,10 @@ WriteResult writeStreamHeader( writeVarint(writeBuf, objectHeader.group, size, error); writeVarint(writeBuf, objectHeader.subgroup, size, error); } - writeBuf.append(&objectHeader.priority, 1); - size += 1; + if (objectHeader.forwardPreference != ForwardPreference::Fetch) { + writeBuf.append(&objectHeader.priority, 1); + size += 1; + } if (error) { return folly::makeUnexpected(quic::TransportErrorCode::INTERNAL_ERROR); } @@ -1199,12 +1232,16 @@ WriteResult writeObject( if (objectHeader.forwardPreference != ForwardPreference::Subgroup) { writeVarint(writeBuf, objectHeader.group, size, error); } + if (objectHeader.forwardPreference == ForwardPreference::Fetch) { + writeVarint(writeBuf, objectHeader.subgroup, size, error); + } writeVarint(writeBuf, objectHeader.id, size, error); CHECK( objectHeader.status != ObjectStatus::NORMAL || (objectHeader.length && *objectHeader.length > 0)) << "Normal objects require non-zero length"; - if (objectHeader.forwardPreference == ForwardPreference::Datagram) { + if (objectHeader.forwardPreference == ForwardPreference::Datagram || + objectHeader.forwardPreference == ForwardPreference::Fetch) { writeBuf.append(&objectHeader.priority, 1); size += 1; } diff --git a/moxygen/MoQFramer.h b/moxygen/MoQFramer.h index 420d109..8b7c980 100644 --- a/moxygen/MoQFramer.h +++ b/moxygen/MoQFramer.h @@ -112,6 +112,7 @@ enum class StreamType : uint64_t { OBJECT_DATAGRAM = 1, STREAM_HEADER_TRACK = 0x2, STREAM_HEADER_SUBGROUP = 0x4, + FETCH_HEADER = 0x5, CONTROL = 100000000 }; @@ -148,7 +149,7 @@ constexpr uint64_t kVersionDraft06_exp = constexpr uint64_t kVersionDraft07_exp = 0xff070001; // Draft 7 FETCH support constexpr uint64_t kVersionDraft07_exp2 = 0xff070002; // Draft 7 FETCH + removal of Subscribe ID on objects -constexpr uint64_t kVersionDraftCurrent = kVersionDraft07_exp2; +constexpr uint64_t kVersionDraftCurrent = kVersionDraft07; struct ClientSetup { std::vector supportedVersions; @@ -168,7 +169,7 @@ folly::Expected parseServerSetup( folly::io::Cursor& cursor, size_t length) noexcept; -enum class ForwardPreference : uint8_t { Track, Subgroup, Datagram }; +enum class ForwardPreference : uint8_t { Track, Subgroup, Datagram, Fetch }; enum class ObjectStatus : uint64_t { NORMAL = 0, @@ -264,6 +265,9 @@ folly::Expected parseObjectHeader( folly::io::Cursor& cursor, size_t length) noexcept; +folly::Expected parseFetchHeader( + folly::io::Cursor& cursor) noexcept; + folly::Expected parseStreamHeader( folly::io::Cursor& cursor, StreamType streamType) noexcept; diff --git a/moxygen/MoQServer.cpp b/moxygen/MoQServer.cpp index afdbeef..b25bf96 100644 --- a/moxygen/MoQServer.cpp +++ b/moxygen/MoQServer.cpp @@ -92,18 +92,6 @@ void MoQServer::ControlVisitor::operator()(Fetch fetch) const { XLOG(INFO) << "Fetch id=" << fetch.subscribeID; } -void MoQServer::ControlVisitor::operator()(FetchCancel fetchCancel) const { - XLOG(INFO) << "FetchCancel id=" << fetchCancel.subscribeID; -} - -void MoQServer::ControlVisitor::operator()(FetchOk fetchOk) const { - XLOG(INFO) << "FetchOk id=" << fetchOk.subscribeID; -} - -void MoQServer::ControlVisitor::operator()(FetchError fetchError) const { - XLOG(INFO) << "FetchError id=" << fetchError.subscribeID; -} - void MoQServer::ControlVisitor::operator()(SubscribeDone subscribeDone) const { XLOG(INFO) << "SubscribeDone id=" << subscribeDone.subscribeID << " code=" << folly::to_underlying(subscribeDone.statusCode) diff --git a/moxygen/MoQServer.h b/moxygen/MoQServer.h index 9d9e90a..c774e8c 100644 --- a/moxygen/MoQServer.h +++ b/moxygen/MoQServer.h @@ -39,9 +39,6 @@ class MoQServer : public MoQSession::ServerSetupCallback { void operator()(SubscribeRequest subscribeReq) const override; void operator()(SubscribeUpdate subscribeUpdate) const override; void operator()(Fetch fetch) const override; - void operator()(FetchCancel fetchCancel) const override; - void operator()(FetchOk fetchOk) const override; - void operator()(FetchError fetchError) const override; void operator()(Unannounce unannounce) const override; void operator()(AnnounceCancel announceCancel) const override; void operator()(SubscribeAnnounces subscribeAnnounces) const override; diff --git a/moxygen/MoQSession.cpp b/moxygen/MoQSession.cpp index 7297339..73bb6b3 100644 --- a/moxygen/MoQSession.cpp +++ b/moxygen/MoQSession.cpp @@ -24,6 +24,10 @@ MoQSession::~MoQSession() { subTrack.second->subscribeError( {/*TrackHandle fills in subId*/ 0, 500, "session closed", folly::none}); } + for (auto& fetch : fetches_) { + fetch.second->fetchError( + {/*TrackHandle fills in subId*/ 0, 500, "session closed"}); + } for (auto& pendingAnn : pendingAnnounce_) { pendingAnn.second.setValue(folly::makeUnexpected( AnnounceError({pendingAnn.first, 500, "session closed"}))); @@ -79,6 +83,15 @@ void MoQSession::close(folly::Optional error) { } subTracks_.clear(); + for (auto& fetch : fetches_) { + fetch.second->fetchError( + {/*TrackHandle fillsin subId*/ 0, 500, "session closed"}); + // TODO: there needs to be a way to queue an error in TrackHandle, both + // from here, when close races the FETCH stream, and from readLoop + // where we get a reset. + } + fetches_.clear(); + wt->closeSession( error.has_value() ? folly::make_optional(folly::to_underlying(error.value())) @@ -218,14 +231,21 @@ folly::coro::Task MoQSession::readLoop( StreamType streamType, proxygen::WebTransport::StreamReadHandle* readHandle) { XLOG(DBG1) << __func__ << " sess=" << this; + auto g = folly::makeGuard([func = __func__, this] { + XLOG(DBG1) << "exit " << func << " sess=" << this; + }); + co_await folly::coro::co_safe_point; std::unique_ptr codec; + MoQObjectStreamCodec* objCodec = nullptr; if (streamType == StreamType::CONTROL) { codec = std::make_unique(dir_, this); } else { - codec = std::make_unique(this); + auto res = std::make_unique(this); + objCodec = res.get(); + codec = std::move(res); } - auto id = readHandle->getID(); - codec->setStreamId(id); + auto streamId = readHandle->getID(); + codec->setStreamId(streamId); // TODO: disallow OBJECT on control streams and non-object on non-control bool fin = false; @@ -234,15 +254,30 @@ folly::coro::Task MoQSession::readLoop( auto streamData = co_await folly::coro::co_awaitTry( readHandle->readStreamData().via(evb_)); if (streamData.hasException()) { - XLOG(ERR) << streamData.exception().what() << " id=" << id + XLOG(ERR) << streamData.exception().what() << " id=" << streamId << " sess=" << this; + // TODO: possibly erase fetch co_return; } else { if (streamData->data || streamData->fin) { codec->onIngress(std::move(streamData->data), streamData->fin); } fin = streamData->fin; - XLOG_IF(DBG3, fin) << "End of stream id=" << id << " sess=" << this; + XLOG_IF(DBG3, fin) << "End of stream id=" << streamId << " sess=" << this; + if (fin && objCodec) { + auto trackId = objCodec->getTrackIdentifier(); + if (auto subscribeID = std::get_if(&trackId)) { + // it's fetch + auto track = getTrack(trackId); + if (track) { + track->fin(); + track->setAllDataReceived(); + if (track->fetchOkReceived()) { + fetches_.erase(*subscribeID); + } + } + } + } } } } @@ -261,7 +296,15 @@ std::shared_ptr MoQSession::getTrack( } track = trackIt->second; } else { - // TODO - handle subscribe ID + auto subscribeID = std::get(trackIdentifier); + XLOG(DBG3) << "getTrack subID=" << subscribeID; + auto trackIt = fetches_.find(subscribeID); + if (trackIt == fetches_.end()) { + // received an object for unknown subscribe ID + XLOG(ERR) << "unknown subscribe ID=" << subscribeID << " sess=" << this; + return nullptr; + } + track = trackIt->second; } return track; } @@ -441,19 +484,67 @@ void MoQSession::onMaxSubscribeId(MaxSubscribeId maxSubscribeId) { } void MoQSession::onFetch(Fetch fetch) { - XLOG(ERR) << "Not implemented yet"; + XLOG(DBG1) << __func__ << " sess=" << this; + const auto subscribeID = fetch.subscribeID; + if (closeSessionIfSubscribeIdInvalid(subscribeID)) { + return; + } + if (fetch.end < fetch.start) { + fetchError( + {fetch.subscribeID, + folly::to_underlying(FetchErrorCode::INVALID_RANGE), + "End must be after start"}); + return; + } + controlMessages_.enqueue(std::move(fetch)); } void MoQSession::onFetchCancel(FetchCancel fetchCancel) { - XLOG(ERR) << "Not implemented yet"; + XLOG(DBG1) << __func__ << " sess=" << this; + PublishKey publishKey( + {fetchCancel.subscribeID, 0, 0, ForwardPreference::Fetch, 0}); + auto pubDataIt = publishDataMap_.find(publishKey); + if (pubDataIt == publishDataMap_.end()) { + XLOG(DBG4) << "No publish key for fetch id=" << fetchCancel.subscribeID + << " sess=" << this; + // Either the Fetch stream has already closed, or isn't open yet. + // If it's already closed, a no-op is fine. + // If it isn't open yet, we could: + // a) enqueue a callback the publisher has to handle + // b) add a placeholder - this is annoying + // See: https://github.com/moq-wg/moq-transport/issues/630 + } else { + wt_->resetStream(pubDataIt->second.streamID, 0); + retireSubscribeId(/*signal=*/true); + pubDataIt->second.cancelled = true; + } } void MoQSession::onFetchOk(FetchOk fetchOk) { - XLOG(ERR) << "Not implemented yet"; + XLOG(DBG1) << __func__ << " sess=" << this; + auto fetchIt = fetches_.find(fetchOk.subscribeID); + if (fetchIt == fetches_.end()) { + XLOG(ERR) << "No matching subscribe ID=" << fetchOk.subscribeID + << " sess=" << this; + return; + } + auto trackHandle = fetchIt->second; + trackHandle->fetchOK(trackHandle); + if (trackHandle->allDataReceived()) { + fetches_.erase(trackHandle->subscribeID()); + } } void MoQSession::onFetchError(FetchError fetchError) { - XLOG(ERR) << "Not implemented yet"; + XLOG(DBG1) << __func__ << " sess=" << this; + auto fetchIt = fetches_.find(fetchError.subscribeID); + if (fetchIt == fetches_.end()) { + XLOG(ERR) << "No matching subscribe ID=" << fetchError.subscribeID + << " sess=" << this; + return; + } + fetchIt->second->fetchError(fetchError); + fetches_.erase(fetchIt); } void MoQSession::onAnnounce(Announce ann) { @@ -743,6 +834,7 @@ void MoQSession::subscribeDone(SubscribeDone subDone) { auto res = writeSubscribeDone(controlWriteBuf_, std::move(subDone)); if (!res) { XLOG(ERR) << "writeSubscribeDone failed sess=" << this; + // TODO: any control write failure should probably result in close() return; } @@ -784,6 +876,70 @@ void MoQSession::subscribeUpdate(SubscribeUpdate subUpdate) { controlWriteEvent_.signal(); } +folly::coro::Task< + folly::Expected, FetchError>> +MoQSession::fetch(Fetch fetch) { + XLOG(DBG1) << __func__ << " sess=" << this; + auto g = + folly::makeGuard([func = __func__] { XLOG(DBG1) << "exit " << func; }); + auto fullTrackName = fetch.fullTrackName; + if (nextSubscribeID_ >= peerMaxSubscribeID_) { + XLOG(WARN) << "Issuing fetch that will fail; nextSubscribeID_=" + << nextSubscribeID_ + << " peerMaxSubscribeid_=" << peerMaxSubscribeID_ + << " sess=" << this; + } + auto subID = nextSubscribeID_++; + fetch.subscribeID = subID; + auto wres = writeFetch(controlWriteBuf_, std::move(fetch)); + if (!wres) { + XLOG(ERR) << "writeFetch failed sess=" << this; + co_return folly::makeUnexpected( + FetchError({subID, 500, "local write failed"})); + } + controlWriteEvent_.signal(); + auto subTrack = fetches_.emplace( + std::piecewise_construct, + std::forward_as_tuple(subID), + std::forward_as_tuple(std::make_shared( + fullTrackName, subID, cancellationSource_.getToken()))); + + auto trackHandle = subTrack.first->second; + auto res = co_await trackHandle->fetchReady(); + XLOG(DBG1) << __func__ << " fetchReady trackHandle=" << trackHandle; + co_return res; +} + +void MoQSession::fetchOk(FetchOk fetchOk) { + XLOG(DBG1) << __func__ << " sess=" << this; + auto res = writeFetchOk(controlWriteBuf_, fetchOk); + if (!res) { + XLOG(ERR) << "writeFetchOk failed sess=" << this; + return; + } + controlWriteEvent_.signal(); +} + +void MoQSession::fetchError(FetchError fetchErr) { + XLOG(DBG1) << __func__ << " sess=" << this; + auto res = writeFetchError(controlWriteBuf_, std::move(fetchErr)); + if (!res) { + XLOG(ERR) << "writeFetchError failed sess=" << this; + return; + } + controlWriteEvent_.signal(); +} + +void MoQSession::fetchCancel(FetchCancel fetchCan) { + XLOG(DBG1) << __func__ << " sess=" << this; + auto res = writeFetchCancel(controlWriteBuf_, std::move(fetchCan)); + if (!res) { + XLOG(ERR) << "writeFetchCancel failed sess=" << this; + return; + } + controlWriteEvent_.signal(); +} + namespace { constexpr uint32_t IdMask = 0x1FFFFF; uint64_t groupOrder(GroupOrder groupOrder, uint64_t group) { @@ -845,6 +1001,37 @@ folly::SemiFuture MoQSession::publishStatus( return publishImpl(objHeader, subscribeID, 0, nullptr, true, false); } +folly::Try MoQSession::closeFetchStream(SubscribeID subID) { + PublishKey publishKey{ + TrackIdentifier(subID), 0, 0, ForwardPreference::Fetch, 0}; + auto pubDataIt = publishDataMap_.find(publishKey); + if (pubDataIt == publishDataMap_.end()) { + XLOG(ERR) << "Invalid subscribeID to closeFetchStream=" << subID.value; + return folly::makeTryWith( + [] { throw std::runtime_error("Invalid subscribeID"); }); + } + if (pubDataIt->second.objectLength && *pubDataIt->second.objectLength > 0) { + XLOG(ERR) << "Non-zero length remaining in previous obj id=" << subID.value; + close(); + return folly::makeTryWith( + [] { throw std::runtime_error("Premature closeFetchStream"); }); + } + auto streamID = pubDataIt->second.streamID; + auto cancelled = pubDataIt->second.cancelled; + publishDataMap_.erase(pubDataIt); + if (!cancelled) { + XLOG(DBG1) << "Closing fetch stream=" << streamID; + retireSubscribeId(/*signal=*/true); + auto writeRes = wt_->writeStreamData(streamID, nullptr, true); + if (!writeRes) { + XLOG(ERR) << "Failed to close fetch stream sess=" << this + << " error=" << static_cast(writeRes.error()); + // Probably got a STOP_SENDING, which is fine + } + } + return folly::Try(folly::unit); +} + folly::SemiFuture MoQSession::publishImpl( const ObjectHeader& objHeader, SubscribeID subscribeID, @@ -877,10 +1064,9 @@ folly::SemiFuture MoQSession::publishImpl( // - Next portion of the object calls this function again with payloadOffset // > 0 if (payloadOffset != 0) { - XLOG(WARN) - << __func__ - << " Can't start publishing in the middle. Disgregard data for this new obj with payloadOffset = " - << payloadOffset << " sess=" << this; + XLOG(WARN) << __func__ << " Can't start publishing in the middle. " + << "Disgregard data for this new obj with payloadOffset = " + << payloadOffset << " sess=" << this; return folly::makeSemiFuture(folly::exception_wrapper( std::runtime_error("Can't start publishing in the middle."))); } @@ -917,19 +1103,29 @@ folly::SemiFuture MoQSession::publishImpl( pubDataIt = res.first; // Serialize multi-object stream header if (objHeader.forwardPreference == ForwardPreference::Track || + objHeader.forwardPreference == ForwardPreference::Fetch || objHeader.forwardPreference == ForwardPreference::Subgroup) { writeStreamHeader(writeBuf, objHeader); } } else { XLOG(DBG4) << "Found open pub data sess=" << this; } + if (pubDataIt->second.cancelled) { + XLOG(DBG2) << "Peer has cancelled stream=" << pubDataIt->second.streamID + << " sess=" << this; + // caller MUST NOT call publish again on this stream + publishDataMap_.erase(pubDataIt); + return folly::makeSemiFuture( + folly::exception_wrapper(std::runtime_error("cancelled"))); + } // TODO: Missing offset checks uint64_t payloadLength = payload ? payload->computeChainDataLength() : 0; if (payloadOffset == 0) { // new object // validate group and object are moving in the right direction bool multiObject = false; - if (objHeader.forwardPreference == ForwardPreference::Track) { + if (objHeader.forwardPreference == ForwardPreference::Track || + objHeader.forwardPreference == ForwardPreference::Fetch) { if (objHeader.group < pubDataIt->second.group) { XLOG(ERR) << "Decreasing group in Track sess=" << this; return folly::makeSemiFuture(folly::exception_wrapper( @@ -1006,6 +1202,7 @@ folly::SemiFuture MoQSession::publishImpl( if (!writeRes) { XLOG(ERR) << "Failed to write stream data. sess=" << this << " error=" << static_cast(writeRes.error()); + // TODO: remove stream from publishDataMap_? return folly::makeSemiFuture( folly::exception_wrapper(WebTransportException( writeRes.error(), "Failed to write stream data."))); diff --git a/moxygen/MoQSession.h b/moxygen/MoQSession.h index bbbdb61..fa235ea 100644 --- a/moxygen/MoQSession.h +++ b/moxygen/MoQSession.h @@ -84,6 +84,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, SubscribeUpdate, Unsubscribe, SubscribeDone, + Fetch, TrackStatusRequest, TrackStatus, Goaway>; @@ -143,15 +144,6 @@ class MoQSession : public MoQControlCodec::ControlCallback, virtual void operator()(Fetch fetch) const { XLOG(INFO) << "Fetch subID=" << fetch.subscribeID; } - virtual void operator()(FetchCancel fetchCancel) const { - XLOG(INFO) << "FetchCancel subID=" << fetchCancel.subscribeID; - } - virtual void operator()(FetchOk fetchOk) const { - XLOG(INFO) << "FetchOk subID=" << fetchOk.subscribeID; - } - virtual void operator()(FetchError fetchError) const { - XLOG(INFO) << "FetchError subID=" << fetchError.subscribeID; - } virtual void operator()(TrackStatusRequest trackStatusRequest) const { XLOG(INFO) << "Subscribe ftn=" << trackStatusRequest.fullTrackName.trackNamespace @@ -206,6 +198,10 @@ class MoQSession : public MoQControlCodec::ControlCallback, folly::Expected, SubscribeError>>(); promise_ = std::move(contract.first); future_ = std::move(contract.second); + auto contract2 = folly::coro::makePromiseContract< + folly::Expected, FetchError>>(); + fetchPromise_ = std::move(contract2.first); + fetchFuture_ = std::move(contract2.second); } void setTrackName(FullTrackName trackName) { @@ -231,7 +227,6 @@ class MoQSession : public MoQControlCodec::ControlCallback, ready() { co_return co_await std::move(future_); } - void subscribeOK( std::shared_ptr self, GroupOrder order, @@ -248,6 +243,22 @@ class MoQSession : public MoQControlCodec::ControlCallback, } } + folly::coro::Task, FetchError>> + fetchReady() { + co_return co_await std::move(fetchFuture_); + } + void fetchOK(std::shared_ptr self) { + XCHECK_EQ(self.get(), this); + XLOG(DBG1) << __func__ << " trackHandle=" << this; + fetchPromise_.setValue(std::move(self)); + } + void fetchError(FetchError fetchErr) { + if (!promise_.isFulfilled()) { + fetchErr.subscribeID = subscribeID_; + fetchPromise_.setValue(folly::makeUnexpected(std::move(fetchErr))); + } + } + struct ObjectSource { ObjectHeader header; FullTrackName fullTrackName; @@ -289,15 +300,29 @@ class MoQSession : public MoQControlCodec::ControlCallback, return latest_; } + void setAllDataReceived() { + allDataReceived_ = true; + } + + bool allDataReceived() const { + return allDataReceived_; + } + + bool fetchOkReceived() const { + return fetchPromise_.isFulfilled(); + } + private: FullTrackName fullTrackName_; SubscribeID subscribeID_; - folly::coro::Promise< - folly::Expected, SubscribeError>> - promise_; - folly::coro::Future< - folly::Expected, SubscribeError>> - future_; + using SubscribeResult = + folly::Expected, SubscribeError>; + folly::coro::Promise promise_; + folly::coro::Future future_; + using FetchResult = + folly::Expected, FetchError>; + folly::coro::Promise fetchPromise_; + folly::coro::Future fetchFuture_; folly:: F14FastMap, std::shared_ptr> objects_; @@ -306,6 +331,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, GroupOrder groupOrder_; folly::Optional latest_; folly::CancellationToken cancelToken_; + bool allDataReceived_{false}; }; folly::coro::Task< @@ -317,6 +343,12 @@ class MoQSession : public MoQControlCodec::ControlCallback, void subscribeDone(SubscribeDone subDone); void subscribeUpdate(SubscribeUpdate subUpdate); + folly::coro::Task, FetchError>> + fetch(Fetch fetch); + void fetchOk(FetchOk fetchOk); + void fetchError(FetchError fetchError); + void fetchCancel(FetchCancel fetchCancel); + class WebTransportException : public std::runtime_error { public: explicit WebTransportException( @@ -343,6 +375,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, folly::SemiFuture publishStatus( const ObjectHeader& objHeader, SubscribeID subscribeID); + folly::Try closeFetchStream(SubscribeID subID); void onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) override; void onNewBidiStream(proxygen::WebTransport::BidiStreamHandle bh) override; @@ -370,6 +403,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, uint64_t id, std::unique_ptr payload, bool eom) override; + void onFetchHeader(uint64_t) override {} void onSubscribe(SubscribeRequest subscribeRequest) override; void onSubscribeUpdate(SubscribeUpdate subscribeUpdate) override; void onSubscribeOk(SubscribeOk subscribeOk) override; @@ -428,6 +462,8 @@ class MoQSession : public MoQControlCodec::ControlCallback, return group == other.group && subgroup == other.subgroup; } else if (pref == ForwardPreference::Track) { return true; + } else if (pref == ForwardPreference::Fetch) { + return true; } return false; } @@ -460,6 +496,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, folly::Optional objectLength; uint64_t offset; bool streamPerObject; + bool cancelled{false}; }; // Get the max subscribe id from the setup params. If MAX_SUBSCRIBE_ID key is @@ -484,6 +521,9 @@ class MoQSession : public MoQControlCodec::ControlCallback, // Track Alias -> Track Handle folly::F14FastMap, TrackAlias::hash> subTracks_; + folly:: + F14FastMap, SubscribeID::hash> + fetches_; folly::F14FastMap subIdToTrackAlias_; diff --git a/moxygen/test/MoQSessionTest.cpp b/moxygen/test/MoQSessionTest.cpp index 74643dc..a5e3def 100644 --- a/moxygen/test/MoQSessionTest.cpp +++ b/moxygen/test/MoQSessionTest.cpp @@ -10,6 +10,9 @@ #include #include #include +#include + +using namespace moxygen; namespace { using namespace moxygen; @@ -23,6 +26,7 @@ class MockControlVisitorBase { virtual void onSubscribeUpdate(SubscribeUpdate subscribeUpdate) const = 0; virtual void onSubscribeDone(SubscribeDone subscribeDone) const = 0; virtual void onUnsubscribe(Unsubscribe unsubscribe) const = 0; + virtual void onFetch(Fetch fetch) const = 0; virtual void onAnnounce(Announce announce) const = 0; virtual void onUnannounce(Unannounce unannounce) const = 0; virtual void onAnnounceCancel(AnnounceCancel announceCancel) const = 0; @@ -86,6 +90,11 @@ class MockControlVisitor : public MoQSession::ControlVisitor, onUnsubscribe(unsubscribe); } + MOCK_METHOD(void, onFetch, (Fetch), (const)); + void operator()(Fetch fetch) const override { + onFetch(fetch); + } + MOCK_METHOD(void, onTrackStatusRequest, (TrackStatusRequest), (const)); void operator()(TrackStatusRequest trackStatusRequest) const override { onTrackStatusRequest(trackStatusRequest); @@ -196,7 +205,378 @@ TEST_F(MoQSessionTest, Setup) { clientSession_->close(); } +TEST_F(MoQSessionTest, Fetch) { + setupMoQSession(); + auto f = [](std::shared_ptr session) mutable + -> folly::coro::Task { + auto handle = co_await session->fetch( + {SubscribeID(0), + FullTrackName{TrackNamespace{{"foo"}}, "bar"}, + 0, + GroupOrder::OldestFirst, + AbsoluteLocation{0, 0}, + AbsoluteLocation{0, 1}, + {}}); + EXPECT_TRUE(handle.hasValue()); + auto obj = co_await handle.value()->objects().next(); + EXPECT_NE(obj.value(), nullptr); + EXPECT_EQ( + *std::get_if(&obj.value()->header.trackIdentifier), + SubscribeID(0)); + auto payload = co_await obj.value()->payload(); + EXPECT_EQ(payload->computeChainDataLength(), 100); + obj = co_await handle.value()->objects().next(); + EXPECT_FALSE(obj.has_value()); + session->close(); + }; + EXPECT_CALL(serverControl, onFetch(testing::_)) + .WillOnce(testing::Invoke([this](Fetch fetch) { + EXPECT_EQ( + fetch.fullTrackName, + FullTrackName({TrackNamespace{{"foo"}}, "bar"})); + serverSession_->fetchOk( + {fetch.subscribeID, + GroupOrder::OldestFirst, + /*endOfTrack=*/0, + AbsoluteLocation{100, 100}, + {}}); + serverSession_->publish( + {fetch.subscribeID, + fetch.start.group, + /*subgroup=*/0, + fetch.start.object, + /*priority=*/0, + ForwardPreference::Fetch, + ObjectStatus::NORMAL, + 100}, + fetch.subscribeID, + 0, + moxygen::test::makeBuf(100), + true); + serverSession_->closeFetchStream(fetch.subscribeID); + })); + f(clientSession_).scheduleOn(&eventBase_).start(); + eventBase_.loop(); +} + +TEST_F(MoQSessionTest, FetchCleanupFromStreamFin) { + setupMoQSession(); + auto f = [](std::shared_ptr session, + std::shared_ptr serverSession) mutable + -> folly::coro::Task { + auto handle = co_await session->fetch( + {SubscribeID(0), + FullTrackName{TrackNamespace{{"foo"}}, "bar"}, + 0, + GroupOrder::OldestFirst, + AbsoluteLocation{0, 0}, + AbsoluteLocation{0, 1}, + {}}); + EXPECT_TRUE(handle.hasValue()); + // publish here now we know FETCH_OK has been received at client + serverSession->publish( + {handle.value()->subscribeID(), + /*group=*/0, + /*subgroup=*/0, + /*object=*/0, + /*priority=*/0, + ForwardPreference::Fetch, + ObjectStatus::NORMAL, + 100}, + handle.value()->subscribeID(), + 0, + moxygen::test::makeBuf(100), + true); + serverSession->closeFetchStream(handle.value()->subscribeID()); + + auto obj = co_await handle.value()->objects().next(); + EXPECT_NE(obj.value(), nullptr); + EXPECT_EQ( + *std::get_if(&obj.value()->header.trackIdentifier), + SubscribeID(0)); + auto payload = co_await obj.value()->payload(); + EXPECT_EQ(payload->computeChainDataLength(), 100); + obj = co_await handle.value()->objects().next(); + EXPECT_FALSE(obj.has_value()); + session->close(); + }; + EXPECT_CALL(serverControl, onFetch(testing::_)) + .WillOnce(testing::Invoke([this](Fetch fetch) { + EXPECT_EQ( + fetch.fullTrackName, + FullTrackName({TrackNamespace{{"foo"}}, "bar"})); + serverSession_->fetchOk( + {fetch.subscribeID, + GroupOrder::OldestFirst, + /*endOfTrack=*/0, + AbsoluteLocation{100, 100}, + {}}); + })); + f(clientSession_, serverSession_).scheduleOn(&eventBase_).start(); + eventBase_.loop(); +} + +TEST_F(MoQSessionTest, FetchError) { + setupMoQSession(); + auto f = [](std::shared_ptr session) mutable + -> folly::coro::Task { + auto handle = co_await session->fetch( + {SubscribeID(0), + FullTrackName{TrackNamespace{{"foo"}}, "bar"}, + 0, + GroupOrder::OldestFirst, + AbsoluteLocation{0, 1}, + AbsoluteLocation{0, 0}, + {}}); + EXPECT_TRUE(handle.hasError()); + EXPECT_EQ( + handle.error().errorCode, + folly::to_underlying(FetchErrorCode::INVALID_RANGE)); + session->close(); + }; + f(clientSession_).scheduleOn(&eventBase_).start(); + eventBase_.loop(); +} + +TEST_F(MoQSessionTest, FetchCancel) { + setupMoQSession(); + auto f = [](std::shared_ptr clientSession, + std::shared_ptr serverSession) mutable + -> folly::coro::Task { + auto handle = co_await clientSession->fetch( + {SubscribeID(0), + FullTrackName{TrackNamespace{{"foo"}}, "bar"}, + 0, + GroupOrder::OldestFirst, + AbsoluteLocation{0, 0}, + AbsoluteLocation{0, 2}, + {}}); + EXPECT_TRUE(handle.hasValue()); + auto subscribeID = handle.value()->subscribeID(); + clientSession->fetchCancel({subscribeID}); + co_await folly::coro::co_reschedule_on_current_executor; + co_await folly::coro::co_reschedule_on_current_executor; + co_await folly::coro::co_reschedule_on_current_executor; + auto res = co_await folly::coro::co_awaitTry(serverSession->publish( + {subscribeID, + /*group=*/0, + /*subgroup=*/0, + /*object=*/1, + /*priority=*/0, + ForwardPreference::Fetch, + ObjectStatus::NORMAL, + 100}, + subscribeID, + 0, + moxygen::test::makeBuf(100), + true)); + // publish after fetchCancel fails + EXPECT_TRUE(res.hasException()); + clientSession->close(); + }; + EXPECT_CALL(serverControl, onFetch(testing::_)) + .WillOnce(testing::Invoke([this](Fetch fetch) { + EXPECT_EQ( + fetch.fullTrackName, + FullTrackName({TrackNamespace{{"foo"}}, "bar"})); + serverSession_->fetchOk( + {fetch.subscribeID, + GroupOrder::OldestFirst, + /*endOfTrack=*/0, + AbsoluteLocation{100, 100}, + {}}); + serverSession_->publish( + {fetch.subscribeID, + fetch.start.group, + /*subgroup=*/0, + fetch.start.object, + /*priority=*/0, + ForwardPreference::Fetch, + ObjectStatus::NORMAL, + 100}, + fetch.subscribeID, + 0, + moxygen::test::makeBuf(100), + true); + // published 1 object + })); + f(clientSession_, serverSession_).scheduleOn(&eventBase_).start(); + eventBase_.loop(); +} + +TEST_F(MoQSessionTest, FetchEarlyCancel) { + setupMoQSession(); + auto f = [](std::shared_ptr clientSession) mutable + -> folly::coro::Task { + auto handle = co_await clientSession->fetch( + {SubscribeID(0), + FullTrackName{TrackNamespace{{"foo"}}, "bar"}, + 0, + GroupOrder::OldestFirst, + AbsoluteLocation{0, 0}, + AbsoluteLocation{0, 2}, + {}}); + EXPECT_TRUE(handle.hasValue()); + auto subscribeID = handle.value()->subscribeID(); + // TODO: this no-ops right now so there's nothing to verify + clientSession->fetchCancel({subscribeID}); + clientSession->close(); + }; + EXPECT_CALL(serverControl, onFetch(testing::_)) + .WillOnce(testing::Invoke([this](Fetch fetch) { + EXPECT_EQ( + fetch.fullTrackName, + FullTrackName({TrackNamespace{{"foo"}}, "bar"})); + serverSession_->fetchOk( + {fetch.subscribeID, + GroupOrder::OldestFirst, + /*endOfTrack=*/0, + AbsoluteLocation{100, 100}, + {}}); + })); + f(clientSession_).scheduleOn(&eventBase_).start(); + eventBase_.loop(); +} + +TEST_F(MoQSessionTest, FetchBadLength) { + setupMoQSession(); + auto f = [](std::shared_ptr session) mutable + -> folly::coro::Task { + auto handle = co_await session->fetch( + {SubscribeID(0), + FullTrackName{TrackNamespace{{"foo"}}, "bar"}, + 0, + GroupOrder::OldestFirst, + AbsoluteLocation{0, 0}, + AbsoluteLocation{0, 1}, + {}}); + // onSessionEnd races fetchOk, and the client gets a fetchError + EXPECT_FALSE(handle.hasValue()); + }; + EXPECT_CALL(serverControl, onFetch(testing::_)) + .WillOnce(testing::Invoke([this](Fetch fetch) { + EXPECT_EQ( + fetch.fullTrackName, + FullTrackName({TrackNamespace{{"foo"}}, "bar"})); + serverSession_->fetchOk( + {fetch.subscribeID, + GroupOrder::OldestFirst, + /*endOfTrack=*/0, + AbsoluteLocation{100, 100}, + {}}); + serverSession_->publish( + {fetch.subscribeID, + fetch.start.group, + /*subgroup=*/0, + fetch.start.object, + /*priority=*/0, + ForwardPreference::Fetch, + ObjectStatus::NORMAL, + 100}, + fetch.subscribeID, + 0, + moxygen::test::makeBuf(10), + false); + auto res = serverSession_->closeFetchStream(fetch.subscribeID); + EXPECT_TRUE(res.hasException()); + // this should close the session too + })); + f(clientSession_).scheduleOn(&eventBase_).start(); + eventBase_.loop(); +} + +TEST_F(MoQSessionTest, BadFetchClose) { + setupMoQSession(); + auto res = clientSession_->closeFetchStream(SubscribeID(1000)); + EXPECT_TRUE(res.hasException()); +} + +TEST_F(MoQSessionTest, FetchOverLimit) { + setupMoQSession(); + auto f = [](std::shared_ptr session) mutable + -> folly::coro::Task { + Fetch fetch{ + SubscribeID(0), + FullTrackName{TrackNamespace{{"foo"}}, "bar"}, + 0, + GroupOrder::OldestFirst, + AbsoluteLocation{0, 0}, + AbsoluteLocation{0, 1}, + {}}; + auto handle = co_await session->fetch(fetch); + handle = co_await session->fetch(fetch); + handle = co_await session->fetch(fetch); + EXPECT_TRUE(handle.hasError()); + }; + EXPECT_CALL(serverControl, onFetch(testing::_)) + .WillOnce(testing::Invoke([this](Fetch fetch) { + EXPECT_EQ( + fetch.fullTrackName, + FullTrackName({TrackNamespace{{"foo"}}, "bar"})); + serverSession_->fetchOk( + {fetch.subscribeID, + GroupOrder::OldestFirst, + /*endOfTrack=*/0, + AbsoluteLocation{100, 100}, + {}}); + })) + .WillOnce(testing::Invoke([this](Fetch fetch) { + EXPECT_EQ( + fetch.fullTrackName, + FullTrackName({TrackNamespace{{"foo"}}, "bar"})); + serverSession_->fetchOk( + {fetch.subscribeID, + GroupOrder::OldestFirst, + /*endOfTrack=*/0, + AbsoluteLocation{100, 100}, + {}}); + })); + f(clientSession_).scheduleOn(&eventBase_).start(); + eventBase_.loop(); +} + +TEST_F(MoQSessionTest, FetchBadID) { + setupMoQSession(); + serverSession_->fetchOk( + {SubscribeID(1000), + GroupOrder::OldestFirst, + /*endOfTrack=*/0, + AbsoluteLocation{100, 100}, + {}}); + eventBase_.loopOnce(); + serverSession_->fetchError({SubscribeID(2000), 500, "local write failed"}); + eventBase_.loopOnce(); + // These are no-ops +} + +// Missing Test Cases +// === // receive bidi stream on client +// getTrack by alias (subscribe with stream) +// getTrack with invalid alias and subscribe ID +// receive non-normal object +// onObjectPayload maps to non-existent object in TrackHandle +// subscribeUpdate/onSubscribeUpdate +// unsubscribe/onUnsubscribe +// onSubscribeOk/Error/Done with unknown ID +// onMaxSubscribeID with ID == 0 {no setup param} +// onFetchCancel with no publish data +// announce/unannounce/announceCancel/announceError/announceOk +// subscribeAnnounces/unsubscribeAnnounces +// GOAWAY +// onConnectionError +// control message write failures +// order on invalid pub track +// publishStreamPerObject +// publish with payloadOffset > 0 +// createUniStream fails +// publish invalid group/object per forward pref +// publish without length +// publish object larger than length +// datagrams +// write stream data fails for object +// publish with stream EOM +// uni stream or datagram before setup complete TEST_F(MoQSessionTest, SetupTimeout) { eventBase_.loopOnce(); diff --git a/moxygen/test/Mocks.h b/moxygen/test/Mocks.h index 6591282..5b145db 100644 --- a/moxygen/test/Mocks.h +++ b/moxygen/test/Mocks.h @@ -22,6 +22,7 @@ class MockMoQCodecCallback : public MoQControlCodec::ControlCallback, uint64_t id, std::unique_ptr payload, bool eom)); + MOCK_METHOD(void, onFetchHeader, (uint64_t subscribeID)); MOCK_METHOD(void, onSubscribe, (SubscribeRequest subscribeRequest)); MOCK_METHOD(void, onSubscribeUpdate, (SubscribeUpdate subscribeUpdate)); MOCK_METHOD(void, onSubscribeOk, (SubscribeOk subscribeOk)); diff --git a/moxygen/test/TestUtils.cpp b/moxygen/test/TestUtils.cpp index cc31e20..bc16132 100644 --- a/moxygen/test/TestUtils.cpp +++ b/moxygen/test/TestUtils.cpp @@ -6,6 +6,8 @@ #include "moxygen/test/TestUtils.h" +#include +#include #include "moxygen/MoQFramer.h" namespace moxygen::test { @@ -229,4 +231,18 @@ std::unique_ptr writeAllObjectMessages() { return writeBuf.move(); } +std::unique_ptr makeBuf(uint32_t size) { + auto out = folly::IOBuf::create(size); + out->append(size); + // fill with random junk + folly::io::RWPrivateCursor cursor(out.get()); + while (cursor.length() >= 8) { + cursor.write(folly::Random::rand64()); + } + while (cursor.length()) { + cursor.write((uint8_t)folly::Random::rand32()); + } + return out; +} + } // namespace moxygen::test diff --git a/moxygen/test/TestUtils.h b/moxygen/test/TestUtils.h index d1aaa88..a64e955 100644 --- a/moxygen/test/TestUtils.h +++ b/moxygen/test/TestUtils.h @@ -20,4 +20,6 @@ inline std::unique_ptr writeAllMessages() { return buf; } +std::unique_ptr makeBuf(uint32_t size = 10); + } // namespace moxygen::test