diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala index 9bbccdcdb..2d329a6d0 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala @@ -126,6 +126,19 @@ object channels { } } } + if (u.isEmpty() && !puts.isEmpty() && !takes.isEmpty()) { + loop = true + val t = puts.poll() + if (t != null) { + val (v, p) = t + val p2 = takes.poll() + if (p2 != null && p2.unsafeComplete(v)) { + p.unsafeComplete(()) + } else { + puts.add(t) + } + } + } if (loop) flush() } } diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/queues.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/queues.scala index 5f062fc5c..a4243ae18 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/queues.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/queues.scala @@ -39,15 +39,17 @@ object queues { } private val zeroCapacity = - new Unsafe[Any] { - def capacity = 0 - def size = 0 - def isEmpty() = true - def isFull() = true - def offer(v: Any) = false - def poll() = None - def peek() = None - } + new Queue( + new Unsafe[Any] { + def capacity = 0 + def size = 0 + def isEmpty() = true + def isFull() = true + def offer(v: Any) = false + def poll() = None + def peek() = None + } + ) def init[T](capacity: Int, access: Access = Access.Mpmc): Queue[T] > IOs = IOs { diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/channelsTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/channelsTest.scala index 6aa999a65..4207a66bc 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/channelsTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/channelsTest.scala @@ -82,4 +82,11 @@ class channelsTest extends KyoTest { v <- f.get } yield assert(!d1 && d2 && v == 1) } + "no buffer" in runJVM { + for { + c <- Channels.init[Int](0) + _ <- c.putFiber(1) + v <- c.take + } yield assert(v == 1) + } } diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/queuesTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/queuesTest.scala index c089c78b7..940c59e63 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/queuesTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/queuesTest.scala @@ -36,6 +36,13 @@ class queuesTest extends KyoTest { b <- q.offer(3) } yield assert(!b) } + "zero capacity" in run { + for { + q <- Queues.init[Int](0) + b <- q.offer(1) + v <- q.poll + } yield assert(!b && v == None) + } } "unbounded" - {