Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefetch state needed for future block executions by executing them in parallel against old state #2089

Merged
merged 4 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,8 +968,16 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution
log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos)
return false
}
err = s.exec.DigestMessage(pos, msg)
if err != nil {
var msgForPrefetch *arbostypes.MessageWithMetadata
if pos+1 < msgCount {
msg, err := s.GetMessage(pos + 1)
if err != nil {
log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos+1)
return false
}
msgForPrefetch = msg
}
if err = s.exec.DigestMessage(pos, msg, msgForPrefetch); err != nil {
logger := log.Warn
if prevMessageCount < msgCount {
logger = log.Debug
Expand Down
43 changes: 38 additions & 5 deletions execution/gethexec/executionengine.go
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type ExecutionEngine struct {
nextScheduledVersionCheck time.Time // protected by the createBlocksMutex

reorgSequencing bool

prefetchBlock bool
}

func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) {
Expand Down Expand Up @@ -71,6 +73,16 @@ func (s *ExecutionEngine) EnableReorgSequencing() {
s.reorgSequencing = true
}

func (s *ExecutionEngine) EnablePrefetchBlock() {
if s.Started() {
panic("trying to enable prefetch block after start")
}
if s.prefetchBlock {
panic("trying to enable prefetch block when already set")
}
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
s.prefetchBlock = true
}

func (s *ExecutionEngine) SetTransactionStreamer(streamer execution.TransactionStreamer) {
if s.Started() {
panic("trying to set transaction streamer after start")
Expand Down Expand Up @@ -107,7 +119,11 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
return err
}
for i := range newMessages {
err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i])
var msgForPrefetch *arbostypes.MessageWithMetadata
if i < len(newMessages)-1 {
msgForPrefetch = &newMessages[i]
}
err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch)
if err != nil {
return err
}
Expand Down Expand Up @@ -489,15 +505,20 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess
return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos)))
}

func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
// DigestMessage is used to create a block by executing msg against the latest state and storing it.
// Also, while creating a block by executing msg against the latest state,
// in parallel, creates a block by executing msgForPrefetch (msg+1) against the latest state
// but does not store the block.
// This helps in filling the cache, so that the next block creation is faster.
func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
if !s.createBlocksMutex.TryLock() {
return errors.New("createBlock mutex held")
}
defer s.createBlocksMutex.Unlock()
return s.digestMessageWithBlockMutex(num, msg)
return s.digestMessageWithBlockMutex(num, msg, msgForPrefetch)
}

func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
currentHeader, err := s.getCurrentHeader()
if err != nil {
return err
Expand All @@ -511,11 +532,23 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex,
}

startTime := time.Now()
var wg sync.WaitGroup
if s.prefetchBlock && msgForPrefetch != nil {
wg.Add(1)
go func() {
defer wg.Done()
_, _, _, err := s.createBlockFromNextMessage(msgForPrefetch)
if err != nil {
return
}
}()
}

block, statedb, receipts, err := s.createBlockFromNextMessage(msg)
if err != nil {
return err
}

wg.Wait()
err = s.appendBlock(block, statedb, receipts, time.Since(startTime))
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func (n *ExecutionNode) StopAndWait() {
// }
}

func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
return n.ExecEngine.DigestMessage(num, msg)
func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch)
}
func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error {
return n.ExecEngine.Reorg(count, newMessages, oldMessages)
Expand Down
7 changes: 7 additions & 0 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type SequencerConfig struct {
MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"`
NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"`
NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
EnablePrefetchBlock bool `koanf:"enable-prefetch-block"`
}

func (c *SequencerConfig) Validate() error {
Expand Down Expand Up @@ -97,6 +98,7 @@ var DefaultSequencerConfig = SequencerConfig{
MaxTxDataSize: 95000,
NonceFailureCacheSize: 1024,
NonceFailureCacheExpiry: time.Second,
EnablePrefetchBlock: false,
}

var TestSequencerConfig = SequencerConfig{
Expand All @@ -112,6 +114,7 @@ var TestSequencerConfig = SequencerConfig{
MaxTxDataSize: 95000,
NonceFailureCacheSize: 1024,
NonceFailureCacheExpiry: time.Second,
EnablePrefetchBlock: false,
}

func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -127,6 +130,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-tx-data-size", DefaultSequencerConfig.MaxTxDataSize, "maximum transaction size the sequencer will accept")
f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor")
f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high")
f.Bool(prefix+".enable-prefetch-block", DefaultSequencerConfig.EnablePrefetchBlock, "enable prefetching of blocks")
}

type txQueueItem struct {
Expand Down Expand Up @@ -324,6 +328,9 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
}
s.Pause()
execEngine.EnableReorgSequencing()
if config.EnablePrefetchBlock {
execEngine.EnablePrefetchBlock()
}
return s, nil
}

Expand Down
2 changes: 1 addition & 1 deletion execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken")

// always needed
type ExecutionClient interface {
DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error
DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error
Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error
HeadMessageNumber() (arbutil.MessageIndex, error)
HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error)
Expand Down
Loading