diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 2649c88192..a7ee269a4c 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -69,6 +69,7 @@ type Config struct { RequireFeedVersion bool `koanf:"require-feed-version" reload:"hot"` Timeout time.Duration `koanf:"timeout" reload:"hot"` URL []string `koanf:"url"` + SecondaryURL []string `koanf:"secondary-url"` Verify signature.VerifierConfig `koanf:"verify"` EnableCompression bool `koanf:"enable-compression" reload:"hot"` } @@ -85,7 +86,8 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".require-chain-id", DefaultConfig.RequireChainId, "require chain id to be present on connect") f.Bool(prefix+".require-feed-version", DefaultConfig.RequireFeedVersion, "require feed version to be present on connect") f.Duration(prefix+".timeout", DefaultConfig.Timeout, "duration to wait before timing out connection to sequencer feed") - f.StringSlice(prefix+".url", DefaultConfig.URL, "URL of sequencer feed source") + f.StringSlice(prefix+".url", DefaultConfig.URL, "list of primary URLs of sequencer feed source") + f.StringSlice(prefix+".secondary-url", DefaultConfig.SecondaryURL, "list of secondary URLs of sequencer feed source") signature.FeedVerifierConfigAddOptions(prefix+".verify", f) f.Bool(prefix+".enable-compression", DefaultConfig.EnableCompression, "enable per message deflate compression support") } @@ -97,6 +99,7 @@ var DefaultConfig = Config{ RequireFeedVersion: false, Verify: signature.DefultFeedVerifierConfig, URL: []string{""}, + SecondaryURL: []string{}, Timeout: 20 * time.Second, EnableCompression: true, } @@ -108,6 +111,7 @@ var DefaultTestConfig = Config{ RequireFeedVersion: false, Verify: signature.DefultFeedVerifierConfig, URL: []string{""}, + SecondaryURL: []string{}, Timeout: 200 * time.Millisecond, EnableCompression: true, } diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 74596bb08f..873d6be03b 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -6,16 +6,42 @@ package broadcastclients import ( "context" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcastclient" + "github.com/offchainlabs/nitro/broadcaster" "github.com/offchainlabs/nitro/util/contracts" + "github.com/offchainlabs/nitro/util/stopwaiter" ) +const MAX_FEED_INACTIVE_TIME = time.Second * 6 +const ROUTER_QUEUE_SIZE = 1024 + +type Router struct { + stopwaiter.StopWaiter + messageChan chan broadcaster.BroadcastFeedMessage + confirmedSequenceNumberChan chan arbutil.MessageIndex + + forwardTxStreamer broadcastclient.TransactionStreamerInterface + forwardConfirmationChan chan arbutil.MessageIndex +} + +func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error { + for _, feedMessage := range feedMessages { + r.messageChan <- *feedMessage + } + return nil +} + type BroadcastClients struct { - clients []*broadcastclient.BroadcastClient + primaryClients []*broadcastclient.BroadcastClient + secondaryClients []*broadcastclient.BroadcastClient + numOfStartedSecondary int + + router *Router // Use atomic access connected int32 @@ -31,34 +57,55 @@ func NewBroadcastClients( addrVerifier contracts.AddressVerifierInterface, ) (*BroadcastClients, error) { config := configFetcher() - urlCount := len(config.URL) - if urlCount <= 0 { + if len(config.URL) == 0 && len(config.SecondaryURL) == 0 { return nil, nil } - clients := BroadcastClients{} - clients.clients = make([]*broadcastclient.BroadcastClient, 0, urlCount) + clients := BroadcastClients{ + router: &Router{ + messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE), + confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE), + forwardTxStreamer: txStreamer, + forwardConfirmationChan: confirmedSequenceNumberListener, + }, + } var lastClientErr error - for _, address := range config.URL { - client, err := broadcastclient.NewBroadcastClient( - configFetcher, - address, - l2ChainId, - currentMessageCount, - txStreamer, - confirmedSequenceNumberListener, - fatalErrChan, - addrVerifier, - func(delta int32) { clients.adjustCount(delta) }, - ) - if err != nil { - lastClientErr = err - log.Warn("init broadcast client failed", "address", address) + makeFeeds := func(url []string) []*broadcastclient.BroadcastClient { + feeds := make([]*broadcastclient.BroadcastClient, 0, len(url)) + for _, address := range url { + client, err := broadcastclient.NewBroadcastClient( + configFetcher, + address, + l2ChainId, + currentMessageCount, + clients.router, + clients.router.confirmedSequenceNumberChan, + fatalErrChan, + addrVerifier, + func(delta int32) { clients.adjustCount(delta) }, + ) + if err != nil { + lastClientErr = err + log.Warn("init broadcast client failed", "address", address) + continue + } + feeds = append(feeds, client) } - clients.clients = append(clients.clients, client) + return feeds } - if len(clients.clients) == 0 { + + clients.primaryClients = makeFeeds(config.URL) + clients.secondaryClients = makeFeeds(config.SecondaryURL) + + if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 { log.Error("no connected feed on startup, last error: %w", lastClientErr) + return nil, nil + } + + // have atleast one primary client + if len(clients.primaryClients) == 0 { + clients.primaryClients = append(clients.primaryClients, clients.secondaryClients[0]) + clients.secondaryClients = clients.secondaryClients[1:] } return &clients, nil @@ -72,12 +119,52 @@ func (bcs *BroadcastClients) adjustCount(delta int32) { } func (bcs *BroadcastClients) Start(ctx context.Context) { - for _, client := range bcs.clients { + bcs.router.StopWaiter.Start(ctx, bcs.router) + + for _, client := range bcs.primaryClients { client.Start(ctx) } + + bcs.router.LaunchThread(func(ctx context.Context) { + startNewFeedTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME) + defer startNewFeedTimer.Stop() + for { + select { + case <-ctx.Done(): + return + case cs := <-bcs.router.confirmedSequenceNumberChan: + startNewFeedTimer.Stop() + bcs.router.forwardConfirmationChan <- cs + startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + case msg := <-bcs.router.messageChan: + startNewFeedTimer.Stop() + if err := bcs.router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil { + log.Error("Error routing message from Sequencer Feed", "err", err) + } + startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME) + case <-startNewFeedTimer.C: + // failed to get messages from primary feed for ~5 seconds, start a new feed + bcs.StartSecondaryFeed(ctx) + } + } + }) } + +func (bcs *BroadcastClients) StartSecondaryFeed(ctx context.Context) { + if bcs.numOfStartedSecondary < len(bcs.secondaryClients) { + client := bcs.secondaryClients[bcs.numOfStartedSecondary] + bcs.numOfStartedSecondary += 1 + client.Start(ctx) + } else { + log.Warn("failed to start a new secondary feed all available secondary feeds were started") + } +} + func (bcs *BroadcastClients) StopAndWait() { - for _, client := range bcs.clients { + for _, client := range bcs.primaryClients { client.StopAndWait() } + for i := 0; i < bcs.numOfStartedSecondary; i++ { + bcs.secondaryClients[i].StopAndWait() + } }