From 2947ec6175669215b6cc2a48d006e637e466c622 Mon Sep 17 00:00:00 2001 From: imDema Date: Tue, 12 Dec 2023 14:05:56 +0100 Subject: [PATCH] More generic removal --- src/operator/flat_map.rs | 97 +++++++++++++++++++--------------------- src/operator/mod.rs | 96 +++++++++++++++++++-------------------- src/stream.rs | 2 +- 3 files changed, 92 insertions(+), 103 deletions(-) diff --git a/src/operator/flat_map.rs b/src/operator/flat_map.rs index 33f98d4f..08c3d8d3 100644 --- a/src/operator/flat_map.rs +++ b/src/operator/flat_map.rs @@ -1,7 +1,5 @@ use core::iter::{IntoIterator, Iterator}; use std::fmt::Display; -use std::marker::PhantomData; - use crate::block::{BlockStructure, OperatorStructure}; use crate::operator::{Operator, StreamElement, Timestamp}; use crate::scheduler::ExecutionMetadata; @@ -9,12 +7,12 @@ use crate::stream::KeyedItem; #[derive(Derivative)] #[derivative(Debug)] -pub struct FlatMap +pub struct FlatMap where - O: Send, Op: Operator, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { prev: Op, @@ -28,14 +26,14 @@ where frontiter: Option<::IntoIter>, #[cfg(feature = "timestamp")] timestamp: Option, - _iter_out: PhantomData, } -impl Clone for FlatMap +impl Clone for FlatMap where Op: Operator, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { fn clone(&self) -> Self { @@ -45,17 +43,16 @@ where frontiter: None, #[cfg(feature = "timestamp")] timestamp: self.timestamp, - _iter_out: PhantomData, } } } -impl Display for FlatMap +impl Display for FlatMap where - O: Send, Op: Operator, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -64,17 +61,17 @@ where "{} -> FlatMap<{} -> {}>", self.prev, std::any::type_name::(), - std::any::type_name::() + std::any::type_name::() ) } } -impl FlatMap +impl FlatMap where - O: Send, Op: Operator, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { pub(super) fn new(prev: Op, f: F) -> Self { @@ -84,20 +81,19 @@ where frontiter: None, #[cfg(feature = "timestamp")] timestamp: None, - _iter_out: Default::default(), } } } -impl Operator for FlatMap +impl Operator for FlatMap where - O: Send, Op: Operator, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { - type Out = O; + type Out = It::Item; fn setup(&mut self, metadata: &mut ExecutionMetadata) { self.prev.setup(metadata); @@ -144,19 +140,19 @@ where fn structure(&self) -> BlockStructure { self.prev .structure() - .add_operator(OperatorStructure::new::("FlatMap")) + .add_operator(OperatorStructure::new::("FlatMap")) } } #[derive(Derivative)] #[derivative(Debug)] -pub struct KeyedFlatMap +pub struct KeyedFlatMap where - O: Send, Op: Operator, Op::Out: KeyedItem, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { prev: Op, @@ -169,16 +165,15 @@ where #[derivative(Debug = "ignore")] frontiter: Option<(::Key, It::IntoIter)>, timestamp: Option, - _iter_out: PhantomData, } -impl Clone for KeyedFlatMap +impl Clone for KeyedFlatMap where - O: Send, Op: Operator, Op::Out: KeyedItem, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { fn clone(&self) -> Self { @@ -187,18 +182,17 @@ where f: self.f.clone(), frontiter: None, timestamp: self.timestamp, - _iter_out: self._iter_out, } } } -impl Display for KeyedFlatMap +impl Display for KeyedFlatMap where - O: Send, Op: Operator, Op::Out: KeyedItem, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -207,18 +201,18 @@ where "{} -> KeyedFlatMap<{} -> {}>", self.prev, std::any::type_name::(), - std::any::type_name::() + std::any::type_name::() ) } } -impl KeyedFlatMap +impl KeyedFlatMap where - O: Send, Op: Operator, Op::Out: KeyedItem, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { pub(super) fn new(prev: Op, f: F) -> Self { @@ -227,21 +221,20 @@ where f, frontiter: None, timestamp: None, - _iter_out: Default::default(), } } } -impl Operator for KeyedFlatMap +impl Operator for KeyedFlatMap where - O: Send, Op: Operator, Op::Out: KeyedItem, - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Clone + Send + 'static, { - type Out = (::Key, O); + type Out = (::Key, It::Item); fn setup(&mut self, metadata: &mut ExecutionMetadata) { self.prev.setup(metadata); diff --git a/src/operator/mod.rs b/src/operator/mod.rs index 8a0d1fb8..549a47cc 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -99,8 +99,8 @@ pub trait ExchangeData: Serialize + for<'a> Deserialize<'a> + Clone + Send + 'st impl Deserialize<'a> + Clone + Send + 'static> ExchangeData for T {} /// Marker trait that all the keys should implement. -pub trait DataKey: Data + Hash + Eq {} -impl DataKey for T {} +pub trait DataKey: Clone + Send + Hash + Eq + 'static {} +impl DataKey for T {} /// Marker trait for key types that are used when communicating between different blocks. pub trait ExchangeDataKey: DataKey + ExchangeData {} @@ -519,7 +519,7 @@ where pub fn rich_map(self, mut f: F) -> Stream> where F: FnMut(Op::Out) -> O + Send + Clone + 'static, - O: Data, + O: Send + 'static, { self.key_by(|_| ()) .add_operator(|prev| RichMap::new(prev, move |(_, value)| f(value))) @@ -934,13 +934,12 @@ where /// /// assert_eq!(res.get().unwrap(), vec![(0, 1), (0, 2), (1, 2), (0, 3), (1, 3), (2, 3)]); /// ``` - pub fn rich_flat_map(self, f: F) -> Stream> + pub fn rich_flat_map(self, f: F) -> Stream> where - It: IntoIterator, - ::IntoIter: Clone + Send + 'static, + It: IntoIterator + Send + 'static, + ::IntoIter: Send + 'static, + ::Item: Send, F: FnMut(Op::Out) -> It + Send + Clone + 'static, - It: Data, - O: Data, { self.rich_map(f).flatten() } @@ -989,13 +988,12 @@ where /// /// assert_eq!(res.get().unwrap(), vec![0, 0, 1, 1, 2, 2]); /// ``` - pub fn flat_map(self, f: F) -> Stream> + pub fn flat_map(self, f: F) -> Stream> where - It: IntoIterator, - ::IntoIter: Send + 'static, + It: IntoIterator + 'static, + It::IntoIter: Send + 'static, + It::Item: Send, F: Fn(Op::Out) -> It + Send + Clone + 'static, - It: 'static, - O: Data, { self.add_operator(|prev| FlatMap::new(prev, f)) } @@ -1020,6 +1018,36 @@ where self.add_operator(|prev| ForEach::new(prev, f)) .finalize_block(); } + + /// Transform this stream of containers into a stream of all the contained values. + /// + /// **Note**: this is very similar to [`Iteartor::flatten`](std::iter::Iterator::flatten) + /// + /// ## Example + /// + /// ``` + /// # use noir::{StreamEnvironment, EnvironmentConfig}; + /// # use noir::operator::source::IteratorSource; + /// # let mut env = StreamEnvironment::new(EnvironmentConfig::local(1)); + /// let s = env.stream(IteratorSource::new(vec![ + /// vec![1, 2, 3], + /// vec![], + /// vec![4, 5], + /// ].into_iter())); + /// let res = s.flatten().collect_vec(); + /// + /// env.execute_blocking(); + /// + /// assert_eq!(res.get().unwrap(), vec![1, 2, 3, 4, 5]); + /// ``` + pub fn flatten(self) -> Stream::Item>> + where + Op::Out: IntoIterator, + ::IntoIter: Send, + ::Item: Send, + { + self.add_operator(|prev| Flatten::new(prev)) + } } impl Stream @@ -1843,39 +1871,6 @@ where } } -impl Stream -where - Op: Operator + 'static, - Op::Out: IntoIterator, - It: Iterator + Clone + Send + 'static, - O: Send, -{ - /// Transform this stream of containers into a stream of all the contained values. - /// - /// **Note**: this is very similar to [`Iteartor::flatten`](std::iter::Iterator::flatten) - /// - /// ## Example - /// - /// ``` - /// # use noir::{StreamEnvironment, EnvironmentConfig}; - /// # use noir::operator::source::IteratorSource; - /// # let mut env = StreamEnvironment::new(EnvironmentConfig::local(1)); - /// let s = env.stream(IteratorSource::new(vec![ - /// vec![1, 2, 3], - /// vec![], - /// vec![4, 5], - /// ].into_iter())); - /// let res = s.flatten().collect_vec(); - /// - /// env.execute_blocking(); - /// - /// assert_eq!(res.get().unwrap(), vec![1, 2, 3, 4, 5]); - /// ``` - pub fn flatten(self) -> Stream> { - self.add_operator(|prev| Flatten::new(prev)) - } -} - impl Stream where Op: Operator + 'static, @@ -1957,7 +1952,7 @@ where impl KeyedStream where K: DataKey, - I: Data, + I: Send + 'static, Op: Operator + 'static, { /// Given a keyed stream without timestamps nor watermarks, tag each item with a timestamp and insert @@ -2052,7 +2047,7 @@ where pub fn filter_map(self, f: F) -> KeyedStream> where F: Fn((&K, I)) -> Option + Send + Clone + 'static, - O: Data, + O: Send + 'static, { self.map(f) .filter(|(_, x)| x.is_some()) @@ -2176,7 +2171,7 @@ where pub fn fold(self, init: O, f: F) -> KeyedStream> where F: Fn(&mut O, ::Value) + Send + Clone + 'static, - O: Data, + O: Send + Clone, { self.add_operator(|prev| KeyedFold::new(prev, init, f)) } @@ -2220,6 +2215,7 @@ where /// ``` pub fn reduce(self, f: F) -> KeyedStream> where + I: Clone + 'static, F: Fn(&mut I, I) + Send + Clone + 'static, { self.fold(None, move |acc, value| match acc { @@ -2251,7 +2247,7 @@ where pub fn map(self, f: F) -> KeyedStream> where F: Fn((&K, I)) -> O + Send + Clone + 'static, - O: Data, + O: Send, { self.add_operator(|prev| { Map::new(prev, move |(k, v)| { diff --git a/src/stream.rs b/src/stream.rs index 4b2e85e9..ecc9ce68 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -311,7 +311,7 @@ where { pub(crate) fn add_operator(self, get_operator: GetOp) -> KeyedStream where - Op2: Operator + 'static, + Op2: Operator, GetOp: FnOnce(OperatorChain) -> Op2, Op2::Out: KeyedItem::Key>, {