Skip to content

Commit

Permalink
fix fail node issues
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Nov 25, 2024
1 parent eb95591 commit 43532e1
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 22 deletions.
3 changes: 1 addition & 2 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use super::channel::ChannelActorStateStore;
use super::history::{Direction, InternalResult, PaymentHistory, TimedResult};
use super::network::{get_chain_hash, SendPaymentData, SendPaymentResponse};
use super::path::NodeHeap;
Expand Down Expand Up @@ -184,7 +183,7 @@ pub struct PathEdge {

impl<S> NetworkGraph<S>
where
S: ChannelActorStateStore + NetworkGraphStateStore + Clone + Send + Sync + 'static,
S: NetworkGraphStateStore + Clone + Send + Sync + 'static,
{
pub fn new(store: S, source: Pubkey) -> Self {
let mut network_graph = Self {
Expand Down
84 changes: 64 additions & 20 deletions src/fiber/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
// we only use direct channel probability now.

use super::{
channel::ChannelActorStateStore,
graph::{NetworkGraphStateStore, SessionRouteNode},
types::{Pubkey, TlcErr},
};
use crate::{fiber::types::TlcErrorCode, now_timestamp_as_millis_u64};
use ckb_types::packed::OutPoint;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use tracing::{debug, error};

#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -48,16 +47,10 @@ pub(crate) struct InternalPairResult {
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub(crate) struct InternalResult {
pub pairs: HashMap<(OutPoint, Direction), InternalPairResult>,
pub nodes_to_channel_map: HashMap<Pubkey, HashSet<OutPoint>>,
pub fail_node: Option<Pubkey>,
}

fn current_time() -> u128 {
std::time::UNIX_EPOCH
.elapsed()
.expect("unix epoch")
.as_millis()
}

pub(crate) fn output_direction(node1: Pubkey, node2: Pubkey) -> (Direction, Direction) {
if node1 < node2 {
(Direction::Forward, Direction::Backward)
Expand All @@ -76,13 +69,24 @@ impl InternalResult {
amount: u128,
success: bool,
) {
let pair = InternalPairResult {
success,
time,
amount,
};
let (direction, _) = output_direction(node_1, node_2);
self.pairs.insert((channel, direction), pair);
self.add_node_channel_map(node_1, channel.clone());
self.add_node_channel_map(node_2, channel.clone());
self.pairs.insert(
(channel, direction),
InternalPairResult {
success,
time,
amount,
},
);
}

fn add_node_channel_map(&mut self, node: Pubkey, channel: OutPoint) {
self.nodes_to_channel_map
.entry(node)
.or_default()
.insert(channel);
}

pub fn add_fail_pair(&mut self, from: Pubkey, target: Pubkey, channel: OutPoint) {
Expand Down Expand Up @@ -294,7 +298,7 @@ impl InternalResult {
#[derive(Debug, Clone)]
pub(crate) struct PaymentHistory<S> {
pub inner: HashMap<(OutPoint, Direction), TimedResult>,
pub failed_nodes: HashMap<Pubkey, u128>,
pub nodes_to_channel_map: HashMap<Pubkey, HashSet<OutPoint>>,
// The minimum interval between two failed payments in milliseconds
pub min_fail_relax_interval: u64,
pub bimodal_scale_msat: f64,
Expand All @@ -307,13 +311,13 @@ pub(crate) struct PaymentHistory<S> {

impl<S> PaymentHistory<S>
where
S: ChannelActorStateStore + NetworkGraphStateStore + Clone + Send + Sync + 'static,
S: NetworkGraphStateStore + Clone + Send + Sync + 'static,
{
pub(crate) fn new(source: Pubkey, min_fail_relax_interval: Option<u64>, store: S) -> Self {
let mut s = PaymentHistory {
source,
inner: HashMap::new(),
failed_nodes: HashMap::new(),
nodes_to_channel_map: HashMap::new(),
min_fail_relax_interval: min_fail_relax_interval
.unwrap_or(DEFAULT_MIN_FAIL_RELAX_INTERVAL),
bimodal_scale_msat: DEFAULT_BIMODAL_SCALE_SHANNONS,
Expand All @@ -326,6 +330,7 @@ where
#[cfg(test)]
pub(crate) fn reset(&mut self) {
self.inner.clear();
self.nodes_to_channel_map.clear();
}

pub(crate) fn add_result(
Expand All @@ -343,11 +348,22 @@ where
.insert_payment_history_result(channel, direction, result);
}

fn add_node_channel_map(&mut self, node: Pubkey, channel: OutPoint) {
self.nodes_to_channel_map
.entry(node)
.or_default()
.insert(channel);
}

pub(crate) fn load_from_store(&mut self) {
let results = self.store.get_payment_history_result();
for (channel, direction, result) in results.into_iter() {
self.inner.insert((channel, direction), result);
}
for channel in self.store.get_channels(None).iter() {
self.add_node_channel_map(channel.node1(), channel.out_point());
self.add_node_channel_map(channel.node2(), channel.out_point());
}
}

pub(crate) fn apply_pair_result(
Expand Down Expand Up @@ -395,7 +411,11 @@ where
}

pub(crate) fn apply_internal_result(&mut self, result: InternalResult) {
let InternalResult { pairs, fail_node } = result;
let InternalResult {
pairs,
fail_node,
nodes_to_channel_map,
} = result;
for ((channel, direction), pair_result) in pairs.into_iter() {
self.apply_pair_result(
channel,
Expand All @@ -405,8 +425,32 @@ where
pair_result.time,
);
}
for (node, channels) in nodes_to_channel_map.into_iter() {
self.nodes_to_channel_map
.entry(node)
.or_default()
.extend(channels);
}
if let Some(fail_node) = fail_node {
self.failed_nodes.insert(fail_node, current_time());
let channels = self
.nodes_to_channel_map
.get(&fail_node)
.expect("channels not found");
let pairs: Vec<(OutPoint, Direction)> = self
.inner
.iter()
.flat_map(|((outpoint, direction), _)| {
if channels.contains(outpoint) {
Some((outpoint.clone(), *direction))
} else {
None
}
})
.collect();

for (channel, direction) in pairs.into_iter() {
self.apply_pair_result(channel, direction, 0, false, now_timestamp_as_millis_u64());
}
}
}

Expand Down
143 changes: 143 additions & 0 deletions src/fiber/tests/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,149 @@ fn test_history_apply_internal_result_fail_node() {
));
}

#[test]
fn test_history_fail_node_with_multiple_channels() {
let mut internal_result = InternalResult::default();
let mut history = PaymentHistory::new(generate_pubkey().into(), None, MemoryStore::default());
let node1 = generate_pubkey();
let node2 = generate_pubkey();
let node3 = generate_pubkey();
let channel_outpoint1 = gen_rand_outpoint();
let channel_outpoint2 = gen_rand_outpoint();
let channel_outpoint3 = gen_rand_outpoint();
let channel_outpoint4 = gen_rand_outpoint();

let route1 = vec![
SessionRouteNode {
pubkey: node1,
amount: 10,
channel_outpoint: channel_outpoint1.clone(),
},
SessionRouteNode {
pubkey: node2,
amount: 5,
channel_outpoint: channel_outpoint2.clone(),
},
SessionRouteNode {
pubkey: node3,
amount: 3,
channel_outpoint: OutPoint::default(),
},
];

let route2 = vec![
SessionRouteNode {
pubkey: node1,
amount: 10,
channel_outpoint: channel_outpoint3.clone(),
},
SessionRouteNode {
pubkey: node2,
amount: 5,
channel_outpoint: channel_outpoint4.clone(),
},
SessionRouteNode {
pubkey: node3,
amount: 3,
channel_outpoint: OutPoint::default(),
},
];

let (direction1, rev_direction1) = output_direction(node1, node2);
let (direction2, rev_direction2) = output_direction(node2, node3);

internal_result.succeed_range_pairs(&route1, 0, 2);
history.apply_internal_result(internal_result.clone());

assert!(matches!(
history.get_result(&channel_outpoint1, direction1),
Some(&TimedResult {
fail_amount: 0,
fail_time: 0,
success_amount: 10,
..
})
));

assert!(matches!(
history.get_result(&channel_outpoint2, direction2),
Some(&TimedResult {
fail_amount: 0,
fail_time: 0,
success_amount: 5,
..
})
));

internal_result.fail_node(&route2, 1);
assert_eq!(internal_result.pairs.len(), 6);
history.apply_internal_result(internal_result);

assert!(matches!(
history.get_result(&channel_outpoint1, direction1),
Some(&TimedResult {
fail_amount: 0,
success_amount: 0,
..
})
));

assert!(matches!(
history.get_result(&channel_outpoint2, direction2),
Some(&TimedResult {
fail_amount: 0,
success_amount: 0,
..
})
));

assert!(matches!(
history.get_result(&channel_outpoint1, rev_direction1),
None,
));

assert!(matches!(
history.get_result(&channel_outpoint2, rev_direction2),
None,
));

assert!(matches!(
history.get_result(&channel_outpoint3, direction1),
Some(&TimedResult {
fail_amount: 0,
success_amount: 0,
..
})
));

assert!(matches!(
history.get_result(&channel_outpoint4, direction2),
Some(&TimedResult {
fail_amount: 0,
success_amount: 0,
..
})
));

assert!(matches!(
history.get_result(&channel_outpoint3, rev_direction1),
Some(&TimedResult {
fail_amount: 0,
success_amount: 0,
..
})
));

assert!(matches!(
history.get_result(&channel_outpoint4, rev_direction2),
Some(&TimedResult {
fail_amount: 0,
success_amount: 0,
..
})
));
}

#[test]
fn test_history_interal_success_fail() {
let mut history = PaymentHistory::new(generate_pubkey().into(), None, MemoryStore::default());
Expand Down

0 comments on commit 43532e1

Please sign in to comment.