Skip to content

Commit

Permalink
Add metadata exchange to trigger and handler
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidOrchard committed Nov 11, 2024
1 parent 50c1b3d commit e966e1e
Show file tree
Hide file tree
Showing 5 changed files with 422 additions and 38 deletions.
69 changes: 64 additions & 5 deletions core/capabilities/webapi/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"time"

ethCommon "github.com/ethereum/go-ethereum/common"

Expand All @@ -14,6 +16,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"

"github.com/smartcontractkit/chainlink/v2/common/types"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
Expand All @@ -23,6 +27,7 @@ import (
)

const defaultSendChannelBufferSize = 1000
const updateGatewayIntervalS = 60

const TriggerType = "[email protected]"

Expand All @@ -38,6 +43,7 @@ type webapiTrigger struct {
ch chan<- capabilities.TriggerResponse
config webapicap.TriggerConfig
rateLimiter *common.RateLimiter
rawConfig *values.Map
}

type triggerConnectorHandler struct {
Expand All @@ -50,6 +56,7 @@ type triggerConnectorHandler struct {
mu sync.Mutex
registeredWorkflows map[string]webapiTrigger
registry core.CapabilitiesRegistry
wg sync.WaitGroup
}

var _ capabilities.TriggerCapability = (*triggerConnectorHandler)(nil)
Expand Down Expand Up @@ -132,14 +139,14 @@ func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID
}

func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
// TODO: Validate Signature
// TODO: Validate Signature https://smartcontract-it.atlassian.net/browse/CAPPL-92
body := &msg.Body
sender := ethCommon.HexToAddress(body.Sender)
var payload webapicap.TriggerRequestPayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
h.lggr.Errorw("error decoding payload", "err", err)
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s decoding payload", err.Error()).Error()})
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s decoding payload", err.Error())})
if err != nil {
h.lggr.Errorw("error sending response", "err", err)
}
Expand All @@ -164,13 +171,62 @@ func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gate

default:
h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method)
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("unsupported method %s", body.Method).Error()})
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("unsupported method %s", body.Method)})

Check failure on line 174 in core/capabilities/webapi/trigger/trigger.go

View workflow job for this annotation

GitHub Actions / lint

fmt.Sprintf can be replaced with string concatenation (perfsprint)
if err != nil {
h.lggr.Errorw("error sending response", "err", err)
}
}
}

// Periodically update the gateways with the state of the workflow triggers.
func (h *triggerConnectorHandler) loop(ctx context.Context) {
defer h.wg.Done()

for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(updateGatewayIntervalS) * time.Millisecond):
err := h.updateGateways(ctx)
if err != nil {
h.lggr.Errorw("error updating gateways", "err", err)
}
}
}
}

// helper function to update gateways with the state of the workflow triggers
func (h *triggerConnectorHandler) updateGateways(ctx context.Context) error {
h.mu.Lock()
defer h.mu.Unlock()
var workflowConfigs = make(map[string]*values.Map)
for triggerID, trigger := range h.registeredWorkflows {
workflowConfigs[triggerID] = trigger.rawConfig
}
payloadJSON, err := json.Marshal(workflowConfigs)
if err != nil {
h.lggr.Errorw("error marshalling payload", "err", err)
// TODO: Should we return here instead?
payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s marshalling payload", err.Error())})
}

for gatewayID := range h.connector.GatewayIDs() {
gatewayIDStr := strconv.Itoa(gatewayID)
body := api.MessageBody{
MessageId: types.RandomID().String(),
DonId: h.connector.DonID(),
Method: ghcapabilities.MethodWebAPITriggerUpdateMetadata,
Receiver: gatewayIDStr,
Payload: payloadJSON,
}
err = h.connector.SignAndSendToGateway(ctx, gatewayIDStr, &body)
if err != nil {
h.lggr.Errorw("error sending message", "gateway", gatewayIDStr, "err", err)
}
}
return nil
}

func (h *triggerConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
cfg := req.Config
if cfg == nil {
Expand Down Expand Up @@ -234,7 +290,7 @@ func (h *triggerConnectorHandler) UnregisterTrigger(ctx context.Context, req cap
defer h.mu.Unlock()
workflow, ok := h.registeredWorkflows[req.TriggerID]
if !ok {
return fmt.Errorf("triggerId %s not registered", req.TriggerID)
return fmt.Errorf("triggerID %s not registered", req.TriggerID)
}

close(workflow.ch)
Expand All @@ -251,11 +307,14 @@ func (h *triggerConnectorHandler) Start(ctx context.Context) error {
return err
}
return h.StartOnce("GatewayConnectorServiceWrapper", func() error {
h.wg.Add(1)
go h.loop(ctx)
return h.connector.AddHandler([]string{"web_api_trigger"}, h)
})
}
func (h *triggerConnectorHandler) Close() error {
return h.StopOnce("GatewayConnectorServiceWrapper", func() error {
h.wg.Wait()
return nil
})
}
Expand All @@ -272,7 +331,7 @@ func (h *triggerConnectorHandler) sendResponse(ctx context.Context, gatewayID st
payloadJSON, err := json.Marshal(payload)
if err != nil {
h.lggr.Errorw("error marshalling payload", "err", err)
payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s marshalling payload", err.Error()).Error()})
payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s marshalling payload", err.Error())})
}

body := &api.MessageBody{
Expand Down
53 changes: 26 additions & 27 deletions core/capabilities/webapi/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/jonboulle/clockwork"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
registrymock "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks"
"github.com/smartcontractkit/chainlink-common/pkg/values"
trigger_test_utils "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/triggertestutils"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -45,26 +46,6 @@ type testHarness struct {
trigger *triggerConnectorHandler
}

func workflowTriggerConfig(_ testHarness, addresses []string, topics []string) (*values.Map, error) {
var rateLimitConfig, err = values.NewMap(map[string]any{
"GlobalRPS": 100.0,
"GlobalBurst": 101,
"PerSenderRPS": 102.0,
"PerSenderBurst": 103,
})
if err != nil {
return nil, err
}

triggerRegistrationConfig, err := values.NewMap(map[string]interface{}{
"RateLimiter": rateLimitConfig,
"AllowedSenders": addresses,
"AllowedTopics": topics,
"RequiredParams": []string{"bid", "ask"},
})
return triggerRegistrationConfig, err
}

func setup(t *testing.T) testHarness {
registry := registrymock.NewCapabilitiesRegistry(t)
connector := gcmocks.NewGatewayConnector(t)
Expand Down Expand Up @@ -151,7 +132,7 @@ func TestTriggerExecute(t *testing.T) {
th := setup(t)
ctx := testutils.Context(t)
ctx, cancelContext := context.WithDeadline(ctx, time.Now().Add(10*time.Second))
Config, _ := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update", "ad_hoc_price_update"})
_, Config, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update", "ad_hoc_price_update"})
triggerReq := capabilities.TriggerRegistrationRequest{
TriggerID: triggerID1,
Metadata: capabilities.RequestMetadata{
Expand All @@ -163,7 +144,7 @@ func TestTriggerExecute(t *testing.T) {
channel, err := th.trigger.RegisterTrigger(ctx, triggerReq)
require.NoError(t, err)

Config2, err := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update2", "ad_hoc_price_update"})
_, Config2, err := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update2", "ad_hoc_price_update"})
require.NoError(t, err)

triggerReq2 := capabilities.TriggerRegistrationRequest{
Expand Down Expand Up @@ -277,17 +258,32 @@ func TestTriggerExecute(t *testing.T) {
requireNoChanMsg(t, channel2)
})

t.Run("happy case metadata exchange sent to gateway", func(t *testing.T) {
// th.connector.On("SignAndSendToGateway", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
th.connector.On("AddHandler", mock.Anything, mock.Anything).Return(nil).Once()
th.registry.On("Add", mock.Anything, mock.Anything).Return(nil).Once()
err := th.trigger.Start(ctx)

Check failure on line 265 in core/capabilities/webapi/trigger/trigger_test.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 144 (govet)
require.NoError(t, err)

fakeClock := clockwork.NewFakeClock()
// TODO: This is not causing the timer of
// case <-time.After(time.Duration(updateGatewayIntervalS) * time.Millisecond):
// to trigger.
fakeClock.Advance(61 * time.Second)
})

err = th.trigger.UnregisterTrigger(ctx, triggerReq)
require.NoError(t, err)
err = th.trigger.UnregisterTrigger(ctx, triggerReq2)
require.NoError(t, err)
cancelContext()

}

func TestRegisterNoAllowedSenders(t *testing.T) {
th := setup(t)
ctx := testutils.Context(t)
Config, _ := workflowTriggerConfig(th, []string{}, []string{"daily_price_update"})
_, Config, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{}, []string{"daily_price_update"})

triggerReq := capabilities.TriggerRegistrationRequest{
TriggerID: triggerID1,
Expand All @@ -307,7 +303,7 @@ func TestTriggerExecute2WorkflowsSameTopicDifferentAllowLists(t *testing.T) {
th := setup(t)
ctx := testutils.Context(t)
ctx, cancelContext := context.WithDeadline(ctx, time.Now().Add(10*time.Second))
Config, _ := workflowTriggerConfig(th, []string{address2}, []string{"daily_price_update"})
_, Config, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{address2}, []string{"daily_price_update"})
triggerReq := capabilities.TriggerRegistrationRequest{
TriggerID: triggerID1,
Metadata: capabilities.RequestMetadata{
Expand All @@ -319,7 +315,7 @@ func TestTriggerExecute2WorkflowsSameTopicDifferentAllowLists(t *testing.T) {
channel, err := th.trigger.RegisterTrigger(ctx, triggerReq)
require.NoError(t, err)

Config2, err := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update"})
_, Config2, err := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update"})
require.NoError(t, err)

triggerReq2 := capabilities.TriggerRegistrationRequest{
Expand Down Expand Up @@ -363,7 +359,7 @@ func TestTriggerExecute2WorkflowsSameTopicDifferentAllowLists(t *testing.T) {
func TestRegisterUnregister(t *testing.T) {
th := setup(t)
ctx := testutils.Context(t)
Config, err := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update"})
_, Config, err := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update"})
require.NoError(t, err)

triggerReq := capabilities.TriggerRegistrationRequest{
Expand All @@ -384,3 +380,6 @@ func TestRegisterUnregister(t *testing.T) {
_, open := <-channel
require.Equal(t, open, false)
}

// TODO: add test to send gateaway message
// TODO: add test to not send gateway message before timer.
38 changes: 38 additions & 0 deletions core/capabilities/webapi/triggertestutils/trigger_test_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package triggertestutils

import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap"
)

func NewWorkflowTriggerConfig(addresses []string, topics []string) (webapicap.TriggerConfig, *values.Map, error) {
triggerConfig := webapicap.TriggerConfig{
AllowedSenders: addresses,
AllowedTopics: topics,
RateLimiter: webapicap.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 101,
PerSenderRPS: 102.0,
PerSenderBurst: 103,
},
RequiredParams: []string{"bid", "ask"},
}

var rateLimitConfig, err = values.NewMap(map[string]any{
"GlobalRPS": 100.0,
"GlobalBurst": 101,
"PerSenderRPS": 102.0,
"PerSenderBurst": 103,
})
if err != nil {
return triggerConfig, nil, err
}

triggerRegistrationConfig, err := values.NewMap(map[string]interface{}{
"RateLimiter": rateLimitConfig,
"AllowedSenders": addresses,
"AllowedTopics": topics,
"RequiredParams": []string{"bid", "ask"},
})
return triggerConfig, triggerRegistrationConfig, err
}
Loading

0 comments on commit e966e1e

Please sign in to comment.