Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT - Adding extension trailer for cached replies, and, node er… #1680

Merged
merged 4 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
NODE_ERRORS_PROVIDERS_HEADER_NAME = "Lava-Node-Errors-providers"
REPORTED_PROVIDERS_HEADER_NAME = "Lava-Reported-Providers"
USER_REQUEST_TYPE = "lava-user-request-type"
LAVA_IDENTIFIED_NODE_ERROR_HEADER = "lava-identified-node-error"
LAVAP_VERSION_HEADER_NAME = "Lavap-Version"
LAVA_CONSUMER_PROCESS_GUID = "lava-consumer-process-guid"
// these headers need to be lowercase
Expand Down Expand Up @@ -259,6 +260,7 @@ type RelayResult struct {
StatusCode int
Quorum int
ProviderTrailer metadata.MD // the provider trailer attached to the request. used to transfer useful information (which is not signed so shouldn't be trusted completely).
IsNodeError bool
}

func (rr *RelayResult) GetReplyServer() pairingtypes.Relayer_RelaySubscribeClient {
Expand Down
20 changes: 16 additions & 4 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@
}

returnedResult, err := relayProcessor.ProcessingResult()
rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors(), relayProcessor, protocolMessage.GetDirectiveHeaders(), protocolMessage.GetApi().GetName())
rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors(), relayProcessor, protocolMessage, protocolMessage.GetApi().GetName())
if err != nil {
return returnedResult, utils.LavaFormatError("failed processing responses from providers", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("endpoint", rpccs.listenEndpoint.Key()))
}
Expand Down Expand Up @@ -757,9 +757,9 @@
}

errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(protocolMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(expectedRelayTimeoutForQOS), expectedBH, numOfProviders, pairingAddressesLen, protocolMessage.GetApi().Category.HangingApi) // session done successfully

isNodeError, _ := protocolMessage.CheckResponseError(localRelayResult.Reply.Data, localRelayResult.StatusCode)
localRelayResult.IsNodeError = isNodeError
if rpccs.cache.CacheActive() && rpcclient.ValidateStatusCodes(localRelayResult.StatusCode, true) == nil {
isNodeError, _ := protocolMessage.CheckResponseError(localRelayResult.Reply.Data, localRelayResult.StatusCode)
// in case the error is a node error we don't want to cache
if !isNodeError {
// copy reply data so if it changes it doesn't panic mid async send
Expand Down Expand Up @@ -1260,10 +1260,11 @@
chainMessage.SetForceCacheRefresh(ok)
}

func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, directiveHeaders map[string]string, apiName string) {
func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, protocolMessage chainlib.ProtocolMessage, apiName string) {
if relayResult == nil {
return
}

metadataReply := []pairingtypes.Metadata{}
// add the provider that responded

Expand Down Expand Up @@ -1312,6 +1313,16 @@
Value: apiName,
})

// add is node error flag
if relayResult.IsNodeError {
metadataReply = append(metadataReply,
pairingtypes.Metadata{
Name: common.LAVA_IDENTIFIED_NODE_ERROR_HEADER,
Value: "true",
})

Check failure on line 1323 in protocol/rpcconsumer/rpcconsumer_server.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
}

Check failure on line 1324 in protocol/rpcconsumer/rpcconsumer_server.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

// fetch trailer information from the provider by using the provider trailer field.
providerNodeExtensions := relayResult.ProviderTrailer.Get(chainlib.RPCProviderNodeExtension)
if len(providerNodeExtensions) > 0 {
Expand All @@ -1322,6 +1333,7 @@
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD)
}

directiveHeaders := protocolMessage.GetDirectiveHeaders()
_, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY]
if debugRelays {
erroredProviders := relayProcessor.GetUsedProviders().GetErroredProviders()
Expand Down
6 changes: 6 additions & 0 deletions protocol/rpcprovider/rpcprovider_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,12 @@
if rpcps.cache.CacheActive() && (requestedBlockHash != nil || finalized) {
rpcps.trySetRelayReplyInCache(ctx, request, chainMsg, replyWrapper, latestBlock, averageBlockTime, requestedBlockHash, finalized, ignoredMetadata)
}
} else {

Check failure on line 783 in protocol/rpcprovider/rpcprovider_server.go

View workflow job for this annotation

GitHub Actions / lint

elseif: can replace 'else {if cond {}}' with 'else if cond {}' (gocritic)
// cache Hit.
// if cached, Add Archive trailer if requested by the consumer.
if len(request.RelayData.Extensions) > 0 {
grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RPCProviderNodeExtension, string(lavasession.NewRouterKey(request.RelayData.Extensions))))
}
}

if dataReliabilityEnabled {
Expand Down
87 changes: 87 additions & 0 deletions scripts/pre_setups/init_lava_only_with_node_with_cache_archive.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/bin/bash
__dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
source "$__dir"/../useful_commands.sh
. "${__dir}"/../vars/variables.sh

LOGS_DIR=${__dir}/../../testutil/debugging/logs
mkdir -p $LOGS_DIR
rm $LOGS_DIR/*.log

echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@";
echo "Either use the lava cache header in requests";
echo "Or change the pruning configuration before running";
echo "If you did dont commit the changes.";
echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@";
sleep 3;

killall screen
screen -wipe

echo "[Test Setup] installing all binaries"
make install-all

echo "[Test Setup] setting up a new lava node"
screen -d -m -S node bash -c "./scripts/start_env_dev.sh"
screen -ls
echo "[Test Setup] sleeping 20 seconds for node to finish setup (if its not enough increase timeout)"
sleep 5
wait_for_lava_node_to_start

GASPRICE="0.00002ulava"
lavad tx gov submit-legacy-proposal spec-add ./cookbook/specs/ibc.json,./cookbook/specs/cosmoswasm.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/cosmossdk_45.json,./cookbook/specs/cosmossdk_full.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/cosmoshub.json,./cookbook/specs/lava.json,./cookbook/specs/osmosis.json,./cookbook/specs/fantom.json,./cookbook/specs/celo.json,./cookbook/specs/optimism.json,./cookbook/specs/arbitrum.json,./cookbook/specs/starknet.json,./cookbook/specs/aptos.json,./cookbook/specs/juno.json,./cookbook/specs/polygon.json,./cookbook/specs/evmos.json,./cookbook/specs/base.json,./cookbook/specs/canto.json,./cookbook/specs/sui.json,./cookbook/specs/solana.json,./cookbook/specs/bsc.json,./cookbook/specs/axelar.json,./cookbook/specs/avalanche.json,./cookbook/specs/fvm.json --lava-dev-test -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE &
wait_next_block
wait_next_block
lavad tx gov vote 1 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
sleep 4

# Plans proposal
lavad tx gov submit-legacy-proposal plans-add ./cookbook/plans/test_plans/default.json,./cookbook/plans/test_plans/temporary-add.json -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
wait_next_block
wait_next_block
lavad tx gov vote 2 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE

sleep 4

CLIENTSTAKE="500000000000ulava"
PROVIDERSTAKE="500000000000ulava"

PROVIDER1_LISTENER="127.0.0.1:2221"
PROVIDER2_LISTENER="127.0.0.1:2220"

lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
wait_next_block
lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --from servicer1 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER2_LISTENER,1,tendermintrpc,rest,grpc,archive" 1 $(operator_address) -y --from servicer2 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE

lavad tx project set-policy $(lavad keys show user1 -a)-admin ./cookbook/projects/policy_all_chains_with_extension.yml -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
wait_next_block
sleep_until_next_epoch

screen -d -m -S cache_consumer bash -c "source ~/.bashrc; lavap cache \
127.0.0.1:20100 --metrics_address 0.0.0.0:20200 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE_CONSUMER.log" && sleep 0.25
sleep 2;
screen -d -m -S cache_provider bash -c "source ~/.bashrc; lavap cache \
127.0.0.1:20101 --metrics_address 0.0.0.0:20201 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE_PROVIDER.log" && sleep 0.25
sleep 2;

screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \
$PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \
$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \
$PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --cache-be 127.0.0.1:20101 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25

screen -d -m -S provider2 bash -c "source ~/.bashrc; lavap rpcprovider config/provider_examples/lava_example_archive.yml \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ":7777" --cache-be 127.0.0.1:20101 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25

screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \
127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \
$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --shared-state --chain-id lava --cache-be 127.0.0.1:20100 --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25

screen -d -m -S consumer2 bash -c "source ~/.bashrc; lavap rpcconsumer \
127.0.0.1:3363 LAV1 rest 127.0.0.1:3364 LAV1 tendermintrpc 127.0.0.1:3365 LAV1 grpc \
$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --shared-state --chain-id lava --cache-be 127.0.0.1:20100 --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS2.log" && sleep 0.25


echo "--- setting up screens done ---"
screen -ls

Loading