Skip to content

Commit

Permalink
Merge branch 'master' into system-tests-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi authored Oct 30, 2023
2 parents da90832 + cd85568 commit 955502b
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 60 deletions.
6 changes: 5 additions & 1 deletion broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Config struct {
RequireFeedVersion bool `koanf:"require-feed-version" reload:"hot"`
Timeout time.Duration `koanf:"timeout" reload:"hot"`
URL []string `koanf:"url"`
SecondaryURL []string `koanf:"secondary-url"`
Verify signature.VerifierConfig `koanf:"verify"`
EnableCompression bool `koanf:"enable-compression" reload:"hot"`
}
Expand All @@ -85,7 +86,8 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".require-chain-id", DefaultConfig.RequireChainId, "require chain id to be present on connect")
f.Bool(prefix+".require-feed-version", DefaultConfig.RequireFeedVersion, "require feed version to be present on connect")
f.Duration(prefix+".timeout", DefaultConfig.Timeout, "duration to wait before timing out connection to sequencer feed")
f.StringSlice(prefix+".url", DefaultConfig.URL, "URL of sequencer feed source")
f.StringSlice(prefix+".url", DefaultConfig.URL, "list of primary URLs of sequencer feed source")
f.StringSlice(prefix+".secondary-url", DefaultConfig.SecondaryURL, "list of secondary URLs of sequencer feed source. Would be started in the order they appear in the list when primary feeds fails")
signature.FeedVerifierConfigAddOptions(prefix+".verify", f)
f.Bool(prefix+".enable-compression", DefaultConfig.EnableCompression, "enable per message deflate compression support")
}
Expand All @@ -97,6 +99,7 @@ var DefaultConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{},
SecondaryURL: []string{},
Timeout: 20 * time.Second,
EnableCompression: true,
}
Expand All @@ -108,6 +111,7 @@ var DefaultTestConfig = Config{
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
URL: []string{""},
SecondaryURL: []string{},
Timeout: 200 * time.Millisecond,
EnableCompression: true,
}
Expand Down
209 changes: 184 additions & 25 deletions broadcastclients/broadcastclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,46 @@ package broadcastclients
import (
"context"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/util/contracts"
"github.com/offchainlabs/nitro/util/stopwaiter"
)

const ROUTER_QUEUE_SIZE = 1024
const RECENT_FEED_INITIAL_MAP_SIZE = 1024
const RECENT_FEED_ITEM_TTL = time.Second * 10
const MAX_FEED_INACTIVE_TIME = time.Second * 5
const PRIMARY_FEED_UPTIME = time.Minute * 10

type Router struct {
stopwaiter.StopWaiter
messageChan chan broadcaster.BroadcastFeedMessage
confirmedSequenceNumberChan chan arbutil.MessageIndex

forwardTxStreamer broadcastclient.TransactionStreamerInterface
forwardConfirmationChan chan arbutil.MessageIndex
}

func (r *Router) AddBroadcastMessages(feedMessages []*broadcaster.BroadcastFeedMessage) error {
for _, feedMessage := range feedMessages {
r.messageChan <- *feedMessage
}
return nil
}

type BroadcastClients struct {
clients []*broadcastclient.BroadcastClient
primaryClients []*broadcastclient.BroadcastClient
secondaryClients []*broadcastclient.BroadcastClient
numOfStartedSecondary int

primaryRouter *Router
secondaryRouter *Router

// Use atomic access
connected int32
Expand All @@ -31,34 +61,58 @@ func NewBroadcastClients(
addrVerifier contracts.AddressVerifierInterface,
) (*BroadcastClients, error) {
config := configFetcher()
urlCount := len(config.URL)
if urlCount <= 0 {
if len(config.URL) == 0 && len(config.SecondaryURL) == 0 {
return nil, nil
}

clients := BroadcastClients{}
clients.clients = make([]*broadcastclient.BroadcastClient, 0, urlCount)
newStandardRouter := func() *Router {
return &Router{
messageChan: make(chan broadcaster.BroadcastFeedMessage, ROUTER_QUEUE_SIZE),
confirmedSequenceNumberChan: make(chan arbutil.MessageIndex, ROUTER_QUEUE_SIZE),
forwardTxStreamer: txStreamer,
forwardConfirmationChan: confirmedSequenceNumberListener,
}
}
clients := BroadcastClients{
primaryRouter: newStandardRouter(),
secondaryRouter: newStandardRouter(),
}
var lastClientErr error
for _, address := range config.URL {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
txStreamer,
confirmedSequenceNumberListener,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
makeFeeds := func(url []string, router *Router) []*broadcastclient.BroadcastClient {
feeds := make([]*broadcastclient.BroadcastClient, 0, len(url))
for _, address := range url {
client, err := broadcastclient.NewBroadcastClient(
configFetcher,
address,
l2ChainId,
currentMessageCount,
router,
router.confirmedSequenceNumberChan,
fatalErrChan,
addrVerifier,
func(delta int32) { clients.adjustCount(delta) },
)
if err != nil {
lastClientErr = err
log.Warn("init broadcast client failed", "address", address)
continue
}
feeds = append(feeds, client)
}
clients.clients = append(clients.clients, client)
return feeds
}
if len(clients.clients) == 0 {

clients.primaryClients = makeFeeds(config.URL, clients.primaryRouter)
clients.secondaryClients = makeFeeds(config.SecondaryURL, clients.secondaryRouter)

if len(clients.primaryClients) == 0 && len(clients.secondaryClients) == 0 {
log.Error("no connected feed on startup, last error: %w", lastClientErr)
return nil, nil
}

// have atleast one primary client
if len(clients.primaryClients) == 0 {
clients.primaryClients = append(clients.primaryClients, clients.secondaryClients[0])
clients.secondaryClients = clients.secondaryClients[1:]
}

return &clients, nil
Expand All @@ -72,12 +126,117 @@ func (bcs *BroadcastClients) adjustCount(delta int32) {
}

func (bcs *BroadcastClients) Start(ctx context.Context) {
for _, client := range bcs.clients {
bcs.primaryRouter.StopWaiter.Start(ctx, bcs.primaryRouter)
bcs.secondaryRouter.StopWaiter.Start(ctx, bcs.secondaryRouter)

for _, client := range bcs.primaryClients {
client.Start(ctx)
}

var lastConfirmed arbutil.MessageIndex
recentFeedItemsNew := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
recentFeedItemsOld := make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)
bcs.primaryRouter.LaunchThread(func(ctx context.Context) {
recentFeedItemsCleanup := time.NewTicker(RECENT_FEED_ITEM_TTL)
startSecondaryFeedTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME)
stopSecondaryFeedTimer := time.NewTicker(PRIMARY_FEED_UPTIME)
primaryFeedIsDownTimer := time.NewTicker(MAX_FEED_INACTIVE_TIME)
defer recentFeedItemsCleanup.Stop()
defer startSecondaryFeedTimer.Stop()
defer stopSecondaryFeedTimer.Stop()
defer primaryFeedIsDownTimer.Stop()
for {
select {
case <-ctx.Done():
return

// Primary feeds
case msg := <-bcs.primaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := bcs.primaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if cs == lastConfirmed {
continue
}
lastConfirmed = cs
bcs.primaryRouter.forwardConfirmationChan <- cs

// Secondary Feeds
case msg := <-bcs.secondaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if _, ok := recentFeedItemsNew[msg.SequenceNumber]; ok {
continue
}
if _, ok := recentFeedItemsOld[msg.SequenceNumber]; ok {
continue
}
recentFeedItemsNew[msg.SequenceNumber] = time.Now()
if err := bcs.secondaryRouter.forwardTxStreamer.AddBroadcastMessages([]*broadcaster.BroadcastFeedMessage{&msg}); err != nil {
log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
}
case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if cs == lastConfirmed {
continue
}
lastConfirmed = cs
bcs.secondaryRouter.forwardConfirmationChan <- cs

// Cycle buckets to get rid of old entries
case <-recentFeedItemsCleanup.C:
recentFeedItemsOld = recentFeedItemsNew
recentFeedItemsNew = make(map[arbutil.MessageIndex]time.Time, RECENT_FEED_INITIAL_MAP_SIZE)

// failed to get messages from both primary and secondary feeds for ~5 seconds, start a new secondary feed
case <-startSecondaryFeedTimer.C:
bcs.startSecondaryFeed(ctx)

// failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
case <-primaryFeedIsDownTimer.C:
stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)

// primary feeds have been up and running for PRIMARY_FEED_UPTIME=10 mins without a failure, stop the recently started secondary feed
case <-stopSecondaryFeedTimer.C:
bcs.stopSecondaryFeed(ctx)
}
}
})
}

func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) {
if bcs.numOfStartedSecondary < len(bcs.secondaryClients) {
client := bcs.secondaryClients[bcs.numOfStartedSecondary]
bcs.numOfStartedSecondary += 1
client.Start(ctx)
} else {
log.Warn("failed to start a new secondary feed all available secondary feeds were started")
}
}
func (bcs *BroadcastClients) stopSecondaryFeed(ctx context.Context) {
if bcs.numOfStartedSecondary > 0 {
bcs.numOfStartedSecondary -= 1
client := bcs.secondaryClients[bcs.numOfStartedSecondary]
client.StopAndWait()
}
}

func (bcs *BroadcastClients) StopAndWait() {
for _, client := range bcs.clients {
for _, client := range bcs.primaryClients {
client.StopAndWait()
}
for i := 0; i < bcs.numOfStartedSecondary; i++ {
bcs.secondaryClients[i].StopAndWait()
}
}
2 changes: 1 addition & 1 deletion cmd/daserver/daserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func startup() error {
dasLifecycleManager.Register(&L1ReaderCloser{l1Reader})
}

vcsRevision, vcsTime := confighelpers.GetVersion()
vcsRevision, _, vcsTime := confighelpers.GetVersion()
var rpcServer *http.Server
if serverConfig.EnableRPC {
log.Info("Starting HTTP-RPC server", "addr", serverConfig.RPCAddr, "port", serverConfig.RPCPort, "revision", vcsRevision, "vcs.time", vcsTime)
Expand Down
9 changes: 7 additions & 2 deletions cmd/genericconf/getversion18.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package genericconf

import "runtime/debug"

func GetVersion(definedVersion string, definedTime string, definedModified string) (string, string) {
func GetVersion(definedVersion string, definedTime string, definedModified string) (string, string, string) {
vcsVersion := "development"
vcsTime := "development"
vcsModified := "false"
Expand Down Expand Up @@ -43,5 +43,10 @@ func GetVersion(definedVersion string, definedTime string, definedModified strin
vcsVersion = vcsVersion + "-modified"
}

return vcsVersion, vcsTime
strippedVersion := vcsVersion
if len(strippedVersion) > 0 && strippedVersion[0] == 'v' {
strippedVersion = strippedVersion[1:]
}

return vcsVersion, strippedVersion, vcsTime
}
4 changes: 2 additions & 2 deletions cmd/nitro-val/nitro_val.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func mainImpl() int {
stackConf.P2P.ListenAddr = ""
stackConf.P2P.NoDial = true
stackConf.P2P.NoDiscovery = true
vcsRevision, vcsTime := confighelpers.GetVersion()
stackConf.Version = vcsRevision
vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion()
stackConf.Version = strippedRevision

pathResolver := func(workdir string) func(string) string {
if workdir == "" {
Expand Down
4 changes: 2 additions & 2 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ func mainImpl() int {
stackConf.P2P.ListenAddr = ""
stackConf.P2P.NoDial = true
stackConf.P2P.NoDiscovery = true
vcsRevision, vcsTime := confighelpers.GetVersion()
stackConf.Version = vcsRevision
vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion()
stackConf.Version = strippedRevision

pathResolver := func(workdir string) func(string) string {
if workdir == "" {
Expand Down
2 changes: 1 addition & 1 deletion cmd/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func startup() error {
glogger.Verbosity(log.Lvl(relayConfig.LogLevel))
log.Root().SetHandler(glogger)

vcsRevision, vcsTime := confighelpers.GetVersion()
vcsRevision, _, vcsTime := confighelpers.GetVersion()
log.Info("Running Arbitrum nitro relay", "revision", vcsRevision, "vcs.time", vcsTime)

defer log.Info("Cleanly shutting down relay")
Expand Down
4 changes: 2 additions & 2 deletions cmd/util/confighelpers/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ func loadS3Variables(k *koanf.Koanf) error {

var ErrVersion = errors.New("configuration: version requested")

func GetVersion() (string, string) {
func GetVersion() (string, string, string) {
return genericconf.GetVersion(version, datetime, modified)
}

func PrintErrorAndExit(err error, usage func(string)) {
vcsRevision, vcsTime := GetVersion()
vcsRevision, _, vcsTime := GetVersion()
fmt.Printf("Version: %v, time: %v\n", vcsRevision, vcsTime)
if err != nil && errors.Is(err, ErrVersion) {
// Already printed version, just exit
Expand Down
Loading

0 comments on commit 955502b

Please sign in to comment.