Skip to content

Commit

Permalink
core/utils: deprecate StartStopOnce in favor of services.StateMachine (
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored Oct 30, 2023
1 parent c942403 commit 67a79f1
Show file tree
Hide file tree
Showing 86 changed files with 254 additions and 488 deletions.
26 changes: 12 additions & 14 deletions common/headtracker/head_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand All @@ -25,13 +26,13 @@ func (set callbackSet[H, BLOCK_HASH]) values() []types.HeadTrackable[H, BLOCK_HA
}

type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct {
logger logger.Logger
callbacks callbackSet[H, BLOCK_HASH]
mailbox *utils.Mailbox[H]
mutex *sync.Mutex
chClose utils.StopChan
wgDone sync.WaitGroup
utils.StartStopOnce
services.StateMachine
logger logger.Logger
callbacks callbackSet[H, BLOCK_HASH]
mailbox *utils.Mailbox[H]
mutex sync.Mutex
chClose utils.StopChan
wgDone sync.WaitGroup
latest H
lastCallbackID int
}
Expand All @@ -44,13 +45,10 @@ func NewHeadBroadcaster[
lggr logger.Logger,
) *HeadBroadcaster[H, BLOCK_HASH] {
return &HeadBroadcaster[H, BLOCK_HASH]{
logger: lggr.Named("HeadBroadcaster"),
callbacks: make(callbackSet[H, BLOCK_HASH]),
mailbox: utils.NewSingleMailbox[H](),
mutex: &sync.Mutex{},
chClose: make(chan struct{}),
wgDone: sync.WaitGroup{},
StartStopOnce: utils.StartStopOnce{},
logger: lggr.Named("HeadBroadcaster"),
callbacks: make(callbackSet[H, BLOCK_HASH]),
mailbox: utils.NewSingleMailbox[H](),
chClose: make(chan struct{}),
}
}

Expand Down
4 changes: 2 additions & 2 deletions common/headtracker/head_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type HeadTracker[
ID types.ID,
BLOCK_HASH types.Hashable,
] struct {
services.StateMachine
log logger.Logger
headBroadcaster types.HeadBroadcaster[HTH, BLOCK_HASH]
headSaver types.HeadSaver[HTH, BLOCK_HASH]
Expand All @@ -54,8 +55,7 @@ type HeadTracker[
headListener types.HeadListener[HTH, BLOCK_HASH]
chStop utils.StopChan
wgDone sync.WaitGroup
utils.StartStopOnce
getNilHead func() HTH
getNilHead func() HTH
}

// NewHeadTracker instantiates a new HeadTracker using HeadSaver to persist new block numbers.
Expand Down
3 changes: 2 additions & 1 deletion common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.uber.org/multierr"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
clienttypes "github.com/smartcontractkit/chainlink/v2/common/chains/client"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
Expand Down Expand Up @@ -105,6 +106,7 @@ type Broadcaster[
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
services.StateMachine
logger logger.Logger
txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]
client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
Expand Down Expand Up @@ -140,7 +142,6 @@ type Broadcaster[

initSync sync.Mutex
isStarted bool
utils.StartStopOnce

parseAddr func(string) (ADDR, error)

Expand Down
3 changes: 2 additions & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
clienttypes "github.com/smartcontractkit/chainlink/v2/common/chains/client"
commonfee "github.com/smartcontractkit/chainlink/v2/common/fee"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
Expand Down Expand Up @@ -112,7 +113,7 @@ type Confirmer[
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
utils.StartStopOnce
services.StateMachine
txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
lggr logger.Logger
client txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Txm[
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
utils.StartStopOnce
services.StateMachine
logger logger.Logger
txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
config txmgrtypes.TransactionManagerChainConfig
Expand Down
5 changes: 2 additions & 3 deletions core/chains/cosmos/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains"
"github.com/smartcontractkit/chainlink/v2/core/chains/cosmos/cosmostxm"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// DefaultRequestTimeout is the default Cosmos client timeout.
Expand Down Expand Up @@ -93,7 +92,7 @@ func NewChain(cfg *CosmosConfig, opts ChainOpts) (adapters.Chain, error) {
var _ adapters.Chain = (*chain)(nil)

type chain struct {
utils.StartStopOnce
services.StateMachine
id string
cfg *CosmosConfig
txm *cosmostxm.Txm
Expand Down Expand Up @@ -197,7 +196,7 @@ func (c *chain) Close() error {

func (c *chain) Ready() error {
return multierr.Combine(
c.StartStopOnce.Ready(),
c.StateMachine.Ready(),
c.txm.Ready(),
)
}
Expand Down
23 changes: 12 additions & 11 deletions core/chains/cosmos/cosmostxm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ import (

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/loop"
"github.com/smartcontractkit/chainlink-relay/pkg/services"

"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var (
_ services.ServiceCtx = (*Txm)(nil)
_ adapters.TxManager = (*Txm)(nil)
_ services.Service = (*Txm)(nil)
_ adapters.TxManager = (*Txm)(nil)
)

// Txm manages transactions for the cosmos blockchain.
type Txm struct {
starter utils.StartStopOnce
services.StateMachine
eb pg.EventBroadcaster
sub pg.Subscription
orm *ORM
Expand All @@ -58,7 +58,6 @@ func NewTxm(db *sqlx.DB, tc func() (cosmosclient.ReaderWriter, error), gpe cosmo
lggr = logger.Named(lggr, "Txm")
keystoreAdapter := NewKeystoreAdapter(ks, cfg.Bech32Prefix())
return &Txm{
starter: utils.StartStopOnce{},
eb: eb,
orm: NewORM(chainID, db, lggr, logCfg),
lggr: lggr,
Expand All @@ -73,7 +72,7 @@ func NewTxm(db *sqlx.DB, tc func() (cosmosclient.ReaderWriter, error), gpe cosmo

// Start subscribes to pg notifications about cosmos msg inserts and processes them.
func (txm *Txm) Start(context.Context) error {
return txm.starter.StartOnce("cosmostxm", func() error {
return txm.StartOnce("Txm", func() error {
sub, err := txm.eb.Subscribe(pg.ChannelInsertOnCosmosMsg, "")
if err != nil {
return err
Expand Down Expand Up @@ -520,13 +519,15 @@ func (txm *Txm) GasPrice() (sdk.DecCoin, error) {

// Close close service
func (txm *Txm) Close() error {
txm.sub.Close()
close(txm.stop)
<-txm.done
return nil
return txm.StopOnce("Txm", func() error {
txm.sub.Close()
close(txm.stop)
<-txm.done
return nil
})
}

func (txm *Txm) Name() string { return "cosmostxm" }
func (txm *Txm) Name() string { return txm.lggr.Name() }

// Healthy service is healthy
func (txm *Txm) Healthy() error {
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *LegacyChains) Get(id string) (Chain, error) {
}

type chain struct {
utils.StartStopOnce
services.StateMachine
id *big.Int
cfg *evmconfig.ChainScoped
client evmclient.Client
Expand Down Expand Up @@ -366,7 +366,7 @@ func (c *chain) Close() error {

func (c *chain) Ready() (merr error) {
merr = multierr.Combine(
c.StartStopOnce.Ready(),
c.StateMachine.Ready(),
c.txm.Ready(),
c.headBroadcaster.Ready(),
c.headTracker.Ready(),
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -132,7 +133,7 @@ type rawclient struct {
// Node represents one ethereum node.
// It must have a ws url and may have a http url
type node struct {
utils.StartStopOnce
services.StateMachine
lfcLog logger.Logger
rpcLog logger.Logger
name string
Expand Down
6 changes: 3 additions & 3 deletions core/chains/evm/client/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink-relay/pkg/services"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -57,7 +57,7 @@ type PoolConfig interface {
// Pool represents an abstraction over one or more primary nodes
// It is responsible for liveness checking and balancing queries across live nodes
type Pool struct {
utils.StartStopOnce
services.StateMachine
nodes []Node
sendonlys []SendOnlyNode
chainID *big.Int
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/client/send_only_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ var _ SendOnlyNode = &sendOnlyNode{}
// It only supports sending transactions
// It must a http(s) url
type sendOnlyNode struct {
utils.StartStopOnce
services.StateMachine

stateMu sync.RWMutex // protects state* fields
state NodeState
Expand Down
7 changes: 3 additions & 4 deletions core/chains/evm/forwarders/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"

"github.com/smartcontractkit/sqlx"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmlogpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand All @@ -30,7 +32,7 @@ type Config interface {
}

type FwdMgr struct {
utils.StartStopOnce
services.StateMachine
ORM ORM
evmClient evmclient.Client
cfg Config
Expand Down Expand Up @@ -61,9 +63,6 @@ func NewFwdMgr(db *sqlx.DB, client evmclient.Client, logpoller evmlogpoller.LogP
ORM: NewORM(db, lggr, dbConfig),
logpoller: logpoller,
sendersCache: make(map[common.Address][]common.Address),
cacheMu: sync.RWMutex{},
wg: sync.WaitGroup{},
latestBlock: 0,
}
fwdMgr.ctx, fwdMgr.cancel = context.WithCancel(context.Background())
return &fwdMgr
Expand Down
5 changes: 2 additions & 3 deletions core/chains/evm/gas/arbitrum_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ethClient interface {

// arbitrumEstimator is an Estimator which extends l2SuggestedPriceEstimator to use getPricesInArbGas() for gas limit estimation.
type arbitrumEstimator struct {
services.StateMachine
cfg ArbConfig

EvmEstimator // *l2SuggestedPriceEstimator
Expand All @@ -49,8 +50,6 @@ type arbitrumEstimator struct {
chInitialised chan struct{}
chStop utils.StopChan
chDone chan struct{}

utils.StartStopOnce
}

func NewArbitrumEstimator(lggr logger.Logger, cfg ArbConfig, rpcClient rpcClient, ethClient ethClient) EvmEstimator {
Expand Down Expand Up @@ -91,7 +90,7 @@ func (a *arbitrumEstimator) Close() error {
})
}

func (a *arbitrumEstimator) Ready() error { return a.StartStopOnce.Ready() }
func (a *arbitrumEstimator) Ready() error { return a.StateMachine.Ready() }

func (a *arbitrumEstimator) HealthReport() map[string]error {
hp := map[string]error{a.Name(): a.Healthy()}
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/gas/block_history_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
commonfee "github.com/smartcontractkit/chainlink/v2/common/fee"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/core/assets"
Expand Down Expand Up @@ -96,7 +97,7 @@ type estimatorGasEstimatorConfig interface {
//go:generate mockery --quiet --name Config --output ./mocks/ --case=underscore
type (
BlockHistoryEstimator struct {
utils.StartStopOnce
services.StateMachine
ethClient evmclient.Client
chainID big.Int
config chainConfig
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/gas/l2_suggested_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/core/assets"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand All @@ -28,7 +29,7 @@ type rpcClient interface {

// l2SuggestedPriceEstimator is an Estimator which uses the L2 suggested gas price from eth_gasPrice.
type l2SuggestedPriceEstimator struct {
utils.StartStopOnce
services.StateMachine

client rpcClient
pollPeriod time.Duration
Expand Down
3 changes: 1 addition & 2 deletions core/chains/evm/gas/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils"
bigmath "github.com/smartcontractkit/chainlink/v2/core/utils/big_math"
)

Expand Down Expand Up @@ -150,10 +149,10 @@ func (fee EvmFee) ValidDynamic() bool {

// WrappedEvmEstimator provides a struct that wraps the EVM specific dynamic and legacy estimators into one estimator that conforms to the generic FeeEstimator
type WrappedEvmEstimator struct {
services.StateMachine
EvmEstimator
EIP1559Enabled bool
l1Oracle rollups.L1Oracle
utils.StartStopOnce
}

var _ EvmFeeEstimator = (*WrappedEvmEstimator)(nil)
Expand Down
Loading

0 comments on commit 67a79f1

Please sign in to comment.