Skip to content

Commit

Permalink
[CAPPL-132] Fetch secrets in workflow engine
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Oct 22, 2024
1 parent 09145ba commit b8230b9
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 86 deletions.
7 changes: 7 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth"
"github.com/smartcontractkit/chainlink/v2/core/sessions/localauth"
Expand Down Expand Up @@ -212,6 +213,11 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)
}

// TODO: wire this up to config so we only instantiate it
// if a workflow registry address is provided.
workflowRegistrySyncer := syncer.NewWorkflowRegistry()
srvcs = append(srvcs, workflowRegistrySyncer)

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
var dispatcher remotetypes.Dispatcher
Expand Down Expand Up @@ -465,6 +471,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
delegates[job.Workflow] = workflows.NewDelegate(
globalLogger,
opts.CapabilitiesRegistry,
workflowRegistrySyncer,
workflowORM,
)

Expand Down
29 changes: 16 additions & 13 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
)

type Delegate struct {
registry core.CapabilitiesRegistry
logger logger.Logger
store store.Store
registry core.CapabilitiesRegistry
secretsFetcher secretsFetcher
logger logger.Logger
store store.Store
}

var _ job.Delegate = (*Delegate)(nil)
Expand Down Expand Up @@ -47,15 +48,16 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
}

cfg := Config{
Lggr: d.logger,
Workflow: sdkSpec,
WorkflowID: spec.WorkflowSpec.WorkflowID,
WorkflowOwner: spec.WorkflowSpec.WorkflowOwner,
WorkflowName: spec.WorkflowSpec.WorkflowName,
Registry: d.registry,
Store: d.store,
Config: []byte(spec.WorkflowSpec.Config),
Binary: binary,
Lggr: d.logger,
Workflow: sdkSpec,
WorkflowID: spec.WorkflowSpec.WorkflowID,
WorkflowOwner: spec.WorkflowSpec.WorkflowOwner,
WorkflowName: spec.WorkflowSpec.WorkflowName,
Registry: d.registry,
Store: d.store,
Config: []byte(spec.WorkflowSpec.Config),
Binary: binary,
SecretsFetcher: d.secretsFetcher,
}
engine, err := NewEngine(cfg)
if err != nil {
Expand All @@ -67,9 +69,10 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
func NewDelegate(
logger logger.Logger,
registry core.CapabilitiesRegistry,
secretsFetcher secretsFetcher,
store store.Store,
) *Delegate {
return &Delegate{logger: logger, registry: registry, store: store}
return &Delegate{logger: logger, registry: registry, secretsFetcher: secretsFetcher, store: store}
}

func ValidatedWorkflowJobSpec(ctx context.Context, tomlString string) (job.Job, error) {
Expand Down
172 changes: 100 additions & 72 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (sucm *stepUpdateManager) len() int64 {
return int64(len(sucm.m))
}

type secretsFetcher interface {
SecretsFor(workflowID string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
type Engine struct {
services.StateMachine
Expand All @@ -97,6 +101,7 @@ type Engine struct {
logger logger.Logger
registry core.CapabilitiesRegistry
workflow *workflow
secretsFetcher secretsFetcher
env exec.Env
localNode capabilities.Node
executionStates store.Store
Expand Down Expand Up @@ -159,11 +164,9 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
for _, t := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, t.ID)
if err != nil {
e.logger.With(cIDKey, t.ID).Errorf("failed to get trigger capability: %s", err)
cerr := e.cma.With(cIDKey, t.ID).SendLogAsCustomMessage(fmt.Sprintf("failed to resolve trigger: %s", err))
if cerr != nil {
return cerr
}
log := e.logger.With(cIDKey, t.ID)
log.Errorf("failed to get trigger capability: %s", err)
logCustMsg(e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
// we don't immediately return here, since we want to retry all triggers
// to notify the user of all errors at once.
triggersInitialized = false
Expand Down Expand Up @@ -193,10 +196,11 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {

err := e.initializeCapability(ctx, s)
if err != nil {
cerr := e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref).SendLogAsCustomMessage(fmt.Sprintf("failed to initialize capability for step: %s", err))
if cerr != nil {
return cerr
}
logCustMsg(
e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref),
fmt.Sprintf("failed to initialize capability for step: %s", err),
e.logger,
)
return &workflowError{err: err, reason: "failed to initialize capability for step",
labels: map[string]string{
wIDKey: e.workflow.id,
Expand All @@ -212,6 +216,8 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
}

func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
l := e.logger.With("capabilityID", step.ID)

// We use varadic err here so that err can be optional, but we assume that
// its length is either 0 or 1
newCPErr := func(reason string, errs ...error) *workflowError {
Expand Down Expand Up @@ -246,7 +252,6 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
// Special treatment for local targets - wrap into a transmission capability
// If the DON is nil, this is a local target.
if info.CapabilityType == capabilities.CapabilityTypeTarget && info.IsLocal {
l := e.logger.With("capabilityID", step.ID)
l.Debug("wrapping capability in local transmission protocol")
cp = transmission.NewLocalTargetCapability(
e.logger,
Expand All @@ -263,29 +268,17 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
return newCPErr("capability does not satisfy CallbackCapability")
}

if step.config == nil {
c, interpErr := exec.FindAndInterpolateEnvVars(step.Config, e.env)
if interpErr != nil {
return newCPErr("failed to convert interpolate env vars from config", interpErr)
}

config, ok := c.(map[string]any)
if !ok {
return newCPErr("failed to convert interpolate env vars from config into map")
}

configMap, newMapErr := values.NewMap(config)
if newMapErr != nil {
return newCPErr("failed to convert config to values.Map", newMapErr)
}
step.config = configMap
stepConfig, err := e.configForStep(ctx, l, step)
if err != nil {
return newCPErr("failed to get config for step", err)
}

registrationRequest := capabilities.RegisterToWorkflowRequest{
Metadata: capabilities.RegistrationMetadata{
WorkflowID: e.workflow.id,
WorkflowID: e.workflow.id,
WorkflowOwner: e.workflow.owner,
},
Config: step.config,
Config: stepConfig,
}

err = cc.RegisterToWorkflow(ctx, registrationRequest)
Expand Down Expand Up @@ -343,11 +336,9 @@ func (e *Engine) init(ctx context.Context) {
for idx, t := range e.workflow.triggers {
terr := e.registerTrigger(ctx, t, idx)
if terr != nil {
e.logger.With(cIDKey, t.ID).Errorf("failed to register trigger: %s", terr)
cerr := e.cma.With(cIDKey, t.ID).SendLogAsCustomMessage(fmt.Sprintf("failed to register trigger: %s", terr))
if cerr != nil {
e.logger.Errorf("failed to send custom message for trigger: %s", terr)
}
log := e.logger.With(cIDKey, t.ID)
log.Errorf("failed to register trigger: %s", terr)
logCustMsg(e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
}
}

Expand Down Expand Up @@ -621,10 +612,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow
// This is to ensure that any side effects are executed consistently, since otherwise
// the async nature of the workflow engine would provide no guarantees.
}
err = cma.SendLogAsCustomMessage(fmt.Sprintf("execution status: %s", status))
if err != nil {
return err
}
logCustMsg(cma, fmt.Sprintf("execution status: %s", status), l)

Check failure on line 615 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / lint

fmt.Sprintf can be replaced with string concatenation (perfsprint)
return e.finishExecution(ctx, state.ExecutionID, status)
}

Expand Down Expand Up @@ -752,37 +740,26 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
}

// TODO ks-462 inputs
err := cma.SendLogAsCustomMessage("executing step")
if err != nil {
return
}
inputs, outputs, err := e.executeStep(ctx, msg)
logCustMsg(cma, "executing step", l)

inputs, outputs, err := e.executeStep(ctx, l, msg)
var stepStatus string
switch {
case errors.Is(capabilities.ErrStopExecution, err):
lmsg := "step executed successfully with a termination"
l.Info(lmsg)
cmErr := cma.SendLogAsCustomMessage(lmsg)
if cmErr != nil {
l.Errorf("failed to send custom message with msg: %s", lmsg)
}
logCustMsg(cma, lmsg, l)
stepStatus = store.StatusCompletedEarlyExit
case err != nil:
lmsg := "step executed successfully with a termination"
l.Errorf("error executing step request: %s", err)
cmErr := cma.SendLogAsCustomMessage(fmt.Sprintf("error executing step request: %s", err))
if cmErr != nil {
l.Errorf("failed to send custom message with msg: %s", lmsg)
}
lmsg := fmt.Sprintf("error executing step request: %s", err)
l.Error(lmsg)
logCustMsg(cma, lmsg, l)
stepStatus = store.StatusErrored
default:
lmsg := "step executed successfully with a termination"
l.With("outputs", outputs).Info("step executed successfully")
lmsg := "step executed successfully"
l.With("outputs", outputs).Info(lmsg)
// TODO ks-462 emit custom message with outputs
cmErr := cma.SendLogAsCustomMessage("step executed successfully")
if cmErr != nil {
l.Errorf("failed to send custom message with msg: %s", lmsg)
}
logCustMsg(cma, lmsg, l)
stepStatus = store.StatusCompleted
}

Expand Down Expand Up @@ -820,7 +797,43 @@ func merge(baseConfig *values.Map, overrideConfig *values.Map) *values.Map {
return m
}

func (e *Engine) configForStep(ctx context.Context, executionID string, step *step) (*values.Map, error) {
func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*values.Map, error) {
conf, err := exec.FindAndInterpolateEnvVars(config, env)
if err != nil {
return nil, err
}

confm, ok := conf.(map[string]any)
if !ok {
return nil, err
}

configMap, err := values.NewMap(confm)
if err != nil {
return nil, err
}
return configMap, nil
}

// configForStep fetches the config for the step from the workflow registry (for secrets) and from the capabilities
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(e.workflow.id)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}

env := exec.Env{
Config: e.env.Config,
Binary: e.env.Binary,
Secrets: secrets,
}
config, err := e.interpolateEnvVars(step.Config, env)
if err != nil {
return nil, fmt.Errorf("failed to interpolate env vars: %w", err)
}

ID := step.info.ID

// If the capability info is missing a DON, then
Expand All @@ -834,22 +847,22 @@ func (e *Engine) configForStep(ctx context.Context, executionID string, step *st

capConfig, err := e.registry.ConfigForCapability(ctx, ID, donID)
if err != nil {
e.logger.Warnw(fmt.Sprintf("could not retrieve config from remote registry: %s", err), "executionID", executionID, "capabilityID", ID)
return step.config, nil
lggr.Warnw(fmt.Sprintf("could not retrieve config from remote registry: %s", err), "capabilityID", ID)
return config, nil
}

if capConfig.DefaultConfig == nil {
return step.config, nil
return config, nil
}

// Merge the configs with registry config overriding the step config. This is because
// some config fields are sensitive and could affect the safe running of the capability,
// so we avoid user provided values by overriding them with config from the capabilities registry.
return merge(step.config, capConfig.DefaultConfig), nil
return merge(config, capConfig.DefaultConfig), nil
}

// executeStep executes the referenced capability within a step and returns the result.
func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) {
func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRequest) (*values.Map, values.Value, error) {
step, err := e.workflow.Vertex(msg.stepRef)
if err != nil {
return nil, nil, err
Expand All @@ -872,7 +885,7 @@ func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map,
return nil, nil, err
}

config, err := e.configForStep(ctx, msg.state.ExecutionID, step)
config, err := e.configForStep(ctx, lggr, step)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1037,11 +1050,17 @@ func (e *Engine) Close() error {
return nil
}

stepConfig, err := e.configForStep(ctx, e.logger, s)
if err != nil {
return fmt.Errorf("cannot fetch config for step: %w", err)
}

reg := capabilities.UnregisterFromWorkflowRequest{
Metadata: capabilities.RegistrationMetadata{
WorkflowID: e.workflow.id,
WorkflowID: e.workflow.id,
WorkflowOwner: e.workflow.owner,
},
Config: s.config,
Config: stepConfig,
}

// if capability is nil, then we haven't initialized
Expand Down Expand Up @@ -1086,6 +1105,7 @@ type Config struct {
Store store.Store
Config []byte
Binary []byte
SecretsFetcher secretsFetcher

// For testing purposes only
maxRetries int
Expand Down Expand Up @@ -1161,11 +1181,12 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName))

engine = &Engine{
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
cma: monitoring.NewCustomMessageLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name),
metrics: workflowsMetricLabeler{monitoring.NewMetricsLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
cma: monitoring.NewCustomMessageLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name),
metrics: workflowsMetricLabeler{monitoring.NewMetricsLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
env: exec.Env{
Config: cfg.Config,
Binary: cfg.Binary,
Expand Down Expand Up @@ -1219,3 +1240,10 @@ func (e *workflowError) Error() string {

return errStr
}

func logCustMsg(cma monitoring.CustomMessageLabeler, msg string, log logger.Logger) {
err := cma.SendLogAsCustomMessage(msg)
if err != nil {
log.Errorf("failed to send custom message with msg: %s", msg)
}
}
Loading

0 comments on commit b8230b9

Please sign in to comment.