-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add metadata exchange to trigger and handler
- Loading branch information
1 parent
50c1b3d
commit 5117aae
Showing
5 changed files
with
425 additions
and
38 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,9 @@ import ( | |
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
ethCommon "github.com/ethereum/go-ethereum/common" | ||
|
||
|
@@ -14,6 +16,8 @@ import ( | |
"github.com/smartcontractkit/chainlink-common/pkg/types/core" | ||
"github.com/smartcontractkit/chainlink-common/pkg/values" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/common/types" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" | ||
|
@@ -23,6 +27,7 @@ import ( | |
) | ||
|
||
const defaultSendChannelBufferSize = 1000 | ||
const updateGatewayIntervalS = 60 | ||
|
||
const TriggerType = "[email protected]" | ||
|
||
|
@@ -38,6 +43,7 @@ type webapiTrigger struct { | |
ch chan<- capabilities.TriggerResponse | ||
config webapicap.TriggerConfig | ||
rateLimiter *common.RateLimiter | ||
rawConfig *values.Map | ||
} | ||
|
||
type triggerConnectorHandler struct { | ||
|
@@ -50,6 +56,7 @@ type triggerConnectorHandler struct { | |
mu sync.Mutex | ||
registeredWorkflows map[string]webapiTrigger | ||
registry core.CapabilitiesRegistry | ||
wg sync.WaitGroup | ||
} | ||
|
||
var _ capabilities.TriggerCapability = (*triggerConnectorHandler)(nil) | ||
|
@@ -132,14 +139,14 @@ func (h *triggerConnectorHandler) processTrigger(ctx context.Context, gatewayID | |
} | ||
|
||
func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) { | ||
// TODO: Validate Signature | ||
// TODO: Validate Signature https://smartcontract-it.atlassian.net/browse/CAPPL-92 | ||
body := &msg.Body | ||
sender := ethCommon.HexToAddress(body.Sender) | ||
var payload webapicap.TriggerRequestPayload | ||
err := json.Unmarshal(body.Payload, &payload) | ||
if err != nil { | ||
h.lggr.Errorw("error decoding payload", "err", err) | ||
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s decoding payload", err.Error()).Error()}) | ||
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s decoding payload", err.Error())}) | ||
if err != nil { | ||
h.lggr.Errorw("error sending response", "err", err) | ||
} | ||
|
@@ -164,13 +171,62 @@ func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gate | |
|
||
default: | ||
h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method) | ||
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("unsupported method %s", body.Method).Error()}) | ||
err = h.sendResponse(ctx, gatewayID, body, ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("unsupported method %s", body.Method)}) | ||
if err != nil { | ||
h.lggr.Errorw("error sending response", "err", err) | ||
} | ||
} | ||
} | ||
|
||
// Periodically update the gateways with the state of the workflow triggers. | ||
func (h *triggerConnectorHandler) loop(ctx context.Context) { | ||
defer h.wg.Done() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-time.After(time.Duration(updateGatewayIntervalS) * time.Millisecond): | ||
err := h.updateGateways(ctx) | ||
if err != nil { | ||
h.lggr.Errorw("error updating gateways", "err", err) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// helper function to update gateways with the state of the workflow triggers | ||
func (h *triggerConnectorHandler) updateGateways(ctx context.Context) error { | ||
h.mu.Lock() | ||
defer h.mu.Unlock() | ||
var workflowConfigs = make(map[string]*values.Map) | ||
for triggerID, trigger := range h.registeredWorkflows { | ||
workflowConfigs[triggerID] = trigger.rawConfig | ||
} | ||
payloadJSON, err := json.Marshal(workflowConfigs) | ||
if err != nil { | ||
h.lggr.Errorw("error marshalling payload", "err", err) | ||
// TODO: Should we return here instead? | ||
payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s marshalling payload", err.Error())}) | ||
} | ||
|
||
for gatewayID := range h.connector.GatewayIDs() { | ||
gatewayIDStr := strconv.Itoa(gatewayID) | ||
body := api.MessageBody{ | ||
MessageId: types.RandomID().String(), | ||
DonId: h.connector.DonID(), | ||
Method: ghcapabilities.MethodWebAPITriggerUpdateMetadata, | ||
Receiver: gatewayIDStr, | ||
Payload: payloadJSON, | ||
} | ||
err = h.connector.SignAndSendToGateway(ctx, gatewayIDStr, &body) | ||
if err != nil { | ||
h.lggr.Errorw("error sending message", "gateway", gatewayIDStr, "err", err) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (h *triggerConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { | ||
cfg := req.Config | ||
if cfg == nil { | ||
|
@@ -234,7 +290,7 @@ func (h *triggerConnectorHandler) UnregisterTrigger(ctx context.Context, req cap | |
defer h.mu.Unlock() | ||
workflow, ok := h.registeredWorkflows[req.TriggerID] | ||
if !ok { | ||
return fmt.Errorf("triggerId %s not registered", req.TriggerID) | ||
return fmt.Errorf("triggerID %s not registered", req.TriggerID) | ||
} | ||
|
||
close(workflow.ch) | ||
|
@@ -251,11 +307,14 @@ func (h *triggerConnectorHandler) Start(ctx context.Context) error { | |
return err | ||
} | ||
return h.StartOnce("GatewayConnectorServiceWrapper", func() error { | ||
h.wg.Add(1) | ||
go h.loop(ctx) | ||
return h.connector.AddHandler([]string{"web_api_trigger"}, h) | ||
}) | ||
} | ||
func (h *triggerConnectorHandler) Close() error { | ||
return h.StopOnce("GatewayConnectorServiceWrapper", func() error { | ||
h.wg.Wait() | ||
return nil | ||
}) | ||
} | ||
|
@@ -272,7 +331,7 @@ func (h *triggerConnectorHandler) sendResponse(ctx context.Context, gatewayID st | |
payloadJSON, err := json.Marshal(payload) | ||
if err != nil { | ||
h.lggr.Errorw("error marshalling payload", "err", err) | ||
payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Errorf("error %s marshalling payload", err.Error()).Error()}) | ||
payloadJSON, _ = json.Marshal(ghcapabilities.TriggerResponsePayload{Status: "ERROR", ErrorMessage: fmt.Sprintf("error %s marshalling payload", err.Error())}) | ||
} | ||
|
||
body := &api.MessageBody{ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
core/capabilities/webapi/triggertestutils/trigger_test_utils.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package triggertestutils | ||
|
||
import ( | ||
"github.com/smartcontractkit/chainlink-common/pkg/values" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/webapicap" | ||
) | ||
|
||
func NewWorkflowTriggerConfig(addresses []string, topics []string) (webapicap.TriggerConfig, *values.Map, error) { | ||
triggerConfig := webapicap.TriggerConfig{ | ||
AllowedSenders: addresses, | ||
AllowedTopics: topics, | ||
RateLimiter: webapicap.RateLimiterConfig{ | ||
GlobalRPS: 100.0, | ||
GlobalBurst: 101, | ||
PerSenderRPS: 102.0, | ||
PerSenderBurst: 103, | ||
}, | ||
RequiredParams: []string{"bid", "ask"}, | ||
} | ||
|
||
var rateLimitConfig, err = values.NewMap(map[string]any{ | ||
"GlobalRPS": 100.0, | ||
"GlobalBurst": 101, | ||
"PerSenderRPS": 102.0, | ||
"PerSenderBurst": 103, | ||
}) | ||
if err != nil { | ||
return triggerConfig, nil, err | ||
} | ||
|
||
triggerRegistrationConfig, err := values.NewMap(map[string]interface{}{ | ||
"RateLimiter": rateLimitConfig, | ||
"AllowedSenders": addresses, | ||
"AllowedTopics": topics, | ||
"RequiredParams": []string{"bid", "ask"}, | ||
}) | ||
return triggerConfig, triggerRegistrationConfig, err | ||
} |
Oops, something went wrong.