Skip to content

Commit

Permalink
Repurposes monitoring keys
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzaldysanchez committed Nov 5, 2024
1 parent 0b383bc commit de97942
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 63 deletions.
9 changes: 5 additions & 4 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/platform"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
)

Expand Down Expand Up @@ -213,10 +214,10 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
}

cma := c.emitter.With(
workflowIDKey, req.Metadata.WorkflowId,
workflowNameKey, req.Metadata.WorkflowName,
workflowOwnerKey, req.Metadata.WorkflowOwner,
workflowExecutionIDKey, req.Metadata.WorkflowExecutionId,
platform.WorkflowIDKey, req.Metadata.WorkflowId,
platform.WorkflowNameKey, req.Metadata.WorkflowName,
platform.WorkflowOwnerKey, req.Metadata.WorkflowOwner,
platform.WorkflowExecutionIDKey, req.Metadata.WorkflowExecutionId,
timestampKey, time.Now().UTC().Format(time.RFC3339Nano),
)

Expand Down
9 changes: 1 addition & 8 deletions core/capabilities/compute/monitoring.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
package compute

// Observability keys
const (
workflowIDKey = "workflowID"
workflowExecutionIDKey = "workflowExecutionID"
workflowNameKey = "workflowName"
workflowOwnerKey = "workflowOwner"
timestampKey = "computeTimestamp"
)
const timestampKey = "computeTimestamp"
15 changes: 15 additions & 0 deletions core/platform/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package platform

// Observability keys.
//
// These are the label names shared by platform components when tagging
// loggers, custom-message labelers, metric labelers and error labels, so
// that every emitter spells a given key the same way.
const (
	// Workflow-scoped keys.
	WorkflowIDKey          = "workflowID"
	WorkflowExecutionIDKey = "workflowExecutionID"
	WorkflowNameKey        = "workflowName"
	WorkflowOwnerKey       = "workflowOwner"

	// Step-scoped keys.
	StepIDKey  = "stepID"
	StepRefKey = "stepRef"

	// Capability- and trigger-scoped keys.
	CapabilityIDKey = "capabilityID"
	TriggerIDKey    = "triggerID"
)

// OrderedLabelKeys fixes the order in which label keys are visited when
// labels are rendered (for example when prefixing an error message with its
// labels). Keys absent from a given label map are skipped by the consumer.
//
// NOTE(review): WorkflowNameKey and WorkflowOwnerKey are not part of this
// list, so labels under those keys are never rendered through it — confirm
// this is intentional.
var OrderedLabelKeys = []string{
	StepRefKey,
	StepIDKey,
	TriggerIDKey,
	CapabilityIDKey,
	WorkflowExecutionIDKey,
	WorkflowIDKey,
}
75 changes: 38 additions & 37 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

Expand Down Expand Up @@ -167,9 +168,9 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
for _, t := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, t.ID)
if err != nil {
log := e.logger.With(capabilityIDKey, t.ID)
log := e.logger.With(platform.CapabilityIDKey, t.ID)
log.Errorf("failed to get trigger capability: %s", err)
logCustMsg(ctx, e.cma.With(capabilityIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
logCustMsg(ctx, e.cma.With(platform.CapabilityIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
// we don't immediately return here, since we want to retry all triggers
// to notify the user of all errors at once.
triggersInitialized = false
Expand All @@ -179,7 +180,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
}
if !triggersInitialized {
return &workflowError{reason: "failed to resolve triggers", labels: map[string]string{
workflowIDKey: e.workflow.id,
platform.WorkflowIDKey: e.workflow.id,
}}
}

Expand All @@ -201,15 +202,15 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
if err != nil {
logCustMsg(
ctx,
e.cma.With(workflowIDKey, e.workflow.id, stepIDKey, s.ID, stepRefKey, s.Ref),
e.cma.With(platform.WorkflowIDKey, e.workflow.id, platform.StepIDKey, s.ID, platform.StepRefKey, s.Ref),
fmt.Sprintf("failed to initialize capability for step: %s", err),
e.logger,
)
return &workflowError{err: err, reason: "failed to initialize capability for step",
labels: map[string]string{
workflowIDKey: e.workflow.id,
stepIDKey: s.ID,
stepRefKey: s.Ref,
platform.WorkflowIDKey: e.workflow.id,
platform.StepIDKey: s.ID,
platform.StepRefKey: s.Ref,
}}
}

Expand All @@ -231,8 +232,8 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
}

return &workflowError{reason: reason, err: err, labels: map[string]string{
workflowIDKey: e.workflow.id,
stepIDKey: step.ID,
platform.WorkflowIDKey: e.workflow.id,
platform.StepIDKey: step.ID,
}}
}

Expand Down Expand Up @@ -318,7 +319,7 @@ func (e *Engine) init(ctx context.Context) {
if err != nil {
return &workflowError{err: err, reason: "failed to resolve workflow capabilities",
labels: map[string]string{
workflowIDKey: e.workflow.id,
platform.WorkflowIDKey: e.workflow.id,
}}
}
return nil
Expand All @@ -341,9 +342,9 @@ func (e *Engine) init(ctx context.Context) {
for idx, t := range e.workflow.triggers {
terr := e.registerTrigger(ctx, t, idx)
if terr != nil {
log := e.logger.With(capabilityIDKey, t.ID)
log := e.logger.With(platform.CapabilityIDKey, t.ID)
log.Errorf("failed to register trigger: %s", terr)
logCustMsg(ctx, e.cma.With(capabilityIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
logCustMsg(ctx, e.cma.With(platform.CapabilityIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
}
}

Expand Down Expand Up @@ -451,9 +452,9 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// and triggerID might be "wf_123_trigger_0"
return &workflowError{err: err, reason: fmt.Sprintf("failed to register trigger: %+v", triggerRegRequest),
labels: map[string]string{
workflowIDKey: e.workflow.id,
capabilityIDKey: t.ID,
triggerIDKey: triggerID,
platform.WorkflowIDKey: e.workflow.id,
platform.CapabilityIDKey: t.ID,
platform.TriggerIDKey: triggerID,
}}
}

Expand Down Expand Up @@ -491,7 +492,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// `executionState`.
func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpdateCh chan store.WorkflowExecutionStep, workflowCreatedAt *time.Time) {
defer e.wg.Done()
lggr := e.logger.With(workflowExecutionIDKey, executionID)
lggr := e.logger.With(platform.WorkflowExecutionIDKey, executionID)
e.logger.Debugf("running stepUpdateLoop for execution %s", executionID)
for {
select {
Expand All @@ -505,11 +506,11 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd
}
// Executed synchronously to ensure we correctly schedule subsequent tasks.
e.logger.Debugw(fmt.Sprintf("received step update for execution %s", stepUpdate.ExecutionID),
workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
platform.WorkflowExecutionIDKey, stepUpdate.ExecutionID, platform.StepRefKey, stepUpdate.Ref)
err := e.handleStepUpdate(ctx, stepUpdate, workflowCreatedAt)
if err != nil {
e.logger.Errorf(fmt.Sprintf("failed to update step state: %+v, %s", stepUpdate, err),
workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
platform.WorkflowExecutionIDKey, stepUpdate.ExecutionID, platform.StepRefKey, stepUpdate.Ref)
}
}
}
Expand All @@ -532,7 +533,7 @@ func generateExecutionID(workflowID, eventID string) (string, error) {

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
lggr := e.logger.With("event", event, workflowExecutionIDKey, executionID)
lggr := e.logger.With("event", event, platform.WorkflowExecutionIDKey, executionID)
lggr.Debug("executing on a trigger event")
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
Expand Down Expand Up @@ -584,8 +585,8 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *
}

func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.WorkflowExecutionStep, workflowCreatedAt *time.Time) error {
l := e.logger.With(workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
cma := e.cma.With(workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
l := e.logger.With(platform.WorkflowExecutionIDKey, stepUpdate.ExecutionID, platform.StepRefKey, stepUpdate.Ref)
cma := e.cma.With(platform.WorkflowExecutionIDKey, stepUpdate.ExecutionID, platform.StepRefKey, stepUpdate.Ref)

// If we've been executing for too long, let's time the workflow step out and continue.
if workflowCreatedAt != nil && e.clock.Since(*workflowCreatedAt) > e.maxExecutionDuration {
Expand Down Expand Up @@ -658,7 +659,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {

// If all dependencies are completed, enqueue the step.
if !waitingOnDependencies {
e.logger.With(stepRefKey, step.Ref, workflowExecutionIDKey, state.ExecutionID, "state", copyState(state)).
e.logger.With(platform.StepRefKey, step.Ref, platform.WorkflowExecutionIDKey, state.ExecutionID, "state", copyState(state)).
Debug("step request enqueued")
e.pendingStepRequests <- stepRequest{
state: copyState(state),
Expand All @@ -668,7 +669,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {
}

func (e *Engine) finishExecution(ctx context.Context, executionID string, status string) error {
e.logger.With(workflowExecutionIDKey, executionID, "status", status).Info("finishing execution")
e.logger.With(platform.WorkflowExecutionIDKey, executionID, "status", status).Info("finishing execution")
metrics := e.metrics.with("status", status)
err := e.executionStates.UpdateStatus(ctx, executionID, status)
if err != nil {
Expand Down Expand Up @@ -713,23 +714,23 @@ func (e *Engine) worker(ctx context.Context) {
te := resp.Event

if te.ID == "" {
e.logger.With(triggerIDKey, te.TriggerType).Error("trigger event ID is empty; not executing")
e.logger.With(platform.TriggerIDKey, te.TriggerType).Error("trigger event ID is empty; not executing")
continue
}

executionID, err := generateExecutionID(e.workflow.id, te.ID)
if err != nil {
e.logger.With(triggerIDKey, te.ID).Errorf("could not generate execution ID: %v", err)
e.logger.With(platform.TriggerIDKey, te.ID).Errorf("could not generate execution ID: %v", err)
continue
}

cma := e.cma.With(workflowExecutionIDKey, executionID)
cma := e.cma.With(platform.WorkflowExecutionIDKey, executionID)
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
if err != nil {
e.logger.With(workflowExecutionIDKey, executionID).Errorf("failed to start execution: %v", err)
e.logger.With(platform.WorkflowExecutionIDKey, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
} else {
e.logger.With(workflowExecutionIDKey, executionID).Debug("execution started")
e.logger.With(platform.WorkflowExecutionIDKey, executionID).Debug("execution started")
logCustMsg(ctx, cma, "execution started", e.logger)
}
case <-ctx.Done():
Expand All @@ -741,8 +742,8 @@ func (e *Engine) worker(ctx context.Context) {
func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// Instantiate a child logger; in addition to the WorkflowID field the workflow
// logger will already have, this adds the `stepRef` and `executionID`
l := e.logger.With(stepRefKey, msg.stepRef, workflowExecutionIDKey, msg.state.ExecutionID)
cma := e.cma.With(stepRefKey, msg.stepRef, workflowExecutionIDKey, msg.state.ExecutionID)
l := e.logger.With(platform.StepRefKey, msg.stepRef, platform.WorkflowExecutionIDKey, msg.state.ExecutionID)
cma := e.cma.With(platform.StepRefKey, msg.stepRef, platform.WorkflowExecutionIDKey, msg.state.ExecutionID)

l.Debug("executing on a step event")
stepState := &store.WorkflowExecutionStep{
Expand Down Expand Up @@ -1104,9 +1105,9 @@ func (e *Engine) Close() error {
return &workflowError{err: innerErr,
reason: fmt.Sprintf("failed to unregister capability from workflow: %+v", reg),
labels: map[string]string{
workflowIDKey: e.workflow.id,
stepIDKey: s.ID,
stepRefKey: s.Ref,
platform.WorkflowIDKey: e.workflow.id,
platform.StepIDKey: s.ID,
platform.StepRefKey: s.Ref,
}}
}

Expand Down Expand Up @@ -1157,7 +1158,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
if cfg.Store == nil {
return nil, &workflowError{reason: "store is nil",
labels: map[string]string{
workflowIDKey: cfg.WorkflowID,
platform.WorkflowIDKey: cfg.WorkflowID,
},
}
}
Expand Down Expand Up @@ -1206,7 +1207,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

cma := custmsg.NewLabeler().With(workflowIDKey, cfg.WorkflowID, workflowOwnerKey, cfg.WorkflowOwner, workflowNameKey, cfg.WorkflowName)
cma := custmsg.NewLabeler().With(platform.WorkflowIDKey, cfg.WorkflowID, platform.WorkflowOwnerKey, cfg.WorkflowOwner, platform.WorkflowNameKey, cfg.WorkflowName)
workflow, err := Parse(cfg.Workflow)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
Expand All @@ -1220,7 +1221,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(workflowIDKey, cfg.WorkflowID, workflowOwnerKey, cfg.WorkflowOwner, workflowNameKey, workflow.name)},
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.WorkflowIDKey, cfg.WorkflowID, platform.WorkflowOwnerKey, cfg.WorkflowOwner, platform.WorkflowNameKey, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
Expand Down Expand Up @@ -1268,7 +1269,7 @@ func (e *workflowError) Error() string {
}

// prefix the error with the labels
for _, label := range orderedLabelKeys {
for _, label := range platform.OrderedLabelKeys {
// This will silently ignore any labels that are not present in the map
// are we ok with this?
if value, ok := e.labels[label]; ok {
Expand Down
14 changes: 0 additions & 14 deletions core/services/workflows/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,3 @@ func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Cont
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

// Observability keys
const (
capabilityIDKey = "capabilityID"
triggerIDKey = "triggerID"
workflowIDKey = "workflowID"
workflowExecutionIDKey = "workflowExecutionID"
workflowNameKey = "workflowName"
workflowOwnerKey = "workflowOwner"
stepIDKey = "stepID"
stepRefKey = "stepRef"
)

var orderedLabelKeys = []string{stepRefKey, stepIDKey, triggerIDKey, capabilityIDKey, workflowExecutionIDKey, workflowIDKey}

0 comments on commit de97942

Please sign in to comment.