diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index acfcf8045b..551dcdb462 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -40,9 +40,9 @@ func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedM } type BroadcastClients struct { - primaryClients []*broadcastclient.BroadcastClient - secondaryClients []*broadcastclient.BroadcastClient - numOfStartedSecondary int + primaryClients []*broadcastclient.BroadcastClient + secondaryClients []*broadcastclient.BroadcastClient + secondaryURL []string primaryRouter *Router secondaryRouter *Router @@ -51,6 +51,8 @@ type BroadcastClients struct { connected int32 } +var makeClient func(string, *Router) (*broadcastclient.BroadcastClient, error) + func NewBroadcastClients( configFetcher broadcastclient.ConfigFetcher, l2ChainId uint64, @@ -73,48 +75,41 @@ func NewBroadcastClients( } } clients := BroadcastClients{ - primaryRouter: newStandardRouter(), - secondaryRouter: newStandardRouter(), + primaryRouter: newStandardRouter(), + secondaryRouter: newStandardRouter(), + primaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.URL)), + secondaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.SecondaryURL)), + secondaryURL: config.SecondaryURL, + } + makeClient = func(url string, router *Router) (*broadcastclient.BroadcastClient, error) { + return broadcastclient.NewBroadcastClient( + configFetcher, + url, + l2ChainId, + currentMessageCount, + router, + router.confirmedSequenceNumberChan, + fatalErrChan, + addrVerifier, + func(delta int32) { clients.adjustCount(delta) }, + ) } + var lastClientErr error - makeFeeds := func(url []string, router *Router) []*broadcastclient.BroadcastClient { - feeds := make([]*broadcastclient.BroadcastClient, 0, len(url)) - for _, address := range url { - client, err := broadcastclient.NewBroadcastClient( - configFetcher, - address, - l2ChainId, - currentMessageCount, - router, - router.confirmedSequenceNumberChan, - fatalErrChan, - addrVerifier, - func(delta int32) { clients.adjustCount(delta) }, - ) - if err != nil { - lastClientErr = err - log.Warn("init broadcast client failed", "address", address) - continue - } - feeds = append(feeds, client) + for _, address := range config.URL { + client, err := makeClient(address, clients.primaryRouter) + if err != nil { + lastClientErr = err + log.Warn("init broadcast client failed", "address", address) + continue } - return feeds + clients.primaryClients = append(clients.primaryClients, client) } - - clients.primaryClients = makeFeeds(config.URL, clients.primaryRouter) - clients.secondaryClients = makeFeeds(config.SecondaryURL, clients.secondaryRouter) - - if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 { + if len(clients.primaryClients) == 0 { log.Error("no connected feed on startup, last error: %w", lastClientErr) return nil, nil } - // have atleast one primary client - if len(clients.primaryClients) == 0 { - clients.primaryClients = append(clients.primaryClients, clients.secondaryClients[0]) - clients.secondaryClients = clients.secondaryClients[1:] - } - return &clients, nil } @@ -209,26 +204,37 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { // primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed case <-stopSecondaryFeedTimer.C: - bcs.stopSecondaryFeed(ctx) + bcs.stopSecondaryFeed() } } }) } func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { - if bcs.numOfStartedSecondary < len(bcs.secondaryClients) { - client := bcs.secondaryClients[bcs.numOfStartedSecondary] - bcs.numOfStartedSecondary += 1 + pos := len(bcs.secondaryClients) + if pos < len(bcs.secondaryURL) { + url := bcs.secondaryURL[pos] + client, err := makeClient(url, bcs.secondaryRouter) + if err != nil { + log.Warn("init broadcast secondary client failed", "address", url) + bcs.secondaryURL = append(bcs.secondaryURL[:pos], bcs.secondaryURL[pos+1:]...) + return + } + bcs.secondaryClients = append(bcs.secondaryClients, client) client.Start(ctx) - } else { + log.Info("secondary feed started", "url", url) + } else if len(bcs.secondaryURL) > 0 { log.Warn("failed to start a new secondary feed all available secondary feeds were started") } } -func (bcs *BroadcastClients) stopSecondaryFeed(ctx context.Context) { - if bcs.numOfStartedSecondary > 0 { - bcs.numOfStartedSecondary -= 1 - client := bcs.secondaryClients[bcs.numOfStartedSecondary] - client.StopAndWait() + +func (bcs *BroadcastClients) stopSecondaryFeed() { + pos := len(bcs.secondaryClients) + if pos > 0 { + pos -= 1 + bcs.secondaryClients[pos].StopAndWait() + bcs.secondaryClients = bcs.secondaryClients[:pos] + log.Info("disconnected secondary feed", "url", bcs.secondaryURL[pos]) } } @@ -236,7 +242,7 @@ func (bcs *BroadcastClients) StopAndWait() { for _, client := range bcs.primaryClients { client.StopAndWait() } - for i := 0; i < bcs.numOfStartedSecondary; i++ { - bcs.secondaryClients[i].StopAndWait() + for _, client := range bcs.secondaryClients { + client.StopAndWait() } }