Skip to content

Commit

Permalink
Add an option for the inbox reader to only read safe or finalized L1 …
Browse files Browse the repository at this point in the history
…blocks
  • Loading branch information
ganeshvanahalli committed Jan 17, 2024
1 parent 4dac708 commit 7876f5b
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 29 deletions.
102 changes: 73 additions & 29 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type InboxReaderConfig struct {
DefaultBlocksToRead uint64 `koanf:"default-blocks-to-read" reload:"hot"`
TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"`
MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"`
ReadMode string `koanf:"read-mode" reload:"hot"`
}

type InboxReaderConfigFetcher func() *InboxReaderConfig
Expand All @@ -39,6 +40,12 @@ func (c *InboxReaderConfig) Validate() error {
if c.MaxBlocksToRead == 0 || c.MaxBlocksToRead < c.DefaultBlocksToRead {
return errors.New("inbox reader max-blocks-to-read cannot be zero or less than default-blocks-to-read")
}
if c.ReadMode != "latest" {
c.ReadMode = strings.ToLower(c.ReadMode)
if c.ReadMode != "safe" && c.ReadMode != "finalized" {
return fmt.Errorf("inbox reader read-mode is invalid, want: safe or finalized, got: %s", c.ReadMode)
}
}
return nil
}

Expand All @@ -50,6 +57,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".default-blocks-to-read", DefaultInboxReaderConfig.DefaultBlocksToRead, "the default number of blocks to read at once (will vary based on traffic by default)")
f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once")
f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once")
f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read safe or finalized L1 blocks. Takes string input, valid strings- safe, finalized")
}

var DefaultInboxReaderConfig = InboxReaderConfig{
Expand All @@ -60,6 +68,7 @@ var DefaultInboxReaderConfig = InboxReaderConfig{
DefaultBlocksToRead: 100,
TargetMessagesRead: 500,
MaxBlocksToRead: 2000,
ReadMode: "latest",
}

var TestInboxReaderConfig = InboxReaderConfig{
Expand All @@ -70,6 +79,7 @@ var TestInboxReaderConfig = InboxReaderConfig{
DefaultBlocksToRead: 100,
TargetMessagesRead: 500,
MaxBlocksToRead: 2000,
ReadMode: "latest",
}

type InboxReader struct {
Expand Down Expand Up @@ -218,6 +228,7 @@ func (r *InboxReader) CaughtUp() chan struct{} {
}

func (r *InboxReader) run(ctx context.Context, hadError bool) error {
readMode := r.config().ReadMode
from, err := r.getNextBlockToRead()
if err != nil {
return err
Expand All @@ -238,38 +249,71 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}
defer storeSeenBatchCount() // in case of error
for {

latestHeader, err := r.l1Reader.LastHeader(ctx)
if err != nil {
return err
}
config := r.config()
currentHeight := latestHeader.Number

neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1)
neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance)
checkDelayTimer := time.NewTimer(config.CheckDelay)
WaitForHeight:
for arbmath.BigLessThan(currentHeight, neededBlockHeight) {
select {
case latestHeader = <-newHeaders:
if latestHeader == nil {
// shutting down
currentHeight := big.NewInt(0)
if readMode != "latest" {
var blockNum uint64
fetchLatestSafeOrFinalized := func() {
if readMode == "safe" {
blockNum, err = r.l1Reader.LatestSafeBlockNr(ctx)
} else {
blockNum, err = r.l1Reader.LatestFinalizedBlockNr(ctx)
}
}
fetchLatestSafeOrFinalized()
if err != nil || blockNum == 0 {
return fmt.Errorf("inboxreader running in read only %s mode and unable to fetch latest %s block. err: %w", readMode, readMode, err)
}
currentHeight.SetUint64(blockNum)
// latest block in our db is newer than the latest safe/finalized block hence reset 'from' to match the last safe/finalized block number
if from.Uint64() > currentHeight.Uint64()+1 {
from.Set(currentHeight)
}
for currentHeight.Cmp(from) <= 0 {
select {
case <-newHeaders:
fetchLatestSafeOrFinalized()
if err != nil || blockNum == 0 {
return fmt.Errorf("inboxreader waiting for recent %s block and unable to fetch its block number. err: %w", readMode, err)
}
currentHeight.SetUint64(blockNum)
case <-ctx.Done():
return nil
}
currentHeight = new(big.Int).Set(latestHeader.Number)
case <-ctx.Done():
return nil
case <-checkDelayTimer.C:
break WaitForHeight
}
}
checkDelayTimer.Stop()
} else {

latestHeader, err := r.l1Reader.LastHeader(ctx)
if err != nil {
return err
}
currentHeight = latestHeader.Number

neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1)
neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance)
checkDelayTimer := time.NewTimer(config.CheckDelay)
WaitForHeight:
for arbmath.BigLessThan(currentHeight, neededBlockHeight) {
select {
case latestHeader = <-newHeaders:
if latestHeader == nil {
// shutting down
return nil
}
currentHeight = new(big.Int).Set(latestHeader.Number)
case <-ctx.Done():
return nil
case <-checkDelayTimer.C:
break WaitForHeight
}
}
checkDelayTimer.Stop()

if config.DelayBlocks > 0 {
currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks))
if currentHeight.Cmp(r.firstMessageBlock) < 0 {
currentHeight = new(big.Int).Set(r.firstMessageBlock)
if config.DelayBlocks > 0 {
currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks))
if currentHeight.Cmp(r.firstMessageBlock) < 0 {
currentHeight = new(big.Int).Set(r.firstMessageBlock)
}
}
}

Expand Down Expand Up @@ -358,7 +402,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
r.lastReadBatchCount = checkingBatchCount
r.lastReadMutex.Unlock()
storeSeenBatchCount()
if !r.caughtUp {
if !r.caughtUp && readMode == "latest" {
r.caughtUp = true
close(r.caughtUpChan)
}
Expand Down Expand Up @@ -406,7 +450,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if err != nil {
return err
}
if !r.caughtUp && to.Cmp(currentHeight) == 0 {
if !r.caughtUp && to.Cmp(currentHeight) == 0 && readMode == "latest" {
r.caughtUp = true
close(r.caughtUpChan)
}
Expand Down
7 changes: 7 additions & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ func (c *Config) Validate() error {
if c.DelayedSequencer.Enable && !c.Sequencer {
return errors.New("cannot enable delayed sequencer without enabling sequencer")
}
if c.InboxReader.ReadMode != "latest" {
if c.Sequencer {
return errors.New("cannot enable inboxreader in safe or finalized mode along with sequencer")
}
c.Feed.Output.Enable = false
c.Feed.Input.URL = []string{}
}
if err := c.BlockValidator.Validate(); err != nil {
return err
}
Expand Down

0 comments on commit 7876f5b

Please sign in to comment.