-
Notifications
You must be signed in to change notification settings - Fork 151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multi-consumption of an AsyncSequence
#110
Comments
I think this should be a built in feature of async sequences. When I first started using them, I assumed this would work (coming from RX and combine) but was greeted with errors. You can have multiple for each loops for regular sequences. So I think semantically it makes sense. Maybe a SE proposal for this? Rather than it being part of a separate package. |
Here's (gist) my not very Swifty attempt, main thing I was going for was cancelling all the other streams would cancel the original. |
I think solutions in this territory are interesting; The one issue is that more often than not the buffer may be indefinite/unbounded. I think the uses are perhaps the important point here - I don't think that it is that folks need to replay all values in very distant parts of code execution but instead it feels like there are code locations that have short execution distances from each other that would want to use the values at the same time. So:
|
I believe you are right, I cannot think of a use case where all values should be replayed. The feature I wanted to talk about was rather to "jump in the wagon" and start receiving the remaining values of the stream without impacting other consumers. The reduce function would be used for that: to get the "synthesis" of all the previous values before receiving the remaining ones. |
a stream from a large file should be pretty cheap; If the rate of one consumer is less than another then there needs to be some sort of buffering or back pressure to accommodate for the slowest consumer. Else-wise there will be dropped values. The problem is knowing the number of consumers ahead of time. If you can know that then you can make back pressure work. |
@phausler I see that the multi-consumption was already discussed in a thread about
That's exactly what I meant in the first place (sorry if that was unclear). A way for multiple consumers to receive values emitted by a single sequence. Very much like a publisher-subscribers relationship. Is this this planned to add such a feature? When I experimented with func testAsyncChannel() {
let channel = AsyncChannel<Int>()
Task {
await channel.send(1)
await channel.send(2)
}
Task {
for await value in channel {
print("A", value)
}
}
Task {
for await value in channel {
print("B", value)
}
}
} // prints:
// A 1
// B 2 I have got some uses cases were it would be useful to have this logic available. For instance with a re-implementation of |
There are some unanswered questions with regards to sharing behaviors. For example; is it the best route to go having it be an "operator" style API? Combine takes the approach of multicast+autoconnect, does that make sense? or does some other composed behavior split make more sense? How can we make the api more approachable so that it is clear on when you need it? The list goes on. Personally I think the multicast+autoconnect is a bit complex of a concept, and instead breaking it up into concepts of sharing iteration or sharing values seems to me more approachable if you are not experienced with functional reactive programming. I also contend that splitting into a known number of replications can allow us to make some really nice behavioral aspects with regards to cancellation. The general role that share plays to me seems quite useful and needed, just not the forefront of need. The property wrapper thing brings up a ton of other questions; namely of which are more aimed at groups working at Apple on SwiftUI to answer. Since in truth, even though |
I understand now that Regarding the Thank you a lot for taking the time to answer my questions. If that's ok I'll close the issue in a few days if no comments are added. |
Perhaps the parts that we can help identify here are: what pieces do we need to build such a thing? And what are it's restrictions? First and foremost: any such property wrapper would have a couple of characteristics. Out of those questions; what parts can we build independently to make constructing that thing easy/safe? |
One idea that I had a bit ago is to do something like this: @propertyWrapper
public struct ThisDeservesABetterName<Element> {
public private(set) var wrappedValue: Element
let channel = AsyncChannel<Element>()
public init(wrappedValue: Element) {
self.wrappedValue = wrappedValue
}
public struct Projection: AsyncSequence {
public struct Iterator: AsyncIteratorProtocol {
var iterator: AsyncChannel<Element>.Iterator
public mutating func next() async -> Element? {
return await iterator.next()
}
}
var value: Element
let channel: AsyncChannel<Element>
public mutating func setValue(_ value: Element) async {
self.value = value
await channel.send(value)
}
public func makeAsyncIterator() -> Iterator {
Iterator(iterator: channel.makeAsyncIterator())
}
}
public var projectedValue: Projection {
get {
Projection(value: wrappedValue, channel: channel)
}
set {
wrappedValue = newValue.value
}
}
} |
I think there are some expectations that may not meet. It should use back pressure not buffering: e.g. the next item should await all consumption of previous next calls to be done. It should rethrow failures if and only if the base throws. |
Would love to have a built-in type to do this. We use Combine for some services that are shared throughout the app as well as view model outputs. I'm hoping to replace these with AsyncSequences in some way so we can enter into the Swift Concurrency world at the view level instead of views only seeing a Combine interface. The ability to have multiple consumers of a single AsyncSequence and access to the last sent value is a must-have. |
I agree that this is an important feature to iterate on (pun most assuredly intended). However the battle that we must face is that If we can somehow leverage things in the language that help to that end it would go a long way to making it more of a concrete item. Even though personally I find a share or split operator concept for |
Was trying to implement one of these behaviors myself (one sequence's elements consumed once each by multiple concurrent consumers) in this thread and got sent here to make a feature request, so +1 |
Any updates on this? We need the broadcasting behaviour in our app and currently rely on Asynchrone's SharedAsyncSequence. Would be nice to see an "official solution". |
+1 on this |
+1 |
Just make a channel per client, have the server loop the main sequence and send to all client channels. |
Checking back for any updates on this |
Hi, I have been reading the documentation for this repository and it left me wondering: is this planned to introduce a feature to allow several tasks to consume the same
AsyncSequence
?If this is not the right place to offer ideas, please let me know. I don't think this would fit in a proposal.
Context
I recently had to stream a file using
URLSession
and two pieces of code were interested in the streamed values. Rather than starting a stream twice which would not be efficient, I wanted to allow the stream to be consumed by several tasks. The data that were already streamed would be sent when a new consumption is set up.Basic implementation
I tried to implement such a solution (in a gist) that will use a reduce function on the already emitted values and emit the result when a new task starts consuming the sequence. It's far from being perfect but I think it might help understanding the idea.
Implementation
Usage
Some considerations about efficiency can be found in the gist.
The text was updated successfully, but these errors were encountered: