diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index b0dfdc0d..5cab0b2e 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -398,7 +398,9 @@ where .expect(ASSUME_NETWORK_ACTOR_ALIVE); } } + self.try_to_forward_pending_tlc(state).await; self.try_to_settle_down_tlc(state); + self.try_to_send_remove_tlcs(state).await; Ok(()) } FiberChannelMessage::TxSignatures(tx_signatures) => { @@ -469,72 +471,51 @@ where let flags = flags | AwaitingChannelReadyFlags::THEIR_CHANNEL_READY; state.update_state(ChannelState::AwaitingChannelReady(flags)); state.maybe_channel_is_ready(&self.network).await; - Ok(()) } FiberChannelMessage::AddTlc(add_tlc) => { let tlc_id = add_tlc.tlc_id; let tlc_count = state.tlcs.len(); - match self - .handle_add_tlc_peer_message(state, add_tlc.clone()) - .await - { - Ok((added_tlc_id, peeled_packet_bytes)) => { - if let Some(forward_packet_bytes) = peeled_packet_bytes { - // `handle_forward_onion_packet` will handle the case where forwarding TLC fails - // `remove_tlc` will be sent to the peer and proper error handling will be done - self.handle_forward_onion_packet( - state, - forward_packet_bytes, - added_tlc_id.into(), - ) - .await?; - } - Ok(()) - } - Err(e) => { - // we assume that TLC was not inserted into our state, - // so we can safely send RemoveTlc message to the peer - // note this new add_tlc may be trying to add a duplicate tlc, - // so we use tlc count to make sure no new tlc was added - // and only send RemoveTlc message to peer if the TLC is not in our state - error!("Error handling AddTlc message: {:?}", e); - assert!(tlc_count == state.tlcs.len()); - let error_detail = self.get_tlc_detail_error(state, &e).await; - if state.get_received_tlc(tlc_id).is_none() { - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::SendFiberMessage( - FiberMessageWithPeerId::new( - state.get_remote_peer_id(), - FiberMessage::remove_tlc(RemoveTlc { - channel_id: state.get_id(), - tlc_id, - reason: RemoveTlcReason::RemoveTlcFail( - TlcErrPacket::new(error_detail), - ), - }), - ), - ), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - } - Err(e) + if let Err(e) = self.handle_add_tlc_peer_message(state, add_tlc).await { + // we assume that TLC was not inserted into our state, + // so we can safely send RemoveTlc message to the peer + // note this new add_tlc may be trying to add a duplicate tlc, + // so we use tlc count to make sure no new tlc was added + // and only send RemoveTlc message to peer if the TLC is not in our state + error!("Error handling AddTlc message: {:?}", e); + assert!(tlc_count == state.tlcs.len()); + let error_detail = self.get_tlc_detail_error(state, &e).await; + if state.get_received_tlc(tlc_id).is_none() { + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId::new( + state.get_remote_peer_id(), + FiberMessage::remove_tlc(RemoveTlc { + channel_id: state.get_id(), + tlc_id, + reason: RemoveTlcReason::RemoveTlcFail(TlcErrPacket::new( + error_detail, + )), + }), + )), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); } + return Err(e); } + Ok(()) } FiberChannelMessage::RemoveTlc(remove_tlc) => { state.check_for_tlc_update(None)?; let channel_id = state.get_id(); - let tlc_details = state.remove_tlc_with_reason( - TLCId::Offered(remove_tlc.tlc_id), - &remove_tlc.reason, - )?; + let remove_reason = remove_tlc.reason.clone(); + let tlc_details = state + .remove_tlc_with_reason(TLCId::Offered(remove_tlc.tlc_id), &remove_reason)?; if let ( Some(ref udt_type_script), RemoveTlcReason::RemoveTlcFulfill(RemoveTlcFulfill { payment_preimage }), - ) = (state.funding_udt_type_script.clone(), &remove_tlc.reason) + ) = (state.funding_udt_type_script.clone(), &remove_reason) { let mut tlc = tlc_details.tlc.clone(); tlc.payment_preimage = Some(*payment_preimage); @@ -546,39 +527,14 @@ where script: udt_type_script.clone(), }); } - if let Some((previous_channel_id, previous_tlc)) = tlc_details.tlc.previous_tlc { - assert!(previous_tlc.is_received()); - info!( - "begin to remove tlc from previous channel: {:?}", - &previous_tlc - ); - assert!(previous_channel_id != state.get_id()); - let (send, recv) = oneshot::channel::>(); - let port = RpcReplyPort::from(send); - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ControlFiberChannel(ChannelCommandWithId { - channel_id: previous_channel_id, - command: ChannelCommand::RemoveTlc( - RemoveTlcCommand { - id: previous_tlc.into(), - reason: remove_tlc.reason.clone(), - }, - port, - ), - }), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - let res = recv.await.expect("remove tlc replied"); - info!("remove tlc from previous channel: {:?}", &res); - } else { + if tlc_details.tlc.previous_tlc.is_none() { // only the original sender of the TLC should send `TlcRemoveReceived` event // because only the original sender cares about the TLC event to settle the payment self.network .send_message(NetworkActorMessage::new_event( NetworkActorEvent::TlcRemoveReceived( tlc_details.tlc.payment_hash, - remove_tlc, + remove_reason, ), )) .expect("myself alive"); @@ -741,6 +697,54 @@ where ) } + async fn try_to_forward_pending_tlc(&self, state: &mut ChannelActorState) { + let tlc_infos = state.get_tlcs_for_forwarding(); + for info in tlc_infos { + assert!(info.tlc.is_received()); + let onion_packet = info.tlc.onion_packet; + let _ = self + .handle_forward_onion_packet(state, onion_packet, info.tlc.id.into()) + .await; + } + } + + async fn try_to_send_remove_tlcs(&self, state: &mut ChannelActorState) { + let tlc_infos = state.get_tlcs_for_sending_remove_tlcs(); + for tlc_info in tlc_infos { + assert!(tlc_info.is_offered()); + let remove_reason = tlc_info.removed_at.expect("expect remove_at").1; + if let Some((previous_channel_id, previous_tlc)) = tlc_info.tlc.previous_tlc { + assert!(previous_tlc.is_received()); + info!( + "begin to remove tlc from previous channel: {:?}", + &previous_tlc + ); + assert!(previous_channel_id != state.get_id()); + let (send, recv) = oneshot::channel::>(); + let port = RpcReplyPort::from(send); + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ControlFiberChannel(ChannelCommandWithId { + channel_id: previous_channel_id, + command: ChannelCommand::RemoveTlc( + RemoveTlcCommand { + id: previous_tlc.into(), + reason: remove_reason.clone(), + }, + port, + ), + }), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + let res = recv.await.expect("remove tlc replied"); + info!("remove tlc from previous channel: {:?}", &res); + } else { + unreachable!("remove tlc without previous tlc"); + } + state.set_offered_tlc_removed(tlc_info.tlc.id.into()); + } + } + fn try_to_settle_down_tlc(&self, state: &mut ChannelActorState) { let tlcs = state.get_tlcs_for_settle_down(); let mut update_invoice_payment_hash = false; @@ -804,7 +808,7 @@ where &self, state: &mut ChannelActorState, add_tlc: AddTlc, - ) -> Result<(TLCId, Option>), ProcessingChannelError> { + ) -> Result<(), ProcessingChannelError> { state.check_for_tlc_update(Some(add_tlc.amount))?; // check the onion_packet is valid or not, if not, we should return an error. @@ -894,7 +898,8 @@ where } } - let tlc = state.create_inbounding_tlc(add_tlc.clone(), preimage)?; + let tlc = + state.create_inbounding_tlc(add_tlc.clone(), preimage, peeled_packet_bytes.clone())?; state.insert_tlc(tlc.clone())?; if let Some(payment_hash) = update_invoice_payment_hash { self.store @@ -916,7 +921,7 @@ where // The peer may falsely believe that we have already processed this message, // while we have crashed. We need a way to make sure that the peer will resend // this message, and our processing of this message is idempotent. - Ok((tlc.id, peeled_packet_bytes)) + Ok(()) } async fn handle_forward_onion_packet( @@ -960,6 +965,7 @@ where .expect(ASSUME_NETWORK_ACTOR_ALIVE); let _ = recv.await.expect("RemoveTlc command replied"); } + state.set_received_tlc_forwarded(added_tlc_id); Ok(()) } @@ -3302,7 +3308,39 @@ impl ChannelActorState { self.tlcs .values() .filter(|tlc| { - !tlc.is_offered() && tlc.creation_confirmed_at.is_some() && tlc.removed_at.is_none() + tlc.is_received() + && tlc.creation_confirmed_at.is_some() + && tlc.removed_at.is_none() + && tlc.tlc.onion_packet.is_empty() + }) + .cloned() + .collect() + } + + fn get_tlcs_for_forwarding(&self) -> Vec { + self.tlcs + .values() + .filter(|tlc| { + tlc.is_received() + && tlc.creation_confirmed_at.is_some() + && tlc.removed_at.is_none() + && tlc.tlc.previous_tlc.is_none() + && tlc.relay_status == TlcRelayStatus::WaitingForward + && !tlc.tlc.onion_packet.is_empty() + }) + .cloned() + .collect() + } + + fn get_tlcs_for_sending_remove_tlcs(&self) -> Vec { + self.tlcs + .values() + .filter(|tlc| { + tlc.is_offered() + && tlc.creation_confirmed_at.is_some() + && tlc.removed_at.is_some() + && tlc.tlc.previous_tlc.is_some() + && tlc.relay_status == TlcRelayStatus::WaitingRemove }) .cloned() .collect() @@ -3505,6 +3543,18 @@ impl ChannelActorState { self.tlcs.get(&TLCId::Received(tlc_id)) } + pub fn set_received_tlc_forwarded(&mut self, tlc_id: u64) { + if let Some(tlc) = self.tlcs.get_mut(&TLCId::Received(tlc_id)) { + tlc.relay_status = TlcRelayStatus::WaitingRemove; + } + } + + pub fn set_offered_tlc_removed(&mut self, tlc_id: u64) { + if let Some(tlc) = self.tlcs.get_mut(&TLCId::Offered(tlc_id)) { + tlc.relay_status = TlcRelayStatus::Removed; + } + } + pub fn insert_tlc(&mut self, tlc: TLC) -> Result { let payment_hash = tlc.payment_hash; if let Some(tlc) = self @@ -3572,12 +3622,27 @@ impl ChannelActorState { self.to_local_amount, self.to_remote_amount ); + + let relay_status = if !tlc.onion_packet.is_empty() { + if tlc.is_received() { + TlcRelayStatus::WaitingForward + } else { + if tlc.previous_tlc.is_none() { + TlcRelayStatus::NoForward + } else { + TlcRelayStatus::WaitingRemove + } + } + } else { + TlcRelayStatus::NoForward + }; let detailed_tlc = DetailedTLCInfo { tlc: tlc.clone(), created_at: self.get_current_commitment_numbers(), creation_confirmed_at: None, removed_at: None, removal_confirmed_at: None, + relay_status, }; self.tlcs.insert(tlc.id, detailed_tlc.clone()); if tlc.is_offered() { @@ -4093,6 +4158,7 @@ impl ChannelActorState { &self, message: AddTlc, payment_preimage: Option, + onion_packet: Option>, ) -> Result { if self.get_received_tlc(message.tlc_id).is_some() { return Err(ProcessingChannelError::InvalidParameter(format!( @@ -4114,7 +4180,7 @@ impl ChannelActorState { expiry: message.expiry, payment_preimage, hash_algorithm: message.hash_algorithm, - onion_packet: message.onion_packet, + onion_packet: onion_packet.unwrap_or_default(), previous_tlc: None, }) } @@ -5713,8 +5779,16 @@ pub struct TLC { /// Which hash algorithm is applied on the preimage pub hash_algorithm: HashAlgorithm, /// The onion packet which encodes the routing information for the payment. + /// Note: this is the onion_packet need to be forwarded to the next hop when current TLC is a middle hop. pub onion_packet: Vec, /// The previous tlc id if this tlc is a part of a multi-tlc payment. + /// Note: this is used to track the tlc chain for a multi-tlc payment, + /// we need to know previous when removing tlc backwardly. + /// + /// Node A ---------> Node B ------------> Node C ----------> Node D + /// tlc_1 <---> (tlc_1) (tlc_2) <---> (tlc_2) (tlc_3) <----> tlc_3 + /// ^^^^ ^^^^ + /// pub previous_tlc: Option<(Hash256, TLCId)>, } @@ -5756,6 +5830,14 @@ impl TLC { } } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +enum TlcRelayStatus { + NoForward, + WaitingForward, + WaitingRemove, + Removed, +} + /// A tlc output in a commitment transaction, including both the tlc output /// and the commitment_number that it first appeared (will appear) in the /// commitment transaction. @@ -5781,6 +5863,8 @@ pub struct DetailedTLCInfo { // The initial commitment number of the party (the offerer) that // has confirmed the removal of this tlc. removal_confirmed_at: Option, + // indicates the status of the tlc relaying. + relay_status: TlcRelayStatus, } impl DetailedTLCInfo { @@ -5788,6 +5872,10 @@ impl DetailedTLCInfo { self.tlc.is_offered() } + fn is_received(&self) -> bool { + self.tlc.is_received() + } + fn get_commitment_numbers(&self, local: bool) -> CommitmentNumbers { let am_i_sending_the_tlc = { if self.is_offered() { diff --git a/src/fiber/network.rs b/src/fiber/network.rs index bfca0401..89da9b6b 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -64,8 +64,8 @@ use super::types::{ FiberQueryInformation, GetBroadcastMessages, GetBroadcastMessagesResult, Hash256, NodeAnnouncement, NodeAnnouncementQuery, OpenChannel, PaymentHopData, Privkey, Pubkey, QueryBroadcastMessagesWithinTimeRange, QueryBroadcastMessagesWithinTimeRangeResult, - QueryChannelsWithinBlockRange, QueryChannelsWithinBlockRangeResult, RemoveTlc, RemoveTlcReason, - TlcErr, TlcErrData, TlcErrPacket, TlcErrorCode, + QueryChannelsWithinBlockRange, QueryChannelsWithinBlockRangeResult, RemoveTlcReason, TlcErr, + TlcErrData, TlcErrPacket, TlcErrorCode, }; use super::{FiberConfig, ASSUME_NETWORK_ACTOR_ALIVE}; @@ -564,7 +564,7 @@ pub enum NetworkActorEvent { GraphSyncerExited(PeerId, GraphSyncerExitStatus), // A tlc remove message is received. (payment_hash, remove_tlc) - TlcRemoveReceived(Hash256, RemoveTlc), + TlcRemoveReceived(Hash256, RemoveTlcReason), /// Network service events to be sent to outside observers. /// These events may be both present at `NetworkActorEvent` and @@ -1255,9 +1255,9 @@ where } state.maybe_finish_sync(); } - NetworkActorEvent::TlcRemoveReceived(payment_hash, remove_tlc) => { + NetworkActorEvent::TlcRemoveReceived(payment_hash, remove_tlc_reason) => { // When a node is restarted, RemoveTLC will also be resent if necessary - self.on_remove_tlc_event(state, payment_hash, remove_tlc.reason) + self.on_remove_tlc_event(state, payment_hash, remove_tlc_reason) .await; } } diff --git a/src/fiber/tests/channel.rs b/src/fiber/tests/channel.rs index 42877af6..a8b92f74 100644 --- a/src/fiber/tests/channel.rs +++ b/src/fiber/tests/channel.rs @@ -334,7 +334,7 @@ async fn test_network_send_payment_normal_keysend_workflow() { assert_eq!(res.status, PaymentSessionStatus::Inflight); let payment_hash = res.payment_hash; - tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; let message = |rpc_reply| -> NetworkActorMessage { NetworkActorMessage::Command(NetworkActorCommand::GetPayment(payment_hash, rpc_reply)) @@ -342,6 +342,8 @@ async fn test_network_send_payment_normal_keysend_workflow() { let res = call!(node_a.network_actor, message) .expect("node_a alive") .unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; assert_eq!(res.status, PaymentSessionStatus::Success); assert_eq!(res.failed_error, None); diff --git a/src/fiber/tests/graph.rs b/src/fiber/tests/graph.rs index c8a45ab5..bf4d4131 100644 --- a/src/fiber/tests/graph.rs +++ b/src/fiber/tests/graph.rs @@ -1026,7 +1026,6 @@ fn test_graph_payment_pay_self_with_one_node() { let payment_data = payment_data.unwrap(); let route = network.graph.build_route(payment_data); - eprintln!("final result {:?}", route); assert!(route.is_ok()); let route = route.unwrap(); assert_eq!(route[1].next_hop, Some(node0.into()));