Skip to content

Commit

Permalink
Merge branch 'master' into gate-get-scheduled-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuacolvin0 authored Feb 2, 2024
2 parents acc85b3 + 4128f24 commit 0fcce6a
Show file tree
Hide file tree
Showing 31 changed files with 530 additions and 117 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ jobs:
skip-pkg-cache: true
- name: Custom Lint
run: |
go run ./linter/koanf ./...
go run ./linter/pointercheck ./...
go run ./linters ./...
- name: Set environment variables
run: |
Expand Down
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,7 @@ contracts/test/prover/proofs/%.json: $(arbitrator_cases)/%.wasm $(arbitrator_pro
# strategic rules to minimize dependency building

.make/lint: $(DEP_PREDICATE) build-node-deps $(ORDER_ONLY_PREDICATE) .make
go run ./linter/koanf ./...
go run ./linter/pointercheck ./...
go run ./linters ./...
golangci-lint run --fix
yarn --cwd contracts solhint
@touch $@
Expand Down
12 changes: 10 additions & 2 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,8 +968,16 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution
log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos)
return false
}
err = s.exec.DigestMessage(pos, msg)
if err != nil {
var msgForPrefetch *arbostypes.MessageWithMetadata
if pos+1 < msgCount {
msg, err := s.GetMessage(pos + 1)
if err != nil {
log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos+1)
return false
}
msgForPrefetch = msg
}
if err = s.exec.DigestMessage(pos, msg, msgForPrefetch); err != nil {
logger := log.Warn
if prevMessageCount < msgCount {
logger = log.Debug
Expand Down
9 changes: 6 additions & 3 deletions arbstate/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
}
payload := data[40:]

// Stage 1: Extract the payload from any data availability header.
// It's important that multiple DAS strategies can't both be invoked in the same batch,
// as these headers are validated by the sequencer inbox and not other DASs.
if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) {
if dasReader == nil {
log.Error("No DAS Reader configured, but sequencer message found with DAS header")
Expand All @@ -88,9 +91,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
return parsedMsg, nil
}
}
}

if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) {
} else if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) {
blobHashes := payload[1:]
if len(blobHashes)%len(common.Hash{}) != 0 {
return nil, fmt.Errorf("blob batch data is not a list of hashes as expected")
Expand All @@ -115,6 +116,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
}
}

// Stage 2: If enabled, decode the zero heavy payload (saves gas based on calldata charging).
if len(payload) > 0 && IsZeroheavyEncodedHeaderByte(payload[0]) {
pl, err := io.ReadAll(io.LimitReader(zeroheavy.NewZeroheavyDecoder(bytes.NewReader(payload[1:])), int64(maxZeroheavyDecompressedLen)))
if err != nil {
Expand All @@ -124,6 +126,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
payload = pl
}

// Stage 3: Decompress the brotli payload and fill the parsedMsg.segments list.
if len(payload) > 0 && IsBrotliMessageHeaderByte(payload[0]) {
decompressed, err := arbcompress.Decompress(payload[1:], MaxDecompressedLen)
if err == nil {
Expand Down
67 changes: 39 additions & 28 deletions cmd/conf/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,44 @@ package conf
import (
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/spf13/pflag"
)

type InitConfig struct {
Force bool `koanf:"force"`
Url string `koanf:"url"`
DownloadPath string `koanf:"download-path"`
DownloadPoll time.Duration `koanf:"download-poll"`
DevInit bool `koanf:"dev-init"`
DevInitAddress string `koanf:"dev-init-address"`
DevInitBlockNum uint64 `koanf:"dev-init-blocknum"`
Empty bool `koanf:"empty"`
AccountsPerSync uint `koanf:"accounts-per-sync"`
ImportFile string `koanf:"import-file"`
ThenQuit bool `koanf:"then-quit"`
Prune string `koanf:"prune"`
PruneBloomSize uint64 `koanf:"prune-bloom-size"`
ResetToMessage int64 `koanf:"reset-to-message"`
Force bool `koanf:"force"`
Url string `koanf:"url"`
DownloadPath string `koanf:"download-path"`
DownloadPoll time.Duration `koanf:"download-poll"`
DevInit bool `koanf:"dev-init"`
DevInitAddress string `koanf:"dev-init-address"`
DevInitBlockNum uint64 `koanf:"dev-init-blocknum"`
Empty bool `koanf:"empty"`
AccountsPerSync uint `koanf:"accounts-per-sync"`
ImportFile string `koanf:"import-file"`
ThenQuit bool `koanf:"then-quit"`
Prune string `koanf:"prune"`
PruneBloomSize uint64 `koanf:"prune-bloom-size"`
ResetToMessage int64 `koanf:"reset-to-message"`
RecreateMissingStateFrom uint64 `koanf:"recreate-missing-state-from"`
}

var InitConfigDefault = InitConfig{
Force: false,
Url: "",
DownloadPath: "/tmp/",
DownloadPoll: time.Minute,
DevInit: false,
DevInitAddress: "",
DevInitBlockNum: 0,
Empty: false,
ImportFile: "",
AccountsPerSync: 100000,
ThenQuit: false,
Prune: "",
PruneBloomSize: 2048,
ResetToMessage: -1,
Force: false,
Url: "",
DownloadPath: "/tmp/",
DownloadPoll: time.Minute,
DevInit: false,
DevInitAddress: "",
DevInitBlockNum: 0,
Empty: false,
ImportFile: "",
AccountsPerSync: 100000,
ThenQuit: false,
Prune: "",
PruneBloomSize: 2048,
ResetToMessage: -1,
RecreateMissingStateFrom: 0, // 0 = disabled
}

func InitConfigAddOptions(prefix string, f *pflag.FlagSet) {
Expand All @@ -55,4 +58,12 @@ func InitConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".prune", InitConfigDefault.Prune, "pruning for a given use: \"full\" for full nodes serving RPC requests, or \"validator\" for validators")
f.Uint64(prefix+".prune-bloom-size", InitConfigDefault.PruneBloomSize, "the amount of memory in megabytes to use for the pruning bloom filter (higher values prune better)")
f.Int64(prefix+".reset-to-message", InitConfigDefault.ResetToMessage, "forces a reset to an old message height. Also set max-reorg-resequence-depth=0 to force re-reading messages")
f.Uint64(prefix+".recreate-missing-state-from", InitConfigDefault.RecreateMissingStateFrom, "block number to start recreating missing states from (0 = disabled)")
}

func (c *InitConfig) Validate() error {
if c.Force && c.RecreateMissingStateFrom > 0 {
log.Warn("force init enabled, recreate-missing-state-from will have no effect")
}
return nil
}
8 changes: 8 additions & 0 deletions cmd/nitro/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/offchainlabs/nitro/cmd/conf"
"github.com/offchainlabs/nitro/cmd/ipfshelper"
"github.com/offchainlabs/nitro/cmd/pruning"
"github.com/offchainlabs/nitro/cmd/staterecovery"
"github.com/offchainlabs/nitro/cmd/util"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/statetransfer"
Expand Down Expand Up @@ -183,6 +184,13 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
if err != nil {
return chainDb, l2BlockChain, err
}
if config.Init.RecreateMissingStateFrom > 0 {
err = staterecovery.RecreateMissingStates(chainDb, l2BlockChain, cacheConfig, config.Init.RecreateMissingStateFrom)
if err != nil {
return chainDb, l2BlockChain, fmt.Errorf("failed to recreate missing states: %w", err)
}
}

return chainDb, l2BlockChain, nil
}
readOnlyDb.Close()
Expand Down
6 changes: 6 additions & 0 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,12 @@ func (c *NodeConfig) CanReload(new *NodeConfig) error {
}

func (c *NodeConfig) Validate() error {
if c.Init.RecreateMissingStateFrom > 0 && !c.Execution.Caching.Archive {
return errors.New("recreate-missing-state-from enabled for a non-archive node")
}
if err := c.Init.Validate(); err != nil {
return err
}
if err := c.ParentChain.Validate(); err != nil {
return err
}
Expand Down
88 changes: 88 additions & 0 deletions cmd/staterecovery/staterecovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package staterecovery

import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
)

func RecreateMissingStates(chainDb ethdb.Database, bc *core.BlockChain, cacheConfig *core.CacheConfig, startBlock uint64) error {
start := time.Now()
currentHeader := bc.CurrentBlock()
if currentHeader == nil {
return fmt.Errorf("current header is nil")
}
target := currentHeader.Number.Uint64()
current := startBlock
genesis := bc.Config().ArbitrumChainParams.GenesisBlockNum
if current < genesis+1 {
current = genesis + 1
log.Warn("recreate-missing-states-from before genesis+1, starting from genesis+1", "configured", startBlock, "override", current)
}
previousBlock := bc.GetBlockByNumber(current - 1)
if previousBlock == nil {
return fmt.Errorf("start block parent is missing, parent block number: %d", current-1)
}
hashConfig := *hashdb.Defaults
hashConfig.CleanCacheSize = cacheConfig.TrieCleanLimit
trieConfig := &trie.Config{
Preimages: false,
HashDB: &hashConfig,
}
database := state.NewDatabaseWithConfig(chainDb, trieConfig)
defer database.TrieDB().Close()
previousState, err := state.New(previousBlock.Root(), database, nil)
if err != nil {
return fmt.Errorf("state of start block parent is missing: %w", err)
}
// we don't need to reference states with `trie.Database.Reference` here, because:
// * either the state nodes will be read from disk and then cached in cleans cache
// * or they will be recreated, saved to disk and then also cached in cleans cache
logged := time.Unix(0, 0)
recreated := 0
for {
currentBlock := bc.GetBlockByNumber(current)
if currentBlock == nil {
break
}
if time.Since(logged) > 1*time.Minute {
log.Info("Recreating missing states", "block", current, "target", target, "remaining", int64(target)-int64(current), "elapsed", time.Since(start), "recreated", recreated)
logged = time.Now()
}
currentState, err := state.New(currentBlock.Root(), database, nil)
if err != nil {
_, _, _, err := bc.Processor().Process(currentBlock, previousState, vm.Config{})
if err != nil {
return fmt.Errorf("processing block %d failed: %w", current, err)
}
root, err := previousState.Commit(current, bc.Config().IsEIP158(currentBlock.Number()))
if err != nil {
return fmt.Errorf("StateDB commit failed, number %d root %v: %w", current, currentBlock.Root(), err)
}
if root.Cmp(currentBlock.Root()) != 0 {
return fmt.Errorf("reached different state root after processing block %d, have %v, want %v", current, root, currentBlock.Root())
}
// commit to disk
err = database.TrieDB().Commit(root, false)
if err != nil {
return fmt.Errorf("TrieDB commit failed, number %d root %v: %w", current, root, err)
}
currentState, err = state.New(currentBlock.Root(), database, nil)
if err != nil {
return fmt.Errorf("state reset after block %d failed: %w", current, err)
}
recreated++
}
current++
previousState = currentState
}
log.Info("Finished recreating missing states", "elapsed", time.Since(start), "recreated", recreated)
return nil
}
1 change: 1 addition & 0 deletions execution/gethexec/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var DefaultCachingConfig = CachingConfig{
MaxAmountOfGasToSkipStateSaving: 0,
}

// TODO remove stack from parameters as it is no longer needed here
func DefaultCacheConfigFor(stack *node.Node, cachingConfig *CachingConfig) *core.CacheConfig {
baseConf := ethconfig.Defaults
if cachingConfig.Archive {
Expand Down
43 changes: 38 additions & 5 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type ExecutionEngine struct {
nextScheduledVersionCheck time.Time // protected by the createBlocksMutex

reorgSequencing bool

prefetchBlock bool
}

func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) {
Expand Down Expand Up @@ -71,6 +73,16 @@ func (s *ExecutionEngine) EnableReorgSequencing() {
s.reorgSequencing = true
}

func (s *ExecutionEngine) EnablePrefetchBlock() {
if s.Started() {
panic("trying to enable prefetch block after start")
}
if s.prefetchBlock {
panic("trying to enable prefetch block when already set")
}
s.prefetchBlock = true
}

func (s *ExecutionEngine) SetTransactionStreamer(streamer execution.TransactionStreamer) {
if s.Started() {
panic("trying to set transaction streamer after start")
Expand Down Expand Up @@ -107,7 +119,11 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
return err
}
for i := range newMessages {
err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i])
var msgForPrefetch *arbostypes.MessageWithMetadata
if i < len(newMessages)-1 {
msgForPrefetch = &newMessages[i]
}
err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch)
if err != nil {
return err
}
Expand Down Expand Up @@ -489,15 +505,20 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess
return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos)))
}

func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
// DigestMessage is used to create a block by executing msg against the latest state and storing it.
// Also, while creating a block by executing msg against the latest state,
// in parallel, creates a block by executing msgForPrefetch (msg+1) against the latest state
// but does not store the block.
// This helps in filling the cache, so that the next block creation is faster.
func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
if !s.createBlocksMutex.TryLock() {
return errors.New("createBlock mutex held")
}
defer s.createBlocksMutex.Unlock()
return s.digestMessageWithBlockMutex(num, msg)
return s.digestMessageWithBlockMutex(num, msg, msgForPrefetch)
}

func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
currentHeader, err := s.getCurrentHeader()
if err != nil {
return err
Expand All @@ -511,11 +532,23 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex,
}

startTime := time.Now()
var wg sync.WaitGroup
if s.prefetchBlock && msgForPrefetch != nil {
wg.Add(1)
go func() {
defer wg.Done()
_, _, _, err := s.createBlockFromNextMessage(msgForPrefetch)
if err != nil {
return
}
}()
}

block, statedb, receipts, err := s.createBlockFromNextMessage(msg)
if err != nil {
return err
}

wg.Wait()
err = s.appendBlock(block, statedb, receipts, time.Since(startTime))
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func (n *ExecutionNode) StopAndWait() {
// }
}

func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
return n.ExecEngine.DigestMessage(num, msg)
func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch)
}
func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error {
return n.ExecEngine.Reorg(count, newMessages, oldMessages)
Expand Down
Loading

0 comments on commit 0fcce6a

Please sign in to comment.