From 674eac31cc161250fdadb838ba2a0fc7c796e932 Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Thu, 29 Aug 2024 12:24:04 -0700 Subject: [PATCH] [KS-420] Add rate-limiting to Dispatcher (#14239) * [KS-420] Add rate-limiting to Dispatcher * Implements rate limiter on dispatcher * Improves TOML config * Updates toml config docs * Fixes config_capabilities.go * Adds changeset * Updates txtar files * Fixes tests + updates docs * Renames interface methods * Fixes CI * Fixes CI --------- Co-authored-by: vyzaldysanchez --- .changeset/calm-laws-begin.md | 5 ++ core/capabilities/remote/dispatcher.go | 49 ++++++++---- core/capabilities/remote/dispatcher_test.go | 80 ++++++++++++++++++- core/config/capabilities_config.go | 1 + core/config/dispatcher_config.go | 14 ++++ core/config/docs/core.toml | 16 ++++ core/config/toml/types.go | 42 ++++++++++ core/services/chainlink/application.go | 5 +- .../services/chainlink/config_capabilities.go | 40 ++++++++++ core/services/chainlink/config_test.go | 10 +++ .../testdata/config-empty-effective.toml | 10 +++ .../chainlink/testdata/config-full.toml | 10 +++ .../config-multi-chain-effective.toml | 10 +++ .../testdata/config-empty-effective.toml | 10 +++ core/web/resolver/testdata/config-full.toml | 10 +++ .../config-multi-chain-effective.toml | 10 +++ docs/CONFIG.md | 54 +++++++++++++ testdata/scripts/health/default.txtar | 2 +- testdata/scripts/node/validate/default.txtar | 10 +++ .../disk-based-logging-disabled.txtar | 10 +++ .../validate/disk-based-logging-no-dir.txtar | 10 +++ .../node/validate/disk-based-logging.txtar | 10 +++ .../node/validate/invalid-ocr-p2p.txtar | 10 +++ testdata/scripts/node/validate/invalid.txtar | 10 +++ testdata/scripts/node/validate/valid.txtar | 10 +++ testdata/scripts/node/validate/warnings.txtar | 10 +++ 26 files changed, 435 insertions(+), 23 deletions(-) create mode 100644 .changeset/calm-laws-begin.md create mode 100644 core/config/dispatcher_config.go diff --git a/.changeset/calm-laws-begin.md b/.changeset/calm-laws-begin.md new file mode 100644 index 00000000000..5549f1a966f --- /dev/null +++ b/.changeset/calm-laws-begin.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added Implements rate limiter for capabilities dispatcher diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index bed485c286e..f27d691bb66 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -2,20 +2,22 @@ package remote import ( "context" - "errors" "fmt" - sync "sync" + "sync" "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" - remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) @@ -25,11 +27,13 @@ var ( // dispatcher en/decodes messages and routes traffic between peers and capabilities type dispatcher struct { + cfg config.Dispatcher peerWrapper p2ptypes.PeerWrapper peer p2ptypes.Peer peerID p2ptypes.PeerID signer p2ptypes.Signer registry core.CapabilitiesRegistry + rateLimiter *common.RateLimiter receivers map[key]*receiver mu sync.RWMutex stopCh services.StopChan @@ -44,17 +48,26 @@ type key struct { var _ services.Service = &dispatcher{} -const supportedVersion = 1 - -func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry core.CapabilitiesRegistry, lggr logger.Logger) *dispatcher { +func NewDispatcher(cfg config.Dispatcher, peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry core.CapabilitiesRegistry, lggr logger.Logger) (*dispatcher, error) { + rl, err := common.NewRateLimiter(common.RateLimiterConfig{ + GlobalRPS: cfg.RateLimit().GlobalRPS(), + GlobalBurst: cfg.RateLimit().GlobalBurst(), + PerSenderRPS: cfg.RateLimit().PerSenderRPS(), + PerSenderBurst: cfg.RateLimit().PerSenderBurst(), + }) + if err != nil { + return nil, errors.Wrap(err, "failed to create rate limiter") + } return &dispatcher{ + cfg: cfg, peerWrapper: peerWrapper, signer: signer, registry: registry, + rateLimiter: rl, receivers: make(map[key]*receiver), stopCh: make(services.StopChan), lggr: lggr.Named("Dispatcher"), - } + }, nil } func (d *dispatcher) Start(ctx context.Context) error { @@ -85,14 +98,12 @@ var capReceiveChannelUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{ Help: "The usage of the receive channel for each capability, 0 indicates empty, 1 indicates full.", }, []string{"capabilityId", "donId"}) -const receiverBufferSize = 10000 - type receiver struct { cancel context.CancelFunc - ch chan *remotetypes.MessageBody + ch chan *types.MessageBody } -func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec remotetypes.Receiver) error { +func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Receiver) error { d.mu.Lock() defer d.mu.Unlock() k := key{capabilityId, donId} @@ -101,7 +112,7 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec remotety return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityId, donId) } - receiverCh := make(chan *remotetypes.MessageBody, receiverBufferSize) + receiverCh := make(chan *types.MessageBody, d.cfg.ReceiverBufferSize()) ctx, cancelCtx := d.stopCh.NewCtx() d.wg.Add(1) @@ -139,8 +150,8 @@ func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) { } } -func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error { - msgBody.Version = supportedVersion +func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { + msgBody.Version = uint32(d.cfg.SupportedVersion()) msgBody.Sender = d.peerID[:] msgBody.Receiver = peerID[:] msgBody.Timestamp = time.Now().UnixMilli() @@ -152,7 +163,7 @@ func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBo if err != nil { return err } - msg := &remotetypes.Message{Signature: signature, Body: rawBody} + msg := &types.Message{Signature: signature, Body: rawBody} rawMsg, err := proto.Marshal(msg) if err != nil { return err @@ -168,6 +179,10 @@ func (d *dispatcher) receive() { d.lggr.Info("stopped - exiting receive") return case msg := <-recvCh: + if !d.rateLimiter.Allow(msg.Sender.String()) { + d.lggr.Debugw("rate limit exceeded, dropping message", "sender", msg.Sender) + continue + } body, err := ValidateMessage(msg, d.peerID) if err != nil { d.lggr.Debugw("received invalid message", "error", err) @@ -184,7 +199,7 @@ func (d *dispatcher) receive() { continue } - receiverQueueUsage := float64(len(receiver.ch)) / receiverBufferSize + receiverQueueUsage := float64(len(receiver.ch)) / float64(d.cfg.ReceiverBufferSize()) capReceiveChannelUsage.WithLabelValues(k.capId, fmt.Sprint(k.donId)).Set(receiverQueueUsage) select { case receiver.ch <- body: @@ -195,7 +210,7 @@ func (d *dispatcher) receive() { } } -func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *remotetypes.MessageBody, errType types.Error) { +func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *types.MessageBody, errType types.Error) { if body == nil { return } diff --git a/core/capabilities/remote/dispatcher_test.go b/core/capabilities/remote/dispatcher_test.go index 7ea4c2e2626..50edc5f3530 100644 --- a/core/capabilities/remote/dispatcher_test.go +++ b/core/capabilities/remote/dispatcher_test.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" @@ -32,6 +33,47 @@ func (r *testReceiver) Receive(_ context.Context, msg *remotetypes.MessageBody) r.ch <- msg } +type testRateLimitConfig struct { + globalRPS float64 + globalBurst int + rps float64 + burst int +} + +func (c testRateLimitConfig) GlobalRPS() float64 { + return c.globalRPS +} + +func (c testRateLimitConfig) GlobalBurst() int { + return c.globalBurst +} + +func (c testRateLimitConfig) PerSenderRPS() float64 { + return c.rps +} + +func (c testRateLimitConfig) PerSenderBurst() int { + return c.burst +} + +type testConfig struct { + supportedVersion int + receiverBufferSize int + rateLimit testRateLimitConfig +} + +func (c testConfig) SupportedVersion() int { + return c.supportedVersion +} + +func (c testConfig) ReceiverBufferSize() int { + return c.receiverBufferSize +} + +func (c testConfig) RateLimit() config.DispatcherRateLimit { + return c.rateLimit +} + func TestDispatcher_CleanStartClose(t *testing.T) { lggr := logger.TestLogger(t) ctx := testutils.Context(t) @@ -44,7 +86,17 @@ func TestDispatcher_CleanStartClose(t *testing.T) { signer := mocks.NewSigner(t) registry := commonMocks.NewCapabilitiesRegistry(t) - dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr) + dispatcher, err := remote.NewDispatcher(testConfig{ + supportedVersion: 1, + receiverBufferSize: 10000, + rateLimit: testRateLimitConfig{ + globalRPS: 800.0, + globalBurst: 100, + rps: 10.0, + burst: 50, + }, + }, wrapper, signer, registry, lggr) + require.NoError(t, err) require.NoError(t, dispatcher.Start(ctx)) require.NoError(t, dispatcher.Close()) } @@ -65,11 +117,21 @@ func TestDispatcher_Receive(t *testing.T) { signer.On("Sign", mock.Anything).Return(nil, errors.New("not implemented")) registry := commonMocks.NewCapabilitiesRegistry(t) - dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr) + dispatcher, err := remote.NewDispatcher(testConfig{ + supportedVersion: 1, + receiverBufferSize: 10000, + rateLimit: testRateLimitConfig{ + globalRPS: 800.0, + globalBurst: 100, + rps: 10.0, + burst: 50, + }, + }, wrapper, signer, registry, lggr) + require.NoError(t, err) require.NoError(t, dispatcher.Start(ctx)) rcv := newReceiver() - err := dispatcher.SetReceiver(capId1, donId1, rcv) + err = dispatcher.SetReceiver(capId1, donId1, rcv) require.NoError(t, err) // supported capability @@ -113,7 +175,17 @@ func TestDispatcher_RespondWithError(t *testing.T) { signer.On("Sign", mock.Anything).Return([]byte{}, nil) registry := commonMocks.NewCapabilitiesRegistry(t) - dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr) + dispatcher, err := remote.NewDispatcher(testConfig{ + supportedVersion: 1, + receiverBufferSize: 10000, + rateLimit: testRateLimitConfig{ + globalRPS: 800.0, + globalBurst: 100, + rps: 10.0, + burst: 50, + }, + }, wrapper, signer, registry, lggr) + require.NoError(t, err) require.NoError(t, dispatcher.Start(ctx)) // unknown capability diff --git a/core/config/capabilities_config.go b/core/config/capabilities_config.go index ae542c062c5..8c10896d093 100644 --- a/core/config/capabilities_config.go +++ b/core/config/capabilities_config.go @@ -13,5 +13,6 @@ type CapabilitiesExternalRegistry interface { type Capabilities interface { Peering() P2P + Dispatcher() Dispatcher ExternalRegistry() CapabilitiesExternalRegistry } diff --git a/core/config/dispatcher_config.go b/core/config/dispatcher_config.go new file mode 100644 index 00000000000..ec6f13e8f4a --- /dev/null +++ b/core/config/dispatcher_config.go @@ -0,0 +1,14 @@ +package config + +type DispatcherRateLimit interface { + GlobalRPS() float64 + GlobalBurst() int + PerSenderRPS() float64 + PerSenderBurst() int +} + +type Dispatcher interface { + SupportedVersion() int + ReceiverBufferSize() int + RateLimit() DispatcherRateLimit +} diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 3783689db38..e8524c29186 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -452,6 +452,22 @@ NetworkID = 'evm' # Default # ChainID identifies the target chain id where the remote registry is located. ChainID = '1' # Default +[Capabilities.Dispatcher] +# SupportedVersion is the version of the version of message schema. +SupportedVersion = 1 # Default +# ReceiverBufferSize is the size of the buffer for incoming messages. +ReceiverBufferSize = 10000 # Default + +[Capabilities.Dispatcher.RateLimit] +# GlobalRPS is the global rate limit for the dispatcher. +GlobalRPS = 800 # Default +# GlobalBurst is the global burst limit for the dispatcher. +GlobalBurst = 1000 # Default +# PerSenderRPS is the per-sender rate limit for the dispatcher. +PerSenderRPS = 10 # Default +# PerSenderBurst is the per-sender burst limit for the dispatcher. +PerSenderBurst = 50 # Default + [Capabilities.Peering] # IncomingMessageBufferSize is the per-remote number of incoming # messages to buffer. Any additional messages received on top of those diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 427e3f01cb5..47c8cb46b70 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1438,14 +1438,56 @@ func (r *ExternalRegistry) setFrom(f *ExternalRegistry) { } } +type Dispatcher struct { + SupportedVersion *int + ReceiverBufferSize *int + RateLimit DispatcherRateLimit +} + +func (d *Dispatcher) setFrom(f *Dispatcher) { + d.RateLimit.setFrom(&f.RateLimit) + + if f.ReceiverBufferSize != nil { + d.ReceiverBufferSize = f.ReceiverBufferSize + } + + if f.SupportedVersion != nil { + d.SupportedVersion = f.SupportedVersion + } +} + +type DispatcherRateLimit struct { + GlobalRPS *float64 + GlobalBurst *int + PerSenderRPS *float64 + PerSenderBurst *int +} + +func (drl *DispatcherRateLimit) setFrom(f *DispatcherRateLimit) { + if f.GlobalRPS != nil { + drl.GlobalRPS = f.GlobalRPS + } + if f.GlobalBurst != nil { + drl.GlobalBurst = f.GlobalBurst + } + if f.PerSenderRPS != nil { + drl.PerSenderRPS = f.PerSenderRPS + } + if f.PerSenderBurst != nil { + drl.PerSenderBurst = f.PerSenderBurst + } +} + type Capabilities struct { Peering P2P `toml:",omitempty"` + Dispatcher Dispatcher `toml:",omitempty"` ExternalRegistry ExternalRegistry `toml:",omitempty"` } func (c *Capabilities) setFrom(f *Capabilities) { c.Peering.setFrom(&f.Peering) c.ExternalRegistry.setFrom(&f.ExternalRegistry) + c.Dispatcher.setFrom(&f.Dispatcher) } type ThresholdKeyShareSecrets struct { diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 3efc92c2270..6df51045c02 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -213,7 +213,10 @@ func NewApplication(opts ApplicationOpts) (Application, error) { externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger) signer := externalPeer externalPeerWrapper = externalPeer - remoteDispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger) + remoteDispatcher, err := remote.NewDispatcher(cfg.Capabilities().Dispatcher(), externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger) + if err != nil { + return nil, fmt.Errorf("could not create dispatcher: %w", err) + } dispatcher = remoteDispatcher } else { dispatcher = opts.CapabilitiesDispatcher diff --git a/core/services/chainlink/config_capabilities.go b/core/services/chainlink/config_capabilities.go index c438ca249dd..734a5ae2701 100644 --- a/core/services/chainlink/config_capabilities.go +++ b/core/services/chainlink/config_capabilities.go @@ -23,6 +23,46 @@ func (c *capabilitiesConfig) ExternalRegistry() config.CapabilitiesExternalRegis } } +func (c *capabilitiesConfig) Dispatcher() config.Dispatcher { + return &dispatcher{d: c.c.Dispatcher} +} + +type dispatcher struct { + d toml.Dispatcher +} + +func (d *dispatcher) SupportedVersion() int { + return *d.d.SupportedVersion +} + +func (d *dispatcher) ReceiverBufferSize() int { + return *d.d.ReceiverBufferSize +} + +func (d *dispatcher) RateLimit() config.DispatcherRateLimit { + return &dispatcherRateLimit{r: d.d.RateLimit} +} + +type dispatcherRateLimit struct { + r toml.DispatcherRateLimit +} + +func (r *dispatcherRateLimit) GlobalRPS() float64 { + return *r.r.GlobalRPS +} + +func (r *dispatcherRateLimit) GlobalBurst() int { + return *r.r.GlobalBurst +} + +func (r *dispatcherRateLimit) PerSenderRPS() float64 { + return *r.r.PerSenderRPS +} + +func (r *dispatcherRateLimit) PerSenderBurst() int { + return *r.r.PerSenderBurst +} + type capabilitiesExternalRegistry struct { c toml.ExternalRegistry } diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index a151b78fede..52ad0487d52 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -452,6 +452,16 @@ func TestConfig_Marshal(t *testing.T) { ChainID: ptr("1"), NetworkID: ptr("evm"), }, + Dispatcher: toml.Dispatcher{ + SupportedVersion: ptr(1), + ReceiverBufferSize: ptr(10000), + RateLimit: toml.DispatcherRateLimit{ + GlobalRPS: ptr(800.0), + GlobalBurst: ptr(1000), + PerSenderRPS: ptr(10.0), + PerSenderBurst: ptr(50), + }, + }, } full.Keeper = toml.Keeper{ DefaultTransactionQueueDepth: ptr[uint32](17), diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index d549e4024ed..6050401634a 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -253,6 +253,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index fb43bed841b..77f60ef5503 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -263,6 +263,16 @@ DeltaDial = '1m0s' DeltaReconcile = '2s' ListenAddresses = ['foo', 'bar'] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index ffd4903bcd1..e7d02446066 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -253,6 +253,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index d549e4024ed..6050401634a 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -253,6 +253,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index ba994650c61..426caf39eb9 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -263,6 +263,16 @@ DeltaDial = '1m0s' DeltaReconcile = '2s' ListenAddresses = ['foo', 'bar'] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 8f8121613b2..658d364d109 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -253,6 +253,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 03908e08ed9..d10da596e71 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1240,6 +1240,60 @@ ChainID = '1' # Default ``` ChainID identifies the target chain id where the remote registry is located. +## Capabilities.Dispatcher +```toml +[Capabilities.Dispatcher] +SupportedVersion = 1 # Default +ReceiverBufferSize = 10000 # Default +``` + + +### SupportedVersion +```toml +SupportedVersion = 1 # Default +``` +SupportedVersion is the version of the version of message schema. + +### ReceiverBufferSize +```toml +ReceiverBufferSize = 10000 # Default +``` +ReceiverBufferSize is the size of the buffer for incoming messages. + +## Capabilities.Dispatcher.RateLimit +```toml +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800 # Default +GlobalBurst = 1000 # Default +PerSenderRPS = 10 # Default +PerSenderBurst = 50 # Default +``` + + +### GlobalRPS +```toml +GlobalRPS = 800 # Default +``` +GlobalRPS is the global rate limit for the dispatcher. + +### GlobalBurst +```toml +GlobalBurst = 1000 # Default +``` +GlobalBurst is the global burst limit for the dispatcher. + +### PerSenderRPS +```toml +PerSenderRPS = 10 # Default +``` +PerSenderRPS is the per-sender rate limit for the dispatcher. + +### PerSenderBurst +```toml +PerSenderBurst = 50 # Default +``` +PerSenderBurst is the per-sender burst limit for the dispatcher. + ## Capabilities.Peering ```toml [Capabilities.Peering] diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar index 777d3e5e126..8480345e273 100644 --- a/testdata/scripts/health/default.txtar +++ b/testdata/scripts/health/default.txtar @@ -126,4 +126,4 @@ ok TelemetryManager } } ] -} +} \ No newline at end of file diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 114bb9f29a8..cca4f4ef5e0 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -265,6 +265,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index c95144dbf75..26544e58016 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -309,6 +309,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index 0cdeff36064..17d86ef41e1 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -309,6 +309,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index 60059e2ab8a..e8eb6b93d00 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -309,6 +309,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 14a8449ee13..3fe384e4877 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -294,6 +294,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 70a6cde2437..4bfa96d3029 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -299,6 +299,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index ca2f10fb88e..cea4c1b4f83 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -306,6 +306,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm' diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index b6ebc1dc125..ed727815b30 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -288,6 +288,16 @@ DeltaDial = '15s' DeltaReconcile = '1m0s' ListenAddresses = [] +[Capabilities.Dispatcher] +SupportedVersion = 1 +ReceiverBufferSize = 10000 + +[Capabilities.Dispatcher.RateLimit] +GlobalRPS = 800.0 +GlobalBurst = 1000 +PerSenderRPS = 10.0 +PerSenderBurst = 50 + [Capabilities.ExternalRegistry] Address = '' NetworkID = 'evm'