Skip to content

Commit

Permalink
fix all comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Sep 1, 2024
1 parent 43ec0c6 commit a3af18e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 17 deletions.
8 changes: 5 additions & 3 deletions protocol/rpcprovider/provider_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ type RelaySender interface {
type ProviderStateMachine struct {
relayRetriesManager lavaprotocol.RelayRetriesManagerInf
chainId string
relaySender RelaySender
}

func NewProviderStateMachine(chainId string, relayRetriesManager lavaprotocol.RelayRetriesManagerInf) *ProviderStateMachine {
func NewProviderStateMachine(chainId string, relayRetriesManager lavaprotocol.RelayRetriesManagerInf, relaySender RelaySender) *ProviderStateMachine {
return &ProviderStateMachine{
relayRetriesManager: relayRetriesManager,
chainId: chainId,
relaySender: relaySender,
}
}

func (psm *ProviderStateMachine) SendNodeMessage(ctx context.Context, relaySender RelaySender, chainMsg chainlib.ChainMessage, request *pairingtypes.RelayRequest) (*chainlib.RelayReplyWrapper, error) {
func (psm *ProviderStateMachine) SendNodeMessage(ctx context.Context, chainMsg chainlib.ChainMessage, request *pairingtypes.RelayRequest) (*chainlib.RelayReplyWrapper, error) {
hash, err := chainMsg.GetRawRequestHash()
requestHashString := ""
if err != nil {
Expand All @@ -41,7 +43,7 @@ func (psm *ProviderStateMachine) SendNodeMessage(ctx context.Context, relaySende
var isNodeError bool
for retryAttempt := 0; retryAttempt <= numberOfRetriesAllowedOnNodeErrors; retryAttempt++ {
sendTime := time.Now()
replyWrapper, _, _, _, _, err = relaySender.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions)
replyWrapper, _, _, _, _, err = psm.relaySender.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions)
if err != nil {
return nil, utils.LavaFormatError("Sending chainMsg failed", err, utils.LogAttr("attempt", retryAttempt), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId))
}
Expand Down
22 changes: 10 additions & 12 deletions protocol/rpcprovider/provider_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func (rs *relaySenderMock) SendNodeMsg(ctx context.Context, ch chan interface{},
}

func TestStateMachineHappyFlow(t *testing.T) {
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager())
relaySender := &relaySenderMock{}
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender)
chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t))
chainMsgMock.
EXPECT().
Expand All @@ -43,15 +43,15 @@ func TestStateMachineHappyFlow(t *testing.T) {

Check failure on line 43 in protocol/rpcprovider/provider_state_machine_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
}).

Check failure on line 44 in protocol/rpcprovider/provider_state_machine_test.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
AnyTimes()
stateMachine.SendNodeMessage(context.Background(), relaySender, chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
hash, _ := chainMsgMock.GetRawRequestHash()
require.Equal(t, relaySender.numberOfTimesHitSendNodeMsg, numberOfRetriesAllowedOnNodeErrors)
require.False(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash)))
}

func TestStateMachineAllFailureFlows(t *testing.T) {
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager())
relaySender := &relaySenderMock{}
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender)
chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t))
returnFalse := false
chainMsgMock.
Expand All @@ -69,7 +69,7 @@ func TestStateMachineAllFailureFlows(t *testing.T) {
return true, ""
}).
AnyTimes()
stateMachine.SendNodeMessage(context.Background(), relaySender, chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
hash, _ := chainMsgMock.GetRawRequestHash()
require.Equal(t, numberOfRetriesAllowedOnNodeErrors+1, relaySender.numberOfTimesHitSendNodeMsg)
for i := 0; i < 10; i++ {
Expand All @@ -82,14 +82,13 @@ func TestStateMachineAllFailureFlows(t *testing.T) {
require.True(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash)))

// send second relay with same hash.
relaySender2 := &relaySenderMock{}
stateMachine.SendNodeMessage(context.Background(), relaySender2, chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
require.Equal(t, 1, relaySender2.numberOfTimesHitSendNodeMsg) // no retries.
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
require.Equal(t, 4, relaySender.numberOfTimesHitSendNodeMsg) // no retries.
}

func TestStateMachineFailureAndRecoveryFlow(t *testing.T) {
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager())
relaySender := &relaySenderMock{}
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender)
chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t))
returnFalse := false
chainMsgMock.
Expand All @@ -107,7 +106,7 @@ func TestStateMachineFailureAndRecoveryFlow(t *testing.T) {
return true, ""
}).
AnyTimes()
stateMachine.SendNodeMessage(context.Background(), relaySender, chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
hash, _ := chainMsgMock.GetRawRequestHash()
require.Equal(t, numberOfRetriesAllowedOnNodeErrors+1, relaySender.numberOfTimesHitSendNodeMsg)
for i := 0; i < 10; i++ {
Expand All @@ -120,10 +119,9 @@ func TestStateMachineFailureAndRecoveryFlow(t *testing.T) {
require.True(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash)))

// send second relay with same hash.
relaySender3 := &relaySenderMock{}
returnFalse = true
stateMachine.SendNodeMessage(context.Background(), relaySender3, chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
require.Equal(t, 1, relaySender3.numberOfTimesHitSendNodeMsg) // no retries, first success.
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
require.Equal(t, 4, relaySender.numberOfTimesHitSendNodeMsg) // no retries, first success.
// wait for routine to end..
for i := 0; i < 10; i++ {
if !stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) {
Expand Down
4 changes: 2 additions & 2 deletions protocol/rpcprovider/rpcprovider_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
rpcps.metrics = providerMetrics
rpcps.relaysMonitor = relaysMonitor
rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager
rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager())
rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter)

rpcps.initRelaysMonitor(ctx)
}
Expand Down Expand Up @@ -884,7 +884,7 @@ func (rpcps *RPCProviderServer) sendRelayMessageToNode(ctx context.Context, requ
utils.LavaFormatDebug("adding stickiness header", utils.LogAttr("tokenFromContext", common.GetTokenFromGrpcContext(ctx)), utils.LogAttr("unique_token", common.GetUniqueToken(common.UserData{DappId: consumerAddr.String(), ConsumerIp: common.GetIpFromGrpcContext(ctx)})))
}
// use the provider state machine to send the messages
return rpcps.providerStateMachine.SendNodeMessage(ctx, rpcps.chainRouter, chainMsg, request)
return rpcps.providerStateMachine.SendNodeMessage(ctx, chainMsg, request)
}

func (rpcps *RPCProviderServer) TryRelayUnsubscribe(ctx context.Context, request *pairingtypes.RelayRequest, consumerAddress sdk.AccAddress, chainMessage chainlib.ChainMessage) (*pairingtypes.RelayReply, error) {
Expand Down

0 comments on commit a3af18e

Please sign in to comment.