diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index d6f507eac20..ba29e98526e 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" @@ -29,6 +30,7 @@ func Test_SecretsWorker(t *testing.T) { var ( ctx = coretestutils.Context(t) lggr = logger.TestLogger(t) + emitter = custmsg.NewLabeler() backendTH = testutils.NewEVMBackendTH(t) db = pgtest.NewSqlxDB(t) orm = syncer.NewWorkflowRegistryDS(db, lggr) @@ -119,6 +121,7 @@ func Test_SecretsWorker(t *testing.T) { wfRegistryAddr.Hex(), nil, nil, + emitter, syncer.WithTicker(giveTicker.C), ) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 01e13d106f9..5ccb3f5e180 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -2,12 +2,18 @@ package syncer import ( "context" + "crypto/sha256" "encoding/hex" "errors" "fmt" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/platform" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) @@ -87,15 +93,28 @@ type WorkflowRegistryWorkflowDeletedV1 struct { WorkflowName string } +type secretsFetcher interface { + SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) +} + +// secretsFetcherFunc implements the secretsFetcher interface for a function. +type secretsFetcherFunc func(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) + +func (f secretsFetcherFunc) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { + return f(ctx, workflowOwner, workflowName) +} + // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. type eventHandler struct { lggr logger.Logger - orm WorkflowSecretsDS + orm WorkflowRegistryDS fetcher FetcherFunc workflowStore store.Store capRegistry core.CapabilitiesRegistry engineRegistry *engineRegistry + emitter custmsg.MessageEmitter + secretsFetcher secretsFetcher } // newEventHandler returns a new eventHandler instance. @@ -106,6 +125,8 @@ func newEventHandler( workflowStore store.Store, capRegistry core.CapabilitiesRegistry, engineRegistry *engineRegistry, + emitter custmsg.MessageEmitter, + secretsFetcher secretsFetcher, ) *eventHandler { return &eventHandler{ lggr: lggr, @@ -114,15 +135,50 @@ func newEventHandler( workflowStore: workflowStore, capRegistry: capRegistry, engineRegistry: engineRegistry, + emitter: emitter, + secretsFetcher: secretsFetcher, } } func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error { switch event.EventType { case ForceUpdateSecretsEvent: - return h.forceUpdateSecretsEvent(ctx, event) + payload, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1) + if !ok { + return newHandlerTypeError(event.Data) + } + + cma := h.emitter.With( + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), + ) + + if err := h.forceUpdateSecretsEvent(ctx, payload); err != nil { + logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr) + return err + } + + return nil case WorkflowRegisteredEvent: - return h.workflowRegisteredEvent(ctx, event) + payload, ok := event.Data.(WorkflowRegistryWorkflowRegisteredV1) + if !ok { + return newHandlerTypeError(event.Data) + } + wfID := hex.EncodeToString(payload.WorkflowID[:]) + + cma := h.emitter.With( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + + if err := h.workflowRegisteredEvent(ctx, payload); err != nil { + logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow registered event: %v", err), h.lggr) + return err + } + + h.lggr.Debugf("workflow 0x%x registered and started", wfID) + return nil case WorkflowUpdatedEvent: return h.workflowUpdatedEvent(ctx, event) case WorkflowPausedEvent: @@ -135,12 +191,97 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) } // workflowRegisteredEvent handles the WorkflowRegisteredEvent event type. -// TODO: Implement this method func (h *eventHandler) workflowRegisteredEvent( - _ context.Context, - _ WorkflowRegistryEvent, + ctx context.Context, + payload WorkflowRegistryWorkflowRegisteredV1, ) error { - return ErrNotImplemented + wfID := hex.EncodeToString(payload.WorkflowID[:]) + + // Download the contents of binaryURL, configURL and secretsURL and cache them locally. + binary, err := h.fetcher(ctx, payload.BinaryURL) + if err != nil { + return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err) + } + + config, err := h.fetcher(ctx, payload.ConfigURL) + if err != nil { + return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) + } + + secrets, err := h.fetcher(ctx, payload.SecretsURL) + if err != nil { + return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err) + } + + // Calculate the hash of the binary and config files + hash := workflowID(binary, config, []byte(payload.SecretsURL)) + + // Pre-check: verify that the workflowID matches; if it doesn’t abort and log an error via Beholder. + if hash != wfID { + return fmt.Errorf("workflowID mismatch: %s != %s", hash, wfID) + } + + // Save the workflow secrets + urlHash, err := h.orm.GetSecretsURLHash(payload.WorkflowOwner, []byte(payload.SecretsURL)) + if err != nil { + return fmt.Errorf("failed to get secrets URL hash: %w", err) + } + + // Create a new entry in the workflow_spec table corresponding for the new workflow, with the contents of the binaryURL + configURL in the table + status := job.WorkflowSpecStatusActive + if payload.Status == 1 { + status = job.WorkflowSpecStatusPaused + } + + entry := &job.WorkflowSpec{ + Workflow: hex.EncodeToString(binary), + Config: string(config), + WorkflowID: wfID, + Status: status, + WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner), + WorkflowName: payload.WorkflowName, + SpecType: job.WASMFile, + BinaryURL: payload.BinaryURL, + ConfigURL: payload.ConfigURL, + } + if _, err = h.orm.UpsertWorkflowSpecWithSecrets(ctx, entry, payload.SecretsURL, hex.EncodeToString(urlHash), string(secrets)); err != nil { + return fmt.Errorf("failed to upsert workflow spec with secrets: %w", err) + } + + if status != job.WorkflowSpecStatusActive { + return nil + } + + // If status == active, start a new WorkflowEngine instance, and add it to local engine registry + moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter} + sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config) + if err != nil { + return fmt.Errorf("failed to get workflow sdk spec: %w", err) + } + + cfg := workflows.Config{ + Lggr: h.lggr, + Workflow: *sdkSpec, + WorkflowID: wfID, + WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner), + WorkflowName: payload.WorkflowName, + Registry: h.capRegistry, + Store: h.workflowStore, + Config: config, + Binary: binary, + SecretsFetcher: h.secretsFetcher, + } + e, err := workflows.NewEngine(ctx, cfg) + if err != nil { + return fmt.Errorf("failed to create workflow engine: %w", err) + } + + if err := e.Start(ctx); err != nil { + return fmt.Errorf("failed to start workflow engine: %w", err) + } + + h.engineRegistry.Add(wfID, e) + return nil } // workflowUpdatedEvent handles the WorkflowUpdatedEvent event type. @@ -170,32 +311,47 @@ func (h *eventHandler) workflowActivatedEvent( // forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type. func (h *eventHandler) forceUpdateSecretsEvent( ctx context.Context, - event WorkflowRegistryEvent, + payload WorkflowRegistryForceUpdateSecretsRequestedV1, ) error { // Get the URL of the secrets file from the event data - data, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1) - if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) - } - - hash := hex.EncodeToString(data.SecretsURLHash) + hash := hex.EncodeToString(payload.SecretsURLHash) url, err := h.orm.GetSecretsURLByHash(ctx, hash) if err != nil { - h.lggr.Errorf("failed to get URL by hash %s : %s", hash, err) - return err + return fmt.Errorf("failed to get URL by hash %s : %w", hash, err) } // Fetch the contents of the secrets file from the url via the fetcher secrets, err := h.fetcher(ctx, url) if err != nil { - return err + return fmt.Errorf("failed to fetch secrets from url %s : %w", url, err) } // Update the secrets in the ORM if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil { - return err + return fmt.Errorf("failed to update secrets: %w", err) } return nil } + +// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL. +func workflowID(wasm, config, secretsURL []byte) string { + sum := sha256.New() + sum.Write(wasm) + sum.Write(config) + sum.Write(secretsURL) + return hex.EncodeToString(sum.Sum(nil)) +} + +// logCustMsg emits a custom message to the external sink and logs an error if that fails. +func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) { + err := cma.Emit(ctx, msg) + if err != nil { + log.Helper(1).Errorf("failed to send custom message with msg: %s, err: %v", msg, err) + } +} + +func newHandlerTypeError(data any) error { + return fmt.Errorf("invalid data type %T for event", data) +} diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index 17c980e4f56..42da3e8de9d 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -5,18 +5,43 @@ import ( "encoding/hex" "testing" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" + "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + wfstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/mocks" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/smartcontractkit/chainlink/v2/core/utils/matches" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +type mockFetchResp struct { + Body []byte + Err error +} + +type mockFetcher struct { + responseMap map[string]mockFetchResp +} + +func (m *mockFetcher) Fetch(_ context.Context, url string) ([]byte, error) { + return m.responseMap[url].Body, m.responseMap[url].Err +} + +func newMockFetcher(m map[string]mockFetchResp) FetcherFunc { + return (&mockFetcher{responseMap: m}).Fetch +} + func Test_Handler(t *testing.T) { lggr := logger.TestLogger(t) + emitter := custmsg.NewLabeler() t.Run("success", func(t *testing.T) { mockORM := mocks.NewORM(t) ctx := testutils.Context(t) @@ -38,7 +63,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -52,7 +77,7 @@ func Test_Handler(t *testing.T) { return []byte("contents"), nil } - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) err := h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -61,7 +86,7 @@ func Test_Handler(t *testing.T) { t.Run("fails to get secrets url", func(t *testing.T) { mockORM := mocks.NewORM(t) ctx := testutils.Context(t) - h := newEventHandler(lggr, mockORM, nil, nil, nil, nil) + h := newEventHandler(lggr, mockORM, nil, nil, nil, nil, emitter, nil) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -101,7 +126,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -128,9 +153,140 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) }) } + +const ( + binaryLocation = "test/simple/cmd/testmodule.wasm" + binaryCmd = "core/capabilities/compute/test/simple/cmd" +) + +func Test_workflowRegisteredHandler(t *testing.T) { + t.Run("success with paused workflow registered", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + db = pgtest.NewSqlxDB(t) + orm = NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) + config = []byte("") + secretsURL = "http://example.com" + binaryURL = "http://example.com/binary" + configURL = "http://example.com/config" + wfOwner = []byte("0xOwner") + + fetcher = newMockFetcher(map[string]mockFetchResp{ + binaryURL: {Body: binary, Err: nil}, + configURL: {Body: config, Err: nil}, + secretsURL: {Body: []byte("secrets"), Err: nil}, + }) + ) + + giveWFID := workflowID(binary, config, []byte(secretsURL)) + + b, err := hex.DecodeString(giveWFID) + require.NoError(t, err) + wfID := make([]byte, 32) + copy(wfID, b) + + paused := WorkflowRegistryWorkflowRegisteredV1{ + Status: uint8(1), + WorkflowID: [32]byte(wfID), + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, + } + + h := &eventHandler{ + lggr: lggr, + orm: orm, + fetcher: fetcher, + emitter: emitter, + } + err = h.workflowRegisteredEvent(ctx, paused) + require.NoError(t, err) + + // Verify the record is updated in the database + dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) + require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status) + }) + + t.Run("success with active workflow registered", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + db = pgtest.NewSqlxDB(t) + orm = NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) + config = []byte("") + secretsURL = "http://example.com" + binaryURL = "http://example.com/binary" + configURL = "http://example.com/config" + wfOwner = []byte("0xOwner") + + fetcher = newMockFetcher(map[string]mockFetchResp{ + binaryURL: {Body: binary, Err: nil}, + configURL: {Body: config, Err: nil}, + secretsURL: {Body: []byte("secrets"), Err: nil}, + }) + ) + + giveWFID := workflowID(binary, config, []byte(secretsURL)) + + b, err := hex.DecodeString(giveWFID) + require.NoError(t, err) + wfID := make([]byte, 32) + copy(wfID, b) + + active := WorkflowRegistryWorkflowRegisteredV1{ + Status: uint8(0), + WorkflowID: [32]byte(wfID), + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, + } + + er := newEngineRegistry() + store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + h := &eventHandler{ + lggr: lggr, + orm: orm, + fetcher: fetcher, + emitter: emitter, + engineRegistry: er, + capRegistry: registry, + workflowStore: store, + } + err = h.workflowRegisteredEvent(ctx, active) + require.NoError(t, err) + + // Verify the record is updated in the database + dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) + require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) + + // Verify the engine is started + engine, err := h.engineRegistry.Get(giveWFID) + require.NoError(t, err) + err = engine.Ready() + require.NoError(t, err) + }) +} diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go index 2bb116cba4f..128100ea907 100644 --- a/core/services/workflows/syncer/mocks/orm.go +++ b/core/services/workflows/syncer/mocks/orm.go @@ -591,6 +591,66 @@ func (_c *ORM_UpsertWorkflowSpec_Call) RunAndReturn(run func(context.Context, *j return _c } +// UpsertWorkflowSpecWithSecrets provides a mock function with given fields: ctx, spec, url, hash, contents +func (_m *ORM) UpsertWorkflowSpecWithSecrets(ctx context.Context, spec *job.WorkflowSpec, url string, hash string, contents string) (int64, error) { + ret := _m.Called(ctx, spec, url, hash, contents) + + if len(ret) == 0 { + panic("no return value specified for UpsertWorkflowSpecWithSecrets") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec, string, string, string) (int64, error)); ok { + return rf(ctx, spec, url, hash, contents) + } + if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec, string, string, string) int64); ok { + r0 = rf(ctx, spec, url, hash, contents) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, *job.WorkflowSpec, string, string, string) error); ok { + r1 = rf(ctx, spec, url, hash, contents) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_UpsertWorkflowSpecWithSecrets_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertWorkflowSpecWithSecrets' +type ORM_UpsertWorkflowSpecWithSecrets_Call struct { + *mock.Call +} + +// UpsertWorkflowSpecWithSecrets is a helper method to define mock.On call +// - ctx context.Context +// - spec *job.WorkflowSpec +// - url string +// - hash string +// - contents string +func (_e *ORM_Expecter) UpsertWorkflowSpecWithSecrets(ctx interface{}, spec interface{}, url interface{}, hash interface{}, contents interface{}) *ORM_UpsertWorkflowSpecWithSecrets_Call { + return &ORM_UpsertWorkflowSpecWithSecrets_Call{Call: _e.mock.On("UpsertWorkflowSpecWithSecrets", ctx, spec, url, hash, contents)} +} + +func (_c *ORM_UpsertWorkflowSpecWithSecrets_Call) Run(run func(ctx context.Context, spec *job.WorkflowSpec, url string, hash string, contents string)) *ORM_UpsertWorkflowSpecWithSecrets_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*job.WorkflowSpec), args[2].(string), args[3].(string), args[4].(string)) + }) + return _c +} + +func (_c *ORM_UpsertWorkflowSpecWithSecrets_Call) Return(_a0 int64, _a1 error) *ORM_UpsertWorkflowSpecWithSecrets_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_UpsertWorkflowSpecWithSecrets_Call) RunAndReturn(run func(context.Context, *job.WorkflowSpec, string, string, string) (int64, error)) *ORM_UpsertWorkflowSpecWithSecrets_Call { + _c.Call.Return(run) + return _c +} + // NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewORM(t interface { diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index 4a5be9d1a58..16612b9a9c6 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -3,6 +3,7 @@ package syncer import ( "context" "database/sql" + "fmt" "time" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -38,6 +39,10 @@ type WorkflowSpecsDS interface { // and owner UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) + // UpsertWorkflowSpecWithSecrets inserts or updates a workflow spec with secrets in a transaction. + // Updates on conflict of workflow name and owner. + UpsertWorkflowSpecWithSecrets(ctx context.Context, spec *job.WorkflowSpec, url, hash, contents string) (int64, error) + // GetWorkflowSpec returns the workflow spec for the given owner and name. GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) @@ -221,6 +226,81 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) return id, nil } +func (orm *orm) UpsertWorkflowSpecWithSecrets( + ctx context.Context, + spec *job.WorkflowSpec, url, hash, contents string) (int64, error) { + var id int64 + err := sqlutil.TransactDataSource(ctx, orm.ds, nil, func(tx sqlutil.DataSource) error { + var sid int64 + txErr := tx.QueryRowxContext(ctx, + `INSERT INTO workflow_secrets (secrets_url, secrets_url_hash, contents) + VALUES ($1, $2, $3) + RETURNING id`, + url, hash, contents, + ).Scan(&sid) + + if txErr != nil { + return fmt.Errorf("failed to create workflow secrets: %w", txErr) + } + + spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true} + + query := ` + INSERT INTO workflow_specs ( + workflow, + config, + workflow_id, + workflow_owner, + workflow_name, + status, + binary_url, + config_url, + secrets_id, + created_at, + updated_at, + spec_type + ) VALUES ( + :workflow, + :config, + :workflow_id, + :workflow_owner, + :workflow_name, + :status, + :binary_url, + :config_url, + :secrets_id, + :created_at, + :updated_at, + :spec_type + ) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE + SET + workflow = EXCLUDED.workflow, + config = EXCLUDED.config, + workflow_id = EXCLUDED.workflow_id, + workflow_owner = EXCLUDED.workflow_owner, + workflow_name = EXCLUDED.workflow_name, + status = EXCLUDED.status, + binary_url = EXCLUDED.binary_url, + config_url = EXCLUDED.config_url, + secrets_id = EXCLUDED.secrets_id, + created_at = EXCLUDED.created_at, + updated_at = EXCLUDED.updated_at, + spec_type = EXCLUDED.spec_type + RETURNING id + ` + + stmt, txErr := tx.PrepareNamedContext(ctx, query) + if txErr != nil { + return txErr + } + defer stmt.Close() + + spec.UpdatedAt = time.Now() + return stmt.QueryRowxContext(ctx, spec).Scan(&id) + }) + return id, err +} + func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) { query := ` SELECT * diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index d8ad37646d6..cdd0c71acc0 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/core" @@ -95,6 +96,7 @@ type workflowRegistry struct { ticker <-chan time.Time lggr logger.Logger + emitter custmsg.Labeler orm WorkflowRegistryDS reader ContractReader gateway FetcherFunc @@ -147,11 +149,13 @@ func NewWorkflowRegistry[T ContractReader]( addr string, workflowStore store.Store, capRegistry core.CapabilitiesRegistry, + emitter custmsg.Labeler, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} wr := &workflowRegistry{ lggr: lggr.Named(name), + emitter: emitter, orm: orm, reader: reader, gateway: gateway, @@ -172,7 +176,7 @@ func NewWorkflowRegistry[T ContractReader]( batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), } wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, - wr.engineRegistry, + wr.engineRegistry, wr.emitter, secretsFetcherFunc(wr.SecretsFor), ) for _, opt := range opts { opt(wr) diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 652b20deea1..58dcbed1022 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" types "github.com/smartcontractkit/chainlink-common/pkg/types" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" @@ -51,11 +52,12 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { orm = &orm{ds: db, lggr: lggr} ctx, cancel = context.WithCancel(testutils.Context(t)) reader = NewMockContractReader(t) + emitter = custmsg.NewLabeler() gateway = func(_ context.Context, _ string) ([]byte, error) { return []byte(wantContents), nil } ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, WithTicker(ticker)) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, emitter, WithTicker(ticker)) ) // Cleanup the worker