Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add secondary feed to go relay #1908

Merged
merged 9 commits into from
Oct 26, 2023
6 changes: 5 additions & 1 deletion broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Config struct {
RequireFeedVersion bool `koanf:"require-feed-version" reload:"hot"`
Timeout time.Duration `koanf:"timeout" reload:"hot"`
URL []string `koanf:"url"`
SecondaryURL []string `koanf:"secondary-url"`
Verify signature.VerifierConfig `koanf:"verify"`
EnableCompression bool `koanf:"enable-compression" reload:"hot"`
}
Expand All @@ -85,7 +86,8 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".require-chain-id", DefaultConfig.RequireChainId, "require chain id to be present on connect")
f.Bool(prefix+".require-feed-version", DefaultConfig.RequireFeedVersion, "require feed version to be present on connect")
f.Duration(prefix+".timeout", DefaultConfig.Timeout, "duration to wait before timing out connection to sequencer feed")
f.StringSlice(prefix+".url", DefaultConfig.URL, "URL of sequencer feed source")
f.StringSlice(prefix+".url", DefaultConfig.URL, "list of primary URLs of sequencer feed source")
f.StringSlice(prefix+".secondary-url", DefaultConfig.SecondaryURL, "list of secondary URLs of sequencer feed source. Would be started in the order they appear in the list when primary feeds fails")
signature.FeedVerifierConfigAddOptions(prefix+".verify", f)
f.Bool(prefix+".enable-compression", DefaultConfig.EnableCompression, "enable per message deflate compression support")
}
Expand All @@ -97,6 +99,7 @@ var DefaultConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{},
SecondaryURL: []string{},
Timeout: 20 * time.Second,
EnableCompression: true,
}
Expand All @@ -108,6 +111,7 @@ var DefaultTestConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{""},
SecondaryURL: []string{},
Timeout: 200 * time.Millisecond,
EnableCompression: true,
}
Expand Down
161 changes: 137 additions & 24 deletions broadcastclients/broadcastclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,44 @@ package broadcastclients
import (
"context"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

const ROUTER_QUEUE_SIZE = 1024
const RECENT_FEED_INITIAL_MAP_SIZE = 1024
const RECENT_FEED_ITEM_TTL = time.Second * 10
const MAX_FEED_INACTIVE_TIME = time.Second * 5

type Router struct {
stopwaiter.StopWaiter
messageChan chan broadcaster.BroadcastFeedMessage
confirmedSequenceNumberChan chan arbutil.MessageIndex

forwardTxStreamer broadcastclient.TransactionStreamerInterface
forwardConfirmationChan chan arbutil.MessageIndex
}

func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error {
for _, feedMessage := range feedMessages {
r.messageChan <- *feedMessage
}
return nil
}

type BroadcastClients struct {
clients []*broadcastclient.BroadcastClient
primaryClients []*broadcastclient.BroadcastClient
secondaryClients []*broadcastclient.BroadcastClient
numOfStartedSecondary int

router *Router

// Use atomic access
connected int32
Expand All @@ -31,34 +59,55 @@ func NewBroadcastClients(
addrVerifier contracts.AddressVerifierInterface,
) (*BroadcastClients, error) {
config := configFetcher()
urlCount := len(config.URL)
if urlCount <= 0 {
if len(config.URL) == 0 && len(config.SecondaryURL) == 0 {
return nil, nil
}

clients := BroadcastClients{}
clients.clients = make([]*broadcastclient.BroadcastClient, 0, urlCount)
clients := BroadcastClients{
router: &Router{
messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE),
confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE),
forwardTxStreamer: txStreamer,
forwardConfirmationChan: confirmedSequenceNumberListener,
},
}
var lastClientErr error
for _, address := range config.URL {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
txStreamer,
confirmedSequenceNumberListener,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
makeFeeds := func(url []string) []*broadcastclient.BroadcastClient {
feeds := make([]*broadcastclient.BroadcastClient, 0, len(url))
for _, address := range url {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
clients.router,
clients.router.confirmedSequenceNumberChan,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
continue
}
feeds = append(feeds, client)
}
clients.clients = append(clients.clients, client)
return feeds
}
if len(clients.clients) == 0 {

clients.primaryClients = makeFeeds(config.URL)
clients.secondaryClients = makeFeeds(config.SecondaryURL)

if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 {
log.Error("no connected feed on startup, last error: %w", lastClientErr)
return nil, nil
}

// have atleast one primary client
if len(clients.primaryClients) == 0 {
clients.primaryClients = append(clients.primaryClients, clients.secondaryClients[0])
clients.secondaryClients = clients.secondaryClients[1:]
}

return &clients, nil
Expand All @@ -72,12 +121,76 @@ func (bcs *BroadcastClients) adjustCount(delta int32) {
}

func (bcs *BroadcastClients) Start(ctx context.Context) {
for _, client := range bcs.clients {
bcs.router.StopWaiter.Start(ctx, bcs.router)

for _, client := range bcs.primaryClients {
client.Start(ctx)
}

var lastConfirmed arbutil.MessageIndex
recentFeedItemsNew := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
recentFeedItemsOld := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
bcs.router.LaunchThread(func(ctx context.Context) {
recentFeedItemsCleanup := time.NewTicker(RECENT_FEED_ITEM_TTL)
startNewFeedTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME)
defer recentFeedItemsCleanup.Stop()
defer startNewFeedTimer.Stop()
for {
select {
case <-ctx.Done():
return
case msg := <-bcs.router.messageChan:
startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
// need to stop the timer because forwardTxStreamer might be blocked when traffic is high
// and that shouldn't create race condition between channels timer.C and messageChan
startNewFeedTimer.Stop()
if err := bcs.router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
log.Error("Error routing message from Sequencer Feed", "err", err)
}
startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
case cs := <-bcs.router.confirmedSequenceNumberChan:
startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if cs == lastConfirmed {
continue
}
lastConfirmed = cs
startNewFeedTimer.Stop()
bcs.router.forwardConfirmationChan <- cs
startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
case <-recentFeedItemsCleanup.C:
// Cycle buckets to get rid of old entries
recentFeedItemsOld = recentFeedItemsNew
recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
case <-startNewFeedTimer.C:
// failed to get messages from primary feed for ~5 seconds, start a new feed
bcs.StartSecondaryFeed(ctx)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
}
})
}

func (bcs *BroadcastClients) StartSecondaryFeed(ctx context.Context) {
if bcs.numOfStartedSecondary < len(bcs.secondaryClients) {
client := bcs.secondaryClients[bcs.numOfStartedSecondary]
bcs.numOfStartedSecondary += 1
client.Start(ctx)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
} else {
log.Warn("failed to start a new secondary feed all available secondary feeds were started")
}
}

func (bcs *BroadcastClients) StopAndWait() {
for _, client := range bcs.clients {
for _, client := range bcs.primaryClients {
client.StopAndWait()
}
for i := 0; i < bcs.numOfStartedSecondary; i++ {
bcs.secondaryClients[i].StopAndWait()
}
}
23 changes: 0 additions & 23 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"errors"
"net"
"time"

flag "github.com/spf13/pflag"

Expand Down Expand Up @@ -77,9 +76,6 @@ func NewRelay(config *Config, feedErrChan chan error) (*Relay, error) {
}, nil
}

const RECENT_FEED_ITEM_TTL = time.Second * 10
const RECENT_FEED_INITIAL_MAP_SIZE = 1024

func (r *Relay) Start(ctx context.Context) error {
r.StopWaiter.Start(ctx, r)
err := r.broadcaster.Initialize()
Expand All @@ -93,35 +89,16 @@ func (r *Relay) Start(ctx context.Context) error {

r.broadcastClients.Start(ctx)

var lastConfirmed arbutil.MessageIndex
recentFeedItemsNew := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
recentFeedItemsOld := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
r.LaunchThread(func(ctx context.Context) {
recentFeedItemsCleanup := time.NewTicker(RECENT_FEED_ITEM_TTL)
defer recentFeedItemsCleanup.Stop()
for {
select {
case <-ctx.Done():
return
case msg := <-r.messageChan:
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
sharedmetrics.UpdateSequenceNumberGauge(msg.SequenceNumber)
r.broadcaster.BroadcastSingleFeedMessage(&msg)
case cs := <-r.confirmedSequenceNumberChan:
if lastConfirmed == cs {
continue
}
r.broadcaster.Confirm(cs)
case <-recentFeedItemsCleanup.C:
// Cycle buckets to get rid of old entries
recentFeedItemsOld = recentFeedItemsNew
recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
}
}
})
Expand Down
Loading