Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metadata exchange to trigger and handler #15043

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 64 additions & 5 deletions core/capabilities/webapi/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"time"

ethCommon "github.com/ethereum/go-ethereum/common"

Expand All @@ -14,6 +16,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"

"github.com/smartcontractkit/chainlink/v2/common/types"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
Expand All @@ -23,6 +27,7 @@ import (
)

const defaultSendChannelBufferSize = 1000
const updateGatewayIntervalS = 60

const TriggerType = "[email protected]"

Expand All @@ -38,6 +43,7 @@ type webapiTrigger struct {
ch chan<- capabilities.TriggerResponse
config webapicap.TriggerConfig
rateLimiter *common.RateLimiter
rawConfig *values.Map
Copy link
Contributor

@jinhoonbang jinhoonbang Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what data is in this rawConfig?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the config that is in the triggers config, ie allowedSenders, allowedTopics, RateLimiter.. I wanted to make the metadata exchange "future-proof" so that changes to the config don't require rewriting the exchange logic.

}

type triggerConnectorHandler struct {
Expand All @@ -50,6 +56,7 @@ type triggerConnectorHandler struct {
mu sync.Mutex
registeredWorkflows map[string]webapiTrigger
registry core.CapabilitiesRegistry
wg sync.WaitGroup
}

var _ capabilities.TriggerCapability = (*triggerConnectorHandler)(nil)
Expand Down Expand Up @@ -132,14 +139,14 @@ func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID
}

func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
// TODO: Validate Signature
// TODO: Validate Signature https://smartcontract-it.atlassian.net/browse/CAPPL-92
body := &msg.Body
sender := ethCommon.HexToAddress(body.Sender)
var payload webapicap.TriggerRequestPayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
h.lggr.Errorw("error decoding payload", "err", err)
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s decoding payload", err.Error()).Error()})
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s decoding payload", err.Error())})
if err != nil {
h.lggr.Errorw("error sending response", "err", err)
}
Expand All @@ -164,13 +171,62 @@ func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gate

default:
h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method)
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("unsupported method %s", body.Method).Error()})
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: "unsupported method " + body.Method})
if err != nil {
h.lggr.Errorw("error sending response", "err", err)
}
}
}

// Periodically update the gateways with the state of the workflow triggers.
func (h *triggerConnectorHandler) loop(ctx context.Context) {
defer h.wg.Done()

for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(updateGatewayIntervalS) * time.Millisecond):
err := h.updateGateways(ctx)
if err != nil {
h.lggr.Errorw("error updating gateways", "err", err)
}
}
}
}

// helper function to update gateways with the state of the workflow triggers
func (h *triggerConnectorHandler) updateGateways(ctx context.Context) error {
h.mu.Lock()
defer h.mu.Unlock()
var workflowConfigs = make(map[string]*values.Map)
for triggerID, trigger := range h.registeredWorkflows {
workflowConfigs[triggerID] = trigger.rawConfig
}
payloadJSON, err := json.Marshal(workflowConfigs)
if err != nil {
h.lggr.Errorw("error marshalling payload", "err", err)
// TODO: Should we return here instead?
payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s marshalling payload", err.Error())})
Comment on lines +209 to +210
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can log the error

}

for gatewayID := range h.connector.GatewayIDs() {
gatewayIDStr := strconv.Itoa(gatewayID)
body := api.MessageBody{
MessageId: types.RandomID().String(),
DonId: h.connector.DonID(),
Method: ghcapabilities.MethodWebAPITriggerUpdateMetadata,
Receiver: gatewayIDStr,
Payload: payloadJSON,
}
err = h.connector.SignAndSendToGateway(ctx, gatewayIDStr, &body)
if err != nil {
h.lggr.Errorw("error sending message", "gateway", gatewayIDStr, "err", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can return errors.Wrap(err, "error sending message") here and it will get logged in loop()

}
}
return nil
}

func (h *triggerConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
cfg := req.Config
if cfg == nil {
Expand Down Expand Up @@ -234,7 +290,7 @@ func (h *triggerConnectorHandler) UnregisterTrigger(ctx context.Context, req cap
defer h.mu.Unlock()
workflow, ok := h.registeredWorkflows[req.TriggerID]
if !ok {
return fmt.Errorf("triggerId %s not registered", req.TriggerID)
return fmt.Errorf("triggerID %s not registered", req.TriggerID)
}

close(workflow.ch)
Expand All @@ -251,11 +307,14 @@ func (h *triggerConnectorHandler) Start(ctx context.Context) error {
return err
}
return h.StartOnce("GatewayConnectorServiceWrapper", func() error {
h.wg.Add(1)
go h.loop(ctx)
return h.connector.AddHandler([]string{"web_api_trigger"}, h)
})
}
func (h *triggerConnectorHandler) Close() error {
return h.StopOnce("GatewayConnectorServiceWrapper", func() error {
h.wg.Wait()
return nil
})
}
Expand All @@ -272,7 +331,7 @@ func (h *triggerConnectorHandler) sendResponse(ctx context.Context, gatewayID st
payloadJSON, err := json.Marshal(payload)
if err != nil {
h.lggr.Errorw("error marshalling payload", "err", err)
payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s marshalling payload", err.Error()).Error()})
payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s marshalling payload", err.Error())})
}

body := &api.MessageBody{
Expand Down
53 changes: 26 additions & 27 deletions core/capabilities/webapi/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/jonboulle/clockwork"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
registrymock "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks"
"github.com/smartcontractkit/chainlink-common/pkg/values"
trigger_test_utils "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/triggertestutils"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -45,26 +46,6 @@ type testHarness struct {
trigger *triggerConnectorHandler
}

func workflowTriggerConfig(_ testHarness, addresses []string, topics []string) (*values.Map, error) {
var rateLimitConfig, err = values.NewMap(map[string]any{
"GlobalRPS": 100.0,
"GlobalBurst": 101,
"PerSenderRPS": 102.0,
"PerSenderBurst": 103,
})
if err != nil {
return nil, err
}

triggerRegistrationConfig, err := values.NewMap(map[string]interface{}{
"RateLimiter": rateLimitConfig,
"AllowedSenders": addresses,
"AllowedTopics": topics,
"RequiredParams": []string{"bid", "ask"},
})
return triggerRegistrationConfig, err
}

func setup(t *testing.T) testHarness {
registry := registrymock.NewCapabilitiesRegistry(t)
connector := gcmocks.NewGatewayConnector(t)
Expand Down Expand Up @@ -151,7 +132,7 @@ func TestTriggerExecute(t *testing.T) {
th := setup(t)
ctx := testutils.Context(t)
ctx, cancelContext := context.WithDeadline(ctx, time.Now().Add(10*time.Second))
Config, _ := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update", "ad_hoc_price_update"})
_, Config, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update", "ad_hoc_price_update"})
triggerReq := capabilities.TriggerRegistrationRequest{
TriggerID: triggerID1,
Metadata: capabilities.RequestMetadata{
Expand All @@ -163,7 +144,7 @@ func TestTriggerExecute(t *testing.T) {
channel, err := th.trigger.RegisterTrigger(ctx, triggerReq)
require.NoError(t, err)

Config2, err := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update2", "ad_hoc_price_update"})
_, Config2, err := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update2", "ad_hoc_price_update"})
require.NoError(t, err)

triggerReq2 := capabilities.TriggerRegistrationRequest{
Expand Down Expand Up @@ -277,17 +258,32 @@ func TestTriggerExecute(t *testing.T) {
requireNoChanMsg(t, channel2)
})

t.Run("happy case metadata exchange sent to gateway", func(t *testing.T) {
// th.connector.On("SignAndSendToGateway", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
th.connector.On("AddHandler", mock.Anything, mock.Anything).Return(nil).Once()
th.registry.On("Add", mock.Anything, mock.Anything).Return(nil).Once()
err = th.trigger.Start(ctx)
require.NoError(t, err)

fakeClock := clockwork.NewFakeClock()
// TODO: This is not causing the timer of
// case <-time.After(time.Duration(updateGatewayIntervalS) * time.Millisecond):
// to trigger.
fakeClock.Advance(61 * time.Second)
})

err = th.trigger.UnregisterTrigger(ctx, triggerReq)
require.NoError(t, err)
err = th.trigger.UnregisterTrigger(ctx, triggerReq2)
require.NoError(t, err)
cancelContext()

}

func TestRegisterNoAllowedSenders(t *testing.T) {
th := setup(t)
ctx := testutils.Context(t)
Config, _ := workflowTriggerConfig(th, []string{}, []string{"daily_price_update"})
_, Config, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{}, []string{"daily_price_update"})

triggerReq := capabilities.TriggerRegistrationRequest{
TriggerID: triggerID1,
Expand All @@ -307,7 +303,7 @@ func TestTriggerExecute2WorkflowsSameTopicDifferentAllowLists(t *testing.T) {
th := setup(t)
ctx := testutils.Context(t)
ctx, cancelContext := context.WithDeadline(ctx, time.Now().Add(10*time.Second))
Config, _ := workflowTriggerConfig(th, []string{address2}, []string{"daily_price_update"})
_, Config, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{address2}, []string{"daily_price_update"})
triggerReq := capabilities.TriggerRegistrationRequest{
TriggerID: triggerID1,
Metadata: capabilities.RequestMetadata{
Expand All @@ -319,7 +315,7 @@ func TestTriggerExecute2WorkflowsSameTopicDifferentAllowLists(t *testing.T) {
channel, err := th.trigger.RegisterTrigger(ctx, triggerReq)
require.NoError(t, err)

Config2, err := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update"})
_, Config2, err := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update"})
require.NoError(t, err)

triggerReq2 := capabilities.TriggerRegistrationRequest{
Expand Down Expand Up @@ -363,7 +359,7 @@ func TestTriggerExecute2WorkflowsSameTopicDifferentAllowLists(t *testing.T) {
func TestRegisterUnregister(t *testing.T) {
th := setup(t)
ctx := testutils.Context(t)
Config, err := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update"})
_, Config, err := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update"})
require.NoError(t, err)

triggerReq := capabilities.TriggerRegistrationRequest{
Expand All @@ -384,3 +380,6 @@ func TestRegisterUnregister(t *testing.T) {
_, open := <-channel
require.Equal(t, open, false)
}

// TODO: add test to send gateaway message
// TODO: add test to not send gateway message before timer.
38 changes: 38 additions & 0 deletions core/capabilities/webapi/triggertestutils/trigger_test_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package triggertestutils

import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap"
)

func NewWorkflowTriggerConfig(addresses []string, topics []string) (webapicap.TriggerConfig, *values.Map, error) {
triggerConfig := webapicap.TriggerConfig{
AllowedSenders: addresses,
AllowedTopics: topics,
RateLimiter: webapicap.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 101,
PerSenderRPS: 102.0,
PerSenderBurst: 103,
},
RequiredParams: []string{"bid", "ask"},
}

var rateLimitConfig, err = values.NewMap(map[string]any{
"GlobalRPS": 100.0,
"GlobalBurst": 101,
"PerSenderRPS": 102.0,
"PerSenderBurst": 103,
})
if err != nil {
return triggerConfig, nil, err
}

triggerRegistrationConfig, err := values.NewMap(map[string]interface{}{
"RateLimiter": rateLimitConfig,
"AllowedSenders": addresses,
"AllowedTopics": topics,
"RequiredParams": []string{"bid", "ask"},
})
return triggerConfig, triggerRegistrationConfig, err
}
Loading
Loading