diff --git a/Cargo.lock b/Cargo.lock index 964370a..ecd02b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1926,7 +1926,7 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "renoir" -version = "0.2.0" +version = "0.3.0" dependencies = [ "apache-avro", "arrow", diff --git a/Cargo.toml b/Cargo.toml index 000131c..6ac4a3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "renoir" description = "Reactive Network of Operators In Rust" -version = "0.2.0" +version = "0.3.0" edition = "2021" authors = [ "Luca De Martini ", diff --git a/src/operator/mod.rs b/src/operator/mod.rs index 81c9cb5..a8344f2 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -8,7 +8,7 @@ use std::fmt::Display; use std::hash::Hash; use std::ops::{AddAssign, Div}; -use cache::{CacheRegistry, CacheSink, CachedStream, Cacher}; +use cache::{CacheRegistry, CacheSink, CachedStream, Cacher, VecCacher}; use flume::{unbounded, Receiver}; #[cfg(feature = "tokio")] use futures::Future; @@ -2120,6 +2120,14 @@ where CachedStream::new(rt_config, replication, output) } + /// Collect the output of the stream to a [StreamCache] that can later be resumed to + /// create a [Stream] with its content. Returns the cache and consumes the stream. + /// + /// **See [Stream::collect_cache]** + pub fn collect_cache_vec(self) -> CachedStream> { + self.collect_cache(()) + } + /// Collect the output of the stream to a [StreamCache] that can later be resumed to /// create a [Stream] with its content. Returns the cache and a copy of the current stream. /// @@ -2168,6 +2176,19 @@ where splits.pop().unwrap(), ) } + + /// Collect the output of the stream to a [StreamCache] that can later be resumed to + /// create a [Stream] with its content. Returns the cache and a copy of the current stream. + /// + /// **See [Stream::cache]** + pub fn cache_vec( + self, + ) -> ( + CachedStream>, + Stream>, + ) { + self.cache(()) + } } impl Stream