diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 1326d8694c3..e264ee138a1 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -27,7 +27,11 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) -const fifteenMinutesMs = 15 * 60 * 1000 +const ( + fifteenMinutesMs = 15 * 60 * 1000 + reservedFieldNameStepTimeout = "cre_step_timeout" + maxStepTimeoutOverrideSec = 10 * 60 // 10 minutes +) type stepRequest struct { stepRef string @@ -769,10 +773,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { // TODO ks-462 inputs logCustMsg(ctx, cma, "executing step", l) - stepCtx, cancel := context.WithTimeout(ctx, e.stepTimeoutDuration) - defer cancel() - - inputs, outputs, err := e.executeStep(stepCtx, l, msg) + inputs, outputs, err := e.executeStep(ctx, l, msg) var stepStatus string switch { case errors.Is(capabilities.ErrStopExecution, err): @@ -919,6 +920,20 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe if err != nil { return nil, nil, err } + stepTimeoutDuration := e.stepTimeoutDuration + if timeoutOverride, ok := config.Underlying[reservedFieldNameStepTimeout]; ok { + var desiredTimeout int64 + err2 := timeoutOverride.UnwrapTo(&desiredTimeout) + if err2 != nil { + e.logger.Warnw("couldn't decode step timeout override, using default", "error", err2, "default", stepTimeoutDuration) + } else { + if desiredTimeout > maxStepTimeoutOverrideSec { + e.logger.Warnw("desired step timeout is too large, limiting to max value", "maxValue", maxStepTimeoutOverrideSec) + desiredTimeout = maxStepTimeoutOverrideSec + } + stepTimeoutDuration = time.Duration(desiredTimeout) * time.Second + } + } tr := capabilities.CapabilityRequest{ Inputs: inputsMap, @@ -934,8 +949,11 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe }, } - e.metrics.incrementCapabilityInvocationCounter(ctx) - output, err := step.capability.Execute(ctx, tr) + stepCtx, cancel := context.WithTimeout(ctx, stepTimeoutDuration) + defer cancel() + + e.metrics.incrementCapabilityInvocationCounter(stepCtx) + output, err := step.capability.Execute(stepCtx, tr) if err != nil { return inputsMap, nil, err } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 7b6993be564..70216ac8c78 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -86,6 +86,7 @@ targets: address: "0x54e220867af6683aE6DcBF535B4f952cB5116510" params: ["$(report)"] abi: "receive(report bytes)" + cre_step_timeout: 610 ` type testHooks struct {