Skip to content

Commit

Permalink
Prefetch state needed for future block executions by executing them i…
Browse files Browse the repository at this point in the history
…n parallel against old state
  • Loading branch information
amsanghi committed Jan 22, 2024
1 parent 4ad2693 commit 02e2be0
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 9 deletions.
10 changes: 9 additions & 1 deletion arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,15 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution
log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos)
return false
}
err = s.exec.DigestMessage(pos, msg)
var msgForPrefetch *arbostypes.MessageWithMetadata
if pos+1 < msgCount {
msgForPrefetch, err = s.GetMessage(pos + 1)
if err != nil {
log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos+1)
return false
}
}
err = s.exec.DigestMessage(pos, msg, msgForPrefetch)
if err != nil {
logger := log.Warn
if prevMessageCount < msgCount {
Expand Down
38 changes: 33 additions & 5 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type ExecutionEngine struct {
nextScheduledVersionCheck time.Time // protected by the createBlocksMutex

reorgSequencing bool

prefetchBlock bool
}

func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) {
Expand Down Expand Up @@ -71,6 +73,16 @@ func (s *ExecutionEngine) EnableReorgSequencing() {
s.reorgSequencing = true
}

func (s *ExecutionEngine) EnablePrefetchBlock() {
if s.Started() {
panic("trying to enable prefetch block after start")
}
if s.prefetchBlock {
panic("trying to enable prefetch block when already set")
}
s.prefetchBlock = true
}

func (s *ExecutionEngine) SetTransactionStreamer(streamer execution.TransactionStreamer) {
if s.Started() {
panic("trying to set transaction streamer after start")
Expand Down Expand Up @@ -107,7 +119,11 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
return err
}
for i := range newMessages {
err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i])
var msgForPrefetch *arbostypes.MessageWithMetadata
if i < len(newMessages)-1 {
msgForPrefetch = &newMessages[i]
}
err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch)
if err != nil {
return err
}
Expand Down Expand Up @@ -486,15 +502,15 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess
return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos)))
}

func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
if !s.createBlocksMutex.TryLock() {
return errors.New("createBlock mutex held")
}
defer s.createBlocksMutex.Unlock()
return s.digestMessageWithBlockMutex(num, msg)
return s.digestMessageWithBlockMutex(num, msg, msgForPrefetch)
}

func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
currentHeader, err := s.getCurrentHeader()
if err != nil {
return err
Expand All @@ -508,11 +524,23 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex,
}

startTime := time.Now()
var wg sync.WaitGroup
if s.prefetchBlock && msgForPrefetch != nil {
wg.Add(1)
go func() {
defer wg.Done()
_, _, _, err := s.createBlockFromNextMessage(msgForPrefetch)
if err != nil {
return
}
}()
}

block, statedb, receipts, err := s.createBlockFromNextMessage(msg)
if err != nil {
return err
}

wg.Wait()
err = s.appendBlock(block, statedb, receipts, time.Since(startTime))
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func (n *ExecutionNode) StopAndWait() {
// }
}

func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
return n.ExecEngine.DigestMessage(num, msg)
func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch)
}
func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error {
return n.ExecEngine.Reorg(count, newMessages, oldMessages)
Expand Down
7 changes: 7 additions & 0 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type SequencerConfig struct {
MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"`
NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"`
NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
EnablePrefetchBlock bool `koanf:"enable-prefetch-block"`
}

func (c *SequencerConfig) Validate() error {
Expand Down Expand Up @@ -97,6 +98,7 @@ var DefaultSequencerConfig = SequencerConfig{
MaxTxDataSize: 95000,
NonceFailureCacheSize: 1024,
NonceFailureCacheExpiry: time.Second,
EnablePrefetchBlock: false,
}

var TestSequencerConfig = SequencerConfig{
Expand All @@ -112,6 +114,7 @@ var TestSequencerConfig = SequencerConfig{
MaxTxDataSize: 95000,
NonceFailureCacheSize: 1024,
NonceFailureCacheExpiry: time.Second,
EnablePrefetchBlock: false,
}

func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -127,6 +130,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-tx-data-size", DefaultSequencerConfig.MaxTxDataSize, "maximum transaction size the sequencer will accept")
f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor")
f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high")
f.Bool(prefix+".enable-prefetch-block", DefaultSequencerConfig.EnablePrefetchBlock, "enable prefetching of blocks")
}

type txQueueItem struct {
Expand Down Expand Up @@ -324,6 +328,9 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
}
s.Pause()
execEngine.EnableReorgSequencing()
if config.EnablePrefetchBlock {
execEngine.EnablePrefetchBlock()
}
return s, nil
}

Expand Down
2 changes: 1 addition & 1 deletion execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken")

// always needed
type ExecutionClient interface {
DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error
DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error
Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error
HeadMessageNumber() (arbutil.MessageIndex, error)
HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error)
Expand Down

0 comments on commit 02e2be0

Please sign in to comment.