Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add secondary feed to go relay #1908

Merged
merged 9 commits into from
Oct 26, 2023
6 changes: 5 additions & 1 deletion broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Config struct {
RequireFeedVersion bool `koanf:"require-feed-version" reload:"hot"`
Timeout time.Duration `koanf:"timeout" reload:"hot"`
URL []string `koanf:"url"`
SecondaryURL []string `koanf:"secondary-url"`
Verify signature.VerifierConfig `koanf:"verify"`
EnableCompression bool `koanf:"enable-compression" reload:"hot"`
}
Expand All @@ -85,7 +86,8 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".require-chain-id", DefaultConfig.RequireChainId, "require chain id to be present on connect")
f.Bool(prefix+".require-feed-version", DefaultConfig.RequireFeedVersion, "require feed version to be present on connect")
f.Duration(prefix+".timeout", DefaultConfig.Timeout, "duration to wait before timing out connection to sequencer feed")
f.StringSlice(prefix+".url", DefaultConfig.URL, "URL of sequencer feed source")
f.StringSlice(prefix+".url", DefaultConfig.URL, "list of primary URLs of sequencer feed source")
f.StringSlice(prefix+".secondary-url", DefaultConfig.SecondaryURL, "list of secondary URLs of sequencer feed source")
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
signature.FeedVerifierConfigAddOptions(prefix+".verify", f)
f.Bool(prefix+".enable-compression", DefaultConfig.EnableCompression, "enable per message deflate compression support")
}
Expand All @@ -97,6 +99,7 @@ var DefaultConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{""},
SecondaryURL: []string{},
Timeout: 20 * time.Second,
EnableCompression: true,
}
Expand All @@ -108,6 +111,7 @@ var DefaultTestConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{""},
SecondaryURL: []string{},
Timeout: 200 * time.Millisecond,
EnableCompression: true,
}
Expand Down
135 changes: 111 additions & 24 deletions broadcastclients/broadcastclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,42 @@ package broadcastclients
import (
"context"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

const MAX_FEED_INACTIVE_TIME = time.Second * 6
const ROUTER_QUEUE_SIZE = 1024

type Router struct {
stopwaiter.StopWaiter
messageChan chan broadcaster.BroadcastFeedMessage
confirmedSequenceNumberChan chan arbutil.MessageIndex

forwardTxStreamer broadcastclient.TransactionStreamerInterface
forwardConfirmationChan chan arbutil.MessageIndex
}

func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error {
for _, feedMessage := range feedMessages {
r.messageChan <- *feedMessage
}
return nil
}

type BroadcastClients struct {
clients []*broadcastclient.BroadcastClient
primaryClients []*broadcastclient.BroadcastClient
secondaryClients []*broadcastclient.BroadcastClient
numOfStartedSecondary int

router *Router

// Use atomic access
connected int32
Expand All @@ -31,34 +57,55 @@ func NewBroadcastClients(
addrVerifier contracts.AddressVerifierInterface,
) (*BroadcastClients, error) {
config := configFetcher()
urlCount := len(config.URL)
if urlCount <= 0 {
if len(config.URL) == 0 && len(config.SecondaryURL) == 0 {
return nil, nil
}

clients := BroadcastClients{}
clients.clients = make([]*broadcastclient.BroadcastClient, 0, urlCount)
clients := BroadcastClients{
router: &Router{
messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE),
confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE),
forwardTxStreamer: txStreamer,
forwardConfirmationChan: confirmedSequenceNumberListener,
},
}
var lastClientErr error
for _, address := range config.URL {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
txStreamer,
confirmedSequenceNumberListener,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
makeFeeds := func(url []string) []*broadcastclient.BroadcastClient {
feeds := make([]*broadcastclient.BroadcastClient, 0, len(url))
for _, address := range url {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
clients.router,
clients.router.confirmedSequenceNumberChan,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
continue
}
feeds = append(feeds, client)
}
clients.clients = append(clients.clients, client)
return feeds
}
if len(clients.clients) == 0 {

clients.primaryClients = makeFeeds(config.URL)
clients.secondaryClients = makeFeeds(config.SecondaryURL)

if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 {
log.Error("no connected feed on startup, last error: %w", lastClientErr)
return nil, nil
}

// have atleast one primary client
if len(clients.primaryClients) == 0 {
clients.primaryClients = append(clients.primaryClients, clients.secondaryClients[0])
clients.secondaryClients = clients.secondaryClients[1:]
}

return &clients, nil
Expand All @@ -72,12 +119,52 @@ func (bcs *BroadcastClients) adjustCount(delta int32) {
}

func (bcs *BroadcastClients) Start(ctx context.Context) {
for _, client := range bcs.clients {
bcs.router.StopWaiter.Start(ctx, bcs.router)

for _, client := range bcs.primaryClients {
client.Start(ctx)
}

bcs.router.LaunchThread(func(ctx context.Context) {
startNewFeedTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME)
defer startNewFeedTimer.Stop()
for {
select {
case <-ctx.Done():
return
case cs := <-bcs.router.confirmedSequenceNumberChan:
startNewFeedTimer.Stop()
bcs.router.forwardConfirmationChan <- cs
startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
case msg := <-bcs.router.messageChan:
startNewFeedTimer.Stop()
if err := bcs.router.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
log.Error("Error routing message from Sequencer Feed", "err", err)
}
startNewFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
case <-startNewFeedTimer.C:
// failed to get messages from primary feed for ~5 seconds, start a new feed
bcs.StartSecondaryFeed(ctx)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
}
})
}

func (bcs *BroadcastClients) StartSecondaryFeed(ctx context.Context) {
if bcs.numOfStartedSecondary < len(bcs.secondaryClients) {
client := bcs.secondaryClients[bcs.numOfStartedSecondary]
bcs.numOfStartedSecondary += 1
client.Start(ctx)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
} else {
log.Warn("failed to start a new secondary feed all available secondary feeds were started")
}
}

func (bcs *BroadcastClients) StopAndWait() {
for _, client := range bcs.clients {
for _, client := range bcs.primaryClients {
client.StopAndWait()
}
for i := 0; i < bcs.numOfStartedSecondary; i++ {
bcs.secondaryClients[i].StopAndWait()
}
}
Loading