Merge pull request #1908 from OffchainLabs/secondary-relay-feed
add secondary feed to go relay
ganeshvanahalli authored Oct 26, 2023
2 parents 6393265 + 2f5229b commit e113afb
Showing 3 changed files with 189 additions and 49 deletions.
6 changes: 5 additions & 1 deletion broadcastclient/broadcastclient.go
@@ -69,6 +69,7 @@ type Config struct {
RequireFeedVersion bool `koanf:"require-feed-version" reload:"hot"`
Timeout time.Duration `koanf:"timeout" reload:"hot"`
URL []string `koanf:"url"`
SecondaryURL []string `koanf:"secondary-url"`
Verify signature.VerifierConfig `koanf:"verify"`
EnableCompression bool `koanf:"enable-compression" reload:"hot"`
}
@@ -85,7 +86,8 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".require-chain-id", DefaultConfig.RequireChainId, "require chain id to be present on connect")
f.Bool(prefix+".require-feed-version", DefaultConfig.RequireFeedVersion, "require feed version to be present on connect")
f.Duration(prefix+".timeout", DefaultConfig.Timeout, "duration to wait before timing out connection to sequencer feed")
f.StringSlice(prefix+".url", DefaultConfig.URL, "URL of sequencer feed source")
f.StringSlice(prefix+".url", DefaultConfig.URL, "list of primary URLs of sequencer feed source")
f.StringSlice(prefix+".secondary-url", DefaultConfig.SecondaryURL, "list of secondary URLs of sequencer feed source. Would be started in the order they appear in the list when primary feeds fails")
signature.FeedVerifierConfigAddOptions(prefix+".verify", f)
f.Bool(prefix+".enable-compression", DefaultConfig.EnableCompression, "enable per message deflate compression support")
}
@@ -97,6 +99,7 @@ var DefaultConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{},
SecondaryURL: []string{},
Timeout: 20 * time.Second,
EnableCompression: true,
}
@@ -108,6 +111,7 @@ var DefaultTestConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{""},
SecondaryURL: []string{},
Timeout: 200 * time.Millisecond,
EnableCompression: true,
}
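The diff above adds a secondary-url option next to the existing url flag. Below is a minimal sketch of how such string-slice options are typically registered and parsed with spf13/pflag; the "feed.input" prefix, the trimmed-down feedConfig struct, and the example URLs are assumptions made for illustration, since the real options live on broadcastclient.Config and get their prefix from the caller of ConfigAddOptions.

package main

import (
	"fmt"
	"time"

	flag "github.com/spf13/pflag"
)

// feedConfig is a pared-down stand-in for broadcastclient.Config; the real
// struct carries more fields (verification, compression, and so on).
type feedConfig struct {
	URL          []string
	SecondaryURL []string
	Timeout      time.Duration
}

func main() {
	f := flag.NewFlagSet("example", flag.ContinueOnError)
	var cfg feedConfig
	// Same shape as ConfigAddOptions in the diff, using a hypothetical
	// "feed.input" prefix.
	f.StringSliceVar(&cfg.URL, "feed.input.url", nil, "list of primary URLs of sequencer feed source")
	f.StringSliceVar(&cfg.SecondaryURL, "feed.input.secondary-url", nil, "secondary URLs, started in order when primary feeds fail")
	f.DurationVar(&cfg.Timeout, "feed.input.timeout", 20*time.Second, "feed connection timeout")

	_ = f.Parse([]string{
		"--feed.input.url", "wss://primary-feed.example.org",
		"--feed.input.secondary-url", "wss://backup-feed.example.org",
	})
	fmt.Printf("primary=%v secondary=%v timeout=%v\n", cfg.URL, cfg.SecondaryURL, cfg.Timeout)
}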
209 changes: 184 additions & 25 deletions broadcastclients/broadcastclients.go
@@ -6,16 +6,46 @@ package broadcastclients
import (
"context"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

const ROUTER_QUEUE_SIZE = 1024
const RECENT_FEED_INITIAL_MAP_SIZE = 1024
const RECENT_FEED_ITEM_TTL = time.Second * 10
const MAX_FEED_INACTIVE_TIME = time.Second * 5
const PRIMARY_FEED_UPTIME = time.Minute * 10

type Router struct {
stopwaiter.StopWaiter
messageChan chan broadcaster.BroadcastFeedMessage
confirmedSequenceNumberChan chan arbutil.MessageIndex

forwardTxStreamer broadcastclient.TransactionStreamerInterface
forwardConfirmationChan chan arbutil.MessageIndex
}

func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error {
for _, feedMessage := range feedMessages {
r.messageChan <- *feedMessage
}
return nil
}

type BroadcastClients struct {
clients []*broadcastclient.BroadcastClient
primaryClients []*broadcastclient.BroadcastClient
secondaryClients []*broadcastclient.BroadcastClient
numOfStartedSecondary int

primaryRouter *Router
secondaryRouter *Router

// Use atomic access
connected int32
@@ -31,34 +61,58 @@ func NewBroadcastClients(
addrVerifier contracts.AddressVerifierInterface,
) (*BroadcastClients, error) {
config := configFetcher()
urlCount := len(config.URL)
if urlCount <= 0 {
if len(config.URL) == 0 && len(config.SecondaryURL) == 0 {
return nil, nil
}

clients := BroadcastClients{}
clients.clients = make([]*broadcastclient.BroadcastClient, 0, urlCount)
newStandardRouter := func() *Router {
return &Router{
messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE),
confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE),
forwardTxStreamer: txStreamer,
forwardConfirmationChan: confirmedSequenceNumberListener,
}
}
clients := BroadcastClients{
primaryRouter: newStandardRouter(),
secondaryRouter: newStandardRouter(),
}
var lastClientErr error
for _, address := range config.URL {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
txStreamer,
confirmedSequenceNumberListener,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
makeFeeds := func(url []string, router *Router) []*broadcastclient.BroadcastClient {
feeds := make([]*broadcastclient.BroadcastClient, 0, len(url))
for _, address := range url {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
router,
router.confirmedSequenceNumberChan,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
continue
}
feeds = append(feeds, client)
}
clients.clients = append(clients.clients, client)
return feeds
}
if len(clients.clients) == 0 {

clients.primaryClients = makeFeeds(config.URL, clients.primaryRouter)
clients.secondaryClients = makeFeeds(config.SecondaryURL, clients.secondaryRouter)

if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 {
log.Error("no connected feed on startup, last error: %w", lastClientErr)
return nil, nil
}

// have at least one primary client
if len(clients.primaryClients) == 0 {
clients.primaryClients = append(clients.primaryClients, clients.secondaryClients[0])
clients.secondaryClients = clients.secondaryClients[1:]
}

return &clients, nil
@@ -72,12 +126,117 @@ func (bcs *BroadcastClients) adjustCount(delta int32) {
}

func (bcs *BroadcastClients) Start(ctx context.Context) {
for _, client := range bcs.clients {
bcs.primaryRouter.StopWaiter.Start(ctx, bcs.primaryRouter)
bcs.secondaryRouter.StopWaiter.Start(ctx, bcs.secondaryRouter)

for _, client := range bcs.primaryClients {
client.Start(ctx)
}

var lastConfirmed arbutil.MessageIndex
recentFeedItemsNew := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
recentFeedItemsOld := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
bcs.primaryRouter.LaunchThread(func(ctx context.Context) {
recentFeedItemsCleanup := time.NewTicker(RECENT_FEED_ITEM_TTL)
startSecondaryFeedTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME)
stopSecondaryFeedTimer := time.NewTicker(PRIMARY_FEED_UPTIME)
primaryFeedIsDownTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME)
defer recentFeedItemsCleanup.Stop()
defer startSecondaryFeedTimer.Stop()
defer stopSecondaryFeedTimer.Stop()
defer primaryFeedIsDownTimer.Stop()
for {
select {
case <-ctx.Done():
return

// Primary feeds
case msg := <-bcs.primaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := bcs.primaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if cs == lastConfirmed {
continue
}
lastConfirmed = cs
bcs.primaryRouter.forwardConfirmationChan <- cs

// Secondary Feeds
case msg := <-bcs.secondaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
}
case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if cs == lastConfirmed {
continue
}
lastConfirmed = cs
bcs.secondaryRouter.forwardConfirmationChan <- cs

// Cycle buckets to get rid of old entries
case <-recentFeedItemsCleanup.C:
recentFeedItemsOld = recentFeedItemsNew
recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)

// failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed
case <-startSecondaryFeedTimer.C:
bcs.startSecondaryFeed(ctx)

// failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
case <-primaryFeedIsDownTimer.C:
stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)

// primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed
case <-stopSecondaryFeedTimer.C:
bcs.stopSecondaryFeed(ctx)
}
}
})
}

func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) {
if bcs.numOfStartedSecondary < len(bcs.secondaryClients) {
client := bcs.secondaryClients[bcs.numOfStartedSecondary]
bcs.numOfStartedSecondary += 1
client.Start(ctx)
} else {
log.Warn("failed to start a new secondary feed all available secondary feeds were started")
}
}
func (bcs *BroadcastClients) stopSecondaryFeed(ctx context.Context) {
if bcs.numOfStartedSecondary > 0 {
bcs.numOfStartedSecondary -= 1
client := bcs.secondaryClients[bcs.numOfStartedSecondary]
client.StopAndWait()
}
}

func (bcs *BroadcastClients) StopAndWait() {
for _, client := range bcs.clients {
for _, client := range bcs.primaryClients {
client.StopAndWait()
}
for i := 0; i < bcs.numOfStartedSecondary; i++ {
bcs.secondaryClients[i].StopAndWait()
}
}
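In the hunk above, each broadcast client is constructed with a Router in place of the transaction streamer, so every feed in a group funnels its messages into one channel that a single goroutine drains. A self-contained sketch of that fan-in shape, assuming a simplified feedMessage type and fake feeds in place of the real broadcast clients (none of these names are part of the commit):

package main

import (
	"fmt"
	"sync"
)

// feedMessage is a stripped-down stand-in for broadcaster.BroadcastFeedMessage.
type feedMessage struct{ SequenceNumber uint64 }

// router mirrors the fan-in idea of the Router type in the diff: every client
// calls AddBroadcastMessages, and all messages land on one channel that a
// single goroutine drains.
type router struct {
	messageChan chan feedMessage
}

func (r *router) AddBroadcastMessages(msgs []*feedMessage) error {
	for _, m := range msgs {
		r.messageChan <- *m
	}
	return nil
}

func main() {
	r := &router{messageChan: make(chan feedMessage, 16)}

	var wg sync.WaitGroup
	// Two fake "feeds" pushing into the same router, as the primary clients do.
	for feed := 0; feed < 2; feed++ {
		wg.Add(1)
		go func(feed int) {
			defer wg.Done()
			for seq := uint64(0); seq < 3; seq++ {
				_ = r.AddBroadcastMessages([]*feedMessage{{SequenceNumber: uint64(feed)*100 + seq}})
			}
		}(feed)
	}
	go func() { wg.Wait(); close(r.messageChan) }()

	for msg := range r.messageChan {
		fmt.Println("routed sequence number", msg.SequenceNumber)
	}
}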
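The router thread above drives failover with three tickers: if neither feed group delivers anything for MAX_FEED_INACTIVE_TIME it starts the next secondary, continued primary silence keeps deferring the stop timer, and once the primaries have been healthy for PRIMARY_FEED_UPTIME the most recently started secondary is stopped again. The following compressed simulation of that timer interplay is only a sketch: the intervals are shortened, the primary feed is a fake channel, and none of these names appear in the commit.

package main

import (
	"context"
	"fmt"
	"time"
)

// Shortened intervals so the sketch finishes in about a second; the real code
// uses MAX_FEED_INACTIVE_TIME = 5s and PRIMARY_FEED_UPTIME = 10min.
const (
	maxFeedInactiveTime = 150 * time.Millisecond
	primaryFeedUptime   = 600 * time.Millisecond
	numSecondaries      = 2
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// A fake primary feed that delivers three messages and then goes silent.
	primaryMsgs := make(chan int)
	go func() {
		for i := 0; i < 3; i++ {
			primaryMsgs <- i
			time.Sleep(50 * time.Millisecond)
		}
	}()

	started := 0
	startSecondary := time.NewTicker(maxFeedInactiveTime)
	stopSecondary := time.NewTicker(primaryFeedUptime)
	primaryIsDown := time.NewTicker(maxFeedInactiveTime)
	defer startSecondary.Stop()
	defer stopSecondary.Stop()
	defer primaryIsDown.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-primaryMsgs:
			// Any primary traffic defers both the "start a secondary" and the
			// "primary is down" decisions, as in the router loop above.
			startSecondary.Reset(maxFeedInactiveTime)
			primaryIsDown.Reset(maxFeedInactiveTime)
			fmt.Println("primary message", msg)
		case <-startSecondary.C:
			if started < numSecondaries {
				started++
				fmt.Println("primary quiet, starting secondary; now running:", started)
			}
		case <-primaryIsDown.C:
			// While the primary stays silent, keep pushing back the point at
			// which a running secondary would be shut down again.
			stopSecondary.Reset(primaryFeedUptime)
		case <-stopSecondary.C:
			// Only reached after the primaries have been healthy for a full
			// primaryFeedUptime; never hit in this run because the fake
			// primary never recovers.
			if started > 0 {
				started--
				fmt.Println("primary healthy, stopping secondary; now running:", started)
			}
		}
	}
}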
23 changes: 0 additions & 23 deletions relay/relay.go
@@ -7,7 +7,6 @@ import (
"context"
"errors"
"net"
"time"

flag "github.com/spf13/pflag"

@@ -77,9 +76,6 @@ func NewRelay(config *Config, feedErrChan chan error) (*Relay, error) {
}, nil
}

const RECENT_FEED_ITEM_TTL = time.Second * 10
const RECENT_FEED_INITIAL_MAP_SIZE = 1024

func (r *Relay) Start(ctx context.Context) error {
r.StopWaiter.Start(ctx, r)
err := r.broadcaster.Initialize()
@@ -93,35 +89,16 @@ func (r *Relay) Start(ctx context.Context) error {

r.broadcastClients.Start(ctx)

var lastConfirmed arbutil.MessageIndex
recentFeedItemsNew := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
recentFeedItemsOld := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
r.LaunchThread(func(ctx context.Context) {
recentFeedItemsCleanup := time.NewTicker(RECENT_FEED_ITEM_TTL)
defer recentFeedItemsCleanup.Stop()
for {
select {
case <-ctx.Done():
return
case msg := <-r.messageChan:
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
sharedmetrics.UpdateSequenceNumberGauge(msg.SequenceNumber)
r.broadcaster.BroadcastSingleFeedMessage(&msg)
case cs := <-r.confirmedSequenceNumberChan:
if lastConfirmed == cs {
continue
}
r.broadcaster.Confirm(cs)
case <-recentFeedItemsCleanup.C:
// Cycle buckets to get rid of old entries
recentFeedItemsOld = recentFeedItemsNew
recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
}
}
})
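The deduplication removed from relay.go here now lives in the router loop of broadcastclients.go: sequence numbers are remembered in two maps that are swapped every RECENT_FEED_ITEM_TTL, so an entry is kept for between one and two TTLs without ever scanning for expired keys. A toy illustration of the two-bucket idea, assuming plain uint64 keys and a shortened TTL (the real code keys on arbutil.MessageIndex):

package main

import (
	"fmt"
	"time"
)

const recentFeedItemTTL = 100 * time.Millisecond // 10 seconds in the real code

func main() {
	// Two rotating buckets: an entry is remembered for at least one TTL and at
	// most two, and expiry is a cheap map swap instead of a per-entry scan.
	recentNew := make(map[uint64]time.Time)
	recentOld := make(map[uint64]time.Time)

	// seen reports whether a sequence number was delivered recently and, if
	// not, records it (the stored timestamp is never read, as in the real code).
	seen := func(seq uint64) bool {
		if _, ok := recentNew[seq]; ok {
			return true
		}
		if _, ok := recentOld[seq]; ok {
			return true
		}
		recentNew[seq] = time.Now()
		return false
	}

	cleanup := time.NewTicker(recentFeedItemTTL)
	defer cleanup.Stop()

	fmt.Println(seen(1), seen(1)) // false true: a duplicate inside the TTL is dropped
	<-cleanup.C
	recentOld, recentNew = recentNew, make(map[uint64]time.Time) // cycle buckets
	fmt.Println(seen(1)) // true: still remembered through the old bucket
	<-cleanup.C
	recentOld, recentNew = recentNew, make(map[uint64]time.Time)
	fmt.Println(seen(1)) // false: the entry has aged out after two rotations
}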
