Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add beholder logging for custom compute #15122

Merged
merged 15 commits into from
Nov 11, 2024
Merged
39 changes: 28 additions & 11 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"

Expand All @@ -21,6 +22,7 @@ import (
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
Expand Down Expand Up @@ -117,7 +119,7 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID)
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
Expand All @@ -128,10 +130,10 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe
return c.executeWithModule(ctx, m.module, cfg.Config, request)
}

func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, requestMetadata capabilities.RequestMetadata) (*module, error) {
initStart := time.Now()

cfg.Fetch = c.createFetcher(workflowID, workflowExecutionID)
cfg.Fetch = c.createFetcher()
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
Expand All @@ -140,7 +142,7 @@ func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, w
mod.Start()

initDuration := time.Since(initStart)
computeWASMInit.WithLabelValues(workflowID, referenceID).Observe(float64(initDuration))
computeWASMInit.WithLabelValues(requestMetadata.WorkflowID, requestMetadata.ReferenceID).Observe(float64(initDuration))

m := &module{module: mod}
c.modules.add(id, m)
Expand Down Expand Up @@ -201,18 +203,26 @@ func (c *Compute) Close() error {
return nil
}

func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.WorkflowId); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", req.WorkflowId, err)
}
if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.WorkflowExecutionId); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.WorkflowExecutionId, err)
}

cma := c.emitter.With(
wIDKey, req.WorkflowId,
wnKey, req.WorkflowName,
woIDKey, req.WorkflowOwner,
eIDKey, req.WorkflowExecutionId,
timestampKey, time.Now().UTC().Format(time.RFC3339Nano),
)

messageID := strings.Join([]string{
workflowID,
workflowExecutionID,
req.WorkflowId,
req.WorkflowExecutionId,
ghcapabilities.MethodComputeAction,
c.idGenerator(),
}, "/")
Expand Down Expand Up @@ -245,6 +255,13 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx
if err != nil {
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}
if response.StatusCode != http.StatusOK {
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode)
err = cma.Emit(ctx, msg)
if err != nil {
c.log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}
return &response, nil
}
}
Expand Down
10 changes: 10 additions & 0 deletions core/capabilities/compute/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package compute
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved

// Observability keys
const (
wIDKey = "workflowID"
eIDKey = "workflowExecutionID"
wnKey = "workflowName"
woIDKey = "workflowOwner"
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
timestampKey = "computeTimestamp"
)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.27
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241101132401-00090bf7feb4
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241105152536-46c3902b0bcb
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e
github.com/smartcontractkit/chainlink-feeds v0.1.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,10 @@ github.com/smartcontractkit/chainlink-ccip v0.0.0-20241101132401-00090bf7feb4 h1
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241101132401-00090bf7feb4/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241105151155-7fba27d48ac8 h1:201Qzwo1CdzrP5z4fiyUPeXGhtmzojmFRw/WWaI5TLc=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241105151155-7fba27d48ac8/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241105152536-46c3902b0bcb h1:7aIxwoLaTqMhBluqqhnCxRlrfBWvsOk+oNAWzY7ESxk=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241105152536-46c3902b0bcb/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down