Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into v0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
gavofyork committed Jul 29, 2018
2 parents dede5ab + a118a85 commit f9013b2
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 215 deletions.
295 changes: 135 additions & 160 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ const ERROR_MSG: &'static str = "Failed to generate metadata files";

fn main() {
vergen(OutputFns::all()).expect(ERROR_MSG);
println!("cargo:rerun-if-changed=.git/HEAD");
}
2 changes: 1 addition & 1 deletion polkadot/service/res/krummelanke.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"bootNodes": [
"/ip4/104.211.54.233/tcp/30333/p2p/QmRMGcQh69t8a8YwzHkofVo9SFr7ffggUwhAYjVSTChmrd",
"/ip4/104.211.48.51/tcp/30333/p2p/QmWCnXrhM1in1qPqVT3rDXQEJHedAzbPDMimdjqy2P9fGn",
"/ip4/104.211.48.247/tcp/30333/p2p/QmYPx99i3H8EKXrvYHTBwqz3jjFC1kBfkvmSKd2h9zwQFr",
"/ip4/104.211.48.247/tcp/30333/p2p/QmY33GW69TnTsdQWjAkxJR1GrWTdeV1PmzzcSmUay4HvAB",
"/ip4/40.114.120.164/tcp/30333/p2p/QmWzYU5X1NpFrprD1YZF5Lcj9aE5WF4QEg5FpvQx5XGWG7",
"/ip4/40.117.153.33/tcp/30333/p2p/QmSz8qCADMmi92QB8dTqMPu56JYQQKZBAHz7y8KXjvqcvW"
],
Expand Down
6 changes: 3 additions & 3 deletions substrate/network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "fad12c89ea2b6f1f6420557db6e9305fb03f9f67", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
libp2p = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
ethereum-types = "0.3"
Expand All @@ -20,10 +20,10 @@ parking_lot = "0.5"
libc = "0.2"
log = "0.3"
rand = "0.5.0"
tokio-core = "0.1"
tokio = "0.1"
tokio-io = "0.1"
tokio-timer = "0.2"
varint = { git = "https://github.com/libp2p/rust-libp2p" }
varint = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2" }

[dev-dependencies]
assert_matches = "1.2"
Expand Down
3 changes: 1 addition & 2 deletions substrate/network-libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@

extern crate parking_lot;
extern crate fnv;
#[macro_use]
extern crate futures;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_io;
extern crate tokio_timer;
extern crate ethkey;
Expand Down
8 changes: 2 additions & 6 deletions substrate/network-libp2p/src/network_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,6 @@ impl NetworkState {
peer_info.id,
peer_info.kad_connec.is_alive(),
peer_info.protocols.iter().filter(|c| c.1.is_alive()).count());
// TODO: we manually clear the connections as a work-around for
// networking bugs ; normally it should automatically drop
for c in peer_info.protocols.iter() { c.1.clear(); }
peer_info.kad_connec.clear();
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(who));
}
Expand Down Expand Up @@ -852,11 +848,11 @@ fn parse_and_add_to_node_store(
NodeStore::Memory(ref node_store) =>
node_store
.peer_or_create(&who)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
NodeStore::Json(ref node_store) =>
node_store
.peer_or_create(&who)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
}

Ok(who)
Expand Down
50 changes: 28 additions & 22 deletions substrate/network-libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::thread;
use std::time::{Duration, Instant};
use futures::{future, Future, Stream, IntoFuture};
use futures::sync::{mpsc, oneshot};
use tokio_core::reactor::{Core, Handle};
use tokio::runtime::current_thread;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Interval, Deadline};

Expand Down Expand Up @@ -118,7 +118,7 @@ impl NetworkService {
local_peer_id: local_peer_id.clone(),
kbuckets_timeout: Duration::from_secs(600),
request_timeout: Duration::from_secs(10),
known_initial_peers: network_state.known_peers().collect(),
known_initial_peers: network_state.known_peers(),
});

let shared = Arc::new(Shared {
Expand Down Expand Up @@ -191,16 +191,16 @@ impl NetworkService {

let shared = self.shared.clone();
let join_handle = thread::spawn(move || {
// Tokio core that is going to run everything in this thread.
let mut core = match Core::new() {
// Tokio runtime that is going to run everything in this thread.
let mut runtime = match current_thread::Runtime::new() {
Ok(c) => c,
Err(err) => {
let _ = init_tx.send(Err(err.into()));
return
}
};

let fut = match init_thread(core.handle(), shared,
let fut = match init_thread(shared,
timeouts_register_rx, close_rx) {
Ok(future) => {
debug!(target: "sub-libp2p", "Successfully started networking service");
Expand All @@ -213,7 +213,7 @@ impl NetworkService {
}
};

match core.run(fut) {
match runtime.block_on(fut) {
Ok(()) => debug!(target: "sub-libp2p", "libp2p future finished"),
Err(err) => error!(target: "sub-libp2p", "error while running libp2p: {:?}", err),
}
Expand Down Expand Up @@ -395,7 +395,6 @@ impl NetworkContext for NetworkContextImpl {
/// - `timeouts_register_rx` should receive newly-registered timeouts.
/// - `close_rx` should be triggered when we want to close the network.
fn init_thread(
core: Handle,
shared: Arc<Shared>,
timeouts_register_rx: mpsc::UnboundedReceiver<
(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))
Expand All @@ -405,7 +404,6 @@ fn init_thread(
// Build the transport layer.
let transport = {
let base = transport::build_transport(
core.clone(),
transport::UnencryptedAllowed::Denied,
shared.network_state.local_private_key().clone()
);
Expand Down Expand Up @@ -535,7 +533,7 @@ fn init_thread(

// Build the timeouts system for the `register_timeout` function.
// (note: this has nothing to do with socket timeouts)
let timeouts = timeouts::build_timeouts_stream(core.clone(), timeouts_register_rx)
let timeouts = timeouts::build_timeouts_stream(timeouts_register_rx)
.for_each({
let shared = shared.clone();
move |(handler, protocol_id, timer_token)| {
Expand Down Expand Up @@ -630,7 +628,7 @@ fn listener_handle<'a, C>(
match shared.network_state.ping_connection(node_id.clone()) {
Ok((_, ping_connec)) => {
trace!(target: "sub-libp2p", "Successfully opened ping substream with {:?}", node_id);
let fut = ping_connec.set_until(pinger, future);
let fut = ping_connec.tie_or_passthrough(pinger, future);
Box::new(fut) as Box<_>
},
Err(err) => Box::new(future::err(err)) as Box<_>
Expand Down Expand Up @@ -687,7 +685,7 @@ fn handle_kademlia_connection(
val
});

Ok(kad_connec.set_until(controller, future))
Ok(kad_connec.tie_or_passthrough(controller, future))
}

/// When a remote performs a `FIND_NODE` Kademlia request for `searched`,
Expand Down Expand Up @@ -823,7 +821,7 @@ fn handle_custom_connection(
});

let val = (custom_proto_out.outgoing, custom_proto_out.protocol_version);
let final_fut = unique_connec.set_until(val, fut)
let final_fut = unique_connec.tie_or_stop(val, fut)
.then(move |val| {
// Makes sure that `dc_guard` is kept alive until here.
drop(dc_guard);
Expand Down Expand Up @@ -950,7 +948,7 @@ fn perform_kademlia_query<T, To, St, C>(
let random_peer_id = random_key.into_peer_id();
trace!(target: "sub-libp2p", "Start kademlia discovery for {:?}", random_peer_id);

shared.clone()
let future = shared.clone()
.kad_system
.find_node(random_peer_id, {
let shared = shared.clone();
Expand All @@ -974,7 +972,10 @@ fn perform_kademlia_query<T, To, St, C>(
)
.into_future()
.map_err(|(err, _)| err)
.map(|_| ())
.map(|_| ());

// Note that we use a `Box` in order to speed up compilation.
Box::new(future) as Box<Future<Item = _, Error = _>>
}

/// Connects to additional nodes, if necessary.
Expand Down Expand Up @@ -1163,8 +1164,7 @@ fn open_peer_custom_proto<T, To, St, C>(
);
}

// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
unique_connec.dial(&swarm_controller, &addr, with_err);
},
Err(err) => {
trace!(target: "sub-libp2p",
Expand Down Expand Up @@ -1200,11 +1200,14 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
})
});

shared.network_state
let future = shared.network_state
.kad_connection(who.clone())
.into_future()
.map(move |(_, k)| k.get_or_dial(&swarm_controller, &addr, transport))
.flatten()
.map(move |(_, k)| k.dial(&swarm_controller, &addr, transport))
.flatten();

// Note that we use a Box in order to speed up compilation.
Box::new(future) as Box<Future<Item = _, Error = _>>
}

/// Processes the information about a node.
Expand Down Expand Up @@ -1305,7 +1308,7 @@ fn ping_all<T, St, C>(

let addr = Multiaddr::from(AddrComponent::P2P(who.clone().into_bytes()));
let fut = pinger
.get_or_dial(&swarm_controller, &addr, transport.clone())
.dial(&swarm_controller, &addr, transport.clone())
.and_then(move |mut p| {
trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, who);
p.ping()
Expand Down Expand Up @@ -1334,7 +1337,7 @@ fn ping_all<T, St, C>(
ping_futures.push(fut);
}

future::loop_fn(ping_futures, |ping_futures| {
let future = future::loop_fn(ping_futures, |ping_futures| {
if ping_futures.is_empty() {
let fut = future::ok(future::Loop::Break(()));
return future::Either::A(fut)
Expand All @@ -1344,7 +1347,10 @@ fn ping_all<T, St, C>(
.map(|((), _, rest)| future::Loop::Continue(rest))
.map_err(|(err, _, _)| err);
future::Either::B(fut)
})
});

// Note that we use a Box in order to speed up compilation.
Box::new(future) as Box<Future<Item = _, Error = _>>
}

/// Expects a multiaddr of the format `/p2p/<node_id>` and returns the node ID.
Expand Down
39 changes: 21 additions & 18 deletions substrate/network-libp2p/src/timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?

use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc};
use std::io::Error as IoError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use tokio_core::reactor::{Handle, Timeout};
use tokio_timer::{self, Delay};

/// Builds the timeouts system.
///
Expand All @@ -27,21 +27,18 @@ use tokio_core::reactor::{Handle, Timeout};
/// `T` can be anything you want, as it is transparently passed from the input
/// to the output. Timeouts continue to fire forever, as there is no way to
/// unregister them.
pub fn build_timeouts_stream<T>(
core: Handle,
pub fn build_timeouts_stream<'a, T>(
timeouts_rx: mpsc::UnboundedReceiver<(Duration, T)>
) -> impl Stream<Item = T, Error = IoError>
where T: Clone {
) -> Box<Stream<Item = T, Error = IoError> + 'a>
where T: Clone + 'a {
let next_timeout = next_in_timeouts_stream(timeouts_rx);

// The `unfold` function is essentially a loop turned into a stream. The
// first parameter is the initial state, and the closure returns the new
// state and an item.
stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
let stream = stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
// `timeouts` is a `Vec` of futures that produce an `Out`.

let core = core.clone();

// `select_ok` panics if `timeouts` is empty anyway.
if timeouts.is_empty() {
return None
Expand All @@ -53,8 +50,7 @@ pub fn build_timeouts_stream<T>(
Out::NewTimeout((Some((duration, item)), next_timeouts)) => {
// Received a new timeout request on the channel.
let next_timeout = next_in_timeouts_stream(next_timeouts);
let at = Instant::now() + duration;
let timeout = Timeout::new_at(at, &core)?;
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
timeouts.push(future::Either::A(next_timeout));
Expand All @@ -66,16 +62,18 @@ pub fn build_timeouts_stream<T>(
Out::Timeout(duration, item) => {
// A timeout has happened.
let returned = item.clone();
let at = Instant::now() + duration;
let timeout = Timeout::new_at(at, &core)?;
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
Ok((Some(returned), timeouts))
},
}
)
)
}).filter_map(|item| item)
}).filter_map(|item| item);

// Note that we use a `Box` in order to speed up compilation time.
Box::new(stream) as Box<Stream<Item = _, Error = _>>
}

/// Local enum representing the output of the selection.
Expand All @@ -97,15 +95,20 @@ fn next_in_timeouts_stream<T, B>(
.map_err(|_| unreachable!("an UnboundedReceiver can never error"))
}

/// Does the equivalent to `future.map(move |()| (duration, item))`.
/// Does the equivalent to `future.map(move |()| (duration, item)).map_err(|err| to_io_err(err))`.
struct TimeoutWrapper<A, F, T>(F, Duration, Option<T>, PhantomData<A>);
impl<A, F, T> Future for TimeoutWrapper<A, F, T>
where F: Future<Item = ()> {
where F: Future<Item = (), Error = tokio_timer::Error> {
type Item = Out<A, T>;
type Error = F::Error;
type Error = IoError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let _ready: () = try_ready!(self.0.poll());
match self.0.poll() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(IoError::new(IoErrorKind::Other, err.to_string())),
}

let out = Out::Timeout(self.1, self.2.take().expect("poll() called again after success"));
Ok(Async::Ready(out))
}
Expand Down
4 changes: 1 addition & 3 deletions substrate/network-libp2p/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ use libp2p::{self, Transport, mplex, secio, yamux};
use libp2p::core::{MuxedTransport, either, upgrade};
use libp2p::transport_timeout::TransportTimeout;
use std::time::Duration;
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};

/// Builds the transport that serves as a common ground for all connections.
pub fn build_transport(
core: Handle,
unencrypted_allowed: UnencryptedAllowed,
local_private_key: secio::SecioKeyPair
) -> impl MuxedTransport<Output = impl AsyncRead + AsyncWrite> + Clone {
let base = libp2p::CommonTransport::new(core)
let base = libp2p::CommonTransport::new()
.with_upgrade({
let secio = secio::SecioConfig {
key: local_private_key,
Expand Down

0 comments on commit f9013b2

Please sign in to comment.