Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add beholder logging for custom compute #15122

Merged
merged 15 commits into from
Nov 11, 2024
Merged
42 changes: 31 additions & 11 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"

Expand All @@ -21,6 +22,7 @@ import (
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
Expand Down Expand Up @@ -117,7 +119,7 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID)
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
Expand All @@ -128,10 +130,10 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe
return c.executeWithModule(ctx, m.module, cfg.Config, request)
}

func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, requestMetadata capabilities.RequestMetadata) (*module, error) {
initStart := time.Now()

cfg.Fetch = c.createFetcher(workflowID, workflowExecutionID)
cfg.Fetch = c.createFetcher()
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
Expand All @@ -140,7 +142,7 @@ func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, w
mod.Start()

initDuration := time.Since(initStart)
computeWASMInit.WithLabelValues(workflowID, referenceID).Observe(float64(initDuration))
computeWASMInit.WithLabelValues(requestMetadata.WorkflowID, requestMetadata.ReferenceID).Observe(float64(initDuration))

m := &module{module: mod}
c.modules.add(id, m)
Expand Down Expand Up @@ -201,18 +203,26 @@ func (c *Compute) Close() error {
return nil
}

func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowId); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", req.Metadata.WorkflowId, err)
}
if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionId); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err)
}

cma := c.emitter.With(
workflowIDKey, req.Metadata.WorkflowId,
workflowNameKey, req.Metadata.WorkflowName,
workflowOwnerKey, req.Metadata.WorkflowOwner,
workflowExecutionIDKey, req.Metadata.WorkflowExecutionId,
timestampKey, time.Now().UTC().Format(time.RFC3339Nano),
)

messageID := strings.Join([]string{
workflowID,
workflowExecutionID,
req.Metadata.WorkflowId,
req.Metadata.WorkflowExecutionId,
ghcapabilities.MethodComputeAction,
c.idGenerator(),
}, "/")
Expand Down Expand Up @@ -245,6 +255,16 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx
if err != nil {
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}

// Only log if the response is not in the 200 range
if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode)
err = cma.Emit(ctx, msg)
if err != nil {
c.log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}

return &response, nil
}
}
Expand Down
10 changes: 10 additions & 0 deletions core/capabilities/compute/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package compute
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved

// Label keys used by the compute package when it attaches workflow context
// to the custom messages it emits.
const (
	// Workflow identity labels.
	workflowIDKey    = "workflowID"
	workflowNameKey  = "workflowName"
	workflowOwnerKey = "workflowOwner"

	// Per-execution labels.
	workflowExecutionIDKey = "workflowExecutionID"
	timestampKey           = "computeTimestamp"
)
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241105163318-e1b7c81d582a
github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241104130643-4b7e196370c4 h1:GWjim4uGGFbye4XbJP0cPAbARhc8u3cAJU8jLYy0mXM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241104130643-4b7e196370c4/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241105163318-e1b7c81d582a h1:cmE2fzK1hsRArr5s4Ygao7bBdSgfLJRHIGFMYxBlEhc=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241105163318-e1b7c81d582a/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName)
cma := custmsg.NewLabeler().With(workflowIDKey, spec.WorkflowSpec.WorkflowID, workflowOwnerKey, spec.WorkflowSpec.WorkflowOwner, workflowNameKey, spec.WorkflowSpec.WorkflowName)
sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger)
Expand Down
72 changes: 36 additions & 36 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
for _, t := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, t.ID)
if err != nil {
log := e.logger.With(cIDKey, t.ID)
log := e.logger.With(capabilityIDKey, t.ID)
log.Errorf("failed to get trigger capability: %s", err)
logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
logCustMsg(ctx, e.cma.With(capabilityIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
// we don't immediately return here, since we want to retry all triggers
// to notify the user of all errors at once.
triggersInitialized = false
Expand All @@ -179,7 +179,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
}
if !triggersInitialized {
return &workflowError{reason: "failed to resolve triggers", labels: map[string]string{
wIDKey: e.workflow.id,
workflowIDKey: e.workflow.id,
}}
}

Expand All @@ -201,15 +201,15 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
if err != nil {
logCustMsg(
ctx,
e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref),
e.cma.With(workflowIDKey, e.workflow.id, stepIDKey, s.ID, stepRefKey, s.Ref),
fmt.Sprintf("failed to initialize capability for step: %s", err),
e.logger,
)
return &workflowError{err: err, reason: "failed to initialize capability for step",
labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: s.ID,
sRKey: s.Ref,
workflowIDKey: e.workflow.id,
stepIDKey: s.ID,
stepRefKey: s.Ref,
}}
}

Expand All @@ -231,8 +231,8 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
}

return &workflowError{reason: reason, err: err, labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: step.ID,
workflowIDKey: e.workflow.id,
stepIDKey: step.ID,
}}
}

Expand Down Expand Up @@ -318,7 +318,7 @@ func (e *Engine) init(ctx context.Context) {
if err != nil {
return &workflowError{err: err, reason: "failed to resolve workflow capabilities",
labels: map[string]string{
wIDKey: e.workflow.id,
workflowIDKey: e.workflow.id,
}}
}
return nil
Expand All @@ -341,9 +341,9 @@ func (e *Engine) init(ctx context.Context) {
for idx, t := range e.workflow.triggers {
terr := e.registerTrigger(ctx, t, idx)
if terr != nil {
log := e.logger.With(cIDKey, t.ID)
log := e.logger.With(capabilityIDKey, t.ID)
log.Errorf("failed to register trigger: %s", terr)
logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
logCustMsg(ctx, e.cma.With(capabilityIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
}
}

Expand Down Expand Up @@ -451,9 +451,9 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// and triggerID might be "wf_123_trigger_0"
return &workflowError{err: err, reason: fmt.Sprintf("failed to register trigger: %+v", triggerRegRequest),
labels: map[string]string{
wIDKey: e.workflow.id,
cIDKey: t.ID,
tIDKey: triggerID,
workflowIDKey: e.workflow.id,
capabilityIDKey: t.ID,
triggerIDKey: triggerID,
}}
}

Expand Down Expand Up @@ -491,7 +491,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// `executionState`.
func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpdateCh chan store.WorkflowExecutionStep, workflowCreatedAt *time.Time) {
defer e.wg.Done()
lggr := e.logger.With(eIDKey, executionID)
lggr := e.logger.With(workflowExecutionIDKey, executionID)
e.logger.Debugf("running stepUpdateLoop for execution %s", executionID)
for {
select {
Expand All @@ -505,11 +505,11 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd
}
// Executed synchronously to ensure we correctly schedule subsequent tasks.
e.logger.Debugw(fmt.Sprintf("received step update for execution %s", stepUpdate.ExecutionID),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
err := e.handleStepUpdate(ctx, stepUpdate, workflowCreatedAt)
if err != nil {
e.logger.Errorf(fmt.Sprintf("failed to update step state: %+v, %s", stepUpdate, err),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
}
}
}
Expand All @@ -532,7 +532,7 @@ func generateExecutionID(workflowID, eventID string) (string, error) {

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
lggr := e.logger.With("event", event, eIDKey, executionID)
lggr := e.logger.With("event", event, workflowExecutionIDKey, executionID)
lggr.Debug("executing on a trigger event")
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
Expand Down Expand Up @@ -584,8 +584,8 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *
}

func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.WorkflowExecutionStep, workflowCreatedAt *time.Time) error {
l := e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
cma := e.cma.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
l := e.logger.With(workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)
cma := e.cma.With(workflowExecutionIDKey, stepUpdate.ExecutionID, stepRefKey, stepUpdate.Ref)

// If we've been executing for too long, let's time the workflow step out and continue.
if workflowCreatedAt != nil && e.clock.Since(*workflowCreatedAt) > e.maxExecutionDuration {
Expand Down Expand Up @@ -658,7 +658,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {

// If all dependencies are completed, enqueue the step.
if !waitingOnDependencies {
e.logger.With(sRKey, step.Ref, eIDKey, state.ExecutionID, "state", copyState(state)).
e.logger.With(stepRefKey, step.Ref, workflowExecutionIDKey, state.ExecutionID, "state", copyState(state)).
Debug("step request enqueued")
e.pendingStepRequests <- stepRequest{
state: copyState(state),
Expand All @@ -668,7 +668,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {
}

func (e *Engine) finishExecution(ctx context.Context, executionID string, status string) error {
e.logger.With(eIDKey, executionID, "status", status).Info("finishing execution")
e.logger.With(workflowExecutionIDKey, executionID, "status", status).Info("finishing execution")
metrics := e.metrics.with("status", status)
err := e.executionStates.UpdateStatus(ctx, executionID, status)
if err != nil {
Expand Down Expand Up @@ -713,23 +713,23 @@ func (e *Engine) worker(ctx context.Context) {
te := resp.Event

if te.ID == "" {
e.logger.With(tIDKey, te.TriggerType).Error("trigger event ID is empty; not executing")
e.logger.With(triggerIDKey, te.TriggerType).Error("trigger event ID is empty; not executing")
continue
}

executionID, err := generateExecutionID(e.workflow.id, te.ID)
if err != nil {
e.logger.With(tIDKey, te.ID).Errorf("could not generate execution ID: %v", err)
e.logger.With(triggerIDKey, te.ID).Errorf("could not generate execution ID: %v", err)
continue
}

cma := e.cma.With(eIDKey, executionID)
cma := e.cma.With(workflowExecutionIDKey, executionID)
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
if err != nil {
e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err)
e.logger.With(workflowExecutionIDKey, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
} else {
e.logger.With(eIDKey, executionID).Debug("execution started")
e.logger.With(workflowExecutionIDKey, executionID).Debug("execution started")
logCustMsg(ctx, cma, "execution started", e.logger)
}
case <-ctx.Done():
Expand All @@ -741,8 +741,8 @@ func (e *Engine) worker(ctx context.Context) {
func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// Instantiate a child logger; in addition to the WorkflowID field the workflow
// logger will already have, this adds the `stepRef` and `executionID`
l := e.logger.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
cma := e.cma.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
l := e.logger.With(stepRefKey, msg.stepRef, workflowExecutionIDKey, msg.state.ExecutionID)
cma := e.cma.With(stepRefKey, msg.stepRef, workflowExecutionIDKey, msg.state.ExecutionID)

l.Debug("executing on a step event")
stepState := &store.WorkflowExecutionStep{
Expand Down Expand Up @@ -1104,9 +1104,9 @@ func (e *Engine) Close() error {
return &workflowError{err: innerErr,
reason: fmt.Sprintf("failed to unregister capability from workflow: %+v", reg),
labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: s.ID,
sRKey: s.Ref,
workflowIDKey: e.workflow.id,
stepIDKey: s.ID,
stepRefKey: s.Ref,
}}
}

Expand Down Expand Up @@ -1157,7 +1157,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
if cfg.Store == nil {
return nil, &workflowError{reason: "store is nil",
labels: map[string]string{
wIDKey: cfg.WorkflowID,
workflowIDKey: cfg.WorkflowID,
},
}
}
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName)
cma := custmsg.NewLabeler().With(workflowIDKey, cfg.WorkflowID, workflowOwnerKey, cfg.WorkflowOwner, workflowNameKey, cfg.WorkflowName)
workflow, err := Parse(cfg.Workflow)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
Expand All @@ -1220,7 +1220,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)},
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(workflowIDKey, cfg.WorkflowID, workflowOwnerKey, cfg.WorkflowOwner, workflowNameKey, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
Expand Down
Loading
Loading