From b8230b92d2c1b3d5ff894b2442a8930173623b41 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Tue, 22 Oct 2024 12:34:36 +0100 Subject: [PATCH] [CAPPL-132] Fetch secrets in workflow engine --- core/services/chainlink/application.go | 7 + core/services/workflows/delegate.go | 29 +-- core/services/workflows/engine.go | 172 ++++++++++-------- core/services/workflows/engine_test.go | 123 ++++++++++++- .../workflows/syncer/workflow_registry.go | 40 ++++ 5 files changed, 285 insertions(+), 86 deletions(-) create mode 100644 core/services/workflows/syncer/workflow_registry.go diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index a0a50477614..112b87cf0af 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -71,6 +71,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/webhook" "github.com/smartcontractkit/chainlink/v2/core/services/workflows" workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/sessions" "github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth" "github.com/smartcontractkit/chainlink/v2/core/sessions/localauth" @@ -212,6 +213,11 @@ func NewApplication(opts ApplicationOpts) (Application, error) { opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger) } + // TODO: wire this up to config so we only instantiate it + // if a workflow registry address is provided. 
+ workflowRegistrySyncer := syncer.NewWorkflowRegistry() + srvcs = append(srvcs, workflowRegistrySyncer) + var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { var dispatcher remotetypes.Dispatcher @@ -465,6 +471,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { delegates[job.Workflow] = workflows.NewDelegate( globalLogger, opts.CapabilitiesRegistry, + workflowRegistrySyncer, workflowORM, ) diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 34b9cd3ad2d..266402c9afb 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -15,9 +15,10 @@ import ( ) type Delegate struct { - registry core.CapabilitiesRegistry - logger logger.Logger - store store.Store + registry core.CapabilitiesRegistry + secretsFetcher secretsFetcher + logger logger.Logger + store store.Store } var _ job.Delegate = (*Delegate)(nil) @@ -47,15 +48,16 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser } cfg := Config{ - Lggr: d.logger, - Workflow: sdkSpec, - WorkflowID: spec.WorkflowSpec.WorkflowID, - WorkflowOwner: spec.WorkflowSpec.WorkflowOwner, - WorkflowName: spec.WorkflowSpec.WorkflowName, - Registry: d.registry, - Store: d.store, - Config: []byte(spec.WorkflowSpec.Config), - Binary: binary, + Lggr: d.logger, + Workflow: sdkSpec, + WorkflowID: spec.WorkflowSpec.WorkflowID, + WorkflowOwner: spec.WorkflowSpec.WorkflowOwner, + WorkflowName: spec.WorkflowSpec.WorkflowName, + Registry: d.registry, + Store: d.store, + Config: []byte(spec.WorkflowSpec.Config), + Binary: binary, + SecretsFetcher: d.secretsFetcher, } engine, err := NewEngine(cfg) if err != nil { @@ -67,9 +69,10 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser func NewDelegate( logger logger.Logger, registry core.CapabilitiesRegistry, + secretsFetcher secretsFetcher, store store.Store, ) *Delegate { - return &Delegate{logger: logger, 
registry: registry, store: store} + return &Delegate{logger: logger, registry: registry, secretsFetcher: secretsFetcher, store: store} } func ValidatedWorkflowJobSpec(ctx context.Context, tomlString string) (job.Job, error) { diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index d6b1e793287..55d11ae1ec4 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -89,6 +89,10 @@ func (sucm *stepUpdateManager) len() int64 { return int64(len(sucm.m)) } +type secretsFetcher interface { + SecretsFor(workflowID string) (map[string]string, error) +} + // Engine handles the lifecycle of a single workflow and its executions. type Engine struct { services.StateMachine @@ -97,6 +101,7 @@ type Engine struct { logger logger.Logger registry core.CapabilitiesRegistry workflow *workflow + secretsFetcher secretsFetcher env exec.Env localNode capabilities.Node executionStates store.Store @@ -159,11 +164,9 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { for _, t := range e.workflow.triggers { tg, err := e.registry.GetTrigger(ctx, t.ID) if err != nil { - e.logger.With(cIDKey, t.ID).Errorf("failed to get trigger capability: %s", err) - cerr := e.cma.With(cIDKey, t.ID).SendLogAsCustomMessage(fmt.Sprintf("failed to resolve trigger: %s", err)) - if cerr != nil { - return cerr - } + log := e.logger.With(cIDKey, t.ID) + log.Errorf("failed to get trigger capability: %s", err) + logCustMsg(e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log) // we don't immediately return here, since we want to retry all triggers // to notify the user of all errors at once. 
triggersInitialized = false @@ -193,10 +196,11 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { err := e.initializeCapability(ctx, s) if err != nil { - cerr := e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref).SendLogAsCustomMessage(fmt.Sprintf("failed to initialize capability for step: %s", err)) - if cerr != nil { - return cerr - } + logCustMsg( + e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref), + fmt.Sprintf("failed to initialize capability for step: %s", err), + e.logger, + ) return &workflowError{err: err, reason: "failed to initialize capability for step", labels: map[string]string{ wIDKey: e.workflow.id, @@ -212,6 +216,8 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { } func (e *Engine) initializeCapability(ctx context.Context, step *step) error { + l := e.logger.With("capabilityID", step.ID) + // We use varadic err here so that err can be optional, but we assume that // its length is either 0 or 1 newCPErr := func(reason string, errs ...error) *workflowError { @@ -246,7 +252,6 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error { // Special treatment for local targets - wrap into a transmission capability // If the DON is nil, this is a local target. 
if info.CapabilityType == capabilities.CapabilityTypeTarget && info.IsLocal { - l := e.logger.With("capabilityID", step.ID) l.Debug("wrapping capability in local transmission protocol") cp = transmission.NewLocalTargetCapability( e.logger, @@ -263,29 +268,17 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error { return newCPErr("capability does not satisfy CallbackCapability") } - if step.config == nil { - c, interpErr := exec.FindAndInterpolateEnvVars(step.Config, e.env) - if interpErr != nil { - return newCPErr("failed to convert interpolate env vars from config", interpErr) - } - - config, ok := c.(map[string]any) - if !ok { - return newCPErr("failed to convert interpolate env vars from config into map") - } - - configMap, newMapErr := values.NewMap(config) - if newMapErr != nil { - return newCPErr("failed to convert config to values.Map", newMapErr) - } - step.config = configMap + stepConfig, err := e.configForStep(ctx, l, step) + if err != nil { + return newCPErr("failed to get config for step", err) } registrationRequest := capabilities.RegisterToWorkflowRequest{ Metadata: capabilities.RegistrationMetadata{ - WorkflowID: e.workflow.id, + WorkflowID: e.workflow.id, + WorkflowOwner: e.workflow.owner, }, - Config: step.config, + Config: stepConfig, } err = cc.RegisterToWorkflow(ctx, registrationRequest) @@ -343,11 +336,9 @@ func (e *Engine) init(ctx context.Context) { for idx, t := range e.workflow.triggers { terr := e.registerTrigger(ctx, t, idx) if terr != nil { - e.logger.With(cIDKey, t.ID).Errorf("failed to register trigger: %s", terr) - cerr := e.cma.With(cIDKey, t.ID).SendLogAsCustomMessage(fmt.Sprintf("failed to register trigger: %s", terr)) - if cerr != nil { - e.logger.Errorf("failed to send custom message for trigger: %s", terr) - } + log := e.logger.With(cIDKey, t.ID) + log.Errorf("failed to register trigger: %s", terr) + logCustMsg(e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log) } } @@ 
-621,10 +612,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow // This is to ensure that any side effects are executed consistently, since otherwise // the async nature of the workflow engine would provide no guarantees. } - err = cma.SendLogAsCustomMessage(fmt.Sprintf("execution status: %s", status)) - if err != nil { - return err - } + logCustMsg(cma, fmt.Sprintf("execution status: %s", status), l) return e.finishExecution(ctx, state.ExecutionID, status) } @@ -752,37 +740,26 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { } // TODO ks-462 inputs - err := cma.SendLogAsCustomMessage("executing step") - if err != nil { - return - } - inputs, outputs, err := e.executeStep(ctx, msg) + logCustMsg(cma, "executing step", l) + + inputs, outputs, err := e.executeStep(ctx, l, msg) var stepStatus string switch { case errors.Is(capabilities.ErrStopExecution, err): lmsg := "step executed successfully with a termination" l.Info(lmsg) - cmErr := cma.SendLogAsCustomMessage(lmsg) - if cmErr != nil { - l.Errorf("failed to send custom message with msg: %s", lmsg) - } + logCustMsg(cma, lmsg, l) stepStatus = store.StatusCompletedEarlyExit case err != nil: - lmsg := "step executed successfully with a termination" - l.Errorf("error executing step request: %s", err) - cmErr := cma.SendLogAsCustomMessage(fmt.Sprintf("error executing step request: %s", err)) - if cmErr != nil { - l.Errorf("failed to send custom message with msg: %s", lmsg) - } + lmsg := fmt.Sprintf("error executing step request: %s", err) + l.Error(lmsg) + logCustMsg(cma, lmsg, l) stepStatus = store.StatusErrored default: - lmsg := "step executed successfully with a termination" - l.With("outputs", outputs).Info("step executed successfully") + lmsg := "step executed successfully" + l.With("outputs", outputs).Info(lmsg) // TODO ks-462 emit custom message with outputs - cmErr := cma.SendLogAsCustomMessage("step executed successfully") - if cmErr != nil { - 
l.Errorf("failed to send custom message with msg: %s", lmsg) - } + logCustMsg(cma, lmsg, l) stepStatus = store.StatusCompleted } @@ -820,7 +797,43 @@ func merge(baseConfig *values.Map, overrideConfig *values.Map) *values.Map { return m } -func (e *Engine) configForStep(ctx context.Context, executionID string, step *step) (*values.Map, error) { +func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*values.Map, error) { + conf, err := exec.FindAndInterpolateEnvVars(config, env) + if err != nil { + return nil, err + } + + confm, ok := conf.(map[string]any) + if !ok { + return nil, fmt.Errorf("interpolated config has unexpected type %T, expected map[string]any", conf) + } + + configMap, err := values.NewMap(confm) + if err != nil { + return nil, err + } + return configMap, nil +} + +// configForStep fetches the config for the step from the workflow registry (for secrets) and from the capabilities +// registry (for capability-level configuration). It doesn't perform any caching of the config values, since +// the two registries perform their own caching. 
+func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) { + secrets, err := e.secretsFetcher.SecretsFor(e.workflow.id) + if err != nil { + return nil, fmt.Errorf("failed to fetch secrets: %w", err) + } + + env := exec.Env{ + Config: e.env.Config, + Binary: e.env.Binary, + Secrets: secrets, + } + config, err := e.interpolateEnvVars(step.Config, env) + if err != nil { + return nil, fmt.Errorf("failed to interpolate env vars: %w", err) + } + ID := step.info.ID // If the capability info is missing a DON, then @@ -834,22 +847,22 @@ func (e *Engine) configForStep(ctx context.Context, executionID string, step *st capConfig, err := e.registry.ConfigForCapability(ctx, ID, donID) if err != nil { - e.logger.Warnw(fmt.Sprintf("could not retrieve config from remote registry: %s", err), "executionID", executionID, "capabilityID", ID) - return step.config, nil + lggr.Warnw(fmt.Sprintf("could not retrieve config from remote registry: %s", err), "capabilityID", ID) + return config, nil } if capConfig.DefaultConfig == nil { - return step.config, nil + return config, nil } // Merge the configs with registry config overriding the step config. This is because // some config fields are sensitive and could affect the safe running of the capability, // so we avoid user provided values by overriding them with config from the capabilities registry. - return merge(step.config, capConfig.DefaultConfig), nil + return merge(config, capConfig.DefaultConfig), nil } // executeStep executes the referenced capability within a step and returns the result. 
-func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) { +func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRequest) (*values.Map, values.Value, error) { step, err := e.workflow.Vertex(msg.stepRef) if err != nil { return nil, nil, err @@ -872,7 +885,7 @@ func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map, return nil, nil, err } - config, err := e.configForStep(ctx, msg.state.ExecutionID, step) + config, err := e.configForStep(ctx, lggr, step) if err != nil { return nil, nil, err } @@ -1037,11 +1050,17 @@ func (e *Engine) Close() error { return nil } + stepConfig, err := e.configForStep(ctx, e.logger, s) + if err != nil { + return fmt.Errorf("cannot fetch config for step: %w", err) + } + reg := capabilities.UnregisterFromWorkflowRequest{ Metadata: capabilities.RegistrationMetadata{ - WorkflowID: e.workflow.id, + WorkflowID: e.workflow.id, + WorkflowOwner: e.workflow.owner, }, - Config: s.config, + Config: stepConfig, } // if capability is nil, then we haven't initialized @@ -1086,6 +1105,7 @@ type Config struct { Store store.Store Config []byte Binary []byte + SecretsFetcher secretsFetcher // For testing purposes only maxRetries int @@ -1161,11 +1181,12 @@ func NewEngine(cfg Config) (engine *Engine, err error) { workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName)) engine = &Engine{ - logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), - cma: monitoring.NewCustomMessageLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name), - metrics: workflowsMetricLabeler{monitoring.NewMetricsLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)}, - registry: cfg.Registry, - workflow: workflow, + logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), + cma: monitoring.NewCustomMessageLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, 
workflow.name), + metrics: workflowsMetricLabeler{monitoring.NewMetricsLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)}, + registry: cfg.Registry, + workflow: workflow, + secretsFetcher: cfg.SecretsFetcher, env: exec.Env{ Config: cfg.Config, Binary: cfg.Binary, @@ -1219,3 +1240,10 @@ func (e *workflowError) Error() string { return errStr } + +func logCustMsg(cma monitoring.CustomMessageLabeler, msg string, log logger.Logger) { + err := cma.SendLogAsCustomMessage(msg) + if err != nil { + log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err) + } +} diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 67ec5c49cac..c3479809ec2 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -31,6 +31,7 @@ import ( p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" ) const testWorkflowId = "" @@ -169,7 +170,8 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec onExecutionFinished: func(weid string) { executionFinished <- weid }, - clock: clock, + SecretsFetcher: syncer.NewWorkflowRegistry(), + clock: clock, } for _, o := range opts { o(&cfg) @@ -1499,3 +1501,122 @@ func TestEngine_CustomComputePropagatesBreaks(t *testing.T) { assert.Equal(t, state.Status, store.StatusCompletedEarlyExit) } + +const secretsWorkflow = ` +triggers: + - id: "mercury-trigger@1.0.0" + config: + feedlist: + - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD + - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD + - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD + +actions: + - id: custom_compute@1.0.0 + ref: 
custom_compute + config: + fidelityToken: $(ENV.secrets.fidelity) + inputs: + action: + - $(trigger.outputs) + +consensus: + - id: "offchain_reporting@1.0.0" + ref: "evm_median" + inputs: + observations: + - "$(trigger.outputs)" + config: + aggregation_method: "data_feeds_2_0" + aggregation_config: + "0x1111111111111111111100000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: 3600 + "0x2222222222222222222200000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: 3600 + "0x3333333333333333333300000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: 3600 + encoder: "EVM" + encoder_config: + abi: "mercury_reports bytes[]" + +targets: + - id: "write_ethereum-testnet-sepolia@1.0.0" + inputs: "$(evm_median.outputs)" + config: + address: "0x54e220867af6683aE6DcBF535B4f952cB5116510" + params: ["$(report)"] + abi: "receive(report bytes)" +` + +type mockFetcher struct { + retval map[string]string +} + +func (m *mockFetcher) SecretsFor(workflowID string) (map[string]string, error) { + return m.retval, nil +} + +func TestEngine_FetchesSecrets(t *testing.T) { + ctx := testutils.Context(t) + reg := coreCap.NewRegistry(logger.TestLogger(t)) + + trigger, _ := mockTrigger(t) + require.NoError(t, reg.Add(ctx, trigger)) + + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + + target := mockTarget("write_ethereum-testnet-sepolia@1.0.0") + require.NoError(t, reg.Add(ctx, target)) + + var gotConfig *values.Map + action := newMockCapability( + // Create a remote capability so we don't use the local transmission protocol. + capabilities.MustNewRemoteCapabilityInfo( + "custom_compute@1.0.0", + capabilities.CapabilityTypeAction, + "a custom compute action with custom config", + &capabilities.DON{ID: 1}, + ), + func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { + // Replace the empty config with the write target config. 
+ gotConfig = req.Config + + return capabilities.CapabilityResponse{ + Value: req.Inputs, + }, nil + }, + ) + require.NoError(t, reg.Add(ctx, action)) + + eng, testHooks := newTestEngineWithYAMLSpec( + t, + reg, + secretsWorkflow, + func(c *Config) { + c.SecretsFetcher = &mockFetcher{ + retval: map[string]string{ + "fidelity": "aFidelitySecret", + }, + } + }, + ) + + servicetest.Run(t, eng) + + eid := getExecutionId(t, eng, testHooks) + + state, err := eng.executionStates.Get(ctx, eid) + require.NoError(t, err) + + assert.Equal(t, state.Status, store.StatusCompleted) + + expected := map[string]any{ + "fidelityToken": "aFidelitySecret", + } + expm, err := values.Wrap(expected) + require.NoError(t, err) + assert.Equal(t, gotConfig, expm) +} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go new file mode 100644 index 00000000000..46cd987fc21 --- /dev/null +++ b/core/services/workflows/syncer/workflow_registry.go @@ -0,0 +1,40 @@ +package syncer + +import ( + "context" + + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +type WorkflowRegistry struct { + services.StateMachine +} + +func (w *WorkflowRegistry) Start(ctx context.Context) error { + return nil +} + +func (w *WorkflowRegistry) Close() error { + return nil +} + +func (w *WorkflowRegistry) Ready() error { + return nil +} + +func (w *WorkflowRegistry) HealthReport() map[string]error { + return nil +} + +func (w *WorkflowRegistry) Name() string { + return "WorkflowRegistrySyncer" +} + +func (w *WorkflowRegistry) SecretsFor(workflowID string) (map[string]string, error) { + // TODO: actually get this from the right place. + return map[string]string{}, nil +} + +func NewWorkflowRegistry() *WorkflowRegistry { + return &WorkflowRegistry{} +}