From 6b38bf7069866fbbeed747cec329f5eb559cc3e2 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Fri, 5 Jan 2024 14:58:19 +0200 Subject: [PATCH] RPC SubmitTransaction: Dequeue old responses from previous requests (#2262) --- app/rpc/rpchandlers/submit_transaction.go | 3 +- cmd/kaspawallet/daemon/server/broadcast.go | 6 ++- .../rpcclient/rpc_send_raw_transaction.go | 41 ++++++++++++++----- .../mempool-limits/transactions.go | 2 +- testing/integration/tx_relay_test.go | 2 +- testing/integration/utxo_index_test.go | 8 ++-- 6 files changed, 43 insertions(+), 19 deletions(-) diff --git a/app/rpc/rpchandlers/submit_transaction.go b/app/rpc/rpchandlers/submit_transaction.go index e962181b83..e1729fcd02 100644 --- a/app/rpc/rpchandlers/submit_transaction.go +++ b/app/rpc/rpchandlers/submit_transaction.go @@ -28,7 +28,8 @@ func HandleSubmitTransaction(context *rpccontext.Context, _ *router.Router, requ } log.Debugf("Rejected transaction %s: %s", transactionID, err) - errorMessage := &appmessage.SubmitTransactionResponseMessage{} + // Return the ID also in the case of error, so that clients can match the response to the correct transaction submit request + errorMessage := appmessage.NewSubmitTransactionResponseMessage(transactionID.String()) errorMessage.Error = appmessage.RPCErrorf("Rejected transaction %s: %s", transactionID, err) return errorMessage, nil } diff --git a/cmd/kaspawallet/daemon/server/broadcast.go b/cmd/kaspawallet/daemon/server/broadcast.go index 027ff41aca..0d01ff6cf7 100644 --- a/cmd/kaspawallet/daemon/server/broadcast.go +++ b/cmd/kaspawallet/daemon/server/broadcast.go @@ -2,14 +2,16 @@ package server import ( "context" + "time" + "github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb" "github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet" "github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet/serialization" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" + "github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing" "github.com/kaspanet/kaspad/infrastructure/network/rpcclient" "github.com/pkg/errors" - "time" ) func (s *server) Broadcast(_ context.Context, request *pb.BroadcastRequest) (*pb.BroadcastResponse, error) { @@ -59,7 +61,7 @@ func (s *server) broadcast(transactions [][]byte, isDomain bool) ([]string, erro } func sendTransaction(client *rpcclient.RPCClient, tx *externalapi.DomainTransaction) (string, error) { - submitTransactionResponse, err := client.SubmitTransaction(appmessage.DomainTransactionToRPCTransaction(tx), false) + submitTransactionResponse, err := client.SubmitTransaction(appmessage.DomainTransactionToRPCTransaction(tx), consensushashing.TransactionID(tx).String(), false) if err != nil { return "", errors.Wrapf(err, "error submitting transaction") } diff --git a/infrastructure/network/rpcclient/rpc_send_raw_transaction.go b/infrastructure/network/rpcclient/rpc_send_raw_transaction.go index 1e0f63e82f..295f69b84d 100644 --- a/infrastructure/network/rpcclient/rpc_send_raw_transaction.go +++ b/infrastructure/network/rpcclient/rpc_send_raw_transaction.go @@ -1,23 +1,44 @@ package rpcclient import ( + "strings" + "github.com/kaspanet/kaspad/app/appmessage" ) // SubmitTransaction sends an RPC request respective to the function's name and returns the RPC server's response -func (c *RPCClient) SubmitTransaction(transaction *appmessage.RPCTransaction, allowOrphan bool) (*appmessage.SubmitTransactionResponseMessage, error) { +func (c *RPCClient) SubmitTransaction(transaction *appmessage.RPCTransaction, transactionID string, allowOrphan bool) (*appmessage.SubmitTransactionResponseMessage, error) { err := c.rpcRouter.outgoingRoute().Enqueue(appmessage.NewSubmitTransactionRequestMessage(transaction, allowOrphan)) if err != nil { return nil, err } - response, err := c.route(appmessage.CmdSubmitTransactionResponseMessage).DequeueWithTimeout(c.timeout) - if err != nil { - return nil, err - } - submitTransactionResponse := response.(*appmessage.SubmitTransactionResponseMessage) - if submitTransactionResponse.Error != nil { - return nil, c.convertRPCError(submitTransactionResponse.Error) - } + for { + response, err := c.route(appmessage.CmdSubmitTransactionResponseMessage).DequeueWithTimeout(c.timeout) + if err != nil { + return nil, err + } + submitTransactionResponse := response.(*appmessage.SubmitTransactionResponseMessage) + // Match the response to the expected ID. If they are different it means we got an old response which we + // previously timed-out on, so we log and continue waiting for the correct current response. + if submitTransactionResponse.TransactionID != transactionID { + if submitTransactionResponse.Error != nil { + // A non-updated Kaspad might return an empty ID in the case of error, so in + // such a case we fallback to checking if the error contains the expected ID + if submitTransactionResponse.TransactionID != "" || !strings.Contains(submitTransactionResponse.Error.Message, transactionID) { + log.Warnf("SubmitTransaction: received an error response for previous request: %s", submitTransactionResponse.Error) + continue + } + + } else { + log.Warnf("SubmitTransaction: received a successful response for previous request with ID %s", + submitTransactionResponse.TransactionID) + continue + } + } + if submitTransactionResponse.Error != nil { + return nil, c.convertRPCError(submitTransactionResponse.Error) + } - return submitTransactionResponse, nil + return submitTransactionResponse, nil + } } diff --git a/stability-tests/mempool-limits/transactions.go b/stability-tests/mempool-limits/transactions.go index 9123f74fb6..4c4782437f 100644 --- a/stability-tests/mempool-limits/transactions.go +++ b/stability-tests/mempool-limits/transactions.go @@ -86,7 +86,7 @@ func submitAnAmountOfTransactionsToTheMempool(t *testing.T, rpcClient *rpcclient for i, transaction := range transactions { rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction) - _, err := rpcClient.SubmitTransaction(rpcTransaction, false) + _, err := rpcClient.SubmitTransaction(rpcTransaction, consensushashing.TransactionID(transaction).String(), false) if err != nil { if ignoreOrphanRejects && strings.Contains(err.Error(), "orphan") { continue diff --git a/testing/integration/tx_relay_test.go b/testing/integration/tx_relay_test.go index b6863e7cc9..cd01efd1f2 100644 --- a/testing/integration/tx_relay_test.go +++ b/testing/integration/tx_relay_test.go @@ -53,7 +53,7 @@ func TestTxRelay(t *testing.T) { msgTx := generateTx(t, secondBlock.Transactions[transactionhelper.CoinbaseTransactionIndex], payer, payee) domainTransaction := appmessage.MsgTxToDomainTransaction(msgTx) rpcTransaction := appmessage.DomainTransactionToRPCTransaction(domainTransaction) - response, err := payer.rpcClient.SubmitTransaction(rpcTransaction, false) + response, err := payer.rpcClient.SubmitTransaction(rpcTransaction, consensushashing.TransactionID(domainTransaction).String(), false) if err != nil { t.Fatalf("Error submitting transaction: %+v", err) } diff --git a/testing/integration/utxo_index_test.go b/testing/integration/utxo_index_test.go index 772c96646c..04e4ff5584 100644 --- a/testing/integration/utxo_index_test.go +++ b/testing/integration/utxo_index_test.go @@ -88,8 +88,8 @@ func TestUTXOIndex(t *testing.T) { // Submit a few transactions that spends some UTXOs const transactionAmountToSpend = 5 for i := 0; i < transactionAmountToSpend; i++ { - rpcTransaction := buildTransactionForUTXOIndexTest(t, notificationEntries[i]) - _, err = kaspad.rpcClient.SubmitTransaction(rpcTransaction, false) + rpcTransaction, transactionID := buildTransactionForUTXOIndexTest(t, notificationEntries[i]) + _, err = kaspad.rpcClient.SubmitTransaction(rpcTransaction, transactionID, false) if err != nil { t.Fatalf("Error submitting transaction: %s", err) } @@ -171,7 +171,7 @@ func TestUTXOIndex(t *testing.T) { } } -func buildTransactionForUTXOIndexTest(t *testing.T, entry *appmessage.UTXOsByAddressesEntry) *appmessage.RPCTransaction { +func buildTransactionForUTXOIndexTest(t *testing.T, entry *appmessage.UTXOsByAddressesEntry) (*appmessage.RPCTransaction, string) { transactionIDBytes, err := hex.DecodeString(entry.Outpoint.TransactionID) if err != nil { t.Fatalf("Error decoding transaction ID: %s", err) @@ -224,5 +224,5 @@ func buildTransactionForUTXOIndexTest(t *testing.T, entry *appmessage.UTXOsByAdd msgTx.TxIn[0].SignatureScript = signatureScript domainTransaction := appmessage.MsgTxToDomainTransaction(msgTx) - return appmessage.DomainTransactionToRPCTransaction(domainTransaction) + return appmessage.DomainTransactionToRPCTransaction(domainTransaction), consensushashing.TransactionID(domainTransaction).String() }