Skip to content

Commit

Permalink
Port db fix (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
connorwstein authored Sep 13, 2023
1 parent cd7cbd3 commit 959be41
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 41 deletions.
16 changes: 15 additions & 1 deletion .github/actions/setup-postgres/wait-for-healthy-postgres.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,24 @@
RETRIES=10

until [ $RETRIES -eq 0 ]; do
if docker compose ps postgres --status running --format json | jq >/dev/null -e 'if (.[0].Health == "healthy") then true else false end'; then
DOCKER_OUTPUT=$(docker compose ps postgres --status running --format json)
JSON_TYPE=$(echo "$DOCKER_OUTPUT" | jq -r 'type')

if [ "$JSON_TYPE" == "array" ]; then
HEALTH_STATUS=$(echo "$DOCKER_OUTPUT" | jq -r '.[0].Health')
elif [ "$JSON_TYPE" == "object" ]; then
HEALTH_STATUS=$(echo "$DOCKER_OUTPUT" | jq -r '.Health')
else
HEALTH_STATUS="Unknown JSON type: $JSON_TYPE"
fi

echo "postgres health status: $HEALTH_STATUS"
if [ "$HEALTH_STATUS" == "healthy" ]; then
exit 0
fi

echo "Waiting for postgres server, $((RETRIES--)) remaining attempts..."
sleep 2
done

exit 1
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v2.5.0-ccip1.1.0
v2.5.0-ccip1.1.1
8 changes: 4 additions & 4 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,13 @@ func (d *Delegate) cleanupEVM(jb job.Job, q pg.Queryer, relayID relay.ID) error
d.lggr.Errorw("failed to derive ocr2keeper filter names from spec", "err", err, "spec", spec)
}
case job.CCIPCommit:
err = ccip.UnregisterCommitPluginLpFilters(context.Background(), q, spec, d.legacyChains)
err = ccip.UnregisterCommitPluginLpFilters(context.Background(), spec, d.legacyChains, pg.WithQueryer(q))
if err != nil {
d.lggr.Errorw("failed to unregister ccip commit plugin filters", "err", err, "spec", spec)
}
return nil
case job.CCIPExecution:
err = ccip.UnregisterExecPluginLpFilters(context.Background(), q, spec, d.legacyChains)
err = ccip.UnregisterExecPluginLpFilters(context.Background(), spec, d.legacyChains, pg.WithQueryer(q))
if err != nil {
d.lggr.Errorw("failed to unregister ccip exec plugin filters", "err", err, "spec", spec)
}
Expand Down Expand Up @@ -479,7 +479,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceC
OffchainKeyring: kb,
OnchainKeyring: kb,
}
return ccip.NewCommitServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError)
return ccip.NewCommitServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError, qopts...)
case job.CCIPExecution:
if spec.Relay != relay.EVM {
return nil, errors.New("Non evm chains are not supported for CCIP execution")
Expand Down Expand Up @@ -523,7 +523,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceC
OnchainKeyring: kb,
}

return ccip.NewExecutionServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError)
return ccip.NewExecutionServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError, qopts...)
case job.OCR2Functions:
const (
_ int32 = iota
Expand Down
14 changes: 7 additions & 7 deletions core/services/ocr2/plugins/ccip/commit_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
COMMIT_CCIP_SENDS = "Commit ccip sends"
)

func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec

var pluginConfig ccipconfig.CommitPluginJobSpecConfig
Expand Down Expand Up @@ -118,7 +118,7 @@ func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainC
checkFinalityTags: sourceChain.Config().EVM().FinalityTagEnabled(),
})

err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress)
err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress, qopts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func getCommitPluginDestLpFilters(priceRegistry common.Address, offRamp common.A
}

// UnregisterCommitPluginLpFilters unregisters all the registered filters for both source and dest chains.
func UnregisterCommitPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer) error {
func UnregisterCommitPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error {
if spec == nil {
return errors.New("spec is nil")
}
Expand Down Expand Up @@ -260,10 +260,10 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, q pg.Queryer, spec *jo
if err != nil {
return err
}
return unregisterCommitPluginFilters(ctx, q, sourceChain.LogPoller(), destChain.LogPoller(), commitStore, common.HexToAddress(pluginConfig.OffRamp))
return unregisterCommitPluginFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), commitStore, common.HexToAddress(pluginConfig.OffRamp), qopts...)
}

func unregisterCommitPluginFilters(ctx context.Context, q pg.Queryer, sourceLP, destLP logpoller.LogPoller, destCommitStore commit_store.CommitStoreInterface, offRamp common.Address) error {
func unregisterCommitPluginFilters(ctx context.Context, sourceLP, destLP logpoller.LogPoller, destCommitStore commit_store.CommitStoreInterface, offRamp common.Address, qopts ...pg.QOpt) error {
staticCfg, err := destCommitStore.GetStaticConfig(&bind.CallOpts{Context: ctx})
if err != nil {
return err
Expand All @@ -275,16 +275,16 @@ func unregisterCommitPluginFilters(ctx context.Context, q pg.Queryer, sourceLP,
}

if err := unregisterLpFilters(
q,
sourceLP,
getCommitPluginSourceLpFilters(staticCfg.OnRamp),
qopts...,
); err != nil {
return err
}

return unregisterLpFilters(
q,
destLP,
getCommitPluginDestLpFilters(dynamicCfg.PriceRegistry, offRamp),
qopts...,
)
}
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/ccip/commit_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestGetCommitPluginFilterNamesFromSpec(t *testing.T) {
}
}

err := UnregisterCommitPluginLpFilters(context.Background(), nil, tc.spec, chainSet)
err := UnregisterCommitPluginLpFilters(context.Background(), tc.spec, chainSet)
if tc.expectingErr {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestGetCommitPluginFilterNames(t *testing.T) {
dstLP.On("UnregisterFilter", "Token pool added - 0xDAFeA492D9c6733Ae3D56b7eD1AdB60692C98BC4", mock.Anything).Return(nil)
dstLP.On("UnregisterFilter", "Token pool removed - 0xDAFeA492D9c6733Ae3D56b7eD1AdB60692C98BC4", mock.Anything).Return(nil)

err := unregisterCommitPluginFilters(context.Background(), nil, srcLP, dstLP, mockCommitStore, offRampAddr)
err := unregisterCommitPluginFilters(context.Background(), srcLP, dstLP, mockCommitStore, offRampAddr)
assert.NoError(t, err)

srcLP.AssertExpectations(t)
Expand Down
10 changes: 5 additions & 5 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,28 +200,28 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t

// UpdateLogPollerFilters updates the log poller filters for the source and destination chains.
// pass zeroAddress if destPriceRegistry is unknown, filters with zero address are omitted.
func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address) error {
func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address, qopts ...pg.QOpt) error {
rf.filtersMu.Lock()
defer rf.filtersMu.Unlock()

// source chain filters
sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getCommitPluginSourceLpFilters(rf.config.onRampAddress)
created, deleted := filtersDiff(sourceFiltersBefore, sourceFiltersNow)
if err := unregisterLpFilters(nilQueryer, rf.config.sourceLP, deleted); err != nil {
if err := unregisterLpFilters(rf.config.sourceLP, deleted, qopts...); err != nil {
return err
}
if err := registerLpFilters(nilQueryer, rf.config.sourceLP, created); err != nil {
if err := registerLpFilters(rf.config.sourceLP, created, qopts...); err != nil {
return err
}
rf.sourceChainFilters = sourceFiltersNow

// destination chain filters
destFiltersBefore, destFiltersNow := rf.destChainFilters, getCommitPluginDestLpFilters(destPriceRegistry, rf.config.offRamp.Address())
created, deleted = filtersDiff(destFiltersBefore, destFiltersNow)
if err := unregisterLpFilters(nilQueryer, rf.config.destLP, deleted); err != nil {
if err := unregisterLpFilters(rf.config.destLP, deleted, qopts...); err != nil {
return err
}
if err := registerLpFilters(nilQueryer, rf.config.destLP, created); err != nil {
if err := registerLpFilters(rf.config.destLP, created, qopts...); err != nil {
return err
}
rf.destChainFilters = destFiltersNow
Expand Down
16 changes: 8 additions & 8 deletions core/services/ocr2/plugins/ccip/execution_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
FEE_TOKEN_REMOVED = "Fee token removed"
)

func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
var pluginConfig ccipconfig.ExecutionPluginJobSpecConfig
err := json.Unmarshal(spec.PluginConfig.Bytes(), &pluginConfig)
Expand Down Expand Up @@ -120,7 +120,7 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha
leafHasher: hasher.NewLeafHasher(offRampConfig.SourceChainSelector, offRampConfig.ChainSelector, onRamp.Address(), hasher.NewKeccakCtx()),
})

err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress)
err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress, qopts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func getExecutionPluginDestLpChainFilters(commitStore, offRamp, priceRegistry co
}

// UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains.
func UnregisterExecPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer) error {
func UnregisterExecPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error {
if spec == nil {
return errors.New("spec is nil")
}
Expand Down Expand Up @@ -256,18 +256,18 @@ func UnregisterExecPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.
return errors.Wrap(err, "failed loading onRamp")
}

return unregisterExecutionPluginLpFilters(ctx, q, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client())
return unregisterExecutionPluginLpFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client(), qopts...)
}

func unregisterExecutionPluginLpFilters(
ctx context.Context,
q pg.Queryer,
sourceLP logpoller.LogPoller,
destLP logpoller.LogPoller,
destOffRamp evm_2_evm_offramp.EVM2EVMOffRampInterface,
destOffRampConfig evm_2_evm_offramp.EVM2EVMOffRampStaticConfig,
sourceOnRamp evm_2_evm_onramp.EVM2EVMOnRampInterface,
sourceChainClient client.Client) error {
sourceChainClient client.Client,
qopts ...pg.QOpt) error {
destOffRampDynCfg, err := destOffRamp.GetDynamicConfig(&bind.CallOpts{Context: ctx})
if err != nil {
return err
Expand All @@ -279,17 +279,17 @@ func unregisterExecutionPluginLpFilters(
}

if err := unregisterLpFilters(
q,
sourceLP,
getExecutionPluginSourceLpChainFilters(destOffRampConfig.OnRamp, onRampDynCfg.PriceRegistry),
qopts...,
); err != nil {
return err
}

return unregisterLpFilters(
q,
destLP,
getExecutionPluginDestLpChainFilters(destOffRampConfig.CommitStore, destOffRamp.Address(), destOffRampDynCfg.PriceRegistry),
qopts...,
)
}

Expand Down
3 changes: 1 addition & 2 deletions core/services/ocr2/plugins/ccip/execution_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestGetExecutionPluginFilterNamesFromSpec(t *testing.T) {
for _, tc := range testCases {
chainSet := &mocks.LegacyChainContainer{}
t.Run(tc.description, func(t *testing.T) {
err := UnregisterExecPluginLpFilters(context.Background(), nilQueryer, tc.spec, chainSet)
err := UnregisterExecPluginLpFilters(context.Background(), tc.spec, chainSet)
if tc.expectingErr {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -111,7 +111,6 @@ func TestGetExecutionPluginFilterNames(t *testing.T) {

err := unregisterExecutionPluginLpFilters(
context.Background(),
nilQueryer,
srcLP,
dstLP,
mockOffRamp,
Expand Down
10 changes: 5 additions & 5 deletions core/services/ocr2/plugins/ccip/execution_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,28 +191,28 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty

// UpdateLogPollerFilters updates the log poller filters for the source and destination chains.
// pass zeroAddress if dstPriceRegistry is unknown, filters with zero address are omitted.
func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address) error {
func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address, qopts ...pg.QOpt) error {
rf.filtersMu.Lock()
defer rf.filtersMu.Unlock()

// source chain filters
sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getExecutionPluginSourceLpChainFilters(rf.config.onRamp.Address(), rf.config.sourcePriceRegistry.Address())
created, deleted := filtersDiff(sourceFiltersBefore, sourceFiltersNow)
if err := unregisterLpFilters(nilQueryer, rf.config.sourceLP, deleted); err != nil {
if err := unregisterLpFilters(rf.config.sourceLP, deleted, qopts...); err != nil {
return err
}
if err := registerLpFilters(nilQueryer, rf.config.sourceLP, created); err != nil {
if err := registerLpFilters(rf.config.sourceLP, created, qopts...); err != nil {
return err
}
rf.sourceChainFilters = sourceFiltersNow

// destination chain filters
destFiltersBefore, destFiltersNow := rf.destChainFilters, getExecutionPluginDestLpChainFilters(rf.config.commitStore.Address(), rf.config.offRamp.Address(), destPriceRegistry)
created, deleted = filtersDiff(destFiltersBefore, destFiltersNow)
if err := unregisterLpFilters(nilQueryer, rf.config.destLP, deleted); err != nil {
if err := unregisterLpFilters(rf.config.destLP, deleted, qopts...); err != nil {
return err
}
if err := registerLpFilters(nilQueryer, rf.config.destLP, created); err != nil {
if err := registerLpFilters(rf.config.destLP, created, qopts...); err != nil {
return err
}
rf.destChainFilters = destFiltersNow
Expand Down
10 changes: 4 additions & 6 deletions core/services/ocr2/plugins/ccip/plugins_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ const (

var zeroAddress = common.HexToAddress("0")

var nilQueryer pg.Queryer

var ErrCommitStoreIsDown = errors.New("commitStore is down")

func LoadOnRamp(onRampAddress common.Address, pluginName string, client client.Client) (evm_2_evm_onramp.EVM2EVMOnRampInterface, error) {
Expand Down Expand Up @@ -174,24 +172,24 @@ func filterContainsZeroAddress(addrs []common.Address) bool {
return false
}

func registerLpFilters(q pg.Queryer, lp logpoller.LogPoller, filters []logpoller.Filter) error {
func registerLpFilters(lp logpoller.LogPoller, filters []logpoller.Filter, qopts ...pg.QOpt) error {
for _, lpFilter := range filters {
if filterContainsZeroAddress(lpFilter.Addresses) {
continue
}
if err := lp.RegisterFilter(lpFilter); err != nil {
if err := lp.RegisterFilter(lpFilter, qopts...); err != nil {
return err
}
}
return nil
}

func unregisterLpFilters(q pg.Queryer, lp logpoller.LogPoller, filters []logpoller.Filter) error {
func unregisterLpFilters(lp logpoller.LogPoller, filters []logpoller.Filter, qopts ...pg.QOpt) error {
for _, lpFilter := range filters {
if filterContainsZeroAddress(lpFilter.Addresses) {
continue
}
if err := lp.UnregisterFilter(lpFilter.Name, pg.WithQueryer(q)); err != nil {
if err := lp.UnregisterFilter(lpFilter.Name, qopts...); err != nil {
return err
}
}
Expand Down

0 comments on commit 959be41

Please sign in to comment.