Skip to content

Commit

Permalink
feat(workflows): adds heap for joining multiple event types
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 13, 2024
1 parent 89aaf63 commit a79599a
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper"
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/signalers"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -84,18 +84,21 @@ func Test_SecretsWorker(t *testing.T) {
contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes)
require.NoError(t, err)

err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}})
require.NoError(t, err)

// Seed the DB
gotID, err := orm.Update(ctx, giveSecretsURL, giveContents)
require.NoError(t, err)

gotSecretsURL, err := orm.GetSecretsURLByID(ctx, gotID)
assert.NoError(t, err)
assert.Equal(t, giveSecretsURL, gotSecretsURL)
require.NoError(t, err)
require.Equal(t, giveSecretsURL, gotSecretsURL)

// verify the DB
contents, err := orm.GetContents(ctx, giveSecretsURL)
assert.NoError(t, err)
assert.Equal(t, contents, giveContents)
require.NoError(t, err)
require.Equal(t, contents, giveContents)

worker := syncer.NewWorkflowRegistry(
lggr,
Expand Down
8 changes: 4 additions & 4 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"

"github.com/mitchellh/mapstructure"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

Expand Down Expand Up @@ -71,9 +70,10 @@ func (h *eventHandler) forceUpdateSecretsEvent(
// getSecretsURL returns the URL of the secrets contents from the event data and fails
// if the URL is not found or is not a string.
func getSecretsURL(event WorkflowRegistryEvent) (string, error) {
var data WorkflowRegistryForceUpdateSecretsRequestedV1
if err := mapstructure.Decode(event.Data, &data); err != nil {
return "", fmt.Errorf("failed to decode event data: %v", err)
data, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1)
if !ok {
return "", fmt.Errorf("invalid data type %T for event", event.Data)
}

return data.SecretsURL, nil
}
32 changes: 23 additions & 9 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,29 @@ func Test_Handler(t *testing.T) {
}

func Test_getURLHash(t *testing.T) {
giveURL := "http://example.com"
t.Run("success", func(t *testing.T) {
giveURL := "http://example.com"

giveEvent := WorkflowRegistryEvent{
Data: map[string]any{
"SecretsURL": giveURL,
},
}
gotURL, err := getSecretsURL(giveEvent)
require.NoError(t, err)
giveEvent := WorkflowRegistryEvent{
Data: WorkflowRegistryForceUpdateSecretsRequestedV1{
SecretsURL: giveURL,
},
}
gotURL, err := getSecretsURL(giveEvent)
require.NoError(t, err)

assert.Equal(t, giveURL, gotURL)
})

assert.Equal(t, giveURL, gotURL)
t.Run("fail with incorrect type", func(t *testing.T) {
giveURL := "http://example.com"

giveEvent := WorkflowRegistryEvent{
Data: map[string]any{
"SecretsURL": giveURL,
},
}
_, err := getSecretsURL(giveEvent)
require.Error(t, err)
})
}
63 changes: 63 additions & 0 deletions core/services/workflows/syncer/heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package syncer

import "container/heap"

type Heap interface {
// Push adds a new item to the heap.
Push(x WorkflowRegistryEventResponse)

// Pop removes the smallest item from the heap and returns it.
Pop() WorkflowRegistryEventResponse

// Len returns the number of items in the heap.
Len() int
}

// publicHeap is a wrapper around the heap.Interface that exposes the Push and Pop methods.
type publicHeap[T any] struct {
heap heap.Interface
}

func (h *publicHeap[T]) Push(x T) {
heap.Push(h.heap, x)
}

func (h *publicHeap[T]) Pop() T {
return heap.Pop(h.heap).(T)
}

func (h *publicHeap[T]) Len() int {
return h.heap.Len()
}

// blockHeightHeap is a heap.Interface that sorts WorkflowRegistryEventResponses by block height.
type blockHeightHeap []WorkflowRegistryEventResponse

// newBlockHeightHeap returns an initialized heap that sorts WorkflowRegistryEventResponses by block height.
func newBlockHeightHeap() Heap {
h := blockHeightHeap(make([]WorkflowRegistryEventResponse, 0))
heap.Init(&h)
return &publicHeap[WorkflowRegistryEventResponse]{heap: &h}
}

func (h *blockHeightHeap) Len() int { return len(*h) }

func (h *blockHeightHeap) Less(i, j int) bool {
return (*h)[i].Event.Head.Height < (*h)[j].Event.Head.Height
}

func (h *blockHeightHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}

func (h *blockHeightHeap) Push(x any) {
*h = append(*h, x.(WorkflowRegistryEventResponse))
}

func (h *blockHeightHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
Loading

0 comments on commit a79599a

Please sign in to comment.