Skip to content

Commit

Permalink
Revert "core/services: remove unused pg.QOpts from Delegate.ServicesF…
Browse files Browse the repository at this point in the history
…orSpec (#10844)" (#317)
  • Loading branch information
mateusz-sekara authored Nov 29, 2023
2 parents 9e0fa24 + 69fe72d commit 7d332c9
Show file tree
Hide file tree
Showing 14 changed files with 22 additions and 25 deletions.
2 changes: 1 addition & 1 deletion core/services/blockhashstore/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (d *Delegate) JobType() job.Type {
}

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
if jb.BlockhashStoreSpec == nil {
return nil, errors.Errorf(
"blockhashstore.Delegate expects a BlockhashStoreSpec to be present, got %+v", jb)
Expand Down
2 changes: 1 addition & 1 deletion core/services/blockheaderfeeder/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (d *Delegate) JobType() job.Type {
}

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
if jb.BlockHeaderFeederSpec == nil {
return nil, errors.Errorf("Delegate expects a BlockHeaderFeederSpec to be present, got %+v", jb)
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/cron/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }

// ServicesForSpec returns the scheduler to be used for running cron jobs
func (d *Delegate) ServicesForSpec(spec job.Job) (services []job.ServiceCtx, err error) {
func (d *Delegate) ServicesForSpec(spec job.Job, qopts ...pg.QOpt) (services []job.ServiceCtx, err error) {
if spec.CronSpec == nil {
return nil, errors.Errorf("services.Delegate expects a *jobSpec.CronSpec to be present, got %v", spec)
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/directrequest/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }

// ServicesForSpec returns the log listener service for a direct request job
func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
if jb.DirectRequestSpec == nil {
return nil, errors.Errorf("DirectRequest: directrequest.Delegate expects a *job.DirectRequestSpec to be present, got %v", jb)
}
Expand Down
3 changes: 1 addition & 2 deletions core/services/fluxmonitorv2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fluxmonitorv2

import (
"github.com/pkg/errors"

"github.com/smartcontractkit/sqlx"

txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
Expand Down Expand Up @@ -60,7 +59,7 @@ func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }

// ServicesForSpec returns the flux monitor service for the job spec
func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err error) {
func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) (services []job.ServiceCtx, err error) {
if jb.FluxMonitorSpec == nil {
return nil, errors.Errorf("Delegate expects a *job.FluxMonitorSpec to be present, got %v", jb)
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/gateway/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }

// ServicesForSpec returns the scheduler to be used for running observer jobs
func (d *Delegate) ServicesForSpec(spec job.Job) (services []job.ServiceCtx, err error) {
func (d *Delegate) ServicesForSpec(spec job.Job, qopts ...pg.QOpt) (services []job.ServiceCtx, err error) {
if spec.GatewaySpec == nil {
return nil, errors.Errorf("services.Delegate expects a *jobSpec.GatewaySpec to be present, got %v", spec)
}
Expand Down
7 changes: 3 additions & 4 deletions core/services/job/spawner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"

pkgerrors "github.com/pkg/errors"

"github.com/smartcontractkit/sqlx"

relayservices "github.com/smartcontractkit/chainlink-relay/pkg/services"
Expand Down Expand Up @@ -66,7 +65,7 @@ type (
// job. In case a given job type relies upon well-defined startup/shutdown
// ordering for services, they are started in the order they are given
// and stopped in reverse order.
ServicesForSpec(spec Job) ([]ServiceCtx, error)
ServicesForSpec(spec Job, qopts ...pg.QOpt) ([]ServiceCtx, error)
AfterJobCreated(spec Job)
BeforeJobDeleted(spec Job)
// OnDeleteJob will be called from within DELETE db transaction. Any db
Expand Down Expand Up @@ -211,7 +210,7 @@ func (js *spawner) StartService(ctx context.Context, jb Job, qopts ...pg.QOpt) e
jb.PipelineSpec.GasLimit = &jb.GasLimit.Uint32
}

srvs, err := delegate.ServicesForSpec(jb)
srvs, err := delegate.ServicesForSpec(jb, qopts...)
if err != nil {
lggr.Errorw("Error creating services for job", "err", err)
cctx, cancel := js.chStop.NewCtx()
Expand Down Expand Up @@ -387,7 +386,7 @@ func (n *NullDelegate) JobType() Type {
}

// ServicesForSpec does no-op.
func (n *NullDelegate) ServicesForSpec(spec Job) (s []ServiceCtx, err error) {
func (n *NullDelegate) ServicesForSpec(spec Job, qopts ...pg.QOpt) (s []ServiceCtx, err error) {
return
}

Expand Down
2 changes: 1 addition & 1 deletion core/services/job/spawner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (d delegate) JobType() job.Type {
}

// ServicesForSpec satisfies the job.Delegate interface.
func (d delegate) ServicesForSpec(js job.Job) ([]job.ServiceCtx, error) {
func (d delegate) ServicesForSpec(js job.Job, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
if js.Type != d.jobType {
return nil, nil
}
Expand Down
3 changes: 1 addition & 2 deletions core/services/keeper/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package keeper

import (
"github.com/pkg/errors"

"github.com/smartcontractkit/sqlx"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm"
Expand Down Expand Up @@ -55,7 +54,7 @@ func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(spec job.Job) (services []job.ServiceCtx, err error) {
func (d *Delegate) ServicesForSpec(spec job.Job, qopts ...pg.QOpt) (services []job.ServiceCtx, err error) {
if spec.KeeperSpec == nil {
return nil, errors.Errorf("Delegate expects a *job.KeeperSpec to be present, got %v", spec)
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }

// ServicesForSpec returns the OCR services that need to run for this job
func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err error) {
func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) (services []job.ServiceCtx, err error) {
if jb.OCROracleSpec == nil {
return nil, errors.Errorf("offchainreporting.Delegate expects an *job.OffchainreportingOracleSpec to be present, got %v", jb)
}
Expand Down
14 changes: 7 additions & 7 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (d *Delegate) cleanupEVM(jb job.Job, q pg.Queryer, relayID relay.ID) error
}

// ServicesForSpec returns the OCR2 services that need to run for this job
func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
if spec == nil {
return nil, errors.Errorf("offchainreporting2.Delegate expects an *job.OCR2OracleSpec to be present, got %v", jb)
Expand Down Expand Up @@ -443,9 +443,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
return d.newServicesOCR2Functions(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger)

case types.CCIPCommit:
return d.newServicesCCIPCommit(lggr, jb, bootstrapPeers, kb, ocrDB, lc, transmitterID)
return d.newServicesCCIPCommit(lggr, jb, bootstrapPeers, kb, ocrDB, lc, transmitterID, qopts...)
case types.CCIPExecution:
return d.newServicesCCIPExecution(lggr, jb, bootstrapPeers, kb, ocrDB, lc, transmitterID)
return d.newServicesCCIPExecution(lggr, jb, bootstrapPeers, kb, ocrDB, lc, transmitterID, qopts...)
default:
return nil, errors.Errorf("plugin type %s not supported", spec.PluginType)
}
Expand Down Expand Up @@ -1320,7 +1320,7 @@ func (d *Delegate) newServicesOCR2Functions(
return append([]job.ServiceCtx{runResultSaver, functionsProvider, thresholdProvider, s4Provider}, functionsServices...), nil
}

func (d *Delegate) newServicesCCIPCommit(lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string) ([]job.ServiceCtx, error) {
func (d *Delegate) newServicesCCIPCommit(lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
if spec.Relay != relay.EVM {
return nil, errors.New("Non evm chains are not supported for CCIP commit")
Expand Down Expand Up @@ -1371,10 +1371,10 @@ func (d *Delegate) newServicesCCIPCommit(lggr logger.SugaredLogger, jb job.Job,
logError := func(msg string) {
lggr.ErrorIf(d.jobORM.RecordError(jb.ID, msg), "unable to record error")
}
return ccip.NewCommitServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError)
return ccip.NewCommitServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError, qopts...)
}

func (d *Delegate) newServicesCCIPExecution(lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string) ([]job.ServiceCtx, error) {
func (d *Delegate) newServicesCCIPExecution(lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
if spec.Relay != relay.EVM {
return nil, errors.New("Non evm chains are not supported for CCIP execution")
Expand Down Expand Up @@ -1424,7 +1424,7 @@ func (d *Delegate) newServicesCCIPExecution(lggr logger.SugaredLogger, jb job.Jo
logError := func(msg string) {
lggr.ErrorIf(d.jobORM.RecordError(jb.ID, msg), "unable to record error")
}
return ccip.NewExecutionServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError)
return ccip.NewExecutionServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError, qopts...)
}

// errorLog implements [loop.ErrorLog]
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocrbootstrap/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (d *Delegate) BeforeJobCreated(spec job.Job) {
}

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err error) {
func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) (services []job.ServiceCtx, err error) {
spec := jb.BootstrapSpec
if spec == nil {
return nil, errors.Errorf("Bootstrap.Delegate expects an *job.BootstrapSpec to be present, got %v", jb)
Expand Down
2 changes: 1 addition & 1 deletion core/services/vrf/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
func (d *Delegate) OnDeleteJob(spec job.Job, q pg.Queryer) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
if jb.VRFSpec == nil || jb.PipelineSpec == nil {
return nil, errors.Errorf("vrf.Delegate expects a VRFSpec and PipelineSpec to be present, got %+v", jb)
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/webhook/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (d *Delegate) BeforeJobDeleted(spec job.Job) {
func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(spec job.Job) ([]job.ServiceCtx, error) {
func (d *Delegate) ServicesForSpec(spec job.Job, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
service := &pseudoService{
spec: spec,
webhookJobRunner: d.webhookJobRunner,
Expand Down

0 comments on commit 7d332c9

Please sign in to comment.