From 4b78a26f816a92da6e1dd32d9712f1846fd1e8c4 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Sun, 17 Sep 2023 22:59:02 -0700 Subject: [PATCH] channels readme --- README.md | 77 +++++++++++++++++++ .../main/scala/kyo/concurrent/channels.scala | 4 - 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 6959f1b96..9d2f778ec 100644 --- a/README.md +++ b/README.md @@ -894,6 +894,83 @@ val b: Boolean > IOs = ### Channels: Asynchronous Communication +The `Channels` effect offers a high-level abstraction for concurrent communication. Built atop `IOs` and `Fibers`, it enables safe data transfer between different parts of your application, whether they're running in different fibers or threads. + +**Bounded channels** +```scala +import kyo.concurrent.channels._ + +// A 'bounded' channel rejects new +// elements once full, does not provide +// 'put' and 'take' methods +val a: Channel[Int] > IOs = + Channels.bounded(capacity = 42) + +// Obtain the number of items in the channel +// via the method 'size' in 'Channel' +val b: Int > IOs = + a.map(_.size) + +// Try to offer a new item +val c: Boolean > IOs = + a.map(_.offer(42)) + +// Try to poll an item +val d: Option[Int] > IOs = + a.map(_.poll) + +// Check if channel is empty +val e: Boolean > IOs = + a.map(_.isEmpty) + +// Check if channel is full +val f: Boolean > IOs = + a.map(_.isFull) +``` + +**Dropping and sliding channels** +```scala +// A 'dropping' channel discards new entries +// when full +val a: Channels.Unbounded[Int] > IOs = + Channels.dropping(capacity = 42) + +// A 'sliding' channel discards the oldest +// entries if necessary to make space for new +// entries +val b: Channels.Unbounded[Int] > IOs = + Channels.sliding(capacity = 42) + +// Note how 'dropping' and 'sliding' channels +// return 'Channel.Unbounded`. It provides +// an additional method to 'put' new items +// unconditionally +val c: Unit > IOs = + b.map(_.put(42)) +``` + +**Blocking channels** +```scala +// A 'blocking' channel integrates with the +// 'Fibers' effect to provide 'take' and 'put' +// methods that allow fibers to wait for +// space in the channel or for new elements +val a: Channels.Blocking[Int] > IOs = + Channels.blocking(capacity = 42) + +// The 'put' method suspends the current fiber +// in case the channel has no space. Once space +// is made available, the new item is added to the +// channel and the fiber is resumed +val b: Unit > (Fibers with IOs) = + a.map(_.put(42)) + +// The 'take' method suspends the current fiber +// if the channel is empty. Once a new item arrives +// the fiber is resumed with the new entry +val c: Int > (Fibers with IOs) = + a.map(_.take) +``` License diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala index d2f9009e9..14ca179c1 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala @@ -29,10 +29,6 @@ object channels { abstract class Unbounded[T] extends Channel[T] { - def offer[S](v: T > S): Boolean > (IOs with S) - - def poll: Option[T] > IOs - def put[S](v: T > S): Unit > (IOs with S) }