From 306eadcf505b54502b7112efc395457d4bd0f3c5 Mon Sep 17 00:00:00 2001 From: Dimitris Grigoriou Date: Tue, 12 Dec 2023 14:35:33 +0200 Subject: [PATCH] Remove core utils dependencies from common (#11425) * Change difficulty from Big to BigInt * Fix headtracker mock head * Remove EsnureClosed * Fix mock heads * Migrate to common Mailbox * Fix Tracker close on txm * Change to EnsureHexPrefix * Change names to mailbox * Remove core/null dependency from common * Remove core mailbox * Fix dependencies * Tidy * Fix dependencies * Change path to internal utils * Minor fixes * Rename MinKey function * Update MinFunc * Fix utils conflicts --- common/client/node_lifecycle.go | 7 +- common/client/send_only_node_lifecycle.go | 2 +- common/headtracker/head_broadcaster.go | 6 +- common/headtracker/head_listener.go | 2 +- common/headtracker/head_tracker.go | 14 +- common/internal/utils/utils.go | 36 ++++ common/txmgr/broadcaster.go | 2 +- common/txmgr/confirmer.go | 14 +- common/txmgr/reaper.go | 2 +- common/txmgr/resender.go | 2 +- common/txmgr/tracker.go | 8 +- common/txmgr/txmgr.go | 8 +- common/txmgr/types/tx.go | 3 +- .../chains/evm/gas/block_history_estimator.go | 6 +- .../evm/headtracker/head_broadcaster_test.go | 3 +- core/chains/evm/headtracker/head_tracker.go | 5 +- .../evm/headtracker/head_tracker_test.go | 9 +- core/chains/evm/log/broadcaster.go | 13 +- core/chains/evm/log/helpers_internal_test.go | 5 +- core/chains/evm/log/helpers_test.go | 4 +- core/chains/evm/log/integration_test.go | 3 +- core/chains/evm/txmgr/evm_tx_store.go | 3 +- core/chains/legacyevm/chain.go | 3 +- core/chains/legacyevm/chain_test.go | 7 +- core/cmd/shell.go | 3 +- core/cmd/shell_local_test.go | 6 +- core/internal/cltest/cltest.go | 3 +- core/internal/testutils/evmtest/evmtest.go | 5 +- core/services/chainlink/application.go | 3 +- .../relayer_chain_interoperators_test.go | 6 +- core/services/directrequest/delegate.go | 18 +- core/services/directrequest/delegate_test.go | 5 +- core/services/functions/listener_test.go | 4 +- core/services/job/runner_integration_test.go | 12 +- core/services/job/spawner_test.go | 9 +- core/services/keeper/delegate.go | 6 +- .../keeper/registry_synchronizer_core.go | 10 +- .../registry_synchronizer_helper_test.go | 4 +- core/services/keeper/upkeep_executer.go | 7 +- core/services/ocr/contract_tracker.go | 10 +- core/services/ocr/contract_tracker_test.go | 4 +- core/services/ocr/delegate.go | 5 +- core/services/ocr2/delegate.go | 6 +- .../services/ocr2/plugins/functions/plugin.go | 3 +- core/services/pipeline/task.eth_tx.go | 2 +- core/services/pipeline/task.eth_tx_test.go | 2 +- core/services/promreporter/prom_reporter.go | 7 +- core/services/vrf/delegate.go | 8 +- core/services/vrf/delegate_test.go | 5 +- core/services/vrf/v1/listener_v1.go | 5 +- core/services/vrf/v2/listener_v2.go | 1 + core/services/vrf/v2/listener_v2_test.go | 3 +- core/utils/mailbox.go | 126 ------------ core/utils/mailbox_prom.go | 93 --------- core/utils/mailbox_test.go | 181 ------------------ core/utils/utils.go | 21 -- 56 files changed, 200 insertions(+), 550 deletions(-) create mode 100644 common/internal/utils/utils.go delete mode 100644 core/utils/mailbox.go delete mode 100644 core/utils/mailbox_prom.go delete mode 100644 core/utils/mailbox_test.go diff --git a/common/client/node_lifecycle.go b/common/client/node_lifecycle.go index 5ba0bff3238..eda137d5100 100644 --- a/common/client/node_lifecycle.go +++ b/common/client/node_lifecycle.go @@ -12,9 +12,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils" bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" - "github.com/smartcontractkit/chainlink/v2/core/utils" + iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils" ) var ( @@ -360,7 +361,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() { lggr := logger.Named(n.lfcLog, "Unreachable") lggr.Debugw("Trying to revive unreachable RPC node", "nodeState", n.State()) - dialRetryBackoff := utils.NewRedialBackoff() + dialRetryBackoff := iutils.NewRedialBackoff() for { select { @@ -416,7 +417,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) invalidChainIDLoop() { lggr := logger.Named(n.lfcLog, "InvalidChainID") lggr.Debugw(fmt.Sprintf("Periodically re-checking RPC node %s with invalid chain ID", n.String()), "nodeState", n.State()) - chainIDRecheckBackoff := utils.NewRedialBackoff() + chainIDRecheckBackoff := iutils.NewRedialBackoff() for { select { diff --git a/common/client/send_only_node_lifecycle.go b/common/client/send_only_node_lifecycle.go index 4d5b102b5bd..c66d267ed42 100644 --- a/common/client/send_only_node_lifecycle.go +++ b/common/client/send_only_node_lifecycle.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/smartcontractkit/chainlink/v2/core/utils" + "github.com/smartcontractkit/chainlink/v2/common/internal/utils" ) // verifyLoop may only be triggered once, on Start, if initial chain ID check diff --git a/common/headtracker/head_broadcaster.go b/common/headtracker/head_broadcaster.go index 0e676f864fa..758a7713846 100644 --- a/common/headtracker/head_broadcaster.go +++ b/common/headtracker/head_broadcaster.go @@ -9,9 +9,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) const TrackableCallbackTimeout = 2 * time.Second @@ -30,7 +30,7 @@ type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct services.StateMachine logger logger.Logger callbacks callbackSet[H, BLOCK_HASH] - mailbox *utils.Mailbox[H] + mailbox *mailbox.Mailbox[H] mutex sync.Mutex chClose services.StopChan wgDone sync.WaitGroup @@ -48,7 +48,7 @@ func NewHeadBroadcaster[ return &HeadBroadcaster[H, BLOCK_HASH]{ logger: logger.Named(lggr, "HeadBroadcaster"), callbacks: make(callbackSet[H, BLOCK_HASH]), - mailbox: utils.NewSingleMailbox[H](), + mailbox: mailbox.NewSingle[H](), chClose: make(chan struct{}), } } diff --git a/common/headtracker/head_listener.go b/common/headtracker/head_listener.go index 0aebf606634..e7ea4fb51ae 100644 --- a/common/headtracker/head_listener.go +++ b/common/headtracker/head_listener.go @@ -14,8 +14,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" + "github.com/smartcontractkit/chainlink/v2/common/internal/utils" "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var ( diff --git a/common/headtracker/head_tracker.go b/common/headtracker/head_tracker.go index c977eb023cc..373aa5a958f 100644 --- a/common/headtracker/head_tracker.go +++ b/common/headtracker/head_tracker.go @@ -12,10 +12,10 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var ( @@ -43,14 +43,14 @@ type HeadTracker[ log logger.Logger headBroadcaster types.HeadBroadcaster[HTH, BLOCK_HASH] headSaver types.HeadSaver[HTH, BLOCK_HASH] - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor client htrktypes.Client[HTH, S, ID, BLOCK_HASH] chainID ID config htrktypes.Config htConfig htrktypes.HeadTrackerConfig - backfillMB *utils.Mailbox[HTH] - broadcastMB *utils.Mailbox[HTH] + backfillMB *mailbox.Mailbox[HTH] + broadcastMB *mailbox.Mailbox[HTH] headListener types.HeadListener[HTH, BLOCK_HASH] chStop services.StopChan wgDone sync.WaitGroup @@ -70,7 +70,7 @@ func NewHeadTracker[ htConfig htrktypes.HeadTrackerConfig, headBroadcaster types.HeadBroadcaster[HTH, BLOCK_HASH], headSaver types.HeadSaver[HTH, BLOCK_HASH], - mailMon *utils.MailboxMonitor, + mailMon *mailbox.Monitor, getNilHead func() HTH, ) types.HeadTracker[HTH, BLOCK_HASH] { chStop := make(chan struct{}) @@ -82,8 +82,8 @@ func NewHeadTracker[ config: config, htConfig: htConfig, log: lggr, - backfillMB: utils.NewSingleMailbox[HTH](), - broadcastMB: utils.NewMailbox[HTH](HeadsBufferSize), + backfillMB: mailbox.NewSingle[HTH](), + broadcastMB: mailbox.New[HTH](HeadsBufferSize), chStop: chStop, headListener: NewHeadListener[HTH, S, ID, BLOCK_HASH](lggr, client, config, chStop), headSaver: headSaver, diff --git a/common/internal/utils/utils.go b/common/internal/utils/utils.go new file mode 100644 index 00000000000..1e285868c53 --- /dev/null +++ b/common/internal/utils/utils.go @@ -0,0 +1,36 @@ +package utils + +import ( + "cmp" + "slices" + "time" + + "github.com/jpillora/backoff" + "golang.org/x/exp/constraints" +) + +// NewRedialBackoff is a standard backoff to use for redialling or reconnecting to +// unreachable network endpoints +func NewRedialBackoff() backoff.Backoff { + return backoff.Backoff{ + Min: 1 * time.Second, + Max: 15 * time.Second, + Jitter: true, + } + +} + +// MinFunc returns the minimum value of the given element array with respect +// to the given key function. In the event U is not a compound type (e.g a +// struct) an identity function can be provided. +func MinFunc[U any, T constraints.Ordered](elems []U, f func(U) T) T { + var min T + if len(elems) == 0 { + return min + } + + e := slices.MinFunc(elems, func(a, b U) int { + return cmp.Compare(f(a), f(b)) + }) + return f(e) +} diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index f10ecafc670..dba2b976c33 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -18,12 +18,12 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chains/label" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink/v2/common/client" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) const ( diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 95be9ad23e6..aabdf45ae32 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -17,13 +17,15 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chains/label" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/common/client" commonfee "github.com/smartcontractkit/chainlink/v2/common/fee" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) const ( @@ -129,7 +131,7 @@ type Confirmer[ ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] enabledAddresses []ADDR - mb *utils.Mailbox[HEAD] + mb *mailbox.Mailbox[HEAD] ctx context.Context ctxCancel context.CancelFunc wg sync.WaitGroup @@ -174,7 +176,7 @@ func NewConfirmer[ dbConfig: dbConfig, chainID: client.ConfiguredChainID(), ks: keystore, - mb: utils.NewSingleMailbox[HEAD](), + mb: mailbox.NewSingle[HEAD](), isReceiptNil: isReceiptNil, } } @@ -223,7 +225,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) clo ec.initSync.Lock() defer ec.initSync.Unlock() if !ec.isStarted { - return fmt.Errorf("Confirmer is not started: %w", utils.ErrAlreadyStopped) + return fmt.Errorf("Confirmer is not started: %w", services.ErrAlreadyStopped) } ec.ctxCancel() ec.wg.Wait() @@ -869,7 +871,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han "err", sendError, "fee", attempt.TxFee, "feeLimit", etx.FeeLimit, - "signedRawTx", utils.AddHexPrefix(hex.EncodeToString(attempt.SignedRawTx)), + "signedRawTx", utils.EnsureHexPrefix(hex.EncodeToString(attempt.SignedRawTx)), "blockHeight", blockHeight, ) ec.SvcErrBuffer.Append(sendError) @@ -1147,7 +1149,7 @@ func observeUntilTxConfirmed[ // Since a tx can have many attempts, we take the number of blocks to confirm as the block number // of the receipt minus the block number of the first ever broadcast for this transaction. - broadcastBefore := utils.MinKey(attempt.Tx.TxAttempts, func(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) int64 { + broadcastBefore := iutils.MinFunc(attempt.Tx.TxAttempts, func(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) int64 { if attempt.BroadcastBeforeBlockNum != nil { return *attempt.BroadcastBeforeBlockNum } diff --git a/common/txmgr/reaper.go b/common/txmgr/reaper.go index 385a9a17c3d..3ed05b2caee 100644 --- a/common/txmgr/reaper.go +++ b/common/txmgr/reaper.go @@ -7,10 +7,10 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) // Reaper handles periodic database cleanup for Txm diff --git a/common/txmgr/resender.go b/common/txmgr/resender.go index 06c466e1730..74cf3d1389c 100644 --- a/common/txmgr/resender.go +++ b/common/txmgr/resender.go @@ -8,12 +8,12 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chains/label" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink/v2/common/client" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) const ( diff --git a/common/txmgr/tracker.go b/common/txmgr/tracker.go index 1a24dd5b5fe..3ef2fc07208 100644 --- a/common/txmgr/tracker.go +++ b/common/txmgr/tracker.go @@ -8,11 +8,11 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" - - "github.com/smartcontractkit/chainlink/v2/core/utils" ) const ( @@ -56,7 +56,7 @@ type Tracker[ txCache map[int64]AbandonedTx[ADDR] ttl time.Duration lock sync.Mutex - mb *utils.Mailbox[int64] + mb *mailbox.Mailbox[int64] wg sync.WaitGroup isStarted bool ctx context.Context @@ -85,7 +85,7 @@ func NewTracker[ enabledAddrs: map[ADDR]bool{}, txCache: map[int64]AbandonedTx[ADDR]{}, ttl: defaultTTL, - mb: utils.NewSingleMailbox[int64](), + mb: mailbox.NewSingle[int64](), lock: sync.Mutex{}, wg: sync.WaitGroup{}, } diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 228ab4ec8bf..e43a16b29ef 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -14,10 +14,12 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) // For more information about the Txm architecture, see the design doc: @@ -342,7 +344,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() ctx, cancel := b.chStop.NewCtx() defer cancel() // Retry indefinitely on failure - backoff := utils.NewRedialBackoff() + backoff := iutils.NewRedialBackoff() for { select { case <-time.After(backoff.Duration()): @@ -361,7 +363,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() go func() { defer wg.Done() // Retry indefinitely on failure - backoff := utils.NewRedialBackoff() + backoff := iutils.NewRedialBackoff() for { select { case <-time.After(backoff.Duration()): diff --git a/common/txmgr/types/tx.go b/common/txmgr/types/tx.go index 3af43b19617..caac763fc0f 100644 --- a/common/txmgr/types/tx.go +++ b/common/txmgr/types/tx.go @@ -15,9 +15,10 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + clnull "github.com/smartcontractkit/chainlink-common/pkg/utils/null" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/common/types" - clnull "github.com/smartcontractkit/chainlink/v2/core/null" ) // TxStrategy controls how txes are queued and sent diff --git a/core/chains/evm/gas/block_history_estimator.go b/core/chains/evm/gas/block_history_estimator.go index 0ec4721b797..844b9e547f2 100644 --- a/core/chains/evm/gas/block_history_estimator.go +++ b/core/chains/evm/gas/block_history_estimator.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil" "github.com/smartcontractkit/chainlink/v2/common/config" @@ -25,7 +26,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) // MaxStartTime is the maximum amount of time we are allowed to spend @@ -109,7 +109,7 @@ type ( blocks []evmtypes.Block blocksMu sync.RWMutex size int64 - mb *utils.Mailbox[*evmtypes.Head] + mb *mailbox.Mailbox[*evmtypes.Head] wg *sync.WaitGroup ctx context.Context ctxCancel context.CancelFunc @@ -139,7 +139,7 @@ func NewBlockHistoryEstimator(lggr logger.Logger, ethClient evmclient.Client, cf blocks: make([]evmtypes.Block, 0), // Must have enough blocks for both estimator and connectivity checker size: int64(mathutil.Max(bhCfg.BlockHistorySize(), bhCfg.CheckInclusionBlocks())), - mb: utils.NewSingleMailbox[*evmtypes.Head](), + mb: mailbox.NewSingle[*evmtypes.Head](), wg: new(sync.WaitGroup), ctx: ctx, ctxCancel: cancel, diff --git a/core/chains/evm/headtracker/head_broadcaster_test.go b/core/chains/evm/headtracker/head_broadcaster_test.go index ac43c08fe87..b9fab9cdd48 100644 --- a/core/chains/evm/headtracker/head_broadcaster_test.go +++ b/core/chains/evm/headtracker/head_broadcaster_test.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" commonhtrk "github.com/smartcontractkit/chainlink/v2/common/headtracker" commonmocks "github.com/smartcontractkit/chainlink/v2/common/types/mocks" @@ -71,7 +72,7 @@ func TestHeadBroadcaster_Subscribe(t *testing.T) { orm := headtracker.NewORM(db, logger, cfg.Database(), *ethClient.ConfiguredChainID()) hs := headtracker.NewHeadSaver(logger, orm, evmCfg.EVM(), evmCfg.EVM().HeadTracker()) - mailMon := utils.NewMailboxMonitor(t.Name()) + mailMon := mailbox.NewMonitor(t.Name()) servicetest.Run(t, mailMon) hb := headtracker.NewHeadBroadcaster(logger) servicetest.Run(t, hb) diff --git a/core/chains/evm/headtracker/head_tracker.go b/core/chains/evm/headtracker/head_tracker.go index b86a6b5fe22..3cddfb71d09 100644 --- a/core/chains/evm/headtracker/head_tracker.go +++ b/core/chains/evm/headtracker/head_tracker.go @@ -9,12 +9,13 @@ import ( "go.uber.org/zap/zapcore" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + "github.com/smartcontractkit/chainlink/v2/common/headtracker" commontypes "github.com/smartcontractkit/chainlink/v2/common/types" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type headTracker = headtracker.HeadTracker[*evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash] @@ -28,7 +29,7 @@ func NewHeadTracker( htConfig HeadTrackerConfig, headBroadcaster httypes.HeadBroadcaster, headSaver httypes.HeadSaver, - mailMon *utils.MailboxMonitor, + mailMon *mailbox.Monitor, ) httypes.HeadTracker { return headtracker.NewHeadTracker[*evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash]( lggr, diff --git a/core/chains/evm/headtracker/head_tracker_test.go b/core/chains/evm/headtracker/head_tracker_test.go index 4d3cebd24e2..d8abb1328ac 100644 --- a/core/chains/evm/headtracker/head_tracker_test.go +++ b/core/chains/evm/headtracker/head_tracker_test.go @@ -22,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" commonmocks "github.com/smartcontractkit/chainlink/v2/common/types/mocks" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -992,7 +993,7 @@ func createHeadTracker(t *testing.T, ethClient evmclient.Client, config headtrac lggr := logger.Test(t) hb := headtracker.NewHeadBroadcaster(lggr) hs := headtracker.NewHeadSaver(lggr, orm, config, htConfig) - mailMon := utils.NewMailboxMonitor(t.Name()) + mailMon := mailbox.NewMonitor(t.Name()) return &headTrackerUniverse{ mu: new(sync.Mutex), headTracker: headtracker.NewHeadTracker(lggr, ethClient, config, htConfig, hb, hs, mailMon), @@ -1007,7 +1008,7 @@ func createHeadTrackerWithNeverSleeper(t *testing.T, ethClient evmclient.Client, lggr := logger.Test(t) hb := headtracker.NewHeadBroadcaster(lggr) hs := headtracker.NewHeadSaver(lggr, orm, evmcfg.EVM(), evmcfg.EVM().HeadTracker()) - mailMon := utils.NewMailboxMonitor(t.Name()) + mailMon := mailbox.NewMonitor(t.Name()) ht := headtracker.NewHeadTracker(lggr, ethClient, evmcfg.EVM(), evmcfg.EVM().HeadTracker(), hb, hs, mailMon) _, err := hs.Load(testutils.Context(t)) require.NoError(t, err) @@ -1025,7 +1026,7 @@ func createHeadTrackerWithChecker(t *testing.T, ethClient evmclient.Client, conf hb := headtracker.NewHeadBroadcaster(lggr) hs := headtracker.NewHeadSaver(lggr, orm, config, htConfig) hb.Subscribe(checker) - mailMon := utils.NewMailboxMonitor(t.Name()) + mailMon := mailbox.NewMonitor(t.Name()) ht := headtracker.NewHeadTracker(lggr, ethClient, config, htConfig, hb, hs, mailMon) return &headTrackerUniverse{ mu: new(sync.Mutex), @@ -1042,7 +1043,7 @@ type headTrackerUniverse struct { headTracker httypes.HeadTracker headBroadcaster httypes.HeadBroadcaster headSaver httypes.HeadSaver - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor } func (u *headTrackerUniverse) Backfill(ctx context.Context, head *evmtypes.Head, depth uint) error { diff --git a/core/chains/evm/log/broadcaster.go b/core/chains/evm/log/broadcaster.go index f4528396093..393d1c1b266 100644 --- a/core/chains/evm/log/broadcaster.go +++ b/core/chains/evm/log/broadcaster.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" @@ -102,11 +103,11 @@ type ( registrations *registrations logPool *logPool - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor // Use the same channel for subs/unsubs so ordering is preserved // (unsubscribe must happen after subscribe) - changeSubscriberStatus *utils.Mailbox[changeSubscriberStatus] - newHeads *utils.Mailbox[*evmtypes.Head] + changeSubscriberStatus *mailbox.Mailbox[changeSubscriberStatus] + newHeads *mailbox.Mailbox[*evmtypes.Head] utils.DependentAwaiter @@ -165,7 +166,7 @@ const ( var _ Broadcaster = (*broadcaster)(nil) // NewBroadcaster creates a new instance of the broadcaster -func NewBroadcaster(orm ORM, ethClient evmclient.Client, config Config, lggr logger.Logger, highestSavedHead *evmtypes.Head, mailMon *utils.MailboxMonitor) *broadcaster { +func NewBroadcaster(orm ORM, ethClient evmclient.Client, config Config, lggr logger.Logger, highestSavedHead *evmtypes.Head, mailMon *mailbox.Monitor) *broadcaster { chStop := make(chan struct{}) lggr = logger.Named(lggr, "LogBroadcaster") chainId := ethClient.ConfiguredChainID() @@ -178,8 +179,8 @@ func NewBroadcaster(orm ORM, ethClient evmclient.Client, config Config, lggr log registrations: newRegistrations(lggr, *chainId), logPool: newLogPool(lggr), mailMon: mailMon, - changeSubscriberStatus: utils.NewHighCapacityMailbox[changeSubscriberStatus](), - newHeads: utils.NewSingleMailbox[*evmtypes.Head](), + changeSubscriberStatus: mailbox.NewHighCapacity[changeSubscriberStatus](), + newHeads: mailbox.NewSingle[*evmtypes.Head](), DependentAwaiter: utils.NewDependentAwaiter(), chStop: chStop, highestSavedHead: highestSavedHead, diff --git a/core/chains/evm/log/helpers_internal_test.go b/core/chains/evm/log/helpers_internal_test.go index 38f40bd329e..4d4318cdf9d 100644 --- a/core/chains/evm/log/helpers_internal_test.go +++ b/core/chains/evm/log/helpers_internal_test.go @@ -4,13 +4,14 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) // NewTestBroadcaster creates a broadcaster with Pause/Resume enabled. -func NewTestBroadcaster(orm ORM, ethClient evmclient.Client, config Config, lggr logger.Logger, highestSavedHead *evmtypes.Head, mailMon *utils.MailboxMonitor) *broadcaster { +func NewTestBroadcaster(orm ORM, ethClient evmclient.Client, config Config, lggr logger.Logger, highestSavedHead *evmtypes.Head, mailMon *mailbox.Monitor) *broadcaster { b := NewBroadcaster(orm, ethClient, config, lggr, highestSavedHead, mailMon) b.testPause, b.testResume = make(chan struct{}), make(chan struct{}) return b diff --git a/core/chains/evm/log/helpers_test.go b/core/chains/evm/log/helpers_test.go index e41f08e8d20..13dfe1ffab6 100644 --- a/core/chains/evm/log/helpers_test.go +++ b/core/chains/evm/log/helpers_test.go @@ -22,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" @@ -42,7 +43,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type broadcasterHelper struct { @@ -90,7 +90,7 @@ func newBroadcasterHelperWithEthClient(t *testing.T, ethClient evmclient.Client, }) config := evmtest.NewChainScopedConfig(t, globalConfig) lggr := logger.Test(t) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) db := pgtest.NewSqlxDB(t) orm := log.NewORM(db, lggr, config.Database(), cltest.FixtureChainID) diff --git a/core/chains/evm/log/integration_test.go b/core/chains/evm/log/integration_test.go index e5b6ad3caf5..b26e87e668c 100644 --- a/core/chains/evm/log/integration_test.go +++ b/core/chains/evm/log/integration_test.go @@ -18,6 +18,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" @@ -1325,7 +1326,7 @@ func TestBroadcaster_AppendLogChannel(t *testing.T) { ch3 := make(chan types.Log) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - mailMon := servicetest.RunHealthy(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.RunHealthy(t, mailbox.NewMonitor(t.Name())) lb := log.NewBroadcaster(nil, ethClient, nil, logger.Test(t), nil, mailMon) chCombined := lb.ExportedAppendLogChannel(ch1, ch2) chCombined = lb.ExportedAppendLogChannel(chCombined, ch3) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 730809e8dda..1c9868741fa 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -20,13 +20,14 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink-common/pkg/utils/null" + "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/label" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/null" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) diff --git a/core/chains/legacyevm/chain.go b/core/chains/legacyevm/chain.go index 4b4c69f1ab6..18277a55d03 100644 --- a/core/chains/legacyevm/chain.go +++ b/core/chains/legacyevm/chain.go @@ -16,6 +16,7 @@ import ( common "github.com/smartcontractkit/chainlink-common/pkg/chains" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" commonclient "github.com/smartcontractkit/chainlink/v2/common/client" commonconfig "github.com/smartcontractkit/chainlink/v2/common/config" @@ -164,7 +165,7 @@ type ChainOpts struct { AppConfig AppConfig EventBroadcaster pg.EventBroadcaster - MailMon *utils.MailboxMonitor + MailMon *mailbox.Monitor GasEstimator gas.EvmFeeEstimator *sqlx.DB diff --git a/core/chains/legacyevm/chain_test.go b/core/chains/legacyevm/chain_test.go index 4fcd51c39d9..93332348aa0 100644 --- a/core/chains/legacyevm/chain_test.go +++ b/core/chains/legacyevm/chain_test.go @@ -8,12 +8,13 @@ import ( "github.com/jmoiron/sqlx" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/services/pg" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) func TestLegacyChains(t *testing.T) { @@ -35,7 +36,7 @@ func TestChainOpts_Validate(t *testing.T) { type fields struct { AppConfig legacyevm.AppConfig EventBroadcaster pg.EventBroadcaster - MailMon *utils.MailboxMonitor + MailMon *mailbox.Monitor DB *sqlx.DB } tests := []struct { @@ -48,7 +49,7 @@ func TestChainOpts_Validate(t *testing.T) { fields: fields{ AppConfig: configtest.NewTestGeneralConfig(t), EventBroadcaster: pg.NewNullEventBroadcaster(), - MailMon: &utils.MailboxMonitor{}, + MailMon: &mailbox.Monitor{}, DB: pgtest.NewSqlxDB(t), }, }, diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 2e382be4ccf..3810559cf34 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -32,6 +32,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/build" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -153,7 +154,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G } keyStore := keystore.New(db, utils.GetScryptParams(cfg), appLggr, cfg.Database()) - mailMon := utils.NewMailboxMonitor(cfg.AppID().String()) + mailMon := mailbox.NewMonitor(cfg.AppID().String()) dbListener := cfg.Database().Listener() eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), dbListener.MinReconnectInterval(), dbListener.MaxReconnectDuration(), appLggr, cfg.AppID()) diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index fac2d7f040b..56da90d811c 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + "github.com/smartcontractkit/chainlink/v2/common/client" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/cmd" @@ -89,7 +91,7 @@ func TestShell_RunNodeWithPasswords(t *testing.T) { ChainOpts: legacyevm.ChainOpts{ AppConfig: cfg, EventBroadcaster: pg.NewNullEventBroadcaster(), - MailMon: &utils.MailboxMonitor{}, + MailMon: &mailbox.Monitor{}, DB: db, }, } @@ -194,7 +196,7 @@ func TestShell_RunNodeWithAPICredentialsFile(t *testing.T) { ChainOpts: legacyevm.ChainOpts{ AppConfig: cfg, EventBroadcaster: pg.NewNullEventBroadcaster(), - MailMon: &utils.MailboxMonitor{}, + MailMon: &mailbox.Monitor{}, DB: db, }, } diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 88abc3de5c6..dc90201890d 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -38,6 +38,7 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting/types" "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/common/client" commonmocks "github.com/smartcontractkit/chainlink/v2/common/types/mocks" @@ -339,7 +340,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn keyStore := keystore.NewInMemory(db, utils.FastScryptParams, lggr, cfg.Database()) - mailMon := utils.NewMailboxMonitor(cfg.AppID().String()) + mailMon := mailbox.NewMonitor(cfg.AppID().String()) loopRegistry := plugins.NewLoopRegistry(lggr, nil) mercuryPool := wsrpc.NewPool(lggr, cache.Config{ diff --git a/core/internal/testutils/evmtest/evmtest.go b/core/internal/testutils/evmtest/evmtest.go index e0a447a3274..095ea1a35c9 100644 --- a/core/internal/testutils/evmtest/evmtest.go +++ b/core/internal/testutils/evmtest/evmtest.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" commonmocks "github.com/smartcontractkit/chainlink/v2/common/types/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains" @@ -65,7 +66,7 @@ type TestChainOpts struct { DB *sqlx.DB TxManager txmgr.TxManager KeyStore keystore.Eth - MailMon *utils.MailboxMonitor + MailMon *mailbox.Monitor GasEstimator gas.EvmFeeEstimator } @@ -118,7 +119,7 @@ func NewChainRelayExtOpts(t testing.TB, testopts TestChainOpts) legacyevm.ChainR } } if opts.MailMon == nil { - opts.MailMon = servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + opts.MailMon = servicetest.Run(t, mailbox.NewMonitor(t.Name())) } if testopts.GasEstimator != nil { opts.GenGasEstimator = func(*big.Int) gas.EvmFeeEstimator { diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 5c204d693e9..ed043086586 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -20,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/static" "github.com/smartcontractkit/chainlink/v2/core/bridges" @@ -149,7 +150,7 @@ type ApplicationOpts struct { Config GeneralConfig Logger logger.Logger EventBroadcaster pg.EventBroadcaster - MailMon *utils.MailboxMonitor + MailMon *mailbox.Monitor SqlxDB *sqlx.DB KeyStore keystore.Master RelayerChainInteroperators *CoreRelayerChainInteroperators diff --git a/core/services/chainlink/relayer_chain_interoperators_test.go b/core/services/chainlink/relayer_chain_interoperators_test.go index 6a5445d9f21..a0754fa0139 100644 --- a/core/services/chainlink/relayer_chain_interoperators_test.go +++ b/core/services/chainlink/relayer_chain_interoperators_test.go @@ -11,6 +11,8 @@ import ( commoncfg "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + coscfg "github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/config" solcfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" stkcfg "github.com/smartcontractkit/chainlink-starknet/relayer/pkg/chainlink/config" @@ -206,7 +208,7 @@ func TestCoreRelayerChainInteroperators(t *testing.T) { ChainOpts: legacyevm.ChainOpts{ AppConfig: cfg, EventBroadcaster: pg.NewNullEventBroadcaster(), - MailMon: &utils.MailboxMonitor{}, + MailMon: &mailbox.Monitor{}, DB: db, }, CSAETHKeystore: keyStore, @@ -280,7 +282,7 @@ func TestCoreRelayerChainInteroperators(t *testing.T) { ChainOpts: legacyevm.ChainOpts{ AppConfig: cfg, EventBroadcaster: pg.NewNullEventBroadcaster(), - MailMon: &utils.MailboxMonitor{}, + MailMon: &mailbox.Monitor{}, DB: db, }, CSAETHKeystore: keyStore, diff --git a/core/services/directrequest/delegate.go b/core/services/directrequest/delegate.go index a21029ea177..cfdf1eed116 100644 --- a/core/services/directrequest/delegate.go +++ b/core/services/directrequest/delegate.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/assets" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -21,7 +22,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/store/models" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type ( @@ -31,7 +31,7 @@ type ( pipelineORM pipeline.ORM chHeads chan *evmtypes.Head legacyChains legacyevm.LegacyChainContainer - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor } Config interface { @@ -47,7 +47,7 @@ func NewDelegate( pipelineRunner pipeline.Runner, pipelineORM pipeline.ORM, legacyChains legacyevm.LegacyChainContainer, - mailMon *utils.MailboxMonitor, + mailMon *mailbox.Monitor, ) *Delegate { return &Delegate{ logger: logger.Named("DirectRequest"), @@ -101,8 +101,8 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { pipelineORM: d.pipelineORM, mailMon: d.mailMon, job: jb, - mbOracleRequests: utils.NewHighCapacityMailbox[log.Broadcast](), - mbOracleCancelRequests: utils.NewHighCapacityMailbox[log.Broadcast](), + mbOracleRequests: mailbox.NewHighCapacity[log.Broadcast](), + mbOracleCancelRequests: mailbox.NewHighCapacity[log.Broadcast](), minIncomingConfirmations: concreteSpec.MinIncomingConfirmations.Uint32, requesters: concreteSpec.Requesters, minContractPayment: concreteSpec.MinContractPayment, @@ -127,12 +127,12 @@ type listener struct { oracle operator_wrapper.OperatorInterface pipelineRunner pipeline.Runner pipelineORM pipeline.ORM - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor job job.Job runs sync.Map // map[string]services.StopChan shutdownWaitGroup sync.WaitGroup - mbOracleRequests *utils.Mailbox[log.Broadcast] - mbOracleCancelRequests *utils.Mailbox[log.Broadcast] + mbOracleRequests *mailbox.Mailbox[log.Broadcast] + mbOracleCancelRequests *mailbox.Mailbox[log.Broadcast] minIncomingConfirmations uint32 requesters models.AddressCollection minContractPayment *assets.Link @@ -238,7 +238,7 @@ func (l *listener) processCancelOracleRequests() { } } -func (l *listener) handleReceivedLogs(mailbox *utils.Mailbox[log.Broadcast]) { +func (l *listener) handleReceivedLogs(mailbox *mailbox.Mailbox[log.Broadcast]) { for { select { case <-l.chStop: diff --git a/core/services/directrequest/delegate_test.go b/core/services/directrequest/delegate_test.go index 2d5b9bef03b..865edb1b481 100644 --- a/core/services/directrequest/delegate_test.go +++ b/core/services/directrequest/delegate_test.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/assets" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" @@ -44,7 +45,7 @@ func TestDelegate_ServicesForSpec(t *testing.T) { c.EVM[0].MinIncomingConfirmations = ptr[uint32](1) }) keyStore := cltest.NewKeyStore(t, db, cfg.Database()) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) relayerExtenders := evmtest.NewChainRelayExtenders(t, evmtest.TestChainOpts{DB: db, GeneralConfig: cfg, Client: ethClient, MailMon: mailMon, KeyStore: keyStore.Eth()}) lggr := logger.TestLogger(t) @@ -81,7 +82,7 @@ func NewDirectRequestUniverseWithConfig(t *testing.T, cfg chainlink.GeneralConfi runner := pipeline_mocks.NewRunner(t) broadcaster.On("AddDependents", 1) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) db := pgtest.NewSqlxDB(t) keyStore := cltest.NewKeyStore(t, db, cfg.Database()) diff --git a/core/services/functions/listener_test.go b/core/services/functions/listener_test.go index 5020537bf61..07bd82ed288 100644 --- a/core/services/functions/listener_test.go +++ b/core/services/functions/listener_test.go @@ -20,6 +20,7 @@ import ( decryptionPlugin "github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" log_mocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" @@ -44,7 +45,6 @@ import ( sync_mocks "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type FunctionsListenerUniverse struct { @@ -82,7 +82,7 @@ func NewFunctionsListenerUniverse(t *testing.T, timeoutSec int, pruneFrequencySe ethClient := evmtest.NewEthClientMockWithDefaultChain(t) broadcaster := log_mocks.NewBroadcaster(t) broadcaster.On("AddDependents", 1) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) db := pgtest.NewSqlxDB(t) kst := cltest.NewKeyStore(t, db, cfg.Database()) diff --git a/core/services/job/runner_integration_test.go b/core/services/job/runner_integration_test.go index 0223f1a10d5..14a5c41b396 100644 --- a/core/services/job/runner_integration_test.go +++ b/core/services/job/runner_integration_test.go @@ -24,6 +24,7 @@ import ( "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/auth" "github.com/smartcontractkit/chainlink/v2/core/bridges" @@ -45,7 +46,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" "github.com/smartcontractkit/chainlink/v2/core/store/models" - "github.com/smartcontractkit/chainlink/v2/core/utils" "github.com/smartcontractkit/chainlink/v2/core/web" ) @@ -462,7 +462,7 @@ answer1 [type=median index=0]; legacyChains, lggr, config.Database(), - servicetest.Run(t, utils.NewMailboxMonitor(t.Name())), + servicetest.Run(t, mailbox.NewMonitor(t.Name())), ) _, err = sd.ServicesForSpec(jb) require.NoError(t, err) @@ -496,7 +496,7 @@ answer1 [type=median index=0]; legacyChains, lggr, config.Database(), - servicetest.Run(t, utils.NewMailboxMonitor(t.Name())), + servicetest.Run(t, mailbox.NewMonitor(t.Name())), ) _, err = sd.ServicesForSpec(jb) require.NoError(t, err) @@ -524,7 +524,7 @@ answer1 [type=median index=0]; legacyChains, lggr, config.Database(), - servicetest.Run(t, utils.NewMailboxMonitor(t.Name())), + servicetest.Run(t, mailbox.NewMonitor(t.Name())), ) _, err = sd.ServicesForSpec(jb) require.NoError(t, err) @@ -579,7 +579,7 @@ answer1 [type=median index=0]; legacyChains, lggr, config.Database(), - servicetest.Run(t, utils.NewMailboxMonitor(t.Name())), + servicetest.Run(t, mailbox.NewMonitor(t.Name())), ) jb.OCROracleSpec.CaptureEATelemetry = tc.jbCaptureEATelemetry @@ -623,7 +623,7 @@ answer1 [type=median index=0]; legacyChains, lggr, config.Database(), - servicetest.Run(t, utils.NewMailboxMonitor(t.Name())), + servicetest.Run(t, mailbox.NewMonitor(t.Name())), ) services, err := sd.ServicesForSpec(*jb) require.NoError(t, err) diff --git a/core/services/job/spawner_test.go b/core/services/job/spawner_test.go index d639ce859af..b82aa73c0b5 100644 --- a/core/services/job/spawner_test.go +++ b/core/services/job/spawner_test.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/bridges" mocklp "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" @@ -128,7 +129,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { serviceA2 := mocks.NewServiceCtx(t) serviceA1.On("Start", mock.Anything).Return(nil).Once() serviceA2.On("Start", mock.Anything).Return(nil).Once().Run(func(mock.Arguments) { eventuallyA.ItHappened() }) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) dA := ocr.NewDelegate(nil, orm, nil, nil, nil, monitoringEndpoint, legacyChains, logger.TestLogger(t), config.Database(), mailMon) delegateA := &delegate{jobA.Type, []job.ServiceCtx{serviceA1, serviceA2}, 0, make(chan struct{}), dA} @@ -187,7 +188,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { lggr := logger.TestLogger(t) orm := NewTestORM(t, db, pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns()), bridges.NewORM(db, lggr, config.Database()), keyStore, config.Database()) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) d := ocr.NewDelegate(nil, orm, nil, nil, nil, monitoringEndpoint, legacyChains, logger.TestLogger(t), config.Database(), mailMon) delegateA := &delegate{jobA.Type, []job.ServiceCtx{serviceA1, serviceA2}, 0, nil, d} spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{ @@ -221,7 +222,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { lggr := logger.TestLogger(t) orm := NewTestORM(t, db, pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns()), bridges.NewORM(db, lggr, config.Database()), keyStore, config.Database()) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) d := ocr.NewDelegate(nil, orm, nil, nil, nil, monitoringEndpoint, legacyChains, logger.TestLogger(t), config.Database(), mailMon) delegateA := &delegate{jobA.Type, []job.ServiceCtx{serviceA1, serviceA2}, 0, nil, d} spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{ @@ -299,7 +300,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { jobOCR2VRF := makeOCR2VRFJobSpec(t, keyStore, config, address, chain.ID(), 2) orm := NewTestORM(t, db, pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns()), bridges.NewORM(db, lggr, config.Database()), keyStore, config.Database()) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return nil, nil }) ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), config.Database(), processConfig) diff --git a/core/services/keeper/delegate.go b/core/services/keeper/delegate.go index 0dbf584c56f..4418bea670a 100644 --- a/core/services/keeper/delegate.go +++ b/core/services/keeper/delegate.go @@ -5,12 +5,12 @@ import ( "github.com/jmoiron/sqlx" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) // To make sure Delegate struct implements job.Delegate interface @@ -22,7 +22,7 @@ type Delegate struct { jrm job.ORM pr pipeline.Runner legacyChains legacyevm.LegacyChainContainer - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor } // NewDelegate is the constructor of Delegate @@ -32,7 +32,7 @@ func NewDelegate( pr pipeline.Runner, logger logger.Logger, legacyChains legacyevm.LegacyChainContainer, - mailMon *utils.MailboxMonitor, + mailMon *mailbox.Monitor, ) *Delegate { return &Delegate{ logger: logger, diff --git a/core/services/keeper/registry_synchronizer_core.go b/core/services/keeper/registry_synchronizer_core.go index db7cca1763f..f26c38fc2e1 100644 --- a/core/services/keeper/registry_synchronizer_core.go +++ b/core/services/keeper/registry_synchronizer_core.go @@ -10,6 +10,8 @@ import ( "github.com/pkg/errors" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" @@ -28,7 +30,7 @@ type RegistrySynchronizerOptions struct { ORM ORM JRM job.ORM LogBroadcaster log.Broadcaster - MailMon *utils.MailboxMonitor + MailMon *mailbox.Monitor SyncInterval time.Duration MinIncomingConfirmations uint32 Logger logger.Logger @@ -44,14 +46,14 @@ type RegistrySynchronizer struct { job job.Job jrm job.ORM logBroadcaster log.Broadcaster - mbLogs *utils.Mailbox[log.Broadcast] + mbLogs *mailbox.Mailbox[log.Broadcast] minIncomingConfirmations uint32 effectiveKeeperAddress common.Address orm ORM logger logger.SugaredLogger wgDone sync.WaitGroup syncUpkeepQueueSize uint32 //Represents the max number of upkeeps that can be synced in parallel - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor } // NewRegistrySynchronizer is the constructor of RegistrySynchronizer @@ -63,7 +65,7 @@ func NewRegistrySynchronizer(opts RegistrySynchronizerOptions) *RegistrySynchron job: opts.Job, jrm: opts.JRM, logBroadcaster: opts.LogBroadcaster, - mbLogs: utils.NewMailbox[log.Broadcast](5_000), // Arbitrary limit, better to have excess capacity + mbLogs: mailbox.New[log.Broadcast](5_000), // Arbitrary limit, better to have excess capacity minIncomingConfirmations: opts.MinIncomingConfirmations, orm: opts.ORM, effectiveKeeperAddress: opts.EffectiveKeeperAddress, diff --git a/core/services/keeper/registry_synchronizer_helper_test.go b/core/services/keeper/registry_synchronizer_helper_test.go index 5ba60db3962..dff97202f6c 100644 --- a/core/services/keeper/registry_synchronizer_helper_test.go +++ b/core/services/keeper/registry_synchronizer_helper_test.go @@ -11,6 +11,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" @@ -23,7 +24,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keeper" evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) const syncInterval = 1000 * time.Hour // prevents sync timer from triggering during test @@ -73,7 +73,7 @@ func setupRegistrySync(t *testing.T, version keeper.RegistryVersion) ( })).Maybe().Return(func() {}) lbMock.On("IsConnected").Return(true).Maybe() - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) orm := keeper.NewORM(db, logger.TestLogger(t), ch.Config().Database()) synchronizer := keeper.NewRegistrySynchronizer(keeper.RegistrySynchronizerOptions{ diff --git a/core/services/keeper/upkeep_executer.go b/core/services/keeper/upkeep_executer.go index 84349ba2dca..bab2f73edfc 100644 --- a/core/services/keeper/upkeep_executer.go +++ b/core/services/keeper/upkeep_executer.go @@ -13,6 +13,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" @@ -23,7 +25,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) const ( @@ -62,7 +63,7 @@ type UpkeepExecuter struct { headBroadcaster httypes.HeadBroadcasterRegistry gasEstimator gas.EvmFeeEstimator job job.Job - mailbox *utils.Mailbox[*evmtypes.Head] + mailbox *mailbox.Mailbox[*evmtypes.Head] orm ORM pr pipeline.Runner logger logger.Logger @@ -89,7 +90,7 @@ func NewUpkeepExecuter( headBroadcaster: headBroadcaster, gasEstimator: gasEstimator, job: job, - mailbox: utils.NewSingleMailbox[*evmtypes.Head](), + mailbox: mailbox.NewSingle[*evmtypes.Head](), config: config, orm: orm, pr: pr, diff --git a/core/services/ocr/contract_tracker.go b/core/services/ocr/contract_tracker.go index 4f79bcfc31a..1287e52e9b5 100644 --- a/core/services/ocr/contract_tracker.go +++ b/core/services/ocr/contract_tracker.go @@ -21,6 +21,7 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting/types" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/common/config" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -31,7 +32,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pg" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) // configMailboxSanityLimit is the maximum number of configs that can be held @@ -67,7 +67,7 @@ type ( q pg.Q blockTranslator ocrcommon.BlockTranslator cfg ocrcommon.Config - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor // HeadBroadcaster headBroadcaster httypes.HeadBroadcaster @@ -83,7 +83,7 @@ type ( lrrMu sync.RWMutex // ContractConfig - configsMB *utils.Mailbox[ocrtypes.ContractConfig] + configsMB *mailbox.Mailbox[ocrtypes.ContractConfig] chConfigs chan ocrtypes.ContractConfig // LatestBlockHeight @@ -117,7 +117,7 @@ func NewOCRContractTracker( cfg ocrcommon.Config, q pg.QConfig, headBroadcaster httypes.HeadBroadcaster, - mailMon *utils.MailboxMonitor, + mailMon *mailbox.Monitor, ) (o *OCRContractTracker) { logger = logger.Named("OCRContractTracker") return &OCRContractTracker{ @@ -136,7 +136,7 @@ func NewOCRContractTracker( headBroadcaster: headBroadcaster, chStop: make(services.StopChan), latestRoundRequested: offchainaggregator.OffchainAggregatorRoundRequested{}, - configsMB: utils.NewMailbox[ocrtypes.ContractConfig](configMailboxSanityLimit), + configsMB: mailbox.New[ocrtypes.ContractConfig](configMailboxSanityLimit), chConfigs: make(chan ocrtypes.ContractConfig), latestBlockHeight: -1, } diff --git a/core/services/ocr/contract_tracker_test.go b/core/services/ocr/contract_tracker_test.go index af65f330d66..f7ebbe08481 100644 --- a/core/services/ocr/contract_tracker_test.go +++ b/core/services/ocr/contract_tracker_test.go @@ -16,6 +16,7 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting/types" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" commonmocks "github.com/smartcontractkit/chainlink/v2/common/mocks" evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" @@ -31,7 +32,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr" ocrmocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr/mocks" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) func mustNewContract(t *testing.T, address gethCommon.Address) *offchain_aggregator_wrapper.OffchainAggregator { @@ -84,7 +84,7 @@ func newContractTrackerUni(t *testing.T, opts ...interface{}) (uni contractTrack uni.hb = commonmocks.NewHeadBroadcaster[*evmtypes.Head, common.Hash](t) uni.ec = evmtest.NewEthClientMock(t) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) db := pgtest.NewSqlxDB(t) uni.tracker = ocr.NewOCRContractTracker( contract, diff --git a/core/services/ocr/delegate.go b/core/services/ocr/delegate.go index d3d133e7121..0eed680a3d8 100644 --- a/core/services/ocr/delegate.go +++ b/core/services/ocr/delegate.go @@ -12,6 +12,7 @@ import ( "github.com/jmoiron/sqlx" commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/libocr/gethwrappers/offchainaggregator" ocr "github.com/smartcontractkit/libocr/offchainreporting" @@ -43,7 +44,7 @@ type Delegate struct { legacyChains legacyevm.LegacyChainContainer lggr logger.Logger cfg Config - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor } var _ job.Delegate = (*Delegate)(nil) @@ -60,7 +61,7 @@ func NewDelegate( legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger, cfg Config, - mailMon *utils.MailboxMonitor, + mailMon *mailbox.Monitor, ) *Delegate { return &Delegate{ db: db, diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 16f02282afb..5200866e3af 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -34,6 +34,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -70,7 +71,6 @@ import ( evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" - "github.com/smartcontractkit/chainlink/v2/core/utils" "github.com/smartcontractkit/chainlink/v2/plugins" ) @@ -116,7 +116,7 @@ type Delegate struct { ethKs keystore.Eth RelayGetter isNewlyCreatedJob bool // Set to true if this is a new job freshly added, false if job was present already on node boot. - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor legacyChains legacyevm.LegacyChainContainer // legacy: use relayers instead } @@ -226,7 +226,7 @@ func NewDelegate( dkgEncryptKs keystore.DKGEncrypt, ethKs keystore.Eth, relayers RelayGetter, - mailMon *utils.MailboxMonitor, + mailMon *mailbox.Monitor, eventBroadcaster pg.EventBroadcaster, ) *Delegate { return &Delegate{ diff --git a/core/services/ocr2/plugins/functions/plugin.go b/core/services/ocr2/plugins/functions/plugin.go index 61fa7f5d825..7e2b15bdccf 100644 --- a/core/services/ocr2/plugins/functions/plugin.go +++ b/core/services/ocr2/plugins/functions/plugin.go @@ -14,6 +14,7 @@ import ( libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" "github.com/smartcontractkit/chainlink-common/pkg/assets" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -43,7 +44,7 @@ type FunctionsServicesConfig struct { Chain legacyevm.Chain ContractID string Logger logger.Logger - MailMon *utils.MailboxMonitor + MailMon *mailbox.Monitor URLsMonEndpoint commontypes.MonitoringEndpoint EthKeystore keystore.Eth ThresholdKeyShare []byte diff --git a/core/services/pipeline/task.eth_tx.go b/core/services/pipeline/task.eth_tx.go index c421b340c91..58e9f6f2c15 100644 --- a/core/services/pipeline/task.eth_tx.go +++ b/core/services/pipeline/task.eth_tx.go @@ -13,11 +13,11 @@ import ( "go.uber.org/multierr" "gopkg.in/guregu/null.v4" + clnull "github.com/smartcontractkit/chainlink-common/pkg/utils/null" txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" - clnull "github.com/smartcontractkit/chainlink/v2/core/null" "github.com/smartcontractkit/chainlink/v2/core/utils" ) diff --git a/core/services/pipeline/task.eth_tx_test.go b/core/services/pipeline/task.eth_tx_test.go index a0ff54d4448..5f5019d1967 100644 --- a/core/services/pipeline/task.eth_tx_test.go +++ b/core/services/pipeline/task.eth_tx_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v4" + clnull "github.com/smartcontractkit/chainlink-common/pkg/utils/null" txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" "github.com/smartcontractkit/chainlink/v2/core/chains" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" @@ -19,7 +20,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" - clnull "github.com/smartcontractkit/chainlink/v2/core/null" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" keystoremocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" diff --git a/core/services/promreporter/prom_reporter.go b/core/services/promreporter/prom_reporter.go index 3e1444a6da1..a302a6fa220 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/promreporter/prom_reporter.go @@ -17,10 +17,11 @@ import ( "go.uber.org/multierr" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) //go:generate mockery --quiet --name PrometheusBackend --output ../../internal/mocks/ --case=underscore @@ -31,7 +32,7 @@ type ( chains legacyevm.LegacyChainContainer lggr logger.Logger backend PrometheusBackend - newHeads *utils.Mailbox[*evmtypes.Head] + newHeads *mailbox.Mailbox[*evmtypes.Head] chStop services.StopChan wgDone sync.WaitGroup reportPeriod time.Duration @@ -109,7 +110,7 @@ func NewPromReporter(db *sql.DB, chainContainer legacyevm.LegacyChainContainer, chains: chainContainer, lggr: lggr.Named("PromReporter"), backend: backend, - newHeads: utils.NewSingleMailbox[*evmtypes.Head](), + newHeads: mailbox.NewSingle[*evmtypes.Head](), chStop: chStop, reportPeriod: period, } diff --git a/core/services/vrf/delegate.go b/core/services/vrf/delegate.go index a13df71d9a3..03e40614a10 100644 --- a/core/services/vrf/delegate.go +++ b/core/services/vrf/delegate.go @@ -12,6 +12,7 @@ import ( "github.com/jmoiron/sqlx" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -29,7 +30,6 @@ import ( v1 "github.com/smartcontractkit/chainlink/v2/core/services/vrf/v1" v2 "github.com/smartcontractkit/chainlink/v2/core/services/vrf/v2" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type Delegate struct { @@ -39,7 +39,7 @@ type Delegate struct { ks keystore.Master legacyChains legacyevm.LegacyChainContainer lggr logger.Logger - mailMon *utils.MailboxMonitor + mailMon *mailbox.Monitor } func NewDelegate( @@ -50,7 +50,7 @@ func NewDelegate( legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger, cfg pg.QConfig, - mailMon *utils.MailboxMonitor) *Delegate { + mailMon *mailbox.Monitor) *Delegate { return &Delegate{ q: pg.NewQ(db, lggr, cfg), ks: ks, @@ -250,7 +250,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { MailMon: d.mailMon, // Note the mailbox size effectively sets a limit on how many logs we can replay // in the event of a VRF outage. - ReqLogs: utils.NewHighCapacityMailbox[log.Broadcast](), + ReqLogs: mailbox.NewHighCapacity[log.Broadcast](), ChStop: make(chan struct{}), WaitOnStop: make(chan struct{}), NewHead: make(chan struct{}, 1), diff --git a/core/services/vrf/delegate_test.go b/core/services/vrf/delegate_test.go index d957e3c721e..3b643d19b0b 100644 --- a/core/services/vrf/delegate_test.go +++ b/core/services/vrf/delegate_test.go @@ -16,6 +16,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" @@ -149,7 +150,7 @@ func setup(t *testing.T) (vrfUniverse, *v1.Listener, job.Job) { cfg := configtest.NewTestGeneralConfig(t) vuni := buildVrfUni(t, db, cfg) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) vd := vrf.NewDelegate( db, @@ -673,7 +674,7 @@ func Test_VRFV2PlusServiceFailsWhenVRFOwnerProvided(t *testing.T) { cfg := configtest.NewTestGeneralConfig(t) vuni := buildVrfUni(t, db, cfg) - mailMon := servicetest.Run(t, utils.NewMailboxMonitor(t.Name())) + mailMon := servicetest.Run(t, mailbox.NewMonitor(t.Name())) vd := vrf.NewDelegate( db, diff --git a/core/services/vrf/v1/listener_v1.go b/core/services/vrf/v1/listener_v1.go index 66a8ddcd58c..f4e813d7d61 100644 --- a/core/services/vrf/v1/listener_v1.go +++ b/core/services/vrf/v1/listener_v1.go @@ -17,6 +17,7 @@ import ( "github.com/theodesp/go-heaps/pairing" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" @@ -57,8 +58,8 @@ type Listener struct { Job job.Job Q pg.Q GethKs vrfcommon.GethKeyStore - MailMon *utils.MailboxMonitor - ReqLogs *utils.Mailbox[log.Broadcast] + MailMon *mailbox.Monitor + ReqLogs *mailbox.Mailbox[log.Broadcast] ChStop services.StopChan WaitOnStop chan struct{} NewHead chan struct{} diff --git a/core/services/vrf/v2/listener_v2.go b/core/services/vrf/v2/listener_v2.go index 5878bf54763..6556bbd2186 100644 --- a/core/services/vrf/v2/listener_v2.go +++ b/core/services/vrf/v2/listener_v2.go @@ -14,6 +14,7 @@ import ( "github.com/theodesp/go-heaps/pairing" "github.com/smartcontractkit/chainlink-common/pkg/services" + txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" diff --git a/core/services/vrf/v2/listener_v2_test.go b/core/services/vrf/v2/listener_v2_test.go index 6192db95dfe..d8bc0a6695b 100644 --- a/core/services/vrf/v2/listener_v2_test.go +++ b/core/services/vrf/v2/listener_v2_test.go @@ -17,6 +17,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + clnull "github.com/smartcontractkit/chainlink-common/pkg/utils/null" + txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" @@ -28,7 +30,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" - clnull "github.com/smartcontractkit/chainlink/v2/core/null" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs" "github.com/smartcontractkit/chainlink/v2/core/utils" diff --git a/core/utils/mailbox.go b/core/utils/mailbox.go deleted file mode 100644 index 87fe1627f37..00000000000 --- a/core/utils/mailbox.go +++ /dev/null @@ -1,126 +0,0 @@ -package utils - -import ( - "sync" - "sync/atomic" -) - -// Mailbox contains a notify channel, -// a mutual exclusive lock, -// a queue of interfaces, -// and a queue capacity. -type Mailbox[T any] struct { - mu sync.Mutex - chNotify chan struct{} - queue []T - queueLen atomic.Int64 // atomic so monitor can read w/o blocking the queue - - // capacity - number of items the mailbox can buffer - // NOTE: if the capacity is 1, it's possible that an empty Retrieve may occur after a notification. - capacity uint64 - // onCloseFn is a hook used to stop monitoring, if non-nil - onCloseFn func() -} - -// NewHighCapacityMailbox create a new mailbox with a capacity -// that is better able to handle e.g. large log replays. -func NewHighCapacityMailbox[T any]() *Mailbox[T] { - return NewMailbox[T](100_000) -} - -// NewSingleMailbox returns a new Mailbox with capacity one. -func NewSingleMailbox[T any]() *Mailbox[T] { return NewMailbox[T](1) } - -// NewMailbox creates a new mailbox instance. If name is non-empty, it must be unique and calling Start will launch -// prometheus metric monitor that periodically reports mailbox load until Close() is called. -func NewMailbox[T any](capacity uint64) *Mailbox[T] { - queueCap := capacity - if queueCap == 0 { - queueCap = 100 - } - return &Mailbox[T]{ - chNotify: make(chan struct{}, 1), - queue: make([]T, 0, queueCap), - capacity: capacity, - } -} - -// Notify returns the contents of the notify channel -func (m *Mailbox[T]) Notify() <-chan struct{} { - return m.chNotify -} - -func (m *Mailbox[T]) Close() error { - if m.onCloseFn != nil { - m.onCloseFn() - } - return nil -} - -func (m *Mailbox[T]) onClose(fn func()) { m.onCloseFn = fn } - -func (m *Mailbox[T]) load() (capacity uint64, loadPercent float64) { - capacity = m.capacity - loadPercent = 100 * float64(m.queueLen.Load()) / float64(capacity) - return -} - -// Deliver appends to the queue and returns true if the queue was full, causing a message to be dropped. -func (m *Mailbox[T]) Deliver(x T) (wasOverCapacity bool) { - m.mu.Lock() - defer m.mu.Unlock() - - m.queue = append([]T{x}, m.queue...) - if uint64(len(m.queue)) > m.capacity && m.capacity > 0 { - m.queue = m.queue[:len(m.queue)-1] - wasOverCapacity = true - } else { - m.queueLen.Add(1) - } - - select { - case m.chNotify <- struct{}{}: - default: - } - return -} - -// Retrieve fetches one element from the queue. -func (m *Mailbox[T]) Retrieve() (t T, ok bool) { - m.mu.Lock() - defer m.mu.Unlock() - if len(m.queue) == 0 { - return - } - t = m.queue[len(m.queue)-1] - m.queue = m.queue[:len(m.queue)-1] - m.queueLen.Add(-1) - ok = true - return -} - -// RetrieveAll fetches all elements from the queue. -func (m *Mailbox[T]) RetrieveAll() []T { - m.mu.Lock() - defer m.mu.Unlock() - queue := m.queue - m.queue = nil - m.queueLen.Store(0) - for i, j := 0, len(queue)-1; i < j; i, j = i+1, j-1 { - queue[i], queue[j] = queue[j], queue[i] - } - return queue -} - -// RetrieveLatestAndClear fetch the latest value (or nil), and clears the rest of the queue (if any). -func (m *Mailbox[T]) RetrieveLatestAndClear() (t T) { - m.mu.Lock() - defer m.mu.Unlock() - if len(m.queue) == 0 { - return - } - t = m.queue[0] - m.queue = nil - m.queueLen.Store(0) - return -} diff --git a/core/utils/mailbox_prom.go b/core/utils/mailbox_prom.go deleted file mode 100644 index 33cbb2357b1..00000000000 --- a/core/utils/mailbox_prom.go +++ /dev/null @@ -1,93 +0,0 @@ -package utils - -import ( - "context" - "strconv" - "strings" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/smartcontractkit/chainlink-common/pkg/services" -) - -var mailboxLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "mailbox_load_percent", - Help: "Percent of mailbox capacity used", -}, - []string{"appID", "name", "capacity"}, -) - -const mailboxPromInterval = 5 * time.Second - -type MailboxMonitor struct { - services.StateMachine - appID string - - mailboxes sync.Map - stop func() - done chan struct{} -} - -func NewMailboxMonitor(appID string) *MailboxMonitor { - return &MailboxMonitor{appID: appID} -} - -func (m *MailboxMonitor) Name() string { return "MailboxMonitor" } - -func (m *MailboxMonitor) Start(context.Context) error { - return m.StartOnce("MailboxMonitor", func() error { - t := time.NewTicker(WithJitter(mailboxPromInterval)) - ctx, cancel := context.WithCancel(context.Background()) - m.stop = func() { - t.Stop() - cancel() - } - m.done = make(chan struct{}) - go m.monitorLoop(ctx, t.C) - return nil - }) -} - -func (m *MailboxMonitor) Close() error { - return m.StopOnce("MailboxMonitor", func() error { - m.stop() - <-m.done - return nil - }) -} - -func (m *MailboxMonitor) HealthReport() map[string]error { - return map[string]error{m.Name(): m.Healthy()} -} - -func (m *MailboxMonitor) monitorLoop(ctx context.Context, c <-chan time.Time) { - defer close(m.done) - for { - select { - case <-ctx.Done(): - return - case <-c: - m.mailboxes.Range(func(k, v any) bool { - name, mb := k.(string), v.(mailbox) - c, p := mb.load() - capacity := strconv.FormatUint(c, 10) - mailboxLoad.WithLabelValues(m.appID, name, capacity).Set(p) - return true - }) - } - } -} - -type mailbox interface { - load() (capacity uint64, percent float64) - onClose(func()) -} - -func (m *MailboxMonitor) Monitor(mb mailbox, name ...string) { - n := strings.Join(name, ".") - m.mailboxes.Store(n, mb) - mb.onClose(func() { m.mailboxes.Delete(n) }) -} diff --git a/core/utils/mailbox_test.go b/core/utils/mailbox_test.go deleted file mode 100644 index c83d0035baa..00000000000 --- a/core/utils/mailbox_test.go +++ /dev/null @@ -1,181 +0,0 @@ -package utils - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestMailbox(t *testing.T) { - var ( - expected = []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11} - toDeliver = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} - ) - - const capacity = 10 - m := NewMailbox[int](capacity) - - // Queue deliveries - for i, d := range toDeliver { - atCapacity := m.Deliver(d) - if atCapacity && i < capacity { - t.Errorf("mailbox at capacity %d", i) - } else if !atCapacity && i >= capacity { - t.Errorf("mailbox below capacity %d", i) - } - } - - // Retrieve them - var recvd []int - chDone := make(chan struct{}) - go func() { - defer close(chDone) - for range m.Notify() { - for { - x, exists := m.Retrieve() - if !exists { - break - } - recvd = append(recvd, x) - } - } - }() - - close(m.chNotify) - <-chDone - - require.Equal(t, expected, recvd) -} - -func TestMailbox_RetrieveAll(t *testing.T) { - var ( - expected = []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11} - toDeliver = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} - ) - - const capacity = 10 - m := NewMailbox[int](capacity) - - // Queue deliveries - for i, d := range toDeliver { - atCapacity := m.Deliver(d) - if atCapacity && i < capacity { - t.Errorf("mailbox at capacity %d", i) - } else if !atCapacity && i >= capacity { - t.Errorf("mailbox below capacity %d", i) - } - } - - require.Equal(t, expected, m.RetrieveAll()) -} - -func TestMailbox_RetrieveLatestAndClear(t *testing.T) { - var ( - expected = 11 - toDeliver = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} - ) - - const capacity = 10 - m := NewMailbox[int](capacity) - - // Queue deliveries - for i, d := range toDeliver { - atCapacity := m.Deliver(d) - if atCapacity && i < capacity { - t.Errorf("mailbox at capacity %d", i) - } else if !atCapacity && i >= capacity { - t.Errorf("mailbox below capacity %d", i) - } - } - - require.Equal(t, expected, m.RetrieveLatestAndClear()) - require.Len(t, m.RetrieveAll(), 0) -} - -func TestMailbox_NoEmptyReceivesWhenCapacityIsTwo(t *testing.T) { - m := NewMailbox[int](2) - - var ( - recvd []int - emptyReceives []int - ) - - chDone := make(chan struct{}) - go func() { - defer close(chDone) - for range m.Notify() { - x, exists := m.Retrieve() - if !exists { - emptyReceives = append(emptyReceives, recvd[len(recvd)-1]) - } else { - recvd = append(recvd, x) - } - } - }() - - for i := 0; i < 100000; i++ { - m.Deliver(i) - } - close(m.chNotify) - - <-chDone - require.Len(t, emptyReceives, 0) -} - -func TestMailbox_load(t *testing.T) { - for _, tt := range []struct { - name string - capacity uint64 - deliver []int - exp float64 - - retrieve int - exp2 float64 - - all bool - }{ - {"single-all", 1, []int{1}, 100, 0, 100, true}, - {"single-latest", 1, []int{1}, 100, 0, 100, false}, - {"ten-low", 10, []int{1}, 10, 1, 0.0, false}, - {"ten-full-all", 10, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 100, 5, 50, true}, - {"ten-full-latest", 10, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 100, 5, 50, false}, - {"ten-overflow", 10, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 100, 5, 50, false}, - {"nine", 9, []int{1, 2, 3}, 100.0 / 3.0, 2, 100.0 / 9.0, true}, - } { - t.Run(tt.name, func(t *testing.T) { - m := NewMailbox[int](tt.capacity) - - // Queue deliveries - for i, d := range tt.deliver { - atCapacity := m.Deliver(d) - if atCapacity && i < int(tt.capacity) { - t.Errorf("mailbox at capacity %d", i) - } else if !atCapacity && i >= int(tt.capacity) { - t.Errorf("mailbox below capacity %d", i) - } - } - gotCap, gotLoad := m.load() - require.Equal(t, gotCap, tt.capacity) - require.Equal(t, gotLoad, tt.exp) - - // Retrieve some - for i := 0; i < tt.retrieve; i++ { - _, ok := m.Retrieve() - require.True(t, ok) - } - gotCap, gotLoad = m.load() - require.Equal(t, gotCap, tt.capacity) - require.Equal(t, gotLoad, tt.exp2) - - // Drain it - if tt.all { - m.RetrieveAll() - } else { - m.RetrieveLatestAndClear() - } - gotCap, gotLoad = m.load() - require.Equal(t, gotCap, tt.capacity) - require.Equal(t, gotLoad, 0.0) - }) - } -} diff --git a/core/utils/utils.go b/core/utils/utils.go index b887fe9a800..4f3f9212337 100644 --- a/core/utils/utils.go +++ b/core/utils/utils.go @@ -27,7 +27,6 @@ import ( "github.com/robfig/cron/v3" "golang.org/x/crypto/bcrypt" "golang.org/x/crypto/sha3" - "golang.org/x/exp/constraints" ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" @@ -879,26 +878,6 @@ func TryParseHex(s string) (b []byte, err error) { return } -// MinKey returns the minimum value of the given element array with respect -// to the given key function. In the event U is not a compound type (e.g a -// struct) an identity function can be provided. -func MinKey[U any, T constraints.Ordered](elems []U, key func(U) T) T { - var min T - if len(elems) == 0 { - return min - } - - min = key(elems[0]) - for i := 1; i < len(elems); i++ { - v := key(elems[i]) - if v < min { - min = v - } - } - - return min -} - // ErrorBuffer uses joinedErrors interface to join multiple errors into a single error. // This is useful to track the most recent N errors in a service and flush them as a single error. type ErrorBuffer struct {