Skip to content

Commit

Permalink
do CompareAndSwap only necessary
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da committed Aug 30, 2024
1 parent 2d75ded commit 4687c11
Showing 1 changed file with 43 additions and 32 deletions.
75 changes: 43 additions & 32 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,23 +543,28 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
nextIdx := p.mergedTxIndex.Load() + 1
if nextIdx < int32(len(p.allTxReqs)) {
nextMergeReq := p.allTxReqs[nextIdx]
if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) {
// execute.
res := p.executeInSlot(slotIndex, nextMergeReq)
if res != nil {
p.txResultChan <- res
if nextMergeReq.runnable == 1 {
if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) {
// execute.
res := p.executeInSlot(slotIndex, nextMergeReq)
if res != nil {
p.txResultChan <- res
}
}
}
}
// try the next req in loop sequence.
if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) {
continue
}
res := p.executeInSlot(slotIndex, txReq)
if res == nil {
continue

if txReq.runnable == 1 {
// try the next req in loop sequence.
if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) {
continue
}
res := p.executeInSlot(slotIndex, txReq)
if res == nil {
continue
}
p.txResultChan <- res
}
p.txResultChan <- res
}
// switched to the other slot.
if interrupted {
Expand Down Expand Up @@ -588,23 +593,27 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
nextIdx := p.mergedTxIndex.Load() + 1
if nextIdx < int32(len(p.allTxReqs)) {
nextMergeReq := p.allTxReqs[nextIdx]
if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) {
// execute.
res := p.executeInSlot(slotIndex, nextMergeReq)
if res != nil {
p.txResultChan <- res
if nextMergeReq.runnable == 1 {
if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) {
// execute.
res := p.executeInSlot(slotIndex, nextMergeReq)
if res != nil {
p.txResultChan <- res
}
}
}
}

if !atomic.CompareAndSwapInt32(&stealTxReq.runnable, 1, 0) {
continue
}
res := p.executeInSlot(slotIndex, stealTxReq)
if res == nil {
continue
if stealTxReq.runnable == 1 {
if !atomic.CompareAndSwapInt32(&stealTxReq.runnable, 1, 0) {
continue
}
res := p.executeInSlot(slotIndex, stealTxReq)
if res == nil {
continue
}
p.txResultChan <- res
}
p.txResultChan <- res
}
}
}
Expand Down Expand Up @@ -643,13 +652,15 @@ func (p *ParallelStateProcessor) runQuickMergeSlotLoop(slotIndex int, slotType i
if txReq.conflictIndex.Load() > p.mergedTxIndex.Load() {
break
}
if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) {
continue
}
res := p.executeInSlot(slotIndex, txReq)
if res != nil {
executed--
p.txResultChan <- res
if txReq.runnable == 1 {
if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) {
continue
}
res := p.executeInSlot(slotIndex, txReq)
if res != nil {
executed--
p.txResultChan <- res
}
}
}
}
Expand Down

0 comments on commit 4687c11

Please sign in to comment.