Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
rpiaggio committed Dec 14, 2024
1 parent 295b31b commit 6c7976a
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,96 @@ import japgolly.scalajs.react.*
import japgolly.scalajs.react.hooks.CustomHook
import japgolly.scalajs.react.util.DefaultEffects.Async as DefaultA

object UseEffectStreamResource {

protected def hook[D: Reusability] =
CustomHook[WithDeps[D, StreamResource[Unit]]]
.useAsyncEffectWithDepsBy(props => props.deps): props =>
deps =>
for
latch <- Deferred[DefaultA, Unit] // Latch for stream termination.
(_, close) <- props
.fromDeps(deps)
.flatMap: stream =>
(stream.compile.drain >> latch.complete(())).background.void
.allocated
supervisor <- (latch.get >> close).start // Close the resource if the stream terminates.
yield
// Cleanup closes resource and cancels the supervisor, unless resource is already closed.
(supervisor.cancel >> close).when:
latch.tryGet.map(_.isEmpty)
.build
object UseEffectStreamResource:
/**
* Open a `Resource[Async, fs.Stream[Async, Unit]]` on mount or when dependencies change, and
* drain the stream by creating a fiber. The fiber will be cancelled and the resource closed on
* unmount or deps change.
*/
final def useEffectStreamResourceWithDeps[D: Reusability](deps: => D)(
effectStreamResource: D => StreamResource[Unit]
): HookResult[Unit] =
useAsyncEffectWithDeps(deps): depsValue =>
for
latch <- Deferred[DefaultA, Unit] // Latch for stream termination.
(_, close) <- effectStreamResource(deps)
.flatMap: stream =>
(stream.compile.drain >> latch.complete(())).background.void
.allocated
supervisor <- (latch.get >> close).start // Close the resource if the stream terminates.
yield
// Cleanup closes resource and cancels the supervisor, unless resource is already closed.
(supervisor.cancel >> close).when:
latch.tryGet.map(_.isEmpty)

/**
* Open a `Resource[Async, fs.Stream[Async, Unit]]` on each render, and drain the stream by
* creating a fiber. If there was another fiber executing from the previous render, it will be
* cancelled and its resource closed.
*/
final inline def useEffectStreamResource(
effectStreamResource: => StreamResource[Unit]
): HookResult[Unit] =
useEffectStreamResourceWithDeps(NeverReuse)(_ => effectStreamResource)

/**
* Open a `Resource[Async, fs.Stream[Async, Unit]]` on mount, and drain the stream by creating a
* fiber. The fiber will be cancelled and the resource closed on unmount.
*/
final inline def useEffectStreamResourceOnMount(
effectStreamResource: => StreamResource[Unit]
): HookResult[Unit] = // () has Reusability = always.
useEffectStreamResourceWithDeps(())(_ => effectStreamResource)

/**
* Open a `Resource[Async, fs.Stream[Async, Unit]]` when a `Pot` dependency becomes `Ready`, and
* drain the stream by creating a fiber. The fiber will be cancelled and the resource closed on
* unmount or if the dependency transitions to `Pending` or `Error`.
*/
final def useEffectStreamResourceWhenDepsReady[D](
deps: => Pot[D]
)(effectStreamResource: D => StreamResource[Unit]): HookResult[Unit] =
useEffectStreamResourceWithDeps(deps.toOption.void)(_ =>
deps.toOption.map(effectStreamResource).orEmpty
)

/**
* Drain a `fs2.Stream[Async, Unit]` by creating a fiber on mount or when deps change.The fiber
* will be cancelled on unmount or deps change.
*/
final inline def useEffectStreamWithDeps[D: Reusability](deps: => D)(
effectStream: D => fs2.Stream[DefaultA, Unit]
): HookResult[Unit] =
useEffectStreamResourceWithDeps(deps)(deps => Resource.pure(effectStream(deps)))

/**
* Drain a `fs2.Stream[Async, Unit]` by creating a fiber on each render. If there was another
* fiber executing from the previous render, it will be cancelled.
*/
final inline def useEffectStream(effectStream: fs2.Stream[DefaultA, Unit]): HookResult[Unit] =
useEffectStreamWithDeps(NeverReuse)(_ => effectStream)

/**
* Drain a `fs2.Stream[Async, Unit]` by creating a fiber when a `Pot` dependency becomes `Ready`.
* The fiber will be cancelled on unmount or if the dependency transitions to `Pending` or
* `Error`.
*/
final inline def useEffectStreamWhenDepsReady[D](
deps: => Pot[D]
)(effectStream: D => fs2.Stream[DefaultA, Unit]): HookResult[Unit] =
useEffectStreamWithDeps(deps.toOption.void)(_ => deps.toOption.map(effectStream).orEmpty)

/**
* Drain a `fs2.Stream[Async, Unit]` by creating a fiber on mount. The fiber will be cancelled on
* unmount.
*/
final inline def useEffectStreamOnMount(
effectStream: => fs2.Stream[DefaultA, Unit]
): HookResult[Unit] =
useEffectStreamResourceOnMount(Resource.pure(effectStream))

private def hook[D: Reusability]: CustomHook[WithDeps[D, StreamResource[Unit]], Unit] =
CustomHook.fromHookResult(input => useEffectStreamResourceWithDeps(input.deps)(input.fromDeps))

object HooksApiExt {
sealed class Primary[Ctx, Step <: HooksApi.AbstractStep](api: HooksApi.Primary[Ctx, Step]) {
Expand Down Expand Up @@ -312,4 +383,3 @@ object UseEffectStreamResource {
}

object syntax extends HooksApiExt
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import japgolly.scalajs.react.util.DefaultEffects.Sync as DefaultS

import scala.reflect.ClassTag

object UseStreamResource {
object UseStreamResource:

private def buildStreamResource[D, A](
props: WithDeps[D, StreamResource[A]],
Expand Down Expand Up @@ -719,4 +719,3 @@ object UseStreamResource {
}

object syntax extends HooksApiExt
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ import japgolly.scalajs.react.hooks.CustomHook

import scala.concurrent.duration.FiniteDuration

object UseThrottlingStateView {
def hook[A]: CustomHook[(A, FiniteDuration), Pot[ThrottlingView[A]]] =
CustomHook[(A, FiniteDuration)]
.useStateViewBy(props => props._1)
.useEffectResultOnMountBy((props, _) => ViewThrottler[A](props._2))
.buildReturning: (_, view, throttler) =>
throttler.map(_.throttle(view))
object UseThrottlingStateView:
/** Creates component state as a `ThrottlingView`. See `ViewThrottler[A]`. */
final def useThrottlingStateView[A](
input: (A, FiniteDuration)
): HookResult[Pot[ThrottlingView[A]]] =
for
view <- useStateView(input._1)
throttler <- useEffectResultOnMount(ViewThrottler[A](input._2))
yield throttler.map(_.throttle(view))

private def hook[A]: CustomHook[(A, FiniteDuration), Pot[ThrottlingView[A]]] =
CustomHook.fromHookResult(useThrottlingStateView(_))

object HooksApiExt {
sealed class Primary[Ctx, Step <: HooksApi.AbstractStep](api: HooksApi.Primary[Ctx, Step]) {
Expand Down Expand Up @@ -68,4 +73,3 @@ object UseThrottlingStateView {
}

object syntax extends HooksApiExt
}
12 changes: 11 additions & 1 deletion modules/core/js/src/main/scala/crystal/react/hooks/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ export UseSingleEffect.useSingleEffect, UseSerialState.useSerialState,
useEffectResultWhenDepsReady,
useEffectResultWhenDepsReadyOrChange,
useEffectResultWithDeps
}, UseResource.{useResource, useResourceOnMount}
}, UseResource.{useResource, useResourceOnMount}, UseThrottlingStateView.useThrottlingStateView,
UseEffectStreamResource.{
useEffectStream,
useEffectStreamOnMount,
useEffectStreamResource,
useEffectStreamResourceOnMount,
useEffectStreamResourceWhenDepsReady,
useEffectStreamResourceWithDeps,
useEffectStreamWhenDepsReady,
useEffectStreamWithDeps
}

export UseSingleEffect.syntax.*, UseSerialState.syntax.*, UseStateCallback.syntax.*,
UseStateView.syntax.*, UseStateViewWithReuse.syntax.*, UseSerialStateView.syntax.*,
Expand Down

0 comments on commit 6c7976a

Please sign in to comment.