Skip to content

Commit

Permalink
feature complete.
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Oct 11, 2024
1 parent 7c9eeaa commit b39dd76
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 16 deletions.
23 changes: 13 additions & 10 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
)

const (
SEP = "&"
MaximumNumberOfParallelWebsocketConnectionsPerIp = 5
SEP = "&"
)

var MaximumNumberOfParallelWebsocketConnectionsPerIp int64 = 0

type JsonRPCChainParser struct {
BaseChainParser
}
Expand Down Expand Up @@ -306,11 +307,11 @@ func (apip *JsonRPCChainParser) ChainBlockStats() (allowedBlockLagForQosSync int

// Will limit a certain amount of connections per IP
type WebsocketConnectionLimiter struct {
ipToNumberOfActiveConnections map[string]uint64
ipToNumberOfActiveConnections map[string]int64
lock sync.RWMutex
}

func (wcl *WebsocketConnectionLimiter) addIpConnectionAndGetCurrentAmount(ip string) uint64 {
func (wcl *WebsocketConnectionLimiter) addIpConnectionAndGetCurrentAmount(ip string) int64 {
wcl.lock.Lock()
defer wcl.lock.Unlock()
// wether it exists or not we add 1.
Expand Down Expand Up @@ -381,12 +382,14 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con
apiInterface := apil.endpoint.ApiInterface

webSocketCallback := websocket.New(func(websocketConn *websocket.Conn) {
ip := websocketConn.RemoteAddr().String()
numberOfActiveConnections := apil.websocketConnectionLimiter.addIpConnectionAndGetCurrentAmount(ip)
defer apil.websocketConnectionLimiter.decreaseIpConnectionAndGetCurrentAmount(ip)
if numberOfActiveConnections > MaximumNumberOfParallelWebsocketConnectionsPerIp {
websocketConn.WriteMessage(1, []byte(fmt.Sprintf("Too Many Open Connections, limited to %d", MaximumNumberOfParallelWebsocketConnectionsPerIp)))
return
if MaximumNumberOfParallelWebsocketConnectionsPerIp > 0 { // 0 is disabled.
ip := websocketConn.RemoteAddr().String()
numberOfActiveConnections := apil.websocketConnectionLimiter.addIpConnectionAndGetCurrentAmount(ip)
defer apil.websocketConnectionLimiter.decreaseIpConnectionAndGetCurrentAmount(ip)
if numberOfActiveConnections > MaximumNumberOfParallelWebsocketConnectionsPerIp {
websocketConn.WriteMessage(1, []byte(fmt.Sprintf("Too Many Open Connections, limited to %d", MaximumNumberOfParallelWebsocketConnectionsPerIp)))
return
}
}

utils.LavaFormatDebug("jsonrpc websocket opened", utils.LogAttr("consumerIp", websocketConn.LocalAddr().String()))
Expand Down
14 changes: 8 additions & 6 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,14 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm
return fiber.ErrUpgradeRequired
})
webSocketCallback := websocket.New(func(websocketConn *websocket.Conn) {
ip := websocketConn.RemoteAddr().String()
numberOfActiveConnections := apil.websocketConnectionLimiter.addIpConnectionAndGetCurrentAmount(ip)
defer apil.websocketConnectionLimiter.decreaseIpConnectionAndGetCurrentAmount(ip)
if numberOfActiveConnections > MaximumNumberOfParallelWebsocketConnectionsPerIp {
websocketConn.WriteMessage(1, []byte(fmt.Sprintf("Too Many Open Connections, limited to %d", MaximumNumberOfParallelWebsocketConnectionsPerIp)))
return
if MaximumNumberOfParallelWebsocketConnectionsPerIp > 0 { // 0 is disabled.
ip := websocketConn.RemoteAddr().String()
numberOfActiveConnections := apil.websocketConnectionLimiter.addIpConnectionAndGetCurrentAmount(ip)
defer apil.websocketConnectionLimiter.decreaseIpConnectionAndGetCurrentAmount(ip)
if numberOfActiveConnections > MaximumNumberOfParallelWebsocketConnectionsPerIp {
websocketConn.WriteMessage(1, []byte(fmt.Sprintf("Too Many Open Connections, limited to %d", MaximumNumberOfParallelWebsocketConnectionsPerIp)))
return
}
}

utils.LavaFormatDebug("tendermintrpc websocket opened", utils.LogAttr("consumerIp", websocketConn.LocalAddr().String()))
Expand Down
1 change: 1 addition & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
// websocket flags
RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection"
BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded"
LimitParallelWebsocketConnectionsPerIpFlag = "limit-parallel-websocket-connections-per-ip"
)

const (
Expand Down
1 change: 1 addition & 0 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().Float64Var(&provideroptimizer.LastTierChance, common.SetProviderOptimizerWorstTierPickChance, 0.0, "set the chances for picking a provider from the worse group, default is 0% -> 0.0")
cmdRPCConsumer.Flags().IntVar(&provideroptimizer.OptimizerNumTiers, common.SetProviderOptimizerNumberOfTiersToCreate, 4, "set the number of groups to create, default is 4")
cmdRPCConsumer.Flags().IntVar(&chainlib.WebSocketRateLimit, common.RateLimitWebSocketFlag, chainlib.WebSocketRateLimit, "rate limit (per second) websocket requests per user connection, default is unlimited")
cmdRPCConsumer.Flags().Int64Var(&chainlib.MaximumNumberOfParallelWebsocketConnectionsPerIp, common.LimitParallelWebsocketConnectionsPerIpFlag, chainlib.MaximumNumberOfParallelWebsocketConnectionsPerIp, "limit number of parallel connections to websocket, per ip, default is unlimited (0)")
cmdRPCConsumer.Flags().DurationVar(&chainlib.WebSocketBanDuration, common.BanDurationForWebsocketRateLimitExceededFlag, chainlib.WebSocketBanDuration, "once websocket rate limit is reached, user will be banned Xfor a duration, default no ban")
common.AddRollingLogConfig(cmdRPCConsumer)
return cmdRPCConsumer
Expand Down

0 comments on commit b39dd76

Please sign in to comment.