Skip to content

Commit

Permalink
Merge branch 'master' into validate-config-params
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli authored Dec 6, 2023
2 parents 1445f1a + f5211ab commit 467d7e0
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 30 deletions.
37 changes: 24 additions & 13 deletions broadcastclients/broadcastclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ func (bcs *BroadcastClients) adjustCount(delta int32) {
}
}

// Clears out a ticker's channel and resets it to the interval
func clearAndResetTicker(timer *time.Ticker, interval time.Duration) {
timer.Stop()
// Clear out any previous ticks
// A ticker's channel is only buffers one tick, so we don't need a loop here
select {
case <-timer.C:
default:
}
timer.Reset(interval)
}

func (bcs *BroadcastClients) Start(ctx context.Context) {
bcs.primaryRouter.StopWaiter.Start(ctx, bcs.primaryRouter)
bcs.secondaryRouter.StopWaiter.Start(ctx, bcs.secondaryRouter)
Expand Down Expand Up @@ -182,46 +194,45 @@ func (bcs *BroadcastClients) Start(ctx context.Context) {
return
// Primary feeds
case msg := <-bcs.primaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if err := msgHandler(msg, bcs.primaryRouter); err != nil {
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME)
case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
confSeqHandler(cs, bcs.primaryRouter)
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME)
// Failed to get messages from primary feed for ~5 seconds, reset the timer responsible for stopping a secondary
case <-primaryFeedIsDownTimer.C:
stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
clearAndResetTicker(stopSecondaryFeedTimer, PRIMARY_FEED_UPTIME)
default:
select {
case <-ctx.Done():
return
// Secondary Feeds
case msg := <-bcs.secondaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
if err := msgHandler(msg, bcs.secondaryRouter); err != nil {
log.Error("Error routing message from Secondary Sequencer Feeds", "err", err)
}
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
confSeqHandler(cs, bcs.secondaryRouter)

clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
case msg := <-bcs.primaryRouter.messageChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
if err := msgHandler(msg, bcs.primaryRouter); err != nil {
log.Error("Error routing message from Primary Sequencer Feeds", "err", err)
}
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME)
case cs := <-bcs.primaryRouter.confirmedSequenceNumberChan:
startSecondaryFeedTimer.Reset(MAX_FEED_INACTIVE_TIME)
primaryFeedIsDownTimer.Reset(MAX_FEED_INACTIVE_TIME)
confSeqHandler(cs, bcs.primaryRouter)
clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME)
clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME)
case <-startSecondaryFeedTimer.C:
bcs.startSecondaryFeed(ctx)
case <-primaryFeedIsDownTimer.C:
stopSecondaryFeedTimer.Reset(PRIMARY_FEED_UPTIME)
clearAndResetTicker(stopSecondaryFeedTimer, PRIMARY_FEED_UPTIME)
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion cmd/chaininfo/chain_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package chaininfo
import (
_ "embed"
"encoding/json"
"errors"
"fmt"
"math/big"
"os"
Expand Down Expand Up @@ -86,7 +87,10 @@ func ProcessChainInfo(chainId uint64, chainName string, l2ChainInfoFiles []strin
if chainId != 0 {
return nil, fmt.Errorf("unsupported chain ID %v", chainId)
}
return nil, fmt.Errorf("unsupported chain name %v", chainName)
if chainName != "" {
return nil, fmt.Errorf("unsupported chain name %v", chainName)
}
return nil, errors.New("must specify --chain.id or --chain.name to choose rollup")
}

func findChainInfo(chainId uint64, chainName string, chainsInfoBytes []byte) (*ChainInfo, error) {
Expand All @@ -95,6 +99,10 @@ func findChainInfo(chainId uint64, chainName string, chainsInfoBytes []byte) (*C
if err != nil {
return nil, err
}
if chainId == 0 && chainName == "" && len(chainsInfo) == 1 {
// If single chain info and no chain id/name given, default to single chain info
return &chainsInfo[0], nil
}
for _, chainInfo := range chainsInfo {
if (chainId == 0 || chainInfo.ChainConfig.ChainID.Uint64() == chainId) && (chainName == "" || chainInfo.ChainName == chainName) {
return &chainInfo, nil
Expand Down
24 changes: 8 additions & 16 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,12 +757,9 @@ func ParseNode(ctx context.Context, args []string) (*NodeConfig, *genericconf.Wa
l2ChainName := k.String("chain.name")
l2ChainInfoIpfsUrl := k.String("chain.info-ipfs-url")
l2ChainInfoIpfsDownloadPath := k.String("chain.info-ipfs-download-path")
if l2ChainId == 0 && l2ChainName == "" {
return nil, nil, nil, errors.New("must specify --chain.id or --chain.name to choose rollup")
}
l2ChainInfoFiles := k.Strings("chain.info-files")
l2ChainInfoJson := k.String("chain.info-json")
chainFound, err := applyChainParameters(ctx, k, uint64(l2ChainId), l2ChainName, l2ChainInfoFiles, l2ChainInfoJson, l2ChainInfoIpfsUrl, l2ChainInfoIpfsDownloadPath)
err = applyChainParameters(ctx, k, uint64(l2ChainId), l2ChainName, l2ChainInfoFiles, l2ChainInfoJson, l2ChainInfoIpfsUrl, l2ChainInfoIpfsDownloadPath)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -791,13 +788,6 @@ func ParseNode(ctx context.Context, args []string) (*NodeConfig, *genericconf.Wa
}

if nodeConfig.Persistent.Chain == "" {
if !chainFound {
// If persistent-chain not defined, user not creating custom chain
if l2ChainId != 0 {
return nil, nil, nil, fmt.Errorf("Unknown chain id: %d, L2ChainInfoFiles: %v. update chain id, modify --chain.info-files or provide --persistent.chain\n", l2ChainId, l2ChainInfoFiles)
}
return nil, nil, nil, fmt.Errorf("Unknown chain name: %s, L2ChainInfoFiles: %v. update chain name, modify --chain.info-files or provide --persistent.chain\n", l2ChainName, l2ChainInfoFiles)
}
return nil, nil, nil, errors.New("--persistent.chain not specified")
}

Expand All @@ -822,7 +812,7 @@ func ParseNode(ctx context.Context, args []string) (*NodeConfig, *genericconf.Wa
return &nodeConfig, &l1Wallet, &l2DevWallet, nil
}

func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, chainName string, l2ChainInfoFiles []string, l2ChainInfoJson string, l2ChainInfoIpfsUrl string, l2ChainInfoIpfsDownloadPath string) (bool, error) {
func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, chainName string, l2ChainInfoFiles []string, l2ChainInfoJson string, l2ChainInfoIpfsUrl string, l2ChainInfoIpfsDownloadPath string) error {
combinedL2ChainInfoFiles := l2ChainInfoFiles
if l2ChainInfoIpfsUrl != "" {
l2ChainInfoIpfsFile, err := util.GetL2ChainInfoIpfsFile(ctx, l2ChainInfoIpfsUrl, l2ChainInfoIpfsDownloadPath)
Expand All @@ -833,7 +823,7 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c
}
chainInfo, err := chaininfo.ProcessChainInfo(chainId, chainName, combinedL2ChainInfoFiles, l2ChainInfoJson)
if err != nil {
return false, err
return err
}
var parentChainIsArbitrum bool
if chainInfo.ParentChainIsArbitrum != nil {
Expand Down Expand Up @@ -866,6 +856,8 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c
chainDefaults["node.data-availability.enable"] = true
chainDefaults["node.data-availability.rest-aggregator.enable"] = true
chainDefaults["node.data-availability.rest-aggregator.online-url-list"] = chainInfo.DasIndexUrl
} else if chainInfo.ChainConfig.ArbitrumChainParams.DataAvailabilityCommittee {
chainDefaults["node.data-availability.enable"] = true
}
if !chainInfo.HasGenesisState {
chainDefaults["init.empty"] = true
Expand All @@ -874,7 +866,7 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c
l2MaxTxSize := gethexec.DefaultSequencerConfig.MaxTxDataSize
bufferSpace := 5000
if l2MaxTxSize < bufferSpace*2 {
return false, fmt.Errorf("not enough room in parent chain max tx size %v for bufferSpace %v * 2", l2MaxTxSize, bufferSpace)
return fmt.Errorf("not enough room in parent chain max tx size %v for bufferSpace %v * 2", l2MaxTxSize, bufferSpace)
}
safeBatchSize := l2MaxTxSize - bufferSpace
chainDefaults["node.batch-poster.max-size"] = safeBatchSize
Expand All @@ -885,9 +877,9 @@ func applyChainParameters(ctx context.Context, k *koanf.Koanf, chainId uint64, c
}
err = k.Load(confmap.Provider(chainDefaults, "."), nil)
if err != nil {
return false, err
return err
}
return true, nil
return nil
}

type NodeConfigFetcher struct {
Expand Down

0 comments on commit 467d7e0

Please sign in to comment.