fix(watchers): align taker fee validation retries with makers #2263

Merged
merged 5 commits into from
Nov 19, 2024
Changes from 4 commits
8 changes: 6 additions & 2 deletions mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs
@@ -386,11 +386,15 @@ impl ElectrumConnection {
};

let Some(dns_name) = uri.host().map(String::from) else {
- return Err(ElectrumConnectionErr::Irrecoverable("Couldn't retrieve host from address".to_string()));
+ return Err(ElectrumConnectionErr::Irrecoverable(
+ "Couldn't retrieve host from address".to_string(),
+ ));
};

let Ok(dns) = server_name_from_domain(dns_name.as_str()) else {
- return Err(ElectrumConnectionErr::Irrecoverable("Address isn't a valid domain name".to_string()));
+ return Err(ElectrumConnectionErr::Irrecoverable(
+ "Address isn't a valid domain name".to_string(),
+ ));
};

let tls_connector = if connection.settings.disable_cert_verification {
27 changes: 13 additions & 14 deletions mm2src/mm2_main/src/lp_healthcheck.rs
@@ -9,14 +9,15 @@ use instant::{Duration, Instant};
use lazy_static::lazy_static;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmError;
+ use mm2_err_handle::prelude::*;
use mm2_libp2p::p2p_ctx::P2PContext;
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix};
use ser_error_derive::SerializeErrorType;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::sync::Mutex;

- use crate::lp_network::broadcast_p2p_msg;
+ use crate::lp_network::{broadcast_p2p_msg, P2PRequestError, P2PRequestResult};

pub(crate) const PEER_HEALTHCHECK_PREFIX: TopicPrefix = "hcheck";

@@ -279,7 +280,10 @@ pub async fn peer_connection_healthcheck_rpc(
Ok(rx.timeout(timeout_duration).await == Ok(Ok(())))
}

- pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_libp2p::GossipsubMessage) {
+ pub(crate) async fn process_p2p_healthcheck_message(
+ ctx: &MmArc,
+ message: mm2_libp2p::GossipsubMessage,
+ ) -> P2PRequestResult<()> {
macro_rules! try_or_return {
($exp:expr, $msg: expr) => {
match $exp {
@@ -292,24 +296,17 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
};
}

- let data = try_or_return!(
- HealthcheckMessage::decode(&message.data),
- "Couldn't decode healthcheck message"
- );
+ let data = HealthcheckMessage::decode(&message.data)
+ .map_to_mm(|e| P2PRequestError::DecodeError(format!("Couldn't decode healthcheck message: {}", e)))?;
+ let sender_peer = data.is_received_message_valid().map_to_mm(|e| {
+ P2PRequestError::ValidationFailed(format!("Received an invalid healthcheck message. Error: {}", e))
+ })?;

let ctx = ctx.clone();

// Pass the remaining work to another thread to free up this one as soon as possible,
// so KDF can handle a high amount of healthcheck messages more efficiently.
ctx.spawner().spawn(async move {
- let sender_peer = match data.is_received_message_valid() {
- Ok(t) => t,
- Err(e) => {
- log::error!("Received an invalid healthcheck message. Error: {e}");
- return;
- },
- };

if data.should_reply() {
// Reply the message so they know we are healthy.

@@ -337,6 +334,8 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
};
}
});
+
+ Ok(())
}

#[cfg(any(test, target_arch = "wasm32"))]
20 changes: 15 additions & 5 deletions mm2src/mm2_main/src/lp_network.rs
@@ -62,6 +62,7 @@ pub enum P2PRequestError {
ResponseError(String),
#[display(fmt = "Expected 1 response, found {}", _0)]
ExpectedSingleResponseError(usize),
+ ValidationFailed(String),
}

/// Enum covering error cases that can happen during P2P message processing.
@@ -190,15 +191,16 @@ async fn process_p2p_message(
to_propagate = true;
},
Some(lp_swap::TX_HELPER_PREFIX) => {
- if let Some(pair) = split.next() {
- if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await {
+ if let Some(ticker) = split.next() {
+ if let Ok(Some(coin)) = lp_coinfind(&ctx, ticker).await {
if let Err(e) = coin.tx_enum_from_bytes(&message.data) {
log::error!("Message cannot continue the process due to: {:?}", e);
return;
};

- let fut = coin.send_raw_tx_bytes(&message.data);
- ctx.spawner().spawn(async {
+ if coin.is_utxo_in_native_mode() {
Collaborator Author:

I'm not sure about this. Should we allow seed nodes to send failed swap transactions to electrums? My opinion is no, but I'm opening this up for discussion. There is also this comment, #1238 (comment), to think about.

Collaborator:

Why are we sending the tx only when running on top of native?

Collaborator Author:

We use this in case the electrum(s) used by the user in the swap are down or censoring transactions. If any node in the network has the coin enabled with a native daemon, it provides an additional way to get the transaction through. After your electrums manager PR, we can add all the available electrums to the user's list, and since all electrums are tried for broadcasting, this message will only be used as a fallback if all electrums fail. Now, one of the seed nodes may have a different electrum, which is why I asked "Should we allow seed nodes to send failed swap transactions to electrums?". The drawback is that this would be resource-intensive for electrums, which is another reason we only send the tx to nodes running the coin on top of native daemons.

Collaborator:

What does "send failed swap transactions" mean? Are those refund transactions, and if so, why should they be treated differently?

Collaborator Author:

They are spend or refund transactions. If electrums censor these transactions or fail to broadcast them, we rely on nodes running a native daemon to get them into the mempool. This should be combined with SPV validation in the future, so that even if an electrum reports that the transaction was broadcast successfully, we verify it, and if SPV validation fails we try this P2P transaction helper as a last resort.

Collaborator:

I also wondered what would happen if several nodes in native mode are asked to send the same transaction (for example, whether nodes could get banned).
Apparently not: as far as I can see in the KMD daemon's code, if a transaction is relayed from one node's mempool to other nodes that already have the same transaction in their mempool, they do not consider the originating node as misbehaving.
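
The fallback order discussed in this thread can be sketched as follows. This is only an illustration of the idea (try every electrum first, use the P2P tx helper message only if all of them fail), not KDF code: `Broadcast`, `broadcast_with_fallback`, and the closure-based stand-ins for electrum clients and the P2P helper are hypothetical names introduced for the example.

```rust
/// Hypothetical outcome of a broadcast attempt; not a KDF type.
#[derive(Debug, PartialEq)]
enum Broadcast {
    /// The electrum at this index accepted the transaction.
    ViaElectrum(usize),
    /// Every electrum failed, so the tx was handed to the P2P helper.
    ViaP2pHelper,
}

/// Try each electrum in order and fall back to the P2P helper only if all fail.
/// The closures are stand-ins for real clients (an assumption for this sketch).
fn broadcast_with_fallback(
    tx: &[u8],
    electrums: &[&dyn Fn(&[u8]) -> Result<(), String>],
    p2p_helper: &mut dyn FnMut(&[u8]),
) -> Broadcast {
    for (idx, send) in electrums.iter().enumerate() {
        if send(tx).is_ok() {
            return Broadcast::ViaElectrum(idx);
        }
    }
    // Last resort: nodes running a native daemon may pick this up and rebroadcast.
    p2p_helper(tx);
    Broadcast::ViaP2pHelper
}
```

With this ordering the helper message is only published when no electrum accepts the transaction, which matches the "fallback if all electrums fail" behaviour described above.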

+ let fut = coin.send_raw_tx_bytes(&message.data);
+ ctx.spawner().spawn(async {
match fut.compat().await {
Ok(id) => log::debug!("Transaction broadcasted successfully: {:?} ", id),
// TODO (After https://github.com/KomodoPlatform/atomicDEX-API/pull/1433)
@@ -207,11 +209,19 @@ async fn process_p2p_message(
Err(e) => log::error!("Broadcast transaction failed (ignore this error if the transaction already sent by another seednode). {}", e),
};
})
}
}

+ to_propagate = true;
}
},
Some(lp_healthcheck::PEER_HEALTHCHECK_PREFIX) => {
- lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await
+ if let Err(e) = lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await {
+ log::error!("{}", e);
+ return;
+ }
+
+ to_propagate = true;
},
None | Some(_) => (),
}
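
The healthcheck change above follows a common pattern: the handler returns a typed error instead of logging and returning `()`, and the caller marks the message for propagation only on success. A minimal, self-contained sketch of that pattern is shown below. `HandlerError`, `handle_healthcheck`, and `should_propagate` are hypothetical stand-ins for `P2PRequestError`, `process_p2p_healthcheck_message`, and the `to_propagate` flag, and the UTF-8 and non-empty checks are placeholder validation, not the real decoding logic.

```rust
/// Hypothetical error type standing in for `P2PRequestError`.
#[derive(Debug, PartialEq)]
enum HandlerError {
    Decode(String),
    Validation(String),
}

/// Decode and validate up front, returning an error to the caller
/// rather than logging inside the handler.
fn handle_healthcheck(data: &[u8]) -> Result<(), HandlerError> {
    // Placeholder "decoding": the payload must be valid UTF-8.
    let text = std::str::from_utf8(data).map_err(|e| HandlerError::Decode(e.to_string()))?;
    // Placeholder "validation": the payload must not be empty.
    if text.is_empty() {
        return Err(HandlerError::Validation("empty message".to_string()));
    }
    Ok(())
}

/// Caller side: log the error and skip propagation when the handler fails.
fn should_propagate(data: &[u8]) -> bool {
    match handle_healthcheck(data) {
        Ok(()) => true,
        Err(e) => {
            eprintln!("{:?}", e);
            false
        },
    }
}
```

Keeping the decision in the caller means invalid messages are neither processed nor relayed to other peers.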
5 changes: 4 additions & 1 deletion mm2src/mm2_main/src/lp_swap.rs
@@ -151,8 +151,11 @@ pub const TX_HELPER_PREFIX: TopicPrefix = "txhlp";
pub(crate) const LEGACY_SWAP_TYPE: u8 = 0;
pub(crate) const MAKER_SWAP_V2_TYPE: u8 = 1;
pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2;
- const MAX_STARTED_AT_DIFF: u64 = 60;

+ pub(crate) const TAKER_FEE_VALIDATION_ATTEMPTS: usize = 6;
+ pub(crate) const TAKER_FEE_VALIDATION_RETRY_DELAY_SECS: f64 = 10.;
+
+ const MAX_STARTED_AT_DIFF: u64 = 60;
const NEGOTIATE_SEND_INTERVAL: f64 = 30.;

/// If a certain P2P message is not received, swap will be aborted after this time expires.
7 changes: 4 additions & 3 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
@@ -9,7 +9,8 @@ use super::{broadcast_my_swap_status, broadcast_p2p_tx_msg, broadcast_swap_msg_e
wait_for_maker_payment_conf_until, AtomicSwap, LockedAmount, MySwapInfo, NegotiationDataMsg,
NegotiationDataV2, NegotiationDataV3, RecoveredSwap, RecoveredSwapAction, SavedSwap, SavedSwapIo,
SavedTradeFee, SecretHashAlgo, SwapConfirmationsSettings, SwapError, SwapMsg, SwapPubkeys, SwapTxDataMsg,
- SwapsContext, TransactionIdentifier, INCLUDE_REFUND_FEE, NO_REFUND_FEE, WAIT_CONFIRM_INTERVAL_SEC};
+ SwapsContext, TransactionIdentifier, INCLUDE_REFUND_FEE, NO_REFUND_FEE, TAKER_FEE_VALIDATION_ATTEMPTS,
+ TAKER_FEE_VALIDATION_RETRY_DELAY_SECS, WAIT_CONFIRM_INTERVAL_SEC};
use crate::lp_dispatcher::{DispatcherContext, LpEvents};
use crate::lp_network::subscribe_to_topic;
use crate::lp_ordermatch::MakerOrderBuilder;
@@ -771,13 +772,13 @@ impl MakerSwap {
{
Ok(_) => break,
Err(err) => {
- if attempts >= 6 {
+ if attempts >= TAKER_FEE_VALIDATION_ATTEMPTS {
return Ok((Some(MakerSwapCommand::Finish), vec![
MakerSwapEvent::TakerFeeValidateFailed(ERRL!("{}", err).into()),
]));
} else {
attempts += 1;
- Timer::sleep(10.).await;
+ Timer::sleep(TAKER_FEE_VALIDATION_RETRY_DELAY_SECS).await;
}
},
};
44 changes: 26 additions & 18 deletions mm2src/mm2_main/src/lp_swap/swap_watcher.rs
@@ -1,5 +1,6 @@
use super::{broadcast_p2p_tx_msg, get_payment_locktime, lp_coinfind, taker_payment_spend_deadline, tx_helper_topic,
- H256Json, SwapsContext, WAIT_CONFIRM_INTERVAL_SEC};
+ H256Json, SwapsContext, TAKER_FEE_VALIDATION_ATTEMPTS, TAKER_FEE_VALIDATION_RETRY_DELAY_SECS,
+ WAIT_CONFIRM_INTERVAL_SEC};
use crate::lp_network::{P2PRequestError, P2PRequestResult};

use crate::MmError;
@@ -181,24 +182,31 @@ impl State for ValidateTakerFee {

async fn on_changed(self: Box<Self>, watcher_ctx: &mut WatcherStateMachine) -> StateResult<WatcherStateMachine> {
debug!("Watcher validate taker fee");
- let validated_f = watcher_ctx
- .taker_coin
- .watcher_validate_taker_fee(WatcherValidateTakerFeeInput {
Collaborator:

I think this watcher_validate_taker_fee fn used to fail because it contains a check that tx.height must be over min_block_number. The case where the tx is in the mempool (tx.height is None) is handled, and validation does not fail.
However, tx.height may also be not None and hold the value -1 (at least in KMD) if the tx is in a forked branch, so tx validation may fail.

Collaborator Author:

"But tx.height also may be not None and contain value of '-1' (at least in KMD) if the tx is in a forked branch (so tx validation may fail)"

But we use confirmations, not height:

if tx.confirmations > 0 {
    let current_block = try_s!(coin.as_ref().rpc_client.get_block_count().compat().await);
    let confirmed_at = current_block + 1 - tx.confirmations as u64;

I think it fails because get_block_count can lag behind the current height of the blockchain, as the watcher can use a different electrum than the taker. This is why the retries fix it.
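
To make the arithmetic in the quoted snippet concrete, here is a small sketch. Only the `current_block + 1 - confirmations` formula comes from the snippet; the function names, the lower-bound comparison against `min_block_number`, and all block heights used below are illustrative assumptions, not the actual KDF validation code.

```rust
/// Height at which a tx was mined, derived from the chain tip and its
/// confirmation count. `saturating_sub` only guards against underflow on
/// inconsistent inputs; for consistent inputs it equals the quoted formula.
fn confirmed_at(current_block: u64, confirmations: u64) -> u64 {
    (current_block + 1).saturating_sub(confirmations)
}

/// Assumed form of the check: the fee tx must not predate the swap's start block.
fn fee_height_is_valid(current_block: u64, confirmations: u64, min_block_number: u64) -> bool {
    confirmed_at(current_block, confirmations) >= min_block_number
}
```

Suppose the fee tx was mined at height 1000, `min_block_number` is 1000, and the tx is reported with 3 confirmations. With an up-to-date tip of 1002 the result is 1002 + 1 - 3 = 1000, which passes. If the block count lags by one block (1001), the result is 1001 + 1 - 3 = 999, which fails even though the tx is fine. A later retry, once the block count has caught up, would pass.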

- taker_fee_hash: watcher_ctx.data.taker_fee_hash.clone(),
- sender_pubkey: watcher_ctx.verified_pub.clone(),
- min_block_number: watcher_ctx.data.taker_coin_start_block,
- fee_addr: DEX_FEE_ADDR_RAW_PUBKEY.clone(),
- lock_duration: watcher_ctx.data.lock_duration,
- })
- .compat();

- if let Err(err) = validated_f.await {
- return Self::change_state(Stopped::from_reason(StopReason::Error(
- WatcherError::InvalidTakerFee(format!("{:?}", err)).into(),
- )));
- };

- Self::change_state(ValidateTakerPayment {})
+ let validation_result = retry_on_err!(async {
+ watcher_ctx
+ .taker_coin
+ .watcher_validate_taker_fee(WatcherValidateTakerFeeInput {
+ taker_fee_hash: watcher_ctx.data.taker_fee_hash.clone(),
+ sender_pubkey: watcher_ctx.verified_pub.clone(),
+ min_block_number: watcher_ctx.data.taker_coin_start_block,
+ fee_addr: DEX_FEE_ADDR_RAW_PUBKEY.clone(),
+ lock_duration: watcher_ctx.data.lock_duration,
+ })
+ .compat()
+ .await
+ })
+ .repeat_every_secs(TAKER_FEE_VALIDATION_RETRY_DELAY_SECS)
+ .attempts(TAKER_FEE_VALIDATION_ATTEMPTS)
+ .inspect_err(|e| error!("Error validating taker fee: {}", e))
+ .await;
+
+ match validation_result {
+ Ok(_) => Self::change_state(ValidateTakerPayment {}),
+ Err(repeat_err) => Self::change_state(Stopped::from_reason(StopReason::Error(
+ WatcherError::InvalidTakerFee(repeat_err.to_string()).into(),
+ ))),
+ }
}
}
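
For readers unfamiliar with the retry pattern the watcher now shares with the maker, the following is a minimal synchronous sketch of "retry a fallible operation a fixed number of times with a fixed delay". It is not the `retry_on_err!` implementation: `retry_with_delay` is a hypothetical helper written for this example, it blocks the thread instead of awaiting, and it counts total calls. How the real helper counts attempts is not shown in this diff.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: call `op` up to `attempts` times, sleeping `delay`
/// after each failed try, and return the last error if every try fails.
fn retry_with_delay<T, E>(
    attempts: usize,
    delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    assert!(attempts > 0, "at least one attempt is required");
    let mut tries = 0;
    loop {
        tries += 1;
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the most recent error to the caller.
            Err(err) if tries >= attempts => return Err(err),
            // Otherwise wait and try again.
            Err(_) => sleep(delay),
        }
    }
}
```

In the spirit of the constants added in lp_swap.rs, a caller would pass 6 attempts and a 10 second delay. A transient failure, such as a lagging block count, then has up to about 50 seconds to clear before the state machine stops with an error.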
