Skip to content

Commit

Permalink
lists.fill + more hubs tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Nov 26, 2023
1 parent 81c176d commit 8a6b526
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 0 deletions.
6 changes: 6 additions & 0 deletions kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ object hubs {

class Listener[T] private[hubs] (hub: Hub[T], child: Channel[T]) {

def size: Int > IOs = child.size

def isEmpty: Boolean > IOs = child.isEmpty

def isFull: Boolean > IOs = child.isFull

def poll: Option[T] > IOs = child.poll

def takeFiber: Fiber[T] > IOs = child.takeFiber
Expand Down
9 changes: 9 additions & 0 deletions kyo-core/shared/src/main/scala/kyo/lists.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ object lists {
}
loop(v)
}

def fill[T, S](n: Int)(v: => T > S): List[T] > S = {
def loop(n: Int, acc: List[T]): List[T] > S =
n match {
case 0 => acc.reverse
case n => v.map(v => loop(n - 1, v :: acc))
}
loop(n, Nil)
}
}
val Lists = new Lists
}
115 changes: 115 additions & 0 deletions kyo-core/shared/src/test/scala/kyoTest/concurrent/hubsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import kyo.tries._
import kyoTest.KyoTest

import scala.concurrent.duration._
import kyo.lists.Lists

class hubsTest extends KyoTest {

Expand Down Expand Up @@ -72,4 +73,118 @@ class hubsTest extends KyoTest {
v2 <- l.take
} yield assert(b1 && v1 == None && b2 && v2 == 2)
}
"close hub" in run {
for {
h <- Hubs.init[Int](2)
b <- h.offer(1)
_ <- retry(h.isEmpty) // wait transfer
l <- h.listen
c1 <- h.close
v1 <- Tries.run(h.listen)
v2 <- Tries.run(h.offer(2))
v3 <- Tries.run(l.poll)
c2 <- l.close
} yield assert(
b && c1 == Some(Seq()) && v1.isFailure && v2.isFailure && v3.isFailure && c2 == None
)
}
"close listener w/ buffer" in run {
for {
h <- Hubs.init[Int](2)
l1 <- h.listen(2)
b1 <- h.offer(1)
_ <- retry(l1.isEmpty.map(!_))
c1 <- l1.close
l2 <- h.listen(2)
b2 <- h.offer(2)
_ <- retry(l2.isEmpty.map(!_))
v2 <- l2.poll
c2 <- l2.close
} yield assert(
b1 && c1 == Some(Seq(1)) && b2 && v2 == Some(2) && c2 == Some(Seq())
)
}
"offer beyond capacity" in run {
for {
h <- Hubs.init[Int](2)
l <- h.listen
_ <- h.put(1)
_ <- h.put(2)
_ <- h.put(3)
b <- h.offer(4)
v1 <- l.take
v2 <- l.take
v3 <- l.take
v4 <- l.poll
} yield assert(!b && v1 == 1 && v2 == 2 && v3 == 3 && v4.isEmpty)
}
"concurrent listeners taking values" in run {
for {
h <- Hubs.init[Int](10)
l1 <- h.listen
l2 <- h.listen
_ <- h.offer(1)
v1 <- l1.take
v2 <- l2.take
} yield assert(v1 == 1 && v2 == 1) // Assuming listeners take different values
}
"listener removal" in run {
for {
h <- Hubs.init[Int](2)
l <- h.listen
_ <- h.offer(1)
_ <- retry(h.isEmpty)
c <- l.close
_ <- h.offer(2)
v <- Tries.run(l.poll)
} yield assert(c == Some(Seq()) && v.isFailure)
}
"hub closure with pending offers" in run {
for {
h <- Hubs.init[Int](2)
_ <- h.offer(1)
_ <- h.close
v <- Tries.run(h.offer(2))
} yield assert(v.isFailure)
}
"create listener on empty hub" in run {
for {
h <- Hubs.init[Int](2)
l <- h.listen
v <- l.poll
} yield assert(v.isEmpty)
}
"contention" - {
"writes" in run {
for {
h <- Hubs.init[Int](2)
l <- h.listen
_ <- Lists.fill(100)(Fibers.fork(h.put(1)))
t <- Lists.fill(100)(l.take)
e1 <- h.isEmpty
e2 <- l.isEmpty
} yield assert(t == List.fill(100)(1) && e1 && e2)
}
"reads + writes" in run {
for {
h <- Hubs.init[Int](2)
l <- h.listen
_ <- Lists.fill(100)(Fibers.fork(h.put(1)))
t <- Lists.fill(100)(Fibers.fork(l.take).map(_.get))
e1 <- h.isEmpty
e2 <- l.isEmpty
} yield assert(t == List.fill(100)(1) && e1 && e2)
}
"listeners" in run {
for {
h <- Hubs.init[Int](2)
l <- Lists.fill(100)(Fibers.fork(h.listen).map(_.get))
_ <- Fibers.fork(h.put(1))
t <- Lists.traverse(l)(l => Fibers.fork(l.take).map(_.get))
e1 <- h.isEmpty
e2 <- Lists.traverse(l)(_.isEmpty)
} yield assert(t == List.fill(100)(1) && e1 && e2 == Lists.fill(100)(true))
}
}

}
6 changes: 6 additions & 0 deletions kyo-core/shared/src/test/scala/kyoTest/listsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,10 @@ class listsTest extends KyoTest {
)
assert(acc == List(1, 2))
}
"fill" in {
checkEquals[List[Int], Nothing](
IOs.run(Lists.fill(100)(IOs(1))),
List.fill(100)(1)
)
}
}

0 comments on commit 8a6b526

Please sign in to comment.