diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 23a3f9b7e2..499f6231a6 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -543,23 +543,28 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) { nextIdx := p.mergedTxIndex.Load() + 1 if nextIdx < int32(len(p.allTxReqs)) { nextMergeReq := p.allTxReqs[nextIdx] - if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) { - // execute. - res := p.executeInSlot(slotIndex, nextMergeReq) - if res != nil { - p.txResultChan <- res + if nextMergeReq.runnable == 1 { + if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) { + // execute. + res := p.executeInSlot(slotIndex, nextMergeReq) + if res != nil { + p.txResultChan <- res + } } } } - // try the next req in loop sequence. - if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { - continue - } - res := p.executeInSlot(slotIndex, txReq) - if res == nil { - continue + + if txReq.runnable == 1 { + // try the next req in loop sequence. + if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { + continue + } + res := p.executeInSlot(slotIndex, txReq) + if res == nil { + continue + } + p.txResultChan <- res } - p.txResultChan <- res } // switched to the other slot. if interrupted { @@ -588,23 +593,27 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) { nextIdx := p.mergedTxIndex.Load() + 1 if nextIdx < int32(len(p.allTxReqs)) { nextMergeReq := p.allTxReqs[nextIdx] - if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) { - // execute. - res := p.executeInSlot(slotIndex, nextMergeReq) - if res != nil { - p.txResultChan <- res + if nextMergeReq.runnable == 1 { + if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) { + // execute. + res := p.executeInSlot(slotIndex, nextMergeReq) + if res != nil { + p.txResultChan <- res + } } } } - if !atomic.CompareAndSwapInt32(&stealTxReq.runnable, 1, 0) { - continue - } - res := p.executeInSlot(slotIndex, stealTxReq) - if res == nil { - continue + if stealTxReq.runnable == 1 { + if !atomic.CompareAndSwapInt32(&stealTxReq.runnable, 1, 0) { + continue + } + res := p.executeInSlot(slotIndex, stealTxReq) + if res == nil { + continue + } + p.txResultChan <- res } - p.txResultChan <- res } } } @@ -643,13 +652,15 @@ func (p *ParallelStateProcessor) runQuickMergeSlotLoop(slotIndex int, slotType i if txReq.conflictIndex.Load() > p.mergedTxIndex.Load() { break } - if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { - continue - } - res := p.executeInSlot(slotIndex, txReq) - if res != nil { - executed-- - p.txResultChan <- res + if txReq.runnable == 1 { + if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { + continue + } + res := p.executeInSlot(slotIndex, txReq) + if res != nil { + executed-- + p.txResultChan <- res + } } } }