Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add secondary feed to go relay #1908

Merged
merged 9 commits into from
Oct 26, 2023
6 changes: 5 additions & 1 deletion broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Config struct {
RequireFeedVersion bool `koanf:"require-feed-version" reload:"hot"`
Timeout time.Duration `koanf:"timeout" reload:"hot"`
URL []string `koanf:"url"`
SecondaryURL []string `koanf:"secondary-url"`
Verify signature.VerifierConfig `koanf:"verify"`
EnableCompression bool `koanf:"enable-compression" reload:"hot"`
}
Expand All @@ -85,7 +86,8 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".require-chain-id", DefaultConfig.RequireChainId, "require chain id to be present on connect")
f.Bool(prefix+".require-feed-version", DefaultConfig.RequireFeedVersion, "require feed version to be present on connect")
f.Duration(prefix+".timeout", DefaultConfig.Timeout, "duration to wait before timing out connection to sequencer feed")
f.StringSlice(prefix+".url", DefaultConfig.URL, "URL of sequencer feed source")
f.StringSlice(prefix+".url", DefaultConfig.URL, "list of primary URLs of sequencer feed source")
f.StringSlice(prefix+".secondary-url", DefaultConfig.SecondaryURL, "list of secondary URLs of sequencer feed source. Would be started in the order they appear in the list when primary feeds fails")
signature.FeedVerifierConfigAddOptions(prefix+".verify", f)
f.Bool(prefix+".enable-compression", DefaultConfig.EnableCompression, "enable per message deflate compression support")
}
Expand All @@ -97,6 +99,7 @@ var DefaultConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{},
SecondaryURL: []string{},
Timeout: 20 * time.Second,
EnableCompression: true,
}
Expand All @@ -108,6 +111,7 @@ var DefaultTestConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{""},
SecondaryURL: []string{},
Timeout: 200 * time.Millisecond,
EnableCompression: true,
}
Expand Down
209 changes: 184 additions & 25 deletions broadcastclients/broadcastclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,46 @@ package broadcastclients
import (
"context"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

const ROUTER_QUEUE_SIZE = 1024
const RECENT_FEED_INITIAL_MAP_SIZE = 1024
const RECENT_FEED_ITEM_TTL = time.Second * 10
const MAX_FEED_INACTIVE_TIME = time.Second * 5
const PRIMARY_FEED_UPTIME = time.Minute * 10

type Router struct {
stopwaiter.StopWaiter
messageChan chan broadcaster.BroadcastFeedMessage
confirmedSequenceNumberChan chan arbutil.MessageIndex

forwardTxStreamer broadcastclient.TransactionStreamerInterface
forwardConfirmationChan chan arbutil.MessageIndex
}

func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error {
for _, feedMessage := range feedMessages {
r.messageChan <- *feedMessage
}
return nil
}

type BroadcastClients struct {
clients []*broadcastclient.BroadcastClient
primaryClients []*broadcastclient.BroadcastClient
secondaryClients []*broadcastclient.BroadcastClient
numOfStartedSecondary int

primaryRouter *Router
secondaryRouter *Router

// Use atomic access
connected int32
Expand All @@ -31,34 +61,58 @@ func NewBroadcastClients(
addrVerifier contracts.AddressVerifierInterface,
) (*BroadcastClients, error) {
config := configFetcher()
urlCount := len(config.URL)
if urlCount <= 0 {
if len(config.URL) == 0 && len(config.SecondaryURL) == 0 {
return nil, nil
}

clients := BroadcastClients{}
clients.clients = make([]*broadcastclient.BroadcastClient, 0, urlCount)
newStandardRouter := func() *Router {
return &Router{
messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE),
confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE),
forwardTxStreamer: txStreamer,
forwardConfirmationChan: confirmedSequenceNumberListener,
}
}
clients := BroadcastClients{
primaryRouter: newStandardRouter(),
secondaryRouter: newStandardRouter(),
}
var lastClientErr error
for _, address := range config.URL {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
txStreamer,
confirmedSequenceNumberListener,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
makeFeeds := func(url []string, router *Router) []*broadcastclient.BroadcastClient {
feeds := make([]*broadcastclient.BroadcastClient, 0, len(url))
for _, address := range url {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
router,
router.confirmedSequenceNumberChan,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
continue
}
feeds = append(feeds, client)
}
clients.clients = append(clients.clients, client)
return feeds
}
if len(clients.clients) == 0 {

clients.primaryClients = makeFeeds(config.URL, clients.primaryRouter)
clients.secondaryClients = makeFeeds(config.SecondaryURL, clients.secondaryRouter)

if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 {
log.Error("no connected feed on startup, last error: %w", lastClientErr)
return nil, nil
}

// have atleast one primary client
if len(clients.primaryClients) == 0 {
clients.primaryClients = append(clients.primaryClients, clients.secondaryClients[0])
clients.secondaryClients = clients.secondaryClients[1:]
}

return &clients, nil
Expand All @@ -72,12 +126,117 @@ func (bcs *BroadcastClients) adjustCount(delta int32) {
}

func (bcs *BroadcastClients) Start(ctx context.Context) {
for _, client := range bcs.clients {
bcs.primaryRouter.StopWaiter.Start(ctx, bcs.primaryRouter)
bcs.secondaryRouter.StopWaiter.Start(ctx, bcs.secondaryRouter)

for _, client := range bcs.primaryClients {
client.Start(ctx)
}

var lastConfirmed arbutil.MessageIndex
recentFeedItemsNew := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
recentFeedItemsOld := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
bcs.primaryRouter.LaunchThread(func(ctx context.Context) {
recentFeedItemsCleanup := time.NewTicker(RECENT_FEED_ITEM_TTL)
startSecondaryFeedTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME)
stopSecondaryFeedTimer := time.NewTicker(PRIMARY_FEED_UPTIME)
primaryFeedIsDownTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME)
defer recentFeedItemsCleanup.Stop()
defer startSecondaryFeedTimer.Stop()
defer stopSecondaryFeedTimer.Stop()
defer primaryFeedIsDownTimer.Stop()
for {
select {
case <-ctx.Done():
return

// Primary feeds
case msg := <-bcs.primaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := bcs.primaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if cs == lastConfirmed {
continue
}
lastConfirmed = cs
bcs.primaryRouter.forwardConfirmationChan <- cs

// Secondary Feeds
case msg := <-bcs.secondaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
}
case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if cs == lastConfirmed {
continue
}
lastConfirmed = cs
bcs.secondaryRouter.forwardConfirmationChan <- cs

// Cycle buckets to get rid of old entries
case <-recentFeedItemsCleanup.C:
recentFeedItemsOld = recentFeedItemsNew
recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)

// failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed
case <-startSecondaryFeedTimer.C:
bcs.startSecondaryFeed(ctx)

// failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
case <-primaryFeedIsDownTimer.C:
stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)

// primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed
case <-stopSecondaryFeedTimer.C:
bcs.stopSecondaryFeed(ctx)
}
}
})
}

func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) {
if bcs.numOfStartedSecondary < len(bcs.secondaryClients) {
client := bcs.secondaryClients[bcs.numOfStartedSecondary]
bcs.numOfStartedSecondary += 1
client.Start(ctx)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
} else {
log.Warn("failed to start a new secondary feed all available secondary feeds were started")
}
}
func (bcs *BroadcastClients) stopSecondaryFeed(ctx context.Context) {
if bcs.numOfStartedSecondary > 0 {
bcs.numOfStartedSecondary -= 1
client := bcs.secondaryClients[bcs.numOfStartedSecondary]
client.StopAndWait()
}
}

func (bcs *BroadcastClients) StopAndWait() {
for _, client := range bcs.clients {
for _, client := range bcs.primaryClients {
client.StopAndWait()
}
for i := 0; i < bcs.numOfStartedSecondary; i++ {
bcs.secondaryClients[i].StopAndWait()
}
}
23 changes: 0 additions & 23 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"errors"
"net"
"time"

flag "github.com/spf13/pflag"

Expand Down Expand Up @@ -77,9 +76,6 @@ func NewRelay(config *Config, feedErrChan chan error) (*Relay, error) {
}, nil
}

const RECENT_FEED_ITEM_TTL = time.Second * 10
const RECENT_FEED_INITIAL_MAP_SIZE = 1024

func (r *Relay) Start(ctx context.Context) error {
r.StopWaiter.Start(ctx, r)
err := r.broadcaster.Initialize()
Expand All @@ -93,35 +89,16 @@ func (r *Relay) Start(ctx context.Context) error {

r.broadcastClients.Start(ctx)

var lastConfirmed arbutil.MessageIndex
recentFeedItemsNew := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
recentFeedItemsOld := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
r.LaunchThread(func(ctx context.Context) {
recentFeedItemsCleanup := time.NewTicker(RECENT_FEED_ITEM_TTL)
defer recentFeedItemsCleanup.Stop()
for {
select {
case <-ctx.Done():
return
case msg := <-r.messageChan:
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
sharedmetrics.UpdateSequenceNumberGauge(msg.SequenceNumber)
r.broadcaster.BroadcastSingleFeedMessage(&msg)
case cs := <-r.confirmedSequenceNumberChan:
if lastConfirmed == cs {
continue
}
r.broadcaster.Confirm(cs)
case <-recentFeedItemsCleanup.C:
// Cycle buckets to get rid of old entries
recentFeedItemsOld = recentFeedItemsNew
recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
}
}
})
Expand Down
Loading