I've been looking for parallelism combined with async/await, so thank you for your solid work.
I'd like to share a retrospective on pulling this crate into my existing project.
My project is filled with stream combinators from futures' StreamExt and async_std's StreamExt. Both are extension traits of futures' Stream, so any type that implements Stream gets the extended combinators automatically. That is a great convenience when you are writing your own stream type.
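To make the extension-trait point concrete, here is a minimal std-only sketch. It uses `Iterator` as a stand-in for `Stream` so that it runs without any crates; `DoubleExt`, `doubled`, `Countdown` and `countdown_doubled` are hypothetical names for illustration, not part of futures, async_std or parallel-stream.

```rust
// Std-only analogue: `Iterator` stands in for futures' `Stream`.
// All names below are hypothetical and exist only for this sketch.

fn double(x: i32) -> i32 {
    x * 2
}

/// Extension trait: adds one extra combinator on top of `Iterator`.
trait DoubleExt: Iterator<Item = i32> + Sized {
    fn doubled(self) -> std::iter::Map<Self, fn(i32) -> i32> {
        self.map(double as fn(i32) -> i32)
    }
}

// Blanket impl: every iterator over `i32` gets `doubled` for free.
impl<I: Iterator<Item = i32>> DoubleExt for I {}

/// A user-defined iterator that implements only `Iterator`.
struct Countdown(i32);

impl Iterator for Countdown {
    type Item = i32;

    fn next(&mut self) -> Option<i32> {
        if self.0 == 0 {
            None
        } else {
            self.0 -= 1;
            Some(self.0 + 1)
        }
    }
}

/// The custom type picks up the extension combinator automatically.
fn countdown_doubled() -> Vec<i32> {
    Countdown(3).doubled().collect()
}
```

Because of the blanket impl, `Countdown` never has to mention `DoubleExt`; that is the property a standalone trait with its own family of types gives up.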
By contrast, parallel-stream's ParallelStream is alien to those extension traits. It is a standalone trait with its own family of implementing types. Once a stream is turned into a ParallelStream, it loses all the combinators from those extension traits, and writing your own stream type gets more complicated. I think ParallelStream's limit() is the root of the problem.
I also noticed aggressive trait bounds that are hard to satisfy. For example, the map method requires f to be Send, Sync and Copy. Only a narrow set of types are both Sync and Copy; an Arc, for instance, is not Copy. This makes map nearly useless, because the closure cannot capture any variable that lacks one of those traits.
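The effect of the bound can be sketched without either crate. Below, `strict_stage` and `relaxed_stage` are hypothetical stand-ins (not the real `map` signatures) that differ only in requiring `Copy` versus `Clone`; a closure that captures an `Arc` satisfies the second but not the first.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stage with the strict bound described above.
fn strict_stage<F>(f: F, x: usize) -> usize
where
    F: Fn(usize) -> usize + Send + Sync + Copy + 'static,
{
    let g = f; // implicit copy handed to the worker thread
    let worker = thread::spawn(move || g(x));
    f(x) + worker.join().unwrap()
}

/// Hypothetical stage that asks for `Clone` instead of `Copy`.
fn relaxed_stage<F>(f: F, x: usize) -> usize
where
    F: Fn(usize) -> usize + Send + Sync + Clone + 'static,
{
    let g = f.clone(); // explicit clone handed to the worker thread
    let worker = thread::spawn(move || g(x));
    f(x) + worker.join().unwrap()
}

fn demo() -> (usize, usize, usize) {
    // Captures only a `usize`, so the closure is `Copy`.
    let offset = 10usize;
    let strict_out = strict_stage(move |x| x + offset, 1);

    // Captures an `Arc`, so the closure is `Clone` but not `Copy`.
    let calls = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&calls);
    let counting = move |x: usize| {
        counter.fetch_add(1, Ordering::SeqCst);
        x + 1
    };
    // strict_stage(counting, 1); // rejected: the closure is not `Copy`
    let relaxed_out = relaxed_stage(counting, 1);

    (strict_out, relaxed_out, calls.load(Ordering::SeqCst))
}
```

Here `demo()` returns `(22, 4, 2)`: each stage calls its closure twice, and the shared counter records both calls made through the `Arc`.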
I gathered these thoughts and attempted an alternative based on your work; the result is par-stream. It provides an extension trait, ParStreamExt, for futures' Stream and solves the trait bound issue. The limit is given on demand and sets the number of workers for that stage only. This is not 100% equivalent to your design, because the limit applies to a single stage rather than a group of stages. Instead, I moved your design to a separate API.
    let shared = Arc::new(AtomicUsize::new(0));
    stream
        .par_then(None, |item| {
            // None sets the limit to the number of cores.
            let shared = shared.clone(); // Clone a variable without `Copy` trait
            async move {
                let new_item = compute(item, shared);
                new_item
            }
        })
        .collect::<Vec<_>>(); // from futures' StreamExt
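For readers who want to see what a per-stage worker limit means mechanically, here is a synchronous, std-only sketch. It is not how par-stream is implemented; `par_map_limited` is a hypothetical helper that uses scoped threads instead of an async runtime, with `None` falling back to the number of available cores.

```rust
use std::num::NonZeroUsize;
use std::sync::Mutex;
use std::thread;

/// Hypothetical helper: applies `f` to every item on at most `limit`
/// worker threads and returns the results in input order.
fn par_map_limited<T, U, F>(items: Vec<T>, limit: Option<usize>, f: F) -> Vec<U>
where
    T: Send,
    U: Send,
    F: Fn(T) -> U + Sync,
{
    // `None` means "one worker per available core".
    let workers = limit
        .unwrap_or_else(|| {
            thread::available_parallelism()
                .map(NonZeroUsize::get)
                .unwrap_or(1)
        })
        .max(1);

    // Shared work queue of (index, item) pairs.
    let queue = Mutex::new(items.into_iter().enumerate());

    let mut results: Vec<(usize, U)> = thread::scope(|s| {
        let handles: Vec<_> = (0..workers)
            .map(|_| {
                s.spawn(|| {
                    let mut local = Vec::new();
                    loop {
                        // The lock is released at the end of this statement,
                        // so `f` runs without holding it.
                        let next = queue.lock().unwrap().next();
                        match next {
                            Some((i, item)) => local.push((i, f(item))),
                            None => break,
                        }
                    }
                    local
                })
            })
            .collect();
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect()
    });

    // Workers finish in arbitrary order; restore the input order.
    results.sort_by_key(|pair| pair.0);
    results.into_iter().map(|(_, u)| u).collect()
}
```

The limit belongs to this one call only, which mirrors the "limit per stage" behaviour described above; chaining two such calls would give each stage its own worker pool.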
To limit the workers for a group of combinators, I suggest the builder pattern. It is not implemented in my crate yet, but here is how it could look.
    stream
        .enumerate()
        /* start of group */
        .into_par_group(ParGroupConfig {
            // turn to a parallel group builder
            limit: Some(4),
            ..Default::default() // using default runtime and other default options, etc
        })
        .then(|item| { async move { /* omit */ } }) // first stage
        .filter_map(|item| { async move { /* omit */ } }) // second stage
        .build_stream() // build a stream from the group builder
        /* end of group */
        .collect::<Vec<_>>(); // combinator from futures' StreamExt
This lets users customize the whole parallel group with one config, and it would work seamlessly with the existing futures combinators.
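As a rough illustration of the group idea, the sketch below fuses the stages into one function and spawns workers only when the group is built, so a single limit covers every stage. It is synchronous and std-only, and everything in it is an assumption made for this example: this `ParGroupConfig` has just a `limit` field, and `ParGroup`, `then`, `filter_map` and `build` are hypothetical names, not the API of either crate.

```rust
use std::thread;

/// Hypothetical config shared by every stage in the group.
#[derive(Clone, Debug, Default)]
struct ParGroupConfig {
    /// Maximum number of workers; `None` means one per available core.
    limit: Option<usize>,
}

/// Hypothetical group builder that fuses its stages into one function.
struct ParGroup<T, U> {
    items: Vec<T>,
    config: ParGroupConfig,
    stages: Box<dyn Fn(T) -> Option<U> + Send + Sync>,
}

impl<T: 'static> ParGroup<T, T> {
    fn new(items: Vec<T>, config: ParGroupConfig) -> Self {
        ParGroup { items, config, stages: Box::new(|t: T| Some(t)) }
    }
}

impl<T: 'static, U: 'static> ParGroup<T, U> {
    /// Appends a mapping stage.
    fn then<V, F>(self, f: F) -> ParGroup<T, V>
    where
        V: 'static,
        F: Fn(U) -> V + Send + Sync + 'static,
    {
        let prev = self.stages;
        ParGroup {
            items: self.items,
            config: self.config,
            stages: Box::new(move |t: T| prev(t).map(&f)),
        }
    }

    /// Appends a stage that may drop items.
    fn filter_map<V, F>(self, f: F) -> ParGroup<T, V>
    where
        V: 'static,
        F: Fn(U) -> Option<V> + Send + Sync + 'static,
    {
        let prev = self.stages;
        ParGroup {
            items: self.items,
            config: self.config,
            stages: Box::new(move |t: T| prev(t).and_then(&f)),
        }
    }

    /// Runs all fused stages on at most `limit` threads, keeping input order.
    fn build(self) -> Vec<U>
    where
        T: Send,
        U: Send,
    {
        let ParGroup { items, config, stages } = self;
        let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        let workers = config.limit.unwrap_or(cores).max(1);
        let chunk_len = ((items.len() + workers - 1) / workers).max(1);

        // Split the input into at most `workers` contiguous chunks.
        let mut chunks: Vec<Vec<T>> = Vec::new();
        let mut rest = items.into_iter();
        loop {
            let chunk: Vec<T> = rest.by_ref().take(chunk_len).collect();
            if chunk.is_empty() {
                break;
            }
            chunks.push(chunk);
        }

        let stages = &stages;
        thread::scope(|s| {
            let handles: Vec<_> = chunks
                .into_iter()
                .map(|chunk| {
                    s.spawn(move || {
                        chunk.into_iter().filter_map(|t| stages(t)).collect::<Vec<U>>()
                    })
                })
                .collect();
            handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
        })
    }
}

fn demo() -> Vec<String> {
    ParGroup::new((1..=10).collect::<Vec<u32>>(), ParGroupConfig { limit: Some(4) })
        .then(|x| x * x) // first stage
        .filter_map(|x| if x % 2 == 0 { Some(format!("sq={}", x)) } else { None }) // second stage
        .build()
}
```

The design choice worth noting is that `then` and `filter_map` do no work themselves; only `build` consults the config, which is what lets one `limit` bound the whole group rather than each stage separately.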
From here we can move to a more thorough discussion to help the design evolve. I'm happy to look at my work more carefully and find a way to combine the two.
From an ergonomics perspective, keeping the functionality as an extension trait rather than a parallel (hah) siloed ecosystem is much more flexible. Both crates have been interesting to use, and it's nice to see some exploration of alternative approaches.