diff --git a/protocol/common/endpoints.go b/protocol/common/endpoints.go index 06c95ba5f9..99cd6504ac 100644 --- a/protocol/common/endpoints.go +++ b/protocol/common/endpoints.go @@ -28,6 +28,7 @@ const ( NODE_ERRORS_PROVIDERS_HEADER_NAME = "Lava-Node-Errors-providers" REPORTED_PROVIDERS_HEADER_NAME = "Lava-Reported-Providers" USER_REQUEST_TYPE = "lava-user-request-type" + LAVA_IDENTIFIED_NODE_ERROR_HEADER = "lava-identified-node-error" LAVAP_VERSION_HEADER_NAME = "Lavap-Version" LAVA_CONSUMER_PROCESS_GUID = "lava-consumer-process-guid" // these headers need to be lowercase @@ -259,6 +260,7 @@ type RelayResult struct { StatusCode int Quorum int ProviderTrailer metadata.MD // the provider trailer attached to the request. used to transfer useful information (which is not signed so shouldn't be trusted completely). + IsNodeError bool } func (rr *RelayResult) GetReplyServer() pairingtypes.Relayer_RelaySubscribeClient { diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index a1f0adfa1d..d74a4fd141 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -403,7 +403,7 @@ func (rpccs *RPCConsumerServer) SendParsedRelay( } returnedResult, err := relayProcessor.ProcessingResult() - rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors(), relayProcessor, protocolMessage.GetDirectiveHeaders(), protocolMessage.GetApi().GetName()) + rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors(), relayProcessor, protocolMessage, protocolMessage.GetApi().GetName()) if err != nil { return returnedResult, utils.LavaFormatError("failed processing responses from providers", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("endpoint", rpccs.listenEndpoint.Key())) } @@ -757,9 +757,9 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( } errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(protocolMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(expectedRelayTimeoutForQOS), expectedBH, numOfProviders, pairingAddressesLen, protocolMessage.GetApi().Category.HangingApi) // session done successfully - + isNodeError, _ := protocolMessage.CheckResponseError(localRelayResult.Reply.Data, localRelayResult.StatusCode) + localRelayResult.IsNodeError = isNodeError if rpccs.cache.CacheActive() && rpcclient.ValidateStatusCodes(localRelayResult.StatusCode, true) == nil { - isNodeError, _ := protocolMessage.CheckResponseError(localRelayResult.Reply.Data, localRelayResult.StatusCode) // in case the error is a node error we don't want to cache if !isNodeError { // copy reply data so if it changes it doesn't panic mid async send @@ -1260,10 +1260,11 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch chainMessage.SetForceCacheRefresh(ok) } -func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, directiveHeaders map[string]string, apiName string) { +func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, protocolMessage chainlib.ProtocolMessage, apiName string) { if relayResult == nil { return } + metadataReply := []pairingtypes.Metadata{} // add the provider that responded @@ -1312,6 +1313,15 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, Value: apiName, }) + // add is node error flag + if relayResult.IsNodeError { + metadataReply = append(metadataReply, + pairingtypes.Metadata{ + Name: common.LAVA_IDENTIFIED_NODE_ERROR_HEADER, + Value: "true", + }) + } + // fetch trailer information from the provider by using the provider trailer field. providerNodeExtensions := relayResult.ProviderTrailer.Get(chainlib.RPCProviderNodeExtension) if len(providerNodeExtensions) > 0 { @@ -1322,6 +1332,7 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD) } + directiveHeaders := protocolMessage.GetDirectiveHeaders() _, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY] if debugRelays { erroredProviders := relayProcessor.GetUsedProviders().GetErroredProviders() diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 181a4d355e..539e728165 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -780,6 +780,9 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty if rpcps.cache.CacheActive() && (requestedBlockHash != nil || finalized) { rpcps.trySetRelayReplyInCache(ctx, request, chainMsg, replyWrapper, latestBlock, averageBlockTime, requestedBlockHash, finalized, ignoredMetadata) } + } else if len(request.RelayData.Extensions) > 0 { + // if cached, Add Archive trailer if requested by the consumer. + grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RPCProviderNodeExtension, string(lavasession.NewRouterKey(request.RelayData.Extensions)))) } if dataReliabilityEnabled { diff --git a/scripts/pre_setups/init_lava_only_with_node_with_cache_archive.sh b/scripts/pre_setups/init_lava_only_with_node_with_cache_archive.sh new file mode 100755 index 0000000000..76ba4ecbcf --- /dev/null +++ b/scripts/pre_setups/init_lava_only_with_node_with_cache_archive.sh @@ -0,0 +1,87 @@ +#!/bin/bash +__dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source "$__dir"/../useful_commands.sh +. "${__dir}"/../vars/variables.sh + +LOGS_DIR=${__dir}/../../testutil/debugging/logs +mkdir -p $LOGS_DIR +rm $LOGS_DIR/*.log + +echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"; +echo "Either use the lava cache header in requests"; +echo "Or change the pruning configuration before running"; +echo "If you did dont commit the changes."; +echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"; +sleep 3; + +killall screen +screen -wipe + +echo "[Test Setup] installing all binaries" +make install-all + +echo "[Test Setup] setting up a new lava node" +screen -d -m -S node bash -c "./scripts/start_env_dev.sh" +screen -ls +echo "[Test Setup] sleeping 20 seconds for node to finish setup (if its not enough increase timeout)" +sleep 5 +wait_for_lava_node_to_start + +GASPRICE="0.00002ulava" +lavad tx gov submit-legacy-proposal spec-add ./cookbook/specs/ibc.json,./cookbook/specs/cosmoswasm.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/cosmossdk_45.json,./cookbook/specs/cosmossdk_full.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/cosmoshub.json,./cookbook/specs/lava.json,./cookbook/specs/osmosis.json,./cookbook/specs/fantom.json,./cookbook/specs/celo.json,./cookbook/specs/optimism.json,./cookbook/specs/arbitrum.json,./cookbook/specs/starknet.json,./cookbook/specs/aptos.json,./cookbook/specs/juno.json,./cookbook/specs/polygon.json,./cookbook/specs/evmos.json,./cookbook/specs/base.json,./cookbook/specs/canto.json,./cookbook/specs/sui.json,./cookbook/specs/solana.json,./cookbook/specs/bsc.json,./cookbook/specs/axelar.json,./cookbook/specs/avalanche.json,./cookbook/specs/fvm.json --lava-dev-test -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE & +wait_next_block +wait_next_block +lavad tx gov vote 1 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +sleep 4 + +# Plans proposal +lavad tx gov submit-legacy-proposal plans-add ./cookbook/plans/test_plans/default.json,./cookbook/plans/test_plans/temporary-add.json -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +wait_next_block +wait_next_block +lavad tx gov vote 2 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE + +sleep 4 + +CLIENTSTAKE="500000000000ulava" +PROVIDERSTAKE="500000000000ulava" + +PROVIDER1_LISTENER="127.0.0.1:2221" +PROVIDER2_LISTENER="127.0.0.1:2220" + +lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +wait_next_block +lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --from servicer1 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER2_LISTENER,1,tendermintrpc,rest,grpc,archive" 1 $(operator_address) -y --from servicer2 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE + +lavad tx project set-policy $(lavad keys show user1 -a)-admin ./cookbook/projects/policy_all_chains_with_extension.yml -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +wait_next_block +sleep_until_next_epoch + +screen -d -m -S cache_consumer bash -c "source ~/.bashrc; lavap cache \ +127.0.0.1:20100 --metrics_address 0.0.0.0:20200 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE_CONSUMER.log" && sleep 0.25 +sleep 2; +screen -d -m -S cache_provider bash -c "source ~/.bashrc; lavap cache \ +127.0.0.1:20101 --metrics_address 0.0.0.0:20201 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE_PROVIDER.log" && sleep 0.25 +sleep 2; + +screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ +$PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ +$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \ +$PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --cache-be 127.0.0.1:20101 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 + +screen -d -m -S provider2 bash -c "source ~/.bashrc; lavap rpcprovider config/provider_examples/lava_example_archive.yml \ +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ":7777" --cache-be 127.0.0.1:20101 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25 + +screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ +127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ +$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --shared-state --chain-id lava --cache-be 127.0.0.1:20100 --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 + +screen -d -m -S consumer2 bash -c "source ~/.bashrc; lavap rpcconsumer \ +127.0.0.1:3363 LAV1 rest 127.0.0.1:3364 LAV1 tendermintrpc 127.0.0.1:3365 LAV1 grpc \ +$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --shared-state --chain-id lava --cache-be 127.0.0.1:20100 --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS2.log" && sleep 0.25 + + +echo "--- setting up screens done ---" +screen -ls +