Skip to content

Commit

Permalink
added an optional observer node in the net framework
Browse files Browse the repository at this point in the history
  • Loading branch information
vkomenda committed Nov 14, 2018
1 parent 767944c commit d52e386
Showing 1 changed file with 58 additions and 12 deletions.
70 changes: 58 additions & 12 deletions tests/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{cmp, collections, env, fmt, fs, io, ops, process, time};

use rand;
use rand::{Rand, Rng};
use threshold_crypto as crypto;
use threshold_crypto::{self as crypto, SecretKeyShare};

use hbbft::dynamic_honey_badger::Batch;
use hbbft::util::SubRng;
Expand Down Expand Up @@ -302,7 +302,7 @@ pub struct NetBuilder<D, I>
where
D: DistAlgorithm,
{
/// Iterator used to create node ids.
/// Iterator used to create validator node ids.
node_ids: I,
/// Number of faulty nodes in the network.
num_faulty: usize,
Expand All @@ -318,6 +318,9 @@ where
message_limit: Option<usize>,
/// Optional time limit.
time_limit: Option<time::Duration>,
/// Optional observer node ID. It must not be a member of `node_ids` for the latter are
/// validator node IDs.
observer_id: Option<D::NodeId>,
/// Random number generator used to generate keys.
rng: Option<Box<dyn Rng>>,
}
Expand All @@ -336,6 +339,7 @@ where
.field("crank_limit", &self.crank_limit)
.field("message_limit", &self.message_limit)
.field("time_limit", &self.time_limit)
.field("observer_id", &self.observer_id)
.field("rng", &"<RNG>")
.finish()
}
Expand Down Expand Up @@ -368,6 +372,7 @@ where
message_limit: None,
time_limit: DEFAULT_TIME_LIMIT,
rng: None,
observer_id: None,
}
}

Expand Down Expand Up @@ -456,6 +461,15 @@ where
self
}

/// Override the default observer setting.
///
/// Adds an additional, observer node to the network.
#[inline]
pub fn observer_id(mut self, observer_id: D::NodeId) -> Self {
self.observer_id = Some(observer_id);
self
}

/// Constructor function (with step).
///
/// The constructor function is used to construct each node in the network. Any step returned
Expand Down Expand Up @@ -513,9 +527,13 @@ where

// Note: Closure is not redundant, won't compile without it.
#[cfg_attr(feature = "cargo-clippy", allow(redundant_closure))]
let mut net = VirtualNet::new(self.node_ids, self.num_faulty as usize, rng, move |node| {
cons(node)
})?;
let mut net = VirtualNet::new(
self.node_ids,
self.num_faulty as usize,
rng,
self.observer_id,
move |node| cons(node),
)?;

if self.adversary.is_some() {
net.adversary = self.adversary;
Expand Down Expand Up @@ -569,6 +587,8 @@ where
time_limit: Option<time::Duration>,
/// The instant the network was created.
start_time: time::Instant,
/// Optional observer node ID.
observer_id: Option<D::NodeId>,
}

impl<D> fmt::Debug for VirtualNet<D>
Expand Down Expand Up @@ -700,6 +720,11 @@ where
msgs.sort_by(f);
self.messages.extend(msgs);
}

/// Returns a reference to the observer node ID if it is defined and `None` otherwise.
pub fn observer_id(&self) -> Option<&D::NodeId> {
self.observer_id.as_ref()
}
}

impl<D> VirtualNet<D>
Expand All @@ -726,6 +751,7 @@ where
node_ids: I,
faulty: usize,
mut rng: R,
observer_id: Option<D::NodeId>,
cons: F,
) -> Result<Self, crypto::error::Error>
where
Expand All @@ -734,22 +760,29 @@ where
R: rand::Rng,
{
// Generate a new set of cryptographic keys for threshold cryptography.
let net_infos = NetworkInfo::generate_map(node_ids, &mut rng)?;
let netinfos = NetworkInfo::generate_map(node_ids, &mut rng)?;

assert!(
faulty * 3 < net_infos.len(),
faulty * 3 < netinfos.len(),
"Too many faulty nodes requested, `f` must satisfy `3f < total_nodes`."
);

let observer_netinfo = observer_id.map(|observer_id| {
let node_ni = netinfos.values().next().unwrap();
NetworkInfo::new(
observer_id,
SecretKeyShare::default(),
node_ni.public_key_set().clone(),
rng.gen(),
node_ni.public_key_map().clone(),
)
});
let mut steps = collections::BTreeMap::new();
let mut messages = collections::VecDeque::new();

let mut nodes = net_infos
let mut nodes: collections::BTreeMap<_, _> = netinfos
.into_iter()
.enumerate()
.map(|(idx, (id, netinfo))| {
let is_faulty = idx < faulty;

let (algorithm, step) = cons(NewNodeInfo {
id: id.clone(),
netinfo,
Expand All @@ -759,7 +792,19 @@ where
steps.insert(id.clone(), step);
(id, Node::new(algorithm, is_faulty))
}).collect();

// Add the observer node if required.
let observer_id = observer_netinfo.map(|netinfo| {
let id = netinfo.our_id().clone();
let (algorithm, step) = cons(NewNodeInfo {
id: id.clone(),
netinfo,
faulty: false,
rng: rng.sub_rng(),
});
steps.insert(id.clone(), step);
nodes.insert(id.clone(), Node::new(algorithm, false));
id
});
let mut message_count: usize = 0;
// For every recorded step, apply it.
for (sender, step) in steps {
Expand All @@ -782,6 +827,7 @@ where
message_limit: None,
time_limit: None,
start_time: time::Instant::now(),
observer_id,
})
}

Expand Down

0 comments on commit d52e386

Please sign in to comment.