diff --git a/modules/core/src-ce2/weaver/CECompat.scala b/modules/core/src-ce2/weaver/CECompat.scala index eaa07c82..488b7bf6 100644 --- a/modules/core/src-ce2/weaver/CECompat.scala +++ b/modules/core/src-ce2/weaver/CECompat.scala @@ -14,6 +14,9 @@ private[weaver] trait CECompat { private[weaver] type Ref[F[_], A] = cats.effect.concurrent.Ref[F, A] private[weaver] val Ref = cats.effect.concurrent.Ref + private[weaver] type Deferred[F[_], A] = cats.effect.concurrent.Deferred[F, A] + private[weaver] val Deferred = cats.effect.concurrent.Deferred + private[weaver] type Semaphore[F[_]] = cats.effect.concurrent.Semaphore[F] private[weaver] val Semaphore = cats.effect.concurrent.Semaphore diff --git a/modules/core/src-ce3/weaver/CECompat.scala b/modules/core/src-ce3/weaver/CECompat.scala index 882411a8..a9f40dc6 100644 --- a/modules/core/src-ce3/weaver/CECompat.scala +++ b/modules/core/src-ce3/weaver/CECompat.scala @@ -15,6 +15,9 @@ private[weaver] trait CECompat { private[weaver] type Ref[F[_], A] = cats.effect.kernel.Ref[F, A] private[weaver] val Ref = cats.effect.kernel.Ref + private[weaver] type Deferred[F[_], A] = cats.effect.kernel.Deferred[F, A] + private[weaver] val Deferred = cats.effect.kernel.Deferred + private[weaver] type Semaphore[F[_]] = cats.effect.std.Semaphore[F] private[weaver] val Semaphore = cats.effect.std.Semaphore diff --git a/modules/core/src/weaver/GlobalResourceF.scala b/modules/core/src/weaver/GlobalResourceF.scala index eff2d9c9..8486bca4 100644 --- a/modules/core/src/weaver/GlobalResourceF.scala +++ b/modules/core/src/weaver/GlobalResourceF.scala @@ -33,22 +33,61 @@ trait GlobalResourceF[F[_]] extends GlobalResourceBase { object GlobalResourceF { trait Write[F[_]] { - protected implicit def F: Sync[F] + protected implicit def F: CECompat.Effect[F] + protected def rawPut[A]( + pureOrLazy: Either[A, Resource[F, A]], + label: Option[String])(implicit rt: ResourceTag[A]): F[Unit] + def put[A](value: A, label: Option[String] = None)( - implicit rt: ResourceTag[A]): F[Unit] + implicit rt: ResourceTag[A]): F[Unit] = rawPut(Left(value), label) def putR[A](value: A, label: Option[String] = None)( implicit rt: ResourceTag[A]): Resource[F, Unit] = CECompat.resourceLift(put(value, label)) + + /** + * Memoises a resource so to optimise its sharing. The memoised resource gets allocated + * lazily, when the first suite that needs it starts running, and gets finalised as soon + * as all suites that need it concurrently are done. + * + * In case the resource was already finalised when a suite needs, it gets re-allocated + * on demand. + * + * This can be useful for constructs that consume large amount of machine resources + * (CPU, memory, connections), to ensure they are cleaned-up when they should. + */ + def putLazy[A]( + resource: Resource[F, A], + label: Option[String] = None)(implicit rt: ResourceTag[A]): F[Unit] = + MemoisedResource(resource).flatMap(r => rawPut(Right(r), label)) + + def putLazyR[A]( + resource: Resource[F, A], + label: Option[String] = None)(implicit + rt: ResourceTag[A]): Resource[F, Unit] = + CECompat.resourceLift(putLazy(resource, label)) } trait Read[F[_]] { protected implicit def F: MonadError[F, Throwable] + + protected def rawGet[A](label: Option[String] = None)( + implicit rt: ResourceTag[A]): F[Option[Either[A, Resource[F, A]]]] + def get[A](label: Option[String] = None)( - implicit rt: ResourceTag[A]): F[Option[A]] + implicit rt: ResourceTag[A]): F[Option[A]] = rawGet[A](label).map { + case Some(Left(value)) => Some(value) + case _ => None + } def getR[A](label: Option[String] = None)( implicit rt: ResourceTag[A]): Resource[F, Option[A]] = - CECompat.resourceLift(get[A](label)) + CECompat.resourceLift { + rawGet[A](label) + }.flatMap { + case Some(Left(value)) => Resource.pure(Some(value)) + case Some(Right(resource)) => resource.map(Some(_)) + case None => Resource.pure(None) + } def getOrFail[A](label: Option[String] = None)( implicit rt: ResourceTag[A] @@ -58,31 +97,49 @@ object GlobalResourceF { case None => F.raiseError(GlobalResourceF.ResourceNotFound(label, rt.description)) } + def getOrFailR[A](label: Option[String] = None)( implicit rt: ResourceTag[A]): Resource[F, A] = - CECompat.resourceLift(getOrFail[A](label)) - + getR[A](label).flatMap { + case Some(value) => Resource.pure[F, A](value) + case None => + CECompat.resourceLift(F.raiseError(GlobalResourceF.ResourceNotFound( + label, + rt.description))) + } } - private[weaver] def createMap[F[_]: Sync]: F[Read[F] with Write[F]] = + def createMap[F[_]: CECompat.Effect]: F[Read[F] with Write[F]] = Ref[F] - .of(Map.empty[(Option[String], ResourceTag[_]), Any]) + .of(Map.empty[(Option[String], ResourceTag[_]), + Either[Any, Resource[F, Any]]]) .map(new ResourceMap(_)) private class ResourceMap[F[_]]( - ref: Ref[F, Map[(Option[String], ResourceTag[_]), Any]])( - implicit val F: Sync[F]) + ref: Ref[ + F, + Map[(Option[String], ResourceTag[_]), Either[Any, Resource[F, Any]]]])( + implicit val F: CECompat.Effect[F]) extends Read[F] with Write[F] { self => - def put[A](value: A, label: Option[String])( + def rawPut[A](pureOrLazy: Either[A, Resource[F, A]], label: Option[String])( implicit rt: ResourceTag[A]): F[Unit] = { - ref.update(_ + ((label, rt) -> value)) + ref.update(_ + ((label, rt) -> pureOrLazy)) } - def get[A](label: Option[String])( - implicit rt: ResourceTag[A]): F[Option[A]] = - ref.get.map(_.get(label -> rt).flatMap(rt.cast)) + def rawGet[A](label: Option[String])( + implicit rt: ResourceTag[A]): F[Option[Either[A, Resource[F, A]]]] = + ref.get.map(_.get(label -> rt)).map { + case None => None + case Some(Left(value)) => rt.cast(value).map(Left(_)) + case Some(Right(resource)) => + Some(Right(resource.map(rt.cast).map { + case None => + F.raiseError(ResourceNotFound(label, rt.description)) + case Some(value) => F.pure(value) + }.flatMap(CECompat.resourceLift(_)))) + } } @@ -91,6 +148,7 @@ object GlobalResourceF { override def getMessage(): String = s"Could not find a resource of type $typeDesc with label ${label.orNull}" } + } /** diff --git a/modules/core/src/weaver/MemoisedResource.scala b/modules/core/src/weaver/MemoisedResource.scala new file mode 100644 index 00000000..730ff7a4 --- /dev/null +++ b/modules/core/src/weaver/MemoisedResource.scala @@ -0,0 +1,66 @@ +package weaver + +import cats.effect._ +import cats.syntax.all._ + +import CECompat.{ Deferred, Ref } + +object MemoisedResource { + def apply[F[_]: Concurrent, A]( + resource: Resource[F, A]): F[Resource[F, A]] = + new MemoisedResource[F, A].apply(resource) +} + +private class MemoisedResource[F[_]: Concurrent, A] { + + sealed trait State + case object Uninitialised extends State + case class InUse( + value: Deferred[F, Either[Throwable, A]], + finalizer: F[Unit], + uses: Int) + extends State + + def apply(resource: Resource[F, A]): F[Resource[F, A]] = + Ref[F].of[State](Uninitialised).map { ref => + val initialise: F[A] = for { + valuePromise <- Deferred[F, Either[Throwable, A]] + finaliserPromise <- Deferred[F, F[Unit]] + compute <- ref.modify { + case Uninitialised => + val newState = InUse(valuePromise, finaliserPromise.get.flatten, 1) + val compute = Concurrent[F].attempt(resource.allocated).flatMap { + case Right((a, fin)) => for { + _ <- valuePromise.complete(Right(a)) + _ <- finaliserPromise.complete(fin) + } yield a + case Left(e) => for { + _ <- valuePromise.complete(Left(e)) + _ <- finaliserPromise.complete(Concurrent[F].unit) + _ <- ref.set(Uninitialised) // reset state + a <- Concurrent[F].raiseError[A](e) + } yield a + } + newState -> compute + case InUse(value, finalizer, uses) => + val newState = InUse(value, finalizer, uses + 1) + val compute: F[A] = value.get.flatMap(Concurrent[F].fromEither) + newState -> compute + } + value <- compute + } yield value + + val finalise: F[Unit] = ref.modify[F[Unit]] { + case Uninitialised => + Uninitialised -> Concurrent[F].raiseError( + new IllegalStateException("Implementation error")) + case InUse(_, finaliser, n) if n <= 1 => + Uninitialised -> finaliser + case InUse(value, finaliser, n) => + InUse(value, finaliser, n - 1) -> Concurrent[F].unit + }.flatten + + Resource.make(initialise)(_ => finalise) + } + +} diff --git a/modules/framework/cats/test/src-jvm/DogFoodTestsJVM.scala b/modules/framework/cats/test/src-jvm/DogFoodTestsJVM.scala index e469e4ba..5b6a93de 100644 --- a/modules/framework/cats/test/src-jvm/DogFoodTestsJVM.scala +++ b/modules/framework/cats/test/src-jvm/DogFoodTestsJVM.scala @@ -5,6 +5,8 @@ package test import cats.effect.{ IO, Resource } import cats.syntax.all._ +import sbt.testing.Status + object DogFoodTestsJVM extends IOSuite { type Res = DogFood[IO] @@ -42,4 +44,38 @@ object DogFoodTestsJVM extends IOSuite { } } + test("global lazy resources (parallel)") { dogfood => + import dogfood._ + runSuites( + globalInit(MetaJVM.LazyGlobal), + sharingSuite[MetaJVM.LazyAccessParallel], + sharingSuite[MetaJVM.LazyAccessParallel], + sharingSuite[MetaJVM.LazyAccessParallel] + ).map { + case (_, events) => + val successCount = events.toList.map(_.status()).count { + case Status.Success => true; case _ => false + } + expect(successCount == 3) + } + + } + + test("global lazy resources (sequential)") { dogfood => + import dogfood._ + runSuites( + globalInit(MetaJVM.LazyGlobal), + sharingSuite[MetaJVM.LazyAccessSequential0], + sharingSuite[MetaJVM.LazyAccessSequential1], + sharingSuite[MetaJVM.LazyAccessSequential2] + ).map { + case (_, events) => + val successCount = events.toList.map(_.status()).count { + case Status.Success => true; case _ => false + } + expect(successCount == 3) + } + + } + } diff --git a/modules/framework/cats/test/src-jvm/MetaJVM.scala b/modules/framework/cats/test/src-jvm/MetaJVM.scala index d3d8e715..7954ec96 100644 --- a/modules/framework/cats/test/src-jvm/MetaJVM.scala +++ b/modules/framework/cats/test/src-jvm/MetaJVM.scala @@ -4,6 +4,8 @@ package test import java.io.File +import scala.concurrent.duration._ + import cats.effect._ // The build tool will only detect and run top-level test suites. We can however nest objects @@ -39,4 +41,81 @@ object MetaJVM { override def realTimeMillis: IO[Long] = IO.pure(0L) } + class LazyState( + initialised: IO[Int], + finalised: IO[Int], + totalUses: CECompat.Ref[IO, Int], + uses: CECompat.Ref[IO, Int]) { + val getState: IO[(Int, Int, Int, Int)] = for { + i <- initialised + f <- finalised + t <- totalUses.updateAndGet(_ + 1) + u <- uses.updateAndGet(_ + 1) + } yield (i, f, t, u) + } + + object LazyGlobal extends GlobalResource { + def sharedResources(global: weaver.GlobalWrite): Resource[IO, Unit] = + CECompat.resourceLift { + for { + initialised <- CECompat.Ref[IO].of(0) + finalised <- CECompat.Ref[IO].of(0) + totalUses <- CECompat.Ref[IO].of(0) + resource = + CECompat.resourceLift(CECompat.Ref[IO].of(0)).flatMap { uses => + Resource.make(initialised.update(_ + 1))(_ => + finalised.update(_ + 1)).map(_ => + new LazyState(initialised.get, finalised.get, totalUses, uses)) + } + _ <- global.putLazy(resource) + } yield () + } + } + + class LazyAccessParallel(global: GlobalRead) extends IOSuite { + type Res = LazyState + def sharedResource: Resource[IO, Res] = global.getOrFailR[LazyState]() + + test("Lazy resources should be instantiated only once") { state => + IO.sleep(100.millis) *> state.getState.map { + case (initialised, finalised, totalUses, localUses) => + expect.all( + initialised == 1, // resource is initialised only once and uses in parallel + finalised == 0, // resource is not finalised until all parallel uses are completed + totalUses >= 1, + totalUses <= 3, + localUses >= 1, + localUses <= 3 + ) + } + } + } + + abstract class LazyAccessSequential(global: GlobalRead, index: Int) + extends IOSuite { + type Res = LazyState + def sharedResource: Resource[IO, Res] = + CECompat.resourceLift(IO.sleep(index * 500.millis)).flatMap(_ => + global.getOrFailR[LazyState]()) + + test("Lazy resources should be instantiated several times") { state => + state.getState.map { + case (initialised, finalised, totalUses, localUses) => + expect.all( + initialised == totalUses, // lazy resource will get initialised for each suite + finalised == totalUses - 1, + localUses == 1 // one test for each inialisation + ) + } + } + } + + // Using sleeps to force sequential runs of suites + class LazyAccessSequential0(global: GlobalRead) + extends LazyAccessSequential(global, 0) + class LazyAccessSequential1(global: GlobalRead) + extends LazyAccessSequential(global, 1) + class LazyAccessSequential2(global: GlobalRead) + extends LazyAccessSequential(global, 2) + } diff --git a/modules/framework/cats/test/src/MemoisedResourceTests.scala b/modules/framework/cats/test/src/MemoisedResourceTests.scala new file mode 100644 index 00000000..86ec660f --- /dev/null +++ b/modules/framework/cats/test/src/MemoisedResourceTests.scala @@ -0,0 +1,53 @@ +package weaver +package framework +package test + +import scala.concurrent.duration._ + +import cats.effect._ +import cats.syntax.all._ + +import CECompat.Ref + +object MemoisedResourceTests extends SimpleIOSuite { + + test("""|Memoised resources should be: + | * lazily allocated, + | * shared when accessed concurrently + | * not finalised until all uses are finished + | * re-allocated on demand after being finalised""".stripMargin) { + for { + initialised <- Ref[IO].of(0) + finalised <- Ref[IO].of(0) + used <- Ref[IO].of(0) + resource = + Resource.make(initialised.update(_ + 1))(_ => finalised.update(_ + 1)) + use <- MemoisedResource(resource).map(r => + r.use(_ => IO.sleep(100.millis) *> used.update(_ + 1))) + _ <- use + _ <- List.fill(10)(use).parSequence + _ <- use + initCount <- initialised.get + finCount <- finalised.get + useCount <- used.get + } yield expect.all(initCount == 3, finCount == 3, useCount == 12) + } + + object Boom extends Exception("Boom") + + test("""|Memoised resources reset when allocation fails""".stripMargin) { + for { + fail <- Ref[IO].of(true) + allocate = fail.modify[IO[String]] { + case true => (false, IO.raiseError(Boom)) + case false => (false, IO.pure("hello")) + }.flatten + resource = Resource.liftF(allocate) + res <- MemoisedResource(resource) + use = res.use(r => IO.pure(r)) + firstResult <- use.attempt + secondResult <- use + } yield expect.all(firstResult == Left(Boom), secondResult == "hello") + } + +}