diff --git a/core/services/job/models.go b/core/services/job/models.go index 423a297c8da..26d563c7ac8 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -869,21 +869,30 @@ const ( DefaultSpecType = "" ) +type WorkflowSpecStatus string + +const ( + WorkflowSpecStatusActive WorkflowSpecStatus = "active" + WorkflowSpecStatusPaused WorkflowSpecStatus = "paused" + WorkflowSpecStatusDefault WorkflowSpecStatus = "" +) + type WorkflowSpec struct { ID int32 `toml:"-"` Workflow string `toml:"workflow"` // the raw representation of the workflow Config string `toml:"config" db:"config"` // the raw representation of the config // fields derived from the yaml spec, used for indexing the database // note: i tried to make these private, but translating them to the database seems to require them to be public - WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. - WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. - WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. - BinaryURL string `db:"binary_url"` - ConfigURL string `db:"config_url"` - SecretsID sql.NullInt64 `db:"secrets_id"` - CreatedAt time.Time `toml:"-"` - UpdatedAt time.Time `toml:"-"` - SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` + WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. + WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. + WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. + Status WorkflowSpecStatus `db:"status"` + BinaryURL string `db:"binary_url"` + ConfigURL string `db:"config_url"` + SecretsID sql.NullInt64 `db:"secrets_id"` + CreatedAt time.Time `toml:"-"` + UpdatedAt time.Time `toml:"-"` + SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` sdkWorkflow *sdk.WorkflowSpec rawSpec []byte config []byte diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go index 19c459fa0ee..2bb116cba4f 100644 --- a/core/services/workflows/syncer/mocks/orm.go +++ b/core/services/workflows/syncer/mocks/orm.go @@ -81,59 +81,50 @@ func (_c *ORM_Create_Call) RunAndReturn(run func(context.Context, string, string return _c } -// CreateWorkflowSpec provides a mock function with given fields: ctx, spec -func (_m *ORM) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { - ret := _m.Called(ctx, spec) +// DeleteWorkflowSpec provides a mock function with given fields: ctx, owner, name +func (_m *ORM) DeleteWorkflowSpec(ctx context.Context, owner string, name string) error { + ret := _m.Called(ctx, owner, name) if len(ret) == 0 { - panic("no return value specified for CreateWorkflowSpec") + panic("no return value specified for DeleteWorkflowSpec") } - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) (int64, error)); ok { - return rf(ctx, spec) - } - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) int64); ok { - r0 = rf(ctx, spec) + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, owner, name) } else { - r0 = ret.Get(0).(int64) + r0 = ret.Error(0) } - if rf, ok := ret.Get(1).(func(context.Context, *job.WorkflowSpec) error); ok { - r1 = rf(ctx, spec) - } else { - r1 = ret.Error(1) - } - - return r0, r1 + return r0 } -// ORM_CreateWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateWorkflowSpec' -type ORM_CreateWorkflowSpec_Call struct { +// ORM_DeleteWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteWorkflowSpec' +type ORM_DeleteWorkflowSpec_Call struct { *mock.Call } -// CreateWorkflowSpec is a helper method to define mock.On call +// DeleteWorkflowSpec is a helper method to define mock.On call // - ctx context.Context -// - spec *job.WorkflowSpec -func (_e *ORM_Expecter) CreateWorkflowSpec(ctx interface{}, spec interface{}) *ORM_CreateWorkflowSpec_Call { - return &ORM_CreateWorkflowSpec_Call{Call: _e.mock.On("CreateWorkflowSpec", ctx, spec)} +// - owner string +// - name string +func (_e *ORM_Expecter) DeleteWorkflowSpec(ctx interface{}, owner interface{}, name interface{}) *ORM_DeleteWorkflowSpec_Call { + return &ORM_DeleteWorkflowSpec_Call{Call: _e.mock.On("DeleteWorkflowSpec", ctx, owner, name)} } -func (_c *ORM_CreateWorkflowSpec_Call) Run(run func(ctx context.Context, spec *job.WorkflowSpec)) *ORM_CreateWorkflowSpec_Call { +func (_c *ORM_DeleteWorkflowSpec_Call) Run(run func(ctx context.Context, owner string, name string)) *ORM_DeleteWorkflowSpec_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*job.WorkflowSpec)) + run(args[0].(context.Context), args[1].(string), args[2].(string)) }) return _c } -func (_c *ORM_CreateWorkflowSpec_Call) Return(_a0 int64, _a1 error) *ORM_CreateWorkflowSpec_Call { - _c.Call.Return(_a0, _a1) +func (_c *ORM_DeleteWorkflowSpec_Call) Return(_a0 error) *ORM_DeleteWorkflowSpec_Call { + _c.Call.Return(_a0) return _c } -func (_c *ORM_CreateWorkflowSpec_Call) RunAndReturn(run func(context.Context, *job.WorkflowSpec) (int64, error)) *ORM_CreateWorkflowSpec_Call { +func (_c *ORM_DeleteWorkflowSpec_Call) RunAndReturn(run func(context.Context, string, string) error) *ORM_DeleteWorkflowSpec_Call { _c.Call.Return(run) return _c } @@ -425,6 +416,66 @@ func (_c *ORM_GetSecretsURLHash_Call) RunAndReturn(run func([]byte, []byte) ([]b return _c } +// GetWorkflowSpec provides a mock function with given fields: ctx, owner, name +func (_m *ORM) GetWorkflowSpec(ctx context.Context, owner string, name string) (*job.WorkflowSpec, error) { + ret := _m.Called(ctx, owner, name) + + if len(ret) == 0 { + panic("no return value specified for GetWorkflowSpec") + } + + var r0 *job.WorkflowSpec + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (*job.WorkflowSpec, error)); ok { + return rf(ctx, owner, name) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) *job.WorkflowSpec); ok { + r0 = rf(ctx, owner, name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*job.WorkflowSpec) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, owner, name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_GetWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowSpec' +type ORM_GetWorkflowSpec_Call struct { + *mock.Call +} + +// GetWorkflowSpec is a helper method to define mock.On call +// - ctx context.Context +// - owner string +// - name string +func (_e *ORM_Expecter) GetWorkflowSpec(ctx interface{}, owner interface{}, name interface{}) *ORM_GetWorkflowSpec_Call { + return &ORM_GetWorkflowSpec_Call{Call: _e.mock.On("GetWorkflowSpec", ctx, owner, name)} +} + +func (_c *ORM_GetWorkflowSpec_Call) Run(run func(ctx context.Context, owner string, name string)) *ORM_GetWorkflowSpec_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *ORM_GetWorkflowSpec_Call) Return(_a0 *job.WorkflowSpec, _a1 error) *ORM_GetWorkflowSpec_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_GetWorkflowSpec_Call) RunAndReturn(run func(context.Context, string, string) (*job.WorkflowSpec, error)) *ORM_GetWorkflowSpec_Call { + _c.Call.Return(run) + return _c +} + // Update provides a mock function with given fields: ctx, secretsURL, contents func (_m *ORM) Update(ctx context.Context, secretsURL string, contents string) (int64, error) { ret := _m.Called(ctx, secretsURL, contents) @@ -483,6 +534,63 @@ func (_c *ORM_Update_Call) RunAndReturn(run func(context.Context, string, string return _c } +// UpsertWorkflowSpec provides a mock function with given fields: ctx, spec +func (_m *ORM) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { + ret := _m.Called(ctx, spec) + + if len(ret) == 0 { + panic("no return value specified for UpsertWorkflowSpec") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) (int64, error)); ok { + return rf(ctx, spec) + } + if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) int64); ok { + r0 = rf(ctx, spec) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, *job.WorkflowSpec) error); ok { + r1 = rf(ctx, spec) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_UpsertWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertWorkflowSpec' +type ORM_UpsertWorkflowSpec_Call struct { + *mock.Call +} + +// UpsertWorkflowSpec is a helper method to define mock.On call +// - ctx context.Context +// - spec *job.WorkflowSpec +func (_e *ORM_Expecter) UpsertWorkflowSpec(ctx interface{}, spec interface{}) *ORM_UpsertWorkflowSpec_Call { + return &ORM_UpsertWorkflowSpec_Call{Call: _e.mock.On("UpsertWorkflowSpec", ctx, spec)} +} + +func (_c *ORM_UpsertWorkflowSpec_Call) Run(run func(ctx context.Context, spec *job.WorkflowSpec)) *ORM_UpsertWorkflowSpec_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*job.WorkflowSpec)) + }) + return _c +} + +func (_c *ORM_UpsertWorkflowSpec_Call) Return(_a0 int64, _a1 error) *ORM_UpsertWorkflowSpec_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_UpsertWorkflowSpec_Call) RunAndReturn(run func(context.Context, *job.WorkflowSpec) (int64, error)) *ORM_UpsertWorkflowSpec_Call { + _c.Call.Return(run) + return _c +} + // NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewORM(t interface { diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index d43dbe09b78..4a5be9d1a58 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -2,7 +2,8 @@ package syncer import ( "context" - "errors" + "database/sql" + "time" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -33,7 +34,15 @@ type WorkflowSecretsDS interface { } type WorkflowSpecsDS interface { - CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) + // UpsertWorkflowSpec inserts or updates a workflow spec. Updates on conflict of workflow name + // and owner + UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) + + // GetWorkflowSpec returns the workflow spec for the given owner and name. + GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) + + // DeleteWorkflowSpec deletes the workflow spec for the given owner and name. + DeleteWorkflowSpec(ctx context.Context, owner, name string) error } type ORM interface { @@ -149,6 +158,104 @@ func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { return crypto.Keccak256(append(owner, secretsURL...)) } -func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { - return 0, errors.New("not implemented") +func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { + var id int64 + + query := ` + INSERT INTO workflow_specs ( + workflow, + config, + workflow_id, + workflow_owner, + workflow_name, + status, + binary_url, + config_url, + secrets_id, + created_at, + updated_at, + spec_type + ) VALUES ( + :workflow, + :config, + :workflow_id, + :workflow_owner, + :workflow_name, + :status, + :binary_url, + :config_url, + :secrets_id, + :created_at, + :updated_at, + :spec_type + ) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE + SET + workflow = EXCLUDED.workflow, + config = EXCLUDED.config, + workflow_id = EXCLUDED.workflow_id, + workflow_owner = EXCLUDED.workflow_owner, + workflow_name = EXCLUDED.workflow_name, + status = EXCLUDED.status, + binary_url = EXCLUDED.binary_url, + config_url = EXCLUDED.config_url, + secrets_id = EXCLUDED.secrets_id, + created_at = EXCLUDED.created_at, + updated_at = EXCLUDED.updated_at, + spec_type = EXCLUDED.spec_type + RETURNING id + ` + + stmt, err := orm.ds.PrepareNamedContext(ctx, query) + if err != nil { + return 0, err + } + defer stmt.Close() + + spec.UpdatedAt = time.Now() + err = stmt.QueryRowxContext(ctx, spec).Scan(&id) + + if err != nil { + return 0, err + } + + return id, nil +} + +func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) { + query := ` + SELECT * + FROM workflow_specs + WHERE workflow_owner = $1 AND workflow_name = $2 + ` + + var spec job.WorkflowSpec + err := orm.ds.GetContext(ctx, &spec, query, owner, name) + if err != nil { + return nil, err + } + + return &spec, nil +} + +func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error { + query := ` + DELETE FROM workflow_specs + WHERE workflow_owner = $1 AND workflow_name = $2 + ` + + result, err := orm.ds.ExecContext(ctx, query, owner, name) + if err != nil { + return err + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return err + } + + if rowsAffected == 0 { + return sql.ErrNoRows // No spec deleted + } + + return nil } diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go index 8b9f685bb52..1be4e54f472 100644 --- a/core/services/workflows/syncer/orm_test.go +++ b/core/services/workflows/syncer/orm_test.go @@ -1,12 +1,15 @@ package syncer import ( + "database/sql" "encoding/hex" "testing" + "time" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/stretchr/testify/assert" @@ -51,3 +54,145 @@ func TestWorkflowArtifactsORM_GetAndUpdate(t *testing.T) { require.NoError(t, err) assert.Equal(t, "new contents", contents) } + +func Test_UpsertWorkflowSpec(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + t.Run("inserts new spec", func(t *testing.T) { + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + _, err := orm.UpsertWorkflowSpec(ctx, spec) + require.NoError(t, err) + + // Verify the record exists in the database + var dbSpec job.WorkflowSpec + err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + require.Equal(t, spec.Workflow, dbSpec.Workflow) + }) + + t.Run("updates existing spec", func(t *testing.T) { + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + _, err := orm.UpsertWorkflowSpec(ctx, spec) + require.NoError(t, err) + + // Update the status + spec.Status = job.WorkflowSpecStatusPaused + + _, err = orm.UpsertWorkflowSpec(ctx, spec) + require.NoError(t, err) + + // Verify the record is updated in the database + var dbSpec job.WorkflowSpec + err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + require.Equal(t, spec.Config, dbSpec.Config) + require.Equal(t, spec.Status, dbSpec.Status) + }) +} + +func Test_DeleteWorkflowSpec(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + t.Run("deletes a workflow spec", func(t *testing.T) { + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + id, err := orm.UpsertWorkflowSpec(ctx, spec) + require.NoError(t, err) + require.NotZero(t, id) + + err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + + // Verify the record is deleted from the database + var dbSpec job.WorkflowSpec + err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE id = $1`, id) + require.Error(t, err) + require.Equal(t, sql.ErrNoRows, err) + }) + + t.Run("fails if no workflow spec exists", func(t *testing.T) { + err := orm.DeleteWorkflowSpec(ctx, "owner-123", "Test Workflow") + require.Error(t, err) + require.Equal(t, sql.ErrNoRows, err) + }) +} + +func Test_GetWorkflowSpec(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + t.Run("gets a workflow spec", func(t *testing.T) { + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + id, err := orm.UpsertWorkflowSpec(ctx, spec) + require.NoError(t, err) + require.NotZero(t, id) + + dbSpec, err := orm.GetWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + require.Equal(t, spec.Workflow, dbSpec.Workflow) + + err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + }) + + t.Run("fails if no workflow spec exists", func(t *testing.T) { + dbSpec, err := orm.GetWorkflowSpec(ctx, "owner-123", "Test Workflow") + require.Error(t, err) + require.Nil(t, dbSpec) + }) +} diff --git a/core/store/migrate/migrations/0260_add_status_workflow_spec.sql b/core/store/migrate/migrations/0260_add_status_workflow_spec.sql new file mode 100644 index 00000000000..66c38eef2f7 --- /dev/null +++ b/core/store/migrate/migrations/0260_add_status_workflow_spec.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- Add a `status` column to the `workflow_specs` table. +ALTER TABLE workflow_specs +ADD COLUMN status TEXT DEFAULT '' NOT NULL; + +-- +goose Down +-- Remove the `status` column from the `workflow_specs` table. +ALTER TABLE workflow_specs +DROP COLUMN status; \ No newline at end of file