Skip to content

Commit

Permalink
[update] StreamExtra.compileToRestartingStreamを、stream.repeatではなくfore…
Browse files Browse the repository at this point in the history
…verMによって再起動するようにする
  • Loading branch information
kory33 committed Jul 27, 2021
1 parent 828fa58 commit b895662
Showing 1 changed file with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.github.unchama.generic.effect.stream

import cats.Eq
import cats.{Eq, Monad}
import cats.effect.concurrent.Ref
import cats.effect.{Async, Concurrent}
import cats.effect.{Async, Concurrent, Sync}
import com.github.unchama.generic.Diff
import com.github.unchama.minecraft.algebra.HasUuid
import fs2.{Chunk, Pull, Stream}
Expand Down Expand Up @@ -92,17 +92,17 @@ object StreamExtra {
/**
* 与えられたストリームを、エラーが発生したときに再起動するストリームに変換してコンパイルする。
*/
def compileToRestartingStream[F[_] : Async : ErrorLogger, A](stream: Stream[F, _]): F[A] = {
stream
.handleErrorWith { error =>
Stream.eval {
ErrorLogger[F]
.error(error)("fs2.Stream が予期せぬエラーで終了しました。再起動します。")
def compileToRestartingStream[F[_] : Sync : ErrorLogger, A](stream: Stream[F, _]): F[A] = {
Monad[F].foreverM {
stream
.handleErrorWith { error =>
Stream.eval {
ErrorLogger[F]
.error(error)("fs2.Stream が予期せぬエラーで終了しました。再起動します。")
}
}
}
.repeat
.compile
.drain
.flatMap(_ => Async[F].never[A])
.compile
.drain
}
}
}

0 comments on commit b895662

Please sign in to comment.