From ccc9823add61657cecb384632a1e818ff9a29662 Mon Sep 17 00:00:00 2001 From: Wei-Cheng Lin Date: Wed, 18 Sep 2024 14:14:39 -0700 Subject: [PATCH] add SubscriptionType enum to distniguish between state, delta, patch to replace boolean Summary: extending this so i could add patch api Differential Revision: D62675733 fbshipit-source-id: 004d1432b2c22cc63014f8a26b97449aa0353958 --- fboss/fsdb/client/FsdbPatchSubscriber.h | 5 +- fboss/fsdb/client/FsdbPubSubManager.cpp | 72 +++++++++++-------- fboss/fsdb/client/FsdbPubSubManager.h | 3 +- fboss/fsdb/client/FsdbSubscriber.cpp | 22 ++++-- fboss/fsdb/client/FsdbSubscriber.h | 20 +++++- .../client/test/FsdbPubSubManagerTest.cpp | 6 ++ .../tests/client/FsdbPubSubManagerTest.cpp | 33 ++++----- 7 files changed, 106 insertions(+), 55 deletions(-) diff --git a/fboss/fsdb/client/FsdbPatchSubscriber.h b/fboss/fsdb/client/FsdbPatchSubscriber.h index 9b7ce71f0230e..90da5a3c0d561 100644 --- a/fboss/fsdb/client/FsdbPatchSubscriber.h +++ b/fboss/fsdb/client/FsdbPatchSubscriber.h @@ -35,5 +35,8 @@ using FsdbPatchSubscriber = FsdbPatchSubscriberImpl< SubscriberMessage, SubscriberChunk, std::map>; -// TODO: impl extended patch subscribers +using FsdbExtPatchSubscriber = FsdbPatchSubscriberImpl< + SubscriberMessage, + SubscriberChunk, + std::map>; } // namespace facebook::fboss::fsdb diff --git a/fboss/fsdb/client/FsdbPubSubManager.cpp b/fboss/fsdb/client/FsdbPubSubManager.cpp index 47981c16fc612..95cc934b9dfee 100644 --- a/fboss/fsdb/client/FsdbPubSubManager.cpp +++ b/fboss/fsdb/client/FsdbPubSubManager.cpp @@ -13,10 +13,8 @@ namespace { using namespace facebook::fboss::fsdb; -auto constexpr kDelta = "delta"; auto constexpr kState = "state"; auto constexpr kStats = "stats"; -auto constexpr kPath = "path"; auto constexpr kReconnectThread = "FsdbReconnectThread"; auto constexpr kSubscriberThread = "FsdbSubscriberThread"; auto constexpr kStatsPublisherThread = "FsdbStatsPublisherThread"; @@ -25,12 +23,12 @@ auto constexpr kStatePublisherThread = "FsdbStatePublisherThread"; std::string toSubscriptionStr( const std::string& fsdbHost, const std::vector& path, - bool isDelta, + SubscriptionType subscriptionType, bool subscribeStats) { return folly::to( fsdbHost, ":/", - (isDelta ? kDelta : kPath), + subscriptionTypeToStr[subscriptionType], ":/", (subscribeStats ? kStats : kState), ":/", @@ -40,12 +38,12 @@ std::string toSubscriptionStr( std::string toSubscriptionStr( const std::string& fsdbHost, const std::vector& paths, - bool isDelta, + SubscriptionType subscriptionType, bool subscribeStats) { return folly::to( fsdbHost, ":/", - (isDelta ? kDelta : kPath), + subscriptionTypeToStr[subscriptionType], ":/", (subscribeStats ? kStats : kState), ":/", @@ -411,6 +409,8 @@ void FsdbPubSubManager::addStatePathSubscription( SubscriptionStateChangeCb stateChangeCb, FsdbExtStateSubscriber::FsdbOperStateUpdateCb operStateCb, FsdbStreamClient::ServerOptions&& serverOptions) { + XLOG(INFO) << "addStatePathSubscription: " + << typeid(FsdbExtStateSubscriber).name(); addSubscriptionImpl( std::move(subscriptionOptions), PathHelpers::toExtendedOperPath(subscribePaths), @@ -479,13 +479,12 @@ void FsdbPubSubManager::addSubscriptionImpl( bool subscribeStats, FsdbStreamClient::ServerOptions&& serverOptions, const std::optional& clientIdSuffix) { - auto isDelta = std::disjunction_v< - std::is_same, - std::is_same>; + auto subscriptionType = SubscriberT::subscriptionType(); + XCHECK(subscriptionType != SubscriptionType::UNKNOWN) << "Unknown data type"; auto subsStr = toSubscriptionStr( serverOptions.dstAddr.getAddressStr(), subscribePath, - isDelta, + subscriptionType, subscribeStats); auto& path2Subscriber = subscribeStats ? statPath2Subscriber_ : statePath2Subscriber_; @@ -521,13 +520,12 @@ void FsdbPubSubManager::addSubscriptionImpl( SubscriptionStateChangeCb stateChangeCb, typename SubscriberT::FsdbSubUnitUpdateCb subUnitAvailableCb, FsdbStreamClient::ServerOptions&& serverOptions) { - auto isDelta = std::disjunction_v< - std::is_same, - std::is_same>; + auto subscriptionType = SubscriberT::subscriptionType(); + XCHECK(subscriptionType != SubscriptionType::UNKNOWN) << "Unknown data type"; auto subsStr = toSubscriptionStr( serverOptions.dstAddr.getAddressStr(), subscribePath, - isDelta, + subscriptionType, subscriptionOptions.subscribeStats_); auto& path2Subscriber = subscriptionOptions.subscribeStats_ ? statPath2Subscriber_ @@ -574,25 +572,34 @@ void FsdbPubSubManager::removeStateDeltaSubscription( const Path& subscribePath, const std::string& fsdbHost) { removeSubscriptionImpl( - subscribePath, fsdbHost, true /*delta*/, false /*subscribeStats*/); + subscribePath, + fsdbHost, + SubscriptionType::DELTA, + false /*subscribeStats*/); } void FsdbPubSubManager::removeStatePathSubscription( const Path& subscribePath, const std::string& fsdbHost) { removeSubscriptionImpl( - subscribePath, fsdbHost, false /*delta*/, false /*subscribeStats*/); + subscribePath, + fsdbHost, + SubscriptionType::PATH, + false /*subscribeStats*/); } void FsdbPubSubManager::removeStatDeltaSubscription( const Path& subscribePath, const std::string& fsdbHost) { removeSubscriptionImpl( - subscribePath, fsdbHost, true /*delta*/, true /*subscribeStats*/); + subscribePath, + fsdbHost, + SubscriptionType::DELTA, + true /*subscribeStats*/); } void FsdbPubSubManager::removeStatPathSubscription( const Path& subscribePath, const std::string& fsdbHost) { removeSubscriptionImpl( - subscribePath, fsdbHost, false /*delta*/, true /*subscribeStats*/); + subscribePath, fsdbHost, SubscriptionType::PATH, true /*subscribeStats*/); } void FsdbPubSubManager::removeStateDeltaSubscription( @@ -601,7 +608,7 @@ void FsdbPubSubManager::removeStateDeltaSubscription( removeSubscriptionImpl( PathHelpers::toExtendedOperPath(subscribePath), fsdbHost, - true /*delta*/, + SubscriptionType::DELTA, false /*subscribeStats*/); } void FsdbPubSubManager::removeStatePathSubscription( @@ -610,7 +617,7 @@ void FsdbPubSubManager::removeStatePathSubscription( removeSubscriptionImpl( PathHelpers::toExtendedOperPath(subscribePath), fsdbHost, - false /*delta*/, + SubscriptionType::PATH, false /*subscribeStats*/); } void FsdbPubSubManager::removeStatDeltaSubscription( @@ -619,7 +626,7 @@ void FsdbPubSubManager::removeStatDeltaSubscription( removeSubscriptionImpl( PathHelpers::toExtendedOperPath(subscribePath), fsdbHost, - true /*delta*/, + SubscriptionType::DELTA, true /*subscribeStats*/); } void FsdbPubSubManager::removeStatPathSubscription( @@ -628,20 +635,26 @@ void FsdbPubSubManager::removeStatPathSubscription( removeSubscriptionImpl( PathHelpers::toExtendedOperPath(subscribePath), fsdbHost, - false /*delta*/, + SubscriptionType::PATH, true /*subscribeStats*/); } void FsdbPubSubManager::removeStateExtDeltaSubscription( const std::vector& subscribePaths, const std::string& fsdbHost) { removeSubscriptionImpl( - subscribePaths, fsdbHost, true /*delta*/, false /*subscribeStats*/); + subscribePaths, + fsdbHost, + SubscriptionType::DELTA, + false /*subscribeStats*/); } void FsdbPubSubManager::removeStatExtDeltaSubscription( const std::vector& subscribePaths, const std::string& fsdbHost) { removeSubscriptionImpl( - subscribePaths, fsdbHost, true /*delta*/, true /*subscribeStats*/); + subscribePaths, + fsdbHost, + SubscriptionType::DELTA, + true /*subscribeStats*/); } void FsdbPubSubManager::clearStateSubscriptions() { @@ -655,10 +668,10 @@ template void FsdbPubSubManager::removeSubscriptionImpl( const std::vector& subscribePath, const std::string& fsdbHost, - bool isDelta, + SubscriptionType subscriptionType, bool subscribeStats) { - auto subsStr = - toSubscriptionStr(fsdbHost, subscribePath, isDelta, subscribeStats); + auto subsStr = toSubscriptionStr( + fsdbHost, subscribePath, subscriptionType, subscribeStats); auto& path2Subscriber = subscribeStats ? statPath2Subscriber_ : statePath2Subscriber_; if (path2Subscriber.wlock()->erase(subsStr)) { @@ -670,7 +683,10 @@ FsdbStreamClient::State FsdbPubSubManager::getStatePathSubsriptionState( const MultiPath& subscribePath, const std::string& fsdbHost) const { auto subsStr = toSubscriptionStr( - fsdbHost, PathHelpers::toExtendedOperPath(subscribePath), false, false); + fsdbHost, + PathHelpers::toExtendedOperPath(subscribePath), + SubscriptionType::PATH, + false); auto path2SubscriberR = statePath2Subscriber_.rlock(); if (path2SubscriberR->find(subsStr) == path2SubscriberR->end()) { return FsdbStreamClient::State::CANCELLED; diff --git a/fboss/fsdb/client/FsdbPubSubManager.h b/fboss/fsdb/client/FsdbPubSubManager.h index 319fa15a72775..f398fa8f91d68 100644 --- a/fboss/fsdb/client/FsdbPubSubManager.h +++ b/fboss/fsdb/client/FsdbPubSubManager.h @@ -5,6 +5,7 @@ #include #include #include "fboss/fsdb/client/FsdbDeltaSubscriber.h" +#include "fboss/fsdb/client/FsdbPatchSubscriber.h" #include "fboss/fsdb/client/FsdbStateSubscriber.h" #include "fboss/fsdb/client/FsdbStreamClient.h" #include "fboss/fsdb/common/Flags.h" @@ -247,7 +248,7 @@ class FsdbPubSubManager { void removeSubscriptionImpl( const std::vector& subscribePath, const std::string& fsdbHost, - bool isDelta, + SubscriptionType subscribeType, bool subscribeStats); template void addSubscriptionImpl( diff --git a/fboss/fsdb/client/FsdbSubscriber.cpp b/fboss/fsdb/client/FsdbSubscriber.cpp index cded91da1d434..a54f99c8452f2 100644 --- a/fboss/fsdb/client/FsdbSubscriber.cpp +++ b/fboss/fsdb/client/FsdbSubscriber.cpp @@ -5,19 +5,27 @@ namespace facebook::fboss::fsdb { template -std::string FsdbSubscriber::typeStr() const { +SubscriptionType FsdbSubscriber::subscriptionType() { if constexpr ( std::is_same_v || - std::is_same_v) { - return "Delta"; - } else if ( + std::is_same_v) { + return SubscriptionType::DELTA; + } else if constexpr ( std::is_same_v || - std::is_same_v) { - return "Path"; + std::is_same_v) { + return SubscriptionType::PATH; + } else if constexpr (std::is_same_v) { + return SubscriptionType::PATCH; } else { - return "Patch"; + static_assert(folly::always_false, "unsupported request type"); } } + +template +std::string FsdbSubscriber::typeStr() const { + auto subType = subscriptionType(); + return subscriptionTypeToStr[subType]; +} template std::string FsdbSubscriber::pathsStr(const Paths& path) const { return PathHelpers::toString(path); diff --git a/fboss/fsdb/client/FsdbSubscriber.h b/fboss/fsdb/client/FsdbSubscriber.h index 4a5e6e085afbe..9e5f032f466de 100644 --- a/fboss/fsdb/client/FsdbSubscriber.h +++ b/fboss/fsdb/client/FsdbSubscriber.h @@ -26,6 +26,20 @@ enum class SubscriptionState : uint16_t { CONNECTED, }; +enum class SubscriptionType { + UNKNOWN = 0, + PATH = 1, + DELTA = 2, + PATCH = 3, +}; + +static std::unordered_map subscriptionTypeToStr = + { + {SubscriptionType::PATH, "Path"}, + {SubscriptionType::DELTA, "Delta"}, + {SubscriptionType::PATCH, "Patch"}, +}; + inline bool isConnected(const SubscriptionState& state) { return state == SubscriptionState::CONNECTED; } @@ -82,7 +96,7 @@ struct SubscriptionOptions { struct SubscriptionInfo { std::string server; - bool isDelta; + SubscriptionType subscriptionType; bool isStats; std::vector paths; FsdbStreamClient::State state; @@ -168,10 +182,12 @@ class FsdbSubscriber : public FsdbSubscriberBase { cancelStaleStateTimeout(); } + static SubscriptionType subscriptionType(); + SubscriptionInfo getInfo() const override { return SubscriptionInfo{ getServer(), - !std::is_same_v, + subscriptionType(), this->isStats(), PathHelpers::toStringList(subscribePaths_), getState(), diff --git a/fboss/fsdb/client/test/FsdbPubSubManagerTest.cpp b/fboss/fsdb/client/test/FsdbPubSubManagerTest.cpp index ec5b2d925d145..0a570fa6204f7 100644 --- a/fboss/fsdb/client/test/FsdbPubSubManagerTest.cpp +++ b/fboss/fsdb/client/test/FsdbPubSubManagerTest.cpp @@ -222,4 +222,10 @@ TEST_F(PubSubManagerTest, removeAllSubscriptions) { addStatDeltaSubscription({"foo"}); } +TEST_F(PubSubManagerTest, TestSubscriptionInfo) { + EXPECT_EQ(FsdbPatchSubscriber::subscriptionType(), SubscriptionType::PATCH); + EXPECT_EQ(FsdbStateSubscriber::subscriptionType(), SubscriptionType::PATH); + EXPECT_EQ(FsdbDeltaSubscriber::subscriptionType(), SubscriptionType::DELTA); +} + } // namespace facebook::fboss::fsdb diff --git a/fboss/fsdb/tests/client/FsdbPubSubManagerTest.cpp b/fboss/fsdb/tests/client/FsdbPubSubManagerTest.cpp index da2cb9175ca64..1979b2e91388e 100644 --- a/fboss/fsdb/tests/client/FsdbPubSubManagerTest.cpp +++ b/fboss/fsdb/tests/client/FsdbPubSubManagerTest.cpp @@ -45,28 +45,24 @@ class FsdbPubSubManagerTest : public ::testing::Test { protected: folly::Synchronized> subscriptionLastDisconnectReason; - void updateSubscriptionLastDisconnectReason(bool isDelta, bool isStats) { + void updateSubscriptionLastDisconnectReason( + SubscriptionType subscriptionType, + bool isStats) { auto subscriptionInfoList = this->pubSubManager_->getSubscriptionInfo(); for (const auto& subscriptionInfo : subscriptionInfoList) { - if (isDelta == subscriptionInfo.isDelta && + if (subscriptionType == subscriptionInfo.subscriptionType && isStats == subscriptionInfo.isStats) { auto reason = subscriptionInfo.disconnectReason; subscriptionLastDisconnectReason.withWLock([&](auto& map) { std::string subscriberId = isStats ? "STAT" : "State"; - subscriberId += isDelta ? "_Delta" : "_Path"; + subscriberId += + fmt::format("_{}", subscriptionTypeToStr[subscriptionType]); map[subscriberId] = reason; }); return; } } } - FsdbErrorCode getSubscriptionLastDisconnectReason( - bool isDelta, - bool isStats) { - std::string subscriberId = isStats ? "STAT" : "State"; - subscriberId += isDelta ? "_Delta" : "_Path"; - return subscriptionLastDisconnectReason.rlock()->at(subscriberId); - } void checkDisconnectReason(FsdbErrorCode expected) { WITH_RETRIES({ auto reasons = subscriptionLastDisconnectReason.rlock(); @@ -493,21 +489,26 @@ TYPED_TEST(FsdbPubSubManagerGRTest, verifySubscriptionDisconnectOnPublisherGR) { this->addStatDeltaSubscription( this->makeOperDeltaCb(statDeltas), this->subscrStateChangeCb(stateDeltas, [this]() { - this->updateSubscriptionLastDisconnectReason(true, true); + this->updateSubscriptionLastDisconnectReason( + SubscriptionType::DELTA, true); })); this->addStatPathSubscription( this->makeOperStateCb(statPaths), this->subscrStateChangeCb(stateDeltas, [this]() { - this->updateSubscriptionLastDisconnectReason(false, true); + this->updateSubscriptionLastDisconnectReason( + SubscriptionType::PATH, true); })); this->addStateDeltaSubscription( this->makeOperDeltaCb(stateDeltas), this->subscrStateChangeCb(stateDeltas, [this]() { - this->updateSubscriptionLastDisconnectReason(true, false); + this->updateSubscriptionLastDisconnectReason( + SubscriptionType::DELTA, false); })); - SubscriptionStateChangeCb stChangeCb = this->subscrStateChangeCb( - statePaths, - [this]() { this->updateSubscriptionLastDisconnectReason(false, false); }); + SubscriptionStateChangeCb stChangeCb = + this->subscrStateChangeCb(statePaths, [this]() { + this->updateSubscriptionLastDisconnectReason( + SubscriptionType::PATH, false); + }); this->addStatePathSubscription(this->makeOperStateCb(statePaths), stChangeCb); // Publish this->publish(makePortStats(1));