diff --git a/core/services/job/models.go b/core/services/job/models.go index b769106d647..233912d09c2 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -87,6 +87,7 @@ var ( Stream: true, VRF: true, Webhook: true, + Workflow: false, } supportsAsync = map[Type]bool{ BlockHeaderFeeder: false, @@ -104,6 +105,7 @@ var ( Stream: true, VRF: true, Webhook: true, + Workflow: false, } schemaVersions = map[Type]uint32{ BlockHeaderFeeder: 1, @@ -121,6 +123,7 @@ var ( Stream: 1, VRF: 1, Webhook: 1, + Workflow: 1, } ) diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 32307b9fb9e..13a8bda4043 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -1,6 +1,11 @@ package workflows import ( + "fmt" + + "github.com/google/uuid" + "github.com/pelletier/go-toml" + "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/capabilities/targets" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -43,3 +48,23 @@ func NewDelegate(logger logger.Logger, registry types.CapabilitiesRegistry, lega return &Delegate{logger: logger, registry: registry} } + +func ValidatedWorkflowSpec(tomlString string) (job.Job, error) { + var jb = job.Job{ExternalJobID: uuid.New()} + + tree, err := toml.Load(tomlString) + if err != nil { + return jb, fmt.Errorf("toml error on load: %w", err) + } + + err = tree.Unmarshal(&jb) + if err != nil { + return jb, fmt.Errorf("toml unmarshal error on spec: %w", err) + } + + if jb.Type != job.Workflow { + return jb, fmt.Errorf("unsupported type %s", jb.Type) + } + + return jb, nil +} diff --git a/core/services/workflows/delegate_test.go b/core/services/workflows/delegate_test.go new file mode 100644 index 00000000000..fd2df9141bc --- /dev/null +++ b/core/services/workflows/delegate_test.go @@ -0,0 +1,54 @@ +package workflows_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" +) + +func TestDelegate_JobSpecValidator(t *testing.T) { + t.Parallel() + + var tt = []struct { + name string + toml string + valid bool + }{ + { + "valid spec", + ` +type = "workflow" +schemaVersion = 1 +`, + true, + }, + { + "parse error", + ` +invalid syntax{{{{ +`, + false, + }, + { + "invalid job type", + ` +type = "work flows" +schemaVersion = 1 +`, + false, + }, + } + for _, tc := range tt { + tc := tc + t.Run(tc.name, func(t *testing.T) { + _, err := workflows.ValidatedWorkflowSpec(tc.toml) + if tc.valid { + require.NoError(t, err) + } else { + require.Error(t, err) + } + }) + } +} diff --git a/core/store/migrate/migrations/0223_workflow_spec_validation.sql b/core/store/migrate/migrations/0223_workflow_spec_validation.sql new file mode 100644 index 00000000000..6932cc2b759 --- /dev/null +++ b/core/store/migrate/migrations/0223_workflow_spec_validation.sql @@ -0,0 +1,42 @@ +-- +goose Up +ALTER TABLE + jobs +DROP + CONSTRAINT chk_specs, +ADD + CONSTRAINT chk_specs CHECK ( + num_nonnulls( + ocr_oracle_spec_id, ocr2_oracle_spec_id, + direct_request_spec_id, flux_monitor_spec_id, + keeper_spec_id, cron_spec_id, webhook_spec_id, + vrf_spec_id, blockhash_store_spec_id, + block_header_feeder_spec_id, bootstrap_spec_id, + gateway_spec_id, + legacy_gas_station_server_spec_id, + legacy_gas_station_sidecar_spec_id, + eal_spec_id, + CASE "type" WHEN 'stream' THEN 1 ELSE NULL END, -- 'stream' type lacks a spec but should not cause validation to fail + CASE "type" WHEN 'workflow' THEN 1 ELSE NULL END -- 'workflow' type currently lacks a spec but should not cause validation to fail + ) = 1 + ); + +-- +goose Down +ALTER TABLE + jobs +DROP + CONSTRAINT chk_specs, +ADD + CONSTRAINT chk_specs CHECK ( + num_nonnulls( + ocr_oracle_spec_id, ocr2_oracle_spec_id, + direct_request_spec_id, flux_monitor_spec_id, + keeper_spec_id, cron_spec_id, webhook_spec_id, + vrf_spec_id, blockhash_store_spec_id, + block_header_feeder_spec_id, bootstrap_spec_id, + gateway_spec_id, + legacy_gas_station_server_spec_id, + legacy_gas_station_sidecar_spec_id, + eal_spec_id, + CASE "type" WHEN 'stream' THEN 1 ELSE NULL END -- 'stream' type lacks a spec but should not cause validation to fail + ) = 1 + ); diff --git a/core/web/jobs_controller.go b/core/web/jobs_controller.go index 4e11f68097d..6296c6a016f 100644 --- a/core/web/jobs_controller.go +++ b/core/web/jobs_controller.go @@ -30,6 +30,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/streams" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/web/presenters" ) @@ -253,6 +254,8 @@ func (jc *JobsController) validateJobSpec(tomlString string) (jb job.Job, status jb, err = gateway.ValidatedGatewaySpec(tomlString) case job.Stream: jb, err = streams.ValidatedStreamSpec(tomlString) + case job.Workflow: + jb, err = workflows.ValidatedWorkflowSpec(tomlString) default: return jb, http.StatusUnprocessableEntity, errors.Errorf("unknown job type: %s", jobType) } diff --git a/core/web/resolver/mutation.go b/core/web/resolver/mutation.go index 996b3859a55..685fbe61ccb 100644 --- a/core/web/resolver/mutation.go +++ b/core/web/resolver/mutation.go @@ -38,6 +38,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/store/models" "github.com/smartcontractkit/chainlink/v2/core/utils" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" @@ -1047,6 +1048,8 @@ func (r *Resolver) CreateJob(ctx context.Context, args struct { jb, err = ocrbootstrap.ValidatedBootstrapSpecToml(args.Input.TOML) case job.Gateway: jb, err = gateway.ValidatedGatewaySpec(args.Input.TOML) + case job.Workflow: + jb, err = workflows.ValidatedWorkflowSpec(args.Input.TOML) default: return NewCreateJobPayload(r.App, nil, map[string]string{ "Job Type": fmt.Sprintf("unknown job type: %s", jbt),