diff --git a/.gitignore b/.gitignore index acc79fb4..7d39da08 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ tests/nodes/.ports /coverage-report /*.info +.vscode diff --git a/src/cch/actor.rs b/src/cch/actor.rs index 01bc386b..97843d6a 100644 --- a/src/cch/actor.rs +++ b/src/cch/actor.rs @@ -22,7 +22,7 @@ use crate::fiber::hash_algorithm::HashAlgorithm; use crate::fiber::types::{Hash256, RemoveTlcFulfill, RemoveTlcReason}; use crate::fiber::{NetworkActorCommand, NetworkActorMessage}; use crate::invoice::Currency; -use crate::now_timestamp; +use crate::now_timestamp_as_millis_u64; use super::error::CchDbError; use super::{CchConfig, CchError, CchOrderStatus, CchOrdersDb, ReceiveBTCOrder, SendBTCOrder}; @@ -592,7 +592,8 @@ impl CchActor { payment_hash: Some( Hash256::from_str(&order.payment_hash).expect("parse Hash256"), ), - expiry: now_timestamp() + self.config.ckb_final_tlc_expiry_delta, + expiry: now_timestamp_as_millis_u64() + + self.config.ckb_final_tlc_expiry_delta, hash_algorithm: HashAlgorithm::Sha256, onion_packet: vec![], previous_tlc: None, diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index c589a766..f2e321d0 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -10,11 +10,15 @@ use crate::{ types::{ChannelUpdate, TlcErr, TlcErrPacket, TlcErrorCode}, }, invoice::{CkbInvoice, CkbInvoiceStatus, InvoiceStore}, + now_timestamp_as_millis_u64, }; use ckb_hash::{blake2b_256, new_blake2b}; use ckb_sdk::{Since, SinceType}; use ckb_types::{ - core::{EpochNumberWithFraction, FeeRate, TransactionBuilder, TransactionView}, + core::{ + Capacity, CapacityError, EpochNumberWithFraction, FeeRate, TransactionBuilder, + TransactionView, + }, packed::{Bytes, CellInput, CellOutput, OutPoint, Script, Transaction}, prelude::{AsTransactionBuilder, IntoTransactionView, Pack, Unpack}, }; @@ -52,7 +56,6 @@ use crate::{ FundingRequest, }, fiber::{ - config::{DEFAULT_UDT_MINIMAL_CKB_AMOUNT, MIN_OCCUPIED_CAPACITY}, fee::{calculate_commitment_tx_fee, shutdown_tx_size}, network::{emit_service_event, sign_network_message}, types::{AnnouncementSignatures, Shutdown}, @@ -61,8 +64,9 @@ use crate::{ }; use super::{ - config::{DEFAULT_CHANNEL_MINIMAL_CKB_AMOUNT, MIN_UDT_OCCUPIED_CAPACITY}, - fee::{calculate_shutdown_tx_fee, default_minimal_ckb_amount}, + config::DEFAULT_MIN_SHUTDOWN_FEE, + config::{MAX_PAYMENT_TLC_EXPIRY_LIMIT, MIN_TLC_EXPIRY_DELTA}, + fee::calculate_shutdown_tx_fee, hash_algorithm::HashAlgorithm, key::blake2b_hash_with_salt, network::FiberMessageWithPeerId, @@ -396,7 +400,9 @@ where .expect(ASSUME_NETWORK_ACTOR_ALIVE); } } + self.try_to_forward_pending_tlc(state).await; self.try_to_settle_down_tlc(state); + self.try_to_send_remove_tlcs(state).await; Ok(()) } FiberChannelMessage::TxSignatures(tx_signatures) => { @@ -467,72 +473,52 @@ where let flags = flags | AwaitingChannelReadyFlags::THEIR_CHANNEL_READY; state.update_state(ChannelState::AwaitingChannelReady(flags)); state.maybe_channel_is_ready(&self.network).await; - Ok(()) } FiberChannelMessage::AddTlc(add_tlc) => { let tlc_id = add_tlc.tlc_id; let tlc_count = state.tlcs.len(); - match self - .handle_add_tlc_peer_message(state, add_tlc.clone()) - .await - { - Ok((added_tlc_id, peeled_packet_bytes)) => { - if let Some(forward_packet_bytes) = peeled_packet_bytes { - // `handle_forward_onion_packet` will handle the case where forwarding TLC fails - // `remove_tlc` will be sent to the peer and proper error handling will be done - self.handle_forward_onion_packet( - state, - forward_packet_bytes, - added_tlc_id.into(), - ) - .await?; - } - Ok(()) - } - Err(e) => { - // we assume that TLC was not inserted into our state, - // so we can safely send RemoveTlc message to the peer - // note this new add_tlc may be trying to add a duplicate tlc, - // so we use tlc count to make sure no new tlc was added - // and only send RemoveTlc message to peer if the TLC is not in our state - error!("Error handling AddTlc message: {:?}", e); - assert!(tlc_count == state.tlcs.len()); - let error_detail = self.get_tlc_detail_error(state, &e).await; - if state.get_received_tlc(tlc_id).is_none() { - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::SendFiberMessage( - FiberMessageWithPeerId::new( - state.get_remote_peer_id(), - FiberMessage::remove_tlc(RemoveTlc { - channel_id: state.get_id(), - tlc_id, - reason: RemoveTlcReason::RemoveTlcFail( - TlcErrPacket::new(error_detail), - ), - }), - ), - ), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - } - Err(e) + if let Err(e) = self.handle_add_tlc_peer_message(state, add_tlc).await { + // we assume that TLC was not inserted into our state, + // so we can safely send RemoveTlc message to the peer + // note this new add_tlc may be trying to add a duplicate tlc, + // so we use tlc count to make sure no new tlc was added + // and only send RemoveTlc message to peer if the TLC is not in our state + error!("Error handling AddTlc message: {:?}", e); + assert!(tlc_count == state.tlcs.len()); + let error_detail = self.get_tlc_detail_error(state, &e).await; + if state.get_received_tlc(tlc_id).is_none() { + eprintln!("Sending RemoveTlc message to peer due to error: {:?}", e); + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId::new( + state.get_remote_peer_id(), + FiberMessage::remove_tlc(RemoveTlc { + channel_id: state.get_id(), + tlc_id, + reason: RemoveTlcReason::RemoveTlcFail(TlcErrPacket::new( + error_detail, + )), + }), + )), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); } + return Err(e); } + Ok(()) } FiberChannelMessage::RemoveTlc(remove_tlc) => { state.check_for_tlc_update(None)?; let channel_id = state.get_id(); - let tlc_details = state.remove_tlc_with_reason( - TLCId::Offered(remove_tlc.tlc_id), - &remove_tlc.reason, - )?; + let remove_reason = remove_tlc.reason.clone(); + let tlc_details = state + .remove_tlc_with_reason(TLCId::Offered(remove_tlc.tlc_id), &remove_reason)?; if let ( Some(ref udt_type_script), RemoveTlcReason::RemoveTlcFulfill(RemoveTlcFulfill { payment_preimage }), - ) = (state.funding_udt_type_script.clone(), &remove_tlc.reason) + ) = (state.funding_udt_type_script.clone(), &remove_reason) { let mut tlc = tlc_details.tlc.clone(); tlc.payment_preimage = Some(*payment_preimage); @@ -544,39 +530,14 @@ where script: udt_type_script.clone(), }); } - if let Some((previous_channel_id, previous_tlc)) = tlc_details.tlc.previous_tlc { - assert!(previous_tlc.is_received()); - info!( - "begin to remove tlc from previous channel: {:?}", - &previous_tlc - ); - assert!(previous_channel_id != state.get_id()); - let (send, recv) = oneshot::channel::>(); - let port = RpcReplyPort::from(send); - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ControlFiberChannel(ChannelCommandWithId { - channel_id: previous_channel_id, - command: ChannelCommand::RemoveTlc( - RemoveTlcCommand { - id: previous_tlc.into(), - reason: remove_tlc.reason.clone(), - }, - port, - ), - }), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - let res = recv.await.expect("remove tlc replied"); - info!("remove tlc from previous channel: {:?}", &res); - } else { + if tlc_details.tlc.previous_tlc.is_none() { // only the original sender of the TLC should send `TlcRemoveReceived` event // because only the original sender cares about the TLC event to settle the payment self.network .send_message(NetworkActorMessage::new_event( NetworkActorEvent::TlcRemoveReceived( tlc_details.tlc.payment_hash, - remove_tlc, + remove_reason, ), )) .expect("myself alive"); @@ -691,6 +652,8 @@ where let error_code = match error { ProcessingChannelError::PeelingOnionPacketError(_) => TlcErrorCode::InvalidOnionPayload, ProcessingChannelError::TlcForwardFeeIsTooLow => TlcErrorCode::FeeInsufficient, + ProcessingChannelError::TlcExpirySoon => TlcErrorCode::ExpiryTooSoon, + ProcessingChannelError::TlcExpiryTooFar => TlcErrorCode::ExpiryTooFar, ProcessingChannelError::FinalInvoiceInvalid(status) => match status { CkbInvoiceStatus::Expired => TlcErrorCode::InvoiceExpired, CkbInvoiceStatus::Cancelled => TlcErrorCode::InvoiceCancelled, @@ -701,7 +664,7 @@ where TlcErrorCode::IncorrectOrUnknownPaymentDetails } ProcessingChannelError::FinalIncorrectHTLCAmount => { - TlcErrorCode::FinalIncorrectHtlcAmount + TlcErrorCode::FinalIncorrectTlcAmount } ProcessingChannelError::TlcAmountIsTooLow => TlcErrorCode::AmountBelowMinimum, ProcessingChannelError::TlcNumberExceedLimit @@ -739,6 +702,55 @@ where ) } + async fn try_to_forward_pending_tlc(&self, state: &mut ChannelActorState) { + let tlc_infos = state.get_tlcs_for_forwarding(); + for info in tlc_infos { + assert!(info.tlc.is_received()); + let onion_packet = info.tlc.onion_packet; + let _ = self + .handle_forward_onion_packet(state, onion_packet, info.tlc.id.into()) + .await; + } + } + + async fn try_to_send_remove_tlcs(&self, state: &mut ChannelActorState) { + let tlc_infos = state.get_tlcs_for_sending_remove_tlcs(); + eprintln!("try to send remove tlcs: {:?}", tlc_infos); + for tlc_info in tlc_infos { + assert!(tlc_info.is_offered()); + let remove_reason = tlc_info.removed_at.expect("expect remove_at").1; + if let Some((previous_channel_id, previous_tlc)) = tlc_info.tlc.previous_tlc { + assert!(previous_tlc.is_received()); + info!( + "begin to remove tlc from previous channel: {:?}", + &previous_tlc + ); + assert!(previous_channel_id != state.get_id()); + let (send, recv) = oneshot::channel::>(); + let port = RpcReplyPort::from(send); + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ControlFiberChannel(ChannelCommandWithId { + channel_id: previous_channel_id, + command: ChannelCommand::RemoveTlc( + RemoveTlcCommand { + id: previous_tlc.into(), + reason: remove_reason.clone(), + }, + port, + ), + }), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + let res = recv.await.expect("remove tlc replied"); + info!("remove tlc from previous channel: {:?}", &res); + } else { + unreachable!("remove tlc without previous tlc"); + } + state.set_offered_tlc_removed(tlc_info.tlc.id.into()); + } + } + fn try_to_settle_down_tlc(&self, state: &mut ChannelActorState) { let tlcs = state.get_tlcs_for_settle_down(); let mut update_invoice_payment_hash = false; @@ -802,8 +814,9 @@ where &self, state: &mut ChannelActorState, add_tlc: AddTlc, - ) -> Result<(TLCId, Option>), ProcessingChannelError> { + ) -> Result<(), ProcessingChannelError> { state.check_for_tlc_update(Some(add_tlc.amount))?; + state.check_tlc_expiry(add_tlc.expiry)?; // check the onion_packet is valid or not, if not, we should return an error. // If there is a next hop, we should send the AddTlc message to the next hop. @@ -892,7 +905,8 @@ where } } - let tlc = state.create_inbounding_tlc(add_tlc.clone(), preimage)?; + let tlc = + state.create_inbounding_tlc(add_tlc.clone(), preimage, peeled_packet_bytes.clone())?; state.insert_tlc(tlc.clone())?; if let Some(payment_hash) = update_invoice_payment_hash { self.store @@ -914,7 +928,7 @@ where // The peer may falsely believe that we have already processed this message, // while we have crashed. We need a way to make sure that the peer will resend // this message, and our processing of this message is idempotent. - Ok((tlc.id, peeled_packet_bytes)) + Ok(()) } async fn handle_forward_onion_packet( @@ -958,6 +972,7 @@ where .expect(ASSUME_NETWORK_ACTOR_ALIVE); let _ = recv.await.expect("RemoveTlc command replied"); } + state.set_received_tlc_forwarded(added_tlc_id); Ok(()) } @@ -1085,6 +1100,7 @@ where ) -> Result { debug!("handle add tlc command : {:?}", &command); state.check_for_tlc_update(Some(command.amount))?; + state.check_tlc_expiry(command.expiry)?; let tlc = state.create_outbounding_tlc(command); state.insert_tlc(tlc.clone())?; @@ -1231,6 +1247,12 @@ where } if let Some(delta) = tlc_expiry_delta { + if delta < MIN_TLC_EXPIRY_DELTA { + return Err(ProcessingChannelError::InvalidParameter(format!( + "TLC expiry delta is too small, expect larger than {}", + MIN_TLC_EXPIRY_DELTA + ))); + } updated |= state.update_our_tlc_expiry_delta(delta); } @@ -1493,27 +1515,6 @@ where Ok(()) } - fn get_funding_and_reserved_amount( - &self, - funding_amount: u128, - udt_type_script: &Option