diff --git a/core/capabilities/webapi/trigger/trigger.go b/core/capabilities/webapi/trigger/trigger.go index a08d2f577ff..9cb62071d84 100644 --- a/core/capabilities/webapi/trigger/trigger.go +++ b/core/capabilities/webapi/trigger/trigger.go @@ -5,7 +5,9 @@ import ( "encoding/json" "errors" "fmt" + "strconv" "sync" + "time" ethCommon "github.com/ethereum/go-ethereum/common" @@ -14,6 +16,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/common/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" @@ -23,6 +27,7 @@ import ( ) const defaultSendChannelBufferSize = 1000 +const updateGatewayIntervalS = 60 const TriggerType = "web-api-trigger@1.0.0" @@ -38,6 +43,7 @@ type webapiTrigger struct { ch chan<- capabilities.TriggerResponse config webapicap.TriggerConfig rateLimiter *common.RateLimiter + rawConfig *values.Map } type triggerConnectorHandler struct { @@ -50,6 +56,7 @@ type triggerConnectorHandler struct { mu sync.Mutex registeredWorkflows map[string]webapiTrigger registry core.CapabilitiesRegistry + wg sync.WaitGroup } var _ capabilities.TriggerCapability = (*triggerConnectorHandler)(nil) @@ -132,14 +139,14 @@ func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID } func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) { - // TODO: Validate Signature + // TODO: Validate Signature https://smartcontract-it.atlassian.net/browse/CAPPL-92 body := &msg.Body sender := ethCommon.HexToAddress(body.Sender) var payload webapicap.TriggerRequestPayload err := json.Unmarshal(body.Payload, &payload) if err != nil { h.lggr.Errorw("error decoding payload", "err", err) - err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s decoding payload", err.Error()).Error()}) + err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s decoding payload", err.Error())}) if err != nil { h.lggr.Errorw("error sending response", "err", err) } @@ -164,13 +171,62 @@ func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gate default: h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method) - err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("unsupported method %s", body.Method).Error()}) + err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("unsupported method %s", body.Method)}) if err != nil { h.lggr.Errorw("error sending response", "err", err) } } } +// Periodically update the gateways with the state of the workflow triggers. +func (h *triggerConnectorHandler) loop(ctx context.Context) { + defer h.wg.Done() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(updateGatewayIntervalS) * time.Millisecond): + err := h.updateGateways(ctx) + if err != nil { + h.lggr.Errorw("error updating gateways", "err", err) + } + } + } +} + +// helper function to update gateways with the state of the workflow triggers +func (h *triggerConnectorHandler) updateGateways(ctx context.Context) error { + h.mu.Lock() + defer h.mu.Unlock() + var workflowConfigs = make(map[string]*values.Map) + for triggerID, trigger := range h.registeredWorkflows { + workflowConfigs[triggerID] = trigger.rawConfig + } + payloadJSON, err := json.Marshal(workflowConfigs) + if err != nil { + h.lggr.Errorw("error marshalling payload", "err", err) + // TODO: Should we return here instead? + payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s marshalling payload", err.Error())}) + } + + for gatewayID := range h.connector.GatewayIDs() { + gatewayIDStr := strconv.Itoa(gatewayID) + body := api.MessageBody{ + MessageId: types.RandomID().String(), + DonId: h.connector.DonID(), + Method: ghcapabilities.MethodWebAPITriggerUpdateMetadata, + Receiver: gatewayIDStr, + Payload: payloadJSON, + } + err = h.connector.SignAndSendToGateway(ctx, gatewayIDStr, &body) + if err != nil { + h.lggr.Errorw("error sending message", "gateway", gatewayIDStr, "err", err) + } + } + return nil +} + func (h *triggerConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { cfg := req.Config if cfg == nil { @@ -234,7 +290,7 @@ func (h *triggerConnectorHandler) UnregisterTrigger(ctx context.Context, req cap defer h.mu.Unlock() workflow, ok := h.registeredWorkflows[req.TriggerID] if !ok { - return fmt.Errorf("triggerId %s not registered", req.TriggerID) + return fmt.Errorf("triggerID %s not registered", req.TriggerID) } close(workflow.ch) @@ -251,11 +307,14 @@ func (h *triggerConnectorHandler) Start(ctx context.Context) error { return err } return h.StartOnce("GatewayConnectorServiceWrapper", func() error { + h.wg.Add(1) + go h.loop(ctx) return h.connector.AddHandler([]string{"web_api_trigger"}, h) }) } func (h *triggerConnectorHandler) Close() error { return h.StopOnce("GatewayConnectorServiceWrapper", func() error { + h.wg.Wait() return nil }) } @@ -272,7 +331,7 @@ func (h *triggerConnectorHandler) sendResponse(ctx context.Context, gatewayID st payloadJSON, err := json.Marshal(payload) if err != nil { h.lggr.Errorw("error marshalling payload", "err", err) - payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s marshalling payload", err.Error()).Error()}) + payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s marshalling payload", err.Error())}) } body := &api.MessageBody{ diff --git a/core/capabilities/webapi/trigger/trigger_test.go b/core/capabilities/webapi/trigger/trigger_test.go index 0c73e31fe62..b4a78c7c826 100644 --- a/core/capabilities/webapi/trigger/trigger_test.go +++ b/core/capabilities/webapi/trigger/trigger_test.go @@ -15,7 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/logger" registrymock "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" - "github.com/smartcontractkit/chainlink-common/pkg/values" + trigger_test_utils "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/triggertestutils" "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -45,26 +45,6 @@ type testHarness struct { trigger *triggerConnectorHandler } -func workflowTriggerConfig(_ testHarness, addresses []string, topics []string) (*values.Map, error) { - var rateLimitConfig, err = values.NewMap(map[string]any{ - "GlobalRPS": 100.0, - "GlobalBurst": 101, - "PerSenderRPS": 102.0, - "PerSenderBurst": 103, - }) - if err != nil { - return nil, err - } - - triggerRegistrationConfig, err := values.NewMap(map[string]interface{}{ - "RateLimiter": rateLimitConfig, - "AllowedSenders": addresses, - "AllowedTopics": topics, - "RequiredParams": []string{"bid", "ask"}, - }) - return triggerRegistrationConfig, err -} - func setup(t *testing.T) testHarness { registry := registrymock.NewCapabilitiesRegistry(t) connector := gcmocks.NewGatewayConnector(t) @@ -151,7 +131,7 @@ func TestTriggerExecute(t *testing.T) { th := setup(t) ctx := testutils.Context(t) ctx, cancelContext := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) - Config, _ := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update", "ad_hoc_price_update"}) + _, Config, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update", "ad_hoc_price_update"}) triggerReq := capabilities.TriggerRegistrationRequest{ TriggerID: triggerID1, Metadata: capabilities.RequestMetadata{ @@ -163,7 +143,7 @@ func TestTriggerExecute(t *testing.T) { channel, err := th.trigger.RegisterTrigger(ctx, triggerReq) require.NoError(t, err) - Config2, err := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update2", "ad_hoc_price_update"}) + _, Config2, err := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update2", "ad_hoc_price_update"}) require.NoError(t, err) triggerReq2 := capabilities.TriggerRegistrationRequest{ @@ -287,7 +267,7 @@ func TestTriggerExecute(t *testing.T) { func TestRegisterNoAllowedSenders(t *testing.T) { th := setup(t) ctx := testutils.Context(t) - Config, _ := workflowTriggerConfig(th, []string{}, []string{"daily_price_update"}) + _, Config, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{}, []string{"daily_price_update"}) triggerReq := capabilities.TriggerRegistrationRequest{ TriggerID: triggerID1, @@ -307,7 +287,7 @@ func TestTriggerExecute2WorkflowsSameTopicDifferentAllowLists(t *testing.T) { th := setup(t) ctx := testutils.Context(t) ctx, cancelContext := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) - Config, _ := workflowTriggerConfig(th, []string{address2}, []string{"daily_price_update"}) + _, Config, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{address2}, []string{"daily_price_update"}) triggerReq := capabilities.TriggerRegistrationRequest{ TriggerID: triggerID1, Metadata: capabilities.RequestMetadata{ @@ -319,7 +299,7 @@ func TestTriggerExecute2WorkflowsSameTopicDifferentAllowLists(t *testing.T) { channel, err := th.trigger.RegisterTrigger(ctx, triggerReq) require.NoError(t, err) - Config2, err := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update"}) + _, Config2, err := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update"}) require.NoError(t, err) triggerReq2 := capabilities.TriggerRegistrationRequest{ @@ -363,7 +343,7 @@ func TestTriggerExecute2WorkflowsSameTopicDifferentAllowLists(t *testing.T) { func TestRegisterUnregister(t *testing.T) { th := setup(t) ctx := testutils.Context(t) - Config, err := workflowTriggerConfig(th, []string{address1}, []string{"daily_price_update"}) + _, Config, err := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update"}) require.NoError(t, err) triggerReq := capabilities.TriggerRegistrationRequest{ diff --git a/core/capabilities/webapi/triggertestutils/trigger_test_utils.go b/core/capabilities/webapi/triggertestutils/trigger_test_utils.go new file mode 100644 index 00000000000..43acd2cd60b --- /dev/null +++ b/core/capabilities/webapi/triggertestutils/trigger_test_utils.go @@ -0,0 +1,38 @@ +package triggertestutils + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap" +) + +func NewWorkflowTriggerConfig(addresses []string, topics []string) (webapicap.TriggerConfig, *values.Map, error) { + triggerConfig := webapicap.TriggerConfig{ + AllowedSenders: addresses, + AllowedTopics: topics, + RateLimiter: webapicap.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 101, + PerSenderRPS: 102.0, + PerSenderBurst: 103, + }, + RequiredParams: []string{"bid", "ask"}, + } + + var rateLimitConfig, err = values.NewMap(map[string]any{ + "GlobalRPS": 100.0, + "GlobalBurst": 101, + "PerSenderRPS": 102.0, + "PerSenderBurst": 103, + }) + if err != nil { + return triggerConfig, nil, err + } + + triggerRegistrationConfig, err := values.NewMap(map[string]interface{}{ + "RateLimiter": rateLimitConfig, + "AllowedSenders": addresses, + "AllowedTopics": topics, + "RequiredParams": []string{"bid", "ask"}, + }) + return triggerConfig, triggerRegistrationConfig, err +} diff --git a/core/services/gateway/handlers/capabilities/handler.go b/core/services/gateway/handlers/capabilities/handler.go index 904a64c8896..9bc5105b717 100644 --- a/core/services/gateway/handlers/capabilities/handler.go +++ b/core/services/gateway/handlers/capabilities/handler.go @@ -2,14 +2,23 @@ package capabilities import ( "context" + "crypto/sha256" + "encoding/base64" + "encoding/hex" "encoding/json" "fmt" + "sort" "sync" "time" "go.uber.org/multierr" + "google.golang.org/protobuf/proto" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink-common/pkg/values/pb" "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" @@ -19,13 +28,26 @@ import ( ) const ( - // NOTE: more methods will go here. HTTP trigger/action/target; etc. - MethodWebAPITarget = "web_api_target" - MethodWebAPITrigger = "web_api_trigger" - MethodComputeAction = "compute_action" + MethodComputeAction = "compute_action" + MethodWebAPITarget = "web_api_target" + MethodWebAPITrigger = "web_api_trigger" + MethodWebAPITriggerUpdateMetadata = "web_api_trigger_update_metadata" ) +type TriggersConfig struct { + lastUpdatedAt time.Time + + // map of trigger ID to trigger config + triggersConfig map[string]webapicap.TriggerConfig +} + +type AllNodesTriggersConfig struct { + // map of node address to node's triggerConfig + triggersConfigMap map[string]TriggersConfig +} + type handler struct { + capabilities.Validator[webapicap.TriggerConfig, struct{}, capabilities.TriggerResponse] config HandlerConfig don handlers.DON donConfig *config.DONConfig @@ -35,9 +57,15 @@ type handler struct { httpClient network.HTTPClient nodeRateLimiter *common.RateLimiter wg sync.WaitGroup + // each gateway node has a map of trigger IDs to trigger configs + triggersConfig AllNodesTriggersConfig + // One of the nodes' trigger configs that is part of consensus + consensusConfig TriggersConfig } type HandlerConfig struct { + // Is this part of standard capability so doesn't need to be defined here? + F uint `json:"F"` NodeRateLimiter common.RateLimiterConfig `json:"nodeRateLimiter"` MaxAllowedMessageAgeSec uint `json:"maxAllowedMessageAgeSec"` } @@ -53,6 +81,7 @@ func NewHandler(handlerConfig json.RawMessage, donConfig *config.DONConfig, don var cfg HandlerConfig err := json.Unmarshal(handlerConfig, &cfg) if err != nil { + lggr.Errorf("error unmarshalling config: %s, err: %s", string(handlerConfig), err.Error()) return nil, err } nodeRateLimiter, err := common.NewRateLimiter(cfg.NodeRateLimiter) @@ -61,7 +90,9 @@ func NewHandler(handlerConfig json.RawMessage, donConfig *config.DONConfig, don } return &handler{ + Validator: capabilities.NewValidator[webapicap.TriggerConfig, struct{}, capabilities.TriggerResponse](capabilities.ValidatorArgs{}), config: cfg, + consensusConfig: TriggersConfig{triggersConfig: make(map[string]webapicap.TriggerConfig)}, don: don, donConfig: donConfig, lggr: lggr.Named("WebAPIHandler." + donConfig.DonId), @@ -69,6 +100,9 @@ func NewHandler(handlerConfig json.RawMessage, donConfig *config.DONConfig, don nodeRateLimiter: nodeRateLimiter, wg: sync.WaitGroup{}, savedCallbacks: make(map[string]*savedCallback), + triggersConfig: AllNodesTriggersConfig{ + triggersConfigMap: make(map[string]TriggersConfig), + }, }, nil } @@ -110,7 +144,7 @@ func (h *handler) handleWebAPITriggerMessage(ctx context.Context, msg *api.Messa if found { // Send first response from a node back to the user, ignore any other ones. // TODO: in practice, we should wait for at least 2F+1 nodes to respond and then return an aggregated response - // back to the user. + // back to the user. https://smartcontract-it.atlassian.net/browse/CAPPL-237 savedCb.callbackCh <- handlers.UserCallbackPayload{Msg: msg, ErrCode: api.NoError, ErrMsg: ""} close(savedCb.callbackCh) } @@ -182,12 +216,137 @@ func (h *handler) handleWebAPIOutgoingMessage(ctx context.Context, msg *api.Mess return nil } +// body := api.MessageBody{ +// MessageId: types.RandomID().String(), +// DonId: h.connector.DonID(), +// Method: webapicapabilities.MethodWebAPITriggerUpdateMetadata, +// Receiver: gatewayIDStr, +// Payload: payloadJSON, +// } +func (h *handler) handleWebAPITriggerUpdateMetadata(ctx context.Context, msg *api.Message, nodeAddr string) error { + body := msg.Body + h.lggr.Debugw("handleWebAPITriggerUpdateMetadata", "body", body, "payload", string(body.Payload)) + + var payload map[string]string + err := json.Unmarshal(body.Payload, &payload) + donID := body.DonId + if err != nil { + h.lggr.Errorw("error decoding payload", "err", err, "payload", string(body.Payload)) + return err + } + triggersConfig := make(map[string]webapicap.TriggerConfig) + for triggerID, configPbString := range payload { + pbBytes, err := base64.StdEncoding.DecodeString(configPbString) + if err != nil { + h.lggr.Errorw("error decoding pb bytes", "err", err) + return err + } + pb := &pb.Map{} + err = proto.Unmarshal(pbBytes, pb) + if err != nil { + h.lggr.Errorw("error unmarshalling", "err", err) + return err + } + vmap, err := values.FromMapValueProto(pb) + if err != nil { + h.lggr.Errorw("error FromMapValueProto", "err", err) + return err + } + if vmap == nil { + h.lggr.Errorw("error FromMapValueProto nil vmap") + return err + } + reqConfig, err := h.ValidateConfig(vmap) + if err != nil { + h.lggr.Errorw("error validating config", "err", err) + return err + } + triggersConfig[triggerID] = *reqConfig + } + h.triggersConfig.triggersConfigMap[donID] = TriggersConfig{lastUpdatedAt: time.Now(), triggersConfig: triggersConfig} + return nil +} + +type triggerStruct struct { + config webapicap.TriggerConfig + count uint +} + +func (h *handler) updateTriggerConsensus() { + // calculate the mode of the trigger configs to determine consensus + + // 1. Make list of node configs that haven't expired + triggers := h.triggersConfig.triggersConfigMap + + // 2. Make list of hashes of each node's config. + // Update, make a list of hashes of each node's triggerId and config. + // 3. Count the number of times each hash appears. + + // triggerId => triggerConfig hash => triggerStruct + triggersHashmap := make(map[string]map[string]triggerStruct) + + // We need to know which triggerConfig has the most votes for each triggerId. + // We need to know the count of each triggerConfig for each triggerId. + // Need a structure that is for each triggerId, there is a set of triggerConfigs, their hashes and the count of each hash. + // convert map to list of triggerConfigs to triggerConfig + + // Performance is a concern as this is O(nodes) * O(triggers) + + for _, triggerConfig := range triggers { + for triggerID, triggerConfig := range triggerConfig.triggersConfig { + _, isTriggerInHashMap := triggersHashmap[triggerID] + if !isTriggerInHashMap { + triggersHashmap[triggerID] = make(map[string]triggerStruct) + } + s := fmt.Sprintf("%v", triggerConfig) + h := sha256.New() + h.Write([]byte(s)) + hash := hex.EncodeToString(h.Sum(nil)) + t, isHashInHashMap := triggersHashmap[triggerID][hash] + if !isHashInHashMap { + triggersHashmap[triggerID][hash] = triggerStruct{config: triggerConfig, count: 1} + } else { + t.count++ + triggersHashmap[triggerID][hash] = t + } + } + } + + type kv struct { + Hash string + Value uint + } + // 4. Find the hash that appears the most. + // https://stackoverflow.com/questions/18695346/how-can-i-sort-a-mapstringint-by-its-values + + for triggerID, triggersHashmapByHash := range triggersHashmap { + var ss []kv + for triggerConfigHash, v := range triggersHashmapByHash { + ss = append(ss, kv{triggerConfigHash, v.count}) + } + + sort.Slice(ss, func(i, j int) bool { + return ss[i].Value > ss[j].Value + }) + // 5. Check if the hash that appears the most appears more than F times. + if ss[0].Value < h.config.F+1 { + // 6. If it doesn't, do nothing + return + } + // 7. If it does, update the consensus config to that config. + // TODO: how do stale triggerIds get cleaned up? https://smartcontract-it.atlassian.net/browse/CAPPL-236 + h.consensusConfig.triggersConfig[triggerID] = triggersHashmapByHash[ss[0].Hash].config + h.consensusConfig.lastUpdatedAt = time.Now() + } +} func (h *handler) HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error { switch msg.Body.Method { case MethodWebAPITrigger: return h.handleWebAPITriggerMessage(ctx, msg, nodeAddr) case MethodWebAPITarget, MethodComputeAction: return h.handleWebAPIOutgoingMessage(ctx, msg, nodeAddr) + case MethodWebAPITriggerUpdateMetadata: + return h.handleWebAPITriggerUpdateMetadata(ctx, msg, nodeAddr) default: return fmt.Errorf("unsupported method: %s", msg.Body.Method) } @@ -229,7 +388,7 @@ func (h *handler) HandleUserMessage(ctx context.Context, msg *api.Message, callb close(callbackCh) return nil } - // TODO: apply allowlist and rate-limiting here + // TODO: apply allowlist and rate-limiting here https://smartcontract-it.atlassian.net/browse/CAPPL-238 if msg.Body.Method != MethodWebAPITrigger { h.lggr.Errorw("unsupported method", "method", body.Method) callbackCh <- handlers.UserCallbackPayload{Msg: msg, ErrCode: api.HandlerError, ErrMsg: fmt.Sprintf("invalid method %s", msg.Body.Method)} diff --git a/core/services/gateway/handlers/capabilities/handler_test.go b/core/services/gateway/handlers/capabilities/handler_test.go index eb5d883ac14..222b383e231 100644 --- a/core/services/gateway/handlers/capabilities/handler_test.go +++ b/core/services/gateway/handlers/capabilities/handler_test.go @@ -1,6 +1,7 @@ package capabilities import ( + "encoding/base64" "encoding/json" "errors" "fmt" @@ -8,12 +9,17 @@ import ( "testing" "time" + "google.golang.org/protobuf/proto" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/ethereum/go-ethereum/crypto" + trigger_test_utils "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/triggertestutils" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -50,6 +56,7 @@ func setupHandler(t *testing.T) (*handler, *mocks.HTTPClient, *handlermocks.DON, PerSenderBurst: 100, } handlerConfig := HandlerConfig{ + F: 1, NodeRateLimiter: nodeRateLimiterConfig, MaxAllowedMessageAgeSec: 30, } @@ -427,3 +434,125 @@ func TestHandleComputeActionMessage(t *testing.T) { }, tests.WaitTimeout(t), 100*time.Millisecond) }) } + +// function to convert map of triggerIds to triggerConfigs (which is values.Map) to byte array +func convertValuesMapToBytes(valuesMap map[string]*values.Map) []byte { + var workflowConfigsPB = make(map[string]string) + for key, triggerConfig := range valuesMap { + configProtoMap := values.ProtoMap(triggerConfig) + configProtoBytes, _ := proto.Marshal(configProtoMap) + encoded := base64.StdEncoding.EncodeToString(configProtoBytes) + workflowConfigsPB[key] = encoded + } + + cfgBytes, _ := json.Marshal(workflowConfigsPB) + return cfgBytes +} + +// Test that a MethodWebAPITriggerUpdateMetadata message updates gateways metadata for the given workflow node +func TestHandlerRecieveMetadataMessageFromWorkflowNode(t *testing.T) { + handler, _, _, nodes := setupHandler(t) + nodeAddr := nodes[0].Address + ctx := testutils.Context(t) + + // ctx, cancelContext := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) + triggerConfig, configMap, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update", "ad_hoc_price_update"}) + triggerConfig2, configMap2, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update", "ad_hoc_price_update"}) + triggerConfig3, configMap3, _ := trigger_test_utils.NewWorkflowTriggerConfig([]string{address1}, []string{"daily_price_update"}) + + var workflowConfigs = make(map[string]*values.Map) + workflowConfigs["foo"] = configMap + workflowConfigs["bar"] = configMap2 + triggerConfigs := make(map[string]webapicap.TriggerConfig) + triggerConfigs["foo"] = triggerConfig + triggerConfigs["bar"] = triggerConfig2 + cfgBytes := convertValuesMapToBytes(workflowConfigs) + + // for update tests. + var workflowConfigs3 = make(map[string]*values.Map) + workflowConfigs3["foo"] = configMap + workflowConfigs3["bar"] = configMap3 + triggerConfigs3 := make(map[string]webapicap.TriggerConfig) + triggerConfigs3["foo"] = triggerConfig + triggerConfigs3["bar"] = triggerConfig3 + cfgBytes3 := convertValuesMapToBytes(workflowConfigs3) + + msg := &api.Message{ + Body: api.MessageBody{ + MessageId: "123", + Method: MethodWebAPITriggerUpdateMetadata, + DonId: "testDonId", + Payload: cfgBytes, + }, + } + + msg3 := &api.Message{ + Body: api.MessageBody{ + MessageId: "123", + Method: MethodWebAPITriggerUpdateMetadata, + DonId: "testDonId2", + Payload: cfgBytes3, + }, + } + + err := handler.HandleNodeMessage(ctx, msg, nodeAddr) + require.NoError(t, err) + require.NotEmpty(t, handler.triggersConfig.triggersConfigMap["testDonId"]) + require.NotEmpty(t, handler.triggersConfig.triggersConfigMap["testDonId"].lastUpdatedAt) + require.Equal(t, triggerConfigs, handler.triggersConfig.triggersConfigMap["testDonId"].triggersConfig) + require.Empty(t, handler.consensusConfig.triggersConfig) + + t.Run("happy case 2 nodes agree", func(t *testing.T) { + msg2 := &api.Message{ + Body: api.MessageBody{ + MessageId: "123", + Method: MethodWebAPITriggerUpdateMetadata, + DonId: "testDonId2", + Payload: cfgBytes, + }, + } + err = handler.HandleNodeMessage(ctx, msg2, nodeAddr) + require.NoError(t, err) + handler.updateTriggerConsensus() + require.NotEmpty(t, handler.triggersConfig.triggersConfigMap["testDonId2"]) + require.NotEmpty(t, handler.triggersConfig.triggersConfigMap["testDonId2"].lastUpdatedAt) + require.Equal(t, triggerConfigs, handler.triggersConfig.triggersConfigMap["testDonId2"].triggersConfig) + require.Equal(t, handler.triggersConfig.triggersConfigMap["testDonId"].triggersConfig, handler.consensusConfig.triggersConfig) + }) + t.Run("happy case 2 nodes agree, update 1 node doesn't change consensus", func(t *testing.T) { + err = handler.HandleNodeMessage(ctx, msg3, nodeAddr) + require.NoError(t, err) + handler.updateTriggerConsensus() + require.NotEmpty(t, handler.triggersConfig.triggersConfigMap["testDonId2"]) + require.NotEmpty(t, handler.triggersConfig.triggersConfigMap["testDonId2"].lastUpdatedAt) + require.Equal(t, triggerConfigs, handler.triggersConfig.triggersConfigMap["testDonId"].triggersConfig) + require.Equal(t, triggerConfigs3, handler.triggersConfig.triggersConfigMap["testDonId2"].triggersConfig) + // consensus should not change + require.Equal(t, handler.triggersConfig.triggersConfigMap["testDonId"].triggersConfig, handler.consensusConfig.triggersConfig) + }) + + t.Run("happy case 2 nodes agree, update both nodes changes consensus", func(t *testing.T) { + msg4 := &api.Message{ + Body: api.MessageBody{ + MessageId: "123", + Method: MethodWebAPITriggerUpdateMetadata, + DonId: "testDonId", + Payload: cfgBytes3, + }, + } + err = handler.HandleNodeMessage(ctx, msg3, nodeAddr) + require.NoError(t, err) + err = handler.HandleNodeMessage(ctx, msg4, nodeAddr) + require.NoError(t, err) + handler.updateTriggerConsensus() + require.NotEmpty(t, handler.triggersConfig.triggersConfigMap["testDonId"]) + require.NotEmpty(t, handler.triggersConfig.triggersConfigMap["testDonId"].lastUpdatedAt) + require.Equal(t, triggerConfigs3, handler.triggersConfig.triggersConfigMap["testDonId"].triggersConfig) + require.Equal(t, triggerConfigs3, handler.triggersConfig.triggersConfigMap["testDonId2"].triggersConfig) + // consensus should change + require.Equal(t, handler.triggersConfig.triggersConfigMap["testDonId"].triggersConfig, handler.consensusConfig.triggersConfig) + }) +} + +// Other test cases: +// two nodes updated with equal different value but out of time window so no change.