diff --git a/src/main/scala/com/github/unchama/generic/effect/stream/StreamExtra.scala b/src/main/scala/com/github/unchama/generic/effect/stream/StreamExtra.scala index 5873f09fe6..29038d8480 100644 --- a/src/main/scala/com/github/unchama/generic/effect/stream/StreamExtra.scala +++ b/src/main/scala/com/github/unchama/generic/effect/stream/StreamExtra.scala @@ -1,8 +1,8 @@ package com.github.unchama.generic.effect.stream -import cats.Eq +import cats.{Eq, Monad} import cats.effect.concurrent.Ref -import cats.effect.{Async, Concurrent} +import cats.effect.{Async, Concurrent, Sync} import com.github.unchama.generic.Diff import com.github.unchama.minecraft.algebra.HasUuid import fs2.{Chunk, Pull, Stream} @@ -92,17 +92,17 @@ object StreamExtra { /** * 与えられたストリームを、エラーが発生したときに再起動するストリームに変換してコンパイルする。 */ - def compileToRestartingStream[F[_] : Async : ErrorLogger, A](stream: Stream[F, _]): F[A] = { - stream - .handleErrorWith { error => - Stream.eval { - ErrorLogger[F] - .error(error)("fs2.Stream が予期せぬエラーで終了しました。再起動します。") + def compileToRestartingStream[F[_] : Sync : ErrorLogger, A](stream: Stream[F, _]): F[A] = { + Monad[F].foreverM { + stream + .handleErrorWith { error => + Stream.eval { + ErrorLogger[F] + .error(error)("fs2.Stream が予期せぬエラーで終了しました。再起動します。") + } } - } - .repeat - .compile - .drain - .flatMap(_ => Async[F].never[A]) + .compile + .drain + } } }