diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index f96d51ce0e..7e9cf1dbad 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -968,8 +968,16 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos) return false } - err = s.exec.DigestMessage(pos, msg) - if err != nil { + var msgForPrefetch *arbostypes.MessageWithMetadata + if pos+1 < msgCount { + msg, err := s.GetMessage(pos + 1) + if err != nil { + log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos+1) + return false + } + msgForPrefetch = msg + } + if err = s.exec.DigestMessage(pos, msg, msgForPrefetch); err != nil { logger := log.Warn if prevMessageCount < msgCount { logger = log.Debug diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 20e9ca6f3b..003159589a 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -41,6 +41,8 @@ type ExecutionEngine struct { nextScheduledVersionCheck time.Time // protected by the createBlocksMutex reorgSequencing bool + + prefetchBlock bool } func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) { @@ -71,6 +73,16 @@ func (s *ExecutionEngine) EnableReorgSequencing() { s.reorgSequencing = true } +func (s *ExecutionEngine) EnablePrefetchBlock() { + if s.Started() { + panic("trying to enable prefetch block after start") + } + if s.prefetchBlock { + panic("trying to enable prefetch block when already set") + } + s.prefetchBlock = true +} + func (s *ExecutionEngine) SetTransactionStreamer(streamer execution.TransactionStreamer) { if s.Started() { panic("trying to set transaction streamer after start") @@ -107,7 +119,11 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost return err } for i := range newMessages { - err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i]) + var msgForPrefetch *arbostypes.MessageWithMetadata + if i < len(newMessages)-1 { + msgForPrefetch = &newMessages[i] + } + err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch) if err != nil { return err } @@ -489,15 +505,20 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos))) } -func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error { +// DigestMessage is used to create a block by executing msg against the latest state and storing it. +// Also, while creating a block by executing msg against the latest state, +// in parallel, creates a block by executing msgForPrefetch (msg+1) against the latest state +// but does not store the block. +// This helps in filling the cache, so that the next block creation is faster. +func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { if !s.createBlocksMutex.TryLock() { return errors.New("createBlock mutex held") } defer s.createBlocksMutex.Unlock() - return s.digestMessageWithBlockMutex(num, msg) + return s.digestMessageWithBlockMutex(num, msg, msgForPrefetch) } -func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error { +func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { currentHeader, err := s.getCurrentHeader() if err != nil { return err @@ -511,11 +532,23 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, } startTime := time.Now() + var wg sync.WaitGroup + if s.prefetchBlock && msgForPrefetch != nil { + wg.Add(1) + go func() { + defer wg.Done() + _, _, _, err := s.createBlockFromNextMessage(msgForPrefetch) + if err != nil { + return + } + }() + } + block, statedb, receipts, err := s.createBlockFromNextMessage(msg) if err != nil { return err } - + wg.Wait() err = s.appendBlock(block, statedb, receipts, time.Since(startTime)) if err != nil { return err diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 00337cc355..1ad73febe7 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -311,8 +311,8 @@ func (n *ExecutionNode) StopAndWait() { // } } -func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error { - return n.ExecEngine.DigestMessage(num, msg) +func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { + return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch) } func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error { return n.ExecEngine.Reorg(count, newMessages, oldMessages) diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 5db38cbb4d..9bc6f4378d 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -66,6 +66,7 @@ type SequencerConfig struct { MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"` NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"` NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"` + EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` } func (c *SequencerConfig) Validate() error { @@ -97,6 +98,7 @@ var DefaultSequencerConfig = SequencerConfig{ MaxTxDataSize: 95000, NonceFailureCacheSize: 1024, NonceFailureCacheExpiry: time.Second, + EnablePrefetchBlock: false, } var TestSequencerConfig = SequencerConfig{ @@ -112,6 +114,7 @@ var TestSequencerConfig = SequencerConfig{ MaxTxDataSize: 95000, NonceFailureCacheSize: 1024, NonceFailureCacheExpiry: time.Second, + EnablePrefetchBlock: false, } func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -127,6 +130,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-tx-data-size", DefaultSequencerConfig.MaxTxDataSize, "maximum transaction size the sequencer will accept") f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor") f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high") + f.Bool(prefix+".enable-prefetch-block", DefaultSequencerConfig.EnablePrefetchBlock, "enable prefetching of blocks") } type txQueueItem struct { @@ -324,6 +328,9 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead } s.Pause() execEngine.EnableReorgSequencing() + if config.EnablePrefetchBlock { + execEngine.EnablePrefetchBlock() + } return s, nil } diff --git a/execution/interface.go b/execution/interface.go index 5f7c01719e..6761011a77 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -28,7 +28,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken") // always needed type ExecutionClient interface { - DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error + DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error HeadMessageNumber() (arbutil.MessageIndex, error) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error)