Skip to content

Commit

Permalink
Improves var naming
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzaldysanchez committed Nov 5, 2024
1 parent d8422c8 commit 707c752
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 62 deletions.
8 changes: 4 additions & 4 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
}

cma := c.emitter.With(
wIDKey, req.Metadata.WorkflowId,
wnKey, req.Metadata.WorkflowName,
woIDKey, req.Metadata.WorkflowOwner,
eIDKey, req.Metadata.WorkflowExecutionId,
workflowIDKey, req.Metadata.WorkflowId,
workflowNameKey, req.Metadata.WorkflowName,
workflowOwnerKey, req.Metadata.WorkflowOwner,
workflowExecutionIDKey, req.Metadata.WorkflowExecutionId,
timestampKey, time.Now().UTC().Format(time.RFC3339Nano),
)

Expand Down
10 changes: 5 additions & 5 deletions core/capabilities/compute/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package compute

// Observability keys
const (
wIDKey = "workflowID"
eIDKey = "workflowExecutionID"
wnKey = "workflowName"
woIDKey = "workflowOwner"
timestampKey = "computeTimestamp"
workflowIDKey = "workflowID"
workflowExecutionIDKey = "workflowExecutionID"
workflowNameKey = "workflowName"
workflowOwnerKey = "workflowOwner"
timestampKey = "computeTimestamp"
)
2 changes: 1 addition & 1 deletion core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName)
cma := custmsg.NewLabeler().With(workflowIDKey, spec.WorkflowSpec.WorkflowID, workflowOwnerKey, spec.WorkflowSpec.WorkflowOwner, workflowNameKey, spec.WorkflowSpec.WorkflowName)
sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger)
Expand Down
72 changes: 36 additions & 36 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
for _, t := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, t.ID)
if err != nil {
log := e.logger.With(cIDKey, t.ID)
log := e.logger.With(capabilityIDKey, t.ID)
log.Errorf("failed to get trigger capability: %s", err)
logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
logCustMsg(ctx, e.cma.With(capabilityIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
// we don't immediately return here, since we want to retry all triggers
// to notify the user of all errors at once.
triggersInitialized = false
Expand All @@ -179,7 +179,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
}
if !triggersInitialized {
return &workflowError{reason: "failed to resolve triggers", labels: map[string]string{
wIDKey: e.workflow.id,
workflowIDKey: e.workflow.id,
}}
}

Expand All @@ -201,15 +201,15 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
if err != nil {
logCustMsg(
ctx,
e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref),
e.cma.With(workflowIDKey, e.workflow.id, stepIDKey, s.ID, stepRefKey, s.Ref),
fmt.Sprintf("failed to initialize capability for step: %s", err),
e.logger,
)
return &workflowError{err: err, reason: "failed to initialize capability for step",
labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: s.ID,
sRKey: s.Ref,
workflowIDKey: e.workflow.id,
stepIDKey: s.ID,
stepRefKey: s.Ref,
}}
}

Expand All @@ -231,8 +231,8 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
}

return &workflowError{reason: reason, err: err, labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: step.ID,
workflowIDKey: e.workflow.id,
stepIDKey: step.ID,
}}
}

Expand Down Expand Up @@ -318,7 +318,7 @@ func (e *Engine) init(ctx context.Context) {
if err != nil {
return &workflowError{err: err, reason: "failed to resolve workflow capabilities",
labels: map[string]string{
wIDKey: e.workflow.id,
workflowIDKey: e.workflow.id,
}}
}
return nil
Expand All @@ -341,9 +341,9 @@ func (e *Engine) init(ctx context.Context) {
for idx, t := range e.workflow.triggers {
terr := e.registerTrigger(ctx, t, idx)
if terr != nil {
log := e.logger.With(cIDKey, t.ID)
log := e.logger.With(capabilityIDKey, t.ID)
log.Errorf("failed to register trigger: %s", terr)
logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
logCustMsg(ctx, e.cma.With(capabilityIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
}
}

Expand Down Expand Up @@ -451,9 +451,9 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// and triggerID might be "wf_123_trigger_0"
return &workflowError{err: err, reason: fmt.Sprintf("failed to register trigger: %+v", triggerRegRequest),
labels: map[string]string{
wIDKey: e.workflow.id,
cIDKey: t.ID,
tIDKey: triggerID,
workflowIDKey: e.workflow.id,
capabilityIDKey: t.ID,
triggerIDKey: triggerID,
}}
}

Expand Down Expand Up @@ -491,7 +491,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// `executionState`.
func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpdateCh chan store.WorkflowExecutionStep, workflowCreatedAt *time.Time) {
defer e.wg.Done()
lggr := e.logger.With(eIDKey, executionID)
lggr := e.logger.With(workflowExecutionIDKey, executionID)
e.logger.Debugf("running stepUpdateLoop for execution %s", executionID)
for {
select {
Expand All @@ -505,11 +505,11 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd
}
// Executed synchronously to ensure we correctly schedule subsequent tasks.
e.logger.Debugw(fmt.Sprintf("received step update for execution %s", stepUpdate.ExecutionID),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
err := e.handleStepUpdate(ctx, stepUpdate, workflowCreatedAt)
if err != nil {
e.logger.Errorf(fmt.Sprintf("failed to update step state: %+v, %s", stepUpdate, err),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
}
}
}
Expand All @@ -532,7 +532,7 @@ func generateExecutionID(workflowID, eventID string) (string, error) {

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
lggr := e.logger.With("event", event, eIDKey, executionID)
lggr := e.logger.With("event", event, workflowExecutionIDKey, executionID)
lggr.Debug("executing on a trigger event")
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
Expand Down Expand Up @@ -584,8 +584,8 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *
}

func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.WorkflowExecutionStep, workflowCreatedAt *time.Time) error {
l := e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
cma := e.cma.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
l := e.logger.With(workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
cma := e.cma.With(workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)

// If we've been executing for too long, let's time the workflow step out and continue.
if workflowCreatedAt != nil && e.clock.Since(*workflowCreatedAt) > e.maxExecutionDuration {
Expand Down Expand Up @@ -658,7 +658,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {

// If all dependencies are completed, enqueue the step.
if !waitingOnDependencies {
e.logger.With(sRKey, step.Ref, eIDKey, state.ExecutionID, "state", copyState(state)).
e.logger.With(stepRefKey, step.Ref, workflowExecutionIDKey, state.ExecutionID, "state", copyState(state)).
Debug("step request enqueued")
e.pendingStepRequests <- stepRequest{
state: copyState(state),
Expand All @@ -668,7 +668,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {
}

func (e *Engine) finishExecution(ctx context.Context, executionID string, status string) error {
e.logger.With(eIDKey, executionID, "status", status).Info("finishing execution")
e.logger.With(workflowExecutionIDKey, executionID, "status", status).Info("finishing execution")
metrics := e.metrics.with("status", status)
err := e.executionStates.UpdateStatus(ctx, executionID, status)
if err != nil {
Expand Down Expand Up @@ -713,23 +713,23 @@ func (e *Engine) worker(ctx context.Context) {
te := resp.Event

if te.ID == "" {
e.logger.With(tIDKey, te.TriggerType).Error("trigger event ID is empty; not executing")
e.logger.With(triggerIDKey, te.TriggerType).Error("trigger event ID is empty; not executing")
continue
}

executionID, err := generateExecutionID(e.workflow.id, te.ID)
if err != nil {
e.logger.With(tIDKey, te.ID).Errorf("could not generate execution ID: %v", err)
e.logger.With(triggerIDKey, te.ID).Errorf("could not generate execution ID: %v", err)
continue
}

cma := e.cma.With(eIDKey, executionID)
cma := e.cma.With(workflowExecutionIDKey, executionID)
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
if err != nil {
e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err)
e.logger.With(workflowExecutionIDKey, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
} else {
e.logger.With(eIDKey, executionID).Debug("execution started")
e.logger.With(workflowExecutionIDKey, executionID).Debug("execution started")
logCustMsg(ctx, cma, "execution started", e.logger)
}
case <-ctx.Done():
Expand All @@ -741,8 +741,8 @@ func (e *Engine) worker(ctx context.Context) {
func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// Instantiate a child logger; in addition to the WorkflowID field the workflow
// logger will already have, this adds the `stepRef` and `executionID`
l := e.logger.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
cma := e.cma.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
l := e.logger.With(stepRefKey, msg.stepRef, workflowExecutionIDKey, msg.state.ExecutionID)
cma := e.cma.With(stepRefKey, msg.stepRef, workflowExecutionIDKey, msg.state.ExecutionID)

l.Debug("executing on a step event")
stepState := &store.WorkflowExecutionStep{
Expand Down Expand Up @@ -1104,9 +1104,9 @@ func (e *Engine) Close() error {
return &workflowError{err: innerErr,
reason: fmt.Sprintf("failed to unregister capability from workflow: %+v", reg),
labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: s.ID,
sRKey: s.Ref,
workflowIDKey: e.workflow.id,
stepIDKey: s.ID,
stepRefKey: s.Ref,
}}
}

Expand Down Expand Up @@ -1157,7 +1157,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
if cfg.Store == nil {
return nil, &workflowError{reason: "store is nil",
labels: map[string]string{
wIDKey: cfg.WorkflowID,
workflowIDKey: cfg.WorkflowID,
},
}
}
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName)
cma := custmsg.NewLabeler().With(workflowIDKey, cfg.WorkflowID, workflowOwnerKey, cfg.WorkflowOwner, workflowNameKey, cfg.WorkflowName)
workflow, err := Parse(cfg.Workflow)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
Expand All @@ -1220,7 +1220,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)},
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(workflowIDKey, cfg.WorkflowID, workflowOwnerKey, cfg.WorkflowOwner, workflowNameKey, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
Expand Down
15 changes: 8 additions & 7 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
Expand Down Expand Up @@ -973,26 +974,26 @@ func TestEngine_Error(t *testing.T) {
}{
{
name: "Error with error and reason",
labels: map[string]string{wIDKey: "my-workflow-id"},
labels: map[string]string{workflowIDKey: "my-workflow-id"},
err: err,
reason: "some reason",
want: "workflowID my-workflow-id: some reason: some error",
},
{
name: "Error with error and no reason",
labels: map[string]string{eIDKey: "dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751"},
labels: map[string]string{workflowExecutionIDKey: "dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751"},
err: err,
want: "workflowExecutionID dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751: some error",
},
{
name: "Error with no error and reason",
labels: map[string]string{cIDKey: "streams-trigger:[email protected]"},
labels: map[string]string{capabilityIDKey: "streams-trigger:[email protected]"},
reason: "some reason",
want: "capabilityID streams-trigger:[email protected]: some reason",
},
{
name: "Error with no error and no reason",
labels: map[string]string{tIDKey: "wf_123_trigger_456"},
labels: map[string]string{triggerIDKey: "wf_123_trigger_456"},
want: "triggerID wf_123_trigger_456: ",
},
{
Expand All @@ -1005,9 +1006,9 @@ func TestEngine_Error(t *testing.T) {
{
name: "Multiple labels",
labels: map[string]string{
wIDKey: "my-workflow-id",
eIDKey: "dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751",
cIDKey: "streams-trigger:[email protected]",
workflowIDKey: "my-workflow-id",
workflowExecutionIDKey: "dd3708ac7d8dd6fa4fae0fb87b73f318a4da2526c123e159b72435e3b2fe8751",
capabilityIDKey: "streams-trigger:[email protected]",
},
err: err,
reason: "some reason",
Expand Down
18 changes: 9 additions & 9 deletions core/services/workflows/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Cont

// Observability keys
const (
cIDKey = "capabilityID"
tIDKey = "triggerID"
wIDKey = "workflowID"
eIDKey = "workflowExecutionID"
wnKey = "workflowName"
woIDKey = "workflowOwner"
sIDKey = "stepID"
sRKey = "stepRef"
capabilityIDKey = "capabilityID"
triggerIDKey = "triggerID"
workflowIDKey = "workflowID"
workflowExecutionIDKey = "workflowExecutionID"
workflowNameKey = "workflowName"
workflowOwnerKey = "workflowOwner"
stepIDKey = "stepID"
stepRefKey = "stepRef"
)

var orderedLabelKeys = []string{sRKey, sIDKey, tIDKey, cIDKey, eIDKey, wIDKey}
var orderedLabelKeys = []string{stepRefKey, stepIDKey, triggerIDKey, capabilityIDKey, workflowExecutionIDKey, workflowIDKey}

0 comments on commit 707c752

Please sign in to comment.