Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add finalizer component to TXM #13638

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
633dcf4
Added a finalizer component that assesses confirmed transactions for …
amit-momin Jun 12, 2024
af1e9a0
Moved Finalizer component into EVM code and addressed feedback
amit-momin Jun 24, 2024
8af7d77
Fixed linting and renumbered sql migration
amit-momin Jun 24, 2024
c809c1a
Added limit to Finalizer RPC batch calls
amit-momin Jun 25, 2024
4dab78b
Cleaned up unneeded code
amit-momin Jun 26, 2024
37807b9
Renumbered sql migration
amit-momin Jun 28, 2024
2a47486
Updated Finalizer to use LatestAndFinalizedBlock method from HeadTracker
amit-momin Jul 1, 2024
a65977d
Fixed health check tests and fixed linting
amit-momin Jul 1, 2024
a4dc48b
Fixed lint error
amit-momin Jul 1, 2024
599edac
Fixed lint error
amit-momin Jul 1, 2024
911d342
Added finalized state to replace finalized column
amit-momin Jul 3, 2024
005d972
Updated finalizer batch RPC validation to use blockByNumber and added…
amit-momin Jul 10, 2024
db94c84
Updated reaper to reap old confirmed transactions
amit-momin Jul 10, 2024
ac2adbf
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jul 10, 2024
ffe0dc4
Fixed migration test
amit-momin Jul 10, 2024
03b92f0
Fixed lint error
amit-momin Jul 10, 2024
21443a3
Changed log level
amit-momin Jul 11, 2024
8a3cda3
Merge remote-tracking branch 'origin/develop' into BCI-3486-implement…
amit-momin Jul 15, 2024
e3a8ab3
Renumbered sql migration
amit-momin Jul 16, 2024
3d373e0
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jul 16, 2024
a7d398d
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jul 17, 2024
27f71de
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
silaslenihan Jul 24, 2024
ecba4ca
Updated Finalizer to only process on new finalized heads and improved…
amit-momin Jul 24, 2024
523fc80
Fixed mocks
amit-momin Jul 29, 2024
84f8c28
Updated TxStore method name and fixed mocks
amit-momin Jul 29, 2024
92a7383
Fixed mock
amit-momin Jul 30, 2024
e5812da
Updated TxStore method to exit early
amit-momin Jul 30, 2024
1367e78
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jul 30, 2024
f8d2487
Removed unused error
amit-momin Jul 30, 2024
5a7d52a
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Jul 30, 2024
da5c78e
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
silaslenihan Aug 5, 2024
e707ede
Merge branch 'develop' into BCI-3486-implement-finality-check-for-the…
amit-momin Aug 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/itchy-bugs-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Introduced finalized transaction state. Added a finalizer component to the TXM to mark transactions as finalized. #internal
1 change: 1 addition & 0 deletions common/txmgr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ const (
TxUnconfirmed = txmgrtypes.TxState("unconfirmed")
TxConfirmed = txmgrtypes.TxState("confirmed")
TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt")
TxFinalized = txmgrtypes.TxState("finalized")
)
9 changes: 3 additions & 6 deletions common/txmgr/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
// Reaper handles periodic database cleanup for Txm
type Reaper[CHAIN_ID types.ID] struct {
store txmgrtypes.TxHistoryReaper[CHAIN_ID]
config txmgrtypes.ReaperChainConfig
txConfig txmgrtypes.ReaperTransactionsConfig
chainID CHAIN_ID
log logger.Logger
Expand All @@ -25,10 +24,9 @@ type Reaper[CHAIN_ID types.ID] struct {
}

// NewReaper instantiates a new reaper object
func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], config txmgrtypes.ReaperChainConfig, txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] {
func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] {
r := &Reaper[CHAIN_ID]{
store,
config,
txConfig,
chainID,
logger.Named(lggr, "Reaper"),
Expand Down Expand Up @@ -103,13 +101,12 @@ func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error {
r.log.Debug("Transactions.ReaperThreshold set to 0; skipping ReapTxes")
return nil
}
minBlockNumberToKeep := headNum - int64(r.config.FinalityDepth())
mark := time.Now()
timeThreshold := mark.Add(-threshold)

r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold, "minBlockNumberToKeep", minBlockNumberToKeep)
r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold)

if err := r.store.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, r.chainID); err != nil {
if err := r.store.ReapTxHistory(ctx, timeThreshold, r.chainID); err != nil {
return err
}

Expand Down
20 changes: 17 additions & 3 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type Txm[
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
Expand Down Expand Up @@ -143,6 +144,7 @@ func NewTxm[
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
Expand All @@ -165,13 +167,14 @@ func NewTxm[
resender: resender,
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
finalizer: finalizer,
}

if txCfg.ResendAfterThreshold() <= 0 {
b.logger.Info("Resender: Disabled")
}
if txCfg.ReaperThreshold() > 0 && txCfg.ReaperInterval() > 0 {
b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, cfg, txCfg, chainId)
b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, txCfg, chainId)
} else {
b.logger.Info("TxReaper: Disabled")
}
Expand Down Expand Up @@ -199,6 +202,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return fmt.Errorf("Txm: Tracker failed to start: %w", err)
}

if err := ms.Start(ctx, b.finalizer); err != nil {
return fmt.Errorf("Txm: Finalizer failed to start: %w", err)
}

b.logger.Info("Txm starting runLoop")
b.wg.Add(1)
go b.runLoop()
Expand Down Expand Up @@ -293,6 +300,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthRepo
services.CopyHealth(report, b.broadcaster.HealthReport())
services.CopyHealth(report, b.confirmer.HealthReport())
services.CopyHealth(report, b.txAttemptBuilder.HealthReport())
services.CopyHealth(report, b.finalizer.HealthReport())
})

if b.txConfig.ForwardersEnabled() {
Expand Down Expand Up @@ -415,6 +423,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
case head := <-b.chHeads:
b.confirmer.mb.Deliver(head)
b.tracker.mb.Deliver(head.BlockNumber())
b.finalizer.DeliverLatestHead(head)
case reset := <-b.reset:
// This check prevents the weird edge-case where you can select
// into this block after chStop has already been closed and the
Expand Down Expand Up @@ -446,6 +455,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err)
}
err = b.finalizer.Close()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
return
case <-keysChanged:
// This check prevents the weird edge-case where you can select
Expand Down Expand Up @@ -644,9 +657,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransac
// Return unconfirmed for ConfirmedMissingReceipt since a receipt is required to determine if it is finalized
return commontypes.Unconfirmed, nil
case TxConfirmed:
// TODO: Check for finality and return finalized status
// Return unconfirmed if tx receipt's block is newer than the latest finalized block
// Return unconfirmed for confirmed transactions because they are not yet finalized
return commontypes.Unconfirmed, nil
case TxFinalized:
return commontypes.Finalized, nil
case TxFatalError:
// Use an ErrorClassifier to determine if the transaction is considered Fatal
txErr := b.newErrorClassifier(tx.GetError())
Expand Down
6 changes: 0 additions & 6 deletions common/txmgr/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import "time"
type TransactionManagerChainConfig interface {
BroadcasterChainConfig
ConfirmerChainConfig
ReaperChainConfig
}

type TransactionManagerFeeConfig interface {
Expand Down Expand Up @@ -74,11 +73,6 @@ type ResenderTransactionsConfig interface {
MaxInFlight() uint32
}

// ReaperConfig is the config subset used by the reaper
type ReaperChainConfig interface {
FinalityDepth() uint32
}

type ReaperTransactionsConfig interface {
ReaperInterval() time.Duration
ReaperThreshold() time.Duration
Expand Down
12 changes: 12 additions & 0 deletions common/txmgr/types/finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package types

import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

type Finalizer[BLOCK_HASH types.Hashable, HEAD types.Head[BLOCK_HASH]] interface {
// interfaces for running the underlying estimator
services.Service
DeliverLatestHead(head HEAD) bool
}
77 changes: 0 additions & 77 deletions common/txmgr/types/mocks/reaper_chain_config.go

This file was deleted.

80 changes: 10 additions & 70 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,10 @@ type TransactionStore[
UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
UpdateTxFatalError(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
UpdateTxForRebroadcast(ctx context.Context, etx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (finalized bool, err error)
}

type TxHistoryReaper[CHAIN_ID types.ID] interface {
ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error
ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID CHAIN_ID) error
}

type UnstartedTxQueuePruner interface {
Expand Down
Loading
Loading