From 7a867b3c593166ab1f58698e50859f631d74b5e3 Mon Sep 17 00:00:00 2001 From: Kory <6561358+kory33@users.noreply.github.com> Date: Tue, 27 Jul 2021 10:02:35 +0900 Subject: [PATCH] =?UTF-8?q?[update]=20compileToRestartingStream=E3=81=ABco?= =?UTF-8?q?ntext=E3=82=92=E8=BF=BD=E5=8A=A0=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../SignallingRepositoryDefinition.scala | 2 +- .../generic/effect/stream/StreamExtra.scala | 8 ++++---- .../subsystems/notification/System.scala | 2 +- .../ExpBarSynchronizationRepositoryTemplate.scala | 2 +- .../subsystems/fastdiggingeffect/System.scala | 14 +++++++------- .../EffectListRepositoryDefinitions.scala | 2 +- .../EffectStatsSettingsRepositoryDefinition.scala | 4 +++- .../PocketInventoryRepositoryDefinition.scala | 3 ++- .../subsystems/gachapoint/System.scala | 2 +- .../subsystems/halfhourranking/System.scala | 2 +- .../seichiassist/subsystems/mana/System.scala | 2 +- .../ManaBarSynchronizationRepository.scala | 6 +++--- .../GenericRefreshingRankingCache.scala | 2 +- .../subsystems/seichilevelupgift/System.scala | 2 +- .../subsystems/seichilevelupmessage/System.scala | 2 +- 15 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala b/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala index c4528c71d3..c7fa288528 100644 --- a/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala @@ -34,7 +34,7 @@ object SignallingRepositoryDefinition { // - restart the stream when the downstream stream fails // - unsubscribe when the player exits // We should be able to achieve this by returning a CancelToken or something on this flatXmapWithPlayer - StreamExtra.compileToRestartingStream { + StreamExtra.compileToRestartingStream("[SignallingRepositoryDefinition]") { stream.map(player -> _).through(publishSink) } } diff --git a/src/main/scala/com/github/unchama/generic/effect/stream/StreamExtra.scala b/src/main/scala/com/github/unchama/generic/effect/stream/StreamExtra.scala index 29038d8480..c02789566f 100644 --- a/src/main/scala/com/github/unchama/generic/effect/stream/StreamExtra.scala +++ b/src/main/scala/com/github/unchama/generic/effect/stream/StreamExtra.scala @@ -1,8 +1,8 @@ package com.github.unchama.generic.effect.stream -import cats.{Eq, Monad} import cats.effect.concurrent.Ref -import cats.effect.{Async, Concurrent, Sync} +import cats.effect.{Concurrent, Sync} +import cats.{Eq, Monad} import com.github.unchama.generic.Diff import com.github.unchama.minecraft.algebra.HasUuid import fs2.{Chunk, Pull, Stream} @@ -92,13 +92,13 @@ object StreamExtra { /** * 与えられたストリームを、エラーが発生したときに再起動するストリームに変換してコンパイルする。 */ - def compileToRestartingStream[F[_] : Sync : ErrorLogger, A](stream: Stream[F, _]): F[A] = { + def compileToRestartingStream[F[_] : Sync : ErrorLogger, A](context: String)(stream: Stream[F, _]): F[A] = { Monad[F].foreverM { stream .handleErrorWith { error => Stream.eval { ErrorLogger[F] - .error(error)("fs2.Stream が予期せぬエラーで終了しました。再起動します。") + .error(error)(s"$context fs2.Stream が予期せぬエラーで終了しました。再起動します。") } } .compile diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcount/subsystems/notification/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcount/subsystems/notification/System.scala index 9f74000229..e7ae1abeb6 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcount/subsystems/notification/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcount/subsystems/notification/System.scala @@ -18,7 +18,7 @@ object System { ](breakCountReadAPI: BreakCountReadAPI[F, G, Player]) : F[A] = { val action: NotifyLevelUp[F, Player] = BukkitNotifyLevelUp[F] - StreamExtra.compileToRestartingStream { + StreamExtra.compileToRestartingStream("[breakcount.notification]") { breakCountReadAPI .seichiLevelUpdates .either(breakCountReadAPI.seichiStarLevelUpdates) diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/application/ExpBarSynchronizationRepositoryTemplate.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/application/ExpBarSynchronizationRepositoryTemplate.scala index 480df2ed31..fe26524515 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/application/ExpBarSynchronizationRepositoryTemplate.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/application/ExpBarSynchronizationRepositoryTemplate.scala @@ -58,7 +58,7 @@ object ExpBarSynchronizationRepositoryTemplate { _ <- EffectExtra.runAsyncAndForget[F, G, Unit](bossBar.players.add(player)) _ <- EffectExtra.runAsyncAndForget[F, G, Unit] { - StreamExtra.compileToRestartingStream[F, Unit] { + StreamExtra.compileToRestartingStream[F, Unit]("[ExpBarSynchronizationRepositoryTemplate]") { switching.concurrently(synchronization) }.start >>= fiberPromise.complete } diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/System.scala index 8448081859..d3fde4ba96 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/System.scala @@ -106,7 +106,7 @@ object System { } _ <- - StreamExtra.compileToRestartingStream[F, Unit] { + StreamExtra.compileToRestartingStream[F, Unit]("[FastDiggingEffect/EffectStatsNotification]") { EffectStatsNotification.using[F, Player](effectListDiffTopic.subscribe(1).mapFilter(identity)) }.start @@ -176,15 +176,15 @@ object System { implicit val api: FastDiggingEffectApi[F, Player] = system.effectApi List( - BreakCountEffectSynchronization.using[F, H, Player], - PlayerCountEffectSynchronization.using[F, Player], - SynchronizationProcess.using[F, Player]( + "BreakCountEffectSynchronization" -> BreakCountEffectSynchronization.using[F, H, Player], + "PlayerCountEffectSynchronization" -> PlayerCountEffectSynchronization.using[F, Player], + "SynchronizationProcess" -> SynchronizationProcess.using[F, Player]( system.settingsApi.currentSuppressionSettings, system.effectApi.effectClock ) - ).traverse( - StreamExtra.compileToRestartingStream(_).start - ) + ).traverse { case (str, stream) => + StreamExtra.compileToRestartingStream(s"[FastDiggingEffect/$str]")(stream).start + } } } } diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectListRepositoryDefinitions.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectListRepositoryDefinitions.scala index a0b8e8c049..ae6cb4331b 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectListRepositoryDefinitions.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectListRepositoryDefinitions.scala @@ -44,7 +44,7 @@ object EffectListRepositoryDefinitions { val (mutexRef, fiberPromise) = value val programToRun: F[Unit] = - StreamExtra.compileToRestartingStream { + StreamExtra.compileToRestartingStream("[EffectListRepositoryDefinitions]") { fs2.Stream .awakeEvery[F](1.second) .evalMap[F, FastDiggingEffectList](_ => ContextCoercion(mutexRef.readLatest)) diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectStatsSettingsRepositoryDefinition.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectStatsSettingsRepositoryDefinition.scala index 98e68a8572..0effeb12ff 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectStatsSettingsRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectStatsSettingsRepositoryDefinition.scala @@ -58,7 +58,9 @@ object EffectStatsSettingsRepositoryDefinition { } EffectExtra.runAsyncAndForget[F, G, Unit] { - StreamExtra.compileToRestartingStream(processStream).start >>= fiberPromise.complete + StreamExtra.compileToRestartingStream("[EffectStatsSettingsRepository]") { + processStream + }.start >>= fiberPromise.complete } } } diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/fourdimensionalpocket/application/PocketInventoryRepositoryDefinition.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/fourdimensionalpocket/application/PocketInventoryRepositoryDefinition.scala index 5ef6e094d5..40fd42e04f 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/fourdimensionalpocket/application/PocketInventoryRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/fourdimensionalpocket/application/PocketInventoryRepositoryDefinition.scala @@ -54,7 +54,8 @@ object PocketInventoryRepositoryDefinition { } EffectExtra.runAsyncAndForget[F, G, Unit] { - StreamExtra.compileToRestartingStream(processStream).start >>= fiberPromise.complete + StreamExtra.compileToRestartingStream("[PocketInventoryRepository]")(processStream).start >>= + fiberPromise.complete } } } diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/gachapoint/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/gachapoint/System.scala index 87ff7a44c2..df2a870de4 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/gachapoint/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/gachapoint/System.scala @@ -59,7 +59,7 @@ object System { EffectExtra.runAsyncAndForget[F, G, Unit] { streams - .traverse(StreamExtra.compileToRestartingStream(_).start) + .traverse(StreamExtra.compileToRestartingStream("[GachaPoint]")(_).start) .as(()) } } diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/halfhourranking/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/halfhourranking/System.scala index 50d9721684..8805bca79a 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/halfhourranking/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/halfhourranking/System.scala @@ -31,7 +31,7 @@ object System { implicit val sendBukkitMessage: SendMinecraftMessage[F, Player] = SendBukkitMessage[F] implicit val broadcastBukkitMessage: BroadcastMinecraftMessage[F] = BroadcastBukkitMessage[F] - StreamExtra.compileToRestartingStream { + StreamExtra.compileToRestartingStream("[HalfHourRanking]") { breakCountReadAPI .batchedIncreases(30.minutes) .map(RankingRecord.apply) diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/System.scala index 1568e78802..4ca7dea8ef 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/System.scala @@ -49,7 +49,7 @@ object System { _ <- List( UpdateManaCaps.using[F, G, Player](handles.repository), RefillToCap.using[F, G, Player](handles.repository) - ).traverse(StreamExtra.compileToRestartingStream[F, Unit](_).start) + ).traverse(StreamExtra.compileToRestartingStream[F, Unit]("[Mana]")(_).start) } yield new System[F, G, Player] { override val manaApi: ManaApi[F, G, Player] = new ManaApi[F, G, Player] { override val readManaAmount: KeyedDataRepository[Player, G[LevelCappedManaAmount]] = diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/manabar/application/ManaBarSynchronizationRepository.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/manabar/application/ManaBarSynchronizationRepository.scala index a93ff8e560..9decca96ae 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/manabar/application/ManaBarSynchronizationRepository.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/manabar/application/ManaBarSynchronizationRepository.scala @@ -16,6 +16,7 @@ object ManaBarSynchronizationRepository { type BossBarWithPlayer[F[_], P] = MinecraftBossBar[F] {type Player = P} import cats.implicits._ + import cats.effect.implicits._ def withContext[ G[_] : Sync, @@ -36,9 +37,8 @@ object ManaBarSynchronizationRepository { val programToRunAsync = bossBar.players.add(player) >> - Concurrent[F].start[Nothing] { - StreamExtra.compileToRestartingStream[F, Nothing](synchronization) - } >>= promise.complete + StreamExtra.compileToRestartingStream[F, Nothing]("[ManaBarSynchronization]")(synchronization).start >>= + promise.complete EffectExtra.runAsyncAndForget[F, G, Unit](programToRunAsync) } diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/ranking/application/GenericRefreshingRankingCache.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/ranking/application/GenericRefreshingRankingCache.scala index 6c915f8b05..e5271f9e5c 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/ranking/application/GenericRefreshingRankingCache.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/ranking/application/GenericRefreshingRankingCache.scala @@ -22,7 +22,7 @@ object GenericRefreshingRankingCache { initialRankingRecords <- persistence.getAllRankingRecords rankingRef <- Ref.of(new Ranking(initialRankingRecords)) _ <- - StreamExtra.compileToRestartingStream[F, Unit] { + StreamExtra.compileToRestartingStream[F, Unit]("[GenericRefreshingRankingCache]") { fs2.Stream .awakeEvery[F](30.seconds) .evalMap(_ => persistence.getAllRankingRecords) diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/seichilevelupgift/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/seichilevelupgift/System.scala index 5222c69f43..1b3bd6dc47 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/seichilevelupgift/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/seichilevelupgift/System.scala @@ -33,7 +33,7 @@ object System { } } - StreamExtra.compileToRestartingStream { + StreamExtra.compileToRestartingStream("[SeichiLevelUpGift]") { breakCountReadApi .seichiLevelUpdates .evalTap { case (player, diff) => interpreter.onLevelDiff(diff).run(player) } diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/seichilevelupmessage/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/seichilevelupmessage/System.scala index f31d7c46cd..475fa71e7f 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/seichilevelupmessage/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/seichilevelupmessage/System.scala @@ -14,7 +14,7 @@ object System { G[_], Player ](implicit breakCountReadAPI: BreakCountReadAPI[F, G, Player]): F[Nothing] = { - StreamExtra.compileToRestartingStream { + StreamExtra.compileToRestartingStream("[SeichiLevelUpMessage]") { breakCountReadAPI .seichiLevelUpdates .evalMap { case (player, diff) =>