Skip to content

Commit

Permalink
Cappl 391 wire in workflow registry (#15460)
Browse files Browse the repository at this point in the history
* don notifier added

* wip

* wip

* wire up of wf syncer

* test udpate

* srvcs fix

* review comments

* lint

* attempt to fix ci tests
  • Loading branch information
ettec authored Nov 29, 2024
1 parent 90403e2 commit 44cab8d
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 93 deletions.
43 changes: 43 additions & 0 deletions core/capabilities/don_notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package capabilities

import (
"context"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
)

type DonNotifier struct {
mu sync.Mutex
don capabilities.DON
notified bool
ch chan struct{}
}

func NewDonNotifier() *DonNotifier {
return &DonNotifier{
ch: make(chan struct{}),
}
}

func (n *DonNotifier) NotifyDonSet(don capabilities.DON) {
n.mu.Lock()
defer n.mu.Unlock()
if !n.notified {
n.don = don
n.notified = true
close(n.ch)
}
}

func (n *DonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) {
select {
case <-ctx.Done():
return capabilities.DON{}, ctx.Err()
case <-n.ch:
}
<-n.ch
n.mu.Lock()
defer n.mu.Unlock()
return n.don, nil
}
49 changes: 49 additions & 0 deletions core/capabilities/don_notifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package capabilities_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"

"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink/v2/core/capabilities"
)

func TestDonNotifier_WaitForDon(t *testing.T) {
notifier := capabilities.NewDonNotifier()
don := commoncap.DON{
ID: 1,
}

go func() {
time.Sleep(100 * time.Millisecond)
notifier.NotifyDonSet(don)
}()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

result, err := notifier.WaitForDon(ctx)
require.NoError(t, err)
assert.Equal(t, don, result)

result, err = notifier.WaitForDon(ctx)
require.NoError(t, err)
assert.Equal(t, don, result)
}

func TestDonNotifier_WaitForDon_ContextTimeout(t *testing.T) {
notifier := capabilities.NewDonNotifier()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()

_, err := notifier.WaitForDon(ctx)
require.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
}
29 changes: 19 additions & 10 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ var defaultStreamConfig = p2ptypes.StreamConfig{

type launcher struct {
services.StateMachine
lggr logger.Logger
peerWrapper p2ptypes.PeerWrapper
dispatcher remotetypes.Dispatcher
registry *Registry
subServices []services.Service
lggr logger.Logger
peerWrapper p2ptypes.PeerWrapper
dispatcher remotetypes.Dispatcher
registry *Registry
subServices []services.Service
workflowDonNotifier donNotifier
}

func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguration, error) {
Expand Down Expand Up @@ -86,18 +87,24 @@ func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguratio
}, nil
}

type donNotifier interface {
NotifyDonSet(don capabilities.DON)
}

func NewLauncher(
lggr logger.Logger,
peerWrapper p2ptypes.PeerWrapper,
dispatcher remotetypes.Dispatcher,
registry *Registry,
workflowDonNotifier donNotifier,
) *launcher {
return &launcher{
lggr: lggr.Named("CapabilitiesLauncher"),
peerWrapper: peerWrapper,
dispatcher: dispatcher,
registry: registry,
subServices: []services.Service{},
lggr: lggr.Named("CapabilitiesLauncher"),
peerWrapper: peerWrapper,
dispatcher: dispatcher,
registry: registry,
subServices: []services.Service{},
workflowDonNotifier: workflowDonNotifier,
}
}

Expand Down Expand Up @@ -215,6 +222,8 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist
return errors.New("invariant violation: node is part of more than one workflowDON")
}

w.workflowDonNotifier.NotifyDonSet(myDON.DON)

for _, rcd := range remoteCapabilityDONs {
err := w.addRemoteCapabilities(ctx, myDON, rcd, state)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import (

var _ capabilities.TriggerCapability = (*mockTrigger)(nil)

type mockDonNotifier struct {
}

func (m *mockDonNotifier) NotifyDonSet(don capabilities.DON) {
}

type mockTrigger struct {
capabilities.CapabilityInfo
}
Expand Down Expand Up @@ -196,6 +202,7 @@ func TestLauncher(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
Expand Down Expand Up @@ -305,6 +312,7 @@ func TestLauncher(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

err = launcher.Launch(ctx, state)
Expand Down Expand Up @@ -409,6 +417,7 @@ func TestLauncher(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

err = launcher.Launch(ctx, state)
Expand Down Expand Up @@ -600,6 +609,7 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
Expand Down Expand Up @@ -752,6 +762,7 @@ func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

// If the DON were public, this would fail with two errors:
Expand Down Expand Up @@ -917,6 +928,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
Expand Down Expand Up @@ -1082,6 +1094,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

dispatcher.On("SetReceiver", fullTriggerCapID, triggerCapDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
Expand Down Expand Up @@ -1232,6 +1245,7 @@ func TestLauncher_SucceedsEvenIfDispatcherAlreadyHasReceiver(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

err = launcher.Launch(ctx, state)
Expand Down
85 changes: 74 additions & 11 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
Expand All @@ -33,6 +34,7 @@ import (
gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand All @@ -48,6 +50,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/feeds"
"github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
capabilities2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
common2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keeper"
Expand All @@ -71,6 +75,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth"
"github.com/smartcontractkit/chainlink/v2/core/sessions/localauth"
Expand Down Expand Up @@ -212,6 +217,17 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)
}

var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper
if cfg.Capabilities().GatewayConnector().DonID() != "" {
globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID())
gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper(
cfg.Capabilities().GatewayConnector(),
keyStore.Eth(),
clockwork.NewRealClock(),
globalLogger)
srvcs = append(srvcs, gatewayConnectorWrapper)
}

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
var dispatcher remotetypes.Dispatcher
Expand Down Expand Up @@ -256,32 +272,79 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("could not configure syncer: %w", err)
}

workflowDonNotifier := capabilities.NewDonNotifier()

wfLauncher := capabilities.NewLauncher(
globalLogger,
externalPeerWrapper,
dispatcher,
opts.CapabilitiesRegistry,
workflowDonNotifier,
)
registrySyncer.AddLauncher(wfLauncher)

srvcs = append(srvcs, wfLauncher, registrySyncer)

if cfg.Capabilities().WorkflowRegistry().Address() != "" {
if gatewayConnectorWrapper == nil {
return nil, errors.New("unable to create workflow registry syncer without gateway connector")
}

err = keyStore.Workflow().EnsureKey(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to ensure workflow key: %w", err)
}

keys, err := keyStore.Workflow().GetAll()
if err != nil {
return nil, fmt.Errorf("failed to get all workflow keys: %w", err)
}
if len(keys) != 1 {
return nil, fmt.Errorf("expected 1 key, got %d", len(keys))
}

connector := gatewayConnectorWrapper.GetGatewayConnector()
webAPILggr := globalLogger.Named("WebAPITarget")

webAPIConfig := webapi.ServiceConfig{
RateLimiter: common2.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}

outgoingConnectorHandler, err := webapi.NewOutgoingConnectorHandler(connector,
webAPIConfig,
capabilities2.MethodWebAPITarget, webAPILggr)
if err != nil {
return nil, fmt.Errorf("could not create outgoing connector handler: %w", err)
}

eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger),
syncer.NewFetcherFunc(globalLogger, outgoingConnectorHandler), workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0])

loader := syncer.NewWorkflowRegistryContractLoader(cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
}, eventHandler)

wfSyncer := syncer.NewWorkflowRegistry(globalLogger, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
}, cfg.Capabilities().WorkflowRegistry().Address(),
syncer.WorkflowEventPollerConfig{
QueryCount: 100,
}, eventHandler, loader, workflowDonNotifier)

srvcs = append(srvcs, wfSyncer)
}
}
} else {
globalLogger.Debug("External registry not configured, skipping registry syncer and starting with an empty registry")
opts.CapabilitiesRegistry.SetLocalRegistry(&capabilities.TestMetadataRegistry{})
}

var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper
if cfg.Capabilities().GatewayConnector().DonID() != "" {
globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID())
gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper(
cfg.Capabilities().GatewayConnector(),
keyStore.Eth(),
clockwork.NewRealClock(),
globalLogger)
srvcs = append(srvcs, gatewayConnectorWrapper)
}

// LOOPs can be created as options, in the case of LOOP relayers, or
// as OCR2 job implementations, in the case of Median today.
// We will have a non-nil registry here in LOOP relayers are being used, otherwise
Expand Down
Loading

0 comments on commit 44cab8d

Please sign in to comment.