From 983d7ff3d39b79fa54a5b70e6d29a9e3a0dd39cc Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 18 Nov 2024 10:31:11 +0800 Subject: [PATCH] Fix the async broadcast & resolveFlushedLocks (#1493) close pingcap/tidb#57213 Signed-off-by: ekexium --- txnkv/transaction/pipelined_flush.go | 4 ++-- txnkv/transaction/txn.go | 36 +++++++++++++++------------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 8cadf63df..7d9dbbe24 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -486,7 +486,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end ) runner.SetStatLogInterval(30 * time.Second) - go func() { + c.txn.spawnWithStorePool(func() { if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil { logutil.Logger(bo.GetCtx()).Error("[pipelined dml] resolve flushed locks failed", zap.String("txn-status", status), @@ -529,5 +529,5 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end c.resourceGroupTag, ) } - }() + }) } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 7c124a678..4eaa84112 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -842,23 +842,27 @@ func (txn *KVTxn) Rollback() error { // no need to clean up locks when no flush triggered. pipelinedStart, pipelinedEnd := txn.committer.pipelinedCommitInfo.pipelinedStart, txn.committer.pipelinedCommitInfo.pipelinedEnd needCleanUpLocks := len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 - broadcastToAllStores( - txn, - txn.committer.store, - retry.NewBackofferWithVars( - txn.store.Ctx(), - broadcastMaxBackoff, - txn.committer.txn.vars, - ), - &kvrpcpb.TxnStatus{ - StartTs: txn.startTS, - MinCommitTs: txn.committer.minCommitTSMgr.get(), - CommitTs: 0, - RolledBack: true, - IsCompleted: !needCleanUpLocks, + txn.spawnWithStorePool( + func() { + broadcastToAllStores( + txn, + txn.committer.store, + retry.NewBackofferWithVars( + txn.store.Ctx(), + broadcastMaxBackoff, + txn.committer.txn.vars, + ), + &kvrpcpb.TxnStatus{ + StartTs: txn.startTS, + MinCommitTs: txn.committer.minCommitTSMgr.get(), + CommitTs: 0, + RolledBack: true, + IsCompleted: !needCleanUpLocks, + }, + txn.resourceGroupName, + txn.resourceGroupTag, + ) }, - txn.resourceGroupName, - txn.resourceGroupTag, ) if needCleanUpLocks { rollbackBo := retry.NewBackofferWithVars(txn.store.Ctx(), CommitSecondaryMaxBackoff, txn.vars)