From d9822086191c0509de5c1f76f41a9622e499d73b Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 25 Nov 2024 16:22:59 +0200 Subject: [PATCH] fix(workflows): wires up emitter --- .../capabilities/workflows/syncer/workflow_syncer_test.go | 3 +++ core/services/workflows/syncer/handler.go | 8 ++++---- core/services/workflows/syncer/workflow_registry.go | 6 +++++- core/services/workflows/syncer/workflow_registry_test.go | 4 +++- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index d6f507eac20..ba29e98526e 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" @@ -29,6 +30,7 @@ func Test_SecretsWorker(t *testing.T) { var ( ctx = coretestutils.Context(t) lggr = logger.TestLogger(t) + emitter = custmsg.NewLabeler() backendTH = testutils.NewEVMBackendTH(t) db = pgtest.NewSqlxDB(t) orm = syncer.NewWorkflowRegistryDS(db, lggr) @@ -119,6 +121,7 @@ func Test_SecretsWorker(t *testing.T) { wfRegistryAddr.Hex(), nil, nil, + emitter, syncer.WithTicker(giveTicker.C), ) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 7f670ca7b1b..5ccb3f5e180 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -214,7 +214,7 @@ func (h *eventHandler) workflowRegisteredEvent( } // Calculate the hash of the binary and config files - hash := sha(binary, config, []byte(payload.SecretsURL)) + hash := workflowID(binary, config, []byte(payload.SecretsURL)) // Pre-check: verify that the workflowID matches; if it doesn’t abort and log an error via Beholder. if hash != wfID { @@ -335,8 +335,8 @@ func (h *eventHandler) forceUpdateSecretsEvent( return nil } -// sha calculates the sha256 hash of the wasm, config and secretsURL to determine the workflow ID. -func sha(wasm, config, secretsURL []byte) string { +// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL. +func workflowID(wasm, config, secretsURL []byte) string { sum := sha256.New() sum.Write(wasm) sum.Write(config) @@ -348,7 +348,7 @@ func sha(wasm, config, secretsURL []byte) string { func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) { err := cma.Emit(ctx, msg) if err != nil { - log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err) + log.Helper(1).Errorf("failed to send custom message with msg: %s, err: %v", msg, err) } } diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 07eff425057..cdd0c71acc0 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/core" @@ -95,6 +96,7 @@ type workflowRegistry struct { ticker <-chan time.Time lggr logger.Logger + emitter custmsg.Labeler orm WorkflowRegistryDS reader ContractReader gateway FetcherFunc @@ -147,11 +149,13 @@ func NewWorkflowRegistry[T ContractReader]( addr string, workflowStore store.Store, capRegistry core.CapabilitiesRegistry, + emitter custmsg.Labeler, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} wr := &workflowRegistry{ lggr: lggr.Named(name), + emitter: emitter, orm: orm, reader: reader, gateway: gateway, @@ -172,7 +176,7 @@ func NewWorkflowRegistry[T ContractReader]( batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), } wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, - wr.engineRegistry, nil, secretsFetcherFunc(wr.SecretsFor), + wr.engineRegistry, wr.emitter, secretsFetcherFunc(wr.SecretsFor), ) for _, opt := range opts { opt(wr) diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 652b20deea1..58dcbed1022 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" types "github.com/smartcontractkit/chainlink-common/pkg/types" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" @@ -51,11 +52,12 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { orm = &orm{ds: db, lggr: lggr} ctx, cancel = context.WithCancel(testutils.Context(t)) reader = NewMockContractReader(t) + emitter = custmsg.NewLabeler() gateway = func(_ context.Context, _ string) ([]byte, error) { return []byte(wantContents), nil } ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, WithTicker(ticker)) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, emitter, WithTicker(ticker)) ) // Cleanup the worker