From 8a6b5260c03c4f0c5ddd68e0a2fa4190a55f2014 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Sun, 26 Nov 2023 12:59:27 -0800 Subject: [PATCH] lists.fill + more hubs tests --- .../src/main/scala/kyo/concurrent/hubs.scala | 6 + .../shared/src/main/scala/kyo/lists.scala | 9 ++ .../scala/kyoTest/concurrent/hubsTest.scala | 115 ++++++++++++++++++ .../src/test/scala/kyoTest/listsTest.scala | 6 + 4 files changed, 136 insertions(+) diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala index f1a3bf48a..741fe4bad 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala @@ -108,6 +108,12 @@ object hubs { class Listener[T] private[hubs] (hub: Hub[T], child: Channel[T]) { + def size: Int > IOs = child.size + + def isEmpty: Boolean > IOs = child.isEmpty + + def isFull: Boolean > IOs = child.isFull + def poll: Option[T] > IOs = child.poll def takeFiber: Fiber[T] > IOs = child.takeFiber diff --git a/kyo-core/shared/src/main/scala/kyo/lists.scala b/kyo-core/shared/src/main/scala/kyo/lists.scala index 41c8ca4f3..5d2c64b22 100644 --- a/kyo-core/shared/src/main/scala/kyo/lists.scala +++ b/kyo-core/shared/src/main/scala/kyo/lists.scala @@ -78,6 +78,15 @@ object lists { } loop(v) } + + def fill[T, S](n: Int)(v: => T > S): List[T] > S = { + def loop(n: Int, acc: List[T]): List[T] > S = + n match { + case 0 => acc.reverse + case n => v.map(v => loop(n - 1, v :: acc)) + } + loop(n, Nil) + } } val Lists = new Lists } diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/hubsTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/hubsTest.scala index f63d7b4c7..5f32b9d80 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/hubsTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/hubsTest.scala @@ -10,6 +10,7 @@ import kyo.tries._ import kyoTest.KyoTest import scala.concurrent.duration._ +import kyo.lists.Lists class hubsTest extends KyoTest { @@ -72,4 +73,118 @@ class hubsTest extends KyoTest { v2 <- l.take } yield assert(b1 && v1 == None && b2 && v2 == 2) } + "close hub" in run { + for { + h <- Hubs.init[Int](2) + b <- h.offer(1) + _ <- retry(h.isEmpty) // wait transfer + l <- h.listen + c1 <- h.close + v1 <- Tries.run(h.listen) + v2 <- Tries.run(h.offer(2)) + v3 <- Tries.run(l.poll) + c2 <- l.close + } yield assert( + b && c1 == Some(Seq()) && v1.isFailure && v2.isFailure && v3.isFailure && c2 == None + ) + } + "close listener w/ buffer" in run { + for { + h <- Hubs.init[Int](2) + l1 <- h.listen(2) + b1 <- h.offer(1) + _ <- retry(l1.isEmpty.map(!_)) + c1 <- l1.close + l2 <- h.listen(2) + b2 <- h.offer(2) + _ <- retry(l2.isEmpty.map(!_)) + v2 <- l2.poll + c2 <- l2.close + } yield assert( + b1 && c1 == Some(Seq(1)) && b2 && v2 == Some(2) && c2 == Some(Seq()) + ) + } + "offer beyond capacity" in run { + for { + h <- Hubs.init[Int](2) + l <- h.listen + _ <- h.put(1) + _ <- h.put(2) + _ <- h.put(3) + b <- h.offer(4) + v1 <- l.take + v2 <- l.take + v3 <- l.take + v4 <- l.poll + } yield assert(!b && v1 == 1 && v2 == 2 && v3 == 3 && v4.isEmpty) + } + "concurrent listeners taking values" in run { + for { + h <- Hubs.init[Int](10) + l1 <- h.listen + l2 <- h.listen + _ <- h.offer(1) + v1 <- l1.take + v2 <- l2.take + } yield assert(v1 == 1 && v2 == 1) // Assuming listeners take different values + } + "listener removal" in run { + for { + h <- Hubs.init[Int](2) + l <- h.listen + _ <- h.offer(1) + _ <- retry(h.isEmpty) + c <- l.close + _ <- h.offer(2) + v <- Tries.run(l.poll) + } yield assert(c == Some(Seq()) && v.isFailure) + } + "hub closure with pending offers" in run { + for { + h <- Hubs.init[Int](2) + _ <- h.offer(1) + _ <- h.close + v <- Tries.run(h.offer(2)) + } yield assert(v.isFailure) + } + "create listener on empty hub" in run { + for { + h <- Hubs.init[Int](2) + l <- h.listen + v <- l.poll + } yield assert(v.isEmpty) + } + "contention" - { + "writes" in run { + for { + h <- Hubs.init[Int](2) + l <- h.listen + _ <- Lists.fill(100)(Fibers.fork(h.put(1))) + t <- Lists.fill(100)(l.take) + e1 <- h.isEmpty + e2 <- l.isEmpty + } yield assert(t == List.fill(100)(1) && e1 && e2) + } + "reads + writes" in run { + for { + h <- Hubs.init[Int](2) + l <- h.listen + _ <- Lists.fill(100)(Fibers.fork(h.put(1))) + t <- Lists.fill(100)(Fibers.fork(l.take).map(_.get)) + e1 <- h.isEmpty + e2 <- l.isEmpty + } yield assert(t == List.fill(100)(1) && e1 && e2) + } + "listeners" in run { + for { + h <- Hubs.init[Int](2) + l <- Lists.fill(100)(Fibers.fork(h.listen).map(_.get)) + _ <- Fibers.fork(h.put(1)) + t <- Lists.traverse(l)(l => Fibers.fork(l.take).map(_.get)) + e1 <- h.isEmpty + e2 <- Lists.traverse(l)(_.isEmpty) + } yield assert(t == List.fill(100)(1) && e1 && e2 == Lists.fill(100)(true)) + } + } + } diff --git a/kyo-core/shared/src/test/scala/kyoTest/listsTest.scala b/kyo-core/shared/src/test/scala/kyoTest/listsTest.scala index 868f9a01e..5ce47b4a1 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/listsTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/listsTest.scala @@ -70,4 +70,10 @@ class listsTest extends KyoTest { ) assert(acc == List(1, 2)) } + "fill" in { + checkEquals[List[Int], Nothing]( + IOs.run(Lists.fill(100)(IOs(1))), + List.fill(100)(1) + ) + } }