Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow Engine beholder logging #14958

Merged
merged 11 commits into from
Oct 31, 2024
4 changes: 4 additions & 0 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/google/uuid"
"github.com/pelletier/go-toml"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"

"github.com/smartcontractkit/chainlink-common/pkg/types/core"

Expand Down Expand Up @@ -37,13 +38,16 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName)
sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to convert to sdk workflow spec: %v", err), d.logger)
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

binary, err := spec.WorkflowSpec.RawSpec(ctx)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to fetch binary: %v", err), d.logger)
return nil, err
}

Expand Down
41 changes: 39 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type Engine struct {
stopCh services.StopChan
newWorkerTimeout time.Duration
maxExecutionDuration time.Duration
heartbeatCadence time.Duration
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved

// testing lifecycle hook to signal when an execution is finished.
onExecutionFinished func(string)
Expand Down Expand Up @@ -147,6 +148,9 @@ func (e *Engine) Start(_ context.Context) error {
e.wg.Add(1)
go e.init(ctx)

e.wg.Add(1)
go e.heartbeat(ctx)

return nil
})
}
Expand Down Expand Up @@ -342,6 +346,7 @@ func (e *Engine) init(ctx context.Context) {
}

e.logger.Info("engine initialized")
logCustMsg(e.cma, "workflow registered", e.logger)
cedric-cordenier marked this conversation as resolved.
Show resolved Hide resolved
e.afterInit(true)
}

Expand Down Expand Up @@ -715,9 +720,15 @@ func (e *Engine) worker(ctx context.Context) {
continue
}

cma := e.cma.With(eIDKey, executionID)
logCustMsg(cma, "starting execution on a trigger event", e.logger)
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
if err != nil {
e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
} else {
e.logger.With(eIDKey, executionID).Debug("execution started")
logCustMsg(cma, "execution started", e.logger)
}
case <-ctx.Done():
return
Expand Down Expand Up @@ -1026,6 +1037,23 @@ func (e *Engine) isWorkflowFullyProcessed(ctx context.Context, state store.Workf
return workflowProcessed, store.StatusCompleted, nil
}

func (e *Engine) heartbeat(ctx context.Context) {
defer e.wg.Done()

ticker := time.NewTicker(e.heartbeatCadence)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
e.logger.Info("shutting down heartbeat")
return
case <-ticker.C:
logCustMsg(e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger)
}
}
}

func (e *Engine) Close() error {
return e.StopOnce("Engine", func() error {
e.logger.Info("shutting down engine")
Expand Down Expand Up @@ -1085,7 +1113,7 @@ func (e *Engine) Close() error {
if err != nil {
return err
}

logCustMsg(e.cma, "workflow unregistered", e.logger)
return nil
})
}
Expand All @@ -1105,6 +1133,7 @@ type Config struct {
Config []byte
Binary []byte
SecretsFetcher secretsFetcher
HeartbeatCadence time.Duration

// For testing purposes only
maxRetries int
Expand All @@ -1119,6 +1148,7 @@ const (
defaultQueueSize = 100000
defaultNewWorkerTimeout = 2 * time.Second
defaultMaxExecutionDuration = 10 * time.Minute
defaultHeartbeatCadence = 5 * time.Minute
)

func NewEngine(cfg Config) (engine *Engine, err error) {
Expand Down Expand Up @@ -1146,6 +1176,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
cfg.MaxExecutionDuration = defaultMaxExecutionDuration
}

if cfg.HeartbeatCadence == 0 {
cfg.HeartbeatCadence = defaultHeartbeatCadence
}

if cfg.retryMs == 0 {
cfg.retryMs = 5000
}
Expand All @@ -1172,6 +1206,8 @@ func NewEngine(cfg Config) (engine *Engine, err error) {

workflow, err := Parse(cfg.Workflow)
if err != nil {
cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName)
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
logCustMsg(cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
return nil, err
}

Expand All @@ -1197,6 +1233,7 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
stopCh: make(chan struct{}),
newWorkerTimeout: cfg.NewWorkerTimeout,
maxExecutionDuration: cfg.MaxExecutionDuration,
heartbeatCadence: cfg.HeartbeatCadence,
onExecutionFinished: cfg.onExecutionFinished,
afterInit: cfg.afterInit,
maxRetries: cfg.maxRetries,
Expand Down Expand Up @@ -1243,6 +1280,6 @@ func (e *workflowError) Error() string {
func logCustMsg(cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(msg)
if err != nil {
log.Errorf("failed to send custom message with msg: %s", msg)
log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}
Loading