Skip to content

Commit

Permalink
audit background contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Oct 24, 2024
1 parent 00912cd commit cf797c9
Show file tree
Hide file tree
Showing 38 changed files with 478 additions and 595 deletions.
3 changes: 2 additions & 1 deletion core/capabilities/ccip/launcher/integration_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package launcher

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -115,7 +116,7 @@ type oracleCreatorPrints struct {
t *testing.T
}

func (o *oracleCreatorPrints) Create(_ uint32, config cctypes.OCR3ConfigWithMeta) (cctypes.CCIPOracle, error) {
func (o *oracleCreatorPrints) Create(ctx context.Context, _ uint32, config cctypes.OCR3ConfigWithMeta) (cctypes.CCIPOracle, error) {
pluginType := cctypes.PluginType(config.Config.PluginType)
o.t.Logf("Creating plugin oracle (pluginType: %s) with config %+v\n", pluginType, config)
return &oraclePrints{pluginType: pluginType, config: config, t: o.t}, nil
Expand Down
47 changes: 28 additions & 19 deletions core/capabilities/ccip/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type launcher struct {
p2pID ragep2ptypes.PeerID
lggr logger.Logger
homeChainReader ccipreader.HomeChain
stopChan chan struct{}
stopChan services.StopChan
// latestState is the latest capability registry state received from the syncer.
latestState registrysyncer.LocalRegistry
// regState is the latest capability registry state that we have successfully processed.
Expand Down Expand Up @@ -131,19 +131,23 @@ func (l *launcher) Start(context.Context) error {
func (l *launcher) monitor() {
defer l.wg.Done()
ticker := time.NewTicker(l.tickInterval)

ctx, cancel := l.stopChan.NewCtx()
defer cancel()

for {
select {
case <-l.stopChan:
case <-ctx.Done():
return
case <-ticker.C:
if err := l.tick(); err != nil {
if err := l.tick(ctx); err != nil {
l.lggr.Errorw("Failed to tick", "err", err)
}
}
}
}

func (l *launcher) tick() error {
func (l *launcher) tick(ctx context.Context) error {
// Ensure that the home chain reader is healthy.
// For new jobs it may be possible that the home chain reader is not yet ready
// so we won't be able to fetch configs and start any OCR instances.
Expand All @@ -160,7 +164,7 @@ func (l *launcher) tick() error {
return fmt.Errorf("failed to diff capability registry states: %w", err)
}

err = l.processDiff(diffRes)
err = l.processDiff(ctx, diffRes)
if err != nil {
return fmt.Errorf("failed to process diff: %w", err)
}
Expand All @@ -172,15 +176,15 @@ func (l *launcher) tick() error {
// for any added OCR instances, it will launch them.
// for any removed OCR instances, it will shut them down.
// for any updated OCR instances, it will restart them with the new configuration.
func (l *launcher) processDiff(diff diffResult) error {
func (l *launcher) processDiff(ctx context.Context, diff diffResult) error {
err := l.processRemoved(diff.removed)
err = multierr.Append(err, l.processAdded(diff.added))
err = multierr.Append(err, l.processUpdate(diff.updated))
err = multierr.Append(err, l.processAdded(ctx, diff.added))
err = multierr.Append(err, l.processUpdate(ctx, diff.updated))

return err
}

func (l *launcher) processUpdate(updated map[registrysyncer.DonID]registrysyncer.DON) error {
func (l *launcher) processUpdate(ctx context.Context, updated map[registrysyncer.DonID]registrysyncer.DON) error {
l.lock.Lock()
defer l.lock.Unlock()

Expand All @@ -191,6 +195,7 @@ func (l *launcher) processUpdate(updated map[registrysyncer.DonID]registrysyncer
}

futDeployment, err := updateDON(
ctx,
l.lggr,
l.p2pID,
l.homeChainReader,
Expand All @@ -216,12 +221,13 @@ func (l *launcher) processUpdate(updated map[registrysyncer.DonID]registrysyncer
return nil
}

func (l *launcher) processAdded(added map[registrysyncer.DonID]registrysyncer.DON) error {
func (l *launcher) processAdded(ctx context.Context, added map[registrysyncer.DonID]registrysyncer.DON) error {
l.lock.Lock()
defer l.lock.Unlock()

for donID, don := range added {
dep, err := createDON(
ctx,
l.lggr,
l.p2pID,
l.homeChainReader,
Expand Down Expand Up @@ -281,6 +287,7 @@ func (l *launcher) processRemoved(removed map[registrysyncer.DonID]registrysynce
// It returns a new ccipDeployment that can then be used to perform the active-candidate deployment,
// based on the previous deployment.
func updateDON(
ctx context.Context,
lggr logger.Logger,
p2pID ragep2ptypes.PeerID,
homeChainReader ccipreader.HomeChain,
Expand All @@ -294,24 +301,24 @@ func updateDON(
}

// this should be a retryable error.
commitOCRConfigs, err := homeChainReader.GetOCRConfigs(context.Background(), don.ID, uint8(cctypes.PluginTypeCCIPCommit))
commitOCRConfigs, err := homeChainReader.GetOCRConfigs(ctx, don.ID, uint8(cctypes.PluginTypeCCIPCommit))
if err != nil {
return nil, fmt.Errorf("failed to fetch OCR configs for CCIP commit plugin (don id: %d) from home chain config contract: %w",
don.ID, err)
}

execOCRConfigs, err := homeChainReader.GetOCRConfigs(context.Background(), don.ID, uint8(cctypes.PluginTypeCCIPExec))
execOCRConfigs, err := homeChainReader.GetOCRConfigs(ctx, don.ID, uint8(cctypes.PluginTypeCCIPExec))
if err != nil {
return nil, fmt.Errorf("failed to fetch OCR configs for CCIP exec plugin (don id: %d) from home chain config contract: %w",
don.ID, err)
}

commitBgd, err := createFutureActiveCandidateDeployment(don.ID, prevDeployment, commitOCRConfigs, oracleCreator, cctypes.PluginTypeCCIPCommit)
commitBgd, err := createFutureActiveCandidateDeployment(ctx, don.ID, prevDeployment, commitOCRConfigs, oracleCreator, cctypes.PluginTypeCCIPCommit)
if err != nil {
return nil, fmt.Errorf("failed to create future active-candidate deployment for CCIP commit plugin: %w, don id: %d", err, don.ID)
}

execBgd, err := createFutureActiveCandidateDeployment(don.ID, prevDeployment, execOCRConfigs, oracleCreator, cctypes.PluginTypeCCIPExec)
execBgd, err := createFutureActiveCandidateDeployment(ctx, don.ID, prevDeployment, execOCRConfigs, oracleCreator, cctypes.PluginTypeCCIPExec)
if err != nil {
return nil, fmt.Errorf("failed to create future active-candidate deployment for CCIP exec plugin: %w, don id: %d", err, don.ID)
}
Expand All @@ -327,6 +334,7 @@ func updateDON(
// b) len(ocrConfigs) == 1 && prevDeployment.HasCandidateInstance(): this is a promotion of candidate->active.
// All other cases are invalid. This is enforced in the ccip config contract.
func createFutureActiveCandidateDeployment(
ctx context.Context,
donID uint32,
prevDeployment ccipDeployment,
ocrConfigs []ccipreader.OCR3ConfigWithMeta,
Expand All @@ -336,7 +344,7 @@ func createFutureActiveCandidateDeployment(
var deployment activeCandidateDeployment
if isNewCandidateInstance(pluginType, ocrConfigs, prevDeployment) {
// this is a new candidate instance.
greenOracle, err := oracleCreator.Create(donID, cctypes.OCR3ConfigWithMeta(ocrConfigs[1]))
greenOracle, err := oracleCreator.Create(ctx, donID, cctypes.OCR3ConfigWithMeta(ocrConfigs[1]))
if err != nil {
return activeCandidateDeployment{}, fmt.Errorf("failed to create CCIP commit oracle: %w", err)
}
Expand All @@ -356,20 +364,21 @@ func createFutureActiveCandidateDeployment(
// createDON is a pure function that handles the case where a new DON is added to the capability registry.
// It returns a new ccipDeployment that can then be used to start the active instance.
func createDON(
ctx context.Context,
lggr logger.Logger,
p2pID ragep2ptypes.PeerID,
homeChainReader ccipreader.HomeChain,
don registrysyncer.DON,
oracleCreator cctypes.OracleCreator,
) (*ccipDeployment, error) {
// this should be a retryable error.
commitOCRConfigs, err := homeChainReader.GetOCRConfigs(context.Background(), don.ID, uint8(cctypes.PluginTypeCCIPCommit))
commitOCRConfigs, err := homeChainReader.GetOCRConfigs(ctx, don.ID, uint8(cctypes.PluginTypeCCIPCommit))
if err != nil {
return nil, fmt.Errorf("failed to fetch OCR configs for CCIP commit plugin (don id: %d) from home chain config contract: %w",
don.ID, err)
}

execOCRConfigs, err := homeChainReader.GetOCRConfigs(context.Background(), don.ID, uint8(cctypes.PluginTypeCCIPExec))
execOCRConfigs, err := homeChainReader.GetOCRConfigs(ctx, don.ID, uint8(cctypes.PluginTypeCCIPExec))
if err != nil {
return nil, fmt.Errorf("failed to fetch OCR configs for CCIP exec plugin (don id: %d) from home chain config contract: %w",
don.ID, err)
Expand All @@ -391,12 +400,12 @@ func createDON(

// at this point we know we are either a member of the DON or a bootstrap node.
// the injected oracleCreator will create the appropriate oracle.
commitOracle, err := oracleCreator.Create(don.ID, cctypes.OCR3ConfigWithMeta(commitOCRConfigs[0]))
commitOracle, err := oracleCreator.Create(ctx, don.ID, cctypes.OCR3ConfigWithMeta(commitOCRConfigs[0]))
if err != nil {
return nil, fmt.Errorf("failed to create CCIP commit oracle: %w", err)
}

execOracle, err := oracleCreator.Create(don.ID, cctypes.OCR3ConfigWithMeta(execOCRConfigs[0]))
execOracle, err := oracleCreator.Create(ctx, don.ID, cctypes.OCR3ConfigWithMeta(execOCRConfigs[0]))
if err != nil {
return nil, fmt.Errorf("failed to create CCIP exec oracle: %w", err)
}
Expand Down
23 changes: 12 additions & 11 deletions core/capabilities/ccip/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"

ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -96,7 +97,7 @@ func Test_createDON(t *testing.T) {
},
}}, nil)
oracleCreator.EXPECT().Type().Return(cctypes.OracleTypeBootstrap).Once()
oracleCreator.EXPECT().Create(mock.Anything, mock.Anything).Return(mocks.NewCCIPOracle(t), nil).Twice()
oracleCreator.EXPECT().Create(mock.Anything, mock.Anything, mock.Anything).Return(mocks.NewCCIPOracle(t), nil).Twice()
},
false,
},
Expand Down Expand Up @@ -127,11 +128,11 @@ func Test_createDON(t *testing.T) {
},
}}, nil)

oracleCreator.EXPECT().Create(mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
oracleCreator.EXPECT().Create(mock.Anything, mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
return cfg.Config.PluginType == uint8(cctypes.PluginTypeCCIPCommit)
})).
Return(mocks.NewCCIPOracle(t), nil)
oracleCreator.EXPECT().Create(mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
oracleCreator.EXPECT().Create(mock.Anything, mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
return cfg.Config.PluginType == uint8(cctypes.PluginTypeCCIPExec)
})).
Return(mocks.NewCCIPOracle(t), nil)
Expand All @@ -145,7 +146,7 @@ func Test_createDON(t *testing.T) {
tt.expect(t, tt.args, tt.args.oracleCreator, tt.args.homeChainReader)
}

_, err := createDON(tt.args.lggr, tt.args.p2pID, tt.args.homeChainReader, tt.args.don, tt.args.oracleCreator)
_, err := createDON(testutils.Context(t), tt.args.lggr, tt.args.p2pID, tt.args.homeChainReader, tt.args.don, tt.args.oracleCreator)
if tt.wantErr {
require.Error(t, err)
} else {
Expand Down Expand Up @@ -173,7 +174,7 @@ func Test_createFutureBlueGreenDeployment(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := createFutureActiveCandidateDeployment(tt.args.donID, tt.args.prevDeployment, tt.args.ocrConfigs, tt.args.oracleCreator, tt.args.pluginType)
got, err := createFutureActiveCandidateDeployment(testutils.Context(t), tt.args.donID, tt.args.prevDeployment, tt.args.ocrConfigs, tt.args.oracleCreator, tt.args.pluginType)
if (err != nil) != tt.wantErr {
t.Errorf("createFutureActiveCandidateDeployment() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -204,7 +205,7 @@ func Test_updateDON(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotFutDeployment, err := updateDON(tt.args.lggr, tt.args.p2pID, tt.args.homeChainReader, tt.args.prevDeployment, tt.args.don, tt.args.oracleCreator)
gotFutDeployment, err := updateDON(testutils.Context(t), tt.args.lggr, tt.args.p2pID, tt.args.homeChainReader, tt.args.prevDeployment, tt.args.don, tt.args.oracleCreator)
if (err != nil) != tt.wantErr {
t.Errorf("updateDON() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -303,11 +304,11 @@ func Test_launcher_processDiff(t *testing.T) {
commitOracle.On("Start").Return(nil)
execOracle := mocks.NewCCIPOracle(t)
execOracle.On("Start").Return(nil)
m.EXPECT().Create(mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
m.EXPECT().Create(mock.Anything, mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
return cfg.Config.PluginType == uint8(cctypes.PluginTypeCCIPCommit)
})).
Return(commitOracle, nil)
m.EXPECT().Create(mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
m.EXPECT().Create(mock.Anything, mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
return cfg.Config.PluginType == uint8(cctypes.PluginTypeCCIPExec)
})).
Return(execOracle, nil)
Expand Down Expand Up @@ -366,11 +367,11 @@ func Test_launcher_processDiff(t *testing.T) {
commitOracle.On("Start").Return(nil)
execOracle := mocks.NewCCIPOracle(t)
execOracle.On("Start").Return(nil)
m.EXPECT().Create(mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
m.EXPECT().Create(mock.Anything, mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
return cfg.Config.PluginType == uint8(cctypes.PluginTypeCCIPCommit)
})).
Return(commitOracle, nil)
m.EXPECT().Create(mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
m.EXPECT().Create(mock.Anything, mock.Anything, mock.MatchedBy(func(cfg cctypes.OCR3ConfigWithMeta) bool {
return cfg.Config.PluginType == uint8(cctypes.PluginTypeCCIPExec)
})).
Return(execOracle, nil)
Expand Down Expand Up @@ -424,7 +425,7 @@ func Test_launcher_processDiff(t *testing.T) {
homeChainReader: tt.fields.homeChainReader,
oracleCreator: tt.fields.oracleCreator,
}
err := l.processDiff(tt.args.diff)
err := l.processDiff(testutils.Context(t), tt.args.diff)
if tt.wantErr {
require.Error(t, err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/ccip/oraclecreator/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (i *bootstrapOracleCreator) Type() cctypes.OracleType {
}

// Create implements types.OracleCreator.
func (i *bootstrapOracleCreator) Create(_ uint32, config cctypes.OCR3ConfigWithMeta) (cctypes.CCIPOracle, error) {
func (i *bootstrapOracleCreator) Create(ctx context.Context, _ uint32, config cctypes.OCR3ConfigWithMeta) (cctypes.CCIPOracle, error) {
// Assuming that the chain selector is referring to an evm chain for now.
// TODO: add an api that returns chain family.
// NOTE: this doesn't really matter for the bootstrap node, it doesn't do anything on-chain.
Expand Down
18 changes: 10 additions & 8 deletions core/capabilities/ccip/oraclecreator/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (i *pluginOracleCreator) Type() cctypes.OracleType {
}

// Create implements types.OracleCreator.
func (i *pluginOracleCreator) Create(donID uint32, config cctypes.OCR3ConfigWithMeta) (cctypes.CCIPOracle, error) {
func (i *pluginOracleCreator) Create(ctx context.Context, donID uint32, config cctypes.OCR3ConfigWithMeta) (cctypes.CCIPOracle, error) {
pluginType := cctypes.PluginType(config.Config.PluginType)

// Assuming that the chain selector is referring to an evm chain for now.
Expand All @@ -137,6 +137,7 @@ func (i *pluginOracleCreator) Create(donID uint32, config cctypes.OCR3ConfigWith
}

contractReaders, chainWriters, err := i.createReadersAndWriters(
ctx,
destChainID,
pluginType,
config,
Expand Down Expand Up @@ -284,6 +285,7 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter(
}

func (i *pluginOracleCreator) createReadersAndWriters(
ctx context.Context,
destChainID uint64,
pluginType cctypes.PluginType,
config cctypes.OCR3ConfigWithMeta,
Expand Down Expand Up @@ -330,15 +332,14 @@ func (i *pluginOracleCreator) createReadersAndWriters(
return nil, nil, fmt.Errorf("failed to get chain reader config: %w", err1)
}

// TODO: context.
cr, err1 := relayer.NewContractReader(context.Background(), chainReaderConfig)
cr, err1 := relayer.NewContractReader(ctx, chainReaderConfig)
if err1 != nil {
return nil, nil, err1
}

if chainID.Uint64() == destChainID {
offrampAddressHex := common.BytesToAddress(config.Config.OfframpAddress).Hex()
err2 := cr.Bind(context.Background(), []types.BoundContract{
err2 := cr.Bind(ctx, []types.BoundContract{
{
Address: offrampAddressHex,
Name: consts.ContractNameOffRamp,
Expand All @@ -349,11 +350,12 @@ func (i *pluginOracleCreator) createReadersAndWriters(
}
}

if err2 := cr.Start(context.Background()); err2 != nil {
if err2 := cr.Start(ctx); err2 != nil {
return nil, nil, fmt.Errorf("failed to start contract reader for chain %s: %w", chainID.String(), err2)
}

cw, err1 := createChainWriter(
ctx,
chainID,
i.evmConfigs,
relayer,
Expand All @@ -363,7 +365,7 @@ func (i *pluginOracleCreator) createReadersAndWriters(
return nil, nil, err1
}

if err4 := cw.Start(context.Background()); err4 != nil {
if err4 := cw.Start(ctx); err4 != nil {
return nil, nil, fmt.Errorf("failed to start chain writer for chain %s: %w", chainID.String(), err4)
}

Expand Down Expand Up @@ -466,6 +468,7 @@ func isUSDCEnabled(chainID uint64, destChainID uint64, ofc offChainConfig) bool
}

func createChainWriter(
ctx context.Context,
chainID *big.Int,
evmConfigs toml.EVMConfigs,
relayer loop.Relayer,
Expand Down Expand Up @@ -499,8 +502,7 @@ func createChainWriter(
return nil, fmt.Errorf("failed to marshal chain writer config: %w", err)
}

// TODO: context.
cw, err := relayer.NewChainWriter(context.Background(), chainWriterConfig)
cw, err := relayer.NewChainWriter(ctx, chainWriterConfig)
if err != nil {
return nil, fmt.Errorf("failed to create chain writer for chain %s: %w", chainID.String(), err)
}
Expand Down
Loading

0 comments on commit cf797c9

Please sign in to comment.