Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execution service 4: execution->consensus interface #1535

Merged
merged 50 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
877cf11
improve separation of arb_interface
tsahee Mar 20, 2023
bf3d0f0
remove unused var lastBlockRead
tsahee Mar 20, 2023
ad926ed
remove unused code
tsahee Mar 20, 2023
a96898a
readLastBatchCount atomic instead of mutex
tsahee Mar 21, 2023
45ffab0
split sync_monitor between consensus and execution
tsahee Mar 21, 2023
b2a7e67
test more node_interface functions
tsahee Mar 23, 2023
203c8e0
Merge branch 'execution-service-3' into execution-service-4
tsahee Mar 25, 2023
8186d20
call start/stop for syncMonitor
tsahee Mar 25, 2023
aeed89a
fix TestRedisForwarder
tsahee Mar 25, 2023
fe74bc9
Sequencer price adjusts: make non-parallel
tsahee Mar 26, 2023
e2eb61e
Merge branch 'execution-service-3' into execution-service-4
tsahee Mar 26, 2023
84fc418
fix build of rce-detection tests
tsahee Mar 27, 2023
fc8bcb2
execution: only link to l1 if sequencer
tsahee Mar 29, 2023
32e809c
Merge branch 'execution-service-3' into execution-service-4
tsahee Mar 30, 2023
696d91f
syncMonitor: create Test config
tsahee Apr 3, 2023
633b586
sync monitor: ewname API to SyncTarget
tsahee Apr 3, 2023
ddfdb22
Merge branch 'execution-service-3' into execution-service-4
tsahee Apr 19, 2023
e15dced
fix merge
tsahee Apr 19, 2023
5f7b2d9
Merge branch 'execution-service-3' into execution-service-4
tsahee Apr 30, 2023
12c5c27
Merge branch 'execution-service-3' into execution-service-4
tsahee May 9, 2023
b6e16eb
Merge branch 'execution-service-3' into execution-service-4
tsahee May 11, 2023
e03fa23
Merge branch 'execution-service-3' into execution-service-4
tsahee May 26, 2023
711ad7c
Merge branch 'execution-service-3' into execution-service-4
tsahee Jun 15, 2023
4482c20
fix merge
tsahee Jun 15, 2023
258d1dd
Merge branch 'execution-service-3' into execution-service-4
tsahee Jun 27, 2023
a98e75f
Merge branch 'execution-service-3' into execution-service-4
tsahee Jun 27, 2023
4ed8eef
Merge branch 'execution-service-3' into execution-service-4
tsahee Jul 10, 2023
d8498b0
execution: update interface to support L3
tsahee Jul 10, 2023
47ece30
system_tests: handle initMsg from Deploy
tsahee Jul 10, 2023
26000bb
Merge remote-tracking branch 'origin/master' into execution-service-4
tsahee Oct 4, 2023
18616c4
separate estimation tests from nodeinterface
tsahee Oct 4, 2023
f804c7f
minor edits and renames
tsahee Oct 7, 2023
8657cdd
Merge remote-tracking branch 'origin/master' into execution-service-4
tsahee Nov 1, 2023
8505119
fix merge errors
tsahee Nov 1, 2023
112426f
Merge remote-tracking branch 'origin/master' into execution-service-4
tsahee Nov 9, 2023
d06c18f
Merge remote-tracking branch 'origin/master' into execution-service-4
tsahee Mar 20, 2024
a3e275b
wait for validation before safe or final
tsahee Mar 20, 2024
0ca597b
fix more merge errors
tsahee Mar 20, 2024
0618a80
Merge remote-tracking branch 'origin/master' into execution-service-4
tsahee Mar 20, 2024
0b7a590
support counting validations for blocks not yet posted to parent
tsahee Mar 20, 2024
69758df
NodeInterface: update comments
tsahee Mar 20, 2024
7e82acf
remove seqinbox_test from race testing
tsahee Mar 20, 2024
b49b660
Revert "remove seqinbox_test from race testing"
tsahee Mar 20, 2024
db76dff
remove nodeInterface from race tests
tsahee Mar 20, 2024
157633b
fix small PR comments
tsahee Mar 25, 2024
9777dd0
nodeinterface_test: use bindAPI
tsahee Mar 25, 2024
543be5d
sync_monitor update
tsahee Mar 25, 2024
d806f4d
inbox_tracker: add comment
tsahee Mar 25, 2024
e9f9163
Merge remote-tracking branch 'origin/master' into execution-service-4
tsahee Mar 26, 2024
c581be7
Merge branch 'master' into execution-service-4
PlasmaPower Mar 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 5 additions & 21 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"math"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -90,10 +89,6 @@ type InboxReader struct {

// Atomic
lastSeenBatchCount uint64

// Behind the mutex
lastReadMutex sync.RWMutex
lastReadBlock uint64
lastReadBatchCount uint64
}

Expand Down Expand Up @@ -353,10 +348,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
// There's nothing to do
from = arbmath.BigAddByUint(currentHeight, 1)
blocksToFetch = config.DefaultBlocksToRead
r.lastReadMutex.Lock()
r.lastReadBlock = currentHeight.Uint64()
r.lastReadBatchCount = checkingBatchCount
r.lastReadMutex.Unlock()
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
storeSeenBatchCount()
if !r.caughtUp {
r.caughtUp = true
Expand Down Expand Up @@ -487,10 +479,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}
if len(sequencerBatches) > 0 {
readAnyBatches = true
r.lastReadMutex.Lock()
r.lastReadBlock = to.Uint64()
r.lastReadBatchCount = sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1
r.lastReadMutex.Unlock()
atomic.StoreUint64(&r.lastReadBatchCount, sequencerBatches[len(sequencerBatches)-1].SequenceNumber+1)
storeSeenBatchCount()
}
}
Expand All @@ -517,10 +506,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}

if !readAnyBatches {
r.lastReadMutex.Lock()
r.lastReadBlock = currentHeight.Uint64()
r.lastReadBatchCount = checkingBatchCount
r.lastReadMutex.Unlock()
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
storeSeenBatchCount()
}
}
Expand Down Expand Up @@ -590,10 +576,8 @@ func (r *InboxReader) GetSequencerMessageBytes(ctx context.Context, seqNum uint6
return nil, fmt.Errorf("sequencer batch %v not found in L1 block %v (found batches %v)", seqNum, metadata.ParentChainBlock, seenBatches)
}

func (r *InboxReader) GetLastReadBlockAndBatchCount() (uint64, uint64) {
r.lastReadMutex.RLock()
defer r.lastReadMutex.RUnlock()
return r.lastReadBlock, r.lastReadBatchCount
func (r *InboxReader) GetLastReadBatchCount() uint64 {
return atomic.LoadUint64(&r.lastReadBatchCount)
}

// GetLastSeenBatchCount returns how many sequencer batches the inbox reader has read in from L1.
Expand Down
43 changes: 43 additions & 0 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ func (t *InboxTracker) GetBatchMessageCount(seqNum uint64) (arbutil.MessageIndex
return metadata.MessageCount, err
}

func (t *InboxTracker) GetBatchParentChainBlock(seqNum uint64) (uint64, error) {
metadata, err := t.GetBatchMetadata(seqNum)
return metadata.ParentChainBlock, err
}

// GetBatchAcc is a convenience function wrapping GetBatchMetadata
func (t *InboxTracker) GetBatchAcc(seqNum uint64) (common.Hash, error) {
metadata, err := t.GetBatchMetadata(seqNum)
Expand All @@ -213,6 +218,44 @@ func (t *InboxTracker) GetBatchCount() (uint64, error) {
return count, nil
}

func (t *InboxTracker) FindL1BatchForMessage(pos arbutil.MessageIndex) (uint64, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used to be static_block_validator: FindBatchContainingMessageIndex

batchCount, err := t.GetBatchCount()
if err != nil {
return 0, err
}
low := uint64(0)
high := batchCount - 1
// Iteration preconditions:
// - high >= low
// - msgCount(low - 1) <= pos implies low <= target
// - msgCount(high) > pos implies high >= target
// Therefore, if low == high, then low == high == target
for {
// Due to integer rounding, mid >= low && mid < high
mid := (low + high) / 2
count, err := t.GetBatchMessageCount(mid)
if err != nil {
return 0, err
}
if count < pos {
// Must narrow as mid >= low, therefore mid + 1 > low, therefore newLow > oldLow
// Keeps low precondition as msgCount(mid) < pos
low = mid + 1
} else if count == pos {
return mid + 1, err
} else if count == pos+1 || mid == low { // implied: count > pos
return mid, nil
} else { // implied: count > pos + 1
// Must narrow as mid < high, therefore newHigh < lowHigh
// Keeps high precondition as msgCount(mid) > pos
high = mid
}
if high == low {
return high, err
}
}
}

func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcaster) error {
batchCount, err := t.GetBatchCount()
if err != nil {
Expand Down
65 changes: 50 additions & 15 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcastclients"
Expand Down Expand Up @@ -391,6 +392,7 @@ func ConfigDefaultL1NonSequencerTest() *Config {
config.BatchPoster.Enable = false
config.SeqCoordinator.Enable = false
config.BlockValidator = staker.TestBlockValidatorConfig
config.SyncMonitor = TestSyncMonitorConfig
config.Staker = staker.TestL1ValidatorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServer.URL = ""
Expand All @@ -407,6 +409,7 @@ func ConfigDefaultL2Test() *Config {
config.SeqCoordinator.Signer.ECDSA.AcceptSequencer = false
config.SeqCoordinator.Signer.ECDSA.Dangerous.AcceptMissing = true
config.Staker = staker.TestL1ValidatorConfig
config.SyncMonitor = TestSyncMonitorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServer.URL = ""
config.TransactionStreamer = DefaultTransactionStreamerConfig
Expand Down Expand Up @@ -449,7 +452,6 @@ type Node struct {
SeqCoordinator *SeqCoordinator
MaintenanceRunner *MaintenanceRunner
DASLifecycleManager *das.LifecycleManager
ClassicOutboxRetriever *ClassicOutboxRetriever
SyncMonitor *SyncMonitor
configFetcher ConfigFetcher
ctx context.Context
Expand Down Expand Up @@ -564,17 +566,10 @@ func createNodeImpl(

l2ChainId := l2Config.ChainID.Uint64()

syncMonitor := NewSyncMonitor(&config.SyncMonitor)
var classicOutbox *ClassicOutboxRetriever
classicMsgDb, err := stack.OpenDatabase("classic-msg", 0, 0, "", true)
if err != nil {
if l2Config.ArbitrumChainParams.GenesisBlockNum > 0 {
log.Warn("Classic Msg Database not found", "err", err)
}
classicOutbox = nil
} else {
classicOutbox = NewClassicOutboxRetriever(classicMsgDb)
syncConfigFetcher := func() *SyncMonitorConfig {
return &configFetcher.Get().SyncMonitor
}
syncMonitor := NewSyncMonitor(syncConfigFetcher)

var l1Reader *headerreader.HeaderReader
if config.ParentChainReader.Enable {
Expand Down Expand Up @@ -670,7 +665,6 @@ func createNodeImpl(
SeqCoordinator: coordinator,
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: nil,
ClassicOutboxRetriever: classicOutbox,
SyncMonitor: syncMonitor,
configFetcher: configFetcher,
ctx: ctx,
Expand Down Expand Up @@ -873,7 +867,6 @@ func createNodeImpl(
SeqCoordinator: coordinator,
MaintenanceRunner: maintenanceRunner,
DASLifecycleManager: dasLifecycleManager,
ClassicOutboxRetriever: classicOutbox,
SyncMonitor: syncMonitor,
configFetcher: configFetcher,
ctx: ctx,
Expand Down Expand Up @@ -936,16 +929,19 @@ func (n *Node) Start(ctx context.Context) error {
execClient = nil
}
if execClient != nil {
err := execClient.Initialize(ctx, n, n.SyncMonitor)
err := execClient.Initialize(ctx)
if err != nil {
return fmt.Errorf("error initializing exec client: %w", err)
}
}
n.SyncMonitor.Initialize(n.InboxReader, n.TxStreamer, n.SeqCoordinator, n.Execution)
n.SyncMonitor.Initialize(n.InboxReader, n.TxStreamer, n.SeqCoordinator)
err := n.Stack.Start()
if err != nil {
return fmt.Errorf("error starting geth stack: %w", err)
}
if execClient != nil {
execClient.SetConsensusClient(n)
}
err = n.Execution.Start(ctx)
if err != nil {
return fmt.Errorf("error starting exec client: %w", err)
Expand Down Expand Up @@ -1049,6 +1045,7 @@ func (n *Node) Start(ctx context.Context) error {
if n.configFetcher != nil {
n.configFetcher.Start(ctx)
}
n.SyncMonitor.Start(ctx)
return nil
}

Expand Down Expand Up @@ -1105,10 +1102,48 @@ func (n *Node) StopAndWait() {
// Just stops the redis client (most other stuff was stopped earlier)
n.SeqCoordinator.StopAndWait()
}
n.SyncMonitor.StopAndWait()
if n.DASLifecycleManager != nil {
n.DASLifecycleManager.StopAndWaitUntil(2 * time.Second)
}
if err := n.Stack.Close(); err != nil {
log.Error("error on stak close", "err", err)
}
}

func (n *Node) FetchBatch(ctx context.Context, batchNum uint64) ([]byte, error) {
return n.InboxReader.GetSequencerMessageBytes(ctx, batchNum)
}

func (n *Node) FindL1BatchForMessage(message arbutil.MessageIndex) (uint64, error) {
return n.InboxTracker.FindL1BatchForMessage(message)
}

func (n *Node) GetBatchParentChainBlock(seqNum uint64) (uint64, error) {
return n.InboxTracker.GetBatchParentChainBlock(seqNum)
}

func (n *Node) SyncProgressMap() map[string]interface{} {
return n.SyncMonitor.SyncProgressMap()
}

func (n *Node) SyncTargetMessageCount() arbutil.MessageIndex {
return n.SyncMonitor.SyncTargetMessageCount()
}

// TODO: switch from pulling to pushing safe/finalized
func (n *Node) GetSafeMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
return n.InboxReader.GetSafeMsgCount(ctx)
}

func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
return n.InboxReader.GetFinalizedMsgCount(ctx)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta)
}

func (n *Node) ExpectChosenSequencer() error {
return n.TxStreamer.ExpectChosenSequencer()
}
Loading