Skip to content

Commit

Permalink
more consistent API (init*) for methods that initialize/instantiate o…
Browse files Browse the repository at this point in the history
…bjects
  • Loading branch information
fwbrasil committed Oct 5, 2023
1 parent de88068 commit 28db5cf
Show file tree
Hide file tree
Showing 15 changed files with 62 additions and 62 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ val a: Queue[Int] > IOs =
)
```

### Channels: Asynchronous Communication
### Channels: Backpressured Communication

The `Channels` effect serves as an advanced concurrency primitive, designed to facilitate seamless and backpressured data transfer between various parts of your application. Built upon the `Fibers` effect, `Channels` not only ensures thread-safe communication but also incorporates a backpressure mechanism. This mechanism temporarily suspends fibers under specific conditions—either when waiting for new items to arrive or when awaiting space to add new items.

Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/ChainedForkBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ChainedForkBench extends Bench.ForkOnly[Int] {
else IOs.unit.flatMap(_ => Fibers.forkFiber(iterate(p, n - 1)).unit)

for {
p <- Fibers.promise[Unit]
p <- Fibers.initPromise[Unit]
_ <- Fibers.forkFiber(iterate(p, depth))
_ <- p.get
} yield 0
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ForkManyBench extends Bench.ForkOnly[Int] {
else io.flatMap(_ => repeat(n - 1)(io))

for {
promise <- Fibers.promise[Unit]
promise <- Fibers.initPromise[Unit]
ref <- Atomics.initInt(depth)
effect = ref.decrementAndGet.flatMap {
case 1 =>
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class PingPongBench extends Bench.ForkOnly[Unit] {
} yield ()

for {
promise <- Fibers.promise[Unit]
promise <- Fibers.initPromise[Unit]
_ <- Fibers.forkFiber(iterate(promise, depth))
_ <- promise.get
} yield ()
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/SemaphoreBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SemaphoreBench extends Bench.ForkOnly[Unit] {
else
s.run(()).flatMap(_ => loop(s, i + 1))

Meters.semaphore(1).flatMap(loop(_, 0))
Meters.initSemaphore(1).flatMap(loop(_, 0))
}

def zioBench(): UIO[Unit] = {
Expand Down
2 changes: 1 addition & 1 deletion kyo-chatgpt/jvm/src/main/scala/kyo/chatgpt/UI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ object UI extends scala.App {
IOs.run {
App.runFiber(Duration.Inf) {
for {
p <- Fibers.promise[String]
p <- Fibers.initPromise[String]
_ <- chan.put((message, enabledModes.toList, p))
r <- p.get
} yield r
Expand Down
8 changes: 4 additions & 4 deletions kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object channels {
object Channels {

def init[T](capacity: Int, access: Access = Access.Mpmc): Channel[T] > IOs =
Queues.bounded[T](capacity, access).map { queue =>
Queues.initBounded[T](capacity, access).map { queue =>
IOs {
new Channel[T] {

Expand Down Expand Up @@ -71,7 +71,7 @@ object channels {
if (u.offer(v)) {
Fibers.value(())
} else {
val p = Fibers.unsafePromise[Unit]
val p = Fibers.unsafeInitPromise[Unit]
puts.add((v, p))
p
}
Expand All @@ -88,7 +88,7 @@ object channels {
case Some(v) =>
Fibers.value(v)
case None =>
val p = Fibers.unsafePromise[T]
val p = Fibers.unsafeInitPromise[T]
takes.add(p)
p
}
Expand All @@ -108,7 +108,7 @@ object channels {
takes.add(p)
case Some(v) =>
if (!p.unsafeComplete(v) && !u.offer(v)) {
val p = Fibers.unsafePromise[Unit]
val p = Fibers.unsafeInitPromise[Unit]
puts.add((v, p))
}
}
Expand Down
8 changes: 4 additions & 4 deletions kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,12 @@ object fibers {
def fail[T](ex: Throwable): Fiber[T] =
Fiber.failed(ex)

private val _promise = IOs(unsafePromise[Object])
private val _promise = IOs(unsafeInitPromise[Object])

def promise[T]: Promise[T] > IOs =
def initPromise[T]: Promise[T] > IOs =
_promise.asInstanceOf[Promise[T] > IOs]

private[kyo] def unsafePromise[T]: Promise[T] =
private[kyo] def unsafeInitPromise[T]: Promise[T] =
Fiber.promise(new IOPromise[T]())

// compiler bug workaround
Expand Down Expand Up @@ -459,7 +459,7 @@ object fibers {
IOs(Fiber.promise(new IOPromise[Unit]))

def sleep(d: Duration): Unit > (IOs with Fibers with Timers) =
promise[Unit].map { p =>
initPromise[Unit].map { p =>
if (d.isFinite) {
val run: Unit > IOs =
IOs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object latches {
} else {
IOs {
new Latch {
val promise = Fibers.unsafePromise[Unit]
val promise = Fibers.unsafeInitPromise[Unit]
val count = new AtomicInteger(n)

val await = promise.get
Expand Down
8 changes: 4 additions & 4 deletions kyo-core/shared/src/main/scala/kyo/concurrent/meters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ object meters {

object Meters {

def mutex: Meter > IOs =
semaphore(1)
val initMutex: Meter > IOs =
initSemaphore(1)

def semaphore(concurrency: Int): Meter > IOs =
def initSemaphore(concurrency: Int): Meter > IOs =
Channels.init[Unit](concurrency).map { chan =>
offer(concurrency, chan, ()).map { _ =>
new Meter {
Expand All @@ -57,7 +57,7 @@ object meters {
}
}

def rateLimiter(rate: Int, period: Duration): Meter > (IOs with Timers) =
def initRateLimiter(rate: Int, period: Duration): Meter > (IOs with Timers) =
Channels.init[Unit](rate).map { chan =>
Timers.scheduleAtFixedRate(period)(offer(rate, chan, ())).map { _ =>
new Meter {
Expand Down
12 changes: 6 additions & 6 deletions kyo-core/shared/src/main/scala/kyo/concurrent/queues.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object queues {
def peek() = None
}

def bounded[T](capacity: Int, access: Access = Access.Mpmc): Queue[T] > IOs =
def initBounded[T](capacity: Int, access: Access = Access.Mpmc): Queue[T] > IOs =
IOs {
capacity match {
case c if (c <= 0) =>
Expand Down Expand Up @@ -80,7 +80,7 @@ object queues {
}
}

def unbounded[T](access: Access = Access.Mpmc, chunkSize: Int = 8): Unbounded[T] > IOs =
def initUnbounded[T](access: Access = Access.Mpmc, chunkSize: Int = 8): Unbounded[T] > IOs =
IOs {
access match {
case Access.Mpmc =>
Expand All @@ -94,8 +94,8 @@ object queues {
}
}

def dropping[T](capacity: Int, access: Access = Access.Mpmc): Unbounded[T] > IOs =
bounded[T](capacity, access).map { q =>
def initDropping[T](capacity: Int, access: Access = Access.Mpmc): Unbounded[T] > IOs =
initBounded[T](capacity, access).map { q =>
val u = q.unsafe
val c = capacity
new Unbounded(
Expand All @@ -111,8 +111,8 @@ object queues {
)
}

def sliding[T](capacity: Int, access: Access = Access.Mpmc): Unbounded[T] > IOs =
bounded[T](capacity, access).map { q =>
def initSliding[T](capacity: Int, access: Access = Access.Mpmc): Unbounded[T] > IOs =
initBounded[T](capacity, access).map { q =>
val u = q.unsafe
val c = capacity
new Unbounded(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ class fibersTest extends KyoTest {
"promise" - {
"complete" in run {
for {
p <- Fibers.promise[Int]
p <- Fibers.initPromise[Int]
a <- p.complete(1)
b <- p.isDone
c <- p.get
} yield assert(a && b && c == 1)
}
"complete twice" in run {
for {
p <- Fibers.promise[Int]
p <- Fibers.initPromise[Int]
a <- p.complete(1)
b <- p.complete(2)
c <- p.isDone
Expand All @@ -48,7 +48,7 @@ class fibersTest extends KyoTest {
"failure" in run {
val ex = new Exception
for {
p <- Fibers.promise[Int]
p <- Fibers.initPromise[Int]
a <- p.complete(throw ex)
b <- p.isDone
c <- p.getTry
Expand Down
44 changes: 22 additions & 22 deletions kyo-core/shared/src/test/scala/kyoTest/concurrent/metersTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ class metersTest extends KyoTest {
"mutex" - {
"ok" in runJVM {
for {
t <- Meters.mutex
t <- Meters.initMutex
v <- t.run(2)
} yield assert(v == 2)
}

"run" in runJVM {
for {
t <- Meters.mutex
p <- Fibers.promise[Int]
b1 <- Fibers.promise[Unit]
t <- Meters.initMutex
p <- Fibers.initPromise[Int]
b1 <- Fibers.initPromise[Unit]
f1 <- Fibers.forkFiber(t.run(b1.complete(()).map(_ => p.block)))
_ <- b1.get
a1 <- t.isAvailable
b2 <- Fibers.promise[Unit]
b2 <- Fibers.initPromise[Unit]
f2 <- Fibers.forkFiber(b2.complete(()).map(_ => t.run(2)))
_ <- b2.get
a2 <- t.isAvailable
Expand All @@ -43,9 +43,9 @@ class metersTest extends KyoTest {

"tryRun" in runJVM {
for {
sem <- Meters.semaphore(1)
p <- Fibers.promise[Int]
b1 <- Fibers.promise[Unit]
sem <- Meters.initSemaphore(1)
p <- Fibers.initPromise[Int]
b1 <- Fibers.initPromise[Unit]
f1 <- Fibers.forkFiber(sem.tryRun(b1.complete(()).map(_ => p.block)))
_ <- b1.get
a1 <- sem.isAvailable
Expand All @@ -60,24 +60,24 @@ class metersTest extends KyoTest {
"semaphore" - {
"ok" in runJVM {
for {
t <- Meters.semaphore(2)
t <- Meters.initSemaphore(2)
v1 <- t.run(2)
v2 <- t.run(3)
} yield assert(v1 == 2 && v2 == 3)
}

"run" in runJVM {
for {
t <- Meters.semaphore(2)
p <- Fibers.promise[Int]
b1 <- Fibers.promise[Unit]
t <- Meters.initSemaphore(2)
p <- Fibers.initPromise[Int]
b1 <- Fibers.initPromise[Unit]
f1 <- Fibers.forkFiber(t.run(b1.complete(()).map(_ => p.block)))
_ <- b1.get
b2 <- Fibers.promise[Unit]
b2 <- Fibers.initPromise[Unit]
f2 <- Fibers.forkFiber(t.run(b2.complete(()).map(_ => p.block)))
_ <- b2.get
a1 <- t.isAvailable
b3 <- Fibers.promise[Unit]
b3 <- Fibers.initPromise[Unit]
f2 <- Fibers.forkFiber(b3.complete(()).map(_ => t.run(2)))
_ <- b3.get
a2 <- t.isAvailable
Expand All @@ -92,12 +92,12 @@ class metersTest extends KyoTest {

"tryRun" in runJVM {
for {
sem <- Meters.semaphore(2)
p <- Fibers.promise[Int]
b1 <- Fibers.promise[Unit]
sem <- Meters.initSemaphore(2)
p <- Fibers.initPromise[Int]
b1 <- Fibers.initPromise[Unit]
f1 <- Fibers.forkFiber(sem.tryRun(b1.complete(()).map(_ => p.block)))
_ <- b1.get
b2 <- Fibers.promise[Unit]
b2 <- Fibers.initPromise[Unit]
f2 <- Fibers.forkFiber(sem.tryRun(b2.complete(()).map(_ => p.block)))
_ <- b2.get
a1 <- sem.isAvailable
Expand All @@ -117,14 +117,14 @@ class metersTest extends KyoTest {
"rate limiter" - {
"ok" in runJVM {
for {
t <- Meters.rateLimiter(2, 10.millis)
t <- Meters.initRateLimiter(2, 10.millis)
v1 <- t.run(2)
v2 <- t.run(3)
} yield assert(v1 == 2 && v2 == 3)
}
"one loop" in runJVM {
for {
meter <- Meters.rateLimiter(10, 10.millis)
meter <- Meters.initRateLimiter(10, 10.millis)
counter <- Atomics.initInt(0)
f1 <- Fibers.forkFiber(loop(meter, counter))
_ <- Fibers.sleep(50.millis)
Expand All @@ -134,7 +134,7 @@ class metersTest extends KyoTest {
}
"two loops" in runJVM {
for {
meter <- Meters.rateLimiter(10, 10.millis)
meter <- Meters.initRateLimiter(10, 10.millis)
counter <- Atomics.initInt(0)
f1 <- Fibers.forkFiber(loop(meter, counter))
f2 <- Fibers.forkFiber(loop(meter, counter))
Expand All @@ -150,7 +150,7 @@ class metersTest extends KyoTest {

"run" in runJVM {
for {
meter <- Meters.pipeline(Meters.rateLimiter(2, 1.millis), Meters.mutex)
meter <- Meters.pipeline(Meters.initRateLimiter(2, 1.millis), Meters.initMutex)
counter <- Atomics.initInt(0)
f1 <- Fibers.forkFiber(loop(meter, counter))
f2 <- Fibers.forkFiber(loop(meter, counter))
Expand Down
Loading

0 comments on commit 28db5cf

Please sign in to comment.