Skip to content

Commit

Permalink
Merge pull request #2449 from OffchainLabs/atomic
Browse files Browse the repository at this point in the history
[NIT-2607] Change atomic.Store to atomic.Type
  • Loading branch information
joshuacolvin0 authored Jul 15, 2024
2 parents 1acbed1 + 0c6d4a0 commit c768543
Show file tree
Hide file tree
Showing 32 changed files with 157 additions and 152 deletions.
8 changes: 4 additions & 4 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type BatchPoster struct {
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
backlog uint64
backlog atomic.Uint64
lastHitL1Bounds time.Time // The last time we wanted to post a message but hit the L1 bounds

batchReverted atomic.Bool // indicates whether data poster batch was reverted
Expand Down Expand Up @@ -1086,7 +1086,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if config.IgnoreBlobPrice {
use4844 = true
} else {
backlog := atomic.LoadUint64(&b.backlog)
backlog := b.backlog.Load()
// Logic to prevent switching from non-4844 batches to 4844 batches too often,
// so that blocks can be filled efficiently. The geth txpool rejects txs for
// accounts that already have the other type of txs in the pool with
Expand Down Expand Up @@ -1437,7 +1437,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
// Setting the backlog to 0 here ensures that we don't lower compression as a result.
backlog = 0
}
atomic.StoreUint64(&b.backlog, backlog)
b.backlog.Store(backlog)
b.building = nil

// If we aren't queueing up transactions, wait for the receipt before moving on to the next batch.
Expand All @@ -1453,7 +1453,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}

func (b *BatchPoster) GetBacklogEstimate() uint64 {
return atomic.LoadUint64(&b.backlog)
return b.backlog.Load()
}

func (b *BatchPoster) Start(ctxIn context.Context) {
Expand Down
16 changes: 8 additions & 8 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type InboxReader struct {
l1Reader *headerreader.HeaderReader

// Atomic
lastSeenBatchCount uint64
lastReadBatchCount uint64
lastSeenBatchCount atomic.Uint64
lastReadBatchCount atomic.Uint64
}

func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) {
Expand Down Expand Up @@ -240,7 +240,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
seenBatchCountStored := uint64(math.MaxUint64)
storeSeenBatchCount := func() {
if seenBatchCountStored != seenBatchCount {
atomic.StoreUint64(&r.lastSeenBatchCount, seenBatchCount)
r.lastSeenBatchCount.Store(seenBatchCount)
seenBatchCountStored = seenBatchCount
}
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
// There's nothing to do
from = arbmath.BigAddByUint(currentHeight, 1)
blocksToFetch = config.DefaultBlocksToRead
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
r.lastReadBatchCount.Store(checkingBatchCount)
storeSeenBatchCount()
if !r.caughtUp && readMode == "latest" {
r.caughtUp = true
Expand Down Expand Up @@ -526,7 +526,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}
if len(sequencerBatches) > 0 {
readAnyBatches = true
atomic.StoreUint64(&r.lastReadBatchCount, sequencerBatches[len(sequencerBatches)-1].SequenceNumber+1)
r.lastReadBatchCount.Store(sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1)
storeSeenBatchCount()
}
}
Expand All @@ -553,7 +553,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}

if !readAnyBatches {
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
r.lastReadBatchCount.Store(checkingBatchCount)
storeSeenBatchCount()
}
}
Expand Down Expand Up @@ -625,15 +625,15 @@ func (r *InboxReader) GetSequencerMessageBytes(ctx context.Context, seqNum uint6
}

func (r *InboxReader) GetLastReadBatchCount() uint64 {
return atomic.LoadUint64(&r.lastReadBatchCount)
return r.lastReadBatchCount.Load()
}

// GetLastSeenBatchCount returns how many sequencer batches the inbox reader has read in from L1.
// Return values:
// >0 - last batchcount seen in run() - only written after lastReadBatchCount updated
// 0 - no batch seen, error
func (r *InboxReader) GetLastSeenBatchCount() uint64 {
return atomic.LoadUint64(&r.lastSeenBatchCount)
return r.lastSeenBatchCount.Load()
}

func (r *InboxReader) GetDelayBlocks() uint64 {
Expand Down
10 changes: 5 additions & 5 deletions arbnode/redislock/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Simple struct {
stopwaiter.StopWaiter
client redis.UniversalClient
config SimpleCfgFetcher
lockedUntil int64
lockedUntil atomic.Int64
mutex sync.Mutex
stopping bool
readyToLock func() bool
Expand Down Expand Up @@ -239,12 +239,12 @@ func execTestPipe(pipe redis.Pipeliner, ctx context.Context) error {
}

// notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter.
func atomicTimeRead(addr *int64) time.Time {
asint64 := atomic.LoadInt64(addr)
func atomicTimeRead(addr *atomic.Int64) time.Time {
asint64 := addr.Load()
return time.UnixMilli(asint64)
}

func atomicTimeWrite(addr *int64, t time.Time) {
func atomicTimeWrite(addr *atomic.Int64, t time.Time) {
asint64 := t.UnixMilli()
atomic.StoreInt64(addr, asint64)
addr.Store(asint64)
}
12 changes: 6 additions & 6 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type SeqCoordinator struct {
prevChosenSequencer string
reportedWantsLockout bool

lockoutUntil int64 // atomic
lockoutUntil atomic.Int64 // atomic

wantsLockoutMutex sync.Mutex // manages access to acquireLockoutAndWriteMessage and generally the wants lockout key
avoidLockout int // If > 0, prevents acquiring the lockout but not extending the lockout if no alternative sequencer wants the lockout. Protected by chosenUpdateMutex.
Expand Down Expand Up @@ -191,14 +191,14 @@ func StandaloneSeqCoordinatorInvalidateMsgIndex(ctx context.Context, redisClient
return nil
}

func atomicTimeWrite(addr *int64, t time.Time) {
func atomicTimeWrite(addr *atomic.Int64, t time.Time) {
asint64 := t.UnixMilli()
atomic.StoreInt64(addr, asint64)
addr.Store(asint64)
}

// notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter.
func atomicTimeRead(addr *int64) time.Time {
asint64 := atomic.LoadInt64(addr)
func atomicTimeRead(addr *atomic.Int64) time.Time {
asint64 := addr.Load()
return time.UnixMilli(asint64)
}

Expand Down Expand Up @@ -692,7 +692,7 @@ func (c *SeqCoordinator) DebugPrint() string {
return fmt.Sprint("Url:", c.config.Url(),
" prevChosenSequencer:", c.prevChosenSequencer,
" reportedWantsLockout:", c.reportedWantsLockout,
" lockoutUntil:", c.lockoutUntil,
" lockoutUntil:", c.lockoutUntil.Load(),
" redisErrors:", c.redisErrors)
}

Expand Down
18 changes: 9 additions & 9 deletions arbnode/seq_coordinator_atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ import (
const messagesPerRound = 20

type CoordinatorTestData struct {
messageCount uint64
messageCount atomic.Uint64

sequencer []string
err error
mutex sync.Mutex

waitForCoords sync.WaitGroup
testStartRound int32
testStartRound atomic.Int32
}

func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *CoordinatorTestData) {
nextRound := int32(0)
for {
sequenced := make([]bool, messagesPerRound)
for atomic.LoadInt32(&data.testStartRound) < nextRound {
for data.testStartRound.Load() < nextRound {
if ctx.Err() != nil {
return
}
Expand All @@ -44,7 +44,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo
nextRound++
var execError error
for {
messageCount := atomic.LoadUint64(&data.messageCount)
messageCount := data.messageCount.Load()
if messageCount >= messagesPerRound {
break
}
Expand All @@ -53,7 +53,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo
err := coord.acquireLockoutAndWriteMessage(ctx, asIndex, asIndex+1, &arbostypes.EmptyTestMessageWithMetadata)
if err == nil {
sequenced[messageCount] = true
atomic.StoreUint64(&data.messageCount, messageCount+1)
data.messageCount.Store(messageCount + 1)
randNr := rand.Intn(20)
if randNr > 15 {
execError = coord.chosenOneRelease(ctx)
Expand Down Expand Up @@ -105,9 +105,9 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true
coordConfig.Signer.Symmetric.SigningKey = ""
testData := CoordinatorTestData{
testStartRound: -1,
sequencer: make([]string, messagesPerRound),
sequencer: make([]string, messagesPerRound),
}
testData.testStartRound.Store(-1)
nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil)
Require(t, err)

Expand All @@ -134,12 +134,12 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {

for round := int32(0); round < 10; round++ {
redisClient.Del(ctx, redisutil.CHOSENSEQ_KEY, redisutil.MSG_COUNT_KEY)
testData.messageCount = 0
testData.messageCount.Store(0)
for i := 0; i < messagesPerRound; i++ {
testData.sequencer[i] = ""
}
testData.waitForCoords.Add(NumOfThreads)
atomic.StoreInt32(&testData.testStartRound, round)
testData.testStartRound.Store(round)
testData.waitForCoords.Wait()
Require(t, testData.err)
seqList := ""
Expand Down
14 changes: 7 additions & 7 deletions arbnode/simple_redis_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const test_release_frac = 5
const test_delay = time.Millisecond
const test_redisKey_prefix = "__TEMP_SimpleRedisLockTest__"

func attemptLock(ctx context.Context, s *redislock.Simple, flag *int32, wg *sync.WaitGroup) {
func attemptLock(ctx context.Context, s *redislock.Simple, flag *atomic.Int32, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < test_attempts; i++ {
if s.AttemptLock(ctx) {
atomic.AddInt32(flag, 1)
flag.Add(1)
} else if rand.Intn(test_release_frac) == 0 {
s.Release(ctx)
}
Expand Down Expand Up @@ -76,17 +76,17 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo
<-time.After(time.Second)
}
wg := sync.WaitGroup{}
counters := make([]int32, test_threads)
counters := make([]atomic.Int32, test_threads)
for i, lock := range locks {
wg.Add(1)
go attemptLock(ctx, lock, &counters[i], &wg)
}
wg.Wait()
successful := -1
for i, counter := range counters {
if counter != 0 {
if counter != test_attempts {
t.Fatalf("counter %d value %d", i, counter)
for i := range counters {
if counters[i].Load() != 0 {
if counters[i].Load() != test_attempts {
t.Fatalf("counter %d value %d", i, counters[i].Load())
}
if successful > 0 {
t.Fatalf("counter %d and %d both positive", i, successful)
Expand Down
20 changes: 10 additions & 10 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type TransactionStreamer struct {
nextAllowedFeedReorgLog time.Time

broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockHash
broadcasterQueuedMessagesPos uint64
broadcasterQueuedMessagesPos atomic.Uint64
broadcasterQueuedMessagesActiveReorg bool

coordinator *SeqCoordinator
Expand Down Expand Up @@ -491,14 +491,14 @@ func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreC
}

func (s *TransactionStreamer) FeedPendingMessageCount() arbutil.MessageIndex {
pos := atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)
pos := s.broadcasterQueuedMessagesPos.Load()
if pos == 0 {
return 0
}

s.insertionMutex.Lock()
defer s.insertionMutex.Unlock()
pos = atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)
pos = s.broadcasterQueuedMessagesPos.Load()
if pos == 0 {
return 0
}
Expand Down Expand Up @@ -552,14 +552,14 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
if len(s.broadcasterQueuedMessages) == 0 || (feedReorg && !s.broadcasterQueuedMessagesActiveReorg) {
// Empty cache or feed different from database, save current feed messages until confirmed L1 messages catch up.
s.broadcasterQueuedMessages = messages
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos))
s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos))
s.broadcasterQueuedMessagesActiveReorg = feedReorg
} else {
broadcasterQueuedMessagesPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos))
broadcasterQueuedMessagesPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load())
if broadcasterQueuedMessagesPos >= broadcastStartPos {
// Feed messages older than cache
s.broadcasterQueuedMessages = messages
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos))
s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos))
s.broadcasterQueuedMessagesActiveReorg = feedReorg
} else if broadcasterQueuedMessagesPos+arbutil.MessageIndex(len(s.broadcasterQueuedMessages)) == broadcastStartPos {
// Feed messages can be added directly to end of cache
Expand All @@ -579,7 +579,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
)
}
s.broadcasterQueuedMessages = messages
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos))
s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos))
s.broadcasterQueuedMessagesActiveReorg = feedReorg
}
}
Expand Down Expand Up @@ -795,7 +795,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
var cacheClearLen int

messagesAfterPos := messageStartPos + arbutil.MessageIndex(len(messages))
broadcastStartPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos))
broadcastStartPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load())

if messagesAreConfirmed {
var duplicates int
Expand Down Expand Up @@ -903,10 +903,10 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
// Check if new messages were added at the end of cache, if they were, then dont remove those particular messages
if len(s.broadcasterQueuedMessages) > cacheClearLen {
s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[cacheClearLen:]
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)+uint64(cacheClearLen))
s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos) + uint64(cacheClearLen))
} else {
s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[:0]
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, 0)
s.broadcasterQueuedMessagesPos.Store(0)
}
s.broadcasterQueuedMessagesActiveReorg = false
}
Expand Down
4 changes: 2 additions & 2 deletions arbos/programs/native_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

var apiObjects sync.Map
var apiIds uintptr // atomic and sequential
var apiIds atomic.Uintptr // atomic and sequential

type NativeApi struct {
handler RequestHandler
Expand All @@ -49,7 +49,7 @@ func newApi(
memoryModel *MemoryModel,
) NativeApi {
handler := newApiClosures(interpreter, tracingInfo, scope, memoryModel)
apiId := atomic.AddUintptr(&apiIds, 1)
apiId := apiIds.Add(1)
id := usize(apiId)
api := NativeApi{
handler: handler,
Expand Down
6 changes: 3 additions & 3 deletions broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ type BroadcastClient struct {
connMutex sync.Mutex
conn net.Conn

retryCount int64
retryCount atomic.Int64

retrying bool
shuttingDown bool
Expand Down Expand Up @@ -435,7 +435,7 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) {
}

func (bc *BroadcastClient) GetRetryCount() int64 {
return atomic.LoadInt64(&bc.retryCount)
return bc.retryCount.Load()
}

func (bc *BroadcastClient) isShuttingDown() bool {
Expand All @@ -458,7 +458,7 @@ func (bc *BroadcastClient) retryConnect(ctx context.Context) io.Reader {
case <-timer.C:
}

atomic.AddInt64(&bc.retryCount, 1)
bc.retryCount.Add(1)
earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum)
if err == nil {
bc.retrying = false
Expand Down
Loading

0 comments on commit c768543

Please sign in to comment.