Skip to content

Commit

Permalink
feat(handler): implements handle workflow registered event (#15383)
Browse files Browse the repository at this point in the history
* feat(workflows): adds orm methods for managing specs

* feat(handler): implements handle workflow registered event

* chore(syncer): lint changes

* refactor(workflows/handler): check event type higher up

* refactor(workflows/handler): force update secrets with beholder

* refactor(workflows/handler): use custom error function

* fix(workflows): wires up emitter
  • Loading branch information
MStreet3 authored Nov 25, 2024
1 parent f5d228e commit fa78941
Show file tree
Hide file tree
Showing 7 changed files with 486 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper"
Expand All @@ -29,6 +30,7 @@ func Test_SecretsWorker(t *testing.T) {
var (
ctx = coretestutils.Context(t)
lggr = logger.TestLogger(t)
emitter = custmsg.NewLabeler()
backendTH = testutils.NewEVMBackendTH(t)
db = pgtest.NewSqlxDB(t)
orm = syncer.NewWorkflowRegistryDS(db, lggr)
Expand Down Expand Up @@ -119,6 +121,7 @@ func Test_SecretsWorker(t *testing.T) {
wfRegistryAddr.Hex(),
nil,
nil,
emitter,
syncer.WithTicker(giveTicker.C),
)

Expand Down
192 changes: 174 additions & 18 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ package syncer

import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

Expand Down Expand Up @@ -87,15 +93,28 @@ type WorkflowRegistryWorkflowDeletedV1 struct {
WorkflowName string
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
}

// secretsFetcherFunc implements the secretsFetcher interface for a function.
type secretsFetcherFunc func(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)

func (f secretsFetcherFunc) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
return f(ctx, workflowOwner, workflowName)
}

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm WorkflowSecretsDS
orm WorkflowRegistryDS
fetcher FetcherFunc
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *engineRegistry
emitter custmsg.MessageEmitter
secretsFetcher secretsFetcher
}

// newEventHandler returns a new eventHandler instance.
Expand All @@ -106,6 +125,8 @@ func newEventHandler(
workflowStore store.Store,
capRegistry core.CapabilitiesRegistry,
engineRegistry *engineRegistry,
emitter custmsg.MessageEmitter,
secretsFetcher secretsFetcher,
) *eventHandler {
return &eventHandler{
lggr: lggr,
Expand All @@ -114,15 +135,50 @@ func newEventHandler(
workflowStore: workflowStore,
capRegistry: capRegistry,
engineRegistry: engineRegistry,
emitter: emitter,
secretsFetcher: secretsFetcher,
}
}

func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error {
switch event.EventType {
case ForceUpdateSecretsEvent:
return h.forceUpdateSecretsEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1)
if !ok {
return newHandlerTypeError(event.Data)
}

cma := h.emitter.With(
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
)

if err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr)
return err
}

return nil
case WorkflowRegisteredEvent:
return h.workflowRegisteredEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowRegisteredV1)
if !ok {
return newHandlerTypeError(event.Data)
}
wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowRegisteredEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow registered event: %v", err), h.lggr)
return err
}

h.lggr.Debugf("workflow 0x%x registered and started", wfID)
return nil
case WorkflowUpdatedEvent:
return h.workflowUpdatedEvent(ctx, event)
case WorkflowPausedEvent:
Expand All @@ -135,12 +191,97 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent)
}

// workflowRegisteredEvent handles the WorkflowRegisteredEvent event type.
// TODO: Implement this method
func (h *eventHandler) workflowRegisteredEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowRegisteredV1,
) error {
return ErrNotImplemented
wfID := hex.EncodeToString(payload.WorkflowID[:])

// Download the contents of binaryURL, configURL and secretsURL and cache them locally.
binary, err := h.fetcher(ctx, payload.BinaryURL)
if err != nil {
return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err)
}

config, err := h.fetcher(ctx, payload.ConfigURL)
if err != nil {
return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
}

secrets, err := h.fetcher(ctx, payload.SecretsURL)
if err != nil {
return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err)
}

// Calculate the hash of the binary and config files
hash := workflowID(binary, config, []byte(payload.SecretsURL))

// Pre-check: verify that the workflowID matches; if it doesn’t abort and log an error via Beholder.
if hash != wfID {
return fmt.Errorf("workflowID mismatch: %s != %s", hash, wfID)
}

// Save the workflow secrets
urlHash, err := h.orm.GetSecretsURLHash(payload.WorkflowOwner, []byte(payload.SecretsURL))
if err != nil {
return fmt.Errorf("failed to get secrets URL hash: %w", err)
}

// Create a new entry in the workflow_spec table corresponding for the new workflow, with the contents of the binaryURL + configURL in the table
status := job.WorkflowSpecStatusActive
if payload.Status == 1 {
status = job.WorkflowSpecStatusPaused
}

entry := &job.WorkflowSpec{
Workflow: hex.EncodeToString(binary),
Config: string(config),
WorkflowID: wfID,
Status: status,
WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner),
WorkflowName: payload.WorkflowName,
SpecType: job.WASMFile,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
}
if _, err = h.orm.UpsertWorkflowSpecWithSecrets(ctx, entry, payload.SecretsURL, hex.EncodeToString(urlHash), string(secrets)); err != nil {
return fmt.Errorf("failed to upsert workflow spec with secrets: %w", err)
}

if status != job.WorkflowSpecStatusActive {
return nil
}

// If status == active, start a new WorkflowEngine instance, and add it to local engine registry
moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter}
sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config)
if err != nil {
return fmt.Errorf("failed to get workflow sdk spec: %w", err)
}

cfg := workflows.Config{
Lggr: h.lggr,
Workflow: *sdkSpec,
WorkflowID: wfID,
WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner),
WorkflowName: payload.WorkflowName,
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h.secretsFetcher,
}
e, err := workflows.NewEngine(ctx, cfg)
if err != nil {
return fmt.Errorf("failed to create workflow engine: %w", err)
}

if err := e.Start(ctx); err != nil {
return fmt.Errorf("failed to start workflow engine: %w", err)
}

h.engineRegistry.Add(wfID, e)
return nil
}

// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type.
Expand Down Expand Up @@ -170,32 +311,47 @@ func (h *eventHandler) workflowActivatedEvent(
// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
event WorkflowRegistryEvent,
payload WorkflowRegistryForceUpdateSecretsRequestedV1,
) error {
// Get the URL of the secrets file from the event data
data, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

hash := hex.EncodeToString(data.SecretsURLHash)
hash := hex.EncodeToString(payload.SecretsURLHash)

url, err := h.orm.GetSecretsURLByHash(ctx, hash)
if err != nil {
h.lggr.Errorf("failed to get URL by hash %s : %s", hash, err)
return err
return fmt.Errorf("failed to get URL by hash %s : %w", hash, err)
}

// Fetch the contents of the secrets file from the url via the fetcher
secrets, err := h.fetcher(ctx, url)
if err != nil {
return err
return fmt.Errorf("failed to fetch secrets from url %s : %w", url, err)
}

// Update the secrets in the ORM
if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil {
return err
return fmt.Errorf("failed to update secrets: %w", err)
}

return nil
}

// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL.
func workflowID(wasm, config, secretsURL []byte) string {
sum := sha256.New()
sum.Write(wasm)
sum.Write(config)
sum.Write(secretsURL)
return hex.EncodeToString(sum.Sum(nil))
}

// logCustMsg emits a custom message to the external sink and logs an error if that fails.
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(ctx, msg)
if err != nil {
log.Helper(1).Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}

func newHandlerTypeError(data any) error {
return fmt.Errorf("invalid data type %T for event", data)
}
Loading

0 comments on commit fa78941

Please sign in to comment.