Skip to content

Commit

Permalink
Remove crossbeam feature, switch to flume for all
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Mar 14, 2024
1 parent 380ee22 commit 41ec4f7
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 57 deletions.
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 3 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ homepage = "https://github.com/deib-polimi/noir"
readme = "README.md"

[features]
default = ["flume", "clap", "ssh", "timestamp"]
crossbeam = ["crossbeam-channel"]
default = ["clap", "ssh", "timestamp"]
timestamp = []
ssh = ["ssh2", "whoami", "shell-escape", "sha2", "base64"]
async-tokio = ["tokio", "flume", "futures", "tokio/net", "tokio/io-util", "tokio/time", "tokio/rt-multi-thread", "tokio/macros"]
async-tokio = ["tokio", "futures", "tokio/net", "tokio/io-util", "tokio/time", "tokio/rt-multi-thread", "tokio/macros"]
profiler = []

[dependencies]
Expand Down Expand Up @@ -60,8 +59,7 @@ sha2 = { version = "0.10.8", optional = true }
base64 = { version = "0.22.0", optional = true }

# channel implementation
crossbeam-channel = { version = "0.5.12", optional = true }
flume = { version = "0.11.0", optional = true }
flume = "0.11.0"

# used for csv file source
csv = "1.3.0"
Expand Down Expand Up @@ -100,10 +98,6 @@ micrometer = { version = "0.2.7", features = ["enable"]}
# for the examples
regex = "1.10.3"

# used in the benchmarks
crossbeam-channel = "0.5.12"
flume = "0.11.0"

kstring = { version = "2.0.0", features = ["serde"] }
nexmark = { version = "0.2.0", features = ["serde"] }

Expand Down
33 changes: 1 addition & 32 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@

use std::time::Duration;

#[cfg(feature = "crossbeam")]
use crossbeam_channel::{
bounded as bounded_ext, select, unbounded as unbounded_ext, Receiver as ReceiverExt,
RecvError as ExtRecvError, RecvTimeoutError as ExtRecvTimeoutError, SendError as SendErrorExt,
Sender as SenderExt, TryRecvError as ExtTryRecvError,
};
#[cfg(feature = "flume")]
use flume::{
bounded as bounded_ext, unbounded as unbounded_ext, Receiver as ReceiverExt,
RecvError as ExtRecvError, RecvTimeoutError as ExtRecvTimeoutError, SendError as SendErrorExt,
Expand Down Expand Up @@ -85,30 +78,6 @@ pub enum SelectResult<In1, In2> {
B(Result<In2, RecvError>),
}

#[cfg(feature = "crossbeam")]
#[macro_use]
mod select_impl {
macro_rules! select_impl {
($self:expr, $other:expr) => {
select! {
recv($self.0) -> el => SelectResult::A(el.map_err(RecvError::from)),
recv($other.0) -> el => SelectResult::B(el.map_err(RecvError::from)),
}
};
}

macro_rules! select_timeout_impl {
($self:expr, $other:expr, $timeout:expr) => {
select! {
recv($self.0) -> el => Ok(SelectResult::A(el.map_err(RecvError::from))),
recv($other.0) -> el => Ok(SelectResult::B(el.map_err(RecvError::from))),
default($timeout) => Err(RecvTimeoutError::Timeout),
}
};
}
}

#[cfg(feature = "flume")]
#[macro_use]
mod select_impl {
macro_rules! select_impl {
Expand Down Expand Up @@ -160,7 +129,7 @@ impl<T: ChannelItem> Receiver<T> {
}

#[inline]
#[cfg(feature = "flume")]

pub async fn recv_async(&self) -> Result<T, RecvError> {
self.0.recv_async().await.map_err(RecvError::from)
}
Expand Down
3 changes: 0 additions & 3 deletions src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ use std::fmt::Display;
use std::hash::Hash;
use std::ops::{AddAssign, Div};

#[cfg(feature = "crossbeam")]
use crossbeam_channel::{unbounded, Receiver, Sender};
#[cfg(not(feature = "crossbeam"))]
use flume::{unbounded, Receiver};
#[cfg(feature = "async-tokio")]
use futures::Future;
Expand Down
3 changes: 0 additions & 3 deletions src/operator/sink/collect_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ use crate::operator::sink::Sink;
use crate::operator::{ExchangeData, Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;

#[cfg(feature = "crossbeam")]
use crossbeam_channel::Sender;
#[cfg(not(feature = "crossbeam"))]
use flume::Sender;

#[derive(Debug, Clone)]
Expand Down

0 comments on commit 41ec4f7

Please sign in to comment.