From a2a47e85dd729c681101a572fb5be732bc10a2e6 Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Mon, 9 Oct 2023 15:35:45 -0500 Subject: [PATCH 1/5] add secondary feed to go relay --- broadcastclient/broadcastclient.go | 6 +- broadcastclients/broadcastclients.go | 135 ++++++++++++++++++++++----- 2 files changed, 116 insertions(+), 25 deletions(-) diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 2649c88192..a7ee269a4c 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -69,6 +69,7 @@ type Config struct { RequireFeedVersion bool `koanf:"require-feed-version" reload:"hot"` Timeout time.Duration `koanf:"timeout" reload:"hot"` URL []string `koanf:"url"` + SecondaryURL []string `koanf:"secondary-url"` Verify signature.VerifierConfig `koanf:"verify"` EnableCompression bool `koanf:"enable-compression" reload:"hot"` } @@ -85,7 +86,8 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".require-chain-id", DefaultConfig.RequireChainId, "require chain id to be present on connect") f.Bool(prefix+".require-feed-version", DefaultConfig.RequireFeedVersion, "require feed version to be present on connect") f.Duration(prefix+".timeout", DefaultConfig.Timeout, "duration to wait before timing out connection to sequencer feed") - f.StringSlice(prefix+".url", DefaultConfig.URL, "URL of sequencer feed source") + f.StringSlice(prefix+".url", DefaultConfig.URL, "list of primary URLs of sequencer feed source") + f.StringSlice(prefix+".secondary-url", DefaultConfig.SecondaryURL, "list of secondary URLs of sequencer feed source") signature.FeedVerifierConfigAddOptions(prefix+".verify", f) f.Bool(prefix+".enable-compression", DefaultConfig.EnableCompression, "enable per message deflate compression support") } @@ -97,6 +99,7 @@ var DefaultConfig = Config{ RequireFeedVersion: false, Verify: signature.DefultFeedVerifierConfig, URL: []string{""}, + SecondaryURL: []string{}, Timeout: 20 * time.Second, EnableCompression: true, } @@ -108,6 +111,7 @@ var DefaultTestConfig = Config{ RequireFeedVersion: false, Verify: signature.DefultFeedVerifierConfig, URL: []string{""}, + SecondaryURL: []string{}, Timeout: 200 * time.Millisecond, EnableCompression: true, } diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 74596bb08f..873d6be03b 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -6,16 +6,42 @@ package broadcastclients import ( "context" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcastclient" + "github.com/offchainlabs/nitro/broadcaster" "github.com/offchainlabs/nitro/util/contracts" + "github.com/offchainlabs/nitro/util/stopwaiter" ) +const MAX_FEED_INACTIVE_TIME = time.Second * 6 +const ROUTER_QUEUE_SIZE = 1024 + +type Router struct { + stopwaiter.StopWaiter + messageChan chan broadcaster.BroadcastFeedMessage + confirmedSequenceNumberChan chan arbutil.MessageIndex + + forwardTxStreamer broadcastclient.TransactionStreamerInterface + forwardConfirmationChan chan arbutil.MessageIndex +} + +func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error { + for _, feedMessage := range feedMessages { + r.messageChan <- *feedMessage + } + return nil +} + type BroadcastClients struct { - clients []*broadcastclient.BroadcastClient + primaryClients []*broadcastclient.BroadcastClient + secondaryClients []*broadcastclient.BroadcastClient + numOfStartedSecondary int + + router *Router // Use atomic access connected int32 @@ -31,34 +57,55 @@ func NewBroadcastClients( addrVerifier contracts.AddressVerifierInterface, ) (*BroadcastClients, error) { config := configFetcher() - urlCount := len(config.URL) - if urlCount <= 0 { + if len(config.URL) == 0 && len(config.SecondaryURL) == 0 { return nil, nil } - clients := BroadcastClients{} - clients.clients = make([]*broadcastclient.BroadcastClient, 0, urlCount) + clients := BroadcastClients{ + router: &Router{ + messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE), + confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE), + forwardTxStreamer: txStreamer, + forwardConfirmationChan: confirmedSequenceNumberListener, + }, + } var lastClientErr error - for _, address := range config.URL { - client, err := broadcastclient.NewBroadcastClient( - configFetcher, - address, - l2ChainId, - currentMessageCount, - txStreamer, - confirmedSequenceNumberListener, - fatalErrChan, - addrVerifier, - func(delta int32) { clients.adjustCount(delta) }, - ) - if err != nil { - lastClientErr = err - log.Warn("init broadcast client failed", "address", address) + makeFeeds := func(url []string) []*broadcastclient.BroadcastClient { + feeds := make([]*broadcastclient.BroadcastClient, 0, len(url)) + for _, address := range url { + client, err := broadcastclient.NewBroadcastClient( + configFetcher, + address, + l2ChainId, + currentMessageCount, + clients.router, + clients.router.confirmedSequenceNumberChan, + fatalErrChan, + addrVerifier, + func(delta int32) { clients.adjustCount(delta) }, + ) + if err != nil { + lastClientErr = err + log.Warn("init broadcast client failed", "address", address) + continue + } + feeds = append(feeds, client) } - clients.clients = append(clients.clients, client) + return feeds } - if len(clients.clients) == 0 { + + clients.primaryClients = makeFeeds(config.URL) + clients.secondaryClients = makeFeeds(config.SecondaryURL) + + if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 { log.Error("no connected feed on startup, last error: %w", lastClientErr) + return nil, nil + } + + // have atleast one primary client + if len(clients.primaryClients) == 0 { + clients.primaryClients = append(clients.primaryClients, clients.secondaryClients[0]) + clients.secondaryClients = clients.secondaryClients[1:] } return &clients, nil @@ -72,12 +119,52 @@ func (bcs *BroadcastClients) adjustCount(delta int32) { } func (bcs *BroadcastClients) Start(ctx context.Context) { - for _, client := range bcs.clients { + bcs.router.StopWaiter.Start(ctx, bcs.router) + + for _, client := range bcs.primaryClients { client.Start(ctx) } + + bcs.router.LaunchThread(func(ctx context.Context) { + startNewFeedTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME) + defer startNewFeedTimer.Stop() + for { + select { + case <-ctx.Done(): + return + case cs := <-bcs.router.confirmedSequenceNumberChan: + startNewFeedTimer.Stop() + bcs.router.forwardConfirmationChan <- cs + startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + case msg := <-bcs.router.messageChan: + startNewFeedTimer.Stop() + if err := bcs.router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { + log.Error("Error routing message from Sequencer Feed", "err", err) + } + startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + case <-startNewFeedTimer.C: + // failed to get messages from primary feed for ~5 seconds, start a new feed + bcs.StartSecondaryFeed(ctx) + } + } + }) } + +func (bcs *BroadcastClients) StartSecondaryFeed(ctx context.Context) { + if bcs.numOfStartedSecondary < len(bcs.secondaryClients) { + client := bcs.secondaryClients[bcs.numOfStartedSecondary] + bcs.numOfStartedSecondary += 1 + client.Start(ctx) + } else { + log.Warn("failed to start a new secondary feed all available secondary feeds were started") + } +} + func (bcs *BroadcastClients) StopAndWait() { - for _, client := range bcs.clients { + for _, client := range bcs.primaryClients { client.StopAndWait() } + for i := 0; i < bcs.numOfStartedSecondary; i++ { + bcs.secondaryClients[i].StopAndWait() + } } From a65d5b58db52441cad55ea39a63248d27614099b Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Mon, 9 Oct 2023 18:25:34 -0500 Subject: [PATCH 2/5] code refactor --- arbnode/node.go | 2 ++ broadcastclients/broadcastclients.go | 6 +++--- relay/relay.go | 1 + 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/arbnode/node.go b/arbnode/node.go index bf57b1c004..1b42880d3b 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -36,6 +36,7 @@ import ( "github.com/offchainlabs/nitro/das" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/execution/gethexec" + "github.com/offchainlabs/nitro/relay" "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/solgen/go/challengegen" "github.com/offchainlabs/nitro/solgen/go/ospgen" @@ -643,6 +644,7 @@ func createNodeImpl( nil, fatalErrChan, bpVerifier, + relay.ConfigDefault.Queue, ) if err != nil { return nil, err diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 873d6be03b..a92c3b736d 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -18,7 +18,6 @@ import ( ) const MAX_FEED_INACTIVE_TIME = time.Second * 6 -const ROUTER_QUEUE_SIZE = 1024 type Router struct { stopwaiter.StopWaiter @@ -55,6 +54,7 @@ func NewBroadcastClients( confirmedSequenceNumberListener chan arbutil.MessageIndex, fatalErrChan chan error, addrVerifier contracts.AddressVerifierInterface, + queueCapcity int, ) (*BroadcastClients, error) { config := configFetcher() if len(config.URL) == 0 && len(config.SecondaryURL) == 0 { @@ -63,8 +63,8 @@ func NewBroadcastClients( clients := BroadcastClients{ router: &Router{ - messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE), - confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE), + messageChan: make(chan broadcaster.BroadcastFeedMessage, queueCapcity), + confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, queueCapcity), forwardTxStreamer: txStreamer, forwardConfirmationChan: confirmedSequenceNumberListener, }, diff --git a/relay/relay.go b/relay/relay.go index bb07251190..26894c0a3d 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -58,6 +58,7 @@ func NewRelay(config *Config, feedErrChan chan error) (*Relay, error) { confirmedSequenceNumberListener, feedErrChan, nil, + config.Queue, ) if err != nil { return nil, err From 6a3c33741c5224ac0d9dd9299c79f7e1216e7c0e Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Tue, 10 Oct 2023 09:52:18 -0500 Subject: [PATCH 3/5] Revert "code refactor" This reverts commit a65d5b58db52441cad55ea39a63248d27614099b. --- arbnode/node.go | 2 -- broadcastclients/broadcastclients.go | 6 +++--- relay/relay.go | 1 - 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/arbnode/node.go b/arbnode/node.go index 1b42880d3b..bf57b1c004 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -36,7 +36,6 @@ import ( "github.com/offchainlabs/nitro/das" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/execution/gethexec" - "github.com/offchainlabs/nitro/relay" "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/solgen/go/challengegen" "github.com/offchainlabs/nitro/solgen/go/ospgen" @@ -644,7 +643,6 @@ func createNodeImpl( nil, fatalErrChan, bpVerifier, - relay.ConfigDefault.Queue, ) if err != nil { return nil, err diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index a92c3b736d..873d6be03b 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -18,6 +18,7 @@ import ( ) const MAX_FEED_INACTIVE_TIME = time.Second * 6 +const ROUTER_QUEUE_SIZE = 1024 type Router struct { stopwaiter.StopWaiter @@ -54,7 +55,6 @@ func NewBroadcastClients( confirmedSequenceNumberListener chan arbutil.MessageIndex, fatalErrChan chan error, addrVerifier contracts.AddressVerifierInterface, - queueCapcity int, ) (*BroadcastClients, error) { config := configFetcher() if len(config.URL) == 0 && len(config.SecondaryURL) == 0 { @@ -63,8 +63,8 @@ func NewBroadcastClients( clients := BroadcastClients{ router: &Router{ - messageChan: make(chan broadcaster.BroadcastFeedMessage, queueCapcity), - confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, queueCapcity), + messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE), + confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE), forwardTxStreamer: txStreamer, forwardConfirmationChan: confirmedSequenceNumberListener, }, diff --git a/relay/relay.go b/relay/relay.go index 26894c0a3d..bb07251190 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -58,7 +58,6 @@ func NewRelay(config *Config, feedErrChan chan error) (*Relay, error) { confirmedSequenceNumberListener, feedErrChan, nil, - config.Queue, ) if err != nil { return nil, err From 878ec4ae5df30be16422d20ce30c937c4a03488c Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Fri, 20 Oct 2023 11:26:38 -0500 Subject: [PATCH 4/5] address PR comments, refactor code to use two router design --- broadcastclients/broadcastclients.go | 106 +++++++++++++++++++-------- 1 file changed, 76 insertions(+), 30 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 48f644b7f0..f508404799 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -21,6 +21,7 @@ const ROUTER_QUEUE_SIZE = 1024 const RECENT_FEED_INITIAL_MAP_SIZE = 1024 const RECENT_FEED_ITEM_TTL = time.Second * 10 const MAX_FEED_INACTIVE_TIME = time.Second * 5 +const PRIMARY_FEED_UPTIME = time.Minute * 10 type Router struct { stopwaiter.StopWaiter @@ -43,7 +44,8 @@ type BroadcastClients struct { secondaryClients []*broadcastclient.BroadcastClient numOfStartedSecondary int - router *Router + primaryRouter *Router + secondaryRouter *Router // Use atomic access connected int32 @@ -62,17 +64,20 @@ func NewBroadcastClients( if len(config.URL) == 0 && len(config.SecondaryURL) == 0 { return nil, nil } - - clients := BroadcastClients{ - router: &Router{ + newStandardRouter := func() *Router { + return &Router{ messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE), confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE), forwardTxStreamer: txStreamer, forwardConfirmationChan: confirmedSequenceNumberListener, - }, + } + } + clients := BroadcastClients{ + primaryRouter: newStandardRouter(), + secondaryRouter: newStandardRouter(), } var lastClientErr error - makeFeeds := func(url []string) []*broadcastclient.BroadcastClient { + makeFeeds := func(url []string, router *Router) []*broadcastclient.BroadcastClient { feeds := make([]*broadcastclient.BroadcastClient, 0, len(url)) for _, address := range url { client, err := broadcastclient.NewBroadcastClient( @@ -80,8 +85,8 @@ func NewBroadcastClients( address, l2ChainId, currentMessageCount, - clients.router, - clients.router.confirmedSequenceNumberChan, + router, + router.confirmedSequenceNumberChan, fatalErrChan, addrVerifier, func(delta int32) { clients.adjustCount(delta) }, @@ -96,8 +101,8 @@ func NewBroadcastClients( return feeds } - clients.primaryClients = makeFeeds(config.URL) - clients.secondaryClients = makeFeeds(config.SecondaryURL) + clients.primaryClients = makeFeeds(config.URL, clients.primaryRouter) + clients.secondaryClients = makeFeeds(config.SecondaryURL, clients.secondaryRouter) if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 { log.Error("no connected feed on startup, last error: %w", lastClientErr) @@ -121,7 +126,8 @@ func (bcs *BroadcastClients) adjustCount(delta int32) { } func (bcs *BroadcastClients) Start(ctx context.Context) { - bcs.router.StopWaiter.Start(ctx, bcs.router) + bcs.primaryRouter.StopWaiter.Start(ctx, bcs.primaryRouter) + bcs.secondaryRouter.StopWaiter.Start(ctx, bcs.secondaryRouter) for _, client := range bcs.primaryClients { client.Start(ctx) @@ -130,17 +136,24 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { var lastConfirmed arbutil.MessageIndex recentFeedItemsNew := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE) recentFeedItemsOld := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE) - bcs.router.LaunchThread(func(ctx context.Context) { + bcs.primaryRouter.LaunchThread(func(ctx context.Context) { recentFeedItemsCleanup := time.NewTicker(RECENT_FEED_ITEM_TTL) - startNewFeedTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME) + startSecondaryFeedTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME) + stopSecondaryFeedTimer := time.NewTicker(PRIMARY_FEED_UPTIME) + primaryFeedIsDownTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME) defer recentFeedItemsCleanup.Stop() - defer startNewFeedTimer.Stop() + defer startSecondaryFeedTimer.Stop() + defer stopSecondaryFeedTimer.Stop() + defer primaryFeedIsDownTimer.Stop() for { select { case <-ctx.Done(): return - case msg := <-bcs.router.messageChan: - startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + + // Primary feeds + case msg := <-bcs.primaryRouter.messageChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok { continue } @@ -148,29 +161,55 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { continue } recentFeedItemsNew[msg.SequenceNumber] = time.Now() - // need to stop the timer because forwardTxStreamer might be blocked when traffic is high - // and that shouldn't create race condition between channels timer.C and messageChan - startNewFeedTimer.Stop() - if err := bcs.router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { - log.Error("Error routing message from Sequencer Feed", "err", err) + if err := bcs.primaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { + log.Error("Error routing message from Primary Sequencer Feeds", "err", err) } - startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) - case cs := <-bcs.router.confirmedSequenceNumberChan: - startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME) if cs == lastConfirmed { continue } lastConfirmed = cs - startNewFeedTimer.Stop() - bcs.router.forwardConfirmationChan <- cs - startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + bcs.primaryRouter.forwardConfirmationChan <- cs + + // Secondary Feeds + case msg := <-bcs.secondaryRouter.messageChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok { + continue + } + if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok { + continue + } + recentFeedItemsNew[msg.SequenceNumber] = time.Now() + if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { + log.Error("Error routing message from Secondary Sequencer Feeds", "err", err) + } + case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan: + startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + if cs == lastConfirmed { + continue + } + lastConfirmed = cs + bcs.secondaryRouter.forwardConfirmationChan <- cs + + // Cycle buckets to get rid of old entries case <-recentFeedItemsCleanup.C: - // Cycle buckets to get rid of old entries recentFeedItemsOld = recentFeedItemsNew recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE) - case <-startNewFeedTimer.C: - // failed to get messages from primary feed for ~5 seconds, start a new feed + + // failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed + case <-startSecondaryFeedTimer.C: bcs.StartSecondaryFeed(ctx) + + // failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary + case <-primaryFeedIsDownTimer.C: + stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME) + + // primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed + case <-stopSecondaryFeedTimer.C: + bcs.StopSecondaryFeed(ctx) } } }) @@ -185,6 +224,13 @@ func (bcs *BroadcastClients) StartSecondaryFeed(ctx context.Context) { log.Warn("failed to start a new secondary feed all available secondary feeds were started") } } +func (bcs *BroadcastClients) StopSecondaryFeed(ctx context.Context) { + if bcs.numOfStartedSecondary > 0 { + bcs.numOfStartedSecondary -= 1 + client := bcs.secondaryClients[bcs.numOfStartedSecondary] + client.StopAndWait() + } +} func (bcs *BroadcastClients) StopAndWait() { for _, client := range bcs.primaryClients { From 756ef6ff323c6e133882e04cf48f5abecaa0cb50 Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Thu, 26 Oct 2023 10:43:52 -0500 Subject: [PATCH 5/5] code refactor --- broadcastclients/broadcastclients.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index f508404799..acfcf8045b 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -201,7 +201,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { // failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed case <-startSecondaryFeedTimer.C: - bcs.StartSecondaryFeed(ctx) + bcs.startSecondaryFeed(ctx) // failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary case <-primaryFeedIsDownTimer.C: @@ -209,13 +209,13 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { // primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed case <-stopSecondaryFeedTimer.C: - bcs.StopSecondaryFeed(ctx) + bcs.stopSecondaryFeed(ctx) } } }) } -func (bcs *BroadcastClients) StartSecondaryFeed(ctx context.Context) { +func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { if bcs.numOfStartedSecondary < len(bcs.secondaryClients) { client := bcs.secondaryClients[bcs.numOfStartedSecondary] bcs.numOfStartedSecondary += 1 @@ -224,7 +224,7 @@ func (bcs *BroadcastClients) StartSecondaryFeed(ctx context.Context) { log.Warn("failed to start a new secondary feed all available secondary feeds were started") } } -func (bcs *BroadcastClients) StopSecondaryFeed(ctx context.Context) { +func (bcs *BroadcastClients) stopSecondaryFeed(ctx context.Context) { if bcs.numOfStartedSecondary > 0 { bcs.numOfStartedSecondary -= 1 client := bcs.secondaryClients[bcs.numOfStartedSecondary]