Skip to content

Commit

Permalink
Merge pull request #1960 from GiganticMinecraft/forceRetrySave
Browse files Browse the repository at this point in the history
Finalizerの処理に失敗した場合にリトライするようにする
  • Loading branch information
rito528 authored May 7, 2023
2 parents 7886dc2 + 9debe67 commit a46f26f
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.bukkit.entity.Player
import org.bukkit.event.Listener

class System[F[_]: ConcurrentEffect: Timer](
val finalizer: PlayerDataFinalizer[F, Player],
val finalizers: List[PlayerDataFinalizer[F, Player]],
messagePublishingContext: ContextShift[IO]
)(
implicit configuration: Configuration,
Expand All @@ -26,6 +26,6 @@ class System[F[_]: ConcurrentEffect: Timer](
implicit val _synchronization: BungeeSemaphoreSynchronization[F[Unit], PlayerName] = {
new RedisBungeeSemaphoreSynchronization[F]()
}
Seq(new BungeeSemaphoreCooperator[F](finalizer))
Seq(new BungeeSemaphoreCooperator[F](finalizers))
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.github.unchama.bungeesemaphoreresponder.bukkit.listeners

import cats.ApplicativeError
import cats.effect.{Async, ConcurrentEffect, Timer}
import cats.data.Validated
import cats.effect.{Async, ConcurrentEffect, Sync, Timer}
import com.github.unchama.bungeesemaphoreresponder.Configuration
import com.github.unchama.bungeesemaphoreresponder.domain.actions.BungeeSemaphoreSynchronization
import com.github.unchama.bungeesemaphoreresponder.domain.{PlayerDataFinalizer, PlayerName}
import com.github.unchama.generic.EitherExtra
import com.github.unchama.generic.effect.ConcurrentExtra.attemptInParallel
import com.github.unchama.generic.effect.MonadThrowExtra.retryUntilSucceeds
import com.github.unchama.generic.effect.unsafe.EffectEnvironment
import org.bukkit.entity.Player
import org.bukkit.event.player.PlayerQuitEvent
Expand All @@ -14,16 +16,13 @@ import org.bukkit.event.{EventHandler, EventPriority, Listener}
import scala.concurrent.duration.{Duration, FiniteDuration}

class BungeeSemaphoreCooperator[F[_]: ConcurrentEffect: Timer](
finalizer: PlayerDataFinalizer[F, Player]
finalizers: List[PlayerDataFinalizer[F, Player]]
)(
implicit synchronization: BungeeSemaphoreSynchronization[F[Unit], PlayerName],
configuration: Configuration,
effectEnvironment: EffectEnvironment
) extends Listener {

import cats.effect.implicits._
import cats.implicits._

@EventHandler(priority = EventPriority.LOWEST)
def onQuit(event: PlayerQuitEvent): Unit = {
val player = event.getPlayer
Expand All @@ -36,17 +35,29 @@ class BungeeSemaphoreCooperator[F[_]: ConcurrentEffect: Timer](
case object TimeoutReached
extends Exception(s"Timeout ${configuration.saveTimeoutDuration} reached!")

val quitProcess = attemptInParallel(
finalizers.map(finalizer => retryUntilSucceeds(finalizer.onQuitOf(player))(10))
)

import cats.implicits._

val program = for {
fiber <- finalizer.onQuitOf(player).attempt.start
result <- ConcurrentEffect[F].race(timeout, fiber.join)
_ <- EitherExtra.unassociate(result) match {
case Left(timeoutOrErrorOnFinalization) =>
synchronization.notifySaveFailureOf(name) >>
ApplicativeError[F, Throwable].raiseError[Unit] {
timeoutOrErrorOnFinalization.getOrElse(TimeoutReached)
}
case Right(_) =>
synchronization.confirmSaveCompletionOf(name)
raceResult <- ConcurrentEffect[F].race(timeout, quitProcess)
_ <- raceResult match {
case Left(_) =>
synchronization.notifySaveFailureOf(name) >> ApplicativeError[F, Throwable]
.raiseError[Unit](TimeoutReached)
case Right(results) =>
results.traverse(e => Validated.fromEither(e).leftMap(List.apply(_))) match {
case Validated.Valid(_) =>
synchronization.confirmSaveCompletionOf(name)
case Validated.Invalid(errors) =>
synchronization.notifySaveFailureOf(name) >> errors.traverse(error =>
Sync[F].delay {
error.printStackTrace()
}
)
}
}
} yield ()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.unchama.bungeesemaphoreresponder.domain

import cats.effect.ConcurrentEffect
import cats.{Applicative, ApplicativeError, ~>}
import cats.~>
import com.github.unchama.generic.ContextCoercion

/**
Expand Down Expand Up @@ -38,24 +37,4 @@ object PlayerDataFinalizer {
def apply[F[_], Player](f: Player => F[Unit]): PlayerDataFinalizer[F, Player] =
(player: Player) => f(player)

import cats.effect.implicits._
import cats.implicits._

def concurrently[F[_]: ConcurrentEffect, Player](
finalizers: List[PlayerDataFinalizer[F, Player]]
): PlayerDataFinalizer[F, Player] =
PlayerDataFinalizer { player =>
for {
fibers <- finalizers.traverse(_.onQuitOf(player).attempt.start)
results <- fibers.traverse(_.join)
_ <-
// TODO: 最初のエラーしか報告されていないが、全部報告すべき
results.collectFirst { case Left(error) => error } match {
case Some(error) =>
ApplicativeError[F, Throwable].raiseError(error)
case None =>
Applicative[F].unit
}
} yield ()
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.unchama.generic.effect

import cats.Applicative
import cats.effect.concurrent.Deferred
import cats.effect.{CancelToken, Concurrent}

Expand Down Expand Up @@ -33,4 +34,31 @@ object ConcurrentExtra {

a <- fiber.join
} yield a

import cats.effect._

/**
* 与えられた複数の入力プログラムをすべて並列に実行するようなプログラムを構築します。
* 構築されたプログラムは次のような挙動をします:
* - 構築されたプログラムの実行 fiber がキャンセルされた時、
* その時点で並列で走っている入力プログラムの実行 fiber がすべてキャンセルされます
* - 入力プログラムの実行 fiber がエラー等で早期終了しても、他の実行 fiber への影響はありません
* (一つの fiber が異常終了しても、他の fiber のキャンセルなどは行われません)
* - 結果の `List[Either[Throwable, A]]` は入力プログラムの終了結果を
* (入力プログラムが与えられた順と同じ順で) 保持しています。
*
* 各値は
* - `Right[A]` だった場合、入力プログラムが `A` を結果として正常終了したこと
* - `Left[Throwable]` だった場合、入力プログラムが例外を送出して異常終了したこと
* をそれぞれ表します。
*/
def attemptInParallel[F[_]: ConcurrentEffect, A](
programs: List[F[A]]
): F[List[Either[Throwable, A]]] = {
ConcurrentEffect[F]
.bracketCase(programs.traverse(Concurrent[F].start(_)))(_.traverse(_.join.attempt)) {
case (fibers, ExitCase.Canceled) => fibers.traverse(_.cancel).void
case _ => Applicative[F].unit
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.github.unchama.generic.effect

import cats.MonadError
import cats.effect.Sync

object MonadThrowExtra {

import cats.implicits._

def retryUntilSucceeds[F[_]: Sync, A](fa: F[A])(limit: Int): F[A] = {
require(limit >= 1)
def go(currentIterationCount: Int, occurredException: Option[Throwable]): F[A] = {
if (currentIterationCount <= limit) {
fa.attempt.flatMap {
case Right(a) =>
Sync[F].delay(occurredException.foreach(_.printStackTrace())).as(a)
case Left(error) =>
go(currentIterationCount + 1, Some(error))
}
} else {
// このelse節に入っている時点で1度は失敗しているので、`occurredExceptions`が`None`であることはありえない。
case object LimitReached
extends Exception(s"Limit $limit reached!", occurredException.last)

MonadError[F, Throwable].raiseError(LimitReached)
}
}

go(0, None)
}

}
20 changes: 10 additions & 10 deletions src/main/scala/com/github/unchama/seichiassist/SeichiAssist.scala
Original file line number Diff line number Diff line change
Expand Up @@ -546,15 +546,13 @@ class SeichiAssist extends JavaPlugin() {
import PluginExecutionContexts.timer

new BungeeSemaphoreResponderSystem(
PlayerDataFinalizer.concurrently[IO, Player](
Seq(
savePlayerData,
assaultSkillRoutinesRepositoryControls.finalizer.coerceContextTo[IO],
activeSkillAvailabilityRepositoryControls.finalizer.coerceContextTo[IO]
).appendedAll(wiredSubsystems.flatMap(_.managedFinalizers))
.appendedAll(wiredSubsystems.flatMap(_.managedRepositoryControls.map(_.finalizer)))
.toList
),
Seq(
savePlayerData,
assaultSkillRoutinesRepositoryControls.finalizer.coerceContextTo[IO],
activeSkillAvailabilityRepositoryControls.finalizer.coerceContextTo[IO]
).appendedAll(wiredSubsystems.flatMap(_.managedFinalizers))
.appendedAll(wiredSubsystems.flatMap(_.managedRepositoryControls.map(_.finalizer)))
.toList,
PluginExecutionContexts.asyncShift
)
}
Expand Down Expand Up @@ -869,7 +867,9 @@ class SeichiAssist extends JavaPlugin() {
.getOnlinePlayers
.asScala
.toList
.traverse(bungeeSemaphoreResponderSystem.finalizer.onQuitOf)
.flatTraverse { player =>
bungeeSemaphoreResponderSystem.finalizers.traverse(_.onQuitOf(player))
}
.unsafeRunSync()

if (SeichiAssist.databaseGateway.disconnect() == ActionStatus.Fail) {
Expand Down

0 comments on commit a46f26f

Please sign in to comment.