Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow Engine beholder logging #14958

Merged
merged 11 commits into from
Oct 31, 2024
5 changes: 5 additions & 0 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/google/uuid"
"github.com/pelletier/go-toml"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"

"github.com/smartcontractkit/chainlink-common/pkg/types/core"

Expand Down Expand Up @@ -37,18 +38,22 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName)
sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger)
return nil, err
}

binary, err := spec.WorkflowSpec.RawSpec(ctx)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to fetch workflow spec binary: %v", err), d.logger)
return nil, err
}

config, err := spec.WorkflowSpec.GetConfig(ctx)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow spec config: %v", err), d.logger)
return nil, err
}

Expand Down
44 changes: 41 additions & 3 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type Engine struct {
stopCh services.StopChan
newWorkerTimeout time.Duration
maxExecutionDuration time.Duration
heartbeatCadence time.Duration
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved

// testing lifecycle hook to signal when an execution is finished.
onExecutionFinished func(string)
Expand Down Expand Up @@ -147,6 +148,9 @@ func (e *Engine) Start(_ context.Context) error {
e.wg.Add(1)
go e.init(ctx)

e.wg.Add(1)
go e.heartbeat(ctx)

return nil
})
}
Expand Down Expand Up @@ -321,6 +325,7 @@ func (e *Engine) init(ctx context.Context) {

if retryErr != nil {
e.logger.Errorf("initialization failed: %s", retryErr)
logCustMsg(e.cma, fmt.Sprintf("workflow registration failed: %s", retryErr), e.logger)
e.afterInit(false)
return
}
Expand All @@ -342,6 +347,7 @@ func (e *Engine) init(ctx context.Context) {
}

e.logger.Info("engine initialized")
logCustMsg(e.cma, "workflow registered", e.logger)
cedric-cordenier marked this conversation as resolved.
Show resolved Hide resolved
e.afterInit(true)
}

Expand Down Expand Up @@ -715,9 +721,14 @@ func (e *Engine) worker(ctx context.Context) {
continue
}

cma := e.cma.With(eIDKey, executionID)
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
if err != nil {
e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
} else {
e.logger.With(eIDKey, executionID).Debug("execution started")
logCustMsg(cma, "execution started", e.logger)
}
case <-ctx.Done():
return
Expand Down Expand Up @@ -1026,6 +1037,24 @@ func (e *Engine) isWorkflowFullyProcessed(ctx context.Context, state store.Workf
return workflowProcessed, store.StatusCompleted, nil
}

// heartbeat periodically signals that the engine is alive. On every tick of
// e.heartbeatCadence it increments the engine heartbeat counter metric and
// emits a custom message stamped with the engine clock's current time in
// RFC 3339 format. It returns once ctx is done.
//
// It calls e.wg.Done on exit, so the caller must call e.wg.Add(1) before
// launching it in a goroutine.
func (e *Engine) heartbeat(ctx context.Context) {
	defer e.wg.Done()

	// time.NewTicker panics on a non-positive interval, and a panic in this
	// goroutine would take down the whole process. Only a zero cadence is
	// replaced by the default when the engine is constructed, so guard against
	// negative values here and run without a heartbeat instead of crashing.
	if e.heartbeatCadence <= 0 {
		e.logger.Errorf("heartbeat disabled: invalid heartbeat cadence: %s", e.heartbeatCadence)
		return
	}

	ticker := time.NewTicker(e.heartbeatCadence)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			e.logger.Info("shutting down heartbeat")
			return
		case <-ticker.C:
			e.metrics.incrementEngineHeartbeatCounter(ctx)
			logCustMsg(e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger)
		}
	}
}

func (e *Engine) Close() error {
return e.StopOnce("Engine", func() error {
e.logger.Info("shutting down engine")
Expand Down Expand Up @@ -1085,7 +1114,7 @@ func (e *Engine) Close() error {
if err != nil {
return err
}

logCustMsg(e.cma, "workflow unregistered", e.logger)
return nil
})
}
Expand All @@ -1105,6 +1134,7 @@ type Config struct {
Config []byte
Binary []byte
SecretsFetcher secretsFetcher
HeartbeatCadence time.Duration

// For testing purposes only
maxRetries int
Expand All @@ -1119,6 +1149,7 @@ const (
defaultQueueSize = 100000
defaultNewWorkerTimeout = 2 * time.Second
defaultMaxExecutionDuration = 10 * time.Minute
defaultHeartbeatCadence = 5 * time.Minute
)

func NewEngine(cfg Config) (engine *Engine, err error) {
Expand Down Expand Up @@ -1146,6 +1177,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
cfg.MaxExecutionDuration = defaultMaxExecutionDuration
}

if cfg.HeartbeatCadence == 0 {
cfg.HeartbeatCadence = defaultHeartbeatCadence
}

if cfg.retryMs == 0 {
cfg.retryMs = 5000
}
Expand All @@ -1170,8 +1205,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName)
workflow, err := Parse(cfg.Workflow)
if err != nil {
logCustMsg(cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
return nil, err
}

Expand All @@ -1180,8 +1217,8 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName))

engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
cma: custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
Expand All @@ -1197,6 +1234,7 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
stopCh: make(chan struct{}),
newWorkerTimeout: cfg.NewWorkerTimeout,
maxExecutionDuration: cfg.MaxExecutionDuration,
heartbeatCadence: cfg.HeartbeatCadence,
onExecutionFinished: cfg.onExecutionFinished,
afterInit: cfg.afterInit,
maxRetries: cfg.maxRetries,
Expand Down Expand Up @@ -1243,6 +1281,6 @@ func (e *workflowError) Error() string {
// logCustMsg emits msg through cma. If the emit fails, the failure is logged
// once to log together with the underlying error; the error is not returned
// to the caller, so emitting is best-effort.
func logCustMsg(cma custmsg.MessageEmitter, msg string, log logger.Logger) {
	if err := cma.Emit(msg); err != nil {
		log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
	}
}
11 changes: 11 additions & 0 deletions core/services/workflows/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var workflowsRunningGauge metric.Int64Gauge
var capabilityInvocationCounter metric.Int64Counter
var workflowExecutionLatencyGauge metric.Int64Gauge // ms
var workflowStepErrorCounter metric.Int64Counter
var engineHeartbeatCounter metric.Int64UpDownCounter

func initMonitoringResources() (err error) {
registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("RegisterTriggerFailure")
Expand Down Expand Up @@ -44,6 +45,11 @@ func initMonitoringResources() (err error) {
return fmt.Errorf("failed to register workflow step error counter: %w", err)
}

engineHeartbeatCounter, err = beholder.GetMeter().Int64UpDownCounter("EngineHeartbeat")
if err != nil {
return fmt.Errorf("failed to register engine heartbeat counter: %w", err)
}

return nil
}

Expand Down Expand Up @@ -82,6 +88,11 @@ func (c workflowsMetricLabeler) updateTotalWorkflowsGauge(ctx context.Context, v
workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...))
}

// incrementEngineHeartbeatCounter adds one to the engine heartbeat counter,
// attaching the labeler's labels (converted to OTel attributes) to the
// measurement.
func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) {
	attrs := metric.WithAttributes(localMonitoring.KvMapToOtelAttributes(c.Labels)...)
	engineHeartbeatCounter.Add(ctx, 1, attrs)
}

// Observability keys
const (
cIDKey = "capabilityID"
Expand Down
Loading