diff --git a/.github/workflows/merge-checks.yml b/.github/workflows/merge-checks.yml index 6561c429e2..c9f7957389 100644 --- a/.github/workflows/merge-checks.yml +++ b/.github/workflows/merge-checks.yml @@ -17,7 +17,7 @@ jobs: run: | set -x pipefail status_state="pending" - if ${{ contains(github.event.*.labels.*.name, 'design-approved') }}; then + if ${{ contains(github.event.pull_request.labels.*.name, 'design-approved') && !contains(github.event.pull_request.labels.*.name, 'after-next-version') }}; then status_state="success" else resp="$(curl -sSL --fail-with-body \ diff --git a/.github/workflows/submodule-pin-check.sh b/.github/workflows/submodule-pin-check.sh new file mode 100755 index 0000000000..aecb287ce1 --- /dev/null +++ b/.github/workflows/submodule-pin-check.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +declare -Ar exceptions=( + [contracts]=origin/develop + [nitro-testnode]=origin/master + + #TODO Rachel to check these are the intended branches. + [arbitrator/langs/c]=origin/vm-storage-cache + [arbitrator/tools/wasmer]=origin/adopt-v4.2.8 +) + +divergent=0 +for mod in `git submodule --quiet foreach 'echo $name'`; do + branch=origin/HEAD + if [[ -v exceptions[$mod] ]]; then + branch=${exceptions[$mod]} + fi + + if ! git -C $mod merge-base --is-ancestor HEAD $branch; then + echo $mod diverges from $branch + divergent=1 + fi +done + +exit $divergent + diff --git a/.github/workflows/submodule-pin-check.yml b/.github/workflows/submodule-pin-check.yml new file mode 100644 index 0000000000..e459bad34d --- /dev/null +++ b/.github/workflows/submodule-pin-check.yml @@ -0,0 +1,21 @@ +name: Submodule Pin Check + +on: + pull_request: + branches: [ master ] + types: [synchronize, opened, reopened] + +jobs: + submodule-pin-check: + name: Submodule Pin Check + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + submodules: recursive + + - name: Check all submodules are ancestors of origin/HEAD or configured branch + run: ${{ github.workspace }}/.github/workflows/submodule-pin-check.sh + diff --git a/Dockerfile b/Dockerfile index 69e688679b..2fced8fade 100644 --- a/Dockerfile +++ b/Dockerfile @@ -250,7 +250,10 @@ COPY --from=node-builder /workspace/target/bin/nitro /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/relay /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/nitro-val /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/seq-coordinator-manager /usr/local/bin/ +COPY --from=node-builder /workspace/target/bin/prover /usr/local/bin/ COPY --from=machine-versions /workspace/machines /home/user/target/machines +COPY ./scripts/validate-wasm-module-root.sh . +RUN ./validate-wasm-module-root.sh /home/user/target/machines /usr/local/bin/prover USER root RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ diff --git a/arbitrator/langs/bf b/arbitrator/langs/bf index 062b87bad1..cb5750580f 160000 --- a/arbitrator/langs/bf +++ b/arbitrator/langs/bf @@ -1 +1 @@ -Subproject commit 062b87bad1ec00d42b9cc2b5ee41e63cd6ff1cbb +Subproject commit cb5750580f6990b5166ffce83de11b766a25ca31 diff --git a/arbitrator/prover/src/main.rs b/arbitrator/prover/src/main.rs index 9ddd5020c8..e10a415389 100644 --- a/arbitrator/prover/src/main.rs +++ b/arbitrator/prover/src/main.rs @@ -41,6 +41,9 @@ struct Opts { #[structopt(long)] /// print modules to the console print_modules: bool, + #[structopt(long)] + /// print wasm module root to the console + print_wasmmoduleroot: bool, /// profile output instead of generting proofs #[structopt(short = "p", long)] profile_run: bool, @@ -122,6 +125,18 @@ const DELAYED_HEADER_LEN: usize = 112; // also in test-case's host-io.rs & contr fn main() -> Result<()> { let opts = Opts::from_args(); + if opts.print_wasmmoduleroot { + match Machine::new_from_wavm(&opts.binary) { + Ok(mach) => { + println!("0x{}", mach.get_modules_root()); + return Ok(()); + } + Err(err) => { + eprintln!("Error loading binary: {err}"); + return Err(err); + } + } + } let mut inbox_contents = HashMap::default(); let mut inbox_position = opts.inbox_position; let mut delayed_position = opts.delayed_inbox_position; diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 2617a9a629..ccf9b4aab7 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -112,7 +112,7 @@ type BatchPoster struct { // This is an atomic variable that should only be accessed atomically. // An estimate of the number of batches we want to post but haven't yet. // This doesn't include batches which we don't want to post yet due to the L1 bounds. - backlog uint64 + backlog atomic.Uint64 lastHitL1Bounds time.Time // The last time we wanted to post a message but hit the L1 bounds batchReverted atomic.Bool // indicates whether data poster batch was reverted @@ -1086,7 +1086,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) if config.IgnoreBlobPrice { use4844 = true } else { - backlog := atomic.LoadUint64(&b.backlog) + backlog := b.backlog.Load() // Logic to prevent switching from non-4844 batches to 4844 batches too often, // so that blocks can be filled efficiently. The geth txpool rejects txs for // accounts that already have the other type of txs in the pool with @@ -1437,7 +1437,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) // Setting the backlog to 0 here ensures that we don't lower compression as a result. backlog = 0 } - atomic.StoreUint64(&b.backlog, backlog) + b.backlog.Store(backlog) b.building = nil // If we aren't queueing up transactions, wait for the receipt before moving on to the next batch. @@ -1453,7 +1453,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) } func (b *BatchPoster) GetBacklogEstimate() uint64 { - return atomic.LoadUint64(&b.backlog) + return b.backlog.Load() } func (b *BatchPoster) Start(ctxIn context.Context) { diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 3ba9aa78f3..78b4db1929 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -97,8 +97,8 @@ type InboxReader struct { l1Reader *headerreader.HeaderReader // Atomic - lastSeenBatchCount uint64 - lastReadBatchCount uint64 + lastSeenBatchCount atomic.Uint64 + lastReadBatchCount atomic.Uint64 } func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) { @@ -240,7 +240,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { seenBatchCountStored := uint64(math.MaxUint64) storeSeenBatchCount := func() { if seenBatchCountStored != seenBatchCount { - atomic.StoreUint64(&r.lastSeenBatchCount, seenBatchCount) + r.lastSeenBatchCount.Store(seenBatchCount) seenBatchCountStored = seenBatchCount } } @@ -394,7 +394,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { // There's nothing to do from = arbmath.BigAddByUint(currentHeight, 1) blocksToFetch = config.DefaultBlocksToRead - atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount) + r.lastReadBatchCount.Store(checkingBatchCount) storeSeenBatchCount() if !r.caughtUp && readMode == "latest" { r.caughtUp = true @@ -526,7 +526,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } if len(sequencerBatches) > 0 { readAnyBatches = true - atomic.StoreUint64(&r.lastReadBatchCount, sequencerBatches[len(sequencerBatches)-1].SequenceNumber+1) + r.lastReadBatchCount.Store(sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1) storeSeenBatchCount() } } @@ -553,7 +553,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } if !readAnyBatches { - atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount) + r.lastReadBatchCount.Store(checkingBatchCount) storeSeenBatchCount() } } @@ -625,7 +625,7 @@ func (r *InboxReader) GetSequencerMessageBytes(ctx context.Context, seqNum uint6 } func (r *InboxReader) GetLastReadBatchCount() uint64 { - return atomic.LoadUint64(&r.lastReadBatchCount) + return r.lastReadBatchCount.Load() } // GetLastSeenBatchCount returns how many sequencer batches the inbox reader has read in from L1. @@ -633,7 +633,7 @@ func (r *InboxReader) GetLastReadBatchCount() uint64 { // >0 - last batchcount seen in run() - only written after lastReadBatchCount updated // 0 - no batch seen, error func (r *InboxReader) GetLastSeenBatchCount() uint64 { - return atomic.LoadUint64(&r.lastSeenBatchCount) + return r.lastSeenBatchCount.Load() } func (r *InboxReader) GetDelayBlocks() uint64 { diff --git a/arbnode/redislock/redis.go b/arbnode/redislock/redis.go index 09296e3c79..7e26010cae 100644 --- a/arbnode/redislock/redis.go +++ b/arbnode/redislock/redis.go @@ -21,7 +21,7 @@ type Simple struct { stopwaiter.StopWaiter client redis.UniversalClient config SimpleCfgFetcher - lockedUntil int64 + lockedUntil atomic.Int64 mutex sync.Mutex stopping bool readyToLock func() bool @@ -239,12 +239,12 @@ func execTestPipe(pipe redis.Pipeliner, ctx context.Context) error { } // notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter. -func atomicTimeRead(addr *int64) time.Time { - asint64 := atomic.LoadInt64(addr) +func atomicTimeRead(addr *atomic.Int64) time.Time { + asint64 := addr.Load() return time.UnixMilli(asint64) } -func atomicTimeWrite(addr *int64, t time.Time) { +func atomicTimeWrite(addr *atomic.Int64, t time.Time) { asint64 := t.UnixMilli() - atomic.StoreInt64(addr, asint64) + addr.Store(asint64) } diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index cdf1011b11..98c19ce361 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -48,7 +48,7 @@ type SeqCoordinator struct { prevChosenSequencer string reportedWantsLockout bool - lockoutUntil int64 // atomic + lockoutUntil atomic.Int64 // atomic wantsLockoutMutex sync.Mutex // manages access to acquireLockoutAndWriteMessage and generally the wants lockout key avoidLockout int // If > 0, prevents acquiring the lockout but not extending the lockout if no alternative sequencer wants the lockout. Protected by chosenUpdateMutex. @@ -191,14 +191,14 @@ func StandaloneSeqCoordinatorInvalidateMsgIndex(ctx context.Context, redisClient return nil } -func atomicTimeWrite(addr *int64, t time.Time) { +func atomicTimeWrite(addr *atomic.Int64, t time.Time) { asint64 := t.UnixMilli() - atomic.StoreInt64(addr, asint64) + addr.Store(asint64) } // notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter. -func atomicTimeRead(addr *int64) time.Time { - asint64 := atomic.LoadInt64(addr) +func atomicTimeRead(addr *atomic.Int64) time.Time { + asint64 := addr.Load() return time.UnixMilli(asint64) } @@ -692,7 +692,7 @@ func (c *SeqCoordinator) DebugPrint() string { return fmt.Sprint("Url:", c.config.Url(), " prevChosenSequencer:", c.prevChosenSequencer, " reportedWantsLockout:", c.reportedWantsLockout, - " lockoutUntil:", c.lockoutUntil, + " lockoutUntil:", c.lockoutUntil.Load(), " redisErrors:", c.redisErrors) } diff --git a/arbnode/seq_coordinator_atomic_test.go b/arbnode/seq_coordinator_atomic_test.go index 61468a3adb..9b9d9dea81 100644 --- a/arbnode/seq_coordinator_atomic_test.go +++ b/arbnode/seq_coordinator_atomic_test.go @@ -21,21 +21,21 @@ import ( const messagesPerRound = 20 type CoordinatorTestData struct { - messageCount uint64 + messageCount atomic.Uint64 sequencer []string err error mutex sync.Mutex waitForCoords sync.WaitGroup - testStartRound int32 + testStartRound atomic.Int32 } func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *CoordinatorTestData) { nextRound := int32(0) for { sequenced := make([]bool, messagesPerRound) - for atomic.LoadInt32(&data.testStartRound) < nextRound { + for data.testStartRound.Load() < nextRound { if ctx.Err() != nil { return } @@ -44,7 +44,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo nextRound++ var execError error for { - messageCount := atomic.LoadUint64(&data.messageCount) + messageCount := data.messageCount.Load() if messageCount >= messagesPerRound { break } @@ -53,7 +53,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo err := coord.acquireLockoutAndWriteMessage(ctx, asIndex, asIndex+1, &arbostypes.EmptyTestMessageWithMetadata) if err == nil { sequenced[messageCount] = true - atomic.StoreUint64(&data.messageCount, messageCount+1) + data.messageCount.Store(messageCount + 1) randNr := rand.Intn(20) if randNr > 15 { execError = coord.chosenOneRelease(ctx) @@ -105,9 +105,9 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) { coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true coordConfig.Signer.Symmetric.SigningKey = "" testData := CoordinatorTestData{ - testStartRound: -1, - sequencer: make([]string, messagesPerRound), + sequencer: make([]string, messagesPerRound), } + testData.testStartRound.Store(-1) nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil) Require(t, err) @@ -134,12 +134,12 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) { for round := int32(0); round < 10; round++ { redisClient.Del(ctx, redisutil.CHOSENSEQ_KEY, redisutil.MSG_COUNT_KEY) - testData.messageCount = 0 + testData.messageCount.Store(0) for i := 0; i < messagesPerRound; i++ { testData.sequencer[i] = "" } testData.waitForCoords.Add(NumOfThreads) - atomic.StoreInt32(&testData.testStartRound, round) + testData.testStartRound.Store(round) testData.waitForCoords.Wait() Require(t, testData.err) seqList := "" diff --git a/arbnode/simple_redis_lock_test.go b/arbnode/simple_redis_lock_test.go index c9dd576749..72f3dfa837 100644 --- a/arbnode/simple_redis_lock_test.go +++ b/arbnode/simple_redis_lock_test.go @@ -21,11 +21,11 @@ const test_release_frac = 5 const test_delay = time.Millisecond const test_redisKey_prefix = "__TEMP_SimpleRedisLockTest__" -func attemptLock(ctx context.Context, s *redislock.Simple, flag *int32, wg *sync.WaitGroup) { +func attemptLock(ctx context.Context, s *redislock.Simple, flag *atomic.Int32, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < test_attempts; i++ { if s.AttemptLock(ctx) { - atomic.AddInt32(flag, 1) + flag.Add(1) } else if rand.Intn(test_release_frac) == 0 { s.Release(ctx) } @@ -76,17 +76,17 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo <-time.After(time.Second) } wg := sync.WaitGroup{} - counters := make([]int32, test_threads) + counters := make([]atomic.Int32, test_threads) for i, lock := range locks { wg.Add(1) go attemptLock(ctx, lock, &counters[i], &wg) } wg.Wait() successful := -1 - for i, counter := range counters { - if counter != 0 { - if counter != test_attempts { - t.Fatalf("counter %d value %d", i, counter) + for i := range counters { + if counters[i].Load() != 0 { + if counters[i].Load() != test_attempts { + t.Fatalf("counter %d value %d", i, counters[i].Load()) } if successful > 0 { t.Fatalf("counter %d and %d both positive", i, successful) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 5c02129ee6..22ecdd553c 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -62,7 +62,7 @@ type TransactionStreamer struct { nextAllowedFeedReorgLog time.Time broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockHash - broadcasterQueuedMessagesPos uint64 + broadcasterQueuedMessagesPos atomic.Uint64 broadcasterQueuedMessagesActiveReorg bool coordinator *SeqCoordinator @@ -491,14 +491,14 @@ func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreC } func (s *TransactionStreamer) FeedPendingMessageCount() arbutil.MessageIndex { - pos := atomic.LoadUint64(&s.broadcasterQueuedMessagesPos) + pos := s.broadcasterQueuedMessagesPos.Load() if pos == 0 { return 0 } s.insertionMutex.Lock() defer s.insertionMutex.Unlock() - pos = atomic.LoadUint64(&s.broadcasterQueuedMessagesPos) + pos = s.broadcasterQueuedMessagesPos.Load() if pos == 0 { return 0 } @@ -552,14 +552,14 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe if len(s.broadcasterQueuedMessages) == 0 || (feedReorg && !s.broadcasterQueuedMessagesActiveReorg) { // Empty cache or feed different from database, save current feed messages until confirmed L1 messages catch up. s.broadcasterQueuedMessages = messages - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)) + s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos)) s.broadcasterQueuedMessagesActiveReorg = feedReorg } else { - broadcasterQueuedMessagesPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)) + broadcasterQueuedMessagesPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load()) if broadcasterQueuedMessagesPos >= broadcastStartPos { // Feed messages older than cache s.broadcasterQueuedMessages = messages - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)) + s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos)) s.broadcasterQueuedMessagesActiveReorg = feedReorg } else if broadcasterQueuedMessagesPos+arbutil.MessageIndex(len(s.broadcasterQueuedMessages)) == broadcastStartPos { // Feed messages can be added directly to end of cache @@ -579,7 +579,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe ) } s.broadcasterQueuedMessages = messages - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)) + s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos)) s.broadcasterQueuedMessagesActiveReorg = feedReorg } } @@ -795,7 +795,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil var cacheClearLen int messagesAfterPos := messageStartPos + arbutil.MessageIndex(len(messages)) - broadcastStartPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)) + broadcastStartPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load()) if messagesAreConfirmed { var duplicates int @@ -903,10 +903,10 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil // Check if new messages were added at the end of cache, if they were, then dont remove those particular messages if len(s.broadcasterQueuedMessages) > cacheClearLen { s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[cacheClearLen:] - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)+uint64(cacheClearLen)) + s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos) + uint64(cacheClearLen)) } else { s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[:0] - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, 0) + s.broadcasterQueuedMessagesPos.Store(0) } s.broadcasterQueuedMessagesActiveReorg = false } diff --git a/arbos/programs/api.go b/arbos/programs/api.go index 65a58a47c2..0519cd9f01 100644 --- a/arbos/programs/api.go +++ b/arbos/programs/api.go @@ -137,16 +137,31 @@ func newApiClosures( startGas := am.SaturatingUSub(gasLeft, baseCost) * 63 / 64 gas := am.MinInt(startGas, gasReq) - // Tracing: emit the call (value transfer is done later in evm.Call) - if tracingInfo != nil { - tracingInfo.Tracer.CaptureState(0, opcode, startGas, baseCost+gas, scope, []byte{}, depth, nil) - } - // EVM rule: calls that pay get a stipend (opCall) if value.Sign() != 0 { gas = am.SaturatingUAdd(gas, params.CallStipend) } + // Tracing: emit the call (value transfer is done later in evm.Call) + if tracingInfo != nil { + var args []uint256.Int + args = append(args, *uint256.NewInt(gas)) // gas + args = append(args, *uint256.NewInt(0).SetBytes(contract.Bytes())) // to address + if opcode == vm.CALL { + args = append(args, *uint256.NewInt(0).SetBytes(value.Bytes())) // call value + } + args = append(args, *uint256.NewInt(0)) // memory offset + args = append(args, *uint256.NewInt(uint64(len(input)))) // memory length + args = append(args, *uint256.NewInt(0)) // return offset + args = append(args, *uint256.NewInt(0)) // return size + s := &vm.ScopeContext{ + Memory: util.TracingMemoryFromBytes(input), + Stack: util.TracingStackFromArgs(args...), + Contract: scope.Contract, + } + tracingInfo.Tracer.CaptureState(0, opcode, startGas, baseCost+gas, s, []byte{}, depth, nil) + } + var ret []byte var returnGas uint64 diff --git a/arbos/programs/native_api.go b/arbos/programs/native_api.go index 136f74c964..6fbb630ef3 100644 --- a/arbos/programs/native_api.go +++ b/arbos/programs/native_api.go @@ -34,7 +34,7 @@ import ( ) var apiObjects sync.Map -var apiIds uintptr // atomic and sequential +var apiIds atomic.Uintptr // atomic and sequential type NativeApi struct { handler RequestHandler @@ -49,7 +49,7 @@ func newApi( memoryModel *MemoryModel, ) NativeApi { handler := newApiClosures(interpreter, tracingInfo, scope, memoryModel) - apiId := atomic.AddUintptr(&apiIds, 1) + apiId := apiIds.Add(1) id := usize(apiId) api := NativeApi{ handler: handler, diff --git a/arbos/util/tracing.go b/arbos/util/tracing.go index f0f101bc20..f3564143c5 100644 --- a/arbos/util/tracing.go +++ b/arbos/util/tracing.go @@ -56,11 +56,8 @@ func (info *TracingInfo) RecordEmitLog(topics []common.Hash, data []byte) { for _, topic := range topics { args = append(args, HashToUint256(topic)) // topic: 32-byte value. Max topics count is 4 } - memory := vm.NewMemory() - memory.Resize(size) - memory.Set(0, size, data) scope := &vm.ScopeContext{ - Memory: memory, + Memory: TracingMemoryFromBytes(data), Stack: TracingStackFromArgs(args...), Contract: info.Contract, } diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 2225341560..7d27c57fe9 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -133,7 +133,7 @@ type BroadcastClient struct { connMutex sync.Mutex conn net.Conn - retryCount int64 + retryCount atomic.Int64 retrying bool shuttingDown bool @@ -435,7 +435,7 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) { } func (bc *BroadcastClient) GetRetryCount() int64 { - return atomic.LoadInt64(&bc.retryCount) + return bc.retryCount.Load() } func (bc *BroadcastClient) isShuttingDown() bool { @@ -458,7 +458,7 @@ func (bc *BroadcastClient) retryConnect(ctx context.Context) io.Reader { case <-timer.C: } - atomic.AddInt64(&bc.retryCount, 1) + bc.retryCount.Add(1) earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum) if err == nil { bc.retrying = false diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 23c4bd7738..8cd124bfe0 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -49,7 +49,7 @@ type BroadcastClients struct { secondaryRouter *Router // Use atomic access - connected int32 + connected atomic.Int32 } func NewBroadcastClients( @@ -113,7 +113,7 @@ func NewBroadcastClients( } func (bcs *BroadcastClients) adjustCount(delta int32) { - connected := atomic.AddInt32(&bcs.connected, delta) + connected := bcs.connected.Add(delta) if connected <= 0 { log.Error("no connected feed") } diff --git a/cmd/nitro-val/nitro_val.go b/cmd/nitro-val/nitro_val.go index 6f5f546430..1a7d2e6283 100644 --- a/cmd/nitro-val/nitro_val.go +++ b/cmd/nitro-val/nitro_val.go @@ -70,6 +70,9 @@ func mainImpl() int { nodeConfig.WS.Apply(&stackConf) nodeConfig.Auth.Apply(&stackConf) nodeConfig.IPC.Apply(&stackConf) + stackConf.P2P.ListenAddr = "" + stackConf.P2P.NoDial = true + stackConf.P2P.NoDiscovery = true vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion() stackConf.Version = strippedRevision diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 04bdeb3228..3a34229768 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -183,6 +183,9 @@ func mainImpl() int { if nodeConfig.WS.ExposeAll { stackConf.WSModules = append(stackConf.WSModules, "personal") } + stackConf.P2P.ListenAddr = "" + stackConf.P2P.NoDial = true + stackConf.P2P.NoDiscovery = true vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion() stackConf.Version = strippedRevision @@ -369,11 +372,6 @@ func mainImpl() int { return 0 } - if nodeConfig.Execution.Caching.Archive && nodeConfig.Execution.TxLookupLimit != 0 { - log.Info("retaining ability to lookup full transaction history as archive mode is enabled") - nodeConfig.Execution.TxLookupLimit = 0 - } - if err := resourcemanager.Init(&nodeConfig.Node.ResourceMgmt); err != nil { flag.Usage() log.Crit("Failed to start resource management module", "err", err) @@ -909,6 +907,12 @@ func ParseNode(ctx context.Context, args []string) (*NodeConfig, *genericconf.Wa if nodeConfig.Execution.Caching.Archive { nodeConfig.Node.MessagePruner.Enable = false } + + if nodeConfig.Execution.Caching.Archive && nodeConfig.Execution.TxLookupLimit != 0 { + log.Info("retaining ability to lookup full transaction history as archive mode is enabled") + nodeConfig.Execution.TxLookupLimit = 0 + } + err = nodeConfig.Validate() if err != nil { return nil, nil, err diff --git a/das/reader_aggregator_strategies.go b/das/reader_aggregator_strategies.go index d20760bd5b..8e10d52c16 100644 --- a/das/reader_aggregator_strategies.go +++ b/das/reader_aggregator_strategies.go @@ -41,7 +41,7 @@ func (s *abstractAggregatorStrategy) update(readers []daprovider.DASReader, stat // Exponentially growing Explore Exploit Strategy type simpleExploreExploitStrategy struct { - iterations uint32 + iterations atomic.Uint32 exploreIterations uint32 exploitIterations uint32 @@ -49,7 +49,7 @@ type simpleExploreExploitStrategy struct { } func (s *simpleExploreExploitStrategy) newInstance() aggregatorStrategyInstance { - iterations := atomic.AddUint32(&s.iterations, 1) + iterations := s.iterations.Add(1) readerSets := make([][]daprovider.DASReader, 0) s.RLock() diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 2bace9b677..bc93309c66 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -321,7 +321,7 @@ type Sequencer struct { onForwarderSet chan struct{} L1BlockAndTimeMutex sync.Mutex - l1BlockNumber uint64 + l1BlockNumber atomic.Uint64 l1Timestamp uint64 // activeMutex manages pauseChan (pauses execution) and forwarder @@ -356,7 +356,6 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead config: configFetcher, senderWhitelist: senderWhitelist, nonceCache: newNonceCache(config.NonceCacheSize), - l1BlockNumber: 0, l1Timestamp: 0, pauseChan: nil, onForwarderSet: make(chan struct{}, 1), @@ -900,7 +899,7 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) { timestamp := time.Now().Unix() s.L1BlockAndTimeMutex.Lock() - l1Block := s.l1BlockNumber + l1Block := s.l1BlockNumber.Load() l1Timestamp := s.l1Timestamp s.L1BlockAndTimeMutex.Unlock() @@ -1014,9 +1013,9 @@ func (s *Sequencer) updateLatestParentChainBlock(header *types.Header) { defer s.L1BlockAndTimeMutex.Unlock() l1BlockNumber := arbutil.ParentHeaderToL1BlockNumber(header) - if header.Time > s.l1Timestamp || (header.Time == s.l1Timestamp && l1BlockNumber > s.l1BlockNumber) { + if header.Time > s.l1Timestamp || (header.Time == s.l1Timestamp && l1BlockNumber > s.l1BlockNumber.Load()) { s.l1Timestamp = header.Time - s.l1BlockNumber = l1BlockNumber + s.l1BlockNumber.Store(l1BlockNumber) } } @@ -1082,7 +1081,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error { } if s.l1Reader != nil { - initialBlockNr := atomic.LoadUint64(&s.l1BlockNumber) + initialBlockNr := s.l1BlockNumber.Load() if initialBlockNr == 0 { return errors.New("sequencer not initialized") } diff --git a/go-ethereum b/go-ethereum index e35bf9cdd3..c186780e82 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit e35bf9cdd3d02034ac1be34a479d101f12012ba6 +Subproject commit c186780e82856be27d1576db4d1fd79984562165 diff --git a/scripts/validate-wasm-module-root.sh b/scripts/validate-wasm-module-root.sh new file mode 100755 index 0000000000..f62a46a8d7 --- /dev/null +++ b/scripts/validate-wasm-module-root.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -e + +MACHINES_DIR=$1 +PROVER=$2 + +for machine in "$MACHINES_DIR"/*/ ; do + if [ -d "$machine" ]; then + expectedWasmModuleRoot=$(cat "$machine/module-root.txt") + actualWasmModuleRoot=$(cd "$machine" && "$PROVER" machine.wavm.br --print-wasmmoduleroot) + if [ "$expectedWasmModuleRoot" != "$actualWasmModuleRoot" ]; then + echo "Error: Expected module root $expectedWasmModuleRoot but found $actualWasmModuleRoot in $machine" + exit 1 + fi + fi +done diff --git a/staker/block_validator.go b/staker/block_validator.go index 94ee907da5..d7b5f4f6a2 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -64,9 +64,9 @@ type BlockValidator struct { // can be read (atomic.Load) by anyone holding reorg-read // written (atomic.Set) by appropriate thread or (any way) holding reorg-write - createdA uint64 - recordSentA uint64 - validatedA uint64 + createdA atomic.Uint64 + recordSentA atomic.Uint64 + validatedA atomic.Uint64 validations containers.SyncMap[arbutil.MessageIndex, *validationStatus] config BlockValidatorConfigFetcher @@ -208,19 +208,19 @@ const ( ) type validationStatus struct { - Status uint32 // atomic: value is one of validationStatus* + Status atomic.Uint32 // atomic: value is one of validationStatus* Cancel func() // non-atomic: only read/written to with reorg mutex Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared Runs []validator.ValidationRun // if status >= ValidationSent } func (s *validationStatus) getStatus() valStatusField { - uintStat := atomic.LoadUint32(&s.Status) + uintStat := s.Status.Load() return valStatusField(uintStat) } func (s *validationStatus) replaceStatus(old, new valStatusField) bool { - return atomic.CompareAndSwapUint32(&s.Status, uint32(old), uint32(new)) + return s.Status.CompareAndSwap(uint32(old), uint32(new)) } func NewBlockValidator( @@ -283,12 +283,12 @@ func NewBlockValidator( return ret, nil } -func atomicStorePos(addr *uint64, val arbutil.MessageIndex) { - atomic.StoreUint64(addr, uint64(val)) +func atomicStorePos(addr *atomic.Uint64, val arbutil.MessageIndex) { + addr.Store(uint64(val)) } -func atomicLoadPos(addr *uint64) arbutil.MessageIndex { - return arbutil.MessageIndex(atomic.LoadUint64(addr)) +func atomicLoadPos(addr *atomic.Uint64) arbutil.MessageIndex { + return arbutil.MessageIndex(addr.Load()) } func (v *BlockValidator) created() arbutil.MessageIndex { @@ -582,9 +582,9 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e return false, err } status := &validationStatus{ - Status: uint32(Created), - Entry: entry, + Entry: entry, } + status.Status.Store(uint32(Created)) v.validations.Store(pos, status) v.nextCreateStartGS = endGS v.nextCreatePrevDelayed = msg.DelayedMessagesRead @@ -962,13 +962,13 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt v.nextCreateStartGS = globalState v.nextCreatePrevDelayed = msg.DelayedMessagesRead v.nextCreateBatchReread = true - v.createdA = countUint64 + v.createdA.Store(countUint64) } // under the reorg mutex we don't need atomic access - if v.recordSentA < countUint64 { - v.recordSentA = countUint64 + if v.recordSentA.Load() < countUint64 { + v.recordSentA.Store(countUint64) } - v.validatedA = countUint64 + v.validatedA.Store(countUint64) v.valLoopPos = count validatorMsgCountValidatedGauge.Update(int64(countUint64)) err = v.writeLastValidated(globalState, nil) // we don't know which wasm roots were validated @@ -1027,13 +1027,13 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex) v.nextCreatePrevDelayed = msg.DelayedMessagesRead v.nextCreateBatchReread = true countUint64 := uint64(count) - v.createdA = countUint64 + v.createdA.Store(countUint64) // under the reorg mutex we don't need atomic access - if v.recordSentA > countUint64 { - v.recordSentA = countUint64 + if v.recordSentA.Load() > countUint64 { + v.recordSentA.Store(countUint64) } - if v.validatedA > countUint64 { - v.validatedA = countUint64 + if v.validatedA.Load() > countUint64 { + v.validatedA.Store(countUint64) validatorMsgCountValidatedGauge.Update(int64(countUint64)) err := v.writeLastValidated(v.nextCreateStartGS, nil) // we don't know which wasm roots were validated if err != nil { diff --git a/staker/l1_validator.go b/staker/l1_validator.go index d68365ede0..dd9673ee0b 100644 --- a/staker/l1_validator.go +++ b/staker/l1_validator.go @@ -381,7 +381,7 @@ func (v *L1Validator) generateNodeAction( return nil, false, nil } - successorNodes, err := v.rollup.LookupNodeChildren(ctx, stakerInfo.LatestStakedNode, stakerInfo.LatestStakedNodeHash) + successorNodes, err := v.rollup.LookupNodeChildren(ctx, stakerInfo.LatestStakedNode, stakerConfig.LogQueryBatchSize, stakerInfo.LatestStakedNodeHash) if err != nil { return nil, false, fmt.Errorf("error looking up node %v (hash %v) children: %w", stakerInfo.LatestStakedNode, stakerInfo.LatestStakedNodeHash, err) } diff --git a/staker/rollup_watcher.go b/staker/rollup_watcher.go index 118ce15b44..b35bebd1c6 100644 --- a/staker/rollup_watcher.go +++ b/staker/rollup_watcher.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core/types" ) var rollupInitializedID common.Hash @@ -165,7 +166,7 @@ func (r *RollupWatcher) LookupNode(ctx context.Context, number uint64) (*NodeInf }, nil } -func (r *RollupWatcher) LookupNodeChildren(ctx context.Context, nodeNum uint64, nodeHash common.Hash) ([]*NodeInfo, error) { +func (r *RollupWatcher) LookupNodeChildren(ctx context.Context, nodeNum uint64, logQueryRangeSize uint64, nodeHash common.Hash) ([]*NodeInfo, error) { node, err := r.RollupUserLogic.GetNode(r.getCallOpts(ctx), nodeNum) if err != nil { return nil, err @@ -180,17 +181,32 @@ func (r *RollupWatcher) LookupNodeChildren(ctx context.Context, nodeNum uint64, Addresses: []common.Address{r.address}, Topics: [][]common.Hash{{nodeCreatedID}, nil, {nodeHash}}, } - query.FromBlock, err = r.getNodeCreationBlock(ctx, nodeNum) + fromBlock, err := r.getNodeCreationBlock(ctx, nodeNum) if err != nil { return nil, err } - query.ToBlock, err = r.getNodeCreationBlock(ctx, node.LatestChildNumber) + toBlock, err := r.getNodeCreationBlock(ctx, node.LatestChildNumber) if err != nil { return nil, err } - logs, err := r.client.FilterLogs(ctx, query) - if err != nil { - return nil, err + var logs []types.Log + // break down the query to avoid eth_getLogs query limit + for toBlock.Cmp(fromBlock) > 0 { + query.FromBlock = fromBlock + if logQueryRangeSize == 0 { + query.ToBlock = toBlock + } else { + query.ToBlock = new(big.Int).Add(fromBlock, big.NewInt(int64(logQueryRangeSize))) + } + if query.ToBlock.Cmp(toBlock) > 0 { + query.ToBlock = toBlock + } + segment, err := r.client.FilterLogs(ctx, query) + if err != nil { + return nil, err + } + logs = append(logs, segment...) + fromBlock = new(big.Int).Add(query.ToBlock, big.NewInt(1)) } infos := make([]*NodeInfo, 0, len(logs)) lastHash := nodeHash diff --git a/staker/staker.go b/staker/staker.go index da6413e122..24f5dc61e3 100644 --- a/staker/staker.go +++ b/staker/staker.go @@ -90,6 +90,7 @@ type L1ValidatorConfig struct { ExtraGas uint64 `koanf:"extra-gas" reload:"hot"` Dangerous DangerousConfig `koanf:"dangerous"` ParentChainWallet genericconf.WalletConfig `koanf:"parent-chain-wallet"` + LogQueryBatchSize uint64 `koanf:"log-query-batch-size" reload:"hot"` strategy StakerStrategy gasRefunder common.Address @@ -156,6 +157,7 @@ var DefaultL1ValidatorConfig = L1ValidatorConfig{ ExtraGas: 50000, Dangerous: DefaultDangerousConfig, ParentChainWallet: DefaultValidatorL1WalletConfig, + LogQueryBatchSize: 0, } var TestL1ValidatorConfig = L1ValidatorConfig{ @@ -176,6 +178,7 @@ var TestL1ValidatorConfig = L1ValidatorConfig{ ExtraGas: 50000, Dangerous: DefaultDangerousConfig, ParentChainWallet: DefaultValidatorL1WalletConfig, + LogQueryBatchSize: 0, } var DefaultValidatorL1WalletConfig = genericconf.WalletConfig{ @@ -201,6 +204,7 @@ func L1ValidatorConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".gas-refunder-address", DefaultL1ValidatorConfig.GasRefunderAddress, "The gas refunder contract address (optional)") f.String(prefix+".redis-url", DefaultL1ValidatorConfig.RedisUrl, "redis url for L1 validator") f.Uint64(prefix+".extra-gas", DefaultL1ValidatorConfig.ExtraGas, "use this much more gas than estimation says is necessary to post transactions") + f.Uint64(prefix+".log-query-batch-size", DefaultL1ValidatorConfig.LogQueryBatchSize, "range ro query from eth_getLogs") dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfigForValidator) DangerousConfigAddOptions(prefix+".dangerous", f) genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultL1ValidatorConfig.ParentChainWallet.Pathname) diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index 1cf3d7a4c3..ec235c4bf5 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "net/url" "testing" "github.com/offchainlabs/nitro/arbstate/daprovider" @@ -428,8 +429,13 @@ func (v *StatelessBlockValidator) Start(ctx_in context.Context) error { return fmt.Errorf("starting execution spawner: %w", err) } } - for _, spawner := range v.execSpawners { + for i, spawner := range v.execSpawners { if err := spawner.Start(ctx_in); err != nil { + if u, parseErr := url.Parse(v.config.ValidationServerConfigs[i].URL); parseErr == nil { + if u.Scheme != "ws" && u.Scheme != "wss" { + return fmt.Errorf("validation server's url scheme is unsupported, it should either be ws or wss, url:%s err: %w", v.config.ValidationServerConfigs[i].URL, err) + } + } return err } } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 16d6b2f131..984b12b6e1 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -327,7 +327,6 @@ func BridgeBalance( l2info.SetFullAccountInfo(account, &AccountInfo{ Address: l1acct.Address, PrivateKey: l1acct.PrivateKey, - Nonce: 0, }) } else { l2acct := l2info.GetInfoWithPrivKey(account) diff --git a/system_tests/conditionaltx_test.go b/system_tests/conditionaltx_test.go index 4f800d9769..286060e666 100644 --- a/system_tests/conditionaltx_test.go +++ b/system_tests/conditionaltx_test.go @@ -58,7 +58,7 @@ func testConditionalTxThatShouldSucceed(t *testing.T, ctx context.Context, idx i func testConditionalTxThatShouldFail(t *testing.T, ctx context.Context, idx int, l2info info, rpcClient *rpc.Client, options *arbitrum_types.ConditionalOptions, expectedErrorCode int) { t.Helper() accountInfo := l2info.GetInfoWithPrivKey("Owner") - nonce := accountInfo.Nonce + nonce := accountInfo.Nonce.Load() tx := l2info.PrepareTx("Owner", "User2", l2info.TransferGas, big.NewInt(1e12), nil) err := arbitrum.SendConditionalTransactionRPC(ctx, rpcClient, tx, options) if err == nil { @@ -77,7 +77,7 @@ func testConditionalTxThatShouldFail(t *testing.T, ctx context.Context, idx int, Fatal(t, "unexpected error type, err:", err) } } - accountInfo.Nonce = nonce // revert nonce as the tx failed + accountInfo.Nonce.Store(nonce) // revert nonce as the tx failed } func getEmptyOptions(address common.Address) []*arbitrum_types.ConditionalOptions { diff --git a/system_tests/seq_nonce_test.go b/system_tests/seq_nonce_test.go index f0e3dcffd7..72629e1978 100644 --- a/system_tests/seq_nonce_test.go +++ b/system_tests/seq_nonce_test.go @@ -67,7 +67,7 @@ func TestSequencerNonceTooHigh(t *testing.T) { cleanup := builder.Build(t) defer cleanup() - builder.L2Info.GetInfoWithPrivKey("Owner").Nonce++ + builder.L2Info.GetInfoWithPrivKey("Owner").Nonce.Add(1) before := time.Now() tx := builder.L2Info.PrepareTx("Owner", "Owner", builder.L2Info.TransferGas, common.Big0, nil) @@ -97,21 +97,21 @@ func TestSequencerNonceTooHighQueueFull(t *testing.T) { defer cleanup() count := 15 - var completed uint64 + var completed atomic.Uint64 for i := 0; i < count; i++ { - builder.L2Info.GetInfoWithPrivKey("Owner").Nonce++ + builder.L2Info.GetInfoWithPrivKey("Owner").Nonce.Add(1) tx := builder.L2Info.PrepareTx("Owner", "Owner", builder.L2Info.TransferGas, common.Big0, nil) go func() { err := builder.L2.Client.SendTransaction(ctx, tx) if err == nil { Fatal(t, "No error when nonce was too high") } - atomic.AddUint64(&completed, 1) + completed.Add(1) }() } for wait := 9; wait >= 0; wait-- { - got := int(atomic.LoadUint64(&completed)) + got := int(completed.Load()) expected := count - builder.execConfig.Sequencer.NonceFailureCacheSize if got == expected { break diff --git a/system_tests/seq_reject_test.go b/system_tests/seq_reject_test.go index 76bdfc2612..2dbdba6804 100644 --- a/system_tests/seq_reject_test.go +++ b/system_tests/seq_reject_test.go @@ -54,7 +54,7 @@ func TestSequencerRejection(t *testing.T) { } wg := sync.WaitGroup{} - var stopBackground int32 + var stopBackground atomic.Int32 for user := 0; user < 9; user++ { user := user name := fmt.Sprintf("User%v", user) @@ -78,12 +78,12 @@ func TestSequencerRejection(t *testing.T) { GasFeeCap: arbmath.BigMulByUint(builderSeq.L2Info.GasPrice, 100), Value: common.Big0, } - for atomic.LoadInt32(&stopBackground) == 0 { - txData.Nonce = info.Nonce + for stopBackground.Load() == 0 { + txData.Nonce = info.Nonce.Load() var expectedErr string if user%3 == 0 { txData.Data = noopId - info.Nonce += 1 + info.Nonce.Add(1) } else if user%3 == 1 { txData.Data = revertId expectedErr = "execution reverted: SOLIDITY_REVERTING" @@ -116,7 +116,7 @@ func TestSequencerRejection(t *testing.T) { } } - atomic.StoreInt32(&stopBackground, 1) + stopBackground.Store(1) wg.Wait() header1, err := builderSeq.L2.Client.HeaderByNumber(ctx, nil) diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index ab30598b60..e3a98b4961 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -192,7 +192,7 @@ func testLyingSequencer(t *testing.T, dasModeStr string) { builder.L2Info.GenerateAccount("RealUser") fraudTx := builder.L2Info.PrepareTx("Owner", "FraudUser", builder.L2Info.TransferGas, big.NewInt(1e12), nil) - builder.L2Info.GetInfoWithPrivKey("Owner").Nonce -= 1 // Use same l2info object for different l2s + builder.L2Info.GetInfoWithPrivKey("Owner").Nonce.Add(^uint64(0)) // Use same l2info object for different l2s realTx := builder.L2Info.PrepareTx("Owner", "RealUser", builder.L2Info.TransferGas, big.NewInt(1e12), nil) for i := 0; i < 10; i++ { diff --git a/system_tests/staker_test.go b/system_tests/staker_test.go index 4afe2e8ccd..52f16614f7 100644 --- a/system_tests/staker_test.go +++ b/system_tests/staker_test.go @@ -42,7 +42,7 @@ import ( func makeBackgroundTxs(ctx context.Context, builder *NodeBuilder) error { for i := uint64(0); ctx.Err() == nil; i++ { - builder.L2Info.Accounts["BackgroundUser"].Nonce = i + builder.L2Info.Accounts["BackgroundUser"].Nonce.Store(i) tx := builder.L2Info.PrepareTx("BackgroundUser", "BackgroundUser", builder.L2Info.TransferGas, common.Big0, nil) err := builder.L2.Client.SendTransaction(ctx, tx) if err != nil { diff --git a/system_tests/test_info.go b/system_tests/test_info.go index 764a8ae396..a4fbb44c44 100644 --- a/system_tests/test_info.go +++ b/system_tests/test_info.go @@ -29,7 +29,7 @@ var simulatedChainID = big.NewInt(1337) type AccountInfo struct { Address common.Address PrivateKey *ecdsa.PrivateKey - Nonce uint64 + Nonce atomic.Uint64 } type BlockchainTestInfo struct { @@ -93,7 +93,6 @@ func (b *BlockchainTestInfo) GenerateAccount(name string) { b.Accounts[name] = &AccountInfo{ PrivateKey: privateKey, Address: crypto.PubkeyToAddress(privateKey.PublicKey), - Nonce: 0, } log.Info("New Key ", "name", name, "Address", b.Accounts[name].Address) } @@ -139,8 +138,11 @@ func (b *BlockchainTestInfo) SetContract(name string, address common.Address) { } func (b *BlockchainTestInfo) SetFullAccountInfo(name string, info *AccountInfo) { - infoCopy := *info - b.Accounts[name] = &infoCopy + b.Accounts[name] = &AccountInfo{ + Address: info.Address, + PrivateKey: info.PrivateKey, + } + b.Accounts[name].Nonce.Store(info.Nonce.Load()) } func (b *BlockchainTestInfo) GetAddress(name string) common.Address { @@ -177,7 +179,7 @@ func (b *BlockchainTestInfo) GetDefaultTransactOpts(name string, ctx context.Con if err != nil { return nil, err } - atomic.AddUint64(&info.Nonce, 1) // we don't set Nonce, but try to keep track.. + info.Nonce.Add(1) // we don't set Nonce, but try to keep track.. return tx.WithSignature(b.Signer, signature) }, GasMargin: 2000, // adjust by 20% @@ -215,7 +217,7 @@ func (b *BlockchainTestInfo) PrepareTxTo( ) *types.Transaction { b.T.Helper() info := b.GetInfoWithPrivKey(from) - txNonce := atomic.AddUint64(&info.Nonce, 1) - 1 + txNonce := info.Nonce.Add(1) - 1 if value == nil { value = common.Big0 } diff --git a/system_tests/twonodeslong_test.go b/system_tests/twonodeslong_test.go index ce3244462f..83cd975dd8 100644 --- a/system_tests/twonodeslong_test.go +++ b/system_tests/twonodeslong_test.go @@ -120,7 +120,7 @@ func testTwoNodesLong(t *testing.T, dasModeStr string) { } } // create bad tx on delayed inbox - builder.L2Info.GetInfoWithPrivKey("ErrorTxSender").Nonce = 10 + builder.L2Info.GetInfoWithPrivKey("ErrorTxSender").Nonce.Store(10) builder.L1.SendWaitTestTransactions(t, []*types.Transaction{ WrapL2ForDelayed(t, builder.L2Info.PrepareTx("ErrorTxSender", "DelayedReceiver", 30002, delayedFaucetNeeds, nil), builder.L1Info, "User", 100000), }) diff --git a/util/containers/promise.go b/util/containers/promise.go index a180c094eb..54f0df195d 100644 --- a/util/containers/promise.go +++ b/util/containers/promise.go @@ -20,7 +20,7 @@ type Promise[R any] struct { chanReady chan struct{} result R err error - produced uint32 + produced atomic.Bool cancel func() } @@ -67,7 +67,7 @@ func (p *Promise[R]) Cancel() { } func (p *Promise[R]) ProduceErrorSafe(err error) error { - if !atomic.CompareAndSwapUint32(&p.produced, 0, 1) { + if !p.produced.CompareAndSwap(false, true) { return errors.New("cannot produce two values") } p.err = err @@ -83,7 +83,7 @@ func (p *Promise[R]) ProduceError(err error) { } func (p *Promise[R]) ProduceSafe(value R) error { - if !atomic.CompareAndSwapUint32(&p.produced, 0, 1) { + if !p.produced.CompareAndSwap(false, true) { return errors.New("cannot produce two values") } p.result = value diff --git a/util/containers/promise_test.go b/util/containers/promise_test.go index 61ee4f2c3a..3e41a6465d 100644 --- a/util/containers/promise_test.go +++ b/util/containers/promise_test.go @@ -25,8 +25,8 @@ func TestPromise(t *testing.T) { t.Fatal("unexpected Promise.Current when ready") } - cancelCalled := int64(0) - cancelFunc := func() { atomic.AddInt64(&cancelCalled, 1) } + var cancelCalled atomic.Int64 + cancelFunc := func() { cancelCalled.Add(1) } tempPromise = NewPromise[int](cancelFunc) res, err = tempPromise.Current() @@ -68,12 +68,12 @@ func TestPromise(t *testing.T) { t.Fatal("unexpected Promise.Current 2nd time") } - if atomic.LoadInt64(&cancelCalled) != 0 { + if cancelCalled.Load() != 0 { t.Fatal("cancel called by await/current when it shouldn't be") } tempPromise.Cancel() - if atomic.LoadInt64(&cancelCalled) != 0 { + if cancelCalled.Load() != 0 { t.Fatal("cancel called after error produced") } @@ -84,11 +84,11 @@ func TestPromise(t *testing.T) { if res != 0 || !errors.Is(err, context.DeadlineExceeded) { t.Fatal("unexpected Promise.Await with timeout") } - if atomic.LoadInt64(&cancelCalled) != 1 { + if cancelCalled.Load() != 1 { t.Fatal("cancel not called by await on timeout") } tempPromise.Cancel() - if atomic.LoadInt64(&cancelCalled) != 2 { + if cancelCalled.Load() != 2 { t.Fatal("cancel not called by promise.Cancel") } } diff --git a/util/rpcclient/rpcclient.go b/util/rpcclient/rpcclient.go index 56aebef396..e171e6e99b 100644 --- a/util/rpcclient/rpcclient.go +++ b/util/rpcclient/rpcclient.go @@ -77,7 +77,7 @@ type RpcClient struct { config ClientConfigFetcher client *rpc.Client autoStack *node.Node - logId uint64 + logId atomic.Uint64 } func NewRpcClient(config ClientConfigFetcher, stack *node.Node) *RpcClient { @@ -154,7 +154,7 @@ func (c *RpcClient) CallContext(ctx_in context.Context, result interface{}, meth if c.client == nil { return errors.New("not connected") } - logId := atomic.AddUint64(&c.logId, 1) + logId := c.logId.Add(1) log.Trace("sending RPC request", "method", method, "logId", logId, "args", limitedArgumentsMarshal{int(c.config().ArgLogLimit), args}) var err error for i := 0; i < int(c.config().Retries)+1; i++ { diff --git a/util/rpcclient/rpcclient_test.go b/util/rpcclient/rpcclient_test.go index 8613671d37..ae7e4fc226 100644 --- a/util/rpcclient/rpcclient_test.go +++ b/util/rpcclient/rpcclient_test.go @@ -46,10 +46,13 @@ func createTestNode(t *testing.T, ctx context.Context, stuckOrFailed int64) *nod stack, err := node.New(&stackConf) Require(t, err) + service := &testAPI{} + service.stuckCalls.Store(stuckOrFailed) + service.failedCalls.Store(stuckOrFailed) testAPIs := []rpc.API{{ Namespace: "test", Version: "1.0", - Service: &testAPI{stuckOrFailed, stuckOrFailed}, + Service: service, Public: true, Authenticated: false, }} @@ -67,12 +70,12 @@ func createTestNode(t *testing.T, ctx context.Context, stuckOrFailed int64) *nod } type testAPI struct { - stuckCalls int64 - failedCalls int64 + stuckCalls atomic.Int64 + failedCalls atomic.Int64 } func (t *testAPI) StuckAtFirst(ctx context.Context) error { - stuckRemaining := atomic.AddInt64(&t.stuckCalls, -1) + 1 + stuckRemaining := t.stuckCalls.Add(-1) + 1 if stuckRemaining <= 0 { return nil } @@ -81,7 +84,7 @@ func (t *testAPI) StuckAtFirst(ctx context.Context) error { } func (t *testAPI) FailAtFirst(ctx context.Context) error { - failedRemaining := atomic.AddInt64(&t.failedCalls, -1) + 1 + failedRemaining := t.failedCalls.Add(-1) + 1 if failedRemaining <= 0 { return nil } diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index 4aa4031350..7594421f9a 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -58,7 +58,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { type ValidationClient struct { stopwaiter.StopWaiter name string - room int32 + room atomic.Int32 // producers stores moduleRoot to producer mapping. producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState] producerConfig pubsub.ProducerConfig @@ -75,14 +75,15 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) if err != nil { return nil, err } - return &ValidationClient{ + validationClient := &ValidationClient{ name: cfg.Name, - room: cfg.Room, producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]), producerConfig: cfg.ProducerConfig, redisClient: redisClient, createStreams: cfg.CreateStreams, - }, nil + } + validationClient.room.Store(cfg.Room) + return validationClient, nil } func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { @@ -114,8 +115,8 @@ func (c *ValidationClient) WasmModuleRoots() ([]common.Hash, error) { } func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { - atomic.AddInt32(&c.room, -1) - defer atomic.AddInt32(&c.room, 1) + c.room.Add(-1) + defer c.room.Add(1) producer, found := c.producers[moduleRoot] if !found { errPromise := containers.NewReadyPromise(validator.GoGlobalState{}, fmt.Errorf("no validation is configured for wasm root %v", moduleRoot)) @@ -152,5 +153,5 @@ func (c *ValidationClient) Name() string { } func (c *ValidationClient) Room() int { - return int(c.room) + return int(c.room.Load()) } diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index 949260002d..79ecc6bdf4 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -29,7 +29,7 @@ type ValidationClient struct { stopwaiter.StopWaiter client *rpcclient.RpcClient name string - room int32 + room atomic.Int32 wasmModuleRoots []common.Hash } @@ -40,12 +40,12 @@ func NewValidationClient(config rpcclient.ClientConfigFetcher, stack *node.Node) } func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { - atomic.AddInt32(&c.room, -1) + c.room.Add(-1) promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](c, func(ctx context.Context) (validator.GoGlobalState, error) { input := server_api.ValidationInputToJson(entry) var res validator.GoGlobalState err := c.client.CallContext(ctx, &res, server_api.Namespace+"_validate", input, moduleRoot) - atomic.AddInt32(&c.room, 1) + c.room.Add(1) return res, err }) return server_common.NewValRun(promise, moduleRoot) @@ -81,7 +81,7 @@ func (c *ValidationClient) Start(ctx_in context.Context) error { } else { log.Info("connected to validation server", "name", name, "room", room) } - atomic.StoreInt32(&c.room, int32(room)) + c.room.Store(int32(room)) c.wasmModuleRoots = moduleRoots c.name = name return nil @@ -106,7 +106,7 @@ func (c *ValidationClient) Name() string { } func (c *ValidationClient) Room() int { - room32 := atomic.LoadInt32(&c.room) + room32 := c.room.Load() if room32 < 0 { return 0 } diff --git a/validator/server_arb/machine.go b/validator/server_arb/machine.go index cffd3db0ee..adca9695e2 100644 --- a/validator/server_arb/machine.go +++ b/validator/server_arb/machine.go @@ -59,7 +59,7 @@ type ArbitratorMachine struct { var _ MachineInterface = (*ArbitratorMachine)(nil) var preimageResolvers containers.SyncMap[int64, GoPreimageResolver] -var lastPreimageResolverId int64 // atomic +var lastPreimageResolverId atomic.Int64 // atomic // Any future calls to this machine will result in a panic func (m *ArbitratorMachine) Destroy() { @@ -382,7 +382,7 @@ func (m *ArbitratorMachine) SetPreimageResolver(resolver GoPreimageResolver) err if m.frozen { return errors.New("machine frozen") } - id := atomic.AddInt64(&lastPreimageResolverId, 1) + id := lastPreimageResolverId.Add(1) preimageResolvers.Store(id, resolver) m.contextId = &id runtime.SetFinalizer(m.contextId, freeContextId) diff --git a/validator/server_arb/validator_spawner.go b/validator/server_arb/validator_spawner.go index dca15c369e..7b9293f7bd 100644 --- a/validator/server_arb/validator_spawner.go +++ b/validator/server_arb/validator_spawner.go @@ -59,7 +59,7 @@ func DefaultArbitratorSpawnerConfigFetcher() *ArbitratorSpawnerConfig { type ArbitratorSpawner struct { stopwaiter.StopWaiter - count int32 + count atomic.Int32 locator *server_common.MachineLocator machineLoader *ArbMachineLoader config ArbitratorSpawnerConfigFecher @@ -176,9 +176,9 @@ func (v *ArbitratorSpawner) execute( } func (v *ArbitratorSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { - atomic.AddInt32(&v.count, 1) + v.count.Add(1) promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](v, func(ctx context.Context) (validator.GoGlobalState, error) { - defer atomic.AddInt32(&v.count, -1) + defer v.count.Add(-1) return v.execute(ctx, entry, moduleRoot) }) return server_common.NewValRun(promise, moduleRoot) diff --git a/validator/server_jit/spawner.go b/validator/server_jit/spawner.go index 703e761af5..eda74b2911 100644 --- a/validator/server_jit/spawner.go +++ b/validator/server_jit/spawner.go @@ -39,7 +39,7 @@ func JitSpawnerConfigAddOptions(prefix string, f *flag.FlagSet) { type JitSpawner struct { stopwaiter.StopWaiter - count int32 + count atomic.Int32 locator *server_common.MachineLocator machineLoader *JitMachineLoader config JitSpawnerConfigFecher @@ -91,9 +91,9 @@ func (s *JitSpawner) Name() string { } func (v *JitSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { - atomic.AddInt32(&v.count, 1) + v.count.Add(1) promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](v, func(ctx context.Context) (validator.GoGlobalState, error) { - defer atomic.AddInt32(&v.count, -1) + defer v.count.Add(-1) return v.execute(ctx, entry, moduleRoot) }) return server_common.NewValRun(promise, moduleRoot) diff --git a/wsbroadcastserver/clientconnection.go b/wsbroadcastserver/clientconnection.go index ba70756c98..7fb88bf9eb 100644 --- a/wsbroadcastserver/clientconnection.go +++ b/wsbroadcastserver/clientconnection.go @@ -51,7 +51,7 @@ type ClientConnection struct { requestedSeqNum arbutil.MessageIndex LastSentSeqNum atomic.Uint64 - lastHeardUnix int64 + lastHeardUnix atomic.Int64 out chan message backlog backlog.Backlog registered chan bool @@ -74,7 +74,7 @@ func NewClientConnection( delay time.Duration, bklg backlog.Backlog, ) *ClientConnection { - return &ClientConnection{ + clientConnection := &ClientConnection{ conn: conn, clientIp: connectingIP, desc: desc, @@ -82,7 +82,6 @@ func NewClientConnection( Name: fmt.Sprintf("%s@%s-%d", connectingIP, conn.RemoteAddr(), rand.Intn(10)), clientAction: clientAction, requestedSeqNum: requestedSeqNum, - lastHeardUnix: time.Now().Unix(), out: make(chan message, maxSendQueue), compression: compression, flateReader: NewFlateReader(), @@ -91,6 +90,8 @@ func NewClientConnection( registered: make(chan bool, 1), backlogSent: false, } + clientConnection.lastHeardUnix.Store(time.Now().Unix()) + return clientConnection } func (cc *ClientConnection) Age() time.Duration { @@ -287,7 +288,7 @@ func (cc *ClientConnection) RequestedSeqNum() arbutil.MessageIndex { } func (cc *ClientConnection) GetLastHeard() time.Time { - return time.Unix(atomic.LoadInt64(&cc.lastHeardUnix), 0) + return time.Unix(cc.lastHeardUnix.Load(), 0) } // Receive reads next message from client's underlying connection. @@ -307,7 +308,7 @@ func (cc *ClientConnection) readRequest(ctx context.Context, timeout time.Durati cc.ioMutex.Lock() defer cc.ioMutex.Unlock() - atomic.StoreInt64(&cc.lastHeardUnix, time.Now().Unix()) + cc.lastHeardUnix.Store(time.Now().Unix()) var data []byte var opCode ws.OpCode diff --git a/wsbroadcastserver/clientmanager.go b/wsbroadcastserver/clientmanager.go index a88716756a..4b7b7a2bcb 100644 --- a/wsbroadcastserver/clientmanager.go +++ b/wsbroadcastserver/clientmanager.go @@ -44,7 +44,7 @@ type ClientManager struct { stopwaiter.StopWaiter clientPtrMap map[*ClientConnection]bool - clientCount int32 + clientCount atomic.Int32 pool *gopool.Pool poller netpoll.Poller broadcastChan chan *m.BroadcastMessage @@ -85,7 +85,7 @@ func (cm *ClientManager) registerClient(ctx context.Context, clientConnection *C clientsCurrentGauge.Inc(1) clientsConnectCount.Inc(1) - atomic.AddInt32(&cm.clientCount, 1) + cm.clientCount.Add(1) cm.clientPtrMap[clientConnection] = true clientsTotalSuccessCounter.Inc(1) @@ -120,7 +120,7 @@ func (cm *ClientManager) removeClientImpl(clientConnection *ClientConnection) { clientsDurationHistogram.Update(clientConnection.Age().Microseconds()) clientsCurrentGauge.Dec(1) clientsDisconnectCount.Inc(1) - atomic.AddInt32(&cm.clientCount, -1) + cm.clientCount.Add(-1) } func (cm *ClientManager) removeClient(clientConnection *ClientConnection) { @@ -137,7 +137,7 @@ func (cm *ClientManager) removeClient(clientConnection *ClientConnection) { } func (cm *ClientManager) ClientCount() int32 { - return atomic.LoadInt32(&cm.clientCount) + return cm.clientCount.Load() } // Broadcast sends batch item to all clients.