Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: delete async-trait, fix typo, use tokio to replace async_std #998

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/raft-kv-memstore-singlethreaded/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ repository = "https://github.com/datafuselabs/openraft"
[dependencies]
openraft = { path = "../../openraft", features = ["serde", "storage-v2", "singlethreaded"] }

async-trait = "0.1.36"
clap = { version = "4.1.11", features = ["derive", "env"] }
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version = "1.0.114", features = ["derive"] }
Expand Down
1 change: 0 additions & 1 deletion examples/raft-kv-memstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ path = "src/bin/main.rs"
openraft = { path = "../../openraft", features = ["serde", "storage-v2"] }

actix-web = "4.0.0-rc.2"
async-trait = "0.1.36"
clap = { version = "4.1.11", features = ["derive", "env"] }
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version = "1.0.114", features = ["derive"] }
Expand Down
10 changes: 7 additions & 3 deletions examples/raft-kv-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ path = "src/bin/main.rs"
[dependencies]
openraft = { path = "../../openraft", features = ["serde", "storage-v2"] }

async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
async-trait = "0.1.36"
tokio = { version = "1.35.1", features = ["full"] }
byteorder = "1.4.3"
clap = { version = "4.1.11", features = ["derive", "env"] }
reqwest = { version = "0.11.9", features = ["json"] }
Expand All @@ -34,7 +33,12 @@ serde_json = "1.0.57"
tide = { version = "0.16" }
# for toy-rpc, use `serde_json` instead of the default `serde_bincode`:
# bincode which enabled by default by toy-rpc, does not support `#[serde(flatten)]`: https://docs.rs/bincode/2.0.0-alpha.1/bincode/serde/index.html#known-issues
toy-rpc = { version = "0.8.6", default-features = false, features = [ "serde_json", "ws_async_std", "server", "client", "async_std_runtime", ] }
toy-rpc = { version = "0.8.6", features = [
"ws_tokio",
"server",
"client",
"tokio_runtime",
] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }

Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-rocksdb/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use async_std::sync::RwLock;
use openraft::Config;
use tokio::sync::RwLock;

use crate::ExampleRaft;
use crate::NodeId;
Expand All @@ -12,7 +12,7 @@ use crate::NodeId;
pub struct App {
pub id: NodeId,
pub api_addr: String,
pub rcp_addr: String,
pub rpc_addr: String,
pub raft: ExampleRaft,
pub key_values: Arc<RwLock<BTreeMap<String, String>>>,
pub config: Arc<Config>,
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Opt {
pub rpc_addr: String,
}

#[async_std::main]
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Setup the logger
tracing_subscriber::fmt()
Expand Down
12 changes: 6 additions & 6 deletions examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::io::Cursor;
use std::path::Path;
use std::sync::Arc;

use async_std::net::TcpListener;
use async_std::task;
use openraft::Config;
use openraft::TokioRuntime;
use tokio::net::TcpListener;
use tokio::task;

use crate::app::App;
use crate::network::api;
Expand Down Expand Up @@ -79,7 +79,7 @@ pub async fn start_example_raft_node<P>(
node_id: NodeId,
dir: P,
http_addr: String,
rcp_addr: String,
rpc_addr: String,
) -> std::io::Result<()>
where
P: AsRef<Path>,
Expand Down Expand Up @@ -107,7 +107,7 @@ where
let app = Arc::new(App {
id: node_id,
api_addr: http_addr.clone(),
rcp_addr: rcp_addr.clone(),
rpc_addr: rpc_addr.clone(),
raft,
key_values: kvs,
config,
Expand All @@ -117,7 +117,7 @@ where

let server = toy_rpc::Server::builder().register(echo_service).build();

let listener = TcpListener::bind(rcp_addr).await.unwrap();
let listener = TcpListener::bind(rpc_addr).await.unwrap();
let handle = task::spawn(async move {
server.accept_websocket(listener).await.unwrap();
});
Expand All @@ -130,6 +130,6 @@ where
api::rest(&mut app);

app.listen(http_addr).await?;
handle.await;
_ = handle.await;
Ok(())
}
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/network/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn init(req: Request<Arc<App>>) -> tide::Result {
let mut nodes = BTreeMap::new();
let node = Node {
api_addr: req.state().api_addr.clone(),
rpc_addr: req.state().rcp_addr.clone(),
rpc_addr: req.state().rpc_addr.clone(),
};

nodes.insert(req.state().id, node);
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::ops::RangeBounds;
use std::path::Path;
use std::sync::Arc;

use async_std::sync::RwLock;
use byteorder::BigEndian;
use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
Expand Down Expand Up @@ -35,6 +34,7 @@ use rocksdb::Options;
use rocksdb::DB;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::RwLock;

use crate::typ;
use crate::Node;
Expand Down
19 changes: 11 additions & 8 deletions examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use std::panic::PanicInfo;
use std::thread;
use std::time::Duration;

use async_std::task::block_on;
use maplit::btreemap;
use maplit::btreeset;
use raft_kv_rocksdb::client::ExampleClient;
use raft_kv_rocksdb::start_example_raft_node;
use raft_kv_rocksdb::store::Request;
use raft_kv_rocksdb::Node;
use tokio::runtime::Handle;
use tracing_subscriber::EnvFilter;

pub fn log_panic(panic: &PanicInfo) {
Expand All @@ -36,7 +36,7 @@ pub fn log_panic(panic: &PanicInfo) {

/// Setup a cluster of 3 nodes.
/// Write to it and read from it.
#[async_std::test(flavor = "multi_thread", worker_threads = 8)]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
// --- The client itself does not store addresses for all nodes, but just node id.
// Thus we need a supporting component to provide mapping from node id to node address.
Expand Down Expand Up @@ -77,23 +77,26 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
let d2 = tempfile::TempDir::new()?;
let d3 = tempfile::TempDir::new()?;

let handle = Handle::current();
let handle_clone = handle.clone();
let _h1 = thread::spawn(move || {
let x = block_on(start_example_raft_node(1, d1.path(), get_addr(1), get_rpc_addr(1)));
let x = handle_clone.block_on(start_example_raft_node(1, d1.path(), get_addr(1), get_rpc_addr(1)));
println!("x: {:?}", x);
});

let handle_clone = handle.clone();
let _h2 = thread::spawn(move || {
let x = block_on(start_example_raft_node(2, d2.path(), get_addr(2), get_rpc_addr(2)));
let x = handle_clone.block_on(start_example_raft_node(2, d2.path(), get_addr(2), get_rpc_addr(2)));
println!("x: {:?}", x);
});

let _h3 = thread::spawn(move || {
let x = block_on(start_example_raft_node(3, d3.path(), get_addr(3), get_rpc_addr(3)));
let x = handle.block_on(start_example_raft_node(3, d3.path(), get_addr(3), get_rpc_addr(3)));
println!("x: {:?}", x);
});

// Wait for server to start up.
async_std::task::sleep(Duration::from_millis(1_000)).await;
tokio::time::sleep(Duration::from_millis(1_000)).await;

// --- Create a client to the first node, as a control handle to the cluster.

Expand Down Expand Up @@ -172,7 +175,7 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {

// --- Wait for a while to let the replication get done.

async_std::task::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(500)).await;

// --- Read it on every node.

Expand Down Expand Up @@ -200,7 +203,7 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
})
.await?;

async_std::task::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(500)).await;

// --- Read it on every node.

Expand Down
Loading