diff --git a/kyo-core/shared/src/main/scala/kyo/Channel.scala b/kyo-core/shared/src/main/scala/kyo/Channel.scala index 9e1c6fe4b..75ba6cc98 100644 --- a/kyo-core/shared/src/main/scala/kyo/Channel.scala +++ b/kyo-core/shared/src/main/scala/kyo/Channel.scala @@ -121,7 +121,7 @@ object Channel: * @return * A sequence containing all elements that were in the channel */ - def drain(using Frame): Seq[A] < (Abort[Closed] & IO) = IO.Unsafe(Abort.get(self.drain())) + def drain(using Frame): Chunk[A] < (Abort[Closed] & IO) = IO.Unsafe(Abort.get(self.drain())) /** Takes up to [[max]] elements from the channel. * @@ -158,6 +158,57 @@ object Channel: */ def full(using Frame): Boolean < (Abort[Closed] & IO) = IO.Unsafe(Abort.get(self.full())) + private def emitChunks(maxChunkSize: Int = Int.MaxValue)( + using + Tag[Emit[Chunk[A]]], + Frame + ): Ack < (Emit[Chunk[A]] & Abort[Closed] & Async) = + if maxChunkSize <= 0 then Ack.Stop + else if maxChunkSize == 1 then + Loop(()): _ => + Channel.take(self).map: v => + Emit.andMap(Chunk(v)): + case Ack.Stop => Loop.done(Ack.Stop) + case _ => Loop.continue(()) + else + val drainEffect = + if maxChunkSize == Int.MaxValue then Channel.drain(self) + else Channel.drainUpTo(self)(maxChunkSize - 1) + Loop[Unit, Ack, Abort[Closed] & Async & Emit[Chunk[A]]](()): _ => + Channel.take(self).map: a => + drainEffect.map: chunk => + Emit.andMap(Chunk(a).concat(chunk)): + case Ack.Stop => Loop.done(Ack.Stop) + case _ => Loop.continue(()) + + /** Stream elements from channel, optionally specifying a maximum chunk size. In the absence of [[maxChunkSize]], chunk sizes will + * be limited only by channel capacity or the number of elements in the channel at a given time. (Chunks can still be larger than + * channel capacity.) Consumes elements from channel. Fails on channel closure. + * + * @param maxChunkSize + * Maximum number of elements to take for each chunk + * + * @return + * Asynchronous stream of elements in this channel + */ + def stream(maxChunkSize: Int = Int.MaxValue)(using Tag[Emit[Chunk[A]]], Frame): Stream[A, Abort[Closed] & Async] = + Stream(emitChunks(maxChunkSize)) + + /** Like [[stream]] but stops streaming when the channel closes instead of failing + * + * @param maxChunkSize + * Maximum number of elements to take for each chunk + * + * @return + * Asynchronous stream of elements in this channel + */ + def streamUntilClosed(maxChunkSize: Int = Int.MaxValue)(using Tag[Emit[Chunk[A]]], Frame): Stream[A, Async] = + Stream: + Abort.run[Closed](emitChunks(maxChunkSize)).map: + case Result.Success(v) => v + case Result.Fail(_) => Ack.Stop + case Result.Panic(e) => Abort.panic(e) + def unsafe: Unsafe[A] = self end extension diff --git a/kyo-core/shared/src/test/scala/kyo/ChannelTest.scala b/kyo-core/shared/src/test/scala/kyo/ChannelTest.scala index 3ff25a353..99795662f 100644 --- a/kyo-core/shared/src/test/scala/kyo/ChannelTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/ChannelTest.scala @@ -453,4 +453,172 @@ class ChannelTest extends Test: } } + "stream" - { + "should stream from channel" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + stream = c.stream().take(4) + v <- stream.run + yield assert(v == Chunk(1, 2, 3, 4)) + } + "stream with zero or negative maxChunkSize should stop" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + s0 = c.stream(0) + sn = c.stream(-5) + r0 <- s0.run + rn <- sn.run + s <- c.size + yield assert(r0 == Chunk.empty && rn == Chunk.empty && s == 4) + end for + } + "stream with maxChunkSize of 1 should stream in chunks of 1" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + stream = c.stream(1).mapChunk(Chunk(_)).take(4) + r <- stream.run + s <- c.size + yield assert(r == Chunk(Chunk(1), Chunk(2), Chunk(3), Chunk(4)) && s == 0) + end for + } + "should stream from channel without specified chunk size" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + stream = c.stream().mapChunk(ch => Chunk(ch)).take(1) + v <- stream.run + s <- c.size + yield assert(v == Chunk(Chunk(1, 2, 3, 4)) && s == 0) + } + + "should stream from channel with a specified chunk size" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + stream = c.stream(2).mapChunk(ch => Chunk(ch)).take(2) + v <- stream.run + s <- c.size + yield assert(v == Chunk(Chunk(1, 2), Chunk(3, 4)) && s == 0) + } + + "should stream concurrently with ingest, without specified chunk size" in run { + for + c <- Channel.init[Int](4) + bg <- Async.run(Loop(0)(i => c.put(i).andThen(Loop.continue(i + 1)))) + stream = c.stream().take(20).mapChunk(ch => Chunk(ch)) + v <- stream.run + _ <- bg.interrupt + yield assert(v.flattenChunk == Chunk.from(0 until 20)) + } + + "should stream concurrently with ingest, with specified chunk size" in run { + for + c <- Channel.init[Int](4) + bg <- Async.run(Loop(0)(i => c.put(i).andThen(Loop.continue(i + 1)))) + stream = c.stream(2).take(20).mapChunk(ch => Chunk(ch)) + v <- stream.run + _ <- bg.interrupt + yield assert(v.flattenChunk == Chunk.from(0 until 20) && v.forall(_.size <= 2)) + } + + "should fail when channel is closed" in run { + for + c <- Channel.init[Int](3) + bg <- Async.run(Kyo.foreach(0 to 8)(c.put).andThen(c.close)) + stream = c.stream().mapChunk(ch => Chunk(ch)) + v <- Abort.run(stream.run) + yield v match + case Result.Success(v) => fail(s"Stream succeeded unexpectedly: ${v}") + case Result.Fail(Closed(_, _, _)) => assert(true) + case Result.Panic(ex) => fail(s"Stream panicked unexpectedly: ${ex}") + } + } + + "streamUntilClosed" - { + "should stream from channel" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + stream = c.streamUntilClosed().take(4) + v <- stream.run + yield assert(v == Chunk(1, 2, 3, 4)) + } + "stream with zero or negative maxChunkSize should stop" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + s0 = c.streamUntilClosed(0) + sn = c.streamUntilClosed(-5) + r0 <- s0.run + rn <- sn.run + s <- c.size + yield assert(r0 == Chunk.empty && rn == Chunk.empty && s == 4) + end for + } + "stream with maxChunkSize of 1 should stream in chunks of 1" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + stream = c.streamUntilClosed(1).mapChunk(Chunk(_)).take(4) + r <- stream.run + s <- c.size + yield assert(r == Chunk(Chunk(1), Chunk(2), Chunk(3), Chunk(4)) && s == 0) + end for + } + "should stream from channel without specified chunk size" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + stream = c.streamUntilClosed().mapChunk(ch => Chunk(ch)).take(1) + v <- stream.run + s <- c.size + yield assert(v == Chunk(Chunk(1, 2, 3, 4)) && s == 0) + } + + "should stream from channel with a specified chunk size" in run { + for + c <- Channel.init[Int](4) + _ <- Kyo.foreach(1 to 4)(c.put) + stream = c.streamUntilClosed(2).mapChunk(ch => Chunk(ch)).take(2) + v <- stream.run + s <- c.size + yield assert(v == Chunk(Chunk(1, 2), Chunk(3, 4)) && s == 0) + } + + "should stream concurrently with ingest, without specified chunk size" in run { + for + c <- Channel.init[Int](4) + bg <- Async.run(Loop(0)(i => c.put(i).andThen(Loop.continue(i + 1)))) + stream = c.streamUntilClosed().take(20).mapChunk(ch => Chunk(ch)) + v <- stream.run + _ <- bg.interrupt + yield assert(v.flattenChunk == Chunk.from(0 until 20)) + } + + "should stream concurrently with ingest, with specified chunk size" in run { + for + c <- Channel.init[Int](4) + bg <- Async.run(Loop(0)(i => c.put(i).andThen(Loop.continue(i + 1)))) + stream = c.streamUntilClosed(2).take(20).mapChunk(ch => Chunk(ch)) + v <- stream.run + _ <- bg.interrupt + yield assert(v.flattenChunk == Chunk.from(0 until 20) && v.forall(_.size <= 2)) + } + + "should stop when channel is closed" in run { + val fullStream = Chunk(0, 1, 2, 3, 4, 5, 6, 7, 8) + for + c <- Channel.init[Int](3) + stream = c.streamUntilClosed() + f <- Async.run(stream.run) + _ <- Kyo.foreach(fullStream)(c.put).andThen(c.close) + r <- Fiber.get(f) + yield assert(r.size <= fullStream.size && r == fullStream.take(r.size)) + end for + } + } + end ChannelTest