diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go
index 2617a9a629..ccf9b4aab7 100644
--- a/arbnode/batch_poster.go
+++ b/arbnode/batch_poster.go
@@ -112,7 +112,7 @@ type BatchPoster struct {
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
- backlog uint64
+ backlog atomic.Uint64
lastHitL1Bounds time.Time // The last time we wanted to post a message but hit the L1 bounds
batchReverted atomic.Bool // indicates whether data poster batch was reverted
@@ -1086,7 +1086,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if config.IgnoreBlobPrice {
use4844 = true
} else {
- backlog := atomic.LoadUint64(&b.backlog)
+ backlog := b.backlog.Load()
// Logic to prevent switching from non-4844 batches to 4844 batches too often,
// so that blocks can be filled efficiently. The geth txpool rejects txs for
// accounts that already have the other type of txs in the pool with
@@ -1437,7 +1437,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
// Setting the backlog to 0 here ensures that we don't lower compression as a result.
backlog = 0
}
- atomic.StoreUint64(&b.backlog, backlog)
+ b.backlog.Store(backlog)
b.building = nil
// If we aren't queueing up transactions, wait for the receipt before moving on to the next batch.
@@ -1453,7 +1453,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
func (b *BatchPoster) GetBacklogEstimate() uint64 {
- return atomic.LoadUint64(&b.backlog)
+ return b.backlog.Load()
}
func (b *BatchPoster) Start(ctxIn context.Context) {
diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go
index 3ba9aa78f3..78b4db1929 100644
--- a/arbnode/inbox_reader.go
+++ b/arbnode/inbox_reader.go
@@ -97,8 +97,8 @@ type InboxReader struct {
l1Reader *headerreader.HeaderReader
// Atomic
- lastSeenBatchCount uint64
- lastReadBatchCount uint64
+ lastSeenBatchCount atomic.Uint64
+ lastReadBatchCount atomic.Uint64
}
func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) {
@@ -240,7 +240,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
seenBatchCountStored := uint64(math.MaxUint64)
storeSeenBatchCount := func() {
if seenBatchCountStored != seenBatchCount {
- atomic.StoreUint64(&r.lastSeenBatchCount, seenBatchCount)
+ r.lastSeenBatchCount.Store(seenBatchCount)
seenBatchCountStored = seenBatchCount
}
}
@@ -394,7 +394,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
// There's nothing to do
from = arbmath.BigAddByUint(currentHeight, 1)
blocksToFetch = config.DefaultBlocksToRead
- atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
+ r.lastReadBatchCount.Store(checkingBatchCount)
storeSeenBatchCount()
if !r.caughtUp && readMode == "latest" {
r.caughtUp = true
@@ -526,7 +526,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}
if len(sequencerBatches) > 0 {
readAnyBatches = true
- atomic.StoreUint64(&r.lastReadBatchCount, sequencerBatches[len(sequencerBatches)-1].SequenceNumber+1)
+ r.lastReadBatchCount.Store(sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1)
storeSeenBatchCount()
}
}
@@ -553,7 +553,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}
if !readAnyBatches {
- atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
+ r.lastReadBatchCount.Store(checkingBatchCount)
storeSeenBatchCount()
}
}
@@ -625,7 +625,7 @@ func (r *InboxReader) GetSequencerMessageBytes(ctx context.Context, seqNum uint6
}
func (r *InboxReader) GetLastReadBatchCount() uint64 {
- return atomic.LoadUint64(&r.lastReadBatchCount)
+ return r.lastReadBatchCount.Load()
}
// GetLastSeenBatchCount returns how many sequencer batches the inbox reader has read in from L1.
@@ -633,7 +633,7 @@ func (r *InboxReader) GetLastReadBatchCount() uint64 {
// >0 - last batchcount seen in run() - only written after lastReadBatchCount updated
// 0 - no batch seen, error
func (r *InboxReader) GetLastSeenBatchCount() uint64 {
- return atomic.LoadUint64(&r.lastSeenBatchCount)
+ return r.lastSeenBatchCount.Load()
}
func (r *InboxReader) GetDelayBlocks() uint64 {
diff --git a/arbnode/redislock/redis.go b/arbnode/redislock/redis.go
index 09296e3c79..7e26010cae 100644
--- a/arbnode/redislock/redis.go
+++ b/arbnode/redislock/redis.go
@@ -21,7 +21,7 @@ type Simple struct {
stopwaiter.StopWaiter
client redis.UniversalClient
config SimpleCfgFetcher
- lockedUntil int64
+ lockedUntil atomic.Int64
mutex sync.Mutex
stopping bool
readyToLock func() bool
@@ -239,12 +239,12 @@ func execTestPipe(pipe redis.Pipeliner, ctx context.Context) error {
}
// notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter.
-func atomicTimeRead(addr *int64) time.Time {
- asint64 := atomic.LoadInt64(addr)
+func atomicTimeRead(addr *atomic.Int64) time.Time {
+ asint64 := addr.Load()
return time.UnixMilli(asint64)
}
-func atomicTimeWrite(addr *int64, t time.Time) {
+func atomicTimeWrite(addr *atomic.Int64, t time.Time) {
asint64 := t.UnixMilli()
- atomic.StoreInt64(addr, asint64)
+ addr.Store(asint64)
}
diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go
index cdf1011b11..98c19ce361 100644
--- a/arbnode/seq_coordinator.go
+++ b/arbnode/seq_coordinator.go
@@ -48,7 +48,7 @@ type SeqCoordinator struct {
prevChosenSequencer string
reportedWantsLockout bool
- lockoutUntil int64 // atomic
+ lockoutUntil atomic.Int64 // atomic
wantsLockoutMutex sync.Mutex // manages access to acquireLockoutAndWriteMessage and generally the wants lockout key
avoidLockout int // If > 0, prevents acquiring the lockout but not extending the lockout if no alternative sequencer wants the lockout. Protected by chosenUpdateMutex.
@@ -191,14 +191,14 @@ func StandaloneSeqCoordinatorInvalidateMsgIndex(ctx context.Context, redisClient
return nil
}
-func atomicTimeWrite(addr *int64, t time.Time) {
+func atomicTimeWrite(addr *atomic.Int64, t time.Time) {
asint64 := t.UnixMilli()
- atomic.StoreInt64(addr, asint64)
+ addr.Store(asint64)
}
// notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter.
-func atomicTimeRead(addr *int64) time.Time {
- asint64 := atomic.LoadInt64(addr)
+func atomicTimeRead(addr *atomic.Int64) time.Time {
+ asint64 := addr.Load()
return time.UnixMilli(asint64)
}
@@ -692,7 +692,7 @@ func (c *SeqCoordinator) DebugPrint() string {
return fmt.Sprint("Url:", c.config.Url(),
" prevChosenSequencer:", c.prevChosenSequencer,
" reportedWantsLockout:", c.reportedWantsLockout,
- " lockoutUntil:", c.lockoutUntil,
+ " lockoutUntil:", c.lockoutUntil.Load(),
" redisErrors:", c.redisErrors)
}
diff --git a/arbnode/seq_coordinator_atomic_test.go b/arbnode/seq_coordinator_atomic_test.go
index 61468a3adb..9b9d9dea81 100644
--- a/arbnode/seq_coordinator_atomic_test.go
+++ b/arbnode/seq_coordinator_atomic_test.go
@@ -21,21 +21,21 @@ import (
const messagesPerRound = 20
type CoordinatorTestData struct {
- messageCount uint64
+ messageCount atomic.Uint64
sequencer []string
err error
mutex sync.Mutex
waitForCoords sync.WaitGroup
- testStartRound int32
+ testStartRound atomic.Int32
}
func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *CoordinatorTestData) {
nextRound := int32(0)
for {
sequenced := make([]bool, messagesPerRound)
- for atomic.LoadInt32(&data.testStartRound) < nextRound {
+ for data.testStartRound.Load() < nextRound {
if ctx.Err() != nil {
return
}
@@ -44,7 +44,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo
nextRound++
var execError error
for {
- messageCount := atomic.LoadUint64(&data.messageCount)
+ messageCount := data.messageCount.Load()
if messageCount >= messagesPerRound {
break
}
@@ -53,7 +53,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo
err := coord.acquireLockoutAndWriteMessage(ctx, asIndex, asIndex+1, &arbostypes.EmptyTestMessageWithMetadata)
if err == nil {
sequenced[messageCount] = true
- atomic.StoreUint64(&data.messageCount, messageCount+1)
+ data.messageCount.Store(messageCount + 1)
randNr := rand.Intn(20)
if randNr > 15 {
execError = coord.chosenOneRelease(ctx)
@@ -105,9 +105,9 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true
coordConfig.Signer.Symmetric.SigningKey = ""
testData := CoordinatorTestData{
- testStartRound: -1,
- sequencer: make([]string, messagesPerRound),
+ sequencer: make([]string, messagesPerRound),
}
+ testData.testStartRound.Store(-1)
nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil)
Require(t, err)
@@ -134,12 +134,12 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
for round := int32(0); round < 10; round++ {
redisClient.Del(ctx, redisutil.CHOSENSEQ_KEY, redisutil.MSG_COUNT_KEY)
- testData.messageCount = 0
+ testData.messageCount.Store(0)
for i := 0; i < messagesPerRound; i++ {
testData.sequencer[i] = ""
}
testData.waitForCoords.Add(NumOfThreads)
- atomic.StoreInt32(&testData.testStartRound, round)
+ testData.testStartRound.Store(round)
testData.waitForCoords.Wait()
Require(t, testData.err)
seqList := ""
diff --git a/arbnode/simple_redis_lock_test.go b/arbnode/simple_redis_lock_test.go
index c9dd576749..72f3dfa837 100644
--- a/arbnode/simple_redis_lock_test.go
+++ b/arbnode/simple_redis_lock_test.go
@@ -21,11 +21,11 @@ const test_release_frac = 5
const test_delay = time.Millisecond
const test_redisKey_prefix = "__TEMP_SimpleRedisLockTest__"
-func attemptLock(ctx context.Context, s *redislock.Simple, flag *int32, wg *sync.WaitGroup) {
+func attemptLock(ctx context.Context, s *redislock.Simple, flag *atomic.Int32, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < test_attempts; i++ {
if s.AttemptLock(ctx) {
- atomic.AddInt32(flag, 1)
+ flag.Add(1)
} else if rand.Intn(test_release_frac) == 0 {
s.Release(ctx)
}
@@ -76,17 +76,17 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo
<-time.After(time.Second)
}
wg := sync.WaitGroup{}
- counters := make([]int32, test_threads)
+ counters := make([]atomic.Int32, test_threads)
for i, lock := range locks {
wg.Add(1)
go attemptLock(ctx, lock, &counters[i], &wg)
}
wg.Wait()
successful := -1
- for i, counter := range counters {
- if counter != 0 {
- if counter != test_attempts {
- t.Fatalf("counter %d value %d", i, counter)
+ for i := range counters {
+ if counters[i].Load() != 0 {
+ if counters[i].Load() != test_attempts {
+ t.Fatalf("counter %d value %d", i, counters[i].Load())
}
if successful > 0 {
t.Fatalf("counter %d and %d both positive", i, successful)
diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go
index 5c02129ee6..22ecdd553c 100644
--- a/arbnode/transaction_streamer.go
+++ b/arbnode/transaction_streamer.go
@@ -62,7 +62,7 @@ type TransactionStreamer struct {
nextAllowedFeedReorgLog time.Time
broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockHash
- broadcasterQueuedMessagesPos uint64
+ broadcasterQueuedMessagesPos atomic.Uint64
broadcasterQueuedMessagesActiveReorg bool
coordinator *SeqCoordinator
@@ -491,14 +491,14 @@ func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreC
}
func (s *TransactionStreamer) FeedPendingMessageCount() arbutil.MessageIndex {
- pos := atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)
+ pos := s.broadcasterQueuedMessagesPos.Load()
if pos == 0 {
return 0
}
s.insertionMutex.Lock()
defer s.insertionMutex.Unlock()
- pos = atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)
+ pos = s.broadcasterQueuedMessagesPos.Load()
if pos == 0 {
return 0
}
@@ -552,14 +552,14 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
if len(s.broadcasterQueuedMessages) == 0 || (feedReorg && !s.broadcasterQueuedMessagesActiveReorg) {
// Empty cache or feed different from database, save current feed messages until confirmed L1 messages catch up.
s.broadcasterQueuedMessages = messages
- atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos))
+ s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos))
s.broadcasterQueuedMessagesActiveReorg = feedReorg
} else {
- broadcasterQueuedMessagesPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos))
+ broadcasterQueuedMessagesPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load())
if broadcasterQueuedMessagesPos >= broadcastStartPos {
// Feed messages older than cache
s.broadcasterQueuedMessages = messages
- atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos))
+ s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos))
s.broadcasterQueuedMessagesActiveReorg = feedReorg
} else if broadcasterQueuedMessagesPos+arbutil.MessageIndex(len(s.broadcasterQueuedMessages)) == broadcastStartPos {
// Feed messages can be added directly to end of cache
@@ -579,7 +579,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
)
}
s.broadcasterQueuedMessages = messages
- atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos))
+ s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos))
s.broadcasterQueuedMessagesActiveReorg = feedReorg
}
}
@@ -795,7 +795,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
var cacheClearLen int
messagesAfterPos := messageStartPos + arbutil.MessageIndex(len(messages))
- broadcastStartPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos))
+ broadcastStartPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load())
if messagesAreConfirmed {
var duplicates int
@@ -903,10 +903,10 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
// Check if new messages were added at the end of cache, if they were, then dont remove those particular messages
if len(s.broadcasterQueuedMessages) > cacheClearLen {
s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[cacheClearLen:]
- atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)+uint64(cacheClearLen))
+ s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos) + uint64(cacheClearLen))
} else {
s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[:0]
- atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, 0)
+ s.broadcasterQueuedMessagesPos.Store(0)
}
s.broadcasterQueuedMessagesActiveReorg = false
}
diff --git a/arbos/programs/native_api.go b/arbos/programs/native_api.go
index 136f74c964..6fbb630ef3 100644
--- a/arbos/programs/native_api.go
+++ b/arbos/programs/native_api.go
@@ -34,7 +34,7 @@ import (
)
var apiObjects sync.Map
-var apiIds uintptr // atomic and sequential
+var apiIds atomic.Uintptr // atomic and sequential
type NativeApi struct {
handler RequestHandler
@@ -49,7 +49,7 @@ func newApi(
memoryModel *MemoryModel,
) NativeApi {
handler := newApiClosures(interpreter, tracingInfo, scope, memoryModel)
- apiId := atomic.AddUintptr(&apiIds, 1)
+ apiId := apiIds.Add(1)
id := usize(apiId)
api := NativeApi{
handler: handler,
diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go
index 2225341560..7d27c57fe9 100644
--- a/broadcastclient/broadcastclient.go
+++ b/broadcastclient/broadcastclient.go
@@ -133,7 +133,7 @@ type BroadcastClient struct {
connMutex sync.Mutex
conn net.Conn
- retryCount int64
+ retryCount atomic.Int64
retrying bool
shuttingDown bool
@@ -435,7 +435,7 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) {
}
func (bc *BroadcastClient) GetRetryCount() int64 {
- return atomic.LoadInt64(&bc.retryCount)
+ return bc.retryCount.Load()
}
func (bc *BroadcastClient) isShuttingDown() bool {
@@ -458,7 +458,7 @@ func (bc *BroadcastClient) retryConnect(ctx context.Context) io.Reader {
case <-timer.C:
}
- atomic.AddInt64(&bc.retryCount, 1)
+ bc.retryCount.Add(1)
earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum)
if err == nil {
bc.retrying = false
diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go
index 23c4bd7738..8cd124bfe0 100644
--- a/broadcastclients/broadcastclients.go
+++ b/broadcastclients/broadcastclients.go
@@ -49,7 +49,7 @@ type BroadcastClients struct {
secondaryRouter *Router
// Use atomic access
- connected int32
+ connected atomic.Int32
}
func NewBroadcastClients(
@@ -113,7 +113,7 @@ func NewBroadcastClients(
}
func (bcs *BroadcastClients) adjustCount(delta int32) {
- connected := atomic.AddInt32(&bcs.connected, delta)
+ connected := bcs.connected.Add(delta)
if connected <= 0 {
log.Error("no connected feed")
}
diff --git a/das/reader_aggregator_strategies.go b/das/reader_aggregator_strategies.go
index d20760bd5b..8e10d52c16 100644
--- a/das/reader_aggregator_strategies.go
+++ b/das/reader_aggregator_strategies.go
@@ -41,7 +41,7 @@ func (s *abstractAggregatorStrategy) update(readers []daprovider.DASReader, stat
// Exponentially growing Explore Exploit Strategy
type simpleExploreExploitStrategy struct {
- iterations uint32
+ iterations atomic.Uint32
exploreIterations uint32
exploitIterations uint32
}
func (s *simpleExploreExploitStrategy) newInstance() aggregatorStrategyInstance {
- iterations := atomic.AddUint32(&s.iterations, 1)
+ iterations := s.iterations.Add(1)
readerSets := make([][]daprovider.DASReader, 0)
s.RLock()
diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go
index 2bace9b677..bc93309c66 100644
--- a/execution/gethexec/sequencer.go
+++ b/execution/gethexec/sequencer.go
@@ -321,7 +321,7 @@ type Sequencer struct {
onForwarderSet chan struct{}
L1BlockAndTimeMutex sync.Mutex
- l1BlockNumber uint64
+ l1BlockNumber atomic.Uint64
l1Timestamp uint64
// activeMutex manages pauseChan (pauses execution) and forwarder
@@ -356,7 +356,6 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
config: configFetcher,
senderWhitelist: senderWhitelist,
nonceCache: newNonceCache(config.NonceCacheSize),
- l1BlockNumber: 0,
l1Timestamp: 0,
pauseChan: nil,
onForwarderSet: make(chan struct{}, 1),
@@ -900,7 +899,7 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
timestamp := time.Now().Unix()
s.L1BlockAndTimeMutex.Lock()
- l1Block := s.l1BlockNumber
+ l1Block := s.l1BlockNumber.Load()
l1Timestamp := s.l1Timestamp
s.L1BlockAndTimeMutex.Unlock()
@@ -1014,9 +1013,9 @@ func (s *Sequencer) updateLatestParentChainBlock(header *types.Header) {
defer s.L1BlockAndTimeMutex.Unlock()
l1BlockNumber := arbutil.ParentHeaderToL1BlockNumber(header)
- if header.Time > s.l1Timestamp || (header.Time == s.l1Timestamp && l1BlockNumber > s.l1BlockNumber) {
+ if header.Time > s.l1Timestamp || (header.Time == s.l1Timestamp && l1BlockNumber > s.l1BlockNumber.Load()) {
s.l1Timestamp = header.Time
- s.l1BlockNumber = l1BlockNumber
+ s.l1BlockNumber.Store(l1BlockNumber)
}
}
@@ -1082,7 +1081,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error {
}
if s.l1Reader != nil {
- initialBlockNr := atomic.LoadUint64(&s.l1BlockNumber)
+ initialBlockNr := s.l1BlockNumber.Load()
if initialBlockNr == 0 {
return errors.New("sequencer not initialized")
}
diff --git a/staker/block_validator.go b/staker/block_validator.go
index 94ee907da5..d7b5f4f6a2 100644
--- a/staker/block_validator.go
+++ b/staker/block_validator.go
@@ -64,9 +64,9 @@ type BlockValidator struct {
// can be read (atomic.Load) by anyone holding reorg-read
// written (atomic.Set) by appropriate thread or (any way) holding reorg-write
- createdA uint64
- recordSentA uint64
- validatedA uint64
+ createdA atomic.Uint64
+ recordSentA atomic.Uint64
+ validatedA atomic.Uint64
validations containers.SyncMap[arbutil.MessageIndex, *validationStatus]
config BlockValidatorConfigFetcher
@@ -208,19 +208,19 @@ const (
)
type validationStatus struct {
- Status uint32 // atomic: value is one of validationStatus*
+ Status atomic.Uint32 // atomic: value is one of validationStatus*
Cancel func() // non-atomic: only read/written to with reorg mutex
Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared
Runs []validator.ValidationRun // if status >= ValidationSent
}
func (s *validationStatus) getStatus() valStatusField {
- uintStat := atomic.LoadUint32(&s.Status)
+ uintStat := s.Status.Load()
return valStatusField(uintStat)
}
func (s *validationStatus) replaceStatus(old, new valStatusField) bool {
- return atomic.CompareAndSwapUint32(&s.Status, uint32(old), uint32(new))
+ return s.Status.CompareAndSwap(uint32(old), uint32(new))
}
func NewBlockValidator(
@@ -283,12 +283,12 @@ func NewBlockValidator(
return ret, nil
}
-func atomicStorePos(addr *uint64, val arbutil.MessageIndex) {
- atomic.StoreUint64(addr, uint64(val))
+func atomicStorePos(addr *atomic.Uint64, val arbutil.MessageIndex) {
+ addr.Store(uint64(val))
}
-func atomicLoadPos(addr *uint64) arbutil.MessageIndex {
- return arbutil.MessageIndex(atomic.LoadUint64(addr))
+func atomicLoadPos(addr *atomic.Uint64) arbutil.MessageIndex {
+ return arbutil.MessageIndex(addr.Load())
}
func (v *BlockValidator) created() arbutil.MessageIndex {
@@ -582,9 +582,9 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
return false, err
}
status := &validationStatus{
- Status: uint32(Created),
- Entry: entry,
+ Entry: entry,
}
+ status.Status.Store(uint32(Created))
v.validations.Store(pos, status)
v.nextCreateStartGS = endGS
v.nextCreatePrevDelayed = msg.DelayedMessagesRead
@@ -962,13 +962,13 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt
v.nextCreateStartGS = globalState
v.nextCreatePrevDelayed = msg.DelayedMessagesRead
v.nextCreateBatchReread = true
- v.createdA = countUint64
+ v.createdA.Store(countUint64)
}
// under the reorg mutex we don't need atomic access
- if v.recordSentA < countUint64 {
- v.recordSentA = countUint64
+ if v.recordSentA.Load() < countUint64 {
+ v.recordSentA.Store(countUint64)
}
- v.validatedA = countUint64
+ v.validatedA.Store(countUint64)
v.valLoopPos = count
validatorMsgCountValidatedGauge.Update(int64(countUint64))
err = v.writeLastValidated(globalState, nil) // we don't know which wasm roots were validated
@@ -1027,13 +1027,13 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex)
v.nextCreatePrevDelayed = msg.DelayedMessagesRead
v.nextCreateBatchReread = true
countUint64 := uint64(count)
- v.createdA = countUint64
+ v.createdA.Store(countUint64)
// under the reorg mutex we don't need atomic access
- if v.recordSentA > countUint64 {
- v.recordSentA = countUint64
+ if v.recordSentA.Load() > countUint64 {
+ v.recordSentA.Store(countUint64)
}
- if v.validatedA > countUint64 {
- v.validatedA = countUint64
+ if v.validatedA.Load() > countUint64 {
+ v.validatedA.Store(countUint64)
validatorMsgCountValidatedGauge.Update(int64(countUint64))
err := v.writeLastValidated(v.nextCreateStartGS, nil) // we don't know which wasm roots were validated
if err != nil {
diff --git a/system_tests/common_test.go b/system_tests/common_test.go
index 16d6b2f131..984b12b6e1 100644
--- a/system_tests/common_test.go
+++ b/system_tests/common_test.go
@@ -327,7 +327,6 @@ func BridgeBalance(
l2info.SetFullAccountInfo(account, &AccountInfo{
Address: l1acct.Address,
PrivateKey: l1acct.PrivateKey,
- Nonce: 0,
})
} else {
l2acct := l2info.GetInfoWithPrivKey(account)
diff --git a/system_tests/conditionaltx_test.go b/system_tests/conditionaltx_test.go
index 4f800d9769..286060e666 100644
--- a/system_tests/conditionaltx_test.go
+++ b/system_tests/conditionaltx_test.go
@@ -58,7 +58,7 @@ func testConditionalTxThatShouldSucceed(t *testing.T, ctx context.Context, idx i
func testConditionalTxThatShouldFail(t *testing.T, ctx context.Context, idx int, l2info info, rpcClient *rpc.Client, options *arbitrum_types.ConditionalOptions, expectedErrorCode int) {
t.Helper()
accountInfo := l2info.GetInfoWithPrivKey("Owner")
- nonce := accountInfo.Nonce
+ nonce := accountInfo.Nonce.Load()
tx := l2info.PrepareTx("Owner", "User2", l2info.TransferGas, big.NewInt(1e12), nil)
err := arbitrum.SendConditionalTransactionRPC(ctx, rpcClient, tx, options)
if err == nil {
@@ -77,7 +77,7 @@ func testConditionalTxThatShouldFail(t *testing.T, ctx context.Context, idx int,
Fatal(t, "unexpected error type, err:", err)
}
}
- accountInfo.Nonce = nonce // revert nonce as the tx failed
+ accountInfo.Nonce.Store(nonce) // revert nonce as the tx failed
}
func getEmptyOptions(address common.Address) []*arbitrum_types.ConditionalOptions {
diff --git a/system_tests/seq_nonce_test.go b/system_tests/seq_nonce_test.go
index f0e3dcffd7..72629e1978 100644
--- a/system_tests/seq_nonce_test.go
+++ b/system_tests/seq_nonce_test.go
@@ -67,7 +67,7 @@ func TestSequencerNonceTooHigh(t *testing.T) {
cleanup := builder.Build(t)
defer cleanup()
- builder.L2Info.GetInfoWithPrivKey("Owner").Nonce++
+ builder.L2Info.GetInfoWithPrivKey("Owner").Nonce.Add(1)
before := time.Now()
tx := builder.L2Info.PrepareTx("Owner", "Owner", builder.L2Info.TransferGas, common.Big0, nil)
@@ -97,21 +97,21 @@ func TestSequencerNonceTooHighQueueFull(t *testing.T) {
defer cleanup()
count := 15
- var completed uint64
+ var completed atomic.Uint64
for i := 0; i < count; i++ {
- builder.L2Info.GetInfoWithPrivKey("Owner").Nonce++
+ builder.L2Info.GetInfoWithPrivKey("Owner").Nonce.Add(1)
tx := builder.L2Info.PrepareTx("Owner", "Owner", builder.L2Info.TransferGas, common.Big0, nil)
go func() {
err := builder.L2.Client.SendTransaction(ctx, tx)
if err == nil {
Fatal(t, "No error when nonce was too high")
}
- atomic.AddUint64(&completed, 1)
+ completed.Add(1)
}()
}
for wait := 9; wait >= 0; wait-- {
- got := int(atomic.LoadUint64(&completed))
+ got := int(completed.Load())
expected := count - builder.execConfig.Sequencer.NonceFailureCacheSize
if got == expected {
break
diff --git a/system_tests/seq_reject_test.go b/system_tests/seq_reject_test.go
index 76bdfc2612..2dbdba6804 100644
--- a/system_tests/seq_reject_test.go
+++ b/system_tests/seq_reject_test.go
@@ -54,7 +54,7 @@ func TestSequencerRejection(t *testing.T) {
}
wg := sync.WaitGroup{}
- var stopBackground int32
+ var stopBackground atomic.Int32
for user := 0; user < 9; user++ {
user := user
name := fmt.Sprintf("User%v", user)
@@ -78,12 +78,12 @@ func TestSequencerRejection(t *testing.T) {
GasFeeCap: arbmath.BigMulByUint(builderSeq.L2Info.GasPrice, 100),
Value: common.Big0,
}
- for atomic.LoadInt32(&stopBackground) == 0 {
- txData.Nonce = info.Nonce
+ for stopBackground.Load() == 0 {
+ txData.Nonce = info.Nonce.Load()
var expectedErr string
if user%3 == 0 {
txData.Data = noopId
- info.Nonce += 1
+ info.Nonce.Add(1)
} else if user%3 == 1 {
txData.Data = revertId
expectedErr = "execution reverted: SOLIDITY_REVERTING"
@@ -116,7 +116,7 @@ func TestSequencerRejection(t *testing.T) {
}
}
- atomic.StoreInt32(&stopBackground, 1)
+ stopBackground.Store(1)
wg.Wait()
header1, err := builderSeq.L2.Client.HeaderByNumber(ctx, nil)
diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go
index ab30598b60..e3a98b4961 100644
--- a/system_tests/seqfeed_test.go
+++ b/system_tests/seqfeed_test.go
@@ -192,7 +192,7 @@ func testLyingSequencer(t *testing.T, dasModeStr string) {
builder.L2Info.GenerateAccount("RealUser")
fraudTx := builder.L2Info.PrepareTx("Owner", "FraudUser", builder.L2Info.TransferGas, big.NewInt(1e12), nil)
- builder.L2Info.GetInfoWithPrivKey("Owner").Nonce -= 1 // Use same l2info object for different l2s
+ builder.L2Info.GetInfoWithPrivKey("Owner").Nonce.Add(^uint64(0)) // Use same l2info object for different l2s
realTx := builder.L2Info.PrepareTx("Owner", "RealUser", builder.L2Info.TransferGas, big.NewInt(1e12), nil)
for i := 0; i < 10; i++ {
diff --git a/system_tests/staker_test.go b/system_tests/staker_test.go
index 4afe2e8ccd..52f16614f7 100644
--- a/system_tests/staker_test.go
+++ b/system_tests/staker_test.go
@@ -42,7 +42,7 @@ import (
func makeBackgroundTxs(ctx context.Context, builder *NodeBuilder) error {
for i := uint64(0); ctx.Err() == nil; i++ {
- builder.L2Info.Accounts["BackgroundUser"].Nonce = i
+ builder.L2Info.Accounts["BackgroundUser"].Nonce.Store(i)
tx := builder.L2Info.PrepareTx("BackgroundUser", "BackgroundUser", builder.L2Info.TransferGas, common.Big0, nil)
err := builder.L2.Client.SendTransaction(ctx, tx)
if err != nil {
diff --git a/system_tests/test_info.go b/system_tests/test_info.go
index 764a8ae396..a4fbb44c44 100644
--- a/system_tests/test_info.go
+++ b/system_tests/test_info.go
@@ -29,7 +29,7 @@ var simulatedChainID = big.NewInt(1337)
type AccountInfo struct {
Address common.Address
PrivateKey *ecdsa.PrivateKey
- Nonce uint64
+ Nonce atomic.Uint64
}
type BlockchainTestInfo struct {
@@ -93,7 +93,6 @@ func (b *BlockchainTestInfo) GenerateAccount(name string) {
b.Accounts[name] = &AccountInfo{
PrivateKey: privateKey,
Address: crypto.PubkeyToAddress(privateKey.PublicKey),
- Nonce: 0,
}
log.Info("New Key ", "name", name, "Address", b.Accounts[name].Address)
}
@@ -139,8 +138,11 @@ func (b *BlockchainTestInfo) SetContract(name string, address common.Address) {
}
func (b *BlockchainTestInfo) SetFullAccountInfo(name string, info *AccountInfo) {
- infoCopy := *info
- b.Accounts[name] = &infoCopy
+ b.Accounts[name] = &AccountInfo{
+ Address: info.Address,
+ PrivateKey: info.PrivateKey,
+ }
+ b.Accounts[name].Nonce.Store(info.Nonce.Load())
}
func (b *BlockchainTestInfo) GetAddress(name string) common.Address {
@@ -177,7 +179,7 @@ func (b *BlockchainTestInfo) GetDefaultTransactOpts(name string, ctx context.Con
if err != nil {
return nil, err
}
- atomic.AddUint64(&info.Nonce, 1) // we don't set Nonce, but try to keep track..
+ info.Nonce.Add(1) // we don't set Nonce, but try to keep track..
return tx.WithSignature(b.Signer, signature)
},
GasMargin: 2000, // adjust by 20%
@@ -215,7 +217,7 @@ func (b *BlockchainTestInfo) PrepareTxTo(
) *types.Transaction {
b.T.Helper()
info := b.GetInfoWithPrivKey(from)
- txNonce := atomic.AddUint64(&info.Nonce, 1) - 1
+ txNonce := info.Nonce.Add(1) - 1
if value == nil {
value = common.Big0
}
diff --git a/system_tests/twonodeslong_test.go b/system_tests/twonodeslong_test.go
index ce3244462f..83cd975dd8 100644
--- a/system_tests/twonodeslong_test.go
+++ b/system_tests/twonodeslong_test.go
@@ -120,7 +120,7 @@ func testTwoNodesLong(t *testing.T, dasModeStr string) {
}
}
// create bad tx on delayed inbox
- builder.L2Info.GetInfoWithPrivKey("ErrorTxSender").Nonce = 10
+ builder.L2Info.GetInfoWithPrivKey("ErrorTxSender").Nonce.Store(10)
builder.L1.SendWaitTestTransactions(t, []*types.Transaction{
WrapL2ForDelayed(t, builder.L2Info.PrepareTx("ErrorTxSender", "DelayedReceiver", 30002, delayedFaucetNeeds, nil), builder.L1Info, "User", 100000),
})
diff --git a/util/containers/promise.go b/util/containers/promise.go
index a180c094eb..54f0df195d 100644
--- a/util/containers/promise.go
+++ b/util/containers/promise.go
@@ -20,7 +20,7 @@ type Promise[R any] struct {
chanReady chan struct{}
result R
err error
- produced uint32
+ produced atomic.Bool
cancel func()
}
@@ -67,7 +67,7 @@ func (p *Promise[R]) Cancel() {
}
func (p *Promise[R]) ProduceErrorSafe(err error) error {
- if !atomic.CompareAndSwapUint32(&p.produced, 0, 1) {
+ if !p.produced.CompareAndSwap(false, true) {
return errors.New("cannot produce two values")
}
p.err = err
@@ -83,7 +83,7 @@ func (p *Promise[R]) ProduceError(err error) {
}
func (p *Promise[R]) ProduceSafe(value R) error {
- if !atomic.CompareAndSwapUint32(&p.produced, 0, 1) {
+ if !p.produced.CompareAndSwap(false, true) {
return errors.New("cannot produce two values")
}
p.result = value
diff --git a/util/containers/promise_test.go b/util/containers/promise_test.go
index 61ee4f2c3a..3e41a6465d 100644
--- a/util/containers/promise_test.go
+++ b/util/containers/promise_test.go
@@ -25,8 +25,8 @@ func TestPromise(t *testing.T) {
t.Fatal("unexpected Promise.Current when ready")
}
- cancelCalled := int64(0)
- cancelFunc := func() { atomic.AddInt64(&cancelCalled, 1) }
+ var cancelCalled atomic.Int64
+ cancelFunc := func() { cancelCalled.Add(1) }
tempPromise = NewPromise[int](cancelFunc)
res, err = tempPromise.Current()
@@ -68,12 +68,12 @@ func TestPromise(t *testing.T) {
t.Fatal("unexpected Promise.Current 2nd time")
}
- if atomic.LoadInt64(&cancelCalled) != 0 {
+ if cancelCalled.Load() != 0 {
t.Fatal("cancel called by await/current when it shouldn't be")
}
tempPromise.Cancel()
- if atomic.LoadInt64(&cancelCalled) != 0 {
+ if cancelCalled.Load() != 0 {
t.Fatal("cancel called after error produced")
}
@@ -84,11 +84,11 @@ func TestPromise(t *testing.T) {
if res != 0 || !errors.Is(err, context.DeadlineExceeded) {
t.Fatal("unexpected Promise.Await with timeout")
}
- if atomic.LoadInt64(&cancelCalled) != 1 {
+ if cancelCalled.Load() != 1 {
t.Fatal("cancel not called by await on timeout")
}
tempPromise.Cancel()
- if atomic.LoadInt64(&cancelCalled) != 2 {
+ if cancelCalled.Load() != 2 {
t.Fatal("cancel not called by promise.Cancel")
}
}
diff --git a/util/rpcclient/rpcclient.go b/util/rpcclient/rpcclient.go
index 56aebef396..e171e6e99b 100644
--- a/util/rpcclient/rpcclient.go
+++ b/util/rpcclient/rpcclient.go
@@ -77,7 +77,7 @@ type RpcClient struct {
config ClientConfigFetcher
client *rpc.Client
autoStack *node.Node
- logId uint64
+ logId atomic.Uint64
}
func NewRpcClient(config ClientConfigFetcher, stack *node.Node) *RpcClient {
@@ -154,7 +154,7 @@ func (c *RpcClient) CallContext(ctx_in context.Context, result interface{}, meth
if c.client == nil {
return errors.New("not connected")
}
- logId := atomic.AddUint64(&c.logId, 1)
+ logId := c.logId.Add(1)
log.Trace("sending RPC request", "method", method, "logId", logId, "args", limitedArgumentsMarshal{int(c.config().ArgLogLimit), args})
var err error
for i := 0; i < int(c.config().Retries)+1; i++ {
diff --git a/util/rpcclient/rpcclient_test.go b/util/rpcclient/rpcclient_test.go
index 8613671d37..ae7e4fc226 100644
--- a/util/rpcclient/rpcclient_test.go
+++ b/util/rpcclient/rpcclient_test.go
@@ -46,10 +46,13 @@ func createTestNode(t *testing.T, ctx context.Context, stuckOrFailed int64) *nod
stack, err := node.New(&stackConf)
Require(t, err)
+ service := &testAPI{}
+ service.stuckCalls.Store(stuckOrFailed)
+ service.failedCalls.Store(stuckOrFailed)
testAPIs := []rpc.API{{
Namespace: "test",
Version: "1.0",
- Service: &testAPI{stuckOrFailed, stuckOrFailed},
+ Service: service,
Public: true,
Authenticated: false,
}}
@@ -67,12 +70,12 @@ func createTestNode(t *testing.T, ctx context.Context, stuckOrFailed int64) *nod
}
type testAPI struct {
- stuckCalls int64
- failedCalls int64
+ stuckCalls atomic.Int64
+ failedCalls atomic.Int64
}
func (t *testAPI) StuckAtFirst(ctx context.Context) error {
- stuckRemaining := atomic.AddInt64(&t.stuckCalls, -1) + 1
+ stuckRemaining := t.stuckCalls.Add(-1) + 1
if stuckRemaining <= 0 {
return nil
}
@@ -81,7 +84,7 @@ func (t *testAPI) StuckAtFirst(ctx context.Context) error {
}
func (t *testAPI) FailAtFirst(ctx context.Context) error {
- failedRemaining := atomic.AddInt64(&t.failedCalls, -1) + 1
+ failedRemaining := t.failedCalls.Add(-1) + 1
if failedRemaining <= 0 {
return nil
}
diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go
index 4aa4031350..7594421f9a 100644
--- a/validator/client/redis/producer.go
+++ b/validator/client/redis/producer.go
@@ -58,7 +58,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
type ValidationClient struct {
stopwaiter.StopWaiter
name string
- room int32
+ room atomic.Int32
// producers stores moduleRoot to producer mapping.
producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]
producerConfig pubsub.ProducerConfig
@@ -75,14 +75,15 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error)
if err != nil {
return nil, err
}
- return &ValidationClient{
+ validationClient := &ValidationClient{
name: cfg.Name,
- room: cfg.Room,
producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
producerConfig: cfg.ProducerConfig,
redisClient: redisClient,
createStreams: cfg.CreateStreams,
- }, nil
+ }
+ validationClient.room.Store(cfg.Room)
+ return validationClient, nil
}
func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
@@ -114,8 +115,8 @@ func (c *ValidationClient) WasmModuleRoots() ([]common.Hash, error) {
}
func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
- atomic.AddInt32(&c.room, -1)
- defer atomic.AddInt32(&c.room, 1)
+ c.room.Add(-1)
+ defer c.room.Add(1)
producer, found := c.producers[moduleRoot]
if !found {
errPromise := containers.NewReadyPromise(validator.GoGlobalState{}, fmt.Errorf("no validation is configured for wasm root %v", moduleRoot))
@@ -152,5 +153,5 @@ func (c *ValidationClient) Name() string {
}
func (c *ValidationClient) Room() int {
- return int(c.room)
+ return int(c.room.Load())
}
diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go
index 949260002d..79ecc6bdf4 100644
--- a/validator/client/validation_client.go
+++ b/validator/client/validation_client.go
@@ -29,7 +29,7 @@ type ValidationClient struct {
stopwaiter.StopWaiter
client *rpcclient.RpcClient
name string
- room int32
+ room atomic.Int32
wasmModuleRoots []common.Hash
}
@@ -40,12 +40,12 @@ func NewValidationClient(config rpcclient.ClientConfigFetcher, stack *node.Node)
}
func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
- atomic.AddInt32(&c.room, -1)
+ c.room.Add(-1)
promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](c, func(ctx context.Context) (validator.GoGlobalState, error) {
input := server_api.ValidationInputToJson(entry)
var res validator.GoGlobalState
err := c.client.CallContext(ctx, &res, server_api.Namespace+"_validate", input, moduleRoot)
- atomic.AddInt32(&c.room, 1)
+ c.room.Add(1)
return res, err
})
return server_common.NewValRun(promise, moduleRoot)
@@ -81,7 +81,7 @@ func (c *ValidationClient) Start(ctx_in context.Context) error {
} else {
log.Info("connected to validation server", "name", name, "room", room)
}
- atomic.StoreInt32(&c.room, int32(room))
+ c.room.Store(int32(room))
c.wasmModuleRoots = moduleRoots
c.name = name
return nil
@@ -106,7 +106,7 @@ func (c *ValidationClient) Name() string {
}
func (c *ValidationClient) Room() int {
- room32 := atomic.LoadInt32(&c.room)
+ room32 := c.room.Load()
if room32 < 0 {
return 0
}
diff --git a/validator/server_arb/machine.go b/validator/server_arb/machine.go
index cffd3db0ee..adca9695e2 100644
--- a/validator/server_arb/machine.go
+++ b/validator/server_arb/machine.go
@@ -59,7 +59,7 @@ type ArbitratorMachine struct {
var _ MachineInterface = (*ArbitratorMachine)(nil)
var preimageResolvers containers.SyncMap[int64, GoPreimageResolver]
-var lastPreimageResolverId int64 // atomic
+var lastPreimageResolverId atomic.Int64 // atomic
// Any future calls to this machine will result in a panic
func (m *ArbitratorMachine) Destroy() {
@@ -382,7 +382,7 @@ func (m *ArbitratorMachine) SetPreimageResolver(resolver GoPreimageResolver) err
if m.frozen {
return errors.New("machine frozen")
}
- id := atomic.AddInt64(&lastPreimageResolverId, 1)
+ id := lastPreimageResolverId.Add(1)
preimageResolvers.Store(id, resolver)
m.contextId = &id
runtime.SetFinalizer(m.contextId, freeContextId)
diff --git a/validator/server_arb/validator_spawner.go b/validator/server_arb/validator_spawner.go
index dca15c369e..7b9293f7bd 100644
--- a/validator/server_arb/validator_spawner.go
+++ b/validator/server_arb/validator_spawner.go
@@ -59,7 +59,7 @@ func DefaultArbitratorSpawnerConfigFetcher() *ArbitratorSpawnerConfig {
type ArbitratorSpawner struct {
stopwaiter.StopWaiter
- count int32
+ count atomic.Int32
locator *server_common.MachineLocator
machineLoader *ArbMachineLoader
config ArbitratorSpawnerConfigFecher
@@ -176,9 +176,9 @@ func (v *ArbitratorSpawner) execute(
}
func (v *ArbitratorSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
- atomic.AddInt32(&v.count, 1)
+ v.count.Add(1)
promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](v, func(ctx context.Context) (validator.GoGlobalState, error) {
- defer atomic.AddInt32(&v.count, -1)
+ defer v.count.Add(-1)
return v.execute(ctx, entry, moduleRoot)
})
return server_common.NewValRun(promise, moduleRoot)
diff --git a/validator/server_jit/spawner.go b/validator/server_jit/spawner.go
index 703e761af5..eda74b2911 100644
--- a/validator/server_jit/spawner.go
+++ b/validator/server_jit/spawner.go
@@ -39,7 +39,7 @@ func JitSpawnerConfigAddOptions(prefix string, f *flag.FlagSet) {
type JitSpawner struct {
stopwaiter.StopWaiter
- count int32
+ count atomic.Int32
locator *server_common.MachineLocator
machineLoader *JitMachineLoader
config JitSpawnerConfigFecher
@@ -91,9 +91,9 @@ func (s *JitSpawner) Name() string {
}
func (v *JitSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
- atomic.AddInt32(&v.count, 1)
+ v.count.Add(1)
promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](v, func(ctx context.Context) (validator.GoGlobalState, error) {
- defer atomic.AddInt32(&v.count, -1)
+ defer v.count.Add(-1)
return v.execute(ctx, entry, moduleRoot)
})
return server_common.NewValRun(promise, moduleRoot)
diff --git a/wsbroadcastserver/clientconnection.go b/wsbroadcastserver/clientconnection.go
index ba70756c98..7fb88bf9eb 100644
--- a/wsbroadcastserver/clientconnection.go
+++ b/wsbroadcastserver/clientconnection.go
@@ -51,7 +51,7 @@ type ClientConnection struct {
requestedSeqNum arbutil.MessageIndex
LastSentSeqNum atomic.Uint64
- lastHeardUnix int64
+ lastHeardUnix atomic.Int64
out chan message
backlog backlog.Backlog
registered chan bool
@@ -74,7 +74,7 @@ func NewClientConnection(
delay time.Duration,
bklg backlog.Backlog,
) *ClientConnection {
- return &ClientConnection{
+ clientConnection := &ClientConnection{
conn: conn,
clientIp: connectingIP,
desc: desc,
@@ -82,7 +82,6 @@ func NewClientConnection(
Name: fmt.Sprintf("%s@%s-%d", connectingIP, conn.RemoteAddr(), rand.Intn(10)),
clientAction: clientAction,
requestedSeqNum: requestedSeqNum,
- lastHeardUnix: time.Now().Unix(),
out: make(chan message, maxSendQueue),
compression: compression,
flateReader: NewFlateReader(),
@@ -91,6 +90,8 @@ func NewClientConnection(
registered: make(chan bool, 1),
backlogSent: false,
}
+ clientConnection.lastHeardUnix.Store(time.Now().Unix())
+ return clientConnection
}
func (cc *ClientConnection) Age() time.Duration {
@@ -287,7 +288,7 @@ func (cc *ClientConnection) RequestedSeqNum() arbutil.MessageIndex {
}
func (cc *ClientConnection) GetLastHeard() time.Time {
- return time.Unix(atomic.LoadInt64(&cc.lastHeardUnix), 0)
+ return time.Unix(cc.lastHeardUnix.Load(), 0)
}
// Receive reads next message from client's underlying connection.
@@ -307,7 +308,7 @@ func (cc *ClientConnection) readRequest(ctx context.Context, timeout time.Durati
cc.ioMutex.Lock()
defer cc.ioMutex.Unlock()
- atomic.StoreInt64(&cc.lastHeardUnix, time.Now().Unix())
+ cc.lastHeardUnix.Store(time.Now().Unix())
var data []byte
var opCode ws.OpCode
diff --git a/wsbroadcastserver/clientmanager.go b/wsbroadcastserver/clientmanager.go
index a88716756a..4b7b7a2bcb 100644
--- a/wsbroadcastserver/clientmanager.go
+++ b/wsbroadcastserver/clientmanager.go
@@ -44,7 +44,7 @@ type ClientManager struct {
stopwaiter.StopWaiter
clientPtrMap map[*ClientConnection]bool
- clientCount int32
+ clientCount atomic.Int32
pool *gopool.Pool
poller netpoll.Poller
broadcastChan chan *m.BroadcastMessage
@@ -85,7 +85,7 @@ func (cm *ClientManager) registerClient(ctx context.Context, clientConnection *C
clientsCurrentGauge.Inc(1)
clientsConnectCount.Inc(1)
- atomic.AddInt32(&cm.clientCount, 1)
+ cm.clientCount.Add(1)
cm.clientPtrMap[clientConnection] = true
clientsTotalSuccessCounter.Inc(1)
@@ -120,7 +120,7 @@ func (cm *ClientManager) removeClientImpl(clientConnection *ClientConnection) {
clientsDurationHistogram.Update(clientConnection.Age().Microseconds())
clientsCurrentGauge.Dec(1)
clientsDisconnectCount.Inc(1)
- atomic.AddInt32(&cm.clientCount, -1)
+ cm.clientCount.Add(-1)
}
func (cm *ClientManager) removeClient(clientConnection *ClientConnection) {
@@ -137,7 +137,7 @@ func (cm *ClientManager) removeClient(clientConnection *ClientConnection) {
}
func (cm *ClientManager) ClientCount() int32 {
- return atomic.LoadInt32(&cm.clientCount)
+ return cm.clientCount.Load()
}
// Broadcast sends batch item to all clients.
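
Note: every hunk above applies the same mechanical migration — a plain integer field that had to be accessed through the sync/atomic helper functions (atomic.LoadUint64, atomic.StoreInt64, atomic.AddInt32, ...) becomes one of the typed atomics introduced in Go 1.19 (atomic.Uint64, atomic.Int64, atomic.Int32, atomic.Bool, ...), and call sites switch to the Load/Store/Add/CompareAndSwap methods. A minimal before/after sketch of the pattern, using a hypothetical hitCounter type that is not part of the diff:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Before: a raw field where every reader and writer must remember to go
// through the package-level helpers, and non-atomic access compiles fine.
type hitCounterOld struct {
	hits uint64 // must only be accessed atomically
}

func (c *hitCounterOld) Hit() uint64 { return atomic.AddUint64(&c.hits, 1) }

// After: the typed atomic makes plain reads/writes impossible and removes
// the address-of plumbing at call sites.
type hitCounter struct {
	hits atomic.Uint64
}

func (c *hitCounter) Hit() uint64 { return c.hits.Add(1) }

func main() {
	var c hitCounter
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Hit()
		}()
	}
	wg.Wait()
	fmt.Println("hits:", c.hits.Load()) // prints 8
}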