Skip to content

Commit

Permalink
[update] compileToRestartingStreamにcontextを追加する
Browse files Browse the repository at this point in the history
kory33 committed Jul 27, 2021
1 parent b895662 commit 7a867b3
Showing 15 changed files with 29 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ object SignallingRepositoryDefinition {
// - restart the stream when the downstream stream fails
// - unsubscribe when the player exits
// We should be able to achieve this by returning a CancelToken or something on this flatXmapWithPlayer
StreamExtra.compileToRestartingStream {
StreamExtra.compileToRestartingStream("[SignallingRepositoryDefinition]") {
stream.map(player -> _).through(publishSink)
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.github.unchama.generic.effect.stream

import cats.{Eq, Monad}
import cats.effect.concurrent.Ref
import cats.effect.{Async, Concurrent, Sync}
import cats.effect.{Concurrent, Sync}
import cats.{Eq, Monad}
import com.github.unchama.generic.Diff
import com.github.unchama.minecraft.algebra.HasUuid
import fs2.{Chunk, Pull, Stream}
@@ -92,13 +92,13 @@ object StreamExtra {
/**
* 与えられたストリームを、エラーが発生したときに再起動するストリームに変換してコンパイルする。
*/
def compileToRestartingStream[F[_] : Sync : ErrorLogger, A](stream: Stream[F, _]): F[A] = {
def compileToRestartingStream[F[_] : Sync : ErrorLogger, A](context: String)(stream: Stream[F, _]): F[A] = {
Monad[F].foreverM {
stream
.handleErrorWith { error =>
Stream.eval {
ErrorLogger[F]
.error(error)("fs2.Stream が予期せぬエラーで終了しました。再起動します。")
.error(error)(s"$context fs2.Stream が予期せぬエラーで終了しました。再起動します。")
}
}
.compile
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ object System {
](breakCountReadAPI: BreakCountReadAPI[F, G, Player]) : F[A] = {
val action: NotifyLevelUp[F, Player] = BukkitNotifyLevelUp[F]

StreamExtra.compileToRestartingStream {
StreamExtra.compileToRestartingStream("[breakcount.notification]") {
breakCountReadAPI
.seichiLevelUpdates
.either(breakCountReadAPI.seichiStarLevelUpdates)
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ object ExpBarSynchronizationRepositoryTemplate {

_ <- EffectExtra.runAsyncAndForget[F, G, Unit](bossBar.players.add(player))
_ <- EffectExtra.runAsyncAndForget[F, G, Unit] {
StreamExtra.compileToRestartingStream[F, Unit] {
StreamExtra.compileToRestartingStream[F, Unit]("[ExpBarSynchronizationRepositoryTemplate]") {
switching.concurrently(synchronization)
}.start >>= fiberPromise.complete
}
Original file line number Diff line number Diff line change
@@ -106,7 +106,7 @@ object System {
}

_ <-
StreamExtra.compileToRestartingStream[F, Unit] {
StreamExtra.compileToRestartingStream[F, Unit]("[FastDiggingEffect/EffectStatsNotification]") {
EffectStatsNotification.using[F, Player](effectListDiffTopic.subscribe(1).mapFilter(identity))
}.start

@@ -176,15 +176,15 @@ object System {
implicit val api: FastDiggingEffectApi[F, Player] = system.effectApi

List(
BreakCountEffectSynchronization.using[F, H, Player],
PlayerCountEffectSynchronization.using[F, Player],
SynchronizationProcess.using[F, Player](
"BreakCountEffectSynchronization" -> BreakCountEffectSynchronization.using[F, H, Player],
"PlayerCountEffectSynchronization" -> PlayerCountEffectSynchronization.using[F, Player],
"SynchronizationProcess" -> SynchronizationProcess.using[F, Player](
system.settingsApi.currentSuppressionSettings,
system.effectApi.effectClock
)
).traverse(
StreamExtra.compileToRestartingStream(_).start
)
).traverse { case (str, stream) =>
StreamExtra.compileToRestartingStream(s"[FastDiggingEffect/$str]")(stream).start
}
}
}
}
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ object EffectListRepositoryDefinitions {
val (mutexRef, fiberPromise) = value

val programToRun: F[Unit] =
StreamExtra.compileToRestartingStream {
StreamExtra.compileToRestartingStream("[EffectListRepositoryDefinitions]") {
fs2.Stream
.awakeEvery[F](1.second)
.evalMap[F, FastDiggingEffectList](_ => ContextCoercion(mutexRef.readLatest))
Original file line number Diff line number Diff line change
@@ -58,7 +58,9 @@ object EffectStatsSettingsRepositoryDefinition {
}

EffectExtra.runAsyncAndForget[F, G, Unit] {
StreamExtra.compileToRestartingStream(processStream).start >>= fiberPromise.complete
StreamExtra.compileToRestartingStream("[EffectStatsSettingsRepository]") {
processStream
}.start >>= fiberPromise.complete
}
}
}
Original file line number Diff line number Diff line change
@@ -54,7 +54,8 @@ object PocketInventoryRepositoryDefinition {
}

EffectExtra.runAsyncAndForget[F, G, Unit] {
StreamExtra.compileToRestartingStream(processStream).start >>= fiberPromise.complete
StreamExtra.compileToRestartingStream("[PocketInventoryRepository]")(processStream).start >>=
fiberPromise.complete
}
}
}
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ object System {

EffectExtra.runAsyncAndForget[F, G, Unit] {
streams
.traverse(StreamExtra.compileToRestartingStream(_).start)
.traverse(StreamExtra.compileToRestartingStream("[GachaPoint]")(_).start)
.as(())
}
}
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ object System {
implicit val sendBukkitMessage: SendMinecraftMessage[F, Player] = SendBukkitMessage[F]
implicit val broadcastBukkitMessage: BroadcastMinecraftMessage[F] = BroadcastBukkitMessage[F]

StreamExtra.compileToRestartingStream {
StreamExtra.compileToRestartingStream("[HalfHourRanking]") {
breakCountReadAPI
.batchedIncreases(30.minutes)
.map(RankingRecord.apply)
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ object System {
_ <- List(
UpdateManaCaps.using[F, G, Player](handles.repository),
RefillToCap.using[F, G, Player](handles.repository)
).traverse(StreamExtra.compileToRestartingStream[F, Unit](_).start)
).traverse(StreamExtra.compileToRestartingStream[F, Unit]("[Mana]")(_).start)
} yield new System[F, G, Player] {
override val manaApi: ManaApi[F, G, Player] = new ManaApi[F, G, Player] {
override val readManaAmount: KeyedDataRepository[Player, G[LevelCappedManaAmount]] =
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ object ManaBarSynchronizationRepository {
type BossBarWithPlayer[F[_], P] = MinecraftBossBar[F] {type Player = P}

import cats.implicits._
import cats.effect.implicits._

def withContext[
G[_] : Sync,
@@ -36,9 +37,8 @@ object ManaBarSynchronizationRepository {

val programToRunAsync =
bossBar.players.add(player) >>
Concurrent[F].start[Nothing] {
StreamExtra.compileToRestartingStream[F, Nothing](synchronization)
} >>= promise.complete
StreamExtra.compileToRestartingStream[F, Nothing]("[ManaBarSynchronization]")(synchronization).start >>=
promise.complete

EffectExtra.runAsyncAndForget[F, G, Unit](programToRunAsync)
}
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ object GenericRefreshingRankingCache {
initialRankingRecords <- persistence.getAllRankingRecords
rankingRef <- Ref.of(new Ranking(initialRankingRecords))
_ <-
StreamExtra.compileToRestartingStream[F, Unit] {
StreamExtra.compileToRestartingStream[F, Unit]("[GenericRefreshingRankingCache]") {
fs2.Stream
.awakeEvery[F](30.seconds)
.evalMap(_ => persistence.getAllRankingRecords)
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ object System {
}
}

StreamExtra.compileToRestartingStream {
StreamExtra.compileToRestartingStream("[SeichiLevelUpGift]") {
breakCountReadApi
.seichiLevelUpdates
.evalTap { case (player, diff) => interpreter.onLevelDiff(diff).run(player) }
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ object System {
G[_],
Player
](implicit breakCountReadAPI: BreakCountReadAPI[F, G, Player]): F[Nothing] = {
StreamExtra.compileToRestartingStream {
StreamExtra.compileToRestartingStream("[SeichiLevelUpMessage]") {
breakCountReadAPI
.seichiLevelUpdates
.evalMap { case (player, diff) =>

0 comments on commit 7a867b3

Please sign in to comment.