Skip to content

Commit

Permalink
fix(workflows): wires up emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 25, 2024
1 parent dd6ebda commit d982208
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper"
Expand All @@ -29,6 +30,7 @@ func Test_SecretsWorker(t *testing.T) {
var (
ctx = coretestutils.Context(t)
lggr = logger.TestLogger(t)
emitter = custmsg.NewLabeler()
backendTH = testutils.NewEVMBackendTH(t)
db = pgtest.NewSqlxDB(t)
orm = syncer.NewWorkflowRegistryDS(db, lggr)
Expand Down Expand Up @@ -119,6 +121,7 @@ func Test_SecretsWorker(t *testing.T) {
wfRegistryAddr.Hex(),
nil,
nil,
emitter,
syncer.WithTicker(giveTicker.C),
)

Expand Down
8 changes: 4 additions & 4 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (h *eventHandler) workflowRegisteredEvent(
}

// Calculate the hash of the binary and config files
hash := sha(binary, config, []byte(payload.SecretsURL))
hash := workflowID(binary, config, []byte(payload.SecretsURL))

// Pre-check: verify that the workflowID matches; if it doesn’t abort and log an error via Beholder.
if hash != wfID {
Expand Down Expand Up @@ -335,8 +335,8 @@ func (h *eventHandler) forceUpdateSecretsEvent(
return nil
}

// sha calculates the sha256 hash of the wasm, config and secretsURL to determine the workflow ID.
func sha(wasm, config, secretsURL []byte) string {
// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL.
func workflowID(wasm, config, secretsURL []byte) string {
sum := sha256.New()
sum.Write(wasm)
sum.Write(config)
Expand All @@ -348,7 +348,7 @@ func sha(wasm, config, secretsURL []byte) string {
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(ctx, msg)
if err != nil {
log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
log.Helper(1).Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}

Expand Down
6 changes: 5 additions & 1 deletion core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services"
types "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
Expand Down Expand Up @@ -95,6 +96,7 @@ type workflowRegistry struct {
ticker <-chan time.Time

lggr logger.Logger
emitter custmsg.Labeler
orm WorkflowRegistryDS
reader ContractReader
gateway FetcherFunc
Expand Down Expand Up @@ -147,11 +149,13 @@ func NewWorkflowRegistry[T ContractReader](
addr string,
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
emitter custmsg.Labeler,
opts ...func(*workflowRegistry),
) *workflowRegistry {
ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent}
wr := &workflowRegistry{
lggr: lggr.Named(name),
emitter: emitter,
orm: orm,
reader: reader,
gateway: gateway,
Expand All @@ -172,7 +176,7 @@ func NewWorkflowRegistry[T ContractReader](
batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)),
}
wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry,
wr.engineRegistry, nil, secretsFetcherFunc(wr.SecretsFor),
wr.engineRegistry, wr.emitter, secretsFetcherFunc(wr.SecretsFor),
)
for _, opt := range opts {
opt(wr)
Expand Down
4 changes: 3 additions & 1 deletion core/services/workflows/syncer/workflow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
types "github.com/smartcontractkit/chainlink-common/pkg/types"
query "github.com/smartcontractkit/chainlink-common/pkg/types/query"
Expand Down Expand Up @@ -51,11 +52,12 @@ func Test_Workflow_Registry_Syncer(t *testing.T) {
orm = &orm{ds: db, lggr: lggr}
ctx, cancel = context.WithCancel(testutils.Context(t))
reader = NewMockContractReader(t)
emitter = custmsg.NewLabeler()
gateway = func(_ context.Context, _ string) ([]byte, error) {
return []byte(wantContents), nil
}
ticker = make(chan time.Time)
worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, WithTicker(ticker))
worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, emitter, WithTicker(ticker))
)

// Cleanup the worker
Expand Down

0 comments on commit d982208

Please sign in to comment.