Skip to content

Commit

Permalink
Merge branch 'main' into PRT-1205-support-null-id-in-json-rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
shleikes authored May 21, 2024
2 parents f3e52d6 + bc15366 commit fe1ea1d
Show file tree
Hide file tree
Showing 14 changed files with 489 additions and 203 deletions.
2 changes: 1 addition & 1 deletion proto/lavanet/lava/pairing/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ message QueryProviderRequest {
}

message QueryProviderResponse {
lavanet.lava.epochstorage.StakeEntry stakeEntry = 1 [(gogoproto.nullable) = false];
repeated lavanet.lava.epochstorage.StakeEntry stakeEntries = 1 [(gogoproto.nullable) = false];
}

message QueryGetPairingRequest {
Expand Down
4 changes: 2 additions & 2 deletions protocol/integration/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func (m *mockProviderStateTracker) TxRelayPayment(ctx context.Context, relayRequ
return nil
}

func (m *mockProviderStateTracker) SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData) error {
func (m *mockProviderStateTracker) SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData, specID string) error {
return nil
}

func (m *mockProviderStateTracker) SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData) error {
func (m *mockProviderStateTracker) SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData, specID string) error {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const (
)

type TxSender interface {
SendVoteReveal(voteID string, vote *VoteData) error
SendVoteCommitment(voteID string, vote *VoteData) error
SendVoteReveal(voteID string, vote *VoteData, specID string) error
SendVoteCommitment(voteID string, vote *VoteData, specID string) error
}

type ChainTrackerInf interface {
Expand Down Expand Up @@ -80,7 +80,7 @@ func (rm *ReliabilityManager) VoteHandler(voteParams *VoteParams, nodeHeight uin
}
utils.LavaFormatInfo(" Received Vote Reveal for vote, sending Reveal for result",
utils.Attribute{Key: "voteID", Value: voteID}, utils.Attribute{Key: "voteData", Value: vote})
rm.txSender.SendVoteReveal(voteID, vote)
rm.txSender.SendVoteReveal(voteID, vote, voteParams.ChainID)
return nil
} else {
// new vote
Expand Down Expand Up @@ -131,7 +131,7 @@ func (rm *ReliabilityManager) VoteHandler(voteParams *VoteParams, nodeHeight uin
vote = &VoteData{RelayDataHash: replyDataHash, Nonce: nonce, CommitHash: commitHash}
rm.votes[voteID] = vote
utils.LavaFormatInfo("Received Vote start, sending commitment for result", utils.Attribute{Key: "voteID", Value: voteID}, utils.Attribute{Key: "voteData", Value: vote})
rm.txSender.SendVoteCommitment(voteID, vote)
rm.txSender.SendVoteCommitment(voteID, vote, voteParams.ChainID)
return nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,17 @@ func TestFullFlowReliabilityCompare(t *testing.T) {
}

type mockTx struct {
callbackCommit func(voteID string, voteData *reliabilitymanager.VoteData)
callbackReveal func(voteID string, voteData *reliabilitymanager.VoteData)
callbackCommit func(voteID string, voteData *reliabilitymanager.VoteData, specID string)
callbackReveal func(voteID string, voteData *reliabilitymanager.VoteData, specID string)
}

func (m mockTx) SendVoteCommitment(voteID string, voteData *reliabilitymanager.VoteData) error {
m.callbackCommit(voteID, voteData)
func (m mockTx) SendVoteCommitment(voteID string, voteData *reliabilitymanager.VoteData, specID string) error {
m.callbackCommit(voteID, voteData, specID)
return nil
}

func (m mockTx) SendVoteReveal(voteID string, voteData *reliabilitymanager.VoteData) error {
m.callbackReveal(voteID, voteData)
func (m mockTx) SendVoteReveal(voteID string, voteData *reliabilitymanager.VoteData, specID string) error {
m.callbackReveal(voteID, voteData, specID)
return nil
}

Expand Down Expand Up @@ -326,13 +326,13 @@ func TestFullFlowReliabilityConflict(t *testing.T) {

commitCalled := false
revealCalled := false
sendVoteCommit := func(voteID string, vote *reliabilitymanager.VoteData) {
sendVoteCommit := func(voteID string, vote *reliabilitymanager.VoteData, specID string) {
commitCalled = true
msg := conflicttypes.NewMsgConflictVoteCommit(votingProvider.Addr.String(), voteID, vote.CommitHash)
_, err := ts.Servers.ConflictServer.ConflictVoteCommit(ts.Ctx, msg)
require.NoError(t, err)
}
sendVoteReveal := func(voteID string, vote *reliabilitymanager.VoteData) {
sendVoteReveal := func(voteID string, vote *reliabilitymanager.VoteData, specID string) {
revealCalled = true
msg := conflicttypes.NewMsgConflictVoteReveal(votingProvider.Addr.String(), voteID, vote.Nonce, vote.RelayDataHash)
_, err := ts.Servers.ConflictServer.ConflictVoteReveal(ts.Ctx, msg)
Expand Down
4 changes: 2 additions & 2 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ type ProviderStateTrackerInf interface {
RegisterForEpochUpdates(ctx context.Context, epochUpdatable updaters.EpochUpdatable)
RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error
TxRelayPayment(ctx context.Context, relayRequests []*pairingtypes.RelaySession, description string, latestBlocks []*pairingtypes.LatestBlockReport) error
SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData) error
SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData) error
SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData, specID string) error
SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData, specID string) error
LatestBlock() int64
GetMaxCuForUser(ctx context.Context, consumerAddress, chainID string, epocu uint64) (maxCu uint64, err error)
VerifyPairing(ctx context.Context, consumerAddress, providerAddress string, epoch uint64, chainID string) (valid bool, total int64, projectId string, err error)
Expand Down
9 changes: 5 additions & 4 deletions protocol/statetracker/provider_state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func NewProviderStateTracker(ctx context.Context, txFactory tx.Factory, clientCt
}

pst.RegisterForEpochUpdates(ctx, emergencyTracker)
pst.RegisterForEpochUpdates(ctx, txSender)
err = pst.RegisterForDowntimeParamsUpdates(ctx, emergencyTracker)
return pst, err
}
Expand Down Expand Up @@ -127,12 +128,12 @@ func (pst *ProviderStateTracker) TxRelayPayment(ctx context.Context, relayReques
return pst.txSender.TxRelayPayment(ctx, relayRequests, description, latestBlocks)
}

func (pst *ProviderStateTracker) SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData) error {
return pst.txSender.SendVoteReveal(voteID, vote)
func (pst *ProviderStateTracker) SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData, specID string) error {
return pst.txSender.SendVoteReveal(context.Background(), voteID, vote, specID)
}

func (pst *ProviderStateTracker) SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData) error {
return pst.txSender.SendVoteCommitment(voteID, vote)
func (pst *ProviderStateTracker) SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData, specID string) error {
return pst.txSender.SendVoteCommitment(context.Background(), voteID, vote, specID)
}

func (pst *ProviderStateTracker) LatestBlock() int64 {
Expand Down
117 changes: 110 additions & 7 deletions protocol/statetracker/tx_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/goccy/go-json"
Expand All @@ -16,9 +17,11 @@ import (
"github.com/cosmos/cosmos-sdk/client/tx"
sdk "github.com/cosmos/cosmos-sdk/types"
typestx "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/cosmos/cosmos-sdk/x/feegrant"
commontypes "github.com/lavanet/lava/common/types"
"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/protocol/rpcprovider/reliabilitymanager"
updaters "github.com/lavanet/lava/protocol/statetracker/updaters"
"github.com/lavanet/lava/utils"
conflicttypes "github.com/lavanet/lava/x/conflict/types"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
Expand Down Expand Up @@ -77,8 +80,11 @@ func (ts *TxSender) checkProfitability(simResult *typestx.SimulateResponse, gasU
return nil
}

func (ts *TxSender) SimulateAndBroadCastTxWithRetryOnSeqMismatch(msg sdk.Msg, checkProfitability bool) error {
func (ts *TxSender) SimulateAndBroadCastTxWithRetryOnSeqMismatch(ctx context.Context, msg sdk.Msg, checkProfitability bool, feeGranter sdk.AccAddress) error {
txfactory := ts.txFactory.WithGasPrices(defaultGasPrice)
if feeGranter != nil {
txfactory = ts.txFactory.WithFeeGranter(feeGranter)
}

if err := msg.ValidateBasic(); err != nil {
return err
Expand Down Expand Up @@ -302,7 +308,7 @@ func NewConsumerTxSender(ctx context.Context, clientCtx client.Context, txFactor

func (ts *ConsumerTxSender) TxSenderConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, sameProviderConflict *conflicttypes.FinalizationConflict) error {
msg := conflicttypes.NewMsgDetection(ts.clientCtx.FromAddress.String(), finalizationConflict, responseConflict, sameProviderConflict)
err := ts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(msg, false)
err := ts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(ctx, msg, false, nil)
if err != nil {
return utils.LavaFormatError("discrepancyChecker - SimulateAndBroadCastTx Failed", err)
}
Expand All @@ -311,6 +317,8 @@ func (ts *ConsumerTxSender) TxSenderConflictDetection(ctx context.Context, final

type ProviderTxSender struct {
*TxSender
vaults map[string]sdk.AccAddress // chain ID -> vault that is a fee granter
vaultsLock sync.RWMutex
}

func NewProviderTxSender(ctx context.Context, clientCtx client.Context, txFactory tx.Factory) (ret *ProviderTxSender, err error) {
Expand All @@ -319,31 +327,126 @@ func NewProviderTxSender(ctx context.Context, clientCtx client.Context, txFactor
return nil, err
}
ts := &ProviderTxSender{TxSender: txSender}
ctxWithTimeout, cancel := context.WithTimeout(ctx, updaters.TimeOutForFetchingLavaBlocks)
defer cancel()
ts.vaults, err = ts.getVaults(ctxWithTimeout)
if err != nil {
utils.LavaFormatInfo("failed getting vaults from chain, could be provider is not staked yet", utils.LogAttr("reason", err))
}
return ts, nil
}

func (pts *ProviderTxSender) UpdateEpoch(epoch uint64) {
// need to update vault information
ctx, cancel := context.WithTimeout(context.Background(), updaters.TimeOutForFetchingLavaBlocks)
defer cancel()
vaults, err := pts.getVaults(ctx)
if err != nil {
// failed getting vaults, try again next epoch
return
}
utils.LavaFormatDebug("ProviderTxSender got epoch update, updating vaults", utils.LogAttr("new vaults", vaults), utils.LogAttr("epoch", epoch))
pts.vaultsLock.Lock()
defer pts.vaultsLock.Unlock()
pts.vaults = vaults
}

func (pts *ProviderTxSender) getVaults(ctx context.Context) (map[string]sdk.AccAddress, error) {
vaults := map[string]sdk.AccAddress{}
pairingQuerier := pairingtypes.NewQueryClient(pts.clientCtx)
res, err := pairingQuerier.Provider(ctx, &pairingtypes.QueryProviderRequest{Address: pts.clientCtx.FromAddress.String()})
if err != nil {
utils.LavaFormatWarning("could not get provider vault addresses. provider is not staked on any chain", err,
utils.LogAttr("provider", pts.clientCtx.FromAddress.String()),
)
return vaults, err
}

// this can happen if the provider is not staked yet
if len(res.StakeEntries) == 0 {
return vaults, utils.LavaFormatWarning("couldn't find entries on chain - could be that the provider is not staked yet", nil)
}

feegrantQuerier := feegrant.NewQueryClient(pts.clientCtx)
for _, stakeEntry := range res.StakeEntries {
if stakeEntry.Vault == stakeEntry.Address {
// if provider == vault, there is no feegrant, skip the stake entry
continue
}
vaultAcc, err := sdk.AccAddressFromBech32(stakeEntry.Vault)
if err != nil {
utils.LavaFormatError("critical: invalid vault address in stake entry", err,
utils.LogAttr("provider", pts.clientCtx.FromAddress.String()),
utils.LogAttr("vault", stakeEntry.Vault),
utils.LogAttr("chain_id", stakeEntry.Chain),
)
continue
}
res, err := feegrantQuerier.Allowance(ctx, &feegrant.QueryAllowanceRequest{
Granter: stakeEntry.Vault,
Grantee: pts.clientCtx.FromAddress.String(),
})
if err != nil {
// Allowance doesn't return an error if allowance doesn't exist. Error is returned on parsing issues
utils.LavaFormatWarning("could not get gas fee allowance for provider and vault", err,
utils.LogAttr("provider", pts.clientCtx.FromAddress.String()),
utils.LogAttr("vault", stakeEntry.Vault),
utils.LogAttr("chain_id", stakeEntry.Chain),
)
continue
}
if res.Allowance != nil {
vaults[stakeEntry.Chain] = vaultAcc
}
}

return vaults, nil
}

func (pts *ProviderTxSender) getFeeGranterFromVaults(chainId string) sdk.AccAddress {
pts.vaultsLock.RLock()
defer pts.vaultsLock.RUnlock()
if len(pts.vaults) == 0 {
return nil
}

feeGranter, ok := pts.vaults[chainId]
if ok {
return feeGranter
}
// else get the first fee granter
for _, val := range pts.vaults {
return val
}

return nil
}

func (pts *ProviderTxSender) TxRelayPayment(ctx context.Context, relayRequests []*pairingtypes.RelaySession, description string, latestBlocks []*pairingtypes.LatestBlockReport) error {
msg := pairingtypes.NewMsgRelayPayment(pts.clientCtx.FromAddress.String(), relayRequests, description, latestBlocks)
utils.LavaFormatDebug("Sending reward TX", utils.LogAttr("Number_of_relay_sessions_for_payment", len(relayRequests)))
err := pts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(msg, true)
feeGranter := pts.getFeeGranterFromVaults("")
err := pts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(ctx, msg, true, feeGranter)
if err != nil {
return utils.LavaFormatError("relay_payment - sending Tx Failed", err)
}
return nil
}

func (pts *ProviderTxSender) SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData) error {
func (pts *ProviderTxSender) SendVoteReveal(ctx context.Context, voteID string, vote *reliabilitymanager.VoteData, specId string) error {
msg := conflicttypes.NewMsgConflictVoteReveal(pts.clientCtx.FromAddress.String(), voteID, vote.Nonce, vote.RelayDataHash)
err := pts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(msg, false)
feeGranter := pts.getFeeGranterFromVaults(specId)
err := pts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(ctx, msg, false, feeGranter)
if err != nil {
return utils.LavaFormatError("SendVoteReveal - SimulateAndBroadCastTx Failed", err)
}
return nil
}

func (pts *ProviderTxSender) SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData) error {
func (pts *ProviderTxSender) SendVoteCommitment(ctx context.Context, voteID string, vote *reliabilitymanager.VoteData, specId string) error {
msg := conflicttypes.NewMsgConflictVoteCommit(pts.clientCtx.FromAddress.String(), voteID, vote.CommitHash)
err := pts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(msg, false)
feeGranter := pts.getFeeGranterFromVaults(specId)
err := pts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(ctx, msg, false, feeGranter)
if err != nil {
return utils.LavaFormatError("SendVoteCommitment - SimulateAndBroadCastTx Failed", err)
}
Expand Down
Loading

0 comments on commit fe1ea1d

Please sign in to comment.