Skip to content

Commit

Permalink
feat: PRT - websocket limited per ip
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Oct 11, 2024
1 parent 4394756 commit 7c9eeaa
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
37 changes: 36 additions & 1 deletion protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"strconv"
"sync"
"time"

"github.com/goccy/go-json"
Expand All @@ -28,7 +29,10 @@ import (
spectypes "github.com/lavanet/lava/v3/x/spec/types"
)

const SEP = "&"
const (
SEP = "&"
MaximumNumberOfParallelWebsocketConnectionsPerIp = 5
)

type JsonRPCChainParser struct {
BaseChainParser
Expand Down Expand Up @@ -300,6 +304,27 @@ func (apip *JsonRPCChainParser) ChainBlockStats() (allowedBlockLagForQosSync int
return apip.spec.AllowedBlockLagForQosSync, averageBlockTime, apip.spec.BlockDistanceForFinalizedData, apip.spec.BlocksInFinalizationProof
}

// Will limit a certain amount of connections per IP
type WebsocketConnectionLimiter struct {
ipToNumberOfActiveConnections map[string]uint64
lock sync.RWMutex
}

func (wcl *WebsocketConnectionLimiter) addIpConnectionAndGetCurrentAmount(ip string) uint64 {
wcl.lock.Lock()
defer wcl.lock.Unlock()
// wether it exists or not we add 1.
wcl.ipToNumberOfActiveConnections[ip] += 1
return wcl.ipToNumberOfActiveConnections[ip]
}

func (wcl *WebsocketConnectionLimiter) decreaseIpConnectionAndGetCurrentAmount(ip string) {
wcl.lock.Lock()
defer wcl.lock.Unlock()
// wether it exists or not we add 1.
wcl.ipToNumberOfActiveConnections[ip] -= 1
}

type JsonRPCChainListener struct {
endpoint *lavasession.RPCEndpoint
relaySender RelaySender
Expand All @@ -308,6 +333,7 @@ type JsonRPCChainListener struct {
refererData *RefererData
consumerWsSubscriptionManager *ConsumerWSSubscriptionManager
listeningAddress string
websocketConnectionLimiter *WebsocketConnectionLimiter
}

// NewJrpcChainListener creates a new instance of JsonRPCChainListener
Expand All @@ -325,6 +351,7 @@ func NewJrpcChainListener(ctx context.Context, listenEndpoint *lavasession.RPCEn
logger: rpcConsumerLogs,
refererData: refererData,
consumerWsSubscriptionManager: consumerWsSubscriptionManager,
websocketConnectionLimiter: &WebsocketConnectionLimiter{},
}

return chainListener
Expand Down Expand Up @@ -354,6 +381,14 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con
apiInterface := apil.endpoint.ApiInterface

webSocketCallback := websocket.New(func(websocketConn *websocket.Conn) {
ip := websocketConn.RemoteAddr().String()
numberOfActiveConnections := apil.websocketConnectionLimiter.addIpConnectionAndGetCurrentAmount(ip)
defer apil.websocketConnectionLimiter.decreaseIpConnectionAndGetCurrentAmount(ip)
if numberOfActiveConnections > MaximumNumberOfParallelWebsocketConnectionsPerIp {
websocketConn.WriteMessage(1, []byte(fmt.Sprintf("Too Many Open Connections, limited to %d", MaximumNumberOfParallelWebsocketConnectionsPerIp)))
return
}

utils.LavaFormatDebug("jsonrpc websocket opened", utils.LogAttr("consumerIp", websocketConn.LocalAddr().String()))
defer utils.LavaFormatDebug("jsonrpc websocket closed", utils.LogAttr("consumerIp", websocketConn.LocalAddr().String()))

Expand Down
10 changes: 10 additions & 0 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ type TendermintRpcChainListener struct {
refererData *RefererData
consumerWsSubscriptionManager *ConsumerWSSubscriptionManager
listeningAddress string
websocketConnectionLimiter *WebsocketConnectionLimiter
}

// NewTendermintRpcChainListener creates a new instance of TendermintRpcChainListener
Expand All @@ -351,6 +352,7 @@ func NewTendermintRpcChainListener(ctx context.Context, listenEndpoint *lavasess
logger: rpcConsumerLogs,
refererData: refererData,
consumerWsSubscriptionManager: consumerWsSubscriptionManager,
websocketConnectionLimiter: &WebsocketConnectionLimiter{},
}

return chainListener
Expand Down Expand Up @@ -378,6 +380,14 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm
return fiber.ErrUpgradeRequired
})
webSocketCallback := websocket.New(func(websocketConn *websocket.Conn) {
ip := websocketConn.RemoteAddr().String()
numberOfActiveConnections := apil.websocketConnectionLimiter.addIpConnectionAndGetCurrentAmount(ip)
defer apil.websocketConnectionLimiter.decreaseIpConnectionAndGetCurrentAmount(ip)
if numberOfActiveConnections > MaximumNumberOfParallelWebsocketConnectionsPerIp {
websocketConn.WriteMessage(1, []byte(fmt.Sprintf("Too Many Open Connections, limited to %d", MaximumNumberOfParallelWebsocketConnectionsPerIp)))
return
}

utils.LavaFormatDebug("tendermintrpc websocket opened", utils.LogAttr("consumerIp", websocketConn.LocalAddr().String()))
defer utils.LavaFormatDebug("tendermintrpc websocket closed", utils.LogAttr("consumerIp", websocketConn.LocalAddr().String()))

Expand Down

0 comments on commit 7c9eeaa

Please sign in to comment.