Skip to content

Commit

Permalink
More generic removal
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Dec 12, 2023
1 parent 522dded commit 2947ec6
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 103 deletions.
97 changes: 45 additions & 52 deletions src/operator/flat_map.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
use core::iter::{IntoIterator, Iterator};
use std::fmt::Display;
use std::marker::PhantomData;

use crate::block::{BlockStructure, OperatorStructure};
use crate::operator::{Operator, StreamElement, Timestamp};
use crate::scheduler::ExecutionMetadata;
use crate::stream::KeyedItem;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct FlatMap<O, It, F, Op>
pub struct FlatMap<It, F, Op>
where
O: Send,
Op: Operator,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
prev: Op,
Expand All @@ -28,14 +26,14 @@ where
frontiter: Option<<It as IntoIterator>::IntoIter>,
#[cfg(feature = "timestamp")]
timestamp: Option<Timestamp>,
_iter_out: PhantomData<O>,
}

impl<O: Send, It, F: Clone, Op: Clone> Clone for FlatMap<O, It, F, Op>
impl<It, F, Op> Clone for FlatMap<It, F, Op>
where
Op: Operator,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
fn clone(&self) -> Self {
Expand All @@ -45,17 +43,16 @@ where
frontiter: None,
#[cfg(feature = "timestamp")]
timestamp: self.timestamp,
_iter_out: PhantomData,
}
}
}

impl<O, It, F, Op> Display for FlatMap<O, It, F, Op>
impl<It, F, Op> Display for FlatMap<It, F, Op>
where
O: Send,
Op: Operator,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -64,17 +61,17 @@ where
"{} -> FlatMap<{} -> {}>",
self.prev,
std::any::type_name::<Op::Out>(),
std::any::type_name::<O>()
std::any::type_name::<It::Item>()
)
}
}

impl<O, It, F, Op> FlatMap<O, It, F, Op>
impl<It, F, Op> FlatMap<It, F, Op>
where
O: Send,
Op: Operator,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
pub(super) fn new(prev: Op, f: F) -> Self {
Expand All @@ -84,20 +81,19 @@ where
frontiter: None,
#[cfg(feature = "timestamp")]
timestamp: None,
_iter_out: Default::default(),
}
}
}

impl<O, It, F, Op> Operator for FlatMap<O, It, F, Op>
impl<It, F, Op> Operator for FlatMap<It, F, Op>
where
O: Send,
Op: Operator,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
type Out = O;
type Out = It::Item;

fn setup(&mut self, metadata: &mut ExecutionMetadata) {
self.prev.setup(metadata);
Expand Down Expand Up @@ -144,19 +140,19 @@ where
fn structure(&self) -> BlockStructure {
self.prev
.structure()
.add_operator(OperatorStructure::new::<O, _>("FlatMap"))
.add_operator(OperatorStructure::new::<It::Item, _>("FlatMap"))
}
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct KeyedFlatMap<O, It, F, Op>
pub struct KeyedFlatMap<It, F, Op>
where
O: Send,
Op: Operator,
Op::Out: KeyedItem,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
prev: Op,
Expand All @@ -169,16 +165,15 @@ where
#[derivative(Debug = "ignore")]
frontiter: Option<(<Op::Out as KeyedItem>::Key, It::IntoIter)>,
timestamp: Option<Timestamp>,
_iter_out: PhantomData<O>,
}

impl<O: Send, It, F: Clone, Op: Clone> Clone for KeyedFlatMap<O, It, F, Op>
impl<It, F, Op> Clone for KeyedFlatMap<It, F, Op>
where
O: Send,
Op: Operator,
Op::Out: KeyedItem,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
fn clone(&self) -> Self {
Expand All @@ -187,18 +182,17 @@ where
f: self.f.clone(),
frontiter: None,
timestamp: self.timestamp,
_iter_out: self._iter_out,
}
}
}

impl<O, It, F, Op> Display for KeyedFlatMap<O, It, F, Op>
impl<It, F, Op> Display for KeyedFlatMap<It, F, Op>
where
O: Send,
Op: Operator,
Op::Out: KeyedItem,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -207,18 +201,18 @@ where
"{} -> KeyedFlatMap<{} -> {}>",
self.prev,
std::any::type_name::<Op::Out>(),
std::any::type_name::<O>()
std::any::type_name::<It::Item>()
)
}
}

impl<O, It, F, Op> KeyedFlatMap<O, It, F, Op>
impl<It, F, Op> KeyedFlatMap<It, F, Op>
where
O: Send,
Op: Operator,
Op::Out: KeyedItem,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
pub(super) fn new(prev: Op, f: F) -> Self {
Expand All @@ -227,21 +221,20 @@ where
f,
frontiter: None,
timestamp: None,
_iter_out: Default::default(),
}
}
}

impl<O, It, F, Op> Operator for KeyedFlatMap<O, It, F, Op>
impl<It, F, Op> Operator for KeyedFlatMap<It, F, Op>
where
O: Send,
Op: Operator,
Op::Out: KeyedItem,
It: IntoIterator<Item = O>,
<It as IntoIterator>::IntoIter: Send + 'static,
It: IntoIterator,
It::IntoIter: Send + 'static,
It::Item: Send,
F: Fn(Op::Out) -> It + Clone + Send + 'static,
{
type Out = (<Op::Out as KeyedItem>::Key, O);
type Out = (<Op::Out as KeyedItem>::Key, It::Item);

fn setup(&mut self, metadata: &mut ExecutionMetadata) {
self.prev.setup(metadata);
Expand Down
Loading

0 comments on commit 2947ec6

Please sign in to comment.