Skip to content

Commit

Permalink
Add beholder logging for custom compute (#15122)
Browse files Browse the repository at this point in the history
* Adds beholder logging

* Bumps `common`

* Bumps `common`

* Bumps `common`

* Improves var naming

* make `gomodtidy`

* Repurposes monitoring keys

* Fixes CI

* Bumps CCIP

* Bumps common

* Address review comments

* Reverts unsupported approach
  • Loading branch information
vyzaldysanchez authored Nov 7, 2024
1 parent 864d76a commit 2ecdf34
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 70 deletions.
43 changes: 32 additions & 11 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"

Expand All @@ -21,8 +22,10 @@ import (
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/platform"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
)

Expand Down Expand Up @@ -117,7 +120,7 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID)
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
Expand All @@ -128,10 +131,10 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe
return c.executeWithModule(ctx, m.module, cfg.Config, request)
}

func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, requestMetadata capabilities.RequestMetadata) (*module, error) {
initStart := time.Now()

cfg.Fetch = c.createFetcher(workflowID, workflowExecutionID)
cfg.Fetch = c.createFetcher()
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
Expand All @@ -140,7 +143,7 @@ func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, w
mod.Start()

initDuration := time.Since(initStart)
computeWASMInit.WithLabelValues(workflowID, referenceID).Observe(float64(initDuration))
computeWASMInit.WithLabelValues(requestMetadata.WorkflowID, requestMetadata.ReferenceID).Observe(float64(initDuration))

m := &module{module: mod}
c.modules.add(id, m)
Expand Down Expand Up @@ -201,18 +204,26 @@ func (c *Compute) Close() error {
return nil
}

func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowId); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", req.Metadata.WorkflowId, err)
}
if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionId); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err)
}

cma := c.emitter.With(
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, req.Metadata.WorkflowName,
platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner,
platform.KeyWorkflowExecutionID, req.Metadata.WorkflowExecutionId,
timestampKey, time.Now().UTC().Format(time.RFC3339Nano),
)

messageID := strings.Join([]string{
workflowID,
workflowExecutionID,
req.Metadata.WorkflowId,
req.Metadata.WorkflowExecutionId,
ghcapabilities.MethodComputeAction,
c.idGenerator(),
}, "/")
Expand Down Expand Up @@ -245,6 +256,16 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx
if err != nil {
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}

// Only log if the response is not in the 200 range
if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode)
err = cma.Emit(ctx, msg)
if err != nil {
c.log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}

return &response, nil
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/capabilities/compute/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package compute

// timestampKey is the label key under which the compute fetcher attaches the
// current UTC time (formatted as time.RFC3339Nano) to the custom-message
// emitter it builds for each fetch request.
const timestampKey = "computeTimestamp"
15 changes: 15 additions & 0 deletions core/platform/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package platform

// Observability keys.
//
// Shared label names used when tagging loggers, custom-message emitters,
// metrics labelers and workflowError label maps, so that every producer uses
// the same spelling for the same dimension.
const (
// KeyCapabilityID labels a capability's ID (the workflow engine sets it to a
// trigger capability's ID when resolving or registering triggers).
KeyCapabilityID = "capabilityID"
// KeyTriggerID labels the ID of a trigger registration or trigger event.
KeyTriggerID = "triggerID"
// KeyWorkflowID labels the ID of the workflow.
KeyWorkflowID = "workflowID"
// KeyWorkflowExecutionID labels the ID of a single workflow execution.
KeyWorkflowExecutionID = "workflowExecutionID"
// KeyWorkflowName labels the workflow's name.
KeyWorkflowName = "workflowName"
// KeyWorkflowOwner labels the workflow's owner.
KeyWorkflowOwner = "workflowOwner"
// KeyStepID labels a workflow step's ID.
KeyStepID = "stepID"
// KeyStepRef labels a workflow step's ref.
KeyStepRef = "stepRef"
)

// OrderedLabelKeys is the order in which label keys are visited when a
// workflow error prefixes its message with its labels. Keys that are missing
// from an error's label map are skipped by that loop. KeyWorkflowName and
// KeyWorkflowOwner are not listed here.
// NOTE(review): this is a mutable package-level slice; callers appear to only
// range over it — confirm nothing modifies it.
var OrderedLabelKeys = []string{KeyStepRef, KeyStepID, KeyTriggerID, KeyCapabilityID, KeyWorkflowExecutionID, KeyWorkflowID}
3 changes: 2 additions & 1 deletion core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)
Expand Down Expand Up @@ -39,7 +40,7 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName)
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, spec.WorkflowSpec.WorkflowID, platform.KeyWorkflowOwner, spec.WorkflowSpec.WorkflowOwner, platform.KeyWorkflowName, spec.WorkflowSpec.WorkflowName)
sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger)
Expand Down
75 changes: 38 additions & 37 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

Expand Down Expand Up @@ -167,9 +168,9 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
for _, t := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, t.ID)
if err != nil {
log := e.logger.With(cIDKey, t.ID)
log := e.logger.With(platform.KeyCapabilityID, t.ID)
log.Errorf("failed to get trigger capability: %s", err)
logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
logCustMsg(ctx, e.cma.With(platform.KeyCapabilityID, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
// we don't immediately return here, since we want to retry all triggers
// to notify the user of all errors at once.
triggersInitialized = false
Expand All @@ -179,7 +180,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
}
if !triggersInitialized {
return &workflowError{reason: "failed to resolve triggers", labels: map[string]string{
wIDKey: e.workflow.id,
platform.KeyWorkflowID: e.workflow.id,
}}
}

Expand All @@ -201,15 +202,15 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
if err != nil {
logCustMsg(
ctx,
e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref),
e.cma.With(platform.KeyWorkflowID, e.workflow.id, platform.KeyStepID, s.ID, platform.KeyStepRef, s.Ref),
fmt.Sprintf("failed to initialize capability for step: %s", err),
e.logger,
)
return &workflowError{err: err, reason: "failed to initialize capability for step",
labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: s.ID,
sRKey: s.Ref,
platform.KeyWorkflowID: e.workflow.id,
platform.KeyStepID: s.ID,
platform.KeyStepRef: s.Ref,
}}
}

Expand All @@ -231,8 +232,8 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
}

return &workflowError{reason: reason, err: err, labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: step.ID,
platform.KeyWorkflowID: e.workflow.id,
platform.KeyStepID: step.ID,
}}
}

Expand Down Expand Up @@ -318,7 +319,7 @@ func (e *Engine) init(ctx context.Context) {
if err != nil {
return &workflowError{err: err, reason: "failed to resolve workflow capabilities",
labels: map[string]string{
wIDKey: e.workflow.id,
platform.KeyWorkflowID: e.workflow.id,
}}
}
return nil
Expand All @@ -341,9 +342,9 @@ func (e *Engine) init(ctx context.Context) {
for idx, t := range e.workflow.triggers {
terr := e.registerTrigger(ctx, t, idx)
if terr != nil {
log := e.logger.With(cIDKey, t.ID)
log := e.logger.With(platform.KeyCapabilityID, t.ID)
log.Errorf("failed to register trigger: %s", terr)
logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
logCustMsg(ctx, e.cma.With(platform.KeyCapabilityID, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
}
}

Expand Down Expand Up @@ -451,9 +452,9 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// and triggerID might be "wf_123_trigger_0"
return &workflowError{err: err, reason: fmt.Sprintf("failed to register trigger: %+v", triggerRegRequest),
labels: map[string]string{
wIDKey: e.workflow.id,
cIDKey: t.ID,
tIDKey: triggerID,
platform.KeyWorkflowID: e.workflow.id,
platform.KeyCapabilityID: t.ID,
platform.KeyTriggerID: triggerID,
}}
}

Expand Down Expand Up @@ -491,7 +492,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// `executionState`.
func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpdateCh chan store.WorkflowExecutionStep, workflowCreatedAt *time.Time) {
defer e.wg.Done()
lggr := e.logger.With(eIDKey, executionID)
lggr := e.logger.With(platform.KeyWorkflowExecutionID, executionID)
e.logger.Debugf("running stepUpdateLoop for execution %s", executionID)
for {
select {
Expand All @@ -505,11 +506,11 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd
}
// Executed synchronously to ensure we correctly schedule subsequent tasks.
e.logger.Debugw(fmt.Sprintf("received step update for execution %s", stepUpdate.ExecutionID),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref)
err := e.handleStepUpdate(ctx, stepUpdate, workflowCreatedAt)
if err != nil {
e.logger.Errorf(fmt.Sprintf("failed to update step state: %+v, %s", stepUpdate, err),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref)
}
}
}
Expand All @@ -532,7 +533,7 @@ func generateExecutionID(workflowID, eventID string) (string, error) {

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
lggr := e.logger.With("event", event, eIDKey, executionID)
lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID)
lggr.Debug("executing on a trigger event")
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
Expand Down Expand Up @@ -584,8 +585,8 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *
}

func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.WorkflowExecutionStep, workflowCreatedAt *time.Time) error {
l := e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
cma := e.cma.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
l := e.logger.With(platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref)
cma := e.cma.With(platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref)

// If we've been executing for too long, let's time the workflow step out and continue.
if workflowCreatedAt != nil && e.clock.Since(*workflowCreatedAt) > e.maxExecutionDuration {
Expand Down Expand Up @@ -658,7 +659,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {

// If all dependencies are completed, enqueue the step.
if !waitingOnDependencies {
e.logger.With(sRKey, step.Ref, eIDKey, state.ExecutionID, "state", copyState(state)).
e.logger.With(platform.KeyStepRef, step.Ref, platform.KeyWorkflowExecutionID, state.ExecutionID, "state", copyState(state)).
Debug("step request enqueued")
e.pendingStepRequests <- stepRequest{
state: copyState(state),
Expand All @@ -668,7 +669,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {
}

func (e *Engine) finishExecution(ctx context.Context, executionID string, status string) error {
e.logger.With(eIDKey, executionID, "status", status).Info("finishing execution")
e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status).Info("finishing execution")
metrics := e.metrics.with("status", status)
err := e.executionStates.UpdateStatus(ctx, executionID, status)
if err != nil {
Expand Down Expand Up @@ -713,23 +714,23 @@ func (e *Engine) worker(ctx context.Context) {
te := resp.Event

if te.ID == "" {
e.logger.With(tIDKey, te.TriggerType).Error("trigger event ID is empty; not executing")
e.logger.With(platform.KeyTriggerID, te.TriggerType).Error("trigger event ID is empty; not executing")
continue
}

executionID, err := generateExecutionID(e.workflow.id, te.ID)
if err != nil {
e.logger.With(tIDKey, te.ID).Errorf("could not generate execution ID: %v", err)
e.logger.With(platform.KeyTriggerID, te.ID).Errorf("could not generate execution ID: %v", err)
continue
}

cma := e.cma.With(eIDKey, executionID)
cma := e.cma.With(platform.KeyWorkflowExecutionID, executionID)
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
if err != nil {
e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err)
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
} else {
e.logger.With(eIDKey, executionID).Debug("execution started")
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Debug("execution started")
logCustMsg(ctx, cma, "execution started", e.logger)
}
case <-ctx.Done():
Expand All @@ -741,8 +742,8 @@ func (e *Engine) worker(ctx context.Context) {
func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// Instantiate a child logger; in addition to the WorkflowID field the workflow
// logger will already have, this adds the `stepRef` and `executionID`
l := e.logger.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
cma := e.cma.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
l := e.logger.With(platform.KeyStepRef, msg.stepRef, platform.KeyWorkflowExecutionID, msg.state.ExecutionID)
cma := e.cma.With(platform.KeyStepRef, msg.stepRef, platform.KeyWorkflowExecutionID, msg.state.ExecutionID)

l.Debug("executing on a step event")
stepState := &store.WorkflowExecutionStep{
Expand Down Expand Up @@ -1104,9 +1105,9 @@ func (e *Engine) Close() error {
return &workflowError{err: innerErr,
reason: fmt.Sprintf("failed to unregister capability from workflow: %+v", reg),
labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: s.ID,
sRKey: s.Ref,
platform.KeyWorkflowID: e.workflow.id,
platform.KeyStepID: s.ID,
platform.KeyStepRef: s.Ref,
}}
}

Expand Down Expand Up @@ -1157,7 +1158,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
if cfg.Store == nil {
return nil, &workflowError{reason: "store is nil",
labels: map[string]string{
wIDKey: cfg.WorkflowID,
platform.KeyWorkflowID: cfg.WorkflowID,
},
}
}
Expand Down Expand Up @@ -1206,7 +1207,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName)
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName)
workflow, err := Parse(cfg.Workflow)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
Expand All @@ -1220,7 +1221,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)},
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
Expand Down Expand Up @@ -1268,7 +1269,7 @@ func (e *workflowError) Error() string {
}

// prefix the error with the labels
for _, label := range orderedLabelKeys {
for _, label := range platform.OrderedLabelKeys {
// This will silently ignore any labels that are not present in the map
// are we ok with this?
if value, ok := e.labels[label]; ok {
Expand Down
Loading

0 comments on commit 2ecdf34

Please sign in to comment.