Skip to content

Commit

Permalink
Adds support for memoised resources (#177)
Browse files Browse the repository at this point in the history
This adds a construct allowing for lazy allocation/eager de-allocation of resources, which can be handy when users want to keep computationally heavy constructs in the background across several suites (distributed log consumers, and so on).

It is currently not cancel-safe, but I intend to specialise it to CE2/CE3 in a later unit of work
  • Loading branch information
Baccata authored Jan 14, 2021
1 parent 1c68902 commit ee863a1
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 15 deletions.
3 changes: 3 additions & 0 deletions modules/core/src-ce2/weaver/CECompat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ private[weaver] trait CECompat {
private[weaver] type Ref[F[_], A] = cats.effect.concurrent.Ref[F, A]
private[weaver] val Ref = cats.effect.concurrent.Ref

private[weaver] type Deferred[F[_], A] = cats.effect.concurrent.Deferred[F, A]
private[weaver] val Deferred = cats.effect.concurrent.Deferred

private[weaver] type Semaphore[F[_]] = cats.effect.concurrent.Semaphore[F]
private[weaver] val Semaphore = cats.effect.concurrent.Semaphore

Expand Down
3 changes: 3 additions & 0 deletions modules/core/src-ce3/weaver/CECompat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ private[weaver] trait CECompat {
private[weaver] type Ref[F[_], A] = cats.effect.kernel.Ref[F, A]
private[weaver] val Ref = cats.effect.kernel.Ref

private[weaver] type Deferred[F[_], A] = cats.effect.kernel.Deferred[F, A]
private[weaver] val Deferred = cats.effect.kernel.Deferred

private[weaver] type Semaphore[F[_]] = cats.effect.std.Semaphore[F]
private[weaver] val Semaphore = cats.effect.std.Semaphore

Expand Down
88 changes: 73 additions & 15 deletions modules/core/src/weaver/GlobalResourceF.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,61 @@ trait GlobalResourceF[F[_]] extends GlobalResourceBase {
object GlobalResourceF {

trait Write[F[_]] {
protected implicit def F: Sync[F]
protected implicit def F: CECompat.Effect[F]
protected def rawPut[A](
pureOrLazy: Either[A, Resource[F, A]],
label: Option[String])(implicit rt: ResourceTag[A]): F[Unit]

def put[A](value: A, label: Option[String] = None)(
implicit rt: ResourceTag[A]): F[Unit]
implicit rt: ResourceTag[A]): F[Unit] = rawPut(Left(value), label)
def putR[A](value: A, label: Option[String] = None)(
implicit rt: ResourceTag[A]): Resource[F, Unit] =
CECompat.resourceLift(put(value, label))

/**
* Memoises a resource so to optimise its sharing. The memoised resource gets allocated
* lazily, when the first suite that needs it starts running, and gets finalised as soon
* as all suites that need it concurrently are done.
*
* In case the resource was already finalised when a suite needs, it gets re-allocated
* on demand.
*
* This can be useful for constructs that consume large amount of machine resources
* (CPU, memory, connections), to ensure they are cleaned-up when they should.
*/
def putLazy[A](
resource: Resource[F, A],
label: Option[String] = None)(implicit rt: ResourceTag[A]): F[Unit] =
MemoisedResource(resource).flatMap(r => rawPut(Right(r), label))

def putLazyR[A](
resource: Resource[F, A],
label: Option[String] = None)(implicit
rt: ResourceTag[A]): Resource[F, Unit] =
CECompat.resourceLift(putLazy(resource, label))
}

trait Read[F[_]] {
protected implicit def F: MonadError[F, Throwable]

protected def rawGet[A](label: Option[String] = None)(
implicit rt: ResourceTag[A]): F[Option[Either[A, Resource[F, A]]]]

def get[A](label: Option[String] = None)(
implicit rt: ResourceTag[A]): F[Option[A]]
implicit rt: ResourceTag[A]): F[Option[A]] = rawGet[A](label).map {
case Some(Left(value)) => Some(value)
case _ => None
}

def getR[A](label: Option[String] = None)(
implicit rt: ResourceTag[A]): Resource[F, Option[A]] =
CECompat.resourceLift(get[A](label))
CECompat.resourceLift {
rawGet[A](label)
}.flatMap {
case Some(Left(value)) => Resource.pure(Some(value))
case Some(Right(resource)) => resource.map(Some(_))
case None => Resource.pure(None)
}

def getOrFail[A](label: Option[String] = None)(
implicit rt: ResourceTag[A]
Expand All @@ -58,31 +97,49 @@ object GlobalResourceF {
case None =>
F.raiseError(GlobalResourceF.ResourceNotFound(label, rt.description))
}

def getOrFailR[A](label: Option[String] = None)(
implicit rt: ResourceTag[A]): Resource[F, A] =
CECompat.resourceLift(getOrFail[A](label))

getR[A](label).flatMap {
case Some(value) => Resource.pure[F, A](value)
case None =>
CECompat.resourceLift(F.raiseError(GlobalResourceF.ResourceNotFound(
label,
rt.description)))
}
}

private[weaver] def createMap[F[_]: Sync]: F[Read[F] with Write[F]] =
def createMap[F[_]: CECompat.Effect]: F[Read[F] with Write[F]] =
Ref[F]
.of(Map.empty[(Option[String], ResourceTag[_]), Any])
.of(Map.empty[(Option[String], ResourceTag[_]),
Either[Any, Resource[F, Any]]])
.map(new ResourceMap(_))

private class ResourceMap[F[_]](
ref: Ref[F, Map[(Option[String], ResourceTag[_]), Any]])(
implicit val F: Sync[F])
ref: Ref[
F,
Map[(Option[String], ResourceTag[_]), Either[Any, Resource[F, Any]]]])(
implicit val F: CECompat.Effect[F])
extends Read[F]
with Write[F] { self =>

def put[A](value: A, label: Option[String])(
def rawPut[A](pureOrLazy: Either[A, Resource[F, A]], label: Option[String])(
implicit rt: ResourceTag[A]): F[Unit] = {
ref.update(_ + ((label, rt) -> value))
ref.update(_ + ((label, rt) -> pureOrLazy))
}

def get[A](label: Option[String])(
implicit rt: ResourceTag[A]): F[Option[A]] =
ref.get.map(_.get(label -> rt).flatMap(rt.cast))
def rawGet[A](label: Option[String])(
implicit rt: ResourceTag[A]): F[Option[Either[A, Resource[F, A]]]] =
ref.get.map(_.get(label -> rt)).map {
case None => None
case Some(Left(value)) => rt.cast(value).map(Left(_))
case Some(Right(resource)) =>
Some(Right(resource.map(rt.cast).map {
case None =>
F.raiseError(ResourceNotFound(label, rt.description))
case Some(value) => F.pure(value)
}.flatMap(CECompat.resourceLift(_))))
}

}

Expand All @@ -91,6 +148,7 @@ object GlobalResourceF {
override def getMessage(): String =
s"Could not find a resource of type $typeDesc with label ${label.orNull}"
}

}

/**
Expand Down
66 changes: 66 additions & 0 deletions modules/core/src/weaver/MemoisedResource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package weaver

import cats.effect._
import cats.syntax.all._

import CECompat.{ Deferred, Ref }

object MemoisedResource {
def apply[F[_]: Concurrent, A](
resource: Resource[F, A]): F[Resource[F, A]] =
new MemoisedResource[F, A].apply(resource)
}

private class MemoisedResource[F[_]: Concurrent, A] {

sealed trait State
case object Uninitialised extends State
case class InUse(
value: Deferred[F, Either[Throwable, A]],
finalizer: F[Unit],
uses: Int)
extends State

def apply(resource: Resource[F, A]): F[Resource[F, A]] =
Ref[F].of[State](Uninitialised).map { ref =>
val initialise: F[A] = for {
valuePromise <- Deferred[F, Either[Throwable, A]]
finaliserPromise <- Deferred[F, F[Unit]]
compute <- ref.modify {
case Uninitialised =>
val newState = InUse(valuePromise, finaliserPromise.get.flatten, 1)
val compute = Concurrent[F].attempt(resource.allocated).flatMap {
case Right((a, fin)) => for {
_ <- valuePromise.complete(Right(a))
_ <- finaliserPromise.complete(fin)
} yield a
case Left(e) => for {
_ <- valuePromise.complete(Left(e))
_ <- finaliserPromise.complete(Concurrent[F].unit)
_ <- ref.set(Uninitialised) // reset state
a <- Concurrent[F].raiseError[A](e)
} yield a
}
newState -> compute
case InUse(value, finalizer, uses) =>
val newState = InUse(value, finalizer, uses + 1)
val compute: F[A] = value.get.flatMap(Concurrent[F].fromEither)
newState -> compute
}
value <- compute
} yield value

val finalise: F[Unit] = ref.modify[F[Unit]] {
case Uninitialised =>
Uninitialised -> Concurrent[F].raiseError(
new IllegalStateException("Implementation error"))
case InUse(_, finaliser, n) if n <= 1 =>
Uninitialised -> finaliser
case InUse(value, finaliser, n) =>
InUse(value, finaliser, n - 1) -> Concurrent[F].unit
}.flatten

Resource.make(initialise)(_ => finalise)
}

}
36 changes: 36 additions & 0 deletions modules/framework/cats/test/src-jvm/DogFoodTestsJVM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package test
import cats.effect.{ IO, Resource }
import cats.syntax.all._

import sbt.testing.Status

object DogFoodTestsJVM extends IOSuite {

type Res = DogFood[IO]
Expand Down Expand Up @@ -42,4 +44,38 @@ object DogFoodTestsJVM extends IOSuite {
}
}

test("global lazy resources (parallel)") { dogfood =>
import dogfood._
runSuites(
globalInit(MetaJVM.LazyGlobal),
sharingSuite[MetaJVM.LazyAccessParallel],
sharingSuite[MetaJVM.LazyAccessParallel],
sharingSuite[MetaJVM.LazyAccessParallel]
).map {
case (_, events) =>
val successCount = events.toList.map(_.status()).count {
case Status.Success => true; case _ => false
}
expect(successCount == 3)
}

}

test("global lazy resources (sequential)") { dogfood =>
import dogfood._
runSuites(
globalInit(MetaJVM.LazyGlobal),
sharingSuite[MetaJVM.LazyAccessSequential0],
sharingSuite[MetaJVM.LazyAccessSequential1],
sharingSuite[MetaJVM.LazyAccessSequential2]
).map {
case (_, events) =>
val successCount = events.toList.map(_.status()).count {
case Status.Success => true; case _ => false
}
expect(successCount == 3)
}

}

}
79 changes: 79 additions & 0 deletions modules/framework/cats/test/src-jvm/MetaJVM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package test

import java.io.File

import scala.concurrent.duration._

import cats.effect._

// The build tool will only detect and run top-level test suites. We can however nest objects
Expand Down Expand Up @@ -39,4 +41,81 @@ object MetaJVM {
override def realTimeMillis: IO[Long] = IO.pure(0L)
}

class LazyState(
initialised: IO[Int],
finalised: IO[Int],
totalUses: CECompat.Ref[IO, Int],
uses: CECompat.Ref[IO, Int]) {
val getState: IO[(Int, Int, Int, Int)] = for {
i <- initialised
f <- finalised
t <- totalUses.updateAndGet(_ + 1)
u <- uses.updateAndGet(_ + 1)
} yield (i, f, t, u)
}

object LazyGlobal extends GlobalResource {
def sharedResources(global: weaver.GlobalWrite): Resource[IO, Unit] =
CECompat.resourceLift {
for {
initialised <- CECompat.Ref[IO].of(0)
finalised <- CECompat.Ref[IO].of(0)
totalUses <- CECompat.Ref[IO].of(0)
resource =
CECompat.resourceLift(CECompat.Ref[IO].of(0)).flatMap { uses =>
Resource.make(initialised.update(_ + 1))(_ =>
finalised.update(_ + 1)).map(_ =>
new LazyState(initialised.get, finalised.get, totalUses, uses))
}
_ <- global.putLazy(resource)
} yield ()
}
}

class LazyAccessParallel(global: GlobalRead) extends IOSuite {
type Res = LazyState
def sharedResource: Resource[IO, Res] = global.getOrFailR[LazyState]()

test("Lazy resources should be instantiated only once") { state =>
IO.sleep(100.millis) *> state.getState.map {
case (initialised, finalised, totalUses, localUses) =>
expect.all(
initialised == 1, // resource is initialised only once and uses in parallel
finalised == 0, // resource is not finalised until all parallel uses are completed
totalUses >= 1,
totalUses <= 3,
localUses >= 1,
localUses <= 3
)
}
}
}

abstract class LazyAccessSequential(global: GlobalRead, index: Int)
extends IOSuite {
type Res = LazyState
def sharedResource: Resource[IO, Res] =
CECompat.resourceLift(IO.sleep(index * 500.millis)).flatMap(_ =>
global.getOrFailR[LazyState]())

test("Lazy resources should be instantiated several times") { state =>
state.getState.map {
case (initialised, finalised, totalUses, localUses) =>
expect.all(
initialised == totalUses, // lazy resource will get initialised for each suite
finalised == totalUses - 1,
localUses == 1 // one test for each inialisation
)
}
}
}

// Using sleeps to force sequential runs of suites
class LazyAccessSequential0(global: GlobalRead)
extends LazyAccessSequential(global, 0)
class LazyAccessSequential1(global: GlobalRead)
extends LazyAccessSequential(global, 1)
class LazyAccessSequential2(global: GlobalRead)
extends LazyAccessSequential(global, 2)

}
Loading

0 comments on commit ee863a1

Please sign in to comment.