From 0871ed186db11cbda2e62842335bb33c82db63bd Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Fri, 27 Oct 2023 11:17:04 -0500 Subject: [PATCH 1/3] Add pause to stopwaiter to enable restart after stopping --- broadcastclients/broadcastclients.go | 5 +++-- util/stopwaiter/stopwaiter.go | 20 ++++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index acfcf8045b..52346ecbb3 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -220,7 +220,7 @@ func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { client := bcs.secondaryClients[bcs.numOfStartedSecondary] bcs.numOfStartedSecondary += 1 client.Start(ctx) - } else { + } else if len(bcs.secondaryClients) > 0 { log.Warn("failed to start a new secondary feed all available secondary feeds were started") } } @@ -228,7 +228,8 @@ func (bcs *BroadcastClients) stopSecondaryFeed(ctx context.Context) { if bcs.numOfStartedSecondary > 0 { bcs.numOfStartedSecondary -= 1 client := bcs.secondaryClients[bcs.numOfStartedSecondary] - client.StopAndWait() + client.Pause() + log.Info("disconnected secondary feed") } } diff --git a/util/stopwaiter/stopwaiter.go b/util/stopwaiter/stopwaiter.go index 1e70e328eb..e279e333a4 100644 --- a/util/stopwaiter/stopwaiter.go +++ b/util/stopwaiter/stopwaiter.go @@ -116,6 +116,20 @@ func (s *StopWaiterSafe) StopAndWait() error { return s.stopAndWaitImpl(stopDelayWarningTimeout) } +// Pause calls StopAndWait but updates started and stopped booleans to default. Only call if you want to restart the stopwaiter +func (s *StopWaiterSafe) Pause() error { + if err := s.stopAndWaitImpl(stopDelayWarningTimeout); err != nil { + return err + } + + s.mutex.Lock() + defer s.mutex.Unlock() + s.started = false + s.stopped = false + + return nil +} + func getAllStackTraces() string { buf := make([]byte, 64*1024*1024) size := runtime.Stack(buf, true) @@ -326,6 +340,12 @@ func (s *StopWaiter) StopAndWait() { } } +func (s *StopWaiter) Pause() { + if err := s.StopWaiterSafe.Pause(); err != nil { + panic(err) + } +} + // If stop was already called, thread might silently not be launched func (s *StopWaiter) LaunchThread(foo func(context.Context)) { if err := s.StopWaiterSafe.LaunchThreadSafe(foo); err != nil { From dd8a25655bffcf25d9493ad29248308f32d66ea4 Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Fri, 27 Oct 2023 15:34:12 -0500 Subject: [PATCH 2/3] remove pause implementation and code refactor --- broadcastclients/broadcastclients.go | 88 +++++++++++++++------------- util/stopwaiter/stopwaiter.go | 20 ------- 2 files changed, 46 insertions(+), 62 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 52346ecbb3..7fef07be71 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -42,6 +42,7 @@ func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedM type BroadcastClients struct { primaryClients []*broadcastclient.BroadcastClient secondaryClients []*broadcastclient.BroadcastClient + secondaryURL []string numOfStartedSecondary int primaryRouter *Router @@ -51,6 +52,8 @@ type BroadcastClients struct { connected int32 } +var makeClient func(string, *Router) (*broadcastclient.BroadcastClient, error) + func NewBroadcastClients( configFetcher broadcastclient.ConfigFetcher, l2ChainId uint64, @@ -75,44 +78,36 @@ func NewBroadcastClients( clients := BroadcastClients{ primaryRouter: newStandardRouter(), secondaryRouter: newStandardRouter(), + secondaryURL: config.SecondaryURL, } var lastClientErr error - makeFeeds := func(url []string, router *Router) []*broadcastclient.BroadcastClient { - feeds := make([]*broadcastclient.BroadcastClient, 0, len(url)) - for _, address := range url { - client, err := broadcastclient.NewBroadcastClient( - configFetcher, - address, - l2ChainId, - currentMessageCount, - router, - router.confirmedSequenceNumberChan, - fatalErrChan, - addrVerifier, - func(delta int32) { clients.adjustCount(delta) }, - ) - if err != nil { - lastClientErr = err - log.Warn("init broadcast client failed", "address", address) - continue - } - feeds = append(feeds, client) - } - return feeds + makeClient = func(url string, router *Router) (*broadcastclient.BroadcastClient, error) { + return broadcastclient.NewBroadcastClient( + configFetcher, + url, + l2ChainId, + currentMessageCount, + router, + router.confirmedSequenceNumberChan, + fatalErrChan, + addrVerifier, + func(delta int32) { clients.adjustCount(delta) }, + ) } - clients.primaryClients = makeFeeds(config.URL, clients.primaryRouter) - clients.secondaryClients = makeFeeds(config.SecondaryURL, clients.secondaryRouter) - - if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 { - log.Error("no connected feed on startup, last error: %w", lastClientErr) - return nil, nil + clients.primaryClients = make([]*broadcastclient.BroadcastClient, 0, len(config.URL)) + for _, address := range config.URL { + client, err := makeClient(address, clients.primaryRouter) + if err != nil { + lastClientErr = err + log.Warn("init broadcast client failed", "address", address) + continue + } + clients.primaryClients = append(clients.primaryClients, client) } - - // have atleast one primary client if len(clients.primaryClients) == 0 { - clients.primaryClients = append(clients.primaryClients, clients.secondaryClients[0]) - clients.secondaryClients = clients.secondaryClients[1:] + log.Error("no connected feed on startup, last error: %w", lastClientErr) + return nil, nil } return &clients, nil @@ -209,27 +204,36 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { // primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed case <-stopSecondaryFeedTimer.C: - bcs.stopSecondaryFeed(ctx) + bcs.stopSecondaryFeed() } } }) } func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { - if bcs.numOfStartedSecondary < len(bcs.secondaryClients) { - client := bcs.secondaryClients[bcs.numOfStartedSecondary] + if bcs.numOfStartedSecondary < len(bcs.secondaryURL) { + pos := bcs.numOfStartedSecondary + url := bcs.secondaryURL[pos] + client, err := makeClient(url, bcs.secondaryRouter) + if err != nil { + log.Warn("init broadcast secondary client failed", "address", url) + bcs.secondaryURL = append(bcs.secondaryURL[:pos], bcs.secondaryURL[pos+1:]...) + return + } bcs.numOfStartedSecondary += 1 + bcs.secondaryClients = append(bcs.secondaryClients, client) client.Start(ctx) - } else if len(bcs.secondaryClients) > 0 { + log.Info("secondary feed started", "url", url) + } else if len(bcs.secondaryURL) > 0 { log.Warn("failed to start a new secondary feed all available secondary feeds were started") } } -func (bcs *BroadcastClients) stopSecondaryFeed(ctx context.Context) { + +func (bcs *BroadcastClients) stopSecondaryFeed() { if bcs.numOfStartedSecondary > 0 { bcs.numOfStartedSecondary -= 1 - client := bcs.secondaryClients[bcs.numOfStartedSecondary] - client.Pause() - log.Info("disconnected secondary feed") + bcs.secondaryClients[bcs.numOfStartedSecondary].StopAndWait() + log.Info("disconnected secondary feed", "url", bcs.secondaryURL[bcs.numOfStartedSecondary]) } } @@ -237,7 +241,7 @@ func (bcs *BroadcastClients) StopAndWait() { for _, client := range bcs.primaryClients { client.StopAndWait() } - for i := 0; i < bcs.numOfStartedSecondary; i++ { - bcs.secondaryClients[i].StopAndWait() + for _, client := range bcs.secondaryClients { + client.StopAndWait() } } diff --git a/util/stopwaiter/stopwaiter.go b/util/stopwaiter/stopwaiter.go index e279e333a4..1e70e328eb 100644 --- a/util/stopwaiter/stopwaiter.go +++ b/util/stopwaiter/stopwaiter.go @@ -116,20 +116,6 @@ func (s *StopWaiterSafe) StopAndWait() error { return s.stopAndWaitImpl(stopDelayWarningTimeout) } -// Pause calls StopAndWait but updates started and stopped booleans to default. Only call if you want to restart the stopwaiter -func (s *StopWaiterSafe) Pause() error { - if err := s.stopAndWaitImpl(stopDelayWarningTimeout); err != nil { - return err - } - - s.mutex.Lock() - defer s.mutex.Unlock() - s.started = false - s.stopped = false - - return nil -} - func getAllStackTraces() string { buf := make([]byte, 64*1024*1024) size := runtime.Stack(buf, true) @@ -340,12 +326,6 @@ func (s *StopWaiter) StopAndWait() { } } -func (s *StopWaiter) Pause() { - if err := s.StopWaiterSafe.Pause(); err != nil { - panic(err) - } -} - // If stop was already called, thread might silently not be launched func (s *StopWaiter) LaunchThread(foo func(context.Context)) { if err := s.StopWaiterSafe.LaunchThreadSafe(foo); err != nil { From 1f0ac34e128bb048882f22b10b8753b96d5e31c3 Mon Sep 17 00:00:00 2001 From: ganeshvanahalli Date: Fri, 27 Oct 2023 15:59:33 -0500 Subject: [PATCH 3/3] code refactor --- broadcastclients/broadcastclients.go | 33 ++++++++++++++-------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 7fef07be71..551dcdb462 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -40,10 +40,9 @@ func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedM } type BroadcastClients struct { - primaryClients []*broadcastclient.BroadcastClient - secondaryClients []*broadcastclient.BroadcastClient - secondaryURL []string - numOfStartedSecondary int + primaryClients []*broadcastclient.BroadcastClient + secondaryClients []*broadcastclient.BroadcastClient + secondaryURL []string primaryRouter *Router secondaryRouter *Router @@ -76,11 +75,12 @@ func NewBroadcastClients( } } clients := BroadcastClients{ - primaryRouter: newStandardRouter(), - secondaryRouter: newStandardRouter(), - secondaryURL: config.SecondaryURL, + primaryRouter: newStandardRouter(), + secondaryRouter: newStandardRouter(), + primaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.URL)), + secondaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.SecondaryURL)), + secondaryURL: config.SecondaryURL, } - var lastClientErr error makeClient = func(url string, router *Router) (*broadcastclient.BroadcastClient, error) { return broadcastclient.NewBroadcastClient( configFetcher, @@ -95,7 +95,7 @@ func NewBroadcastClients( ) } - clients.primaryClients = make([]*broadcastclient.BroadcastClient, 0, len(config.URL)) + var lastClientErr error for _, address := range config.URL { client, err := makeClient(address, clients.primaryRouter) if err != nil { @@ -211,8 +211,8 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { } func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { - if bcs.numOfStartedSecondary < len(bcs.secondaryURL) { - pos := bcs.numOfStartedSecondary + pos := len(bcs.secondaryClients) + if pos < len(bcs.secondaryURL) { url := bcs.secondaryURL[pos] client, err := makeClient(url, bcs.secondaryRouter) if err != nil { @@ -220,7 +220,6 @@ func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { bcs.secondaryURL = append(bcs.secondaryURL[:pos], bcs.secondaryURL[pos+1:]...) return } - bcs.numOfStartedSecondary += 1 bcs.secondaryClients = append(bcs.secondaryClients, client) client.Start(ctx) log.Info("secondary feed started", "url", url) @@ -230,10 +229,12 @@ func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { } func (bcs *BroadcastClients) stopSecondaryFeed() { - if bcs.numOfStartedSecondary > 0 { - bcs.numOfStartedSecondary -= 1 - bcs.secondaryClients[bcs.numOfStartedSecondary].StopAndWait() - log.Info("disconnected secondary feed", "url", bcs.secondaryURL[bcs.numOfStartedSecondary]) + pos := len(bcs.secondaryClients) + if pos > 0 { + pos -= 1 + bcs.secondaryClients[pos].StopAndWait() + bcs.secondaryClients = bcs.secondaryClients[:pos] + log.Info("disconnected secondary feed", "url", bcs.secondaryURL[pos]) } }