diff --git a/.mockery.yaml b/.mockery.yaml index 70b7a9947f6..711d70f59e9 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -579,21 +579,6 @@ packages: github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer: interfaces: ORM: - github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer: - interfaces: - ORM: - ContractReader: - config: - mockname: "Mock{{ .InterfaceName }}" - filename: contract_reader_mock.go - inpackage: true - dir: "{{ .InterfaceDir }}" - Handler: - config: - mockname: "Mock{{ .InterfaceName }}" - filename: handler_mock.go - inpackage: true - dir: "{{ .InterfaceDir }}" github.com/smartcontractkit/chainlink/v2/core/capabilities/targets: interfaces: ContractValueGetter: \ No newline at end of file diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index fef741c8c9b..abbe9dad9ab 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -215,7 +215,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { // TODO: wire this up to config so we only instantiate it // if a workflow registry address is provided. - workflowRegistrySyncer := syncer.NewNullWorkflowRegistrySyncer() + workflowRegistrySyncer := syncer.NewWorkflowRegistry() srvcs = append(srvcs, workflowRegistrySyncer) var externalPeerWrapper p2ptypes.PeerWrapper diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index fd54a39d431..9db99fcd48d 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -45,7 +45,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs" "github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight" ) @@ -1874,7 +1873,6 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { c.ID = s.ID c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name c.SpecType = job.YamlSpec - c.SecretsID = s.SecretsID return mustInsertWFJob(t, o, &c) }, }, @@ -1894,7 +1892,6 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { var c job.WorkflowSpec c.ID = s.ID c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow03", addr2) // insert with mismatched owner - c.SecretsID = s.SecretsID return mustInsertWFJob(t, o, &c) }, }, @@ -1902,32 +1899,22 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { }, } - for i, tt := range tests { + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx := testutils.Context(t) ks := cltest.NewKeyStore(t, tt.fields.ds) - - secretsORM := syncer.NewWorkflowRegistryDS(tt.fields.ds, logger.TestLogger(t)) - - sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz") - require.NoError(t, err) - tt.args.spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true} - pipelineORM := pipeline.NewORM(tt.fields.ds, logger.TestLogger(t), configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns()) bridgesORM := bridges.NewORM(tt.fields.ds) o := NewTestORM(t, tt.fields.ds, pipelineORM, bridgesORM, ks) - var wantJobID int32 if tt.args.before != nil { wantJobID = tt.args.before(t, o, tt.args.spec) } - + ctx := testutils.Context(t) gotJ, err := o.FindJobIDByWorkflow(ctx, *tt.args.spec) if (err != nil) != tt.wantErr { t.Errorf("orm.FindJobByWorkflow() error = %v, wantErr %v", err, tt.wantErr) return } - if err == nil { assert.Equal(t, wantJobID, gotJ, "mismatch job id") } @@ -1949,36 +1936,25 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) { bridges.NewORM(db), cltest.NewKeyStore(t, db)) ctx := testutils.Context(t) - secretsORM := syncer.NewWorkflowRegistryDS(db, logger.TestLogger(t)) - - var sids []int64 - for i := 0; i < 3; i++ { - sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz") - require.NoError(t, err) - sids = append(sids, sid) - } wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1) s1 := job.WorkflowSpec{ - Workflow: wfYaml1, - SpecType: job.YamlSpec, - SecretsID: sql.NullInt64{Int64: sids[0], Valid: true}, + Workflow: wfYaml1, + SpecType: job.YamlSpec, } wantJobID1 := mustInsertWFJob(t, o, &s1) wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1) s2 := job.WorkflowSpec{ - Workflow: wfYaml2, - SpecType: job.YamlSpec, - SecretsID: sql.NullInt64{Int64: sids[1], Valid: true}, + Workflow: wfYaml2, + SpecType: job.YamlSpec, } wantJobID2 := mustInsertWFJob(t, o, &s2) wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2) s3 := job.WorkflowSpec{ - Workflow: wfYaml3, - SpecType: job.YamlSpec, - SecretsID: sql.NullInt64{Int64: sids[2], Valid: true}, + Workflow: wfYaml3, + SpecType: job.YamlSpec, } wantJobID3 := mustInsertWFJob(t, o, &s3) @@ -2016,7 +1992,7 @@ func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 { } err = orm.CreateJob(ctx, &j) - require.NoError(t, err, "failed to insert job with wf spec %+v %s", s, err) + require.NoError(t, err, "failed to insert job with wf spec %v %s", s, s.Workflow) return j.ID } diff --git a/core/services/job/models.go b/core/services/job/models.go index 423a297c8da..231bf10fda0 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -2,7 +2,6 @@ package job import ( "context" - "database/sql" "database/sql/driver" "encoding/json" "fmt" @@ -878,9 +877,6 @@ type WorkflowSpec struct { WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. - BinaryURL string `db:"binary_url"` - ConfigURL string `db:"config_url"` - SecretsID sql.NullInt64 `db:"secrets_id"` CreatedAt time.Time `toml:"-"` UpdatedAt time.Time `toml:"-"` SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 92ec9b2e83c..5e8b5ce127f 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -433,8 +433,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error { case Stream: // 'stream' type has no associated spec, nothing to do here case Workflow: - sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, binary_url, config_url, secrets_id, created_at, updated_at, spec_type, config) - VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, :binary_url, :config_url, :secrets_id, NOW(), NOW(), :spec_type, :config) + sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type, config) + VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type, :config) RETURNING id;` specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec) if err != nil { diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go deleted file mode 100644 index 802dc427c93..00000000000 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ /dev/null @@ -1,218 +0,0 @@ -package workflow_registry_syncer_test - -import ( - "context" - "crypto/rand" - "encoding/hex" - "encoding/json" - "testing" - "time" - - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - - "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - "github.com/smartcontractkit/chainlink-common/pkg/types" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" - coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - - "github.com/stretchr/testify/require" -) - -func Test_SecretsWorker(t *testing.T) { - var ( - ctx = coretestutils.Context(t) - lggr = logger.TestLogger(t) - backendTH = testutils.NewEVMBackendTH(t) - db = pgtest.NewSqlxDB(t) - orm = syncer.NewWorkflowRegistryDS(db, lggr) - - giveTicker = time.NewTicker(500 * time.Millisecond) - giveSecretsURL = "https://original-url.com" - donID = uint32(1) - giveWorkflow = RegisterWorkflowCMD{ - Name: "test-wf", - DonID: donID, - Status: uint8(1), - SecretsURL: giveSecretsURL, - } - giveContents = "contents" - wantContents = "updated contents" - fetcherFn = func(_ context.Context, _ string) ([]byte, error) { - return []byte(wantContents), nil - } - contractName = syncer.ContractName - forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent) - ) - - defer giveTicker.Stop() - - // fill ID with randomd data - var giveID [32]byte - _, err := rand.Read((giveID)[:]) - require.NoError(t, err) - giveWorkflow.ID = giveID - - // Deploy a test workflow_registry - wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client()) - backendTH.Backend.Commit() - require.NoError(t, err) - - lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex()) - - // Build the ContractReader config - contractReaderCfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - contractName: { - ContractPollingFilter: evmtypes.ContractPollingFilter{ - GenericEventNames: []string{forceUpdateSecretsEvent}, - }, - ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - forceUpdateSecretsEvent: { - ChainSpecificName: forceUpdateSecretsEvent, - ReadType: evmtypes.Event, - }, - }, - }, - }, - } - - contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) - require.NoError(t, err) - - contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes) - require.NoError(t, err) - - err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}}) - require.NoError(t, err) - - // Seed the DB - hash, err := crypto.Keccak256(append(backendTH.ContractsOwner.From[:], []byte(giveSecretsURL)...)) - require.NoError(t, err) - giveHash := hex.EncodeToString(hash) - - gotID, err := orm.Create(ctx, giveSecretsURL, giveHash, giveContents) - require.NoError(t, err) - - gotSecretsURL, err := orm.GetSecretsURLByID(ctx, gotID) - require.NoError(t, err) - require.Equal(t, giveSecretsURL, gotSecretsURL) - - // verify the DB - contents, err := orm.GetContents(ctx, giveSecretsURL) - require.NoError(t, err) - require.Equal(t, contents, giveContents) - - // Create the worker - worker := syncer.NewWorkflowRegistry( - lggr, - orm, - contractReader, - fetcherFn, - wfRegistryAddr.Hex(), - syncer.WithTicker(giveTicker.C), - ) - - servicetest.Run(t, worker) - - // setup contract state to allow the secrets to be updated - updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) - updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) - registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow) - - // generate a log event - requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL) - - // Require the secrets contents to eventually be updated - require.Eventually(t, func() bool { - secrets, err := orm.GetContents(ctx, giveSecretsURL) - lggr.Debugf("got secrets %v", secrets) - require.NoError(t, err) - return secrets == wantContents - }, 5*time.Second, time.Second) -} - -func updateAuthorizedAddress( - t *testing.T, - th *testutils.EVMBackendTH, - wfRegC *workflow_registry_wrapper.WorkflowRegistry, - addresses []common.Address, - _ bool, -) { - t.Helper() - _, err := wfRegC.UpdateAuthorizedAddresses(th.ContractsOwner, addresses, true) - require.NoError(t, err, "failed to update authorised addresses") - th.Backend.Commit() - th.Backend.Commit() - th.Backend.Commit() - gotAddresses, err := wfRegC.GetAllAuthorizedAddresses(&bind.CallOpts{ - From: th.ContractsOwner.From, - }) - require.NoError(t, err) - require.ElementsMatch(t, addresses, gotAddresses) -} - -func updateAllowedDONs( - t *testing.T, - th *testutils.EVMBackendTH, - wfRegC *workflow_registry_wrapper.WorkflowRegistry, - donIDs []uint32, - allowed bool, -) { - t.Helper() - _, err := wfRegC.UpdateAllowedDONs(th.ContractsOwner, donIDs, allowed) - require.NoError(t, err, "failed to update DONs") - th.Backend.Commit() - th.Backend.Commit() - th.Backend.Commit() - gotDons, err := wfRegC.GetAllAllowedDONs(&bind.CallOpts{ - From: th.ContractsOwner.From, - }) - require.NoError(t, err) - require.ElementsMatch(t, donIDs, gotDons) -} - -type RegisterWorkflowCMD struct { - Name string - ID [32]byte - DonID uint32 - Status uint8 - BinaryURL string - ConfigURL string - SecretsURL string -} - -func registerWorkflow( - t *testing.T, - th *testutils.EVMBackendTH, - wfRegC *workflow_registry_wrapper.WorkflowRegistry, - input RegisterWorkflowCMD, -) { - t.Helper() - _, err := wfRegC.RegisterWorkflow(th.ContractsOwner, input.Name, input.ID, input.DonID, - input.Status, input.BinaryURL, input.ConfigURL, input.SecretsURL) - require.NoError(t, err, "failed to register workflow") - th.Backend.Commit() - th.Backend.Commit() - th.Backend.Commit() -} - -func requestForceUpdateSecrets( - t *testing.T, - th *testutils.EVMBackendTH, - wfRegC *workflow_registry_wrapper.WorkflowRegistry, - secretsURL string, -) { - _, err := wfRegC.RequestForceUpdateSecrets(th.ContractsOwner, secretsURL) - require.NoError(t, err) - th.Backend.Commit() - th.Backend.Commit() - th.Backend.Commit() -} diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 69655b5b39c..6de761f0841 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 { } type secretsFetcher interface { - SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) + SecretsFor(workflowOwner, workflowName string) (map[string]string, error) } // Engine handles the lifecycle of a single workflow and its executions. @@ -850,7 +850,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value // registry (for capability-level configuration). It doesn't perform any caching of the config values, since // the two registries perform their own caching. func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) { - secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name) + secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name) if err != nil { return nil, fmt.Errorf("failed to fetch secrets: %w", err) } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 70216ac8c78..f89f82e9486 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string, type mockSecretsFetcher struct{} -func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { +func (s mockSecretsFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) { return map[string]string{}, nil } @@ -1606,7 +1606,7 @@ type mockFetcher struct { retval map[string]string } -func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { +func (m *mockFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) { return m.retval, nil } diff --git a/core/services/workflows/syncer/contract_reader_mock.go b/core/services/workflows/syncer/contract_reader_mock.go deleted file mode 100644 index 61f59fa4e69..00000000000 --- a/core/services/workflows/syncer/contract_reader_mock.go +++ /dev/null @@ -1,148 +0,0 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. - -package syncer - -import ( - context "context" - - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" - mock "github.com/stretchr/testify/mock" - - types "github.com/smartcontractkit/chainlink-common/pkg/types" -) - -// MockContractReader is an autogenerated mock type for the ContractReader type -type MockContractReader struct { - mock.Mock -} - -type MockContractReader_Expecter struct { - mock *mock.Mock -} - -func (_m *MockContractReader) EXPECT() *MockContractReader_Expecter { - return &MockContractReader_Expecter{mock: &_m.Mock} -} - -// Bind provides a mock function with given fields: _a0, _a1 -func (_m *MockContractReader) Bind(_a0 context.Context, _a1 []types.BoundContract) error { - ret := _m.Called(_a0, _a1) - - if len(ret) == 0 { - panic("no return value specified for Bind") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []types.BoundContract) error); ok { - r0 = rf(_a0, _a1) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockContractReader_Bind_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Bind' -type MockContractReader_Bind_Call struct { - *mock.Call -} - -// Bind is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 []types.BoundContract -func (_e *MockContractReader_Expecter) Bind(_a0 interface{}, _a1 interface{}) *MockContractReader_Bind_Call { - return &MockContractReader_Bind_Call{Call: _e.mock.On("Bind", _a0, _a1)} -} - -func (_c *MockContractReader_Bind_Call) Run(run func(_a0 context.Context, _a1 []types.BoundContract)) *MockContractReader_Bind_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]types.BoundContract)) - }) - return _c -} - -func (_c *MockContractReader_Bind_Call) Return(_a0 error) *MockContractReader_Bind_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockContractReader_Bind_Call) RunAndReturn(run func(context.Context, []types.BoundContract) error) *MockContractReader_Bind_Call { - _c.Call.Return(run) - return _c -} - -// QueryKey provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 -func (_m *MockContractReader) QueryKey(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any) ([]types.Sequence, error) { - ret := _m.Called(_a0, _a1, _a2, _a3, _a4) - - if len(ret) == 0 { - panic("no return value specified for QueryKey") - } - - var r0 []types.Sequence - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)); ok { - return rf(_a0, _a1, _a2, _a3, _a4) - } - if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) []types.Sequence); ok { - r0 = rf(_a0, _a1, _a2, _a3, _a4) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.Sequence) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) error); ok { - r1 = rf(_a0, _a1, _a2, _a3, _a4) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockContractReader_QueryKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryKey' -type MockContractReader_QueryKey_Call struct { - *mock.Call -} - -// QueryKey is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 types.BoundContract -// - _a2 query.KeyFilter -// - _a3 query.LimitAndSort -// - _a4 any -func (_e *MockContractReader_Expecter) QueryKey(_a0 interface{}, _a1 interface{}, _a2 interface{}, _a3 interface{}, _a4 interface{}) *MockContractReader_QueryKey_Call { - return &MockContractReader_QueryKey_Call{Call: _e.mock.On("QueryKey", _a0, _a1, _a2, _a3, _a4)} -} - -func (_c *MockContractReader_QueryKey_Call) Run(run func(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any)) *MockContractReader_QueryKey_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(types.BoundContract), args[2].(query.KeyFilter), args[3].(query.LimitAndSort), args[4].(any)) - }) - return _c -} - -func (_c *MockContractReader_QueryKey_Call) Return(_a0 []types.Sequence, _a1 error) *MockContractReader_QueryKey_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockContractReader_QueryKey_Call) RunAndReturn(run func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)) *MockContractReader_QueryKey_Call { - _c.Call.Return(run) - return _c -} - -// NewMockContractReader creates a new instance of MockContractReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockContractReader(t interface { - mock.TestingT - Cleanup(func()) -}) *MockContractReader { - mock := &MockContractReader{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go deleted file mode 100644 index 0ba789b3bd3..00000000000 --- a/core/services/workflows/syncer/handler.go +++ /dev/null @@ -1,72 +0,0 @@ -package syncer - -import ( - "context" - "encoding/hex" - "fmt" - - "github.com/smartcontractkit/chainlink/v2/core/logger" -) - -// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding -// method that handles the event. -type eventHandler struct { - lggr logger.Logger - orm ORM - fetcher FetcherFunc -} - -// newEventHandler returns a new eventHandler instance. -func newEventHandler( - lggr logger.Logger, - orm ORM, - gateway FetcherFunc, -) *eventHandler { - return &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: gateway, - } -} - -func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error { - switch event.EventType { - case ForceUpdateSecretsEvent: - return h.forceUpdateSecretsEvent(ctx, event) - default: - return fmt.Errorf("event type unsupported: %v", event.EventType) - } -} - -// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type. -func (h *eventHandler) forceUpdateSecretsEvent( - ctx context.Context, - event WorkflowRegistryEvent, -) error { - // Get the URL of the secrets file from the event data - data, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1) - if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) - } - - hash := hex.EncodeToString(data.SecretsURLHash) - - url, err := h.orm.GetSecretsURLByHash(ctx, hash) - if err != nil { - h.lggr.Errorf("failed to get URL by hash %s : %s", hash, err) - return err - } - - // Fetch the contents of the secrets file from the url via the fetcher - secrets, err := h.fetcher(ctx, url) - if err != nil { - return err - } - - // Update the secrets in the ORM - if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil { - return err - } - - return nil -} diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go deleted file mode 100644 index dcdea28eda4..00000000000 --- a/core/services/workflows/syncer/handler_test.go +++ /dev/null @@ -1,136 +0,0 @@ -package syncer - -import ( - "context" - "encoding/hex" - "testing" - - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/mocks" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - "github.com/smartcontractkit/chainlink/v2/core/utils/matches" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_Handler(t *testing.T) { - lggr := logger.TestLogger(t) - t.Run("success", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - giveURL := "https://original-url.com" - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - - giveHash := hex.EncodeToString(giveBytes) - - giveEvent := WorkflowRegistryEvent{ - EventType: ForceUpdateSecretsEvent, - Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ - SecretsURLHash: giveBytes, - }, - } - - fetcher := func(_ context.Context, _ string) ([]byte, error) { - return []byte("contents"), nil - } - mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := newEventHandler(lggr, mockORM, fetcher) - err = h.Handle(ctx, giveEvent) - require.NoError(t, err) - }) - - t.Run("fails with unsupported event type", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - - giveEvent := WorkflowRegistryEvent{} - fetcher := func(_ context.Context, _ string) ([]byte, error) { - return []byte("contents"), nil - } - - h := newEventHandler(lggr, mockORM, fetcher) - err := h.Handle(ctx, giveEvent) - require.Error(t, err) - require.Contains(t, err.Error(), "event type unsupported") - }) - - t.Run("fails to get secrets url", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - h := newEventHandler(lggr, mockORM, nil) - giveURL := "https://original-url.com" - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - - giveHash := hex.EncodeToString(giveBytes) - - giveEvent := WorkflowRegistryEvent{ - EventType: ForceUpdateSecretsEvent, - Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ - SecretsURLHash: giveBytes, - }, - } - mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return("", assert.AnError) - err = h.Handle(ctx, giveEvent) - require.Error(t, err) - require.ErrorContains(t, err, assert.AnError.Error()) - }) - - t.Run("fails to fetch contents", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - giveURL := "http://example.com" - - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - - giveHash := hex.EncodeToString(giveBytes) - - giveEvent := WorkflowRegistryEvent{ - EventType: ForceUpdateSecretsEvent, - Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ - SecretsURLHash: giveBytes, - }, - } - - fetcher := func(_ context.Context, _ string) ([]byte, error) { - return nil, assert.AnError - } - mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := newEventHandler(lggr, mockORM, fetcher) - err = h.Handle(ctx, giveEvent) - require.Error(t, err) - require.ErrorIs(t, err, assert.AnError) - }) - - t.Run("fails to update secrets", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - giveURL := "http://example.com" - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - - giveHash := hex.EncodeToString(giveBytes) - - giveEvent := WorkflowRegistryEvent{ - EventType: ForceUpdateSecretsEvent, - Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ - SecretsURLHash: giveBytes, - }, - } - - fetcher := func(_ context.Context, _ string) ([]byte, error) { - return []byte("contents"), nil - } - mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := newEventHandler(lggr, mockORM, fetcher) - err = h.Handle(ctx, giveEvent) - require.Error(t, err) - require.ErrorIs(t, err, assert.AnError) - }) -} diff --git a/core/services/workflows/syncer/heap.go b/core/services/workflows/syncer/heap.go deleted file mode 100644 index 061293928a3..00000000000 --- a/core/services/workflows/syncer/heap.go +++ /dev/null @@ -1,63 +0,0 @@ -package syncer - -import "container/heap" - -type Heap interface { - // Push adds a new item to the heap. - Push(x WorkflowRegistryEventResponse) - - // Pop removes the smallest item from the heap and returns it. - Pop() WorkflowRegistryEventResponse - - // Len returns the number of items in the heap. - Len() int -} - -// publicHeap is a wrapper around the heap.Interface that exposes the Push and Pop methods. -type publicHeap[T any] struct { - heap heap.Interface -} - -func (h *publicHeap[T]) Push(x T) { - heap.Push(h.heap, x) -} - -func (h *publicHeap[T]) Pop() T { - return heap.Pop(h.heap).(T) -} - -func (h *publicHeap[T]) Len() int { - return h.heap.Len() -} - -// blockHeightHeap is a heap.Interface that sorts WorkflowRegistryEventResponses by block height. -type blockHeightHeap []WorkflowRegistryEventResponse - -// newBlockHeightHeap returns an initialized heap that sorts WorkflowRegistryEventResponses by block height. -func newBlockHeightHeap() Heap { - h := blockHeightHeap(make([]WorkflowRegistryEventResponse, 0)) - heap.Init(&h) - return &publicHeap[WorkflowRegistryEventResponse]{heap: &h} -} - -func (h *blockHeightHeap) Len() int { return len(*h) } - -func (h *blockHeightHeap) Less(i, j int) bool { - return (*h)[i].Event.Head.Height < (*h)[j].Event.Head.Height -} - -func (h *blockHeightHeap) Swap(i, j int) { - (*h)[i], (*h)[j] = (*h)[j], (*h)[i] -} - -func (h *blockHeightHeap) Push(x any) { - *h = append(*h, x.(WorkflowRegistryEventResponse)) -} - -func (h *blockHeightHeap) Pop() any { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go deleted file mode 100644 index b3d82c2067d..00000000000 --- a/core/services/workflows/syncer/mocks/orm.go +++ /dev/null @@ -1,440 +0,0 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. - -package mocks - -import ( - context "context" - - mock "github.com/stretchr/testify/mock" -) - -// ORM is an autogenerated mock type for the ORM type -type ORM struct { - mock.Mock -} - -type ORM_Expecter struct { - mock *mock.Mock -} - -func (_m *ORM) EXPECT() *ORM_Expecter { - return &ORM_Expecter{mock: &_m.Mock} -} - -// Create provides a mock function with given fields: ctx, secretsURL, hash, contents -func (_m *ORM) Create(ctx context.Context, secretsURL string, hash string, contents string) (int64, error) { - ret := _m.Called(ctx, secretsURL, hash, contents) - - if len(ret) == 0 { - panic("no return value specified for Create") - } - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, string) (int64, error)); ok { - return rf(ctx, secretsURL, hash, contents) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string, string) int64); ok { - r0 = rf(ctx, secretsURL, hash, contents) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string, string) error); ok { - r1 = rf(ctx, secretsURL, hash, contents) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_Create_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Create' -type ORM_Create_Call struct { - *mock.Call -} - -// Create is a helper method to define mock.On call -// - ctx context.Context -// - secretsURL string -// - hash string -// - contents string -func (_e *ORM_Expecter) Create(ctx interface{}, secretsURL interface{}, hash interface{}, contents interface{}) *ORM_Create_Call { - return &ORM_Create_Call{Call: _e.mock.On("Create", ctx, secretsURL, hash, contents)} -} - -func (_c *ORM_Create_Call) Run(run func(ctx context.Context, secretsURL string, hash string, contents string)) *ORM_Create_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string)) - }) - return _c -} - -func (_c *ORM_Create_Call) Return(_a0 int64, _a1 error) *ORM_Create_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_Create_Call) RunAndReturn(run func(context.Context, string, string, string) (int64, error)) *ORM_Create_Call { - _c.Call.Return(run) - return _c -} - -// GetContents provides a mock function with given fields: ctx, url -func (_m *ORM) GetContents(ctx context.Context, url string) (string, error) { - ret := _m.Called(ctx, url) - - if len(ret) == 0 { - panic("no return value specified for GetContents") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { - return rf(ctx, url) - } - if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { - r0 = rf(ctx, url) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, url) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetContents_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetContents' -type ORM_GetContents_Call struct { - *mock.Call -} - -// GetContents is a helper method to define mock.On call -// - ctx context.Context -// - url string -func (_e *ORM_Expecter) GetContents(ctx interface{}, url interface{}) *ORM_GetContents_Call { - return &ORM_GetContents_Call{Call: _e.mock.On("GetContents", ctx, url)} -} - -func (_c *ORM_GetContents_Call) Run(run func(ctx context.Context, url string)) *ORM_GetContents_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *ORM_GetContents_Call) Return(_a0 string, _a1 error) *ORM_GetContents_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetContents_Call) RunAndReturn(run func(context.Context, string) (string, error)) *ORM_GetContents_Call { - _c.Call.Return(run) - return _c -} - -// GetContentsByHash provides a mock function with given fields: ctx, hash -func (_m *ORM) GetContentsByHash(ctx context.Context, hash string) (string, error) { - ret := _m.Called(ctx, hash) - - if len(ret) == 0 { - panic("no return value specified for GetContentsByHash") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { - return rf(ctx, hash) - } - if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { - r0 = rf(ctx, hash) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, hash) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetContentsByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetContentsByHash' -type ORM_GetContentsByHash_Call struct { - *mock.Call -} - -// GetContentsByHash is a helper method to define mock.On call -// - ctx context.Context -// - hash string -func (_e *ORM_Expecter) GetContentsByHash(ctx interface{}, hash interface{}) *ORM_GetContentsByHash_Call { - return &ORM_GetContentsByHash_Call{Call: _e.mock.On("GetContentsByHash", ctx, hash)} -} - -func (_c *ORM_GetContentsByHash_Call) Run(run func(ctx context.Context, hash string)) *ORM_GetContentsByHash_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *ORM_GetContentsByHash_Call) Return(_a0 string, _a1 error) *ORM_GetContentsByHash_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetContentsByHash_Call) RunAndReturn(run func(context.Context, string) (string, error)) *ORM_GetContentsByHash_Call { - _c.Call.Return(run) - return _c -} - -// GetSecretsURLByHash provides a mock function with given fields: ctx, hash -func (_m *ORM) GetSecretsURLByHash(ctx context.Context, hash string) (string, error) { - ret := _m.Called(ctx, hash) - - if len(ret) == 0 { - panic("no return value specified for GetSecretsURLByHash") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { - return rf(ctx, hash) - } - if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { - r0 = rf(ctx, hash) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, hash) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetSecretsURLByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSecretsURLByHash' -type ORM_GetSecretsURLByHash_Call struct { - *mock.Call -} - -// GetSecretsURLByHash is a helper method to define mock.On call -// - ctx context.Context -// - hash string -func (_e *ORM_Expecter) GetSecretsURLByHash(ctx interface{}, hash interface{}) *ORM_GetSecretsURLByHash_Call { - return &ORM_GetSecretsURLByHash_Call{Call: _e.mock.On("GetSecretsURLByHash", ctx, hash)} -} - -func (_c *ORM_GetSecretsURLByHash_Call) Run(run func(ctx context.Context, hash string)) *ORM_GetSecretsURLByHash_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *ORM_GetSecretsURLByHash_Call) Return(_a0 string, _a1 error) *ORM_GetSecretsURLByHash_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetSecretsURLByHash_Call) RunAndReturn(run func(context.Context, string) (string, error)) *ORM_GetSecretsURLByHash_Call { - _c.Call.Return(run) - return _c -} - -// GetSecretsURLByID provides a mock function with given fields: ctx, id -func (_m *ORM) GetSecretsURLByID(ctx context.Context, id int64) (string, error) { - ret := _m.Called(ctx, id) - - if len(ret) == 0 { - panic("no return value specified for GetSecretsURLByID") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64) (string, error)); ok { - return rf(ctx, id) - } - if rf, ok := ret.Get(0).(func(context.Context, int64) string); ok { - r0 = rf(ctx, id) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { - r1 = rf(ctx, id) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetSecretsURLByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSecretsURLByID' -type ORM_GetSecretsURLByID_Call struct { - *mock.Call -} - -// GetSecretsURLByID is a helper method to define mock.On call -// - ctx context.Context -// - id int64 -func (_e *ORM_Expecter) GetSecretsURLByID(ctx interface{}, id interface{}) *ORM_GetSecretsURLByID_Call { - return &ORM_GetSecretsURLByID_Call{Call: _e.mock.On("GetSecretsURLByID", ctx, id)} -} - -func (_c *ORM_GetSecretsURLByID_Call) Run(run func(ctx context.Context, id int64)) *ORM_GetSecretsURLByID_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64)) - }) - return _c -} - -func (_c *ORM_GetSecretsURLByID_Call) Return(_a0 string, _a1 error) *ORM_GetSecretsURLByID_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetSecretsURLByID_Call) RunAndReturn(run func(context.Context, int64) (string, error)) *ORM_GetSecretsURLByID_Call { - _c.Call.Return(run) - return _c -} - -// GetSecretsURLHash provides a mock function with given fields: owner, secretsURL -func (_m *ORM) GetSecretsURLHash(owner []byte, secretsURL []byte) ([]byte, error) { - ret := _m.Called(owner, secretsURL) - - if len(ret) == 0 { - panic("no return value specified for GetSecretsURLHash") - } - - var r0 []byte - var r1 error - if rf, ok := ret.Get(0).(func([]byte, []byte) ([]byte, error)); ok { - return rf(owner, secretsURL) - } - if rf, ok := ret.Get(0).(func([]byte, []byte) []byte); ok { - r0 = rf(owner, secretsURL) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]byte) - } - } - - if rf, ok := ret.Get(1).(func([]byte, []byte) error); ok { - r1 = rf(owner, secretsURL) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetSecretsURLHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSecretsURLHash' -type ORM_GetSecretsURLHash_Call struct { - *mock.Call -} - -// GetSecretsURLHash is a helper method to define mock.On call -// - owner []byte -// - secretsURL []byte -func (_e *ORM_Expecter) GetSecretsURLHash(owner interface{}, secretsURL interface{}) *ORM_GetSecretsURLHash_Call { - return &ORM_GetSecretsURLHash_Call{Call: _e.mock.On("GetSecretsURLHash", owner, secretsURL)} -} - -func (_c *ORM_GetSecretsURLHash_Call) Run(run func(owner []byte, secretsURL []byte)) *ORM_GetSecretsURLHash_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]byte), args[1].([]byte)) - }) - return _c -} - -func (_c *ORM_GetSecretsURLHash_Call) Return(_a0 []byte, _a1 error) *ORM_GetSecretsURLHash_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetSecretsURLHash_Call) RunAndReturn(run func([]byte, []byte) ([]byte, error)) *ORM_GetSecretsURLHash_Call { - _c.Call.Return(run) - return _c -} - -// Update provides a mock function with given fields: ctx, secretsURL, contents -func (_m *ORM) Update(ctx context.Context, secretsURL string, contents string) (int64, error) { - ret := _m.Called(ctx, secretsURL, contents) - - if len(ret) == 0 { - panic("no return value specified for Update") - } - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) (int64, error)); ok { - return rf(ctx, secretsURL, contents) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string) int64); ok { - r0 = rf(ctx, secretsURL, contents) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, secretsURL, contents) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update' -type ORM_Update_Call struct { - *mock.Call -} - -// Update is a helper method to define mock.On call -// - ctx context.Context -// - secretsURL string -// - contents string -func (_e *ORM_Expecter) Update(ctx interface{}, secretsURL interface{}, contents interface{}) *ORM_Update_Call { - return &ORM_Update_Call{Call: _e.mock.On("Update", ctx, secretsURL, contents)} -} - -func (_c *ORM_Update_Call) Run(run func(ctx context.Context, secretsURL string, contents string)) *ORM_Update_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *ORM_Update_Call) Return(_a0 int64, _a1 error) *ORM_Update_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_Update_Call) RunAndReturn(run func(context.Context, string, string) (int64, error)) *ORM_Update_Call { - _c.Call.Return(run) - return _c -} - -// NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewORM(t interface { - mock.TestingT - Cleanup(func()) -}) *ORM { - mock := &ORM{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go deleted file mode 100644 index a10eb708ddf..00000000000 --- a/core/services/workflows/syncer/orm.go +++ /dev/null @@ -1,139 +0,0 @@ -package syncer - -import ( - "context" - - "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" -) - -type ORM interface { - // GetSecretsURLByID returns the secrets URL for the given ID. - GetSecretsURLByID(ctx context.Context, id int64) (string, error) - - // GetSecretsURLByID returns the secrets URL for the given ID. - GetSecretsURLByHash(ctx context.Context, hash string) (string, error) - - // GetContents returns the contents of the secret at the given plain URL. - GetContents(ctx context.Context, url string) (string, error) - - // GetContentsByHash returns the contents of the secret at the given hashed URL. - GetContentsByHash(ctx context.Context, hash string) (string, error) - - // GetSecretsURLHash returns the keccak256 hash of the owner and secrets URL. - GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) - - // Update updates the contents of the secrets at the given plain URL or inserts a new record if not found. - Update(ctx context.Context, secretsURL, contents string) (int64, error) - - Create(ctx context.Context, secretsURL, hash, contents string) (int64, error) -} - -type WorkflowRegistryDS = ORM - -type orm struct { - ds sqlutil.DataSource - lggr logger.Logger -} - -var _ ORM = (*orm)(nil) - -func NewWorkflowRegistryDS(ds sqlutil.DataSource, lggr logger.Logger) *orm { - return &orm{ - ds: ds, - lggr: lggr, - } -} - -func (orm *orm) GetSecretsURLByID(ctx context.Context, id int64) (string, error) { - var secretsURL string - err := orm.ds.GetContext(ctx, &secretsURL, - `SELECT secrets_url FROM workflow_secrets WHERE workflow_secrets.id = $1`, - id, - ) - - return secretsURL, err -} - -func (orm *orm) GetSecretsURLByHash(ctx context.Context, hash string) (string, error) { - var secretsURL string - err := orm.ds.GetContext(ctx, &secretsURL, - `SELECT secrets_url FROM workflow_secrets WHERE workflow_secrets.secrets_url_hash = $1`, - hash, - ) - - return secretsURL, err -} - -func (orm *orm) GetContentsByHash(ctx context.Context, hash string) (string, error) { - var contents string - err := orm.ds.GetContext(ctx, &contents, - `SELECT contents - FROM workflow_secrets - WHERE secrets_url_hash = $1`, - hash, - ) - - if err != nil { - return "", err // Return an empty Artifact struct and the error - } - - return contents, nil // Return the populated Artifact struct -} - -func (orm *orm) GetContents(ctx context.Context, url string) (string, error) { - var contents string - err := orm.ds.GetContext(ctx, &contents, - `SELECT contents - FROM workflow_secrets - WHERE secrets_url = $1`, - url, - ) - - if err != nil { - return "", err // Return an empty Artifact struct and the error - } - - return contents, nil // Return the populated Artifact struct -} - -// Update updates the secrets content at the given hash or inserts a new record if not found. -func (orm *orm) Update(ctx context.Context, hash, contents string) (int64, error) { - var id int64 - err := orm.ds.QueryRowxContext(ctx, - `INSERT INTO workflow_secrets (secrets_url_hash, contents) - VALUES ($1, $2) - ON CONFLICT (secrets_url_hash) DO UPDATE - SET secrets_url_hash = EXCLUDED.secrets_url_hash, contents = EXCLUDED.contents - RETURNING id`, - hash, contents, - ).Scan(&id) - - if err != nil { - return 0, err - } - - return id, nil -} - -// Update updates the secrets content at the given hash or inserts a new record if not found. -func (orm *orm) Create(ctx context.Context, url, hash, contents string) (int64, error) { - var id int64 - err := orm.ds.QueryRowxContext(ctx, - `INSERT INTO workflow_secrets (secrets_url, secrets_url_hash, contents) - VALUES ($1, $2, $3) - RETURNING id`, - url, hash, contents, - ).Scan(&id) - - if err != nil { - return 0, err - } - - return id, nil -} - -func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { - return crypto.Keccak256(append(owner, secretsURL...)) -} diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go deleted file mode 100644 index 8b9f685bb52..00000000000 --- a/core/services/workflows/syncer/orm_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package syncer - -import ( - "encoding/hex" - "testing" - - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestWorkflowArtifactsORM_GetAndUpdate(t *testing.T) { - db := pgtest.NewSqlxDB(t) - ctx := testutils.Context(t) - lggr := logger.TestLogger(t) - orm := &orm{ds: db, lggr: lggr} - - giveURL := "https://example.com" - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - giveHash := hex.EncodeToString(giveBytes) - giveContent := "some contents" - - gotID, err := orm.Create(ctx, giveURL, giveHash, giveContent) - require.NoError(t, err) - - url, err := orm.GetSecretsURLByID(ctx, gotID) - require.NoError(t, err) - assert.Equal(t, giveURL, url) - - contents, err := orm.GetContents(ctx, giveURL) - require.NoError(t, err) - assert.Equal(t, "some contents", contents) - - contents, err = orm.GetContentsByHash(ctx, giveHash) - require.NoError(t, err) - assert.Equal(t, "some contents", contents) - - _, err = orm.Update(ctx, giveHash, "new contents") - require.NoError(t, err) - - contents, err = orm.GetContents(ctx, giveURL) - require.NoError(t, err) - assert.Equal(t, "new contents", contents) - - contents, err = orm.GetContentsByHash(ctx, giveHash) - require.NoError(t, err) - assert.Equal(t, "new contents", contents) -} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index ff77da9ea6f..1d42e9d5deb 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -2,589 +2,39 @@ package syncer import ( "context" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "strconv" - "sync" - "time" "github.com/smartcontractkit/chainlink-common/pkg/services" - types "github.com/smartcontractkit/chainlink-common/pkg/types" - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" - "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" - "github.com/smartcontractkit/chainlink/v2/core/logger" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" ) -const name = "WorkflowRegistrySyncer" - -var ( - defaultTickInterval = 12 * time.Second - ContractName = "WorkflowRegistry" -) - -// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry -type WorkflowRegistryEventType string - -var ( - // ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made - ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1" -) - -// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry -// ForceUpdateSecretsRequested event. -type WorkflowRegistryForceUpdateSecretsRequestedV1 struct { - SecretsURLHash []byte - Owner []byte - WorkflowName string -} - -type Head struct { - Hash string - Height string - Timestamp uint64 -} - -// WorkflowRegistryEvent is an event emitted by the WorkflowRegistry. Each event is typed -// so that the consumer can determine how to handle the event. -type WorkflowRegistryEvent struct { - Cursor string - Data any - EventType WorkflowRegistryEventType - Head Head -} - -// WorkflowRegistryEventResponse is a response to either parsing a queried event or handling the event. -type WorkflowRegistryEventResponse struct { - Err error - Event *WorkflowRegistryEvent -} - -// ContractEventPollerConfig is the configuration needed to poll for events on a contract. Currently -// requires the ContractEventName. -// -// TODO(mstreet3): Use LookbackBlocks instead of StartBlockNum -type ContractEventPollerConfig struct { - ContractName string - ContractAddress string - StartBlockNum uint64 - QueryCount uint64 -} - -// FetcherFunc is an abstraction for fetching the contents stored at a URL. -type FetcherFunc func(ctx context.Context, url string) ([]byte, error) - -type ContractReaderFactory interface { - NewContractReader(context.Context, []byte) (types.ContractReader, error) -} - -// ContractReader is a subset of types.ContractReader defined locally to enable mocking. -type ContractReader interface { - Bind(context.Context, []types.BoundContract) error - QueryKey(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error) -} - -// WorkflowRegistrySyncer is the public interface of the package. -type WorkflowRegistrySyncer interface { - services.Service -} - -var _ WorkflowRegistrySyncer = (*workflowRegistry)(nil) - -// workflowRegistry is the implementation of the WorkflowRegistrySyncer interface. -type workflowRegistry struct { +type WorkflowRegistry struct { services.StateMachine - - // close stopCh to stop the workflowRegistry. - stopCh services.StopChan - - // all goroutines are waited on with wg. - wg sync.WaitGroup - - // ticker is the interval at which the workflowRegistry will poll the contract for events. - ticker <-chan time.Time - - lggr logger.Logger - orm WorkflowRegistryDS - reader ContractReader - gateway FetcherFunc - - // initReader allows the workflowRegistry to initialize a contract reader if one is not provided - // and separates the contract reader initialization from the workflowRegistry start up. - initReader func(context.Context, logger.Logger, ContractReaderFactory, types.BoundContract) (types.ContractReader, error) - relayer ContractReaderFactory - - cfg ContractEventPollerConfig - eventTypes []WorkflowRegistryEventType - - // eventsCh is read by the handler and each event is handled once received. - eventsCh chan WorkflowRegistryEventResponse - handler *eventHandler - - // batchCh is a channel that receives batches of events from the contract query goroutines. - batchCh chan []WorkflowRegistryEventResponse - - // heap is a min heap that merges batches of events from the contract query goroutines. The - // default min heap is sorted by block height. - heap Heap -} - -// WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful -// for overriding the default tick interval. -func WithTicker(ticker <-chan time.Time) func(*workflowRegistry) { - return func(wr *workflowRegistry) { - wr.ticker = ticker - } -} - -func WithReader(reader types.ContractReader) func(*workflowRegistry) { - return func(wr *workflowRegistry) { - wr.reader = reader - } -} - -// NewWorkflowRegistry returns a new workflowRegistry. -// Only queries for WorkflowRegistryForceUpdateSecretsRequestedV1 events. -func NewWorkflowRegistry[T ContractReader]( - lggr logger.Logger, - orm WorkflowRegistryDS, - reader T, - gateway FetcherFunc, - addr string, - opts ...func(*workflowRegistry), -) *workflowRegistry { - ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} - wr := &workflowRegistry{ - lggr: lggr.Named(name), - orm: orm, - reader: reader, - gateway: gateway, - cfg: ContractEventPollerConfig{ - ContractName: ContractName, - ContractAddress: addr, - QueryCount: 20, - StartBlockNum: 0, - }, - initReader: newReader, - heap: newBlockHeightHeap(), - stopCh: make(services.StopChan), - eventTypes: ets, - eventsCh: make(chan WorkflowRegistryEventResponse), - batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), - } - wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway) - for _, opt := range opts { - opt(wr) - } - return wr } -// Start starts the workflowRegistry. It starts two goroutines, one for querying the contract -// and one for handling the events. -func (w *workflowRegistry) Start(_ context.Context) error { - return w.StartOnce(w.Name(), func() error { - ctx, cancel := w.stopCh.NewCtx() - - w.wg.Add(1) - go func() { - defer w.wg.Done() - defer cancel() - - w.syncEventsLoop(ctx) - }() - - w.wg.Add(1) - go func() { - defer w.wg.Done() - defer cancel() - - w.handlerLoop(ctx) - }() - - return nil - }) -} - -func (w *workflowRegistry) Close() error { - return w.StopOnce(w.Name(), func() error { - close(w.stopCh) - w.wg.Wait() - return nil - }) -} - -func (w *workflowRegistry) Ready() error { +func (w *WorkflowRegistry) Start(ctx context.Context) error { return nil } -func (w *workflowRegistry) HealthReport() map[string]error { +func (w *WorkflowRegistry) Close() error { return nil } -func (w *workflowRegistry) Name() string { - return name -} - -func (w *workflowRegistry) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { - return nil, errors.New("not implemented") -} - -// handlerLoop handles the events that are emitted by the contract. -func (w *workflowRegistry) handlerLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case resp, open := <-w.eventsCh: - if !open { - return - } - - if resp.Err != nil || resp.Event == nil { - w.lggr.Errorf("failed to handle event: %+v", resp.Err) - continue - } - - event := resp.Event - w.lggr.Debugf("handling event: %+v", event) - if err := w.handler.Handle(ctx, *event); err != nil { - w.lggr.Errorf("failed to handle event: %+v", event) - continue - } - } - } -} - -// syncEventsLoop polls the contract for events and passes them to a channel for handling. -func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { - var ( - // sendLog is a helper that sends a WorkflowRegistryEventResponse to the eventsCh in a - // blocking way that will send the response or be canceled. - sendLog = func(resp WorkflowRegistryEventResponse) { - select { - case w.eventsCh <- resp: - case <-ctx.Done(): - } - } - - ticker = w.getTicker() - - signals = make(map[WorkflowRegistryEventType]chan struct{}, 0) - ) - - // critical failure if there is no reader, the loop will exit and the parent context will be - // canceled. - reader, err := w.getContractReader(ctx) - if err != nil { - w.lggr.Criticalf("contract reader unavailable : %s", err) - return - } - - // fan out and query for each event type - for i := 0; i < len(w.eventTypes); i++ { - signal := make(chan struct{}, 1) - signals[w.eventTypes[i]] = signal - w.wg.Add(1) - go func() { - defer w.wg.Done() - - queryEvent( - ctx, - signal, - w.lggr, - reader, - w.cfg, - w.eventTypes[i], - w.batchCh, - ) - }() - } - - // Periodically send a signal to all the queryEvent goroutines to query the contract - for { - select { - case <-ctx.Done(): - return - case <-ticker: - // for each event type, send a signal for it to execute a query and produce a new - // batch of event logs - for i := 0; i < len(w.eventTypes); i++ { - signal := signals[w.eventTypes[i]] - select { - case signal <- struct{}{}: - case <-ctx.Done(): - return - } - } - - // block on fan-in until all fetched event logs are sent to the handlers - w.orderAndSend( - ctx, - len(w.eventTypes), - w.batchCh, - sendLog, - ) - } - } -} - -// orderAndSend reads n batches from the batch channel, heapifies all the batches then dequeues -// the min heap via the sendLog function. -func (w *workflowRegistry) orderAndSend( - ctx context.Context, - batchCount int, - batchCh <-chan []WorkflowRegistryEventResponse, - sendLog func(WorkflowRegistryEventResponse), -) { - for { - select { - case <-ctx.Done(): - return - case batch := <-batchCh: - for _, response := range batch { - w.heap.Push(response) - } - batchCount-- - - // If we have received responses for all the events, then we can drain the heap. - if batchCount == 0 { - for w.heap.Len() > 0 { - sendLog(w.heap.Pop()) - } - return - } - } - } -} - -// getTicker returns the ticker that the workflowRegistry will use to poll for events. If the ticker -// is nil, then a default ticker is returned. -func (w *workflowRegistry) getTicker() <-chan time.Time { - if w.ticker == nil { - return time.NewTicker(defaultTickInterval).C - } - - return w.ticker -} - -// getContractReader initializes a contract reader if needed, otherwise returns the existing -// reader. -func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReader, error) { - c := types.BoundContract{ - Name: w.cfg.ContractName, - Address: w.cfg.ContractAddress, - } - - if w.reader == nil { - reader, err := w.initReader(ctx, w.lggr, w.relayer, c) - if err != nil { - return nil, err - } - - w.reader = reader - } - - return w.reader, nil -} - -// queryEvent queries the contract for events of the given type on each tick from the ticker. -// Sends a batch of event logs to the batch channel. The batch represents all the -// event logs read since the last query. Loops until the context is canceled. -func queryEvent( - ctx context.Context, - ticker <-chan struct{}, - lggr logger.Logger, - reader ContractReader, - cfg ContractEventPollerConfig, - et WorkflowRegistryEventType, - batchCh chan<- []WorkflowRegistryEventResponse, -) { - // create query - var ( - responseBatch []WorkflowRegistryEventResponse - logData values.Value - cursor = "" - limitAndSort = query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: cfg.QueryCount}, - } - bc = types.BoundContract{ - Name: cfg.ContractName, - Address: cfg.ContractAddress, - } - ) - - // Loop until canceled - for { - select { - case <-ctx.Done(): - return - case <-ticker: - if cursor != "" { - limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, cfg.QueryCount) - } - - logs, err := reader.QueryKey( - ctx, - bc, - query.KeyFilter{ - Key: string(et), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block(strconv.FormatUint(cfg.StartBlockNum, 10), primitives.Gte), - }, - }, - limitAndSort, - &logData, - ) - - if err != nil { - lggr.Errorw("QueryKey failure", "err", err) - continue - } - - // ChainReader QueryKey API provides logs including the cursor value and not - // after the cursor value. If the response only consists of the log corresponding - // to the cursor and no log after it, then we understand that there are no new - // logs - if len(logs) == 1 && logs[0].Cursor == cursor { - lggr.Infow("No new logs since", "cursor", cursor) - continue - } - - for _, log := range logs { - if log.Cursor == cursor { - continue - } - - responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log, et, lggr)) - cursor = log.Cursor - } - batchCh <- responseBatch - } - } -} - -func newReader( - ctx context.Context, - lggr logger.Logger, - factory ContractReaderFactory, - bc types.BoundContract, -) (types.ContractReader, error) { - contractReaderCfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - ContractName: { - ContractPollingFilter: evmtypes.ContractPollingFilter{ - GenericEventNames: []string{string(ForceUpdateSecretsEvent)}, - }, - ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - string(ForceUpdateSecretsEvent): { - ChainSpecificName: string(ForceUpdateSecretsEvent), - ReadType: evmtypes.Event, - }, - }, - }, - }, - } - - marshalledCfg, err := json.Marshal(contractReaderCfg) - if err != nil { - return nil, err - } - - reader, err := factory.NewContractReader(ctx, marshalledCfg) - if err != nil { - return nil, err - } - - // bind contract to contract reader - if err := reader.Bind(ctx, []types.BoundContract{bc}); err != nil { - return nil, err - } - - return reader, nil -} - -// toWorkflowRegistryEventResponse converts a types.Sequence to a WorkflowRegistryEventResponse. -func toWorkflowRegistryEventResponse( - log types.Sequence, - evt WorkflowRegistryEventType, - lggr logger.Logger, -) WorkflowRegistryEventResponse { - resp := WorkflowRegistryEventResponse{ - Event: &WorkflowRegistryEvent{ - Cursor: log.Cursor, - EventType: evt, - Head: Head{ - Hash: hex.EncodeToString(log.Hash), - Height: log.Height, - Timestamp: log.Timestamp, - }, - }, - } - - dataAsValuesMap, err := values.WrapMap(log.Data) - if err != nil { - return WorkflowRegistryEventResponse{ - Err: err, - } - } - - switch evt { - case ForceUpdateSecretsEvent: - var data WorkflowRegistryForceUpdateSecretsRequestedV1 - if err := dataAsValuesMap.UnwrapTo(&data); err != nil { - lggr.Errorf("failed to unwrap data: %+v", log.Data) - resp.Event = nil - resp.Err = err - return resp - } - resp.Event.Data = data - default: - lggr.Errorf("unknown event type: %s", evt) - resp.Event = nil - resp.Err = fmt.Errorf("unknown event type: %s", evt) - } - - return resp -} - -type nullWorkflowRegistrySyncer struct { - services.Service -} - -func NewNullWorkflowRegistrySyncer() *nullWorkflowRegistrySyncer { - return &nullWorkflowRegistrySyncer{} -} - -// Start -func (u *nullWorkflowRegistrySyncer) Start(context.Context) error { +func (w *WorkflowRegistry) Ready() error { return nil } -// Close -func (u *nullWorkflowRegistrySyncer) Close() error { +func (w *WorkflowRegistry) HealthReport() map[string]error { return nil } -// SecretsFor -func (u *nullWorkflowRegistrySyncer) SecretsFor(context.Context, string, string) (map[string]string, error) { - return nil, nil -} - -func (u *nullWorkflowRegistrySyncer) Ready() error { - return nil +func (w *WorkflowRegistry) Name() string { + return "WorkflowRegistrySyncer" } -func (u *nullWorkflowRegistrySyncer) HealthReport() map[string]error { - return nil +func (w *WorkflowRegistry) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) { + // TODO: actually get this from the right place. + return map[string]string{}, nil } -func (u *nullWorkflowRegistrySyncer) Name() string { - return "Null" + name +func NewWorkflowRegistry() *WorkflowRegistry { + return &WorkflowRegistry{} } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go deleted file mode 100644 index d979437d54d..00000000000 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ /dev/null @@ -1,101 +0,0 @@ -package syncer - -import ( - "context" - "encoding/hex" - "strconv" - "testing" - "time" - - "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - types "github.com/smartcontractkit/chainlink-common/pkg/types" - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" - "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - "github.com/smartcontractkit/chainlink/v2/core/utils/matches" - - "github.com/stretchr/testify/require" -) - -func Test_Workflow_Registry_Syncer(t *testing.T) { - var ( - giveContents = "contents" - wantContents = "updated contents" - giveCfg = ContractEventPollerConfig{ - ContractName: ContractName, - ContractAddress: "0xdeadbeef", - StartBlockNum: 0, - QueryCount: 20, - } - giveURL = "http://example.com" - giveHash, err = crypto.Keccak256([]byte(giveURL)) - - giveLog = types.Sequence{ - Data: map[string]any{ - "SecretsURLHash": giveHash, - "Owner": "0xowneraddr", - }, - Cursor: "cursor", - } - ) - - require.NoError(t, err) - - var ( - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = &orm{ds: db, lggr: lggr} - ctx, cancel = context.WithCancel(testutils.Context(t)) - reader = NewMockContractReader(t) - gateway = func(_ context.Context, _ string) ([]byte, error) { - return []byte(wantContents), nil - } - ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, WithTicker(ticker)) - ) - - // Cleanup the worker - defer cancel() - - // Seed the DB with an original entry - _, err = orm.Create(ctx, giveURL, hex.EncodeToString(giveHash), giveContents) - require.NoError(t, err) - - // Mock out the contract reader query - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: giveCfg.ContractName, - Address: giveCfg.ContractAddress, - }, - query.KeyFilter{ - Key: string(ForceUpdateSecretsEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block(strconv.FormatUint(giveCfg.StartBlockNum, 10), primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{giveLog}, nil) - - // Go run the worker - servicetest.Run(t, worker) - - // Send a tick to start a query - ticker <- time.Now() - - // Require the secrets contents to eventually be updated - require.Eventually(t, func() bool { - secrets, err := orm.GetContents(ctx, giveURL) - require.NoError(t, err) - return secrets == wantContents - }, 5*time.Second, time.Second) -} diff --git a/core/store/migrate/migrations/0259_add_workflow_secrets.sql b/core/store/migrate/migrations/0259_add_workflow_secrets.sql deleted file mode 100644 index fb76d945571..00000000000 --- a/core/store/migrate/migrations/0259_add_workflow_secrets.sql +++ /dev/null @@ -1,41 +0,0 @@ --- +goose Up --- +goose StatementBegin --- Create the workflow_artifacts table -CREATE TABLE workflow_secrets ( - id SERIAL PRIMARY KEY, - secrets_url TEXT, - secrets_url_hash TEXT UNIQUE, - contents TEXT -); - --- Create an index on the secrets_url_hash column -CREATE INDEX idx_secrets_url ON workflow_secrets(secrets_url); - --- Alter the workflow_specs table -ALTER TABLE workflow_specs -ADD COLUMN binary_url TEXT DEFAULT '', -ADD COLUMN config_url TEXT DEFAULT '', -ADD COLUMN secrets_id INT UNIQUE REFERENCES workflow_secrets(id) ON DELETE CASCADE; - --- Alter the config column type -ALTER TABLE workflow_specs -ALTER COLUMN config TYPE TEXT; --- +goose StatementEnd - --- +goose Down --- +goose StatementBegin -ALTER TABLE workflow_specs -DROP COLUMN IF EXISTS secrets_id, -DROP COLUMN IF EXISTS config_url, -DROP COLUMN IF EXISTS binary_url; - --- Change the config column back to character varying(255) -ALTER TABLE workflow_specs -ALTER COLUMN config TYPE CHARACTER VARYING(255); - --- Drop the index on the secrets_url_hash column -DROP INDEX IF EXISTS idx_secrets_url_hash; - --- Drop the workflow_artifacts table -DROP TABLE IF EXISTS workflow_secrets; --- +goose StatementEnd \ No newline at end of file diff --git a/core/utils/crypto/keccak_256.go b/core/utils/crypto/keccak_256.go deleted file mode 100644 index b6218d72cf0..00000000000 --- a/core/utils/crypto/keccak_256.go +++ /dev/null @@ -1,16 +0,0 @@ -package crypto - -import ( - "golang.org/x/crypto/sha3" -) - -func Keccak256(input []byte) ([]byte, error) { - // Create a Keccak-256 hash - hash := sha3.NewLegacyKeccak256() - _, err := hash.Write(input) - if err != nil { - return nil, err - } - - return hash.Sum(nil), nil -} diff --git a/core/utils/matches/matches.go b/core/utils/matches/matches.go deleted file mode 100644 index 90606af57e2..00000000000 --- a/core/utils/matches/matches.go +++ /dev/null @@ -1,21 +0,0 @@ -package matches - -import ( - "context" - - "github.com/stretchr/testify/mock" -) - -func anyContext(_ context.Context) bool { - return true -} - -func anyString(_ string) bool { - return true -} - -// AnyContext is an argument matcher that matches any argument of type context.Context. -var AnyContext = mock.MatchedBy(anyContext) - -// AnyString is an argument matcher that matches any argument of type string. -var AnyString = mock.MatchedBy(anyString)