Skip to content

Commit

Permalink
feat: PRT - Adding extension trailer for cached replies, and, node er… (
Browse files Browse the repository at this point in the history
#1680)

* feat: PRT - Adding extension trailer for cached replies, and, node error identifier

* fix

* fix da lint
  • Loading branch information
ranlavanet authored Sep 9, 2024
1 parent bf6c6c9 commit 0c17bba
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 4 deletions.
2 changes: 2 additions & 0 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
NODE_ERRORS_PROVIDERS_HEADER_NAME = "Lava-Node-Errors-providers"
REPORTED_PROVIDERS_HEADER_NAME = "Lava-Reported-Providers"
USER_REQUEST_TYPE = "lava-user-request-type"
LAVA_IDENTIFIED_NODE_ERROR_HEADER = "lava-identified-node-error"
LAVAP_VERSION_HEADER_NAME = "Lavap-Version"
LAVA_CONSUMER_PROCESS_GUID = "lava-consumer-process-guid"
// these headers need to be lowercase
Expand Down Expand Up @@ -259,6 +260,7 @@ type RelayResult struct {
StatusCode int
Quorum int
ProviderTrailer metadata.MD // the provider trailer attached to the request. used to transfer useful information (which is not signed so shouldn't be trusted completely).
IsNodeError bool
}

func (rr *RelayResult) GetReplyServer() pairingtypes.Relayer_RelaySubscribeClient {
Expand Down
19 changes: 15 additions & 4 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (rpccs *RPCConsumerServer) SendParsedRelay(
}

returnedResult, err := relayProcessor.ProcessingResult()
rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors(), relayProcessor, protocolMessage.GetDirectiveHeaders(), protocolMessage.GetApi().GetName())
rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors(), relayProcessor, protocolMessage, protocolMessage.GetApi().GetName())
if err != nil {
return returnedResult, utils.LavaFormatError("failed processing responses from providers", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("endpoint", rpccs.listenEndpoint.Key()))
}
Expand Down Expand Up @@ -757,9 +757,9 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(
}

errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(protocolMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(expectedRelayTimeoutForQOS), expectedBH, numOfProviders, pairingAddressesLen, protocolMessage.GetApi().Category.HangingApi) // session done successfully

isNodeError, _ := protocolMessage.CheckResponseError(localRelayResult.Reply.Data, localRelayResult.StatusCode)
localRelayResult.IsNodeError = isNodeError
if rpccs.cache.CacheActive() && rpcclient.ValidateStatusCodes(localRelayResult.StatusCode, true) == nil {
isNodeError, _ := protocolMessage.CheckResponseError(localRelayResult.Reply.Data, localRelayResult.StatusCode)
// in case the error is a node error we don't want to cache
if !isNodeError {
// copy reply data so if it changes it doesn't panic mid async send
Expand Down Expand Up @@ -1260,10 +1260,11 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch
chainMessage.SetForceCacheRefresh(ok)
}

func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, directiveHeaders map[string]string, apiName string) {
func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, protocolMessage chainlib.ProtocolMessage, apiName string) {
if relayResult == nil {
return
}

metadataReply := []pairingtypes.Metadata{}
// add the provider that responded

Expand Down Expand Up @@ -1312,6 +1313,15 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
Value: apiName,
})

// add is node error flag
if relayResult.IsNodeError {
metadataReply = append(metadataReply,
pairingtypes.Metadata{
Name: common.LAVA_IDENTIFIED_NODE_ERROR_HEADER,
Value: "true",
})
}

// fetch trailer information from the provider by using the provider trailer field.
providerNodeExtensions := relayResult.ProviderTrailer.Get(chainlib.RPCProviderNodeExtension)
if len(providerNodeExtensions) > 0 {
Expand All @@ -1322,6 +1332,7 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD)
}

directiveHeaders := protocolMessage.GetDirectiveHeaders()
_, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY]
if debugRelays {
erroredProviders := relayProcessor.GetUsedProviders().GetErroredProviders()
Expand Down
3 changes: 3 additions & 0 deletions protocol/rpcprovider/rpcprovider_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,9 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty
if rpcps.cache.CacheActive() && (requestedBlockHash != nil || finalized) {
rpcps.trySetRelayReplyInCache(ctx, request, chainMsg, replyWrapper, latestBlock, averageBlockTime, requestedBlockHash, finalized, ignoredMetadata)
}
} else if len(request.RelayData.Extensions) > 0 {
// if cached, Add Archive trailer if requested by the consumer.
grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RPCProviderNodeExtension, string(lavasession.NewRouterKey(request.RelayData.Extensions))))
}

if dataReliabilityEnabled {
Expand Down
87 changes: 87 additions & 0 deletions scripts/pre_setups/init_lava_only_with_node_with_cache_archive.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/bin/bash
__dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
source "$__dir"/../useful_commands.sh
. "${__dir}"/../vars/variables.sh

LOGS_DIR=${__dir}/../../testutil/debugging/logs
mkdir -p $LOGS_DIR
rm $LOGS_DIR/*.log

echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@";
echo "Either use the lava cache header in requests";
echo "Or change the pruning configuration before running";
echo "If you did dont commit the changes.";
echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@";
sleep 3;

killall screen
screen -wipe

echo "[Test Setup] installing all binaries"
make install-all

echo "[Test Setup] setting up a new lava node"
screen -d -m -S node bash -c "./scripts/start_env_dev.sh"
screen -ls
echo "[Test Setup] sleeping 20 seconds for node to finish setup (if its not enough increase timeout)"
sleep 5
wait_for_lava_node_to_start

GASPRICE="0.00002ulava"
lavad tx gov submit-legacy-proposal spec-add ./cookbook/specs/ibc.json,./cookbook/specs/cosmoswasm.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/cosmossdk_45.json,./cookbook/specs/cosmossdk_full.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/cosmoshub.json,./cookbook/specs/lava.json,./cookbook/specs/osmosis.json,./cookbook/specs/fantom.json,./cookbook/specs/celo.json,./cookbook/specs/optimism.json,./cookbook/specs/arbitrum.json,./cookbook/specs/starknet.json,./cookbook/specs/aptos.json,./cookbook/specs/juno.json,./cookbook/specs/polygon.json,./cookbook/specs/evmos.json,./cookbook/specs/base.json,./cookbook/specs/canto.json,./cookbook/specs/sui.json,./cookbook/specs/solana.json,./cookbook/specs/bsc.json,./cookbook/specs/axelar.json,./cookbook/specs/avalanche.json,./cookbook/specs/fvm.json --lava-dev-test -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE &
wait_next_block
wait_next_block
lavad tx gov vote 1 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
sleep 4

# Plans proposal
lavad tx gov submit-legacy-proposal plans-add ./cookbook/plans/test_plans/default.json,./cookbook/plans/test_plans/temporary-add.json -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
wait_next_block
wait_next_block
lavad tx gov vote 2 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE

sleep 4

CLIENTSTAKE="500000000000ulava"
PROVIDERSTAKE="500000000000ulava"

PROVIDER1_LISTENER="127.0.0.1:2221"
PROVIDER2_LISTENER="127.0.0.1:2220"

lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
wait_next_block
lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --from servicer1 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER2_LISTENER,1,tendermintrpc,rest,grpc,archive" 1 $(operator_address) -y --from servicer2 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE

lavad tx project set-policy $(lavad keys show user1 -a)-admin ./cookbook/projects/policy_all_chains_with_extension.yml -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
wait_next_block
sleep_until_next_epoch

screen -d -m -S cache_consumer bash -c "source ~/.bashrc; lavap cache \
127.0.0.1:20100 --metrics_address 0.0.0.0:20200 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE_CONSUMER.log" && sleep 0.25
sleep 2;
screen -d -m -S cache_provider bash -c "source ~/.bashrc; lavap cache \
127.0.0.1:20101 --metrics_address 0.0.0.0:20201 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE_PROVIDER.log" && sleep 0.25
sleep 2;

screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \
$PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \
$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \
$PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --cache-be 127.0.0.1:20101 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25

screen -d -m -S provider2 bash -c "source ~/.bashrc; lavap rpcprovider config/provider_examples/lava_example_archive.yml \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ":7777" --cache-be 127.0.0.1:20101 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25

screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \
127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \
$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --shared-state --chain-id lava --cache-be 127.0.0.1:20100 --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25

screen -d -m -S consumer2 bash -c "source ~/.bashrc; lavap rpcconsumer \
127.0.0.1:3363 LAV1 rest 127.0.0.1:3364 LAV1 tendermintrpc 127.0.0.1:3365 LAV1 grpc \
$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --shared-state --chain-id lava --cache-be 127.0.0.1:20100 --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS2.log" && sleep 0.25


echo "--- setting up screens done ---"
screen -ls

0 comments on commit 0c17bba

Please sign in to comment.