Skip to content

Commit

Permalink
Stream from Channel (#918)
Browse files Browse the repository at this point in the history
- Adds `stream` and `streamUntilClosed` methods to `Channel`
- Optionally specify `maxChunkSize`
- Uses `drain` and `drainUpTo` under the hood

---------

Co-authored-by: Flavio Brasil <[email protected]>
  • Loading branch information
johnhungerford and fwbrasil authored Dec 11, 2024
1 parent b6a0185 commit e1fe3e3
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 1 deletion.
53 changes: 52 additions & 1 deletion kyo-core/shared/src/main/scala/kyo/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object Channel:
* @return
* A sequence containing all elements that were in the channel
*/
def drain(using Frame): Seq[A] < (Abort[Closed] & IO) = IO.Unsafe(Abort.get(self.drain()))
def drain(using Frame): Chunk[A] < (Abort[Closed] & IO) = IO.Unsafe(Abort.get(self.drain()))

/** Takes up to [[max]] elements from the channel.
*
Expand Down Expand Up @@ -158,6 +158,57 @@ object Channel:
*/
def full(using Frame): Boolean < (Abort[Closed] & IO) = IO.Unsafe(Abort.get(self.full()))

private def emitChunks(maxChunkSize: Int = Int.MaxValue)(
using
Tag[Emit[Chunk[A]]],
Frame
): Ack < (Emit[Chunk[A]] & Abort[Closed] & Async) =
if maxChunkSize <= 0 then Ack.Stop
else if maxChunkSize == 1 then
Loop(()): _ =>
Channel.take(self).map: v =>
Emit.andMap(Chunk(v)):
case Ack.Stop => Loop.done(Ack.Stop)
case _ => Loop.continue(())
else
val drainEffect =
if maxChunkSize == Int.MaxValue then Channel.drain(self)
else Channel.drainUpTo(self)(maxChunkSize - 1)
Loop[Unit, Ack, Abort[Closed] & Async & Emit[Chunk[A]]](()): _ =>
Channel.take(self).map: a =>
drainEffect.map: chunk =>
Emit.andMap(Chunk(a).concat(chunk)):
case Ack.Stop => Loop.done(Ack.Stop)
case _ => Loop.continue(())

/** Stream elements from channel, optionally specifying a maximum chunk size. In the absence of [[maxChunkSize]], chunk sizes will
* be limited only by channel capacity or the number of elements in the channel at a given time. (Chunks can still be larger than
* channel capacity.) Consumes elements from channel. Fails on channel closure.
*
* @param maxChunkSize
* Maximum number of elements to take for each chunk
*
* @return
* Asynchronous stream of elements in this channel
*/
def stream(maxChunkSize: Int = Int.MaxValue)(using Tag[Emit[Chunk[A]]], Frame): Stream[A, Abort[Closed] & Async] =
Stream(emitChunks(maxChunkSize))

/** Like [[stream]] but stops streaming when the channel closes instead of failing
*
* @param maxChunkSize
* Maximum number of elements to take for each chunk
*
* @return
* Asynchronous stream of elements in this channel
*/
def streamUntilClosed(maxChunkSize: Int = Int.MaxValue)(using Tag[Emit[Chunk[A]]], Frame): Stream[A, Async] =
Stream:
Abort.run[Closed](emitChunks(maxChunkSize)).map:
case Result.Success(v) => v
case Result.Fail(_) => Ack.Stop
case Result.Panic(e) => Abort.panic(e)

def unsafe: Unsafe[A] = self
end extension

Expand Down
168 changes: 168 additions & 0 deletions kyo-core/shared/src/test/scala/kyo/ChannelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -453,4 +453,172 @@ class ChannelTest extends Test:
}
}

"stream" - {
"should stream from channel" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
stream = c.stream().take(4)
v <- stream.run
yield assert(v == Chunk(1, 2, 3, 4))
}
"stream with zero or negative maxChunkSize should stop" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
s0 = c.stream(0)
sn = c.stream(-5)
r0 <- s0.run
rn <- sn.run
s <- c.size
yield assert(r0 == Chunk.empty && rn == Chunk.empty && s == 4)
end for
}
"stream with maxChunkSize of 1 should stream in chunks of 1" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
stream = c.stream(1).mapChunk(Chunk(_)).take(4)
r <- stream.run
s <- c.size
yield assert(r == Chunk(Chunk(1), Chunk(2), Chunk(3), Chunk(4)) && s == 0)
end for
}
"should stream from channel without specified chunk size" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
stream = c.stream().mapChunk(ch => Chunk(ch)).take(1)
v <- stream.run
s <- c.size
yield assert(v == Chunk(Chunk(1, 2, 3, 4)) && s == 0)
}

"should stream from channel with a specified chunk size" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
stream = c.stream(2).mapChunk(ch => Chunk(ch)).take(2)
v <- stream.run
s <- c.size
yield assert(v == Chunk(Chunk(1, 2), Chunk(3, 4)) && s == 0)
}

"should stream concurrently with ingest, without specified chunk size" in run {
for
c <- Channel.init[Int](4)
bg <- Async.run(Loop(0)(i => c.put(i).andThen(Loop.continue(i + 1))))
stream = c.stream().take(20).mapChunk(ch => Chunk(ch))
v <- stream.run
_ <- bg.interrupt
yield assert(v.flattenChunk == Chunk.from(0 until 20))
}

"should stream concurrently with ingest, with specified chunk size" in run {
for
c <- Channel.init[Int](4)
bg <- Async.run(Loop(0)(i => c.put(i).andThen(Loop.continue(i + 1))))
stream = c.stream(2).take(20).mapChunk(ch => Chunk(ch))
v <- stream.run
_ <- bg.interrupt
yield assert(v.flattenChunk == Chunk.from(0 until 20) && v.forall(_.size <= 2))
}

"should fail when channel is closed" in run {
for
c <- Channel.init[Int](3)
bg <- Async.run(Kyo.foreach(0 to 8)(c.put).andThen(c.close))
stream = c.stream().mapChunk(ch => Chunk(ch))
v <- Abort.run(stream.run)
yield v match
case Result.Success(v) => fail(s"Stream succeeded unexpectedly: ${v}")
case Result.Fail(Closed(_, _, _)) => assert(true)
case Result.Panic(ex) => fail(s"Stream panicked unexpectedly: ${ex}")
}
}

"streamUntilClosed" - {
"should stream from channel" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
stream = c.streamUntilClosed().take(4)
v <- stream.run
yield assert(v == Chunk(1, 2, 3, 4))
}
"stream with zero or negative maxChunkSize should stop" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
s0 = c.streamUntilClosed(0)
sn = c.streamUntilClosed(-5)
r0 <- s0.run
rn <- sn.run
s <- c.size
yield assert(r0 == Chunk.empty && rn == Chunk.empty && s == 4)
end for
}
"stream with maxChunkSize of 1 should stream in chunks of 1" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
stream = c.streamUntilClosed(1).mapChunk(Chunk(_)).take(4)
r <- stream.run
s <- c.size
yield assert(r == Chunk(Chunk(1), Chunk(2), Chunk(3), Chunk(4)) && s == 0)
end for
}
"should stream from channel without specified chunk size" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
stream = c.streamUntilClosed().mapChunk(ch => Chunk(ch)).take(1)
v <- stream.run
s <- c.size
yield assert(v == Chunk(Chunk(1, 2, 3, 4)) && s == 0)
}

"should stream from channel with a specified chunk size" in run {
for
c <- Channel.init[Int](4)
_ <- Kyo.foreach(1 to 4)(c.put)
stream = c.streamUntilClosed(2).mapChunk(ch => Chunk(ch)).take(2)
v <- stream.run
s <- c.size
yield assert(v == Chunk(Chunk(1, 2), Chunk(3, 4)) && s == 0)
}

"should stream concurrently with ingest, without specified chunk size" in run {
for
c <- Channel.init[Int](4)
bg <- Async.run(Loop(0)(i => c.put(i).andThen(Loop.continue(i + 1))))
stream = c.streamUntilClosed().take(20).mapChunk(ch => Chunk(ch))
v <- stream.run
_ <- bg.interrupt
yield assert(v.flattenChunk == Chunk.from(0 until 20))
}

"should stream concurrently with ingest, with specified chunk size" in run {
for
c <- Channel.init[Int](4)
bg <- Async.run(Loop(0)(i => c.put(i).andThen(Loop.continue(i + 1))))
stream = c.streamUntilClosed(2).take(20).mapChunk(ch => Chunk(ch))
v <- stream.run
_ <- bg.interrupt
yield assert(v.flattenChunk == Chunk.from(0 until 20) && v.forall(_.size <= 2))
}

"should stop when channel is closed" in run {
val fullStream = Chunk(0, 1, 2, 3, 4, 5, 6, 7, 8)
for
c <- Channel.init[Int](3)
stream = c.streamUntilClosed()
f <- Async.run(stream.run)
_ <- Kyo.foreach(fullStream)(c.put).andThen(c.close)
r <- Fiber.get(f)
yield assert(r.size <= fullStream.size && r == fullStream.take(r.size))
end for
}
}

end ChannelTest

0 comments on commit e1fe3e3

Please sign in to comment.