From f9765c470f74eb04b09ebbc6bb1d6bafdd61bc00 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Wed, 13 Nov 2024 01:58:37 +0000
Subject: [PATCH 1/9] Always require a `PeerState` for the CP when claiming an HTLC

Now that we track the latest `ChannelMonitorUpdate::update_id` for each
closed channel in `PeerState::closed_channel_monitor_update_ids`, we
should always have a `PeerState` entry for the channel counterparty any
time we go to claim an HTLC on a channel, even if it's closed.

Here we make this a hard assertion, as we'll need to access that
`PeerState` in the coming commits to track in-flight updates against
closed channels.
---
 lightning/src/ln/channelmanager.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index b312e0055ee..e2cd122642c 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -7089,10 +7089,14 @@ where
 		let per_peer_state = self.per_peer_state.read().unwrap();
 		let chan_id = prev_hop.channel_id;
 
+		const MISSING_MON_ERROR: &'static str =
+			"If we're going to claim an HTLC against a channel, we should always have *some* state for the channel, even if just the latest ChannelMonitor update_id. This failure indicates we need to claim an HTLC from a channel for which we did not have a ChannelMonitor at startup and didn't create one while running.";
+
 		let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map(
 			|counterparty_node_id| per_peer_state.get(counterparty_node_id)
 				.map(|peer_mutex| peer_mutex.lock().unwrap())
-			).unwrap_or(None);
+				.expect(MISSING_MON_ERROR)
+			);
 
 		if peer_state_opt.is_some() {
 			let mut peer_state_lock = peer_state_opt.unwrap();

From 78fb13d8f437970a5a4f43c69bfd3cc98ae5dd6b Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 18 Nov 2024 18:30:32 +0000
Subject: [PATCH 2/9] Set closed chan mon upd `update_id`s at creation not application

In c99d3d785dd78f594837d283636b82222c3b9ef1 we added a new
`apply_post_close_monitor_update` method which takes a
`ChannelMonitorUpdate` (possibly for a channel which has been closed),
sets the `update_id` to the right value to keep our updates
well-ordered, and then applies it.

Setting the `update_id` at application time here is fine - updates
don't really have an order after the channel has been closed, so they
can be applied in any order - and was done for practical reasons:
calculating the right `update_id` at generation time takes a bit more
work on startup, and was impossible without new assumptions during
claim.

In the previous commit we added exactly the new assumption we need when
claiming (as it's required for the next few commits anyway), so now the
only thing stopping us is the extra complexity.

In the coming commits, we'll move to tracking post-close
`ChannelMonitorUpdate`s as in-flight, like any other updates, which
requires having an `update_id` at generation time so that we know which
updates are still in-flight. Thus, we go ahead and eat the complexity
here, creating `update_id`s when the `ChannelMonitorUpdate`s are
generated for closed-channel updates, like we do for channels which are
still live.

We also ensure that we always insert `ChannelMonitorUpdate`s in the
pending updates set when we push the background event, avoiding a race
where we push an update as a background event and then, while it is
being processed, another update finishes and the post-update actions
get run.
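To make the ordering scheme concrete, a minimal sketch of assigning the
`update_id` at generation time follows. The types here are simplified,
hypothetical stand-ins (not the real LDK `ChannelId`/`ChannelMonitorUpdate`
structs), and the real bookkeeping lives in `ChannelManager`, keyed off
`PeerState::closed_channel_monitor_update_ids`; this only illustrates the
idea that each closed channel keeps a monotonically increasing counter and
every newly generated update takes the next value at generation time.

use std::collections::BTreeMap;

// Simplified stand-ins for the real LDK types; illustration only.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct ChannelId([u8; 32]);

struct ChannelMonitorUpdate {
	update_id: u64,
	// ...other fields elided...
}

/// Latest `update_id` handed out for each closed channel.
struct ClosedChannelUpdateIds {
	latest: BTreeMap<ChannelId, u64>,
}

impl ClosedChannelUpdateIds {
	/// Assigns the next `update_id` at *generation* time so that in-flight
	/// updates for a closed channel stay well-ordered, saturating to cope
	/// with legacy monitors whose latest id is already `u64::MAX`.
	fn assign_next(&mut self, chan_id: ChannelId, update: &mut ChannelMonitorUpdate) {
		let latest = self.latest.entry(chan_id).or_insert(0);
		*latest = (*latest).saturating_add(1);
		update.update_id = *latest;
	}
}

Because the id is fixed when the update is generated (rather than when it is
applied), the update can be tracked as in-flight immediately, which is what
lets later commits treat closed-channel updates like any other pending
`ChannelMonitorUpdate`.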
--- lightning/src/ln/channelmanager.rs | 359 +++++++++++++++-------------- 1 file changed, 191 insertions(+), 168 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index e2cd122642c..5212a40f0a6 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1338,11 +1338,12 @@ pub(super) struct PeerState where SP::Target: SignerProvider { /// entry here to note that the channel with the key's ID is blocked on a set of actions. actions_blocking_raa_monitor_updates: BTreeMap>, /// The latest [`ChannelMonitor::get_latest_update_id`] value for all closed channels as they - /// exist on-disk/in our [`chain::Watch`]. This *ignores* all pending updates not yet applied - /// in [`ChannelManager::pending_background_events`]. + /// exist on-disk/in our [`chain::Watch`]. /// /// If there are any updates pending in [`Self::in_flight_monitor_updates`] this will contain - /// the highest `update_id` of all the pending in-flight updates. + /// the highest `update_id` of all the pending in-flight updates (note that any pending updates + /// not yet applied sitting in [`ChannelManager::pending_background_events`] will also be + /// considered as they are also in [`Self::in_flight_monitor_updates`]). closed_channel_monitor_update_ids: BTreeMap, /// The peer is currently connected (i.e. we've seen a /// [`ChannelMessageHandler::peer_connected`] and no corresponding @@ -3002,25 +3003,8 @@ macro_rules! locked_close_channel { // channel, we need to store the last update_id of it. However, we don't want to insert // into the map (which prevents the `PeerState` from being cleaned up) for channels that // never even got confirmations (which would open us up to DoS attacks). - let mut update_id = $channel_context.get_latest_monitor_update_id(); + let update_id = $channel_context.get_latest_monitor_update_id(); if $channel_context.get_funding_tx_confirmation_height().is_some() || $channel_context.minimum_depth() == Some(0) || update_id > 1 { - // There may be some pending background events which we have to ignore when setting the - // latest update ID. - for event in $self.pending_background_events.lock().unwrap().iter() { - match event { - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, channel_id, update, .. } => { - if *channel_id == $channel_context.channel_id() && *counterparty_node_id == $channel_context.get_counterparty_node_id() { - update_id = cmp::min(update_id, update.update_id - 1); - } - }, - BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(..) => { - // This is only generated for very old channels which were already closed - // on startup, so it should never be present for a channel that is closing - // here. - }, - BackgroundEvent::MonitorUpdatesComplete { .. } => {}, - } - } let chan_id = $channel_context.channel_id(); $peer_state.closed_channel_monitor_update_ids.insert(chan_id, update_id); } @@ -3939,33 +3923,9 @@ where self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script) } - /// Ensures any saved latest ID in [`PeerState::closed_channel_monitor_update_ids`] is updated, - /// then applies the provided [`ChannelMonitorUpdate`]. 
- #[must_use] - fn apply_post_close_monitor_update( - &self, counterparty_node_id: PublicKey, channel_id: ChannelId, funding_txo: OutPoint, - mut monitor_update: ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus { - // Note that there may be some post-close updates which need to be well-ordered with - // respect to the `update_id`, so we hold the `closed_channel_monitor_update_ids` lock - // here (and also make sure the `monitor_update` we're applying has the right id. - let per_peer_state = self.per_peer_state.read().unwrap(); - let mut peer_state_lock = per_peer_state.get(&counterparty_node_id) - .expect("We must always have a peer entry for a peer with which we have channels that have ChannelMonitors") - .lock().unwrap(); - let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(channel_id) { - hash_map::Entry::Occupied(mut chan_phase) => { - if let ChannelPhase::Funded(chan) = chan_phase.get_mut() { - let completed = handle_new_monitor_update!(self, funding_txo, - monitor_update, peer_state_lock, peer_state, per_peer_state, chan); - return if completed { ChannelMonitorUpdateStatus::Completed } else { ChannelMonitorUpdateStatus::InProgress }; - } else { - debug_assert!(false, "We shouldn't have an update for a non-funded channel"); - } - }, - hash_map::Entry::Vacant(_) => {}, - } + fn set_closed_chan_next_monitor_update_id( + peer_state: &mut PeerState, channel_id: ChannelId, monitor_update: &mut ChannelMonitorUpdate, + ) { match peer_state.closed_channel_monitor_update_ids.entry(channel_id) { btree_map::Entry::Vacant(entry) => { let is_closing_unupdated_monitor = monitor_update.update_id == 1 @@ -3994,6 +3954,33 @@ where monitor_update.update_id = *latest_update_id; } } + } + + /// Applies a [`ChannelMonitorUpdate`] which may or may not be for a channel which is closed. + #[must_use] + fn apply_post_close_monitor_update( + &self, counterparty_node_id: PublicKey, channel_id: ChannelId, funding_txo: OutPoint, + monitor_update: ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + // Note that there may be some post-close updates which need to be well-ordered with + // respect to the `update_id`, so we hold the `peer_state` lock here. 
+ let per_peer_state = self.per_peer_state.read().unwrap(); + let mut peer_state_lock = per_peer_state.get(&counterparty_node_id) + .expect("We must always have a peer entry for a peer with which we have channels that have ChannelMonitors") + .lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(channel_id) { + hash_map::Entry::Occupied(mut chan_phase) => { + if let ChannelPhase::Funded(chan) = chan_phase.get_mut() { + let completed = handle_new_monitor_update!(self, funding_txo, + monitor_update, peer_state_lock, peer_state, per_peer_state, chan); + return if completed { ChannelMonitorUpdateStatus::Completed } else { ChannelMonitorUpdateStatus::InProgress }; + } else { + debug_assert!(false, "We shouldn't have an update for a non-funded channel"); + } + }, + hash_map::Entry::Vacant(_) => {}, + } self.chain_monitor.update_channel(funding_txo, &monitor_update) } @@ -7085,133 +7072,121 @@ where debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread); debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread); - { - let per_peer_state = self.per_peer_state.read().unwrap(); - let chan_id = prev_hop.channel_id; - - const MISSING_MON_ERROR: &'static str = - "If we're going to claim an HTLC against a channel, we should always have *some* state for the channel, even if just the latest ChannelMonitor update_id. This failure indicates we need to claim an HTLC from a channel for which we did not have a ChannelMonitor at startup and didn't create one while running."; + let per_peer_state = self.per_peer_state.read().unwrap(); + let chan_id = prev_hop.channel_id; - let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map( - |counterparty_node_id| per_peer_state.get(counterparty_node_id) - .map(|peer_mutex| peer_mutex.lock().unwrap()) - .expect(MISSING_MON_ERROR) - ); + const MISSING_MON_ERROR: &'static str = + "If we're going to claim an HTLC against a channel, we should always have *some* state for the channel, even if just the latest ChannelMonitor update_id. 
This failure indicates we need to claim an HTLC from a channel for which we did not have a ChannelMonitor at startup and didn't create one while running."; - if peer_state_opt.is_some() { - let mut peer_state_lock = peer_state_opt.unwrap(); - let peer_state = &mut *peer_state_lock; - if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(chan_id) { - if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { - let counterparty_node_id = chan.context.get_counterparty_node_id(); - let logger = WithChannelContext::from(&self.logger, &chan.context, None); - let fulfill_res = - chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, payment_info, &&logger); - - match fulfill_res { - UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => { - let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false); - if let Some(action) = action_opt { - log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}", - chan_id, action); - peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); - } - if let Some(raa_blocker) = raa_blocker_opt { - peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker); - } - if !during_init { - handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_lock, - peer_state, per_peer_state, chan); - } else { - // If we're running during init we cannot update a monitor directly - - // they probably haven't actually been loaded yet. Instead, push the - // monitor update as a background event. - self.pending_background_events.lock().unwrap().push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id, - funding_txo: prev_hop.funding_txo, - channel_id: prev_hop.channel_id, - update: monitor_update.clone(), - }); - } - } - UpdateFulfillCommitFetch::DuplicateClaim {} => { - let (action_opt, raa_blocker_opt) = completion_action(None, true); - if let Some(raa_blocker) = raa_blocker_opt { - // If we're making a claim during startup, its a replay of a - // payment claim from a `ChannelMonitor`. In some cases (MPP or - // if the HTLC was only recently removed) we make such claims - // after an HTLC has been removed from a channel entirely, and - // thus the RAA blocker has long since completed. - // - // In any other case, the RAA blocker must still be present and - // blocking RAAs. - debug_assert!(during_init || - peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker)); - } - let action = if let Some(action) = action_opt { - action - } else { - return; - }; + // Note here that `peer_state_opt` is always `Some` if `prev_hop.counterparty_node_id` is + // `Some`. This is relied on in the closed-channel case below. 
+ let mut peer_state_opt = prev_hop.counterparty_node_id.as_ref().map( + |counterparty_node_id| per_peer_state.get(counterparty_node_id) + .map(|peer_mutex| peer_mutex.lock().unwrap()) + .expect(MISSING_MON_ERROR) + ); - mem::drop(peer_state_lock); + if let Some(peer_state_lock) = peer_state_opt.as_mut() { + let peer_state = &mut **peer_state_lock; + if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(chan_id) { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let counterparty_node_id = chan.context.get_counterparty_node_id(); + let logger = WithChannelContext::from(&self.logger, &chan.context, None); + let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, payment_info, &&logger); - log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}", + match fulfill_res { + UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => { + let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false); + if let Some(action) = action_opt { + log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}", chan_id, action); - if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately { - downstream_counterparty_node_id: node_id, - downstream_funding_outpoint: _, - blocking_action: blocker, downstream_channel_id: channel_id, - } = action { - if let Some(peer_state_mtx) = per_peer_state.get(&node_id) { - let mut peer_state = peer_state_mtx.lock().unwrap(); - if let Some(blockers) = peer_state - .actions_blocking_raa_monitor_updates - .get_mut(&channel_id) - { - let mut found_blocker = false; - blockers.retain(|iter| { - // Note that we could actually be blocked, in - // which case we need to only remove the one - // blocker which was added duplicatively. - let first_blocker = !found_blocker; - if *iter == blocker { found_blocker = true; } - *iter != blocker || !first_blocker - }); - debug_assert!(found_blocker); - } - } else { - debug_assert!(false); + peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); + } + if let Some(raa_blocker) = raa_blocker_opt { + peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker); + } + if !during_init { + handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt, + peer_state, per_peer_state, chan); + } else { + // If we're running during init we cannot update a monitor directly - + // they probably haven't actually been loaded yet. Instead, push the + // monitor update as a background event. + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, + funding_txo: prev_hop.funding_txo, + channel_id: prev_hop.channel_id, + update: monitor_update.clone(), + }); + } + } + UpdateFulfillCommitFetch::DuplicateClaim {} => { + let (action_opt, raa_blocker_opt) = completion_action(None, true); + if let Some(raa_blocker) = raa_blocker_opt { + // If we're making a claim during startup, its a replay of a + // payment claim from a `ChannelMonitor`. In some cases (MPP or + // if the HTLC was only recently removed) we make such claims + // after an HTLC has been removed from a channel entirely, and + // thus the RAA blocker has long since completed. + // + // In any other case, the RAA blocker must still be present and + // blocking RAAs. 
+ debug_assert!(during_init || + peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker)); + } + let action = if let Some(action) = action_opt { + action + } else { + return; + }; + + mem::drop(peer_state_opt); + + log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}", + chan_id, action); + if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately { + downstream_counterparty_node_id: node_id, + downstream_funding_outpoint: _, + blocking_action: blocker, downstream_channel_id: channel_id, + } = action { + if let Some(peer_state_mtx) = per_peer_state.get(&node_id) { + let mut peer_state = peer_state_mtx.lock().unwrap(); + if let Some(blockers) = peer_state + .actions_blocking_raa_monitor_updates + .get_mut(&channel_id) + { + let mut found_blocker = false; + blockers.retain(|iter| { + // Note that we could actually be blocked, in + // which case we need to only remove the one + // blocker which was added duplicatively. + let first_blocker = !found_blocker; + if *iter == blocker { found_blocker = true; } + *iter != blocker || !first_blocker + }); + debug_assert!(found_blocker); } - } else if matches!(action, MonitorUpdateCompletionAction::PaymentClaimed { .. }) { - debug_assert!(during_init, - "Duplicate claims should always either be for forwarded payments(freeing another channel immediately) or during init (for claim replay)"); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions([action]); } else { - debug_assert!(false, - "Duplicate claims should always either be for forwarded payments(freeing another channel immediately) or during init (for claim replay)"); - return; - }; - } + debug_assert!(false); + } + } else if matches!(action, MonitorUpdateCompletionAction::PaymentClaimed { .. }) { + debug_assert!(during_init, + "Duplicate claims should always either be for forwarded payments(freeing another channel immediately) or during init (for claim replay)"); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions([action]); + } else { + debug_assert!(false, + "Duplicate claims should always either be for forwarded payments(freeing another channel immediately) or during init (for claim replay)"); + return; + }; } } - return; } + return; } } - let preimage_update = ChannelMonitorUpdate { - update_id: 0, // apply_post_close_monitor_update will set the right value - counterparty_node_id: prev_hop.counterparty_node_id, - updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { - payment_preimage, - payment_info, - }], - channel_id: Some(prev_hop.channel_id), - }; - if prev_hop.counterparty_node_id.is_none() { let payment_hash: PaymentHash = payment_preimage.into(); panic!( @@ -7221,6 +7196,27 @@ where ); } let counterparty_node_id = prev_hop.counterparty_node_id.expect("Checked immediately above"); + let mut peer_state = peer_state_opt.expect("peer_state_opt is always Some when the counterparty_node_id is Some"); + + let mut preimage_update = ChannelMonitorUpdate { + update_id: 0, // set in set_closed_chan_next_monitor_update_id + counterparty_node_id: prev_hop.counterparty_node_id, + updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage, + payment_info, + }], + channel_id: Some(prev_hop.channel_id), + }; + + // Note that the below is race-y - we set the `update_id` here and then drop the peer_state + // lock before applying the update in `apply_post_close_monitor_update` (or via the + // background events pipeline). 
During that time, some other update could be created and + // then applied, resultin in `ChannelMonitorUpdate`s being applied out of order and causing + // a panic. + Self::set_closed_chan_next_monitor_update_id(&mut *peer_state, prev_hop.channel_id, &mut preimage_update); + + mem::drop(peer_state); + mem::drop(per_peer_state); if !during_init { // We update the ChannelMonitor on the backward link, after @@ -13241,8 +13237,8 @@ where // Our channel information is out of sync with the `ChannelMonitor`, so // force the update to use the `ChannelMonitor`'s update_id for the close // update. - let latest_update_id = monitor.get_latest_update_id(); - update.update_id = latest_update_id.saturating_add(1); + let latest_update_id = monitor.get_latest_update_id().saturating_add(1); + update.update_id = latest_update_id; per_peer_state.entry(counterparty_node_id) .or_insert_with(|| Mutex::new(empty_peer_state())) .lock().unwrap() @@ -13326,6 +13322,7 @@ where for (funding_txo, monitor) in args.channel_monitors.iter() { if !funding_txo_set.contains(funding_txo) { + let mut should_queue_fc_update = false; if let Some(counterparty_node_id) = monitor.get_counterparty_node_id() { // If the ChannelMonitor had any updates, we may need to update it further and // thus track it in `closed_channel_monitor_update_ids`. If the channel never @@ -13334,17 +13331,21 @@ where // Note that a `ChannelMonitor` is created with `update_id` 0 and after we // provide it with a closure update its `update_id` will be at 1. if !monitor.offchain_closed() || monitor.get_latest_update_id() > 1 { + should_queue_fc_update = !monitor.offchain_closed(); + let mut latest_update_id = monitor.get_latest_update_id(); + if should_queue_fc_update { + latest_update_id += 1; + } per_peer_state.entry(counterparty_node_id) .or_insert_with(|| Mutex::new(empty_peer_state())) .lock().unwrap() .closed_channel_monitor_update_ids.entry(monitor.channel_id()) - .and_modify(|v| *v = cmp::max(monitor.get_latest_update_id(), *v)) - .or_insert(monitor.get_latest_update_id()); + .and_modify(|v| *v = cmp::max(latest_update_id, *v)) + .or_insert(latest_update_id); } } - if monitor.offchain_closed() { - // We already appled a ChannelForceClosed update. + if !should_queue_fc_update { continue; } @@ -13564,6 +13565,10 @@ where counterparty_node_id: $counterparty_node_id, channel_id: $monitor.channel_id(), }); + } else { + $peer_state.closed_channel_monitor_update_ids.entry($monitor.channel_id()) + .and_modify(|v| *v = cmp::max(max_in_flight_update_id, *v)) + .or_insert(max_in_flight_update_id); } if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() { log_error!($logger, "Duplicate in-flight monitor update set for the same channel!"); @@ -13651,6 +13656,7 @@ where } = &mut new_event { debug_assert_eq!(update.updates.len(), 1); debug_assert!(matches!(update.updates[0], ChannelMonitorUpdateStep::ChannelForceClosed { .. })); + let mut updated_id = false; for pending_event in pending_background_events.iter() { if let BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id: pending_cp, funding_txo: pending_funding, @@ -13660,15 +13666,32 @@ where && funding_txo == pending_funding && channel_id == pending_chan_id; if for_same_channel { + debug_assert!(update.update_id >= pending_update.update_id); if pending_update.updates.iter().any(|upd| matches!(upd, ChannelMonitorUpdateStep::ChannelForceClosed { .. 
})) { // If the background event we're looking at is just // force-closing the channel which already has a pending // force-close update, no need to duplicate it. continue 'each_bg_event; } + update.update_id = pending_update.update_id.saturating_add(1); + updated_id = true; } } } + let mut per_peer_state = per_peer_state.get(counterparty_node_id) + .expect("If we have pending updates for a channel it must have an entry") + .lock().unwrap(); + if updated_id { + per_peer_state + .closed_channel_monitor_update_ids.entry(*channel_id) + .and_modify(|v| *v = cmp::max(update.update_id, *v)) + .or_insert(update.update_id); + } + let in_flight_updates = per_peer_state.in_flight_monitor_updates + .entry(*funding_txo) + .or_insert_with(Vec::new); + debug_assert!(!in_flight_updates.iter().any(|upd| upd == update)); + in_flight_updates.push(update.clone()); } pending_background_events.push(new_event); } From 6fca688435b81b61b51deae06a525a1ee74742a7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 13 Dec 2024 16:34:53 +0000 Subject: [PATCH 3/9] f move to reduce diff --- lightning/src/ln/channelmanager.rs | 56 +++++++++++++++--------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 5212a40f0a6..d6e51047aca 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3923,6 +3923,34 @@ where self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script) } + /// Applies a [`ChannelMonitorUpdate`] which may or may not be for a channel which is closed. + #[must_use] + fn apply_post_close_monitor_update( + &self, counterparty_node_id: PublicKey, channel_id: ChannelId, funding_txo: OutPoint, + monitor_update: ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + // Note that there may be some post-close updates which need to be well-ordered with + // respect to the `update_id`, so we hold the `peer_state` lock here. + let per_peer_state = self.per_peer_state.read().unwrap(); + let mut peer_state_lock = per_peer_state.get(&counterparty_node_id) + .expect("We must always have a peer entry for a peer with which we have channels that have ChannelMonitors") + .lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(channel_id) { + hash_map::Entry::Occupied(mut chan_phase) => { + if let ChannelPhase::Funded(chan) = chan_phase.get_mut() { + let completed = handle_new_monitor_update!(self, funding_txo, + monitor_update, peer_state_lock, peer_state, per_peer_state, chan); + return if completed { ChannelMonitorUpdateStatus::Completed } else { ChannelMonitorUpdateStatus::InProgress }; + } else { + debug_assert!(false, "We shouldn't have an update for a non-funded channel"); + } + }, + hash_map::Entry::Vacant(_) => {}, + } + self.chain_monitor.update_channel(funding_txo, &monitor_update) + } + fn set_closed_chan_next_monitor_update_id( peer_state: &mut PeerState, channel_id: ChannelId, monitor_update: &mut ChannelMonitorUpdate, ) { @@ -3956,34 +3984,6 @@ where } } - /// Applies a [`ChannelMonitorUpdate`] which may or may not be for a channel which is closed. 
- #[must_use] - fn apply_post_close_monitor_update( - &self, counterparty_node_id: PublicKey, channel_id: ChannelId, funding_txo: OutPoint, - monitor_update: ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus { - // Note that there may be some post-close updates which need to be well-ordered with - // respect to the `update_id`, so we hold the `peer_state` lock here. - let per_peer_state = self.per_peer_state.read().unwrap(); - let mut peer_state_lock = per_peer_state.get(&counterparty_node_id) - .expect("We must always have a peer entry for a peer with which we have channels that have ChannelMonitors") - .lock().unwrap(); - let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(channel_id) { - hash_map::Entry::Occupied(mut chan_phase) => { - if let ChannelPhase::Funded(chan) = chan_phase.get_mut() { - let completed = handle_new_monitor_update!(self, funding_txo, - monitor_update, peer_state_lock, peer_state, per_peer_state, chan); - return if completed { ChannelMonitorUpdateStatus::Completed } else { ChannelMonitorUpdateStatus::InProgress }; - } else { - debug_assert!(false, "We shouldn't have an update for a non-funded channel"); - } - }, - hash_map::Entry::Vacant(_) => {}, - } - self.chain_monitor.update_channel(funding_txo, &monitor_update) - } - /// When a channel is removed, two things need to happen: /// (a) [`locked_close_channel`] must be called in the same `per_peer_state` lock as /// the channel-closing action, From 5d8acc2e09fc0e4e6b3fabe4f13d90df4307d9c0 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 13 Dec 2024 16:49:13 +0000 Subject: [PATCH 4/9] f DRY update_id building --- lightning/src/ln/channelmanager.rs | 51 ++++++++---------------------- 1 file changed, 14 insertions(+), 37 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index d6e51047aca..80e0b32c038 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3951,39 +3951,6 @@ where self.chain_monitor.update_channel(funding_txo, &monitor_update) } - fn set_closed_chan_next_monitor_update_id( - peer_state: &mut PeerState, channel_id: ChannelId, monitor_update: &mut ChannelMonitorUpdate, - ) { - match peer_state.closed_channel_monitor_update_ids.entry(channel_id) { - btree_map::Entry::Vacant(entry) => { - let is_closing_unupdated_monitor = monitor_update.update_id == 1 - && monitor_update.updates.len() == 1 - && matches!(&monitor_update.updates[0], ChannelMonitorUpdateStep::ChannelForceClosed { .. }); - // If the ChannelMonitorUpdate is closing a channel that never got past initial - // funding (to have any commitment updates), we'll skip inserting in - // `locked_close_channel`, allowing us to avoid keeping around the PeerState for - // that peer. In that specific case we expect no entry in the map here. In any - // other cases, this is a bug, but in production we go ahead and recover by - // inserting the update_id and hoping its right. - debug_assert!(is_closing_unupdated_monitor, "Expected closing monitor against an unused channel, got {:?}", monitor_update); - if !is_closing_unupdated_monitor { - entry.insert(monitor_update.update_id); - } - }, - btree_map::Entry::Occupied(entry) => { - // If we're running in a threaded environment its possible we generate updates for - // a channel that is closing, then apply some preimage update, then go back and - // apply the close monitor update here. 
In order to ensure the updates are still - // well-ordered, we have to use the `closed_channel_monitor_update_ids` map to - // override the `update_id`, taking care to handle old monitors where the - // `latest_update_id` is already `u64::MAX`. - let latest_update_id = entry.into_mut(); - *latest_update_id = latest_update_id.saturating_add(1); - monitor_update.update_id = *latest_update_id; - } - } - } - /// When a channel is removed, two things need to happen: /// (a) [`locked_close_channel`] must be called in the same `per_peer_state` lock as /// the channel-closing action, @@ -7198,8 +7165,19 @@ where let counterparty_node_id = prev_hop.counterparty_node_id.expect("Checked immediately above"); let mut peer_state = peer_state_opt.expect("peer_state_opt is always Some when the counterparty_node_id is Some"); - let mut preimage_update = ChannelMonitorUpdate { - update_id: 0, // set in set_closed_chan_next_monitor_update_id + let update_id = if let Some(latest_update_id) = peer_state.closed_channel_monitor_update_ids.get_mut(&chan_id) { + *latest_update_id = latest_update_id.saturating_add(1); + *latest_update_id + } else { + let err = "We need the latest ChannelMonitorUpdate ID to build a new update. +This should have been checked for availability on startup but somehow it is no longer available. +This indicates a bug inside LDK. Please report this error at https://github.com/lightningdevkit/rust-lightning/issues/new"; + log_error!(self.logger, "{}", err); + panic!("{}", err); + }; + + let preimage_update = ChannelMonitorUpdate { + update_id, counterparty_node_id: prev_hop.counterparty_node_id, updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { payment_preimage, @@ -7208,12 +7186,11 @@ where channel_id: Some(prev_hop.channel_id), }; - // Note that the below is race-y - we set the `update_id` here and then drop the peer_state + // Note that the below is race-y - we set the `update_id` above and then drop the peer_state // lock before applying the update in `apply_post_close_monitor_update` (or via the // background events pipeline). During that time, some other update could be created and // then applied, resultin in `ChannelMonitorUpdate`s being applied out of order and causing // a panic. - Self::set_closed_chan_next_monitor_update_id(&mut *peer_state, prev_hop.channel_id, &mut preimage_update); mem::drop(peer_state); mem::drop(per_peer_state); From 1d9b2f72bed36de52ee59d78cb6f99fd9d6ad81f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 18 Nov 2024 19:46:49 +0000 Subject: [PATCH 5/9] Add an additional variant to `handle_new_monitor_update!` In d1c340a0e1f988e0414aa5425f7c76e515ada6dd we added support in `handle_new_monitor_update!` for handling updates without dropping locks. In the coming commits we'll start handling `ChannelMonitorUpdate`s "like normal" for updates against closed channels. Here we set up the first step by adding a new `POST_CHANNEL_CLOSE` variant on `handle_new_monitor_update!` which attempts to handle the `ChannelMonitorUpdate` and handles completion actions if it finishes immediately, just like the pre-close variant. --- lightning/src/ln/channelmanager.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 80e0b32c038..de9ace822ce 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3339,6 +3339,27 @@ macro_rules! 
handle_new_monitor_update { let _ = in_flight_updates.remove(idx); }) } }; + ( + $self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, + $per_peer_state_lock: expr, $logger: expr, $channel_id: expr, POST_CHANNEL_CLOSE + ) => { { + let in_flight_updates; + let idx; + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, $logger, + $channel_id, in_flight_updates, idx, _internal_outer, + { + let _ = in_flight_updates.remove(idx); + if in_flight_updates.is_empty() { + let update_actions = $peer_state.monitor_update_blocked_actions + .remove(&$channel_id).unwrap_or(Vec::new()); + + mem::drop($peer_state_lock); + mem::drop($per_peer_state_lock); + + $self.handle_monitor_update_completion_actions(update_actions); + } + }) + } }; ( $self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr From 656856fd0d2a77e6d384e56d7fa6ca3603cd2e77 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 13 Dec 2024 16:49:29 +0000 Subject: [PATCH 6/9] Support async `ChannelMonitorUpdate`s to closed chans at runtime One of the largest gaps in our async persistence functionality has been preimage (claim) updates to closed channels. Here we finally implement support for this (for updates at runtime). Thanks to all the work we've built up over the past many commits, this is a well-contained patch within `claim_mpp_part`, pushing the generated `ChannelMonitorUpdate`s through the same pipeline we use for open channels. Sadly we can't use the `handle_new_monitor_update` macro wholesale as it handles the `Channel` resumption as well which we don't do here. --- lightning/src/ln/chanmon_update_fail_tests.rs | 108 ++++++++++++++++++ lightning/src/ln/channelmanager.rs | 76 +++++------- 2 files changed, 135 insertions(+), 49 deletions(-) diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index dc67b198149..18425882943 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -3713,3 +3713,111 @@ fn test_partial_claim_mon_update_compl_actions() { send_payment(&nodes[2], &[&nodes[3]], 100_000); assert!(!get_monitor!(nodes[3], chan_4_id).get_stored_preimages().contains_key(&payment_hash)); } + + +#[test] +fn test_claim_to_closed_channel_blocks_forwarded_preimage_removal() { + // One of the last features for async persistence we implemented was the correct blocking of + // RAA(s) which remove a preimage from an outbound channel for a forwarded payment until the + // preimage write makes it durably to the closed inbound channel. + // This tests that behavior. + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + // First open channels, route a payment, and force-close the first hop. + let chan_a = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000); + let chan_b = create_announced_chan_between_nodes_with_value(&nodes, 1, 2, 1_000_000, 500_000_000); + + let (payment_preimage, payment_hash, ..) 
= route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1_000_000); + + nodes[0].node.force_close_broadcasting_latest_txn(&chan_a.2, &nodes[1].node.get_our_node_id(), String::new()).unwrap(); + check_added_monitors!(nodes[0], 1); + let a_reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }; + check_closed_event!(nodes[0], 1, a_reason, [nodes[1].node.get_our_node_id()], 1000000); + check_closed_broadcast!(nodes[0], true); + + let as_commit_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + assert_eq!(as_commit_tx.len(), 1); + + mine_transaction(&nodes[1], &as_commit_tx[0]); + check_added_monitors!(nodes[1], 1); + check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed, [nodes[0].node.get_our_node_id()], 1000000); + check_closed_broadcast!(nodes[1], true); + + // Now that B has a pending forwarded payment across it with the inbound edge on-chain, claim + // the payment on C and give B the preimage for it. + nodes[2].node.claim_funds(payment_preimage); + check_added_monitors!(nodes[2], 1); + expect_payment_claimed!(nodes[2], payment_hash, 1_000_000); + + let updates = get_htlc_update_msgs!(nodes[2], nodes[1].node.get_our_node_id()); + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + nodes[1].node.handle_update_fulfill_htlc(nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); + check_added_monitors!(nodes[1], 1); + commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false); + + // At this point nodes[1] has the preimage and is waiting for the `ChannelMonitorUpdate` for + // channel A to hit disk. Until it does so, it shouldn't ever let the preimage dissapear from + // channel B's `ChannelMonitor` + assert!(get_monitor!(nodes[1], chan_b.2).get_all_current_outbound_htlcs().iter().any(|(_, (_, preimage))| *preimage == Some(payment_preimage))); + + // Once we complete the `ChannelMonitorUpdate` on channel A, and the `ChannelManager` processes + // background events (via `get_and_clear_pending_msg_events`), the final `ChannelMonitorUpdate` + // will fly and we'll drop the preimage from channel B's `ChannelMonitor`. We'll also release + // the `Event::PaymentForwarded`. + check_added_monitors!(nodes[1], 0); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + + nodes[1].chain_monitor.complete_sole_pending_chan_update(&chan_a.2); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + check_added_monitors!(nodes[1], 1); + assert!(!get_monitor!(nodes[1], chan_b.2).get_all_current_outbound_htlcs().iter().any(|(_, (_, preimage))| *preimage == Some(payment_preimage))); + expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, false); +} + +#[test] +fn test_claim_to_closed_channel_blocks_claimed_event() { + // One of the last features for async persistence we implemented was the correct blocking of + // event(s) until the preimage for a claimed HTLC is durably on disk in a ChannelMonitor for a + // closed channel. + // This tests that behavior. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + // First open channels, route a payment, and force-close the first hop. 
+ let chan_a = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000); + + let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); + + nodes[0].node.force_close_broadcasting_latest_txn(&chan_a.2, &nodes[1].node.get_our_node_id(), String::new()).unwrap(); + check_added_monitors!(nodes[0], 1); + let a_reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }; + check_closed_event!(nodes[0], 1, a_reason, [nodes[1].node.get_our_node_id()], 1000000); + check_closed_broadcast!(nodes[0], true); + + let as_commit_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + assert_eq!(as_commit_tx.len(), 1); + + mine_transaction(&nodes[1], &as_commit_tx[0]); + check_added_monitors!(nodes[1], 1); + check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed, [nodes[0].node.get_our_node_id()], 1000000); + check_closed_broadcast!(nodes[1], true); + + // Now that B has a pending payment with the inbound HTLC on a closed channel, claim the + // payment on disk, but don't let the `ChannelMonitorUpdate` complete. This should prevent the + // `Event::PaymentClaimed` from being generated. + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + nodes[1].node.claim_funds(payment_preimage); + check_added_monitors!(nodes[1], 1); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + + // Once we complete the `ChannelMonitorUpdate` the `Event::PaymentClaimed` will become + // available. + nodes[1].chain_monitor.complete_sole_pending_chan_update(&chan_a.2); + expect_payment_claimed!(nodes[1], payment_hash, 1_000_000); +} diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index de9ace822ce..bd0c27ac7f5 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -7199,7 +7199,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ let preimage_update = ChannelMonitorUpdate { update_id, - counterparty_node_id: prev_hop.counterparty_node_id, + counterparty_node_id: Some(counterparty_node_id), updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { payment_preimage, payment_info, @@ -7207,28 +7207,33 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ channel_id: Some(prev_hop.channel_id), }; - // Note that the below is race-y - we set the `update_id` above and then drop the peer_state - // lock before applying the update in `apply_post_close_monitor_update` (or via the - // background events pipeline). During that time, some other update could be created and - // then applied, resultin in `ChannelMonitorUpdate`s being applied out of order and causing - // a panic. + // Note that we do process the completion action here. This totally could be a + // duplicate claim, but we have no way of knowing without interrogating the + // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are + // generally always allowed to be duplicative (and it's specifically noted in + // `PaymentForwarded`). 
+ let (action_opt, raa_blocker_opt) = completion_action(None, false); - mem::drop(peer_state); - mem::drop(per_peer_state); + if let Some(raa_blocker) = raa_blocker_opt { + peer_state.actions_blocking_raa_monitor_updates + .entry(prev_hop.channel_id) + .or_default() + .push(raa_blocker); + } + + // Given the fact that we're in a bit of a weird edge case, its worth hashing the preimage + // to include the `payment_hash` in the log metadata here. + let payment_hash = payment_preimage.into(); + let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(chan_id), Some(payment_hash)); if !during_init { - // We update the ChannelMonitor on the backward link, after - // receiving an `update_fulfill_htlc` from the forward link. - let update_res = self.apply_post_close_monitor_update(counterparty_node_id, prev_hop.channel_id, prev_hop.funding_txo, preimage_update); - if update_res != ChannelMonitorUpdateStatus::Completed { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same event and try - // again on restart. - log_error!(WithContext::from(&self.logger, None, Some(prev_hop.channel_id), None), - "Critical error: failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, update_res); + if let Some(action) = action_opt { + log_trace!(logger, "Tracking monitor update completion action for closed channel {}: {:?}", + chan_id, action); + peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } + + handle_new_monitor_update!(self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state, logger, chan_id, POST_CHANNEL_CLOSE); } else { // If we're running during init we cannot update a monitor directly - they probably // haven't actually been loaded yet. Instead, push the monitor update as a background @@ -7242,39 +7247,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ update: preimage_update, }; self.pending_background_events.lock().unwrap().push(event); - } - // Note that we do process the completion action here. This totally could be a - // duplicate claim, but we have no way of knowing without interrogating the - // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are - // generally always allowed to be duplicative (and it's specifically noted in - // `PaymentForwarded`). - let (action_opt, raa_blocker_opt) = completion_action(None, false); - - if let Some(raa_blocker) = raa_blocker_opt { - // TODO: Avoid always blocking the world for the write lock here. 
- let mut per_peer_state = self.per_peer_state.write().unwrap(); - let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(|| - Mutex::new(PeerState { - channel_by_id: new_hash_map(), - inbound_channel_request_by_id: new_hash_map(), - latest_features: InitFeatures::empty(), - pending_msg_events: Vec::new(), - in_flight_monitor_updates: BTreeMap::new(), - monitor_update_blocked_actions: BTreeMap::new(), - actions_blocking_raa_monitor_updates: BTreeMap::new(), - closed_channel_monitor_update_ids: BTreeMap::new(), - is_connected: false, - })); - let mut peer_state = peer_state_mutex.lock().unwrap(); + mem::drop(peer_state); + mem::drop(per_peer_state); - peer_state.actions_blocking_raa_monitor_updates - .entry(prev_hop.channel_id) - .or_default() - .push(raa_blocker); + self.handle_monitor_update_completion_actions(action_opt); } - - self.handle_monitor_update_completion_actions(action_opt); } fn finalize_claims(&self, sources: Vec) { From 77b0f0f1a2142fe9c70e74050f7c4bb4fa461d49 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 15 Nov 2024 22:32:47 +0000 Subject: [PATCH 7/9] Don't double-claim MPP payments that are pending on multiple chans On startup, we walk the preimages and payment HTLC sets on all our `ChannelMonitor`s, re-claiming all payments which we recently claimed. This ensures all HTLCs in any claimed payments are claimed across all channels. In doing so, we expect to see the same payment multiple times, after all it may have been received as multiple HTLCs across multiple channels. In such cases, there's no reason to redundantly claim the same set of HTLCs again and again. In the current code, doing so may lead to redundant `PaymentClaimed` events, and in a coming commit will instead cause an assertion failure. --- lightning/src/ln/channelmanager.rs | 11 ++++++++++- lightning/src/ln/reload_tests.rs | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index bd0c27ac7f5..86d1914402e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1185,7 +1185,7 @@ impl From<&MPPClaimHTLCSource> for HTLCClaimSource { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] /// The source of an HTLC which is being claimed as a part of an incoming payment. Each part is /// tracked in [`PendingMPPClaim`] as well as in [`ChannelMonitor`]s, so that it can be converted /// to an [`HTLCClaimSource`] for claim replays on startup. @@ -14159,10 +14159,18 @@ where testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), }; + let mut processed_claims: HashSet> = new_hash_set(); for (_, monitor) in args.channel_monitors.iter() { for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() { if !payment_claims.is_empty() { for payment_claim in payment_claims { + if processed_claims.contains(&payment_claim.mpp_parts) { + // We might get the same payment a few times from different channels + // that the MPP payment was received using. There's no point in trying + // to claim the same payment again and again, so we check if the HTLCs + // are the same and skip the payment here. 
+ continue; + } if payment_claim.mpp_parts.is_empty() { return Err(DecodeError::InvalidValue); } @@ -14217,6 +14225,7 @@ where (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr) ); } + processed_claims.insert(payment_claim.mpp_parts); } } else { let per_peer_state = channel_manager.per_peer_state.read().unwrap(); diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index e1f6116cf6f..28465a09660 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -900,7 +900,7 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) { if persist_both_monitors { if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[2] { } else { panic!(); } if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[3] { } else { panic!(); } - check_added_monitors(&nodes[3], 6); + check_added_monitors(&nodes[3], 4); } else { if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[2] { } else { panic!(); } check_added_monitors(&nodes[3], 3); From 77d58644730122b3853482679cf25247675c58c3 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 11 Dec 2024 16:20:36 +0000 Subject: [PATCH 8/9] Support async `ChannelMonitorUpdate`s to closed chans at startup One of the largest gaps in our async persistence functionality has been preimage (claim) updates to closed channels. Here we finally implement support for this (for updates which are generated during startup). Thanks to all the work we've built up over the past many commits, this is a fairly straightforward patch, removing the immediate-completion logic from `claim_mpp_part` and adding the required in-flight tracking logic to `apply_post_close_monitor_update`. Like in the during-runtime case in the previous commit, we sadly can't use the `handle_new_monitor_update` macro wholesale as it handles the `Channel` resumption as well which we don't do here. --- lightning/src/ln/chanmon_update_fail_tests.rs | 37 ++++++++------- lightning/src/ln/channelmanager.rs | 45 ++++++++++--------- 2 files changed, 44 insertions(+), 38 deletions(-) diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 18425882943..fcc1f8f5a64 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -3294,10 +3294,6 @@ fn do_test_durable_preimages_on_closed_channel(close_chans_before_reload: bool, if !close_chans_before_reload { check_closed_broadcast(&nodes[1], 1, true); check_closed_event(&nodes[1], 1, ClosureReason::CommitmentTxConfirmed, false, &[nodes[0].node.get_our_node_id()], 100000); - } else { - // While we forwarded the payment a while ago, we don't want to process events too early or - // we'll run background tasks we wanted to test individually. - expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, !close_only_a); } mine_transactions(&nodes[0], &[&as_closing_tx[0], bs_preimage_tx]); @@ -3308,24 +3304,33 @@ fn do_test_durable_preimages_on_closed_channel(close_chans_before_reload: bool, // Make sure the B<->C channel is still alive and well by sending a payment over it. 
let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[2]); reconnect_args.pending_responding_commitment_signed.1 = true; - if !close_chans_before_reload { - // TODO: If the A<->B channel was closed before we reloaded, the `ChannelManager` - // will consider the forwarded payment complete and allow the B<->C - // `ChannelMonitorUpdate` to complete, wiping the payment preimage. This should not - // be allowed, and needs fixing. - reconnect_args.pending_responding_commitment_signed_dup_monitor.1 = true; - } + // The B<->C `ChannelMonitorUpdate` shouldn't be allowed to complete, which is the + // equivalent to the responding `commitment_signed` being a duplicate for node B, thus we + // need to set the `pending_responding_commitment_signed_dup` flag. + reconnect_args.pending_responding_commitment_signed_dup_monitor.1 = true; reconnect_args.pending_raa.1 = true; reconnect_nodes(reconnect_args); + + // Once the blocked `ChannelMonitorUpdate` *finally* completes, the pending + // `PaymentForwarded` event will finally be released. let (outpoint, ab_update_id, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id_ab).unwrap().clone(); nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, ab_update_id); - expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), true, false); - if !close_chans_before_reload { - // Once we call `process_pending_events` the final `ChannelMonitor` for the B<->C - // channel will fly, removing the payment preimage from it. - check_added_monitors(&nodes[1], 1); + + // If the A<->B channel was closed before we reload, we'll replay the claim against it on + // reload, causing the `PaymentForwarded` event to get replayed. + let evs = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(evs.len(), if close_chans_before_reload { 2 } else { 1 }); + for ev in evs { + if let Event::PaymentForwarded { .. } = ev { } + else { + panic!(); + } } + + // Once we call `process_pending_events` the final `ChannelMonitor` for the B<->C channel + // will fly, removing the payment preimage from it. + check_added_monitors(&nodes[1], 1); assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); send_payment(&nodes[1], &[&nodes[2]], 100_000); } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 86d1914402e..c46cc1c6940 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3945,11 +3945,10 @@ where } /// Applies a [`ChannelMonitorUpdate`] which may or may not be for a channel which is closed. - #[must_use] fn apply_post_close_monitor_update( &self, counterparty_node_id: PublicKey, channel_id: ChannelId, funding_txo: OutPoint, monitor_update: ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus { + ) { // Note that there may be some post-close updates which need to be well-ordered with // respect to the `update_id`, so we hold the `peer_state` lock here. 
let per_peer_state = self.per_peer_state.read().unwrap(); @@ -3960,16 +3959,21 @@ where match peer_state.channel_by_id.entry(channel_id) { hash_map::Entry::Occupied(mut chan_phase) => { if let ChannelPhase::Funded(chan) = chan_phase.get_mut() { - let completed = handle_new_monitor_update!(self, funding_txo, + handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan); - return if completed { ChannelMonitorUpdateStatus::Completed } else { ChannelMonitorUpdateStatus::InProgress }; + return; } else { debug_assert!(false, "We shouldn't have an update for a non-funded channel"); } }, hash_map::Entry::Vacant(_) => {}, } - self.chain_monitor.update_channel(funding_txo, &monitor_update) + let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None); + + handle_new_monitor_update!( + self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, + logger, channel_id, POST_CHANNEL_CLOSE + ); } /// When a channel is removed, two things need to happen: @@ -3998,7 +4002,7 @@ where } if let Some((_, funding_txo, _channel_id, monitor_update)) = shutdown_res.monitor_update { debug_assert!(false, "This should have been handled in `locked_close_channel`"); - let _ = self.apply_post_close_monitor_update(shutdown_res.counterparty_node_id, shutdown_res.channel_id, funding_txo, monitor_update); + self.apply_post_close_monitor_update(shutdown_res.counterparty_node_id, shutdown_res.channel_id, funding_txo, monitor_update); } if self.background_events_processed_since_startup.load(Ordering::Acquire) { // If a `ChannelMonitorUpdate` was applied (i.e. any time we have a funding txo and are @@ -6293,9 +6297,7 @@ where let _ = self.chain_monitor.update_channel(funding_txo, &update); }, BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, channel_id, update } => { - // The monitor update will be replayed on startup if it doesnt complete, so no - // use bothering to care about the monitor update completing. - let _ = self.apply_post_close_monitor_update(counterparty_node_id, channel_id, funding_txo, update); + self.apply_post_close_monitor_update(counterparty_node_id, channel_id, funding_txo, update); }, BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -7226,20 +7228,24 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ let payment_hash = payment_preimage.into(); let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(chan_id), Some(payment_hash)); - if !during_init { - if let Some(action) = action_opt { - log_trace!(logger, "Tracking monitor update completion action for closed channel {}: {:?}", - chan_id, action); - peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); - } + if let Some(action) = action_opt { + log_trace!(logger, "Tracking monitor update completion action for closed channel {}: {:?}", + chan_id, action); + peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); + } + if !during_init { handle_new_monitor_update!(self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state, logger, chan_id, POST_CHANNEL_CLOSE); } else { // If we're running during init we cannot update a monitor directly - they probably // haven't actually been loaded yet. Instead, push the monitor update as a background // event. 
-			// TODO: Track this update as pending and only complete the completion action when it
-			// finishes.
+
+			let in_flight_updates = peer_state.in_flight_monitor_updates
+				.entry(prev_hop.funding_txo)
+				.or_insert_with(Vec::new);
+			in_flight_updates.push(preimage_update.clone());
+
 			let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
 				counterparty_node_id,
 				funding_txo: prev_hop.funding_txo,
@@ -7247,11 +7253,6 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 				update: preimage_update,
 			};
 			self.pending_background_events.lock().unwrap().push(event);
-
-			mem::drop(peer_state);
-			mem::drop(per_peer_state);
-
-			self.handle_monitor_update_completion_actions(action_opt);
 		}
 	}
 

From 5d5971c2f482f5b75dc14cdc9740672daf963d76 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Thu, 12 Dec 2024 19:48:43 +0000
Subject: [PATCH 9/9] DRY the pre-startup `ChannelMonitorUpdate` handling

This removes the repeated
`if during_startup { push background event } else { apply ChannelMonitorUpdate }`
pattern from the call sites by inlining it in `handle_new_monitor_update`.
A short illustrative sketch of this pattern follows the diff below.
---
 lightning/src/ln/channelmanager.rs | 116 +++++++++++------------------
 1 file changed, 44 insertions(+), 72 deletions(-)

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index c46cc1c6940..d6076cc5b3d 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -2973,31 +2973,9 @@ macro_rules! handle_error {
 /// [`ChannelMonitor`]/channel funding transaction) to begin with.
 macro_rules! locked_close_channel {
 	($self: ident, $peer_state: expr, $channel_context: expr, $shutdown_res_mut: expr) => {{
-		if let Some((counterparty_node_id, funding_txo, channel_id, update)) = $shutdown_res_mut.monitor_update.take() {
-			if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
-				handle_new_monitor_update!($self, funding_txo, update, $peer_state,
-					$channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
-			} else {
-				// We want to track the in-flight update both in `in_flight_monitor_updates` and in
-				// `pending_background_events` to avoid a race condition during
-				// `pending_background_events` processing where we complete one
-				// `ChannelMonitorUpdate` (but there are more pending as background events) but we
-				// conclude that all pending `ChannelMonitorUpdate`s have completed and its safe to
-				// run post-completion actions. We could work around that with some effort, but its
-				// simpler to just track updates twice.
-				let in_flight_updates = $peer_state.in_flight_monitor_updates.entry(funding_txo)
-					.or_insert_with(Vec::new);
-				if !in_flight_updates.contains(&update) {
-					in_flight_updates.push(update.clone());
-				}
-				let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-					counterparty_node_id,
-					funding_txo,
-					channel_id,
-					update,
-				};
-				$self.pending_background_events.lock().unwrap().push(event);
-			}
+		if let Some((_, funding_txo, _, update)) = $shutdown_res_mut.monitor_update.take() {
+			handle_new_monitor_update!($self, funding_txo, update, $peer_state,
+				$channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
 		}
 		// If there's a possibility that we need to generate further monitor updates for this
 		// channel, we need to store the last update_id of it. However, we don't want to insert
@@ -3309,8 +3287,8 @@ macro_rules! handle_new_monitor_update {
 	};
 	(
 		$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $logger: expr,
-		$chan_id: expr, $in_flight_updates: ident, $update_idx: ident, _internal_outer,
-		$completed: expr
+		$chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident,
+		_internal_outer, $completed: expr
 	) => { {
 		$in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
 			.or_insert_with(Vec::new);
@@ -3322,8 +3300,30 @@ macro_rules! handle_new_monitor_update {
 			$in_flight_updates.push($update);
 			$in_flight_updates.len() - 1
 		});
-		let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]);
-		handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed)
+		if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
+			let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]);
+			handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed)
+		} else {
+			// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
+			// fail to persist it. This is a fairly safe assumption, however, since anything we do
+			// during the startup sequence should be replayed exactly if we immediately crash.
+			let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+				counterparty_node_id: $counterparty_node_id,
+				funding_txo: $funding_txo,
+				channel_id: $chan_id,
+				update: $in_flight_updates[$update_idx].clone(),
+			};
+			// We want to track the in-flight update both in `in_flight_monitor_updates` and in
+			// `pending_background_events` to avoid a race condition during
+			// `pending_background_events` processing where we complete one
+			// `ChannelMonitorUpdate` (but there are more pending as background events) but we
+			// conclude that all pending `ChannelMonitorUpdate`s have completed and its safe to
+			// run post-completion actions.
+			// We could work around that with some effort, but its simpler to just track updates
+			// twice.
+			$self.pending_background_events.lock().unwrap().push(event);
+			false
+		}
 	} };
 	(
 		$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr,
@@ -3331,22 +3331,24 @@ macro_rules! handle_new_monitor_update {
 	) => { {
 		let logger = WithChannelContext::from(&$self.logger, &$chan_context, None);
 		let chan_id = $chan_context.channel_id();
+		let counterparty_node_id = $chan_context.get_counterparty_node_id();
 		let in_flight_updates;
 		let idx;
 		handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
-			in_flight_updates, idx, _internal_outer,
+			counterparty_node_id, in_flight_updates, idx, _internal_outer,
 			{ let _ = in_flight_updates.remove(idx); })
 	} };
 	(
 		$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
-		$per_peer_state_lock: expr, $logger: expr, $channel_id: expr, POST_CHANNEL_CLOSE
+		$per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr, POST_CHANNEL_CLOSE
 	) => { {
+		let logger = WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None);
 		let in_flight_updates;
 		let idx;
-		handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, $logger,
-			$channel_id, in_flight_updates, idx, _internal_outer,
+		handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger,
+			$channel_id, $counterparty_node_id, in_flight_updates, idx, _internal_outer,
 			{
 				let _ = in_flight_updates.remove(idx);
 				if in_flight_updates.is_empty() {
@@ -3366,10 +3368,11 @@ macro_rules! handle_new_monitor_update {
 	) => { {
 		let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
 		let chan_id = $chan.context.channel_id();
+		let counterparty_node_id = $chan.context.get_counterparty_node_id();
 		let in_flight_updates;
 		let idx;
 		handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
-			in_flight_updates, idx, _internal_outer,
+			counterparty_node_id, in_flight_updates, idx, _internal_outer,
 			{
 				let _ = in_flight_updates.remove(idx);
 				if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
@@ -3968,11 +3971,10 @@ where
 			},
 			hash_map::Entry::Vacant(_) => {},
 		}
-		let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None);
 
 		handle_new_monitor_update!(
 			self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state,
-			logger, channel_id, POST_CHANNEL_CLOSE
+			counterparty_node_id, channel_id, POST_CHANNEL_CLOSE
 		);
 	}
 
@@ -7080,7 +7082,6 @@ where
 		let peer_state = &mut **peer_state_lock;
 		if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(chan_id) {
 			if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
-				let counterparty_node_id = chan.context.get_counterparty_node_id();
 				let logger = WithChannelContext::from(&self.logger, &chan.context, None);
 				let fulfill_res =
 					chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, payment_info, &&logger);
@@ -7095,21 +7096,8 @@ where
 				if let Some(raa_blocker) = raa_blocker_opt {
 					peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
 				}
-				if !during_init {
-					handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt,
-						peer_state, per_peer_state, chan);
-				} else {
-					// If we're running during init we cannot update a monitor directly -
-					// they probably haven't actually been loaded yet. Instead, push the
-					// monitor update as a background event.
-					self.pending_background_events.lock().unwrap().push(
-						BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-							counterparty_node_id,
-							funding_txo: prev_hop.funding_txo,
-							channel_id: prev_hop.channel_id,
-							update: monitor_update.clone(),
-						});
-				}
+				handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt,
+					peer_state, per_peer_state, chan);
 			}
 			UpdateFulfillCommitFetch::DuplicateClaim {} => {
 				let (action_opt, raa_blocker_opt) = completion_action(None, true);
@@ -7234,26 +7222,10 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 			peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
 		}
 
-		if !during_init {
-			handle_new_monitor_update!(self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state, logger, chan_id, POST_CHANNEL_CLOSE);
-		} else {
-			// If we're running during init we cannot update a monitor directly - they probably
-			// haven't actually been loaded yet. Instead, push the monitor update as a background
-			// event.
-
-			let in_flight_updates = peer_state.in_flight_monitor_updates
-				.entry(prev_hop.funding_txo)
-				.or_insert_with(Vec::new);
-			in_flight_updates.push(preimage_update.clone());
-
-			let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-				counterparty_node_id,
-				funding_txo: prev_hop.funding_txo,
-				channel_id: prev_hop.channel_id,
-				update: preimage_update,
-			};
-			self.pending_background_events.lock().unwrap().push(event);
-		}
+		handle_new_monitor_update!(
+			self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state,
+			counterparty_node_id, chan_id, POST_CHANNEL_CLOSE
+		);
 	}
 
 	fn finalize_claims(&self, sources: Vec) {
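
The branch that patch 9/9 inlines is easier to see outside the macro plumbing. What follows is a minimal, self-contained sketch of that "apply now, or queue as a background event during startup" shape; `Manager`, `MonitorUpdate`, `BackgroundEvent`, and `apply_update` here are simplified stand-ins for illustration only, not the real `ChannelManager`/`ChainMonitor` API.

    // Editor's illustration only; names are simplified stand-ins, not LDK API.

    #[derive(Clone)]
    struct MonitorUpdate {
        update_id: u64,
    }

    #[allow(dead_code)]
    enum BackgroundEvent {
        MonitorUpdateRegeneratedOnStartup { update: MonitorUpdate },
    }

    struct Manager {
        // ~ ChannelManager::background_events_processed_since_startup
        startup_complete: bool,
        // ~ PeerState::in_flight_monitor_updates (keyed per funding outpoint in LDK)
        in_flight: Vec<MonitorUpdate>,
        // ~ ChannelManager::pending_background_events
        pending_background_events: Vec<BackgroundEvent>,
    }

    impl Manager {
        /// Returns whether the update completed synchronously.
        fn handle_new_monitor_update(&mut self, update: MonitorUpdate) -> bool {
            // The update is tracked as in-flight in either branch.
            self.in_flight.push(update.clone());
            if self.startup_complete {
                // Normal operation: hand the update to the chain monitor right away.
                self.apply_update(&update)
            } else {
                // Pre-startup: monitors may not be loaded yet, so queue the update to be
                // replayed as a background event; it stays in `in_flight` so completion
                // actions are not run early.
                self.pending_background_events
                    .push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { update });
                false
            }
        }

        fn apply_update(&mut self, update: &MonitorUpdate) -> bool {
            // Stand-in for handing the update to the chain monitor and it completing.
            println!("applying monitor update {}", update.update_id);
            true
        }
    }

    fn main() {
        let mut mgr = Manager {
            startup_complete: false,
            in_flight: Vec::new(),
            pending_background_events: Vec::new(),
        };
        // During startup: queued as a background event, reported as still in progress.
        assert!(!mgr.handle_new_monitor_update(MonitorUpdate { update_id: 1 }));
        assert_eq!(mgr.pending_background_events.len(), 1);

        // After startup: applied immediately.
        mgr.startup_complete = true;
        assert!(mgr.handle_new_monitor_update(MonitorUpdate { update_id: 2 }));
        assert_eq!(mgr.in_flight.len(), 2);
    }

As in the patches above, the update is recorded as in-flight in both branches, which is what keeps post-update completion actions blocked until the queued background event is actually replayed and applied.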