Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tmp/tokio 1.0 #28

Draft
wants to merge 17 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ travis-ci = {repository = "dwango/fibers-rs"}
[dependencies]
mio = "0.6"
futures = "0.1"
futures03 = {package = "futures", version = "0.3", features = ["thread-pool", "compat"]}
tokio = {version = "1.0", features = ["time", "rt", "rt-multi-thread", "net", "io-util"]}
splay_tree = "0.2"
num_cpus = "1"
nbchan = "0.1"
pin-project = "1.0.1"

[dev-dependencies]
clap = "2"
Expand Down
90 changes: 90 additions & 0 deletions src/executor/futures_in_place.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved.
// See the LICENSE file at the top-level directory of this distribution.

use futures::Future;
use futures03::compat::Future01CompatExt;
use futures03::executor::{LocalPool as LocalPool03, LocalSpawner as LocalSpawner03};
use futures03::task::{FutureObj as FutureObj03, Spawn as _};
use futures03::FutureExt;
use std::io;

use super::Executor;
use crate::fiber::Spawn;

/// An executor that executes spawned fibers and I/O event polling on current thread.
///
/// # Examples
///
/// An example to calculate fibonacci numbers:
///
/// ```
/// # extern crate fibers;
/// # extern crate futures;
/// use fibers::{Spawn, Executor, InPlaceExecutor};
/// use futures::{Async, Future};
///
/// fn fib<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> {
/// if n < 2 {
/// Box::new(futures::finished(n))
/// } else {
/// let f0 = handle.spawn_monitor(fib(n - 1, handle.clone()));
/// let f1 = handle.spawn_monitor(fib(n - 2, handle.clone()));
/// Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ()))
/// }
/// }
///
/// let mut executor = InPlaceExecutor::new().unwrap();
/// let mut monitor = executor.spawn_monitor(fib(7, executor.handle()));
/// loop {
/// if let Async::Ready(answer) = monitor.poll().unwrap() {
/// assert_eq!(answer, 13);
/// return;
/// } else {
/// executor.run_once().unwrap();
/// }
/// }
/// ```
#[derive(Debug)]
pub struct InPlaceExecutor {
pool: LocalPool03,
}
impl InPlaceExecutor {
/// Creates a new instance of `InPlaceExecutor`.
pub fn new() -> io::Result<Self> {
let pool = LocalPool03::new();
Ok(InPlaceExecutor { pool })
}
}
impl Executor for InPlaceExecutor {
type Handle = InPlaceExecutorHandle;
fn handle(&self) -> Self::Handle {
InPlaceExecutorHandle {
spawner: self.pool.spawner(),
}
}
fn run_once(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Spawn for InPlaceExecutor {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
self.handle().spawn_boxed(fiber)
}
}

/// A handle of an `InPlaceExecutor` instance.
#[derive(Debug, Clone)]
pub struct InPlaceExecutorHandle {
spawner: LocalSpawner03,
}

// TODO: don't rely on this
unsafe impl Send for InPlaceExecutorHandle {}
impl Spawn for InPlaceExecutorHandle {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
let future03 = fiber.compat().map(|_result| ());
let futureobj03: FutureObj03<()> = Box::new(future03).into();
// TODO: proper error handlings
self.spawner.spawn_obj(futureobj03).unwrap();
}
}
118 changes: 118 additions & 0 deletions src/executor/futures_thread_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved.
// See the LICENSE file at the top-level directory of this distribution.

use futures::Future;
use futures03::compat::Future01CompatExt;
use futures03::FutureExt;
use std::io;
use std::sync::Arc;
use tokio::runtime::Runtime as TokioRuntime;

use super::Executor;
use crate::fiber::Spawn;

/// An executor that executes spawned fibers on pooled threads.
///
/// # Examples
///
/// An example to calculate fibonacci numbers:
///
/// ```
/// # extern crate fibers;
/// # extern crate futures;
/// use fibers::{Spawn, Executor, ThreadPoolExecutor};
/// use futures::{Async, Future};
///
/// fn fib<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> {
/// if n < 2 {
/// Box::new(futures::finished(n))
/// } else {
/// let f0 = handle.spawn_monitor(fib(n - 1, handle.clone()));
/// let f1 = handle.spawn_monitor(fib(n - 2, handle.clone()));
/// Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ()))
/// }
/// }
///
/// let mut executor = ThreadPoolExecutor::new().unwrap();
/// let monitor = executor.spawn_monitor(fib(7, executor.handle()));
/// let answer = executor.run_fiber(monitor).unwrap();
/// assert_eq!(answer, Ok(13));
/// ```
#[derive(Debug)]
pub struct ThreadPoolExecutor {
pool: Arc<TokioRuntime>,
}
impl ThreadPoolExecutor {
/// Creates a new instance of `ThreadPoolExecutor`.
///
/// This is equivalent to `ThreadPoolExecutor::with_thread_count(num_cpus::get() * 2)`.
pub fn new() -> io::Result<Self> {
Self::with_thread_count(num_cpus::get() * 2)
}

/// Creates a new instance of `ThreadPoolExecutor` with the specified size of thread pool.
///
/// # Implementation Details
///
/// Note that current implementation is very naive and
/// should be improved in future releases.
///
/// Internally, `count` threads are assigned to each of
/// the scheduler (i.e., `fibers::fiber::Scheduler`) and
/// the I/O poller (i.e., `fibers::io::poll::Poller`).
///
/// When `spawn` function is called, the executor will assign a scheduler (thread)
/// for the fiber in simple round robin fashion.
///
/// If any of those threads are aborted, the executor will return an error as
/// a result of `run_once` method call after that.
pub fn with_thread_count(count: usize) -> io::Result<Self> {
assert!(count > 0);
let pool = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(count)
.build()?;
Ok(Self {
pool: Arc::new(pool),
})
}
}
impl Executor for ThreadPoolExecutor {
type Handle = ThreadPoolExecutorHandle;
fn handle(&self) -> Self::Handle {
ThreadPoolExecutorHandle {
pool: self.pool.clone(),
}
}
/// Does nothing. Futures are automatically polled.
fn run_once(&mut self) -> io::Result<()> {
Ok(())
}
/// Runs until the future is ready.
fn run_future<F: Future>(&mut self, future: F) -> io::Result<Result<F::Item, F::Error>> {
Ok(self.pool.block_on(future.compat()))
}

/// Runs infinitely until an error happens.
fn run(self) -> io::Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

サポートできないなら fibers-rs のメジャーバージョンを変更し、このメソッドも API から削除してしまってはどうでしょう。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

そうします。よく考えたらいずれにせよ、io, net, fiber::schedule モジュールは消してしまうので、メジャーバージョンの更新は必須でした。

// In this impl, run should never be called.
unreachable!("Don't call run directly!");
}
}
impl Spawn for ThreadPoolExecutor {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
self.handle().spawn_boxed(fiber)
}
}

/// A handle of a `ThreadPoolExecutor` instance.
#[derive(Debug, Clone)]
pub struct ThreadPoolExecutorHandle {
pool: Arc<TokioRuntime>,
}
impl Spawn for ThreadPoolExecutorHandle {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
let future03 = fiber.compat().map(|_result| ());
self.pool.spawn(future03);
}
}
13 changes: 11 additions & 2 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,21 @@
use futures::{Async, Future};
use std::io;

pub use self::in_place::{InPlaceExecutor, InPlaceExecutorHandle};
pub use self::thread_pool::{ThreadPoolExecutor, ThreadPoolExecutorHandle};
pub use self::futures_in_place::{InPlaceExecutor, InPlaceExecutorHandle};
pub use self::futures_thread_pool::{ThreadPoolExecutor, ThreadPoolExecutorHandle};
pub use self::in_place::{
InPlaceExecutor as OldInPlaceExecutor, InPlaceExecutorHandle as OldInPlaceExecutorHandle,
};
pub use self::thread_pool::{
ThreadPoolExecutor as OldThreadPoolExecutor,
ThreadPoolExecutorHandle as OldThreadPoolExecutorHandle,
};

use crate::fiber::Spawn;
use crate::sync::oneshot::{Monitor, MonitorError};

mod futures_in_place;
mod futures_thread_pool;
mod in_place;
mod thread_pool;

Expand Down
22 changes: 1 addition & 21 deletions src/io/poll/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use nbchan::mpsc as nb_mpsc;
use std::collections::HashMap;
use std::fmt;
use std::io;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{RecvError, TryRecvError};
use std::sync::Arc;
use std::time;
Expand Down Expand Up @@ -263,26 +263,6 @@ impl PollerHandle {
}
Register { rx }
}

fn set_timeout(&self, delay_from_now: time::Duration) -> Timeout {
let (tx, rx) = oneshot::channel();
let expiry_time = time::Instant::now() + delay_from_now;
let timeout_id = self.next_timeout_id.fetch_add(1, atomic::Ordering::SeqCst);
let request = Request::SetTimeout(timeout_id, expiry_time, tx);
let _ = self.request_tx.send(request);
Timeout {
cancel: Some(CancelTimeout {
timeout_id,
expiry_time,
request_tx: self.request_tx.clone(),
}),
rx,
}
}
}

pub fn set_timeout(poller: &PollerHandle, delay_from_now: time::Duration) -> Timeout {
poller.set_timeout(delay_from_now)
}

#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
#![warn(missing_docs)]

extern crate futures;
extern crate futures03;
extern crate mio;
extern crate nbchan;
extern crate num_cpus;
Expand Down
74 changes: 57 additions & 17 deletions src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,6 @@ impl TcpListener {
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.handle.inner().local_addr()
}

/// Get the value of the `SO_ERROR` option on this socket.
///
/// This will retrieve the stored error in the underlying socket,
/// clearing the field in the process.
/// This can be useful for checking errors between calls.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.handle.inner().take_error()
}

/// Calls `f` with the reference to the inner socket.
pub fn with_inner<F, T>(&self, f: F) -> T
where
F: FnOnce(&MioTcpListener) -> T,
{
f(&*self.handle.inner())
}
}
impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down Expand Up @@ -463,3 +446,60 @@ impl Future for ConnectInner {
}
}
}

mod tests {
use std::sync::atomic::{AtomicBool, Ordering};

#[test]
fn async_works() {
use crate::ThreadPoolExecutor;
use crate::{Executor, Spawn};
use futures::Future;
use futures03::TryFutureExt;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
let mut exec = ThreadPoolExecutor::new().unwrap();
let addr: SocketAddr = "127.0.0.1:2525".parse().unwrap();
let flag = Arc::new(AtomicBool::new(false));
let flag_cp = flag.clone();
let fut_listen_03 = async move {
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

このファイルが提供している struct とかをテストしてるわけじゃなさそうなんですけど, このテストはここにあったほうが良いのでしょうか?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

確かにそうなのですが、他に適切な置き場所もなく、という感じです。

let (conn, addr) = listener.accept().await.unwrap();
eprintln!("Connected! addr = {:?}", addr);
let mut buf = vec![0; 10];
for i in 0..10 {
let read = conn.try_read(&mut buf);
eprintln!("read = {:?}, buf = {:?}", read, buf);
if read.is_ok() {
break;
}
std::thread::sleep(Duration::from_secs(1));
}
flag_cp.store(true, Ordering::SeqCst);
Ok::<(), std::io::Error>(())
};
let fut_listen = Box::pin(fut_listen_03).compat();
let fut_conn_03 = async move {
eprintln!("connecting...");
let str = tokio::net::TcpStream::connect(addr).await.unwrap();
for i in 0..10 {
let res = str.try_write(&[68, 69, 70, 71]);
eprintln!("write = {:?}", res);
if res.is_ok() {
break;
}
}
Ok::<(), std::io::Error>(())
};
let fut = Box::pin(fut_conn_03).compat();
exec.spawn(fut_listen.map_err(|e| panic!("Spawn failed: server {}", e)));

std::thread::sleep(Duration::from_secs(1));

exec.run_future(fut.map_err(|e| panic!("Spawn failed: client {}", e)))
.unwrap()
.unwrap();
while !flag.load(Ordering::SeqCst) {}
}
}
17 changes: 0 additions & 17 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,6 @@ impl UdpSocket {
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.handle.inner().local_addr()
}

/// Get the value of the `SO_ERROR` option on this socket.
///
/// This will retrieve the stored error in the underlying socket,
/// clearing the field in the process.
/// This can be useful for checking errors between calls.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.handle.inner().take_error()
}

/// Calls `f` with the reference to the inner socket.
pub fn with_inner<F, T>(&self, f: F) -> T
where
F: FnOnce(&MioUdpSocket) -> T,
{
f(&*self.handle.inner())
}
}
impl fmt::Debug for UdpSocket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down
Loading