Skip to content

Commit

Permalink
add timer of tx life
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da committed Aug 22, 2024
1 parent b685b8b commit 8176f78
Showing 1 changed file with 55 additions and 20 deletions.
75 changes: 55 additions & 20 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,23 @@ type SlotState struct {
}

type ParallelTxResult struct {
executedIndex int32 // record the current execute number of the tx
slotIndex int
txReq *ParallelTxRequest
receipt *types.Receipt
slotDB *state.ParallelStateDB
gpSlot *GasPool
evm *vm.EVM
result *ExecutionResult
originalNonce *uint64
err error
executedIndex int32 // record the current execute number of the tx
slotIndex int
txReq *ParallelTxRequest
receipt *types.Receipt
slotDB *state.ParallelStateDB
gpSlot *GasPool
evm *vm.EVM
result *ExecutionResult
originalNonce *uint64
err error
resultSendTime time.Time
resultReceiveTime time.Time
resultCheckTime time.Time
resultConfirmTime time.Time
resultSendStage2ChanTime time.Time
resultConfirmEndTime time.Time
resultMergeTime time.Time
}

type ParallelTxRequest struct {
Expand Down Expand Up @@ -305,16 +312,23 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR

evm, result, err := applyTransactionStageExecution(txReq.msg, gpSlot, slotDB, vmenv, p.delayGasFee)
txResult := ParallelTxResult{
executedIndex: execNum,
slotIndex: slotIndex,
txReq: txReq,
receipt: nil,
slotDB: slotDB,
err: err,
gpSlot: gpSlot,
evm: evm,
result: result,
originalNonce: &on,
executedIndex: execNum,
slotIndex: slotIndex,
txReq: txReq,
receipt: nil,
slotDB: slotDB,
err: err,
gpSlot: gpSlot,
evm: evm,
result: result,
originalNonce: &on,
resultSendTime: time.Time{},
resultReceiveTime: time.Time{},
resultCheckTime: time.Time{},
resultConfirmTime: time.Time{},
resultSendStage2ChanTime: time.Time{},
resultMergeTime: time.Time{},
resultConfirmEndTime: time.Time{},
}

if err == nil {
Expand Down Expand Up @@ -389,6 +403,7 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo
return nil
}
targetResult = results[len(results)-1]
targetResult.resultCheckTime = time.Now()
// last is the freshest, stack based priority
p.pendingConfirmResults[targetTxIndex] = p.pendingConfirmResults[targetTxIndex][:resultsLen-1] // remove from the queue
}
Expand Down Expand Up @@ -536,6 +551,7 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
if res == nil {
continue
}
res.resultSendTime = time.Now()
p.txResultChan <- res
}
totalTryRunTxDur += time.Since(innerLoopBeforeTime)
Expand Down Expand Up @@ -565,6 +581,7 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
if res == nil {
continue
}
res.resultSendTime = time.Now()
p.txResultChan <- res
}
totalTryRunTxDur += time.Since(stealLoopBeforeTime)
Expand Down Expand Up @@ -610,13 +627,15 @@ func (p *ParallelStateProcessor) handleTxResults() *ParallelTxResult {
if confirmedResult == nil {
return nil
}
confirmedResult.resultConfirmTime = time.Now()
// schedule stage 2 when new Tx has been merged, schedule once and ASAP
// stage 2,if all tx have been executed at least once, and its result has been received.
// in Stage 2, we will run check when main DB is advanced, i.e., new Tx result has been merged.
if p.inConfirmStage2 && int(p.mergedTxIndex.Load()) >= p.nextStage2TxIndex {
p.nextStage2TxIndex = int(p.mergedTxIndex.Load()) + stage2CheckNumber
p.confirmStage2Chan <- int(p.mergedTxIndex.Load())
}
confirmedResult.resultSendStage2ChanTime = time.Now()
return confirmedResult
}

Expand Down Expand Up @@ -649,6 +668,7 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, gp *Ga

// merge slotDB into mainDB
statedb.MergeSlotDB(result.slotDB, result.receipt, resultTxIndex, result.result.delayFees)
result.resultMergeTime = time.Now()
mergeDuration := time.Since(start) - conflictCheckDur
delayGasFee := result.result.delayFees
// add delayed gas fee
Expand Down Expand Up @@ -689,6 +709,20 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, gp *Ga
default:
}
}
result.resultConfirmEndTime = time.Now()
resultReceiveDur := result.resultReceiveTime.Sub(result.resultSendTime)
resultReceiveToCheck := result.resultCheckTime.Sub(result.resultReceiveTime)
resultCheckToConfirm := result.resultConfirmTime.Sub(result.resultCheckTime)
resultSendStage2ChanDur := result.resultSendStage2ChanTime.Sub(result.resultConfirmTime)
resultMergeDur := result.resultMergeTime.Sub(result.resultSendStage2ChanTime)
resultTriggerChanDur := result.resultConfirmEndTime.Sub(result.resultMergeTime)
log.Debug("ConfirmTxResult", "TX", result.txReq.txIndex,
"resultReceiveDur", common.PrettyDuration(resultReceiveDur),
"resultReceiveToCheck", common.PrettyDuration(resultReceiveToCheck),
"resultConfirmDur", common.PrettyDuration(resultCheckToConfirm),
"resultSendStage2ChanDur", common.PrettyDuration(resultSendStage2ChanDur),
"resultMergeDur", common.PrettyDuration(resultMergeDur),
"resultTriggerChainDur", common.PrettyDuration(resultTriggerChanDur))
return result, conflictCheckDur, mergeDuration
}

Expand Down Expand Up @@ -851,6 +885,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
break
}
unconfirmedResult := <-p.txResultChan
unconfirmedResult.resultReceiveTime = time.Now()
unconfirmedTxIndex := unconfirmedResult.txReq.txIndex
if unconfirmedTxIndex <= int(p.mergedTxIndex.Load()) {
log.Debug("drop merged txReq", "unconfirmedTxIndex", unconfirmedTxIndex, "p.mergedTxIndex", p.mergedTxIndex.Load())
Expand Down

0 comments on commit 8176f78

Please sign in to comment.