Skip to content

Commit

Permalink
channels readme
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Sep 18, 2023
1 parent 81672c8 commit 4b78a26
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
77 changes: 77 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,83 @@ val b: Boolean > IOs =
### Channels: Asynchronous Communication

The `Channels` effect offers a high-level abstraction for concurrent communication. Built atop `IOs` and `Fibers`, it enables safe data transfer between different parts of your application, whether they're running in different fibers or threads.

**Bounded channels**
```scala
import kyo.concurrent.channels._

// A 'bounded' channel rejects new
// elements once full, does not provide
// 'put' and 'take' methods
val a: Channel[Int] > IOs =
Channels.bounded(capacity = 42)

// Obtain the number of items in the channel
// via the method 'size' in 'Channel'
val b: Int > IOs =
a.map(_.size)

// Try to offer a new item
val c: Boolean > IOs =
a.map(_.offer(42))

// Try to poll an item
val d: Option[Int] > IOs =
a.map(_.poll)

// Check if channel is empty
val e: Boolean > IOs =
a.map(_.isEmpty)

// Check if channel is full
val f: Boolean > IOs =
a.map(_.isFull)
```

**Dropping and sliding channels**
```scala
// A 'dropping' channel discards new entries
// when full
val a: Channels.Unbounded[Int] > IOs =
Channels.dropping(capacity = 42)

// A 'sliding' channel discards the oldest
// entries if necessary to make space for new
// entries
val b: Channels.Unbounded[Int] > IOs =
Channels.sliding(capacity = 42)

// Note how 'dropping' and 'sliding' channels
// return 'Channel.Unbounded`. It provides
// an additional method to 'put' new items
// unconditionally
val c: Unit > IOs =
b.map(_.put(42))
```

**Blocking channels**
```scala
// A 'blocking' channel integrates with the
// 'Fibers' effect to provide 'take' and 'put'
// methods that allow fibers to wait for
// space in the channel or for new elements
val a: Channels.Blocking[Int] > IOs =
Channels.blocking(capacity = 42)

// The 'put' method suspends the current fiber
// in case the channel has no space. Once space
// is made available, the new item is added to the
// channel and the fiber is resumed
val b: Unit > (Fibers with IOs) =
a.map(_.put(42))

// The 'take' method suspends the current fiber
// if the channel is empty. Once a new item arrives
// the fiber is resumed with the new entry
val c: Int > (Fibers with IOs) =
a.map(_.take)
```


License
Expand Down
4 changes: 0 additions & 4 deletions kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ object channels {

abstract class Unbounded[T] extends Channel[T] {

def offer[S](v: T > S): Boolean > (IOs with S)

def poll: Option[T] > IOs

def put[S](v: T > S): Unit > (IOs with S)
}

Expand Down

0 comments on commit 4b78a26

Please sign in to comment.