Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAPPL-121] propagate context on fetch #14956

Merged
merged 4 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe
m = mod
}

return c.executeWithModule(m.module, cfg.Config, request)
return c.executeWithModule(ctx, m.module, cfg.Config, request)
}

func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {
Expand All @@ -141,7 +141,7 @@ func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, w
return m, nil
}

func (c *Compute) executeWithModule(module *host.Module, config []byte, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
func (c *Compute) executeWithModule(ctx context.Context, module *host.Module, config []byte, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
executeStart := time.Now()
capReq := capabilitiespb.CapabilityRequestToProto(req)

Expand All @@ -154,7 +154,7 @@ func (c *Compute) executeWithModule(module *host.Module, config []byte, req capa
},
},
}
resp, err := module.Run(wasmReq)
resp, err := module.Run(ctx, wasmReq)
if err != nil {
return capabilities.CapabilityResponse{}, fmt.Errorf("error running module: %w", err)
}
Expand Down Expand Up @@ -195,8 +195,8 @@ func (c *Compute) Close() error {
return nil
}

func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return func(req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err)
}
Expand Down Expand Up @@ -228,7 +228,7 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(req
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(context.Background(), messageID, payloadBytes)
resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.20.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v1.0.0-alpha.0.0.20241023165837-8c05ee9b97d5
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595
github.com/smartcontractkit/chainlink/integration-tests v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1074,8 +1074,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.0-alpha.0.0.20241023165837
github.com/smartcontractkit/chainlink-automation v1.0.0-alpha.0.0.20241023165837-8c05ee9b97d5/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb h1:LfcX2Dl59DdxAj49NnbiVJPM0oJVDE7dr+SO+Yz4qUE=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d h1:34F6OuNyPwCwBXBG8I+s6BbngHlVNOtDKWMOZ9iXOpY=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
4 changes: 2 additions & 2 deletions core/services/job/wasm_file_spec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

type WasmFileSpecFactory struct{}

func (w WasmFileSpecFactory) Spec(_ context.Context, workflow, configLocation string) (sdk.WorkflowSpec, []byte, string, error) {
func (w WasmFileSpecFactory) Spec(ctx context.Context, workflow, configLocation string) (sdk.WorkflowSpec, []byte, string, error) {
config, err := os.ReadFile(configLocation)
if err != nil {
return sdk.WorkflowSpec{}, nil, "", err
Expand All @@ -33,7 +33,7 @@ func (w WasmFileSpecFactory) Spec(_ context.Context, workflow, configLocation st
}

moduleConfig := &host.ModuleConfig{Logger: logger.NullLogger}
spec, err := host.GetWorkflowSpec(moduleConfig, compressedBinary, config)
spec, err := host.GetWorkflowSpec(ctx, moduleConfig, compressedBinary, config)
if err != nil {
return sdk.WorkflowSpec{}, nil, "", err
} else if spec == nil {
Expand Down
6 changes: 4 additions & 2 deletions core/services/job/wasm_file_spec_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ func TestWasmFileSpecFactory(t *testing.T) {
require.NoError(t, bwr.Close())

t.Run("Raw binary", func(t *testing.T) {
ctx := testutils.Context(t)
factory := job.WasmFileSpecFactory{}
actual, rawSpec, actualSha, err2 := factory.Spec(testutils.Context(t), binaryLocation, configLocation)
require.NoError(t, err2)

expected, err2 := host.GetWorkflowSpec(&host.ModuleConfig{Logger: logger.NullLogger, IsUncompressed: true}, rawBinary, config)
expected, err2 := host.GetWorkflowSpec(ctx, &host.ModuleConfig{Logger: logger.NullLogger, IsUncompressed: true}, rawBinary, config)
require.NoError(t, err2)

expectedSha := sha256.New()
Expand All @@ -55,6 +56,7 @@ func TestWasmFileSpecFactory(t *testing.T) {
})

t.Run("Compressed binary", func(t *testing.T) {
ctx := testutils.Context(t)
brLoc := strings.Replace(binaryLocation, ".wasm", ".br", 1)
compressedBytes := b.Bytes()
require.NoError(t, os.WriteFile(brLoc, compressedBytes, 0600))
Expand All @@ -63,7 +65,7 @@ func TestWasmFileSpecFactory(t *testing.T) {
actual, rawSpec, actualSha, err2 := factory.Spec(testutils.Context(t), brLoc, configLocation)
require.NoError(t, err2)

expected, err2 := host.GetWorkflowSpec(&host.ModuleConfig{Logger: logger.NullLogger, IsUncompressed: true}, rawBinary, config)
expected, err2 := host.GetWorkflowSpec(ctx, &host.ModuleConfig{Logger: logger.NullLogger, IsUncompressed: true}, rawBinary, config)
require.NoError(t, err2)

expectedSha := sha256.New()
Expand Down
6 changes: 3 additions & 3 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type secretsFetcher interface {
// Engine handles the lifecycle of a single workflow and its executions.
type Engine struct {
services.StateMachine
cma custmsg.Labeler
cma custmsg.MessageEmitter
metrics workflowsMetricLabeler
logger logger.Logger
registry core.CapabilitiesRegistry
Expand Down Expand Up @@ -1240,8 +1240,8 @@ func (e *workflowError) Error() string {
return errStr
}

func logCustMsg(cma custmsg.Labeler, msg string, log logger.Logger) {
err := cma.SendLogAsCustomMessage(msg)
func logCustMsg(cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(msg)
if err != nil {
log.Errorf("failed to send custom message with msg: %s", msg)
}
Expand Down
2 changes: 2 additions & 0 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,7 @@ func TestEngine_WithCustomComputeStep(t *testing.T) {
binaryB := wasmtest.CreateTestBinary(cmd, binary, true, t)

spec, err := host.GetWorkflowSpec(
ctx,
&host.ModuleConfig{Logger: log},
binaryB,
nil, // config
Expand Down Expand Up @@ -1511,6 +1512,7 @@ func TestEngine_CustomComputePropagatesBreaks(t *testing.T) {
binaryB := wasmtest.CreateTestBinary(cmd, binary, true, t)

spec, err := host.GetWorkflowSpec(
ctx,
&host.ModuleConfig{Logger: log},
binaryB,
nil, // config
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.27
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e
github.com/smartcontractkit/chainlink-feeds v0.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1057,8 +1057,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb h1:LfcX2Dl59DdxAj49NnbiVJPM0oJVDE7dr+SO+Yz4qUE=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d h1:34F6OuNyPwCwBXBG8I+s6BbngHlVNOtDKWMOZ9iXOpY=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.27
github.com/smartcontractkit/chainlink-automation v1.0.0-alpha.0.0.20241023165837-8c05ee9b97d5
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595
github.com/smartcontractkit/chainlink-protos/job-distributor v0.4.0
github.com/smartcontractkit/chainlink-testing-framework/havoc v1.50.0
github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.12
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1404,8 +1404,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.0-alpha.0.0.20241023165837
github.com/smartcontractkit/chainlink-automation v1.0.0-alpha.0.0.20241023165837-8c05ee9b97d5/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb h1:LfcX2Dl59DdxAj49NnbiVJPM0oJVDE7dr+SO+Yz4qUE=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d h1:34F6OuNyPwCwBXBG8I+s6BbngHlVNOtDKWMOZ9iXOpY=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/load/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.33.0
github.com/slack-go/slack v0.15.0
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595
github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.12
github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.1
github.com/smartcontractkit/chainlink-testing-framework/wasp v1.50.0
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/load/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1381,8 +1381,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.0-alpha.0.0.20241023165837
github.com/smartcontractkit/chainlink-automation v1.0.0-alpha.0.0.20241023165837-8c05ee9b97d5/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb h1:LfcX2Dl59DdxAj49NnbiVJPM0oJVDE7dr+SO+Yz4qUE=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d h1:34F6OuNyPwCwBXBG8I+s6BbngHlVNOtDKWMOZ9iXOpY=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
Loading