Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2607] Change atomic.Store to atomic.Type #2449

Merged
merged 5 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type BatchPoster struct {
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
backlog uint64
backlog atomic.Uint64
lastHitL1Bounds time.Time // The last time we wanted to post a message but hit the L1 bounds

batchReverted atomic.Bool // indicates whether data poster batch was reverted
Expand Down Expand Up @@ -1086,7 +1086,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if config.IgnoreBlobPrice {
use4844 = true
} else {
backlog := atomic.LoadUint64(&b.backlog)
backlog := b.backlog.Load()
// Logic to prevent switching from non-4844 batches to 4844 batches too often,
// so that blocks can be filled efficiently. The geth txpool rejects txs for
// accounts that already have the other type of txs in the pool with
Expand Down Expand Up @@ -1437,7 +1437,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
// Setting the backlog to 0 here ensures that we don't lower compression as a result.
backlog = 0
}
atomic.StoreUint64(&b.backlog, backlog)
b.backlog.Store(backlog)
b.building = nil

// If we aren't queueing up transactions, wait for the receipt before moving on to the next batch.
Expand All @@ -1453,7 +1453,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}

func (b *BatchPoster) GetBacklogEstimate() uint64 {
return atomic.LoadUint64(&b.backlog)
return b.backlog.Load()
}

func (b *BatchPoster) Start(ctxIn context.Context) {
Expand Down
16 changes: 8 additions & 8 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type InboxReader struct {
l1Reader *headerreader.HeaderReader

// Atomic
lastSeenBatchCount uint64
lastReadBatchCount uint64
lastSeenBatchCount atomic.Uint64
lastReadBatchCount atomic.Uint64
}

func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) {
Expand Down Expand Up @@ -240,7 +240,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
seenBatchCountStored := uint64(math.MaxUint64)
storeSeenBatchCount := func() {
if seenBatchCountStored != seenBatchCount {
atomic.StoreUint64(&r.lastSeenBatchCount, seenBatchCount)
r.lastSeenBatchCount.Store(seenBatchCount)
seenBatchCountStored = seenBatchCount
}
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
// There's nothing to do
from = arbmath.BigAddByUint(currentHeight, 1)
blocksToFetch = config.DefaultBlocksToRead
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
r.lastReadBatchCount.Store(checkingBatchCount)
storeSeenBatchCount()
if !r.caughtUp && readMode == "latest" {
r.caughtUp = true
Expand Down Expand Up @@ -526,7 +526,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}
if len(sequencerBatches) > 0 {
readAnyBatches = true
atomic.StoreUint64(&r.lastReadBatchCount, sequencerBatches[len(sequencerBatches)-1].SequenceNumber+1)
r.lastReadBatchCount.Store(sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1)
storeSeenBatchCount()
}
}
Expand All @@ -553,7 +553,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}

if !readAnyBatches {
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
r.lastReadBatchCount.Store(checkingBatchCount)
storeSeenBatchCount()
}
}
Expand Down Expand Up @@ -625,15 +625,15 @@ func (r *InboxReader) GetSequencerMessageBytes(ctx context.Context, seqNum uint6
}

func (r *InboxReader) GetLastReadBatchCount() uint64 {
return atomic.LoadUint64(&r.lastReadBatchCount)
return r.lastReadBatchCount.Load()
}

// GetLastSeenBatchCount returns how many sequencer batches the inbox reader has read in from L1.
// Return values:
// >0 - last batchcount seen in run() - only written after lastReadBatchCount updated
// 0 - no batch seen, error
func (r *InboxReader) GetLastSeenBatchCount() uint64 {
return atomic.LoadUint64(&r.lastSeenBatchCount)
return r.lastSeenBatchCount.Load()
}

func (r *InboxReader) GetDelayBlocks() uint64 {
Expand Down
10 changes: 5 additions & 5 deletions arbnode/redislock/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Simple struct {
stopwaiter.StopWaiter
client redis.UniversalClient
config SimpleCfgFetcher
lockedUntil int64
lockedUntil atomic.Int64
mutex sync.Mutex
stopping bool
readyToLock func() bool
Expand Down Expand Up @@ -239,12 +239,12 @@ func execTestPipe(pipe redis.Pipeliner, ctx context.Context) error {
}

// notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter.
func atomicTimeRead(addr *int64) time.Time {
asint64 := atomic.LoadInt64(addr)
func atomicTimeRead(addr *atomic.Int64) time.Time {
asint64 := addr.Load()
return time.UnixMilli(asint64)
}

func atomicTimeWrite(addr *int64, t time.Time) {
func atomicTimeWrite(addr *atomic.Int64, t time.Time) {
asint64 := t.UnixMilli()
atomic.StoreInt64(addr, asint64)
addr.Store(asint64)
}
12 changes: 6 additions & 6 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type SeqCoordinator struct {
prevChosenSequencer string
reportedWantsLockout bool

lockoutUntil int64 // atomic
lockoutUntil atomic.Int64 // atomic

wantsLockoutMutex sync.Mutex // manages access to acquireLockoutAndWriteMessage and generally the wants lockout key
avoidLockout int // If > 0, prevents acquiring the lockout but not extending the lockout if no alternative sequencer wants the lockout. Protected by chosenUpdateMutex.
Expand Down Expand Up @@ -191,14 +191,14 @@ func StandaloneSeqCoordinatorInvalidateMsgIndex(ctx context.Context, redisClient
return nil
}

func atomicTimeWrite(addr *int64, t time.Time) {
func atomicTimeWrite(addr *atomic.Int64, t time.Time) {
asint64 := t.UnixMilli()
atomic.StoreInt64(addr, asint64)
addr.Store(asint64)
}

// notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter.
func atomicTimeRead(addr *int64) time.Time {
asint64 := atomic.LoadInt64(addr)
func atomicTimeRead(addr *atomic.Int64) time.Time {
asint64 := addr.Load()
return time.UnixMilli(asint64)
}

Expand Down Expand Up @@ -692,7 +692,7 @@ func (c *SeqCoordinator) DebugPrint() string {
return fmt.Sprint("Url:", c.config.Url(),
" prevChosenSequencer:", c.prevChosenSequencer,
" reportedWantsLockout:", c.reportedWantsLockout,
" lockoutUntil:", c.lockoutUntil,
" lockoutUntil:", c.lockoutUntil.Load(),
" redisErrors:", c.redisErrors)
}

Expand Down
18 changes: 9 additions & 9 deletions arbnode/seq_coordinator_atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ import (
const messagesPerRound = 20

type CoordinatorTestData struct {
messageCount uint64
messageCount atomic.Uint64

sequencer []string
err error
mutex sync.Mutex

waitForCoords sync.WaitGroup
testStartRound int32
testStartRound atomic.Int32
}

func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *CoordinatorTestData) {
nextRound := int32(0)
for {
sequenced := make([]bool, messagesPerRound)
for atomic.LoadInt32(&data.testStartRound) < nextRound {
for data.testStartRound.Load() < nextRound {
if ctx.Err() != nil {
return
}
Expand All @@ -44,7 +44,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo
nextRound++
var execError error
for {
messageCount := atomic.LoadUint64(&data.messageCount)
messageCount := data.messageCount.Load()
if messageCount >= messagesPerRound {
break
}
Expand All @@ -53,7 +53,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo
err := coord.acquireLockoutAndWriteMessage(ctx, asIndex, asIndex+1, &arbostypes.EmptyTestMessageWithMetadata)
if err == nil {
sequenced[messageCount] = true
atomic.StoreUint64(&data.messageCount, messageCount+1)
data.messageCount.Store(messageCount + 1)
randNr := rand.Intn(20)
if randNr > 15 {
execError = coord.chosenOneRelease(ctx)
Expand Down Expand Up @@ -105,9 +105,9 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true
coordConfig.Signer.Symmetric.SigningKey = ""
testData := CoordinatorTestData{
testStartRound: -1,
sequencer: make([]string, messagesPerRound),
sequencer: make([]string, messagesPerRound),
}
testData.testStartRound.Store(-1)
nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil)
Require(t, err)

Expand All @@ -134,12 +134,12 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {

for round := int32(0); round < 10; round++ {
redisClient.Del(ctx, redisutil.CHOSENSEQ_KEY, redisutil.MSG_COUNT_KEY)
testData.messageCount = 0
testData.messageCount.Store(0)
for i := 0; i < messagesPerRound; i++ {
testData.sequencer[i] = ""
}
testData.waitForCoords.Add(NumOfThreads)
atomic.StoreInt32(&testData.testStartRound, round)
testData.testStartRound.Store(round)
testData.waitForCoords.Wait()
Require(t, testData.err)
seqList := ""
Expand Down
14 changes: 7 additions & 7 deletions arbnode/simple_redis_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const test_release_frac = 5
const test_delay = time.Millisecond
const test_redisKey_prefix = "__TEMP_SimpleRedisLockTest__"

func attemptLock(ctx context.Context, s *redislock.Simple, flag *int32, wg *sync.WaitGroup) {
func attemptLock(ctx context.Context, s *redislock.Simple, flag *atomic.Int32, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < test_attempts; i++ {
if s.AttemptLock(ctx) {
atomic.AddInt32(flag, 1)
flag.Add(1)
} else if rand.Intn(test_release_frac) == 0 {
s.Release(ctx)
}
Expand Down Expand Up @@ -76,17 +76,17 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo
<-time.After(time.Second)
}
wg := sync.WaitGroup{}
counters := make([]int32, test_threads)
counters := make([]atomic.Int32, test_threads)
for i, lock := range locks {
wg.Add(1)
go attemptLock(ctx, lock, &counters[i], &wg)
}
wg.Wait()
successful := -1
for i, counter := range counters {
if counter != 0 {
if counter != test_attempts {
t.Fatalf("counter %d value %d", i, counter)
for i := range counters {
if counters[i].Load() != 0 {
if counters[i].Load() != test_attempts {
t.Fatalf("counter %d value %d", i, counters[i].Load())
}
if successful > 0 {
t.Fatalf("counter %d and %d both positive", i, successful)
Expand Down
20 changes: 10 additions & 10 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type TransactionStreamer struct {
nextAllowedFeedReorgLog time.Time

broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockHash
broadcasterQueuedMessagesPos uint64
broadcasterQueuedMessagesPos atomic.Uint64
broadcasterQueuedMessagesActiveReorg bool

coordinator *SeqCoordinator
Expand Down Expand Up @@ -491,14 +491,14 @@ func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreC
}

func (s *TransactionStreamer) FeedPendingMessageCount() arbutil.MessageIndex {
pos := atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)
pos := s.broadcasterQueuedMessagesPos.Load()
if pos == 0 {
return 0
}

s.insertionMutex.Lock()
defer s.insertionMutex.Unlock()
pos = atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)
pos = s.broadcasterQueuedMessagesPos.Load()
if pos == 0 {
return 0
}
Expand Down Expand Up @@ -552,14 +552,14 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
if len(s.broadcasterQueuedMessages) == 0 || (feedReorg && !s.broadcasterQueuedMessagesActiveReorg) {
// Empty cache or feed different from database, save current feed messages until confirmed L1 messages catch up.
s.broadcasterQueuedMessages = messages
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos))
s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos))
s.broadcasterQueuedMessagesActiveReorg = feedReorg
} else {
broadcasterQueuedMessagesPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos))
broadcasterQueuedMessagesPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load())
if broadcasterQueuedMessagesPos >= broadcastStartPos {
// Feed messages older than cache
s.broadcasterQueuedMessages = messages
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos))
s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos))
s.broadcasterQueuedMessagesActiveReorg = feedReorg
} else if broadcasterQueuedMessagesPos+arbutil.MessageIndex(len(s.broadcasterQueuedMessages)) == broadcastStartPos {
// Feed messages can be added directly to end of cache
Expand All @@ -579,7 +579,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
)
}
s.broadcasterQueuedMessages = messages
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos))
s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos))
s.broadcasterQueuedMessagesActiveReorg = feedReorg
}
}
Expand Down Expand Up @@ -795,7 +795,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
var cacheClearLen int

messagesAfterPos := messageStartPos + arbutil.MessageIndex(len(messages))
broadcastStartPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos))
broadcastStartPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load())

if messagesAreConfirmed {
var duplicates int
Expand Down Expand Up @@ -903,10 +903,10 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
// Check if new messages were added at the end of cache, if they were, then dont remove those particular messages
if len(s.broadcasterQueuedMessages) > cacheClearLen {
s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[cacheClearLen:]
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)+uint64(cacheClearLen))
s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos) + uint64(cacheClearLen))
} else {
s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[:0]
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, 0)
s.broadcasterQueuedMessagesPos.Store(0)
}
s.broadcasterQueuedMessagesActiveReorg = false
}
Expand Down
4 changes: 2 additions & 2 deletions arbos/programs/native_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

var apiObjects sync.Map
var apiIds uintptr // atomic and sequential
var apiIds atomic.Uintptr // atomic and sequential

type NativeApi struct {
handler RequestHandler
Expand All @@ -49,7 +49,7 @@ func newApi(
memoryModel *MemoryModel,
) NativeApi {
handler := newApiClosures(interpreter, tracingInfo, scope, memoryModel)
apiId := atomic.AddUintptr(&apiIds, 1)
apiId := apiIds.Add(1)
id := usize(apiId)
api := NativeApi{
handler: handler,
Expand Down
6 changes: 3 additions & 3 deletions broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ type BroadcastClient struct {
connMutex sync.Mutex
conn net.Conn

retryCount int64
retryCount atomic.Int64

retrying bool
shuttingDown bool
Expand Down Expand Up @@ -435,7 +435,7 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) {
}

func (bc *BroadcastClient) GetRetryCount() int64 {
return atomic.LoadInt64(&bc.retryCount)
return bc.retryCount.Load()
}

func (bc *BroadcastClient) isShuttingDown() bool {
Expand All @@ -458,7 +458,7 @@ func (bc *BroadcastClient) retryConnect(ctx context.Context) io.Reader {
case <-timer.C:
}

atomic.AddInt64(&bc.retryCount, 1)
bc.retryCount.Add(1)
earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum)
if err == nil {
bc.retrying = false
Expand Down
Loading
Loading