diff --git a/zio-flow-runtime/src/main/scala/zio/flow/runtime/IndexedStore.scala b/zio-flow-runtime/src/main/scala/zio/flow/runtime/IndexedStore.scala index 4a0bc06a4..750dd0726 100644 --- a/zio-flow-runtime/src/main/scala/zio/flow/runtime/IndexedStore.scala +++ b/zio-flow-runtime/src/main/scala/zio/flow/runtime/IndexedStore.scala @@ -33,12 +33,17 @@ object IndexedStore { type Index = Index.Type object Index extends Subtype[Long] { implicit val schema: Schema[Index] = Schema[Long].transform(Index(_), Index.unwrap) + + val initial: Index = Index(0L) + val max: Index = Index(Long.MaxValue) } implicit class IndexSyntax(private val index: Index) extends AnyVal { def next: Index = Index(index + 1) } + val any: ZLayer[IndexedStore, Nothing, IndexedStore] = ZLayer.service[IndexedStore] + def position(topic: String): ZIO[IndexedStore, Throwable, Index] = ZIO.serviceWithZIO(_.position(topic)) @@ -80,8 +85,11 @@ object IndexedStore { ZStream.unwrap { topics.get.map { topics => topics.get(topic) match { - case Some(values) => ZStream.fromIterable(values.slice(position.toInt, until.toInt)) - case None => ZStream.empty + case Some(values) => + val slice = values.slice(position.toInt, math.min(values.size.toLong, until.toLong).toInt) + ZStream.fromIterable(slice) + case None => + ZStream.empty } } } diff --git a/zio-flow-runtime/src/main/scala/zio/flow/runtime/PersisterConfig.scala b/zio-flow-runtime/src/main/scala/zio/flow/runtime/PersisterConfig.scala new file mode 100644 index 000000000..1e72d94aa --- /dev/null +++ b/zio-flow-runtime/src/main/scala/zio/flow/runtime/PersisterConfig.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2021-2023 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
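As a quick illustration of the IndexedStore changes above (the new `Index.initial`/`Index.max` bounds and the clamped in-memory `scan`), here is a minimal sketch; the `readWholeTopic` helper and the topic name are hypothetical, and only operations already used elsewhere in this PR are assumed.

```scala
// A minimal sketch (not part of this diff): reading an entire topic with the new
// Index bounds. It relies only on the IndexedStore operations used in this PR.
import zio._
import zio.flow.runtime.IndexedStore
import zio.flow.runtime.IndexedStore.Index

def readWholeTopic(topic: String): ZIO[IndexedStore, Throwable, Chunk[Any]] =
  ZIO.serviceWithZIO[IndexedStore] { store =>
    // Because the in-memory scan now clamps `until` to the stored size,
    // passing Index.max is safe even when the topic has only a few entries.
    store.scan(topic, Index.initial, Index.max).runCollect
  }
```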
+ */ + +package zio.flow.runtime + +import zio.{Config, Duration} + +sealed trait PersisterConfig +object PersisterConfig { + case object SnapshotsOnly extends PersisterConfig + final case class PeriodicSnapshots( + afterEvery: Option[Int], + afterDuration: Option[Duration] + ) extends PersisterConfig + + val config: Config[PersisterConfig] = + Config + .string("type") + .switch( + "snapshots-only" -> Config.succeed(SnapshotsOnly), + "periodic-snapshots" -> + (Config.int("after-every").optional ++ + Config.duration("after-duration").optional).map { case (afterEvery, afterDuration) => + PeriodicSnapshots(afterEvery, afterDuration) + } + ) +} diff --git a/zio-flow-runtime/src/main/scala/zio/flow/runtime/ZFlowExecutor.scala b/zio-flow-runtime/src/main/scala/zio/flow/runtime/ZFlowExecutor.scala index 8d656ad29..9db5ee0fd 100644 --- a/zio-flow-runtime/src/main/scala/zio/flow/runtime/ZFlowExecutor.scala +++ b/zio-flow-runtime/src/main/scala/zio/flow/runtime/ZFlowExecutor.scala @@ -18,7 +18,7 @@ package zio.flow.runtime import zio._ import zio.flow.runtime.internal.PersistentExecutor.FlowResult -import zio.flow.runtime.internal.{DefaultOperationExecutor, PersistentExecutor} +import zio.flow.runtime.internal.{DefaultOperationExecutor, PersistentExecutor, PersistentState} import zio.flow.runtime.operation.http.HttpOperationPolicies import zio.flow.runtime.serialization.ExecutorBinaryCodecs import zio.flow.{Configuration, FlowId, RemoteVariableName, ZFlow} @@ -152,26 +152,31 @@ object ZFlowExecutor { ZIO.serviceWithZIO(_.forceGarbageCollection()) val default: ZLayer[ - KeyValueStore with IndexedStore with ExecutorBinaryCodecs with Configuration, + KeyValueStore with IndexedStore with PersistentState with ExecutorBinaryCodecs with Configuration, Throwable, ZFlowExecutor ] = ZLayer - .makeSome[KeyValueStore with IndexedStore with ExecutorBinaryCodecs with Configuration, ZFlowExecutor]( + .makeSome[ + KeyValueStore with IndexedStore with PersistentState with ExecutorBinaryCodecs with Configuration, + ZFlowExecutor + ]( DurableLog.layer, DefaultOperationExecutor.layer, HttpOperationPolicies.disabled, PersistentExecutor.make() ) - val defaultJson: ZLayer[KeyValueStore with IndexedStore with Configuration, Throwable, ZFlowExecutor] = - ZLayer.makeSome[KeyValueStore with IndexedStore with Configuration, ZFlowExecutor]( + val defaultJson + : ZLayer[KeyValueStore with IndexedStore with PersistentState with Configuration, Throwable, ZFlowExecutor] = + ZLayer.makeSome[KeyValueStore with IndexedStore with PersistentState with Configuration, ZFlowExecutor]( ZLayer.succeed(serialization.json), default ) - val defaultProtobuf: ZLayer[KeyValueStore with IndexedStore with Configuration, Throwable, ZFlowExecutor] = - ZLayer.makeSome[KeyValueStore with IndexedStore with Configuration, ZFlowExecutor]( + val defaultProtobuf + : ZLayer[KeyValueStore with IndexedStore with PersistentState with Configuration, Throwable, ZFlowExecutor] = + ZLayer.makeSome[KeyValueStore with IndexedStore with PersistentState with Configuration, ZFlowExecutor]( ZLayer.succeed(serialization.protobuf), default ) @@ -181,6 +186,7 @@ object ZFlowExecutor { KeyValueStore.inMemory, IndexedStore.inMemory, ZLayer.succeed(serialization.json), + PersistentState.snapshotOnly, default ) @@ -189,6 +195,7 @@ object ZFlowExecutor { KeyValueStore.inMemory, IndexedStore.inMemory, ZLayer.succeed(serialization.protobuf), + PersistentState.snapshotOnly, default ) } diff --git a/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/PersistentExecutor.scala 
b/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/PersistentExecutor.scala index 1d132bfa5..b872678d5 100644 --- a/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/PersistentExecutor.scala +++ b/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/PersistentExecutor.scala @@ -36,6 +36,7 @@ final case class PersistentExecutor( execEnv: ExecutionEnvironment, durableLog: DurableLog, kvStore: KeyValueStore, + persistentState: PersistentState, operationExecutor: OperationExecutor, workflows: TMap[FlowId, Promise[Nothing, PersistentExecutor.RuntimeState]], gcQueue: Queue[GarbageCollectionCommand], @@ -98,14 +99,15 @@ final case class PersistentExecutor( .fromEither( execEnv.codecs.decode[PersistentExecutor.State[Any, Any]](rawState) ) - .mapBoth( - error => ExecutorError.DeserializationError(s"state of $id", error.message), - state => + .mapError(error => ExecutorError.DeserializationError(s"state of $id", error.message)) + .flatMap { snapshot => + persistentState.loadUsingSnapshot(id, snapshot).map { state => ( - FlowId.unsafeMake(new String(rawKey.toArray, StandardCharsets.UTF_8)), + id, state ) - ) + } + } } .runCollect _ <- ZIO.foreachDiscard(deserializedStates) { case (id, state) => @@ -135,7 +137,7 @@ final case class PersistentExecutor( getResultFromPromise(runtimeState.result) } case None => - loadState(id).flatMap { + persistentState.load(id).flatMap { case None => ZIO.fail(ExecutorError.InvalidOperationArguments("Unknown flow id:" + FlowId.unwrap(id))) case Some(state) => @@ -153,18 +155,12 @@ final case class PersistentExecutor( _ <- ZIO .fail(ExecutorError.InvalidOperationArguments(s"Cannot delete running flow $id")) .when(rts.isDefined) - _ <- ZIO.logInfo(s"Deleting persisted state $id") - state <- loadState(id) - _ <- state match { - case Some(state) => - for { - _ <- kvStore - .delete(Namespaces.workflowState, id.toRaw, None) - .mapError(ExecutorError.KeyValueStoreError("delete", _)) - _ <- state.result.delete().provideEnvironment(promiseEnv) - } yield () - case None => ZIO.unit - } + maybeResult <- persistentState.delete(id) + _ <- + maybeResult match { + case Some(result) => result.delete().provideEnvironment(promiseEnv) + case None => ZIO.unit + } } yield () override def pause(id: FlowId): ZIO[Any, ExecutorError, Unit] = @@ -201,10 +197,14 @@ final case class PersistentExecutor( .fromEither( execEnv.codecs.decode[PersistentExecutor.State[Any, Any]](rawState) ) - .mapBoth( - error => ExecutorError.DeserializationError(s"state of $id", error.message), - state => (id, state.status) - ) + .mapError(error => ExecutorError.DeserializationError(s"state of $id", error.message)) + .flatMap { snapshot => + persistentState + .loadUsingSnapshot(id, snapshot) + .map { state => + (id, state.status) + } + } } override def getVariable(id: FlowId, name: RemoteVariableName): ZIO[Any, ExecutorError, Option[DynamicValue]] = @@ -218,7 +218,7 @@ final case class PersistentExecutor( state <- runtimeState.state.get } yield Some(state) case None => - loadState(id) + persistentState.load(id) } .flatMap { case Some(state) => @@ -251,7 +251,7 @@ final case class PersistentExecutor( } yield () case None => // The flow is not running currently - loadState(id).flatMap { + persistentState.load(id).flatMap { case None => ZIO.fail(ExecutorError.InvalidOperationArguments("Unknown flow id:" + FlowId.unwrap(id))) case Some(state) => @@ -316,9 +316,11 @@ final case class PersistentExecutor( startedAt = now, suspendedAt = None, totalExecutionTime = Duration.ZERO, - currentExecutionTime = 
Duration.ZERO + currentExecutionTime = Duration.ZERO, + journalIndex = None ) - state <- loadState(id.asFlowId) + state <- persistentState + .load(id.asFlowId) .map( _.getOrElse(freshState).asInstanceOf[State[E, A]] ) @@ -332,7 +334,7 @@ final case class PersistentExecutor( private def run[E, A]( initialState: State[E, A], - promise: Promise[_, PersistentExecutor.RuntimeState] + promise: Promise[Nothing, PersistentExecutor.RuntimeState] ): ZIO[Any, Nothing, DurablePromise[Either[ExecutorError, DynamicValue], FlowResult]] = { import zio.flow.ZFlow._ @@ -982,16 +984,12 @@ final case class PersistentExecutor( } } - def runSteps( - stateRef: Ref[State[E, A]], - messages: Queue[ExecutionCommand], - executionStartedAt: OffsetDateTime - ): ZIO[ + def runSteps(runtimeState: RuntimeState): ZIO[ VirtualClock with KeyValueStore with ExecutionEnvironment with DurableLog, ExecutorError, Unit ] = - stateRef.get.flatMap { lastState => + runtimeState.state.get.flatMap { lastState => ZIO.logAnnotate( LogAnnotation("flowId", lastState.id.asString), LogAnnotation("vts", lastState.lastTimestamp.value.toString) @@ -1004,13 +1002,14 @@ final case class PersistentExecutor( for { recordingContext <- RecordingRemoteContext.startRecording remoteContext <- recordingContext.remoteContext(scope) - changesFromMessages <- processMessages(messages).provide(ZLayer.succeed(remoteContext)) + changesFromMessages <- processMessages(runtimeState.messages).provide(ZLayer.succeed(remoteContext)) state0 <- persistState( lastState, changesFromMessages, - recordingContext + recordingContext, + runtimeState.persister ).map(_.asInstanceOf[PersistentExecutor.State[E, A]]) stepResult <- @@ -1044,27 +1043,32 @@ final case class PersistentExecutor( }.race { PersistentRemoteContext.make(scope.rootScope).flatMap { topLevelRemoteContext => // Processing execution commands in the background, as it may contain external variable modification - waitAndProcessMessages(messages).forever + waitAndProcessMessages(runtimeState.messages).forever .provide(ZLayer.succeed(topLevelRemoteContext)) } } case FlowStatus.Paused => ZIO.logInfo(s"Flow paused, waiting for resume command") *> - waitAndProcessMessages(messages).provide(ZLayer.succeed(remoteContext)).map { changes => - StepResult(changes, continue = true) + waitAndProcessMessages(runtimeState.messages).provide(ZLayer.succeed(remoteContext)).map { + changes => + StepResult(changes, continue = true) } } now <- Clock.currentDateTime state2 <- persistState( state0, - stepResult.stateChange ++ StateChange.updateCurrentExecutionTime(now, executionStartedAt), - recordingContext + stepResult.stateChange ++ StateChange.updateCurrentExecutionTime( + now, + runtimeState.executionStartedAt + ), + recordingContext, + runtimeState.persister ) - _ <- stateRef.set(state2.asInstanceOf[PersistentExecutor.State[E, A]]) + _ <- runtimeState.state.set(state2.asInstanceOf[PersistentExecutor.State[E, A]]) } yield stepResult }.flatMap { stepResult => - runSteps(stateRef, messages, executionStartedAt).when(stepResult.continue).unit + runSteps(runtimeState).when(stepResult.continue).unit } } } @@ -1076,20 +1080,21 @@ final case class PersistentExecutor( startGate <- Promise.make[Nothing, Unit] now <- Clock.currentDateTime messages <- Queue.unbounded[ExecutionCommand] + persister <- persistentState.createPersister(initialState.id.asFlowId) fiber <- { for { - _ <- startGate.await + _ <- startGate.await + runtimeState <- promise.await _ <- { for { - state <- saveStateChange( - initialState.id.asFlowId, + state <- 
persister.saveStateChange( initialState, StateChange.resetExecutionTime, initialState.lastTimestamp ) _ <- ref.set(state) - _ <- runSteps(ref, messages, executionStartedAt = now) + _ <- runSteps(runtimeState) .provide( ZLayer.succeed(execEnv), ZLayer.succeed(kvStore), @@ -1125,7 +1130,8 @@ final case class PersistentExecutor( fiber = fiber, executionStartedAt = now, state = ref.asInstanceOf[Ref[State[_, _]]], - messages = messages + messages = messages, + persister = persister ) _ <- promise.succeed(runtimeState) _ <- startGate.succeed(()) @@ -1174,7 +1180,8 @@ final case class PersistentExecutor( private def persistState( state0: PersistentExecutor.State[_, _], stateChange: PersistentExecutor.StateChange, - recordingContext: RecordingRemoteContext + recordingContext: RecordingRemoteContext, + persister: Persister ): ZIO[ VirtualClock with KeyValueStore with RemoteVariableKeyValueStore with ExecutionEnvironment with DurableLog, ExecutorError, @@ -1228,45 +1235,12 @@ final case class PersistentExecutor( state2 = additionalStateChanges(state1) shouldPersist = modifiedVariables.nonEmpty || stateChange.contains(_.forcePersistence) - _ <- saveStateChange(state2.id.asFlowId, state0, stateChange ++ additionalStateChanges, currentTimestamp) + _ <- persister + .saveStateChange(state0, stateChange ++ additionalStateChanges, currentTimestamp) .when(shouldPersist) } yield state2 } - private def loadState(id: FlowId): IO[ExecutorError, Option[PersistentExecutor.State[_, _]]] = - for { - _ <- ZIO.logInfo(s"Looking for persisted flow state $id") - key = id.toRaw - persistedState <- kvStore - .getLatest(Namespaces.workflowState, key, None) - .mapError(ExecutorError.KeyValueStoreError("getLatest", _)) - state <- persistedState match { - case Some(bytes) => - ZIO.logInfo(s"Using persisted state (${bytes.size} bytes)") *> - ZIO - .fromEither(execEnv.codecs.decode[PersistentExecutor.State[Any, Any]](bytes)) - .mapBoth(error => ExecutorError.DeserializationError(s"state of $id", error.message), Some(_)) - case None => ZIO.logInfo("No persisted state available").as(None) - } - } yield state - - private def saveStateChange[E, A]( - id: FlowId, - baseState: State[E, A], - changes: StateChange, - currentTimestamp: Timestamp - ): ZIO[Any, ExecutorError.KeyValueStoreError, State[E, A]] = - for { - updatedState <- ZIO.succeed(changes(baseState)) - persistedState = execEnv.codecs.encode(updatedState.asInstanceOf[State[Any, Any]]) - key = id.toRaw - _ <- metrics.serializedFlowStateSize.update(persistedState.size) - _ <- kvStore - .put(Namespaces.workflowState, key, persistedState, currentTimestamp) - .mapError(ExecutorError.KeyValueStoreError("put", _)) - _ <- ZIO.logTrace(s"State of $id persisted") - } yield updatedState - private def interruptFlow(id: FlowId): ZIO[Any, ExecutorError, Boolean] = for { _ <- ZIO.log(s"Interrupting flow $id") @@ -1276,10 +1250,14 @@ final case class PersistentExecutor( for { runtimeState <- runtimeStatePromise.await _ <- runtimeState.fiber.interrupt - _ <- loadState(id).flatMap { + _ <- persistentState.load(id).flatMap { case None => ZIO.unit case Some(lastState) => - saveStateChange(id, lastState, StateChange.done, lastState.lastTimestamp) + runtimeState.persister.saveStateChange( + lastState, + StateChange.done, + lastState.lastTimestamp + ) } _ <- runtimeState.result .fail(Left(ExecutorError.Interrupted))(execEnv.codecs) @@ -1372,7 +1350,7 @@ final case class PersistentExecutor( _ <- ZIO.logInfo(s"Garbage Collection starting") allStoredVariables <- 
RemoteVariableKeyValueStore.allStoredVariables.runCollect.map(_.toSet) allWorkflows <- workflows.keys.commit - allStates <- ZIO.foreach(allWorkflows)(loadState).map(_.flatten) + allStates <- ZIO.foreach(allWorkflows)(persistentState.load).map(_.flatten) allReferencedVariables <- ZIO.foldLeft(allStates)(Map.empty[ScopedRemoteVariableName, Timestamp]) { case (vars, state) => val topLevelReferencedVariables = state.current.variableUsage @@ -1462,7 +1440,12 @@ object PersistentExecutor { def make( gcPeriod: Duration = 5.minutes ): ZLayer[ - DurableLog with KeyValueStore with Configuration with OperationExecutor with ExecutorBinaryCodecs, + DurableLog + with KeyValueStore + with PersistentState + with Configuration + with OperationExecutor + with ExecutorBinaryCodecs, Nothing, ZFlowExecutor ] = @@ -1471,10 +1454,10 @@ object PersistentExecutor { configuration <- ZIO.service[Configuration] codecs <- ZIO.service[ExecutorBinaryCodecs] } yield runtime.ExecutionEnvironment(codecs, configuration, gcPeriod) - } >+> (DurableLog.any ++ KeyValueStore.any ++ OperationExecutor.any) >>> layer + } >+> (PersistentState.any ++ DurableLog.any ++ KeyValueStore.any ++ OperationExecutor.any) >>> layer val layer: ZLayer[ - DurableLog with KeyValueStore with ExecutionEnvironment with OperationExecutor, + PersistentState with DurableLog with KeyValueStore with ExecutionEnvironment with OperationExecutor, Nothing, ZFlowExecutor ] = @@ -1482,6 +1465,7 @@ object PersistentExecutor { for { durableLog <- ZIO.service[DurableLog] kvStore <- ZIO.service[KeyValueStore] + persistentState <- ZIO.service[PersistentState] operationExecutor <- ZIO.service[OperationExecutor] execEnv <- ZIO.service[ExecutionEnvironment] workflows <- TMap.empty[FlowId, Promise[Nothing, PersistentExecutor.RuntimeState]].commit @@ -1496,6 +1480,7 @@ object PersistentExecutor { execEnv, durableLog, kvStore, + persistentState, operationExecutor, workflows, gcQueue, @@ -1630,28 +1615,94 @@ object PersistentExecutor { } def forcePersistence: Boolean = false + + def toChunk: Chunk[StateChange] = self match { + case StateChange.SequentialChange(changes) => changes + case _ => Chunk.single(self) + } + + def name: String } object StateChange { private case object NoChange extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state + + val schema: Schema[NoChange.type] = Schema.singleton(NoChange) + val schemaCase: Schema.Case[StateChange, NoChange.type] = + Schema.Case[StateChange, NoChange.type]( + "NoChange", + schema, + _.asInstanceOf[NoChange.type], + _.asInstanceOf[StateChange], + _.isInstanceOf[NoChange.type] + ) + + override def name: String = "NoChange" } + private final case class SequentialChange(changes: Chunk[StateChange]) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = changes.foldLeft(state) { case (state, change) => change(state) } + + override def name: String = "SequentialChange" } + private final case class SetCurrent(current: ZFlow[_, _, _]) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(current = current) + + override def name: String = "SetCurrent" + } + private object SetCurrent { + val schema: Schema[SetCurrent] = + ZFlow.schemaAny.transform(SetCurrent(_), _.current.asInstanceOf[ZFlow[Any, Any, Any]]) + val schemaCase: Schema.Case[StateChange, SetCurrent] = + Schema.Case[StateChange, SetCurrent]( + "SetCurrent", + schema, + _.asInstanceOf[SetCurrent], + _.asInstanceOf[StateChange], + _.isInstanceOf[SetCurrent] + ) } + private final 
case class PushContinuation(cont: Instruction) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(stack = cont :: state.stack) + + override def name: String = "PushContinuation" + } + private object PushContinuation { + val schema: Schema[PushContinuation] = + Instruction.schema.transform(PushContinuation(_), _.cont) + val schemaCase: Schema.Case[StateChange, PushContinuation] = + Schema.Case[StateChange, PushContinuation]( + "PushContinuation", + schema, + _.asInstanceOf[PushContinuation], + _.asInstanceOf[StateChange], + _.isInstanceOf[PushContinuation] + ) } + private final case object PopContinuation extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(stack = state.stack.tail) + + val schema: Schema[PopContinuation.type] = Schema.singleton(PopContinuation) + val schemaCase: Schema.Case[StateChange, PopContinuation.type] = + Schema.Case[StateChange, PopContinuation.type]( + "PopContinuation", + schema, + _.asInstanceOf[PopContinuation.type], + _.asInstanceOf[StateChange], + _.isInstanceOf[PopContinuation.type] + ) + + override def name: String = "PopContinuation" } + private final case class AddCompensation(newCompensation: ZFlow[Any, ActivityError, Unit]) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy( @@ -1664,7 +1715,25 @@ object PersistentExecutor { ) override def forcePersistence: Boolean = true + + override def name: String = "AddCompensation" + } + private object AddCompensation { + val schema: Schema[AddCompensation] = + ZFlow.schemaAny.transform( + flow => AddCompensation(flow.asInstanceOf[ZFlow[Any, ActivityError, Unit]]), + _.newCompensation.asInstanceOf[ZFlow[Any, Any, Any]] + ) + val schemaCase: Schema.Case[StateChange, AddCompensation] = + Schema.Case[StateChange, AddCompensation]( + "AddCompensation", + schema, + _.asInstanceOf[AddCompensation], + _.asInstanceOf[StateChange], + _.isInstanceOf[AddCompensation] + ) } + private final case class EnterTransaction(flow: ZFlow[Any, _, _]) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = { val transactionId = TransactionId.fromCounter(state.transactionCounter) @@ -1680,13 +1749,44 @@ object PersistentExecutor { transactionCounter = state.transactionCounter + 1 ) } + + override def name: String = "EnterTransaction" } + private object EnterTransaction { + val schema: Schema[EnterTransaction] = + ZFlow.schemaAny.transform( + flow => EnterTransaction(flow.asInstanceOf[ZFlow[Any, Any, Any]]), + _.flow.asInstanceOf[ZFlow[Any, Any, Any]] + ) + val schemaCase: Schema.Case[StateChange, EnterTransaction] = + Schema.Case[StateChange, EnterTransaction]( + "EnterTransaction", + schema, + _.asInstanceOf[EnterTransaction], + _.asInstanceOf[StateChange], + _.isInstanceOf[EnterTransaction] + ) + } + private case object LeaveTransaction extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy( transactionStack = state.transactionStack.tail ) + + val schema: Schema[LeaveTransaction.type] = Schema.singleton(LeaveTransaction) + val schemaCase: Schema.Case[StateChange, LeaveTransaction.type] = + Schema.Case[StateChange, LeaveTransaction.type]( + "LeaveTransaction", + schema, + _.asInstanceOf[LeaveTransaction.type], + _.asInstanceOf[StateChange], + _.isInstanceOf[LeaveTransaction.type] + ) + + override def name: String = "LeaveTransaction" } + private case class RevertCurrentTransaction[E0](failure: Remote[E0]) extends StateChange { override def apply[E, 
A](state: State[E, A]): State[E, A] = state.transactionStack.headOption match { @@ -1707,7 +1807,28 @@ object PersistentExecutor { ) case None => state } + + override def forcePersistence: Boolean = true + + override def name: String = "RevertCurrentTransaction" + } + private object RevertCurrentTransaction { + def schema[E]: Schema[RevertCurrentTransaction[E]] = + Remote.schemaAny.transform( + remote => RevertCurrentTransaction(remote.asInstanceOf[Remote[E]]), + _.failure.asInstanceOf[Remote[Any]] + ) + + def schemaCase[E]: Schema.Case[StateChange, RevertCurrentTransaction[E]] = + Schema.Case( + "RevertCurrentTransaction", + schema[E], + _.asInstanceOf[RevertCurrentTransaction[E]], + _.asInstanceOf[StateChange], + _.isInstanceOf[RevertCurrentTransaction[_]] + ) } + private final case class RestartCurrentTransaction(suspend: Boolean, now: OffsetDateTime) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.transactionStack.headOption match { @@ -1750,33 +1871,144 @@ object PersistentExecutor { } override def forcePersistence: Boolean = true + + override def name: String = "RestartCurrentTransaction" + } + private object RestartCurrentTransaction { + val schema: Schema[RestartCurrentTransaction] = + Schema + .tuple2(Schema[Boolean], Schema[OffsetDateTime]) + .transform( + { case (suspend, now) => RestartCurrentTransaction(suspend, now) }, + { case RestartCurrentTransaction(suspend, now) => (suspend, now) } + ) + + val schemaCase: Schema.Case[StateChange, RestartCurrentTransaction] = + Schema.Case( + "RestartCurrentTransaction", + schema, + _.asInstanceOf[RestartCurrentTransaction], + _.asInstanceOf[StateChange], + _.isInstanceOf[RestartCurrentTransaction] + ) } + private case object IncreaseForkCounter extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(forkCounter = state.forkCounter + 1) + + val schema: Schema[IncreaseForkCounter.type] = Schema.singleton(IncreaseForkCounter) + val schemaCase: Schema.Case[StateChange, IncreaseForkCounter.type] = + Schema.Case[StateChange, IncreaseForkCounter.type]( + "IncreaseForkCounter", + schema, + _.asInstanceOf[IncreaseForkCounter.type], + _.asInstanceOf[StateChange], + _.isInstanceOf[IncreaseForkCounter.type] + ) + + override def name: String = "IncreaseForkCounter" } + private case object IncreaseTempVarCounter extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(tempVarCounter = state.tempVarCounter + 1) + + val schema: Schema[IncreaseTempVarCounter.type] = Schema.singleton(IncreaseTempVarCounter) + val schemaCase: Schema.Case[StateChange, IncreaseTempVarCounter.type] = + Schema.Case[StateChange, IncreaseTempVarCounter.type]( + "IncreaseTempVarCounter", + schema, + _.asInstanceOf[IncreaseTempVarCounter.type], + _.asInstanceOf[StateChange], + _.isInstanceOf[IncreaseTempVarCounter.type] + ) + + override def name: String = "IncreaseTempVarCounter" } + private final case class PushEnvironment(value: Remote[_]) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(envStack = value :: state.envStack) + + override def name: String = "PushEnvironment" } + private object PushEnvironment { + def schema: Schema[PushEnvironment] = + Remote.schemaAny.transform( + remote => PushEnvironment(remote.asInstanceOf[Remote[_]]), + _.value.asInstanceOf[Remote[Any]] + ) + + def schemaCase: Schema.Case[StateChange, PushEnvironment] = + Schema.Case( + "PushEnvironment", + schema, + _.asInstanceOf[PushEnvironment], + 
_.asInstanceOf[StateChange], + _.isInstanceOf[PushEnvironment] + ) + } + private final case object PopEnvironment extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(envStack = state.envStack.tail) + + val schema: Schema[PopEnvironment.type] = Schema.singleton(PopEnvironment) + val schemaCase: Schema.Case[StateChange, PopEnvironment.type] = + Schema.Case[StateChange, PopEnvironment.type]( + "PopEnvironment", + schema, + _.asInstanceOf[PopEnvironment.type], + _.asInstanceOf[StateChange], + _.isInstanceOf[PopEnvironment.type] + ) + + override def name: String = "PopEnvironment" } + private final case class AdvanceClock(atLeastTo: Timestamp) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(lastTimestamp = state.lastTimestamp.max(atLeastTo)) + + override def name: String = "AdvanceClock" } + private object AdvanceClock { + val schema: Schema[AdvanceClock] = + Schema[Timestamp].transform( + AdvanceClock(_), + _.atLeastTo + ) + + val schemaCase: Schema.Case[StateChange, AdvanceClock] = + Schema.Case( + "AdvanceClock", + schema, + _.asInstanceOf[AdvanceClock], + _.asInstanceOf[StateChange], + _.isInstanceOf[AdvanceClock] + ) + } + private case object Done extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(status = FlowStatus.Done) override def forcePersistence: Boolean = true + + val schema: Schema[Done.type] = Schema.singleton(Done) + val schemaCase: Schema.Case[StateChange, Done.type] = + Schema.Case[StateChange, Done.type]( + "Done", + schema, + _.asInstanceOf[Done.type], + _.asInstanceOf[StateChange], + _.isInstanceOf[Done.type] + ) + + override def name: String = "Done" } + private case class Resume(resetWatchedVariables: Boolean) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy( @@ -1785,18 +2017,66 @@ object PersistentExecutor { ) override def forcePersistence: Boolean = true + + override def name: String = "Resume" + } + private object Resume { + val schema: Schema[Resume] = + Schema[Boolean].transform( + Resume(_), + _.resetWatchedVariables + ) + val schemaCase: Schema.Case[StateChange, Resume] = + Schema.Case( + "Resume", + schema, + _.asInstanceOf[Resume], + _.asInstanceOf[StateChange], + _.isInstanceOf[Resume] + ) } + private case object Pause extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(status = FlowStatus.Paused) override def forcePersistence: Boolean = true + + val schema: Schema[Pause.type] = Schema.singleton(Pause) + val schemaCase: Schema.Case[StateChange, Pause.type] = + Schema.Case[StateChange, Pause.type]( + "Pause", + schema, + _.asInstanceOf[Pause.type], + _.asInstanceOf[StateChange], + _.isInstanceOf[Pause.type] + ) + + override def name: String = "Pause" } + private final case class UpdateWatchPosition(newWatchPosition: Index) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy( watchPosition = Index(Math.max(state.watchPosition, newWatchPosition)) ) + + override def name: String = "UpdateWatchPosition" + } + private object UpdateWatchPosition { + val schema: Schema[UpdateWatchPosition] = + Schema[Index].transform( + UpdateWatchPosition(_), + _.newWatchPosition + ) + val schemaCase: Schema.Case[StateChange, UpdateWatchPosition] = + Schema.Case( + "UpdateWatchPosition", + schema, + _.asInstanceOf[UpdateWatchPosition], + _.asInstanceOf[StateChange], + _.isInstanceOf[UpdateWatchPosition] + ) } private final case class 
UpdateCurrentExecutionTime(now: OffsetDateTime, executionStartedAt: OffsetDateTime) @@ -1805,9 +2085,28 @@ object PersistentExecutor { state.copy( currentExecutionTime = Duration.between(executionStartedAt, now) ) + + override def name: String = "UpdateCurrentExecutionTime" + } + private object UpdateCurrentExecutionTime { + val schema: Schema[UpdateCurrentExecutionTime] = + Schema + .tuple2[OffsetDateTime, OffsetDateTime] + .transform( + { case (now, executionStartedAt) => UpdateCurrentExecutionTime(now, executionStartedAt) }, + (x: UpdateCurrentExecutionTime) => (x.now, x.executionStartedAt) + ) + val schemaCase: Schema.Case[StateChange, UpdateCurrentExecutionTime] = + Schema.Case( + "UpdateCurrentExecutionTime", + schema, + _.asInstanceOf[UpdateCurrentExecutionTime], + _.asInstanceOf[StateChange], + _.isInstanceOf[UpdateCurrentExecutionTime] + ) } - private final case class RecordAccessedVariables(variables: Seq[(RemoteVariableName, Option[Timestamp], Boolean)]) + private final case class RecordAccessedVariables(variables: Chunk[(RemoteVariableName, Option[Timestamp], Boolean)]) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.transactionStack match { @@ -1831,6 +2130,23 @@ object PersistentExecutor { case Nil => state } + + override def name: String = "RecordAccessedVariables" + } + private object RecordAccessedVariables { + val schema: Schema[RecordAccessedVariables] = + Schema[Chunk[(RemoteVariableName, Option[Timestamp], Boolean)]].transform( + RecordAccessedVariables(_), + _.variables + ) + val schemaCase: Schema.Case[StateChange, RecordAccessedVariables] = + Schema.Case( + "RecordAccessedVariables", + schema, + _.asInstanceOf[RecordAccessedVariables], + _.asInstanceOf[StateChange], + _.isInstanceOf[RecordAccessedVariables] + ) } private final case class RecordReadVariables(variables: Set[ScopedRemoteVariableName]) extends StateChange { @@ -1844,11 +2160,45 @@ object PersistentExecutor { case Nil => state } + + override def name: String = "RecordReadVariables" + } + private object RecordReadVariables { + val schema: Schema[RecordReadVariables] = + Schema[Set[ScopedRemoteVariableName]].transform( + RecordReadVariables(_), + _.variables + ) + val schemaCase: Schema.Case[StateChange, RecordReadVariables] = + Schema.Case( + "RecordReadVariables", + schema, + _.asInstanceOf[RecordReadVariables], + _.asInstanceOf[StateChange], + _.isInstanceOf[RecordReadVariables] + ) } private final case class UpdateLastTimestamp(timestamp: Timestamp) extends StateChange { override def apply[E, A](state: State[E, A]): State[E, A] = state.copy(lastTimestamp = timestamp) + + override def name: String = "UpdateLastTimestamp" + } + private object UpdateLastTimestamp { + val schema: Schema[UpdateLastTimestamp] = + Schema[Timestamp].transform( + UpdateLastTimestamp(_), + _.timestamp + ) + val schemaCase: Schema.Case[StateChange, UpdateLastTimestamp] = + Schema.Case( + "UpdateLastTimestamp", + schema, + _.asInstanceOf[UpdateLastTimestamp], + _.asInstanceOf[StateChange], + _.isInstanceOf[UpdateLastTimestamp] + ) } private case object ResetExecutionTime extends StateChange { @@ -1857,6 +2207,18 @@ object PersistentExecutor { totalExecutionTime = state.totalExecutionTime + state.currentExecutionTime, currentExecutionTime = Duration.ZERO ) + + val schema: Schema[ResetExecutionTime.type] = Schema.singleton(ResetExecutionTime) + val schemaCase: Schema.Case[StateChange, ResetExecutionTime.type] = + Schema.Case( + "ResetExecutionTime", + schema, + 
_.asInstanceOf[ResetExecutionTime.type], + _.asInstanceOf[StateChange], + _.isInstanceOf[ResetExecutionTime.type] + ) + + override def name: String = "ResetExecutionTime" } val none: StateChange = NoChange @@ -1882,13 +2244,45 @@ object PersistentExecutor { def updateWatchPosition(index: Index): StateChange = UpdateWatchPosition(index) def updateCurrentExecutionTime(now: OffsetDateTime, executionStartedAt: OffsetDateTime): StateChange = UpdateCurrentExecutionTime(now, executionStartedAt) - def recordAccessedVariables(variables: Seq[(RemoteVariableName, Option[Timestamp], Boolean)]): StateChange = + def recordAccessedVariables(variables: Chunk[(RemoteVariableName, Option[Timestamp], Boolean)]): StateChange = RecordAccessedVariables(variables) def recordReadVariables(variables: Set[ScopedRemoteVariableName]): StateChange = RecordReadVariables(variables) def updateLastTimestamp(timestamp: Timestamp): StateChange = UpdateLastTimestamp(timestamp) val resetExecutionTime: StateChange = ResetExecutionTime + + implicit val schema: Schema[StateChange] = + Schema.EnumN( + TypeId.parse("zio.flow.runtime.internal.PersistentExecutor.StateChange"), + CaseSet + .Cons( + NoChange.schemaCase, + CaseSet.Empty[StateChange]() + ) + .:+:(SetCurrent.schemaCase) + .:+:(PushContinuation.schemaCase) + .:+:(PopContinuation.schemaCase) + .:+:(AddCompensation.schemaCase) + .:+:(EnterTransaction.schemaCase) + .:+:(LeaveTransaction.schemaCase) + .:+:(RevertCurrentTransaction.schemaCase[Any]) + .:+:(RestartCurrentTransaction.schemaCase) + .:+:(IncreaseForkCounter.schemaCase) + .:+:(IncreaseTempVarCounter.schemaCase) + .:+:(PushEnvironment.schemaCase) + .:+:(PopEnvironment.schemaCase) + .:+:(AdvanceClock.schemaCase) + .:+:(Done.schemaCase) + .:+:(Resume.schemaCase) + .:+:(Pause.schemaCase) + .:+:(UpdateWatchPosition.schemaCase) + .:+:(UpdateCurrentExecutionTime.schemaCase) + .:+:(RecordAccessedVariables.schemaCase) + .:+:(RecordReadVariables.schemaCase) + .:+:(UpdateLastTimestamp.schemaCase) + .:+:(ResetExecutionTime.schemaCase) + ) } final case class FlowResult(result: DynamicValue, timestamp: Timestamp) @@ -1914,7 +2308,8 @@ object PersistentExecutor { startedAt: OffsetDateTime, suspendedAt: Option[OffsetDateTime], totalExecutionTime: Duration, - currentExecutionTime: Duration + currentExecutionTime: Duration, + journalIndex: Option[Index] ) { def currentEnvironment: Remote[_] = envStack.headOption.getOrElse( @@ -1940,7 +2335,7 @@ object PersistentExecutor { schemaAny.asInstanceOf[Schema[State[E, A]]] implicit lazy val schemaAny: Schema[State[Any, Any]] = - Schema.CaseClass18( + Schema.CaseClass19( TypeId.parse("zio.flow.runtime.internal.PersistentExecutor.State"), Schema .Field( @@ -2052,6 +2447,12 @@ object PersistentExecutor { get0 = _.currentExecutionTime, set0 = (a: State[Any, Any], b: Duration) => a.copy(currentExecutionTime = b) ), + Schema.Field( + "journalIndex", + Schema[Option[Index]], + get0 = _.journalIndex, + set0 = (a: State[Any, Any], b: Option[Index]) => a.copy(journalIndex = b) + ), ( id: ScopedFlowId, lastTimestamp: Timestamp, @@ -2070,7 +2471,8 @@ object PersistentExecutor { startedAt: OffsetDateTime, suspendedAt: Option[OffsetDateTime], totalExecutionTime: Duration, - currentExecutionTime: Duration + currentExecutionTime: Duration, + journalIndex: Option[Index] ) => State( id, @@ -2090,7 +2492,8 @@ object PersistentExecutor { startedAt, suspendedAt, totalExecutionTime, - currentExecutionTime + currentExecutionTime, + journalIndex ) ) } @@ -2172,6 +2575,7 @@ object PersistentExecutor { fiber: 
Fiber[Nothing, Unit], executionStartedAt: OffsetDateTime, state: Ref[State[_, _]], - messages: Queue[ExecutionCommand] + messages: Queue[ExecutionCommand], + persister: Persister ) } diff --git a/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/PersistentState.scala b/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/PersistentState.scala new file mode 100644 index 000000000..80bc5da8a --- /dev/null +++ b/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/PersistentState.scala @@ -0,0 +1,210 @@ +/* + * Copyright 2021-2023 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.flow.runtime.internal + +import zio.{Duration, IO, Ref, ZIO, ZLayer} +import zio.flow.FlowId +import zio.flow.runtime.IndexedStore.Index +import zio.flow.runtime.internal.PersistentExecutor.{FlowResult, StateChange} +import zio.flow.runtime.serialization.ExecutorBinaryCodecs +import zio.flow.runtime.{DurablePromise, ExecutorError, IndexedStore, KeyValueStore, PersisterConfig} +import zio.schema.DynamicValue + +import java.time.OffsetDateTime + +trait PersistentState { + + /** Loads a persisted executor state */ + def load(id: FlowId): IO[ExecutorError, Option[PersistentExecutor.State[_, _]]] + + /** + * Loads a persisted executor state, reusing an already read latest snapshot. + * + * This is an optimization to avoid reading the snapshot table twice in case + * it was already read by enumerating all running states. 
+ */ + def loadUsingSnapshot( + id: FlowId, + snapshot: PersistentExecutor.State[_, _] + ): IO[ExecutorError, PersistentExecutor.State[_, _]] + + /** Creates a persister used to save state changes */ + def createPersister(id: FlowId): ZIO[Any, Nothing, Persister] + + /** Deletes all persisted state for a flow */ + def delete( + id: FlowId + ): ZIO[Any, ExecutorError, Option[DurablePromise[Either[ExecutorError, DynamicValue], FlowResult]]] +} + +object PersistentState { + final case class SnapshotOnly(kvStore: KeyValueStore, codecs: ExecutorBinaryCodecs) extends PersistentState { + override def load(id: FlowId): IO[ExecutorError, Option[PersistentExecutor.State[_, _]]] = + for { + _ <- ZIO.logInfo(s"Looking for persisted flow state $id") + key = id.toRaw + persistedState <- kvStore + .getLatest(Namespaces.workflowState, key, None) + .mapError(ExecutorError.KeyValueStoreError("getLatest", _)) + state <- persistedState match { + case Some(bytes) => + ZIO.logInfo(s"Using persisted state for $id (${bytes.size} bytes)") *> + ZIO + .fromEither(codecs.decode[PersistentExecutor.State[Any, Any]](bytes)) + .mapBoth(error => ExecutorError.DeserializationError(s"state of $id", error.message), Some(_)) + case None => ZIO.logInfo("No persisted state available").as(None) + } + } yield state + + override def loadUsingSnapshot( + id: FlowId, + snapshot: PersistentExecutor.State[_, _] + ): IO[ExecutorError, PersistentExecutor.State[_, _]] = + ZIO.succeed(snapshot) + + override def createPersister(id: FlowId): ZIO[Any, Nothing, Persister] = + ZIO.succeed(Persister.SnapshotOnly(id, kvStore, codecs)) + + override def delete( + id: FlowId + ): ZIO[Any, ExecutorError, Option[DurablePromise[Either[ExecutorError, DynamicValue], FlowResult]]] = + for { + _ <- ZIO.logInfo(s"Deleting persisted state $id") + state <- load(id) + resultPromise <- state match { + case Some(state) => + for { + _ <- kvStore + .delete(Namespaces.workflowState, id.toRaw, None) + .mapError(ExecutorError.KeyValueStoreError("delete", _)) + } yield Some(state.result) + case None => ZIO.none + } + } yield resultPromise + } + + final case class JournalAndSnapshot( + kvStore: KeyValueStore, + indexedStore: IndexedStore, + codecs: ExecutorBinaryCodecs, + afterEvery: Option[Int], + afterDuration: Option[Duration] + ) extends PersistentState { + override def load(id: FlowId): IO[ExecutorError, Option[PersistentExecutor.State[_, _]]] = + for { + _ <- ZIO.logInfo(s"Looking for persisted flow state snapshot $id") + key = id.toRaw + persistedState <- kvStore + .getLatest(Namespaces.workflowState, key, None) + .mapError(ExecutorError.KeyValueStoreError("getLatest", _)) + state <- persistedState match { + case Some(bytes) => + for { + _ <- ZIO.logInfo(s"Using persisted state snapshot for $id (${bytes.size} bytes)") + snapshotState <- + ZIO + .fromEither(codecs.decode[PersistentExecutor.State[Any, Any]](bytes)) + .mapError(error => ExecutorError.DeserializationError(s"state of $id", error.message)) + finalState <- loadUsingSnapshot(id, snapshotState) + } yield Some(finalState) + case None => ZIO.logInfo("No persisted state available").as(None) + } + } yield state + + override def loadUsingSnapshot( + id: FlowId, + snapshot: PersistentExecutor.State[_, _] + ): IO[ExecutorError, PersistentExecutor.State[_, _]] = { + val startIndex = snapshot.journalIndex.getOrElse(Index.initial) + ZIO.logInfo(s"Looking for journal entries starting from $startIndex") *> + indexedStore + .scan(Topics.journal(id), startIndex, Index.max) + 
.mapError(ExecutorError.IndexedStoreError("scan", _)) + .mapZIO { bytes => + ZIO + .fromEither(codecs.decode[StateChange](bytes)) + .mapError(decodeError => ExecutorError.DeserializationError(s"journal entry", decodeError.message)) + } + .runFold(snapshot) { (state, change) => + change(state) + } + } + + override def createPersister(id: FlowId): ZIO[Any, Nothing, Persister] = + for { + state <- Ref.make(Persister.JournalAndSnapshot.State(initial = true, 0, OffsetDateTime.MIN)) + } yield Persister.JournalAndSnapshot(id, kvStore, indexedStore, codecs, state, afterEvery, afterDuration) + + /** Deletes all persisted state for a flow */ + override def delete( + id: FlowId + ): ZIO[Any, ExecutorError, Option[DurablePromise[Either[ExecutorError, DynamicValue], FlowResult]]] = + for { + _ <- ZIO.logInfo(s"Deleting persisted state $id") + state <- load(id) + resultPromise <- state match { + case Some(state) => + for { + _ <- kvStore + .delete(Namespaces.workflowState, id.toRaw, None) + .mapError(ExecutorError.KeyValueStoreError("delete", _)) + _ <- indexedStore + .delete(Topics.journal(id)) + .mapError(ExecutorError.IndexedStoreError("delete", _)) + } yield Some(state.result) + case None => ZIO.none + } + } yield resultPromise + } + + def make( + keyValueStore: KeyValueStore, + indexedStore: IndexedStore, + codecs: ExecutorBinaryCodecs, + config: PersisterConfig + ): ZIO[Any, Nothing, PersistentState] = + config match { + case PersisterConfig.SnapshotsOnly => + ZIO.succeed(SnapshotOnly(keyValueStore, codecs)) + case PersisterConfig.PeriodicSnapshots(afterEvery, afterDuration) => + ZIO.succeed(JournalAndSnapshot(keyValueStore, indexedStore, codecs, afterEvery, afterDuration)) + } + + val any: ZLayer[PersistentState, Nothing, PersistentState] = ZLayer.service[PersistentState] + + val snapshotOnly: ZLayer[ExecutorBinaryCodecs with IndexedStore with KeyValueStore, Nothing, PersistentState] = + configured(PersisterConfig.SnapshotsOnly) + + def journalAndSnapshot( + afterEvery: Option[Int], + afterDuration: Option[Duration] + ): ZLayer[ExecutorBinaryCodecs with IndexedStore with KeyValueStore, Nothing, PersistentState] = + configured(PersisterConfig.PeriodicSnapshots(afterEvery, afterDuration)) + + def configured( + persisterConfig: PersisterConfig + ): ZLayer[ExecutorBinaryCodecs with IndexedStore with KeyValueStore, Nothing, PersistentState] = + ZLayer { + for { + keyValueStore <- ZIO.service[KeyValueStore] + indexedStore <- ZIO.service[IndexedStore] + codecs <- ZIO.service[ExecutorBinaryCodecs] + persistentState <- + make(keyValueStore, indexedStore, codecs, persisterConfig) + } yield persistentState + } +} diff --git a/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/Persister.scala b/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/Persister.scala new file mode 100644 index 000000000..623aa5257 --- /dev/null +++ b/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/Persister.scala @@ -0,0 +1,115 @@ +/* + * Copyright 2021-2023 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
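For context, a sketch of how an application might assemble the new `PersistentState` layer with the executor; every layer referenced here appears in this PR or the existing runtime, and the journaling thresholds are example values rather than defaults.

```scala
// Sketch only: wiring PersistentState.journalAndSnapshot into ZFlowExecutor.default.
// The in-memory stores and JSON codecs are placeholder choices for the example.
import zio._
import zio.flow.Configuration
import zio.flow.runtime.{IndexedStore, KeyValueStore, ZFlowExecutor, serialization}
import zio.flow.runtime.internal.PersistentState

val executor: ZLayer[Any, Throwable, ZFlowExecutor] =
  ZLayer.make[ZFlowExecutor](
    KeyValueStore.inMemory,
    IndexedStore.inMemory,
    Configuration.inMemory,
    ZLayer.succeed(serialization.json),
    // snapshot after every 100 journaled changes, with no time-based trigger
    PersistentState.journalAndSnapshot(afterEvery = Some(100), afterDuration = None),
    ZFlowExecutor.default
  )
```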
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.flow.runtime.internal + +import zio.{Clock, Duration, NonEmptyChunk, Ref, ZIO} +import zio.flow.FlowId +import zio.flow.runtime.internal.PersistentExecutor.{State, StateChange} +import zio.flow.runtime.serialization.ExecutorBinaryCodecs +import zio.flow.runtime._ + +import java.time.OffsetDateTime + +trait Persister { + def saveStateChange[E, A]( + baseState: State[E, A], + changes: StateChange, + currentTimestamp: Timestamp + ): ZIO[Any, ExecutorError, State[E, A]] +} + +object Persister { + final case class SnapshotOnly(id: FlowId, kvStore: KeyValueStore, codecs: ExecutorBinaryCodecs) extends Persister { + override def saveStateChange[E, A]( + baseState: State[E, A], + changes: StateChange, + currentTimestamp: Timestamp + ): ZIO[Any, ExecutorError, State[E, A]] = + for { + updatedState <- ZIO.succeed(changes(baseState)) + persistedState = codecs.encode(updatedState.asInstanceOf[State[Any, Any]]) + key = id.toRaw + _ <- metrics.serializedFlowStateSize.update(persistedState.size) + _ <- kvStore + .put(Namespaces.workflowState, key, persistedState, currentTimestamp) + .mapError(ExecutorError.KeyValueStoreError("put", _)) + _ <- ZIO.logTrace(s"State of $id persisted") + } yield updatedState + } + + final case class JournalAndSnapshot( + id: FlowId, + kvStore: KeyValueStore, + indexedStore: IndexedStore, + codecs: ExecutorBinaryCodecs, + state: Ref[JournalAndSnapshot.State], + afterEvery: Option[Int], + afterDuration: Option[Duration] + ) extends Persister { + private val topicName = Topics.journal(id) + + override def saveStateChange[E, A]( + baseState: State[E, A], + changes: StateChange, + currentTimestamp: Timestamp + ): ZIO[Any, ExecutorError, State[E, A]] = + NonEmptyChunk.fromChunk(changes.toChunk) match { + case Some(changesChunk) => + for { + persisterState <- state.get + indices <- ZIO + .foreach(changesChunk) { entry => + val bytes = codecs.encode(entry) + metrics.serializedStateChangeSize(entry).update(bytes.size) *> + indexedStore + .put(topicName, bytes) + .mapError(ExecutorError.IndexedStoreError("put", _)) + } + newState <- ZIO.succeed(changes(baseState)) + newCount = persisterState.changeCount + changesChunk.size + now <- Clock.currentDateTime + doSnapshot = persisterState.initial || + afterEvery.exists(n => newCount > n) || + afterDuration.exists(d => persisterState.lastSnapshot.plus(d).isBefore(now)) + newPersisterState <- + if (doSnapshot) { + val lastIndex = indices.last + val newStateWithJournalIndex = newState.copy(journalIndex = Some(lastIndex)) + val persistedState = codecs.encode(newStateWithJournalIndex.asInstanceOf[State[Any, Any]]) + val key = id.toRaw + for { + _ <- metrics.serializedFlowStateSize.update(persistedState.size) + _ <- kvStore + .put(Namespaces.workflowState, key, persistedState, currentTimestamp) + .mapError(ExecutorError.KeyValueStoreError("put", _)) + _ <- ZIO.logTrace(s"Snapshot state of $id persisted with last journal index $lastIndex") + } yield persisterState.copy(initial = false, changeCount = 0, lastSnapshot = now) + } else { + ZIO.logTrace(s"Saved ${changesChunk.size} state changes, no snapshot") *> + ZIO.succeed(persisterState.copy(initial = false, changeCount = newCount)) + } + _ <- state.set(newPersisterState) + } yield newState + case None => + ZIO.succeed(baseState) + } + } + + object JournalAndSnapshot { + final case class State(initial: Boolean, changeCount: Int, lastSnapshot: OffsetDateTime) + } +} diff 
--git a/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/Topics.scala b/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/Topics.scala index 379d6bd54..a3d7db76e 100644 --- a/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/Topics.scala +++ b/zio-flow-runtime/src/main/scala/zio/flow/runtime/internal/Topics.scala @@ -24,4 +24,7 @@ object Topics { def variableChanges(flowId: FlowId): String = s"_zflow_variable_changes__${FlowId.unwrap(flowId)}" + + def journal(flowId: FlowId): String = + s"_zflow_journal__${FlowId.unwrap(flowId)}" } diff --git a/zio-flow-runtime/src/main/scala/zio/flow/runtime/metrics/package.scala b/zio-flow-runtime/src/main/scala/zio/flow/runtime/metrics/package.scala index b438cc58d..a96553544 100644 --- a/zio-flow-runtime/src/main/scala/zio/flow/runtime/metrics/package.scala +++ b/zio-flow-runtime/src/main/scala/zio/flow/runtime/metrics/package.scala @@ -17,6 +17,7 @@ package zio.flow.runtime import zio.flow.operation.http.HttpFailure +import zio.flow.runtime.internal.PersistentExecutor.StateChange import zio.http.Status import zio.{Duration, ZIOAspect} import zio.metrics.MetricKeyType.{Counter, Gauge, Histogram} @@ -85,6 +86,16 @@ package object metrics { ) .contramap((bytes: Int) => bytes.toDouble) + /** + * Histogram of the serialized workflow state changes (journal entries) in + * bytes + */ + def serializedStateChangeSize(stateChange: StateChange): Metric.Histogram[Int] = + Metric + .histogram("zioflow_state_change_size_bytes", Histogram.Boundaries.exponential(512.0, 2.0, 16)) + .tagged("kind", stateChange.name) + .contramap((bytes: Int) => bytes.toDouble) + /** * Counter increased when a remote variable is accessed. The access can be * read, write or delete and its kind depends on the scope it belongs. 
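The snapshot trigger used by `Persister.JournalAndSnapshot` above can be summarized as a small pure predicate. The sketch below is illustrative only: it restates the `doSnapshot` condition with the `afterEvery`/`afterDuration` semantics from `PersisterConfig.PeriodicSnapshots`, and the `shouldSnapshot` helper and its parameter names are hypothetical.

```scala
// Illustrative restatement (not code from the PR) of the doSnapshot condition in
// Persister.JournalAndSnapshot: a snapshot is written on the first save, after more
// than `afterEvery` journaled changes, or once `afterDuration` has elapsed.
import java.time.OffsetDateTime
import zio.Duration

def shouldSnapshot(
  initial: Boolean,            // this persister has not written a snapshot yet
  changesSinceSnapshot: Int,   // journal entries written since the last snapshot
  lastSnapshotAt: OffsetDateTime,
  now: OffsetDateTime,
  afterEvery: Option[Int],
  afterDuration: Option[Duration]
): Boolean =
  initial ||
    afterEvery.exists(n => changesSinceSnapshot > n) ||
    afterDuration.exists(d => lastSnapshotAt.plus(d).isBefore(now))
```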
diff --git a/zio-flow-runtime/src/main/scala/zio/flow/runtime/serialization/package.scala b/zio-flow-runtime/src/main/scala/zio/flow/runtime/serialization/package.scala index e7742a9af..ab8e67eaa 100644 --- a/zio-flow-runtime/src/main/scala/zio/flow/runtime/serialization/package.scala +++ b/zio-flow-runtime/src/main/scala/zio/flow/runtime/serialization/package.scala @@ -14,6 +14,7 @@ package object serialization { FlowResult :: Either[Either[ExecutorError, DynamicValue], FlowResult] :: DynamicValue :: + PersistentExecutor.StateChange :: End ] diff --git a/zio-flow-runtime/src/test/scala/zio/flow/MockExecutors.scala b/zio-flow-runtime/src/test/scala/zio/flow/MockExecutors.scala index b2e4f7362..9046ab870 100644 --- a/zio-flow-runtime/src/test/scala/zio/flow/MockExecutors.scala +++ b/zio-flow-runtime/src/test/scala/zio/flow/MockExecutors.scala @@ -17,7 +17,7 @@ package zio.flow import zio.flow.mock.{MockedOperation, MockedOperationExecutor} -import zio.flow.runtime.internal.PersistentExecutor +import zio.flow.runtime.internal.{PersistentExecutor, PersistentState} import zio.flow.runtime.{DurableLog, KeyValueStore, ZFlowExecutor} import zio.{Duration, Scope, ZIO, ZLayer, durationInt} @@ -25,9 +25,10 @@ object MockExecutors { def persistent( mockedOperations: MockedOperation = MockedOperation.Empty, gcPeriod: Duration = 5.minutes - ): ZIO[Scope with DurableLog with KeyValueStore with Configuration, Nothing, ZFlowExecutor] = + ): ZIO[Scope with DurableLog with KeyValueStore with PersistentState with Configuration, Nothing, ZFlowExecutor] = MockedOperationExecutor.make(mockedOperations).flatMap { operationExecutor => - ((DurableLog.any ++ KeyValueStore.any ++ Configuration.any ++ ZLayer.succeed(operationExecutor) ++ ZLayer + ((DurableLog.any ++ KeyValueStore.any ++ PersistentState.any ++ Configuration.any ++ ZLayer + .succeed(operationExecutor) ++ ZLayer .succeed(zio.flow.runtime.serialization.json)) >>> PersistentExecutor .make(gcPeriod)).build.map(_.get[ZFlowExecutor]) diff --git a/zio-flow-runtime/src/test/scala/zio/flow/ZFlowAssertionSyntax.scala b/zio-flow-runtime/src/test/scala/zio/flow/ZFlowAssertionSyntax.scala index f53498d8c..bebaef56d 100644 --- a/zio-flow-runtime/src/test/scala/zio/flow/ZFlowAssertionSyntax.scala +++ b/zio-flow-runtime/src/test/scala/zio/flow/ZFlowAssertionSyntax.scala @@ -17,6 +17,7 @@ package zio.flow import zio.flow.mock.MockedOperation +import zio.flow.runtime.internal.PersistentState import zio.flow.runtime.{DurableLog, ExecutorError, KeyValueStore, ZFlowExecutor} import zio.schema.{DynamicValue, Schema} import zio.{Duration, Fiber, Scope, ZIO, durationInt} @@ -31,7 +32,7 @@ object ZFlowAssertionSyntax { )(implicit schemaA: Schema[A], schemaE: Schema[E] - ): ZIO[DurableLog with KeyValueStore with Configuration, E, A] = + ): ZIO[DurableLog with KeyValueStore with PersistentState with Configuration, E, A] = ZIO.scoped { submitTestPersistent(id, mock, gcPeriod).flatMap(_._2.join) } @@ -40,7 +41,11 @@ object ZFlowAssertionSyntax { implicit schemaA: Schema[A], schemaE: Schema[E] - ): ZIO[Scope with DurableLog with KeyValueStore with Configuration, E, (ZFlowExecutor, Fiber[E, A])] = + ): ZIO[ + Scope with DurableLog with KeyValueStore with PersistentState with Configuration, + E, + (ZFlowExecutor, Fiber[E, A]) + ] = MockExecutors.persistent(mock, gcPeriod).flatMap { executor => executor.restartAll().orDieWith(_.toException) *> executor.run(FlowId.unsafeMake(id), zflow).forkScoped.map(fiber => (executor, fiber)) @@ -50,7 +55,7 @@ object ZFlowAssertionSyntax { def 
evaluateTestStartAndPoll( id: String, waitBeforePoll: Duration - ): ZIO[DurableLog with KeyValueStore with Configuration, ExecutorError, Option[ + ): ZIO[DurableLog with KeyValueStore with PersistentState with Configuration, ExecutorError, Option[ Either[Either[ExecutorError, DynamicValue], DynamicValue] ]] = ZIO.scoped { diff --git a/zio-flow-runtime/src/test/scala/zio/flow/examples/GoodcoverUseCase.scala b/zio-flow-runtime/src/test/scala/zio/flow/examples/GoodcoverUseCase.scala index 21466cb84..ccabaa36e 100644 --- a/zio-flow-runtime/src/test/scala/zio/flow/examples/GoodcoverUseCase.scala +++ b/zio-flow-runtime/src/test/scala/zio/flow/examples/GoodcoverUseCase.scala @@ -4,6 +4,7 @@ import zio.{Duration, ZNothing, durationInt} import zio.flow.operation.http import zio.flow._ import zio.flow.mock.MockedOperation +import zio.flow.runtime.internal.PersistentState import zio.flow.runtime.internal.executor.PersistentExecutorBaseSpec import zio.flow.runtime.{DurableLog, IndexedStore, KeyValueStore} import zio.schema.{DeriveSchema, Schema} @@ -117,8 +118,10 @@ object GoodcoverUseCase extends PersistentExecutorBaseSpec { val policy: Remote[Policy] = Remote(Policy("DummyPolicy")) - override def flowSpec - : Spec[TestEnvironment with IndexedStore with DurableLog with KeyValueStore with Configuration, Any] = + override def flowSpec: Spec[ + TestEnvironment with IndexedStore with DurableLog with KeyValueStore with PersistentState with Configuration, + Any + ] = suite("End to end goodcover use-case performed by in-memory executor")( suite("PolicyClaimStatus")( testFlow("PolicyClaimStatus", periodicAdjustClock = Some(1.second)) { diff --git a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/CustomOperationExecutorSpec.scala b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/CustomOperationExecutorSpec.scala index 04db6f292..5157645aa 100644 --- a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/CustomOperationExecutorSpec.scala +++ b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/CustomOperationExecutorSpec.scala @@ -1,8 +1,8 @@ package zio.flow.runtime.internal.executor import zio.flow._ -import zio.flow.runtime.internal.PersistentExecutor -import zio.flow.runtime.{DurableLog, IndexedStore, KeyValueStore, ZFlowExecutor} +import zio.flow.runtime.internal.{PersistentExecutor, PersistentState} +import zio.flow.runtime.{DurableLog, KeyValueStore, ZFlowExecutor} import zio.schema.{DeriveSchema, DynamicValue, Schema, TypeId} import zio.test.{Spec, TestEnvironment, assertTrue} import zio.{Chunk, Queue, ZIO, ZLayer} @@ -45,7 +45,7 @@ object CustomOperationExecutorSpec extends PersistentExecutorBaseSpec { } override def flowSpec - : Spec[TestEnvironment with IndexedStore with DurableLog with KeyValueStore with Configuration, Any] = + : Spec[TestEnvironment with PersistentState with DurableLog with KeyValueStore with Configuration, Any] = suite("CustomOperationExecutorSpec")( test("flows can use custom operations") { for { @@ -54,7 +54,7 @@ object CustomOperationExecutorSpec extends PersistentExecutorBaseSpec { opExecutor: OperationExecutor = new CustomOperationExecutor(queue) _ <- ZFlowExecutor .run(FlowId("test1"), flow) - .provideSome[DurableLog with KeyValueStore with Configuration]( + .provideSome[DurableLog with KeyValueStore with PersistentState with Configuration]( PersistentExecutor.make(), ZLayer.succeed(opExecutor), ZLayer.succeed(zio.flow.runtime.serialization.json) diff --git 
diff --git a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/ExecutorApi.scala b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/ExecutorApi.scala
index 641717976..07050cbb4 100644
--- a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/ExecutorApi.scala
+++ b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/ExecutorApi.scala
@@ -20,6 +20,7 @@ import zio._ import zio.flow.ZFlowAssertionSyntax.InMemoryZFlowAssertion import zio.flow._ import zio.flow.runtime._ +import zio.flow.runtime.internal.PersistentState import zio.schema.{DynamicValue, Schema} import zio.test.{Spec, TestClock, TestEnvironment, assertTrue}
@@ -27,7 +28,7 @@ import java.util.concurrent.TimeUnit object ExecutorApi extends PersistentExecutorBaseSpec { override def flowSpec - : Spec[TestEnvironment with IndexedStore with DurableLog with KeyValueStore with Configuration, Any] = + : Spec[TestEnvironment with PersistentState with DurableLog with KeyValueStore with Configuration, Any] = suite("PersistentExecutor API")( test("poll successful result") { for {
diff --git a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/GarbageCollectionSpec.scala b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/GarbageCollectionSpec.scala
index 4e1424df2..59a4da778 100644
--- a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/GarbageCollectionSpec.scala
+++ b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/GarbageCollectionSpec.scala
@@ -23,8 +23,10 @@ import zio._ import zio.flow.runtime.{DurableLog, IndexedStore, KeyValueStore} object GarbageCollectionSpec extends PersistentExecutorBaseSpec { - override def flowSpec - : Spec[TestEnvironment with IndexedStore with DurableLog with KeyValueStore with Configuration, Any] = + override def flowSpec: Spec[ + TestEnvironment with IndexedStore with DurableLog with KeyValueStore with PersistentState with Configuration, + Any + ] = suite("Garbage Collection")( testGCFlow("Unused simple variable gets deleted") { break => for {
diff --git a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/PersistentExecutorBaseSpec.scala b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/PersistentExecutorBaseSpec.scala
index 0b2e89e3c..7270bfd9d 100644
--- a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/PersistentExecutorBaseSpec.scala
+++ b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/PersistentExecutorBaseSpec.scala
@@ -20,7 +20,7 @@ import zio.flow.ZFlowAssertionSyntax.InMemoryZFlowAssertion import zio.flow._ import zio.flow.mock.MockedOperation import zio.flow.runtime.internal._ -import zio.flow.runtime._ +import zio.flow.runtime.{serialization, _} import zio.schema.Schema import zio.test.{Live, Spec, TestClock, TestEnvironment, TestResult} import zio.{
@@ -53,18 +53,40 @@ trait PersistentExecutorBaseSpec extends ZIOFlowBaseSpec { private val counter = new AtomicInteger(0) protected val unit: Unit = () - def flowSpec: Spec[TestEnvironment with IndexedStore with DurableLog with KeyValueStore with Configuration, Any] + def flowSpec: Spec[ + TestEnvironment with IndexedStore with DurableLog with KeyValueStore with PersistentState with Configuration, + Any + ] override def spec: Spec[TestEnvironment with Scope, Any] = - flowSpec - .provideSome[TestEnvironment]( - IndexedStore.inMemory, - DurableLog.layer, - KeyValueStore.inMemory, - Configuration.inMemory, - Runtime.removeDefaultLoggers, - Runtime.addLogger(TestFlowLogger.filterLogLevel(_ >= LogLevel.Debug)) + suite("Persistent executor tests")( + suite("Snapshot only")( + flowSpec + .provideSome[TestEnvironment]( + IndexedStore.inMemory, + DurableLog.layer, + KeyValueStore.inMemory, + Configuration.inMemory, + PersistentState.snapshotOnly, + ZLayer.succeed(serialization.json), + Runtime.removeDefaultLoggers, + Runtime.addLogger(TestFlowLogger.filterLogLevel(_ >= LogLevel.Debug)) + ) + ), + suite("Journal and Snapshot")( + flowSpec + .provideSome[TestEnvironment]( + IndexedStore.inMemory, + DurableLog.layer, + KeyValueStore.inMemory, + Configuration.inMemory, + PersistentState.journalAndSnapshot(afterEvery = Some(100), afterDuration = None), + ZLayer.succeed(serialization.json), + Runtime.removeDefaultLoggers, + Runtime.addLogger(TestFlowLogger.filterLogLevel(_ >= LogLevel.Debug)) + ) ) + ) protected def testFlowAndLogsExit[E: Schema, A: Schema]( label: String,
@@ -100,7 +122,9 @@ trait PersistentExecutorBaseSpec extends ZIOFlowBaseSpec { fiber <- flow .evaluateTestPersistent(wfId, mock, gcPeriod) - .provideSomeLayer[DurableLog with KeyValueStore with Configuration](Runtime.addLogger(logger)) + .provideSomeLayer[DurableLog with KeyValueStore with PersistentState with Configuration]( + Runtime.addLogger(logger) + ) .exit .fork flowResult <- periodicAdjustClock match {
@@ -193,7 +217,9 @@ trait PersistentExecutorBaseSpec extends ZIOFlowBaseSpec { for { fiber1 <- finalFlow .evaluateTestPersistent(label) - .provideSomeLayer[DurableLog with KeyValueStore with Configuration](Runtime.addLogger(logger)) + .provideSomeLayer[DurableLog with KeyValueStore with PersistentState with Configuration]( + Runtime.addLogger(logger) + ) .fork _ <- ZIO.logDebug(s"Adjusting clock by 20s") _ <- TestClock.adjust(20.seconds)
@@ -205,7 +231,9 @@ trait PersistentExecutorBaseSpec extends ZIOFlowBaseSpec { logLines1 <- logQueue.takeAll fiber2 <- finalFlow .evaluateTestPersistent(label) - .provideSomeLayer[DurableLog with KeyValueStore with Configuration](Runtime.addLogger(logger)) + .provideSomeLayer[DurableLog with KeyValueStore with PersistentState with Configuration]( + Runtime.addLogger(logger) + ) .fork _ <- ZIO.logDebug(s"Adjusting clock by 200s") _ <- TestClock.adjust(200.seconds)
@@ -256,13 +284,14 @@ trait PersistentExecutorBaseSpec extends ZIOFlowBaseSpec { ZFlow.waitTill(Instant.ofEpochSecond(100L))) val finalFlow = flow(break) - ZIO.scoped[Live with DurableLog with KeyValueStore with Configuration] { + ZIO.scoped[Live with DurableLog with KeyValueStore with PersistentState with Configuration] { for { - pair <- finalFlow - .submitTestPersistent(label) - .provideSomeLayer[Scope with DurableLog with KeyValueStore with Configuration]( - Runtime.addLogger(logger) - ) + pair <- + finalFlow + .submitTestPersistent(label) + .provideSomeLayer[Scope with DurableLog with KeyValueStore with PersistentState with Configuration]( + Runtime.addLogger(logger) + ) (executor, fiber) = pair _ <- ZIO.logDebug(s"Adjusting clock by 20s") _ <- TestClock.adjust(20.seconds)
diff --git a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/PersistentExecutorSpec.scala b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/PersistentExecutorSpec.scala
index 8ac500d1a..28f1e957b 100644
--- a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/PersistentExecutorSpec.scala
+++ b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/PersistentExecutorSpec.scala
@@ -24,6 +24,7 @@ import zio.flow._ import zio.test.Assertion._ import zio.test._ import zio._ +import zio.flow.runtime.internal.PersistentState import java.util.concurrent.TimeUnit
@@ -41,8 +42,10 @@ object PersistentExecutorSpec extends PersistentExecutorBaseSpec { compensate = Activity.compensateNotSupported ) - override def flowSpec - : Spec[TestEnvironment with IndexedStore with DurableLog with KeyValueStore with Configuration, Any] = + override def flowSpec: Spec[ + TestEnvironment with IndexedStore with DurableLog with KeyValueStore with PersistentState with Configuration, + Any + ] = suite("Operators in single run")( testFlow("succeed")(ZFlow.succeed(12)) { result => assertTrue(result == 12)
diff --git a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/RestartedFlowsSpec.scala b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/RestartedFlowsSpec.scala
index 90c5702af..a49dbf242 100644
--- a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/RestartedFlowsSpec.scala
+++ b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/RestartedFlowsSpec.scala
@@ -16,6 +16,7 @@ package zio.flow.runtime.internal.executor +import zio.flow.runtime.internal.PersistentState import zio.{Chunk, Duration, ZNothing, durationInt} import zio.flow.{ZFlow, _} import zio.flow.runtime.{DurableLog, IndexedStore, KeyValueStore}
@@ -23,8 +24,10 @@ import zio.schema.{DeriveSchema, Schema} import zio.test._ object RestartedFlowsSpec extends PersistentExecutorBaseSpec { - override def flowSpec - : Spec[TestEnvironment with IndexedStore with DurableLog with KeyValueStore with Configuration, Any] = + override def flowSpec: Spec[ + TestEnvironment with IndexedStore with DurableLog with KeyValueStore with PersistentState with Configuration, + Any + ] = suite("Restarted flows")( testRestartFlowAndLogs("log-|-log") { break => for {
diff --git a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/ZFlowScheduleSpec.scala b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/ZFlowScheduleSpec.scala
index 1a798c25a..67f466db5 100644
--- a/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/ZFlowScheduleSpec.scala
+++ b/zio-flow-runtime/src/test/scala/zio/flow/runtime/internal/executor/ZFlowScheduleSpec.scala
@@ -17,6 +17,7 @@ package zio.flow.runtime.internal.executor import zio.flow._ +import zio.flow.runtime.internal.PersistentState import zio.flow.runtime.{DurableLog, IndexedStore, KeyValueStore} import zio.test.{Spec, TestEnvironment, assertTrue} import zio.{Duration, durationInt}
@@ -24,8 +25,10 @@ import zio.{Duration, durationInt} import java.time.ZoneOffset object ZFlowScheduleSpec extends PersistentExecutorBaseSpec { - override def flowSpec - : Spec[TestEnvironment with IndexedStore with DurableLog with KeyValueStore with Configuration, Any] = + override def flowSpec: Spec[ + TestEnvironment with IndexedStore with DurableLog with KeyValueStore with PersistentState with Configuration, + Any + ] = suite("ZFlowSchedule")( suite("schedule")( testFlowAndLogs("fixed", periodicAdjustClock = Some(30.seconds)) {
diff --git a/zio-flow-server/src/main/scala/zio/flow/server/Main.scala b/zio-flow-server/src/main/scala/zio/flow/server/Main.scala
index 603728375..aa569e3ca 100644
--- a/zio-flow-server/src/main/scala/zio/flow/server/Main.scala
+++ b/zio-flow-server/src/main/scala/zio/flow/server/Main.scala
@@ -25,7 +25,7 @@ import zio.flow.Configuration import zio.flow.cassandra.{CassandraIndexedStore, CassandraKeyValueStore} import zio.flow.dynamodb.{DynamoDbIndexedStore, DynamoDbKeyValueStore} import zio.flow.rocksdb.{RocksDbIndexedStore, RocksDbKeyValueStore} -import zio.flow.runtime.internal.{DefaultOperationExecutor, PersistentExecutor} +import zio.flow.runtime.internal.{DefaultOperationExecutor, PersistentExecutor, PersistentState} import zio.flow.runtime.operation.http.HttpOperationPolicies import zio.flow.runtime.{DurableLog, IndexedStore, KeyValueStore, serialization} import zio.flow.server.ServerConfig.{BackendImplementation, SerializationFormat}
@@ -119,6 +119,7 @@ object Main extends ZIOAppDefault { case SerializationFormat.Protobuf => ZLayer.succeed(serialization.protobuf) }, PersistentExecutor.make(config.gcPeriod), + PersistentState.configured(config.persisterConfig), Server.configured() )
diff --git a/zio-flow-server/src/main/scala/zio/flow/server/ServerConfig.scala b/zio-flow-server/src/main/scala/zio/flow/server/ServerConfig.scala
index 649698170..d480140e6 100644
--- a/zio-flow-server/src/main/scala/zio/flow/server/ServerConfig.scala
+++ b/zio-flow-server/src/main/scala/zio/flow/server/ServerConfig.scala
@@ -4,9 +4,10 @@ import zio.aws.core.config.CommonAwsConfig import zio.aws.core.config.descriptors.commonAwsConfig import zio.aws.netty.NettyClientConfig import zio.aws.netty.descriptors.nettyClientConfig +import zio.flow.runtime.PersisterConfig import zio.flow.server.ServerConfig._ import zio.metrics.connectors.MetricsConfig -import zio.{Chunk, Config, Duration, LogLevel} +import zio.{Chunk, Config, Duration, LogLevel, durationInt}
@@ -16,7 +17,8 @@ final case class ServerConfig( gcPeriod: Duration, logLevel: LogLevel, commonAwsConfig: CommonAwsConfig, - awsNettyClientConfig: NettyClientConfig + awsNettyClientConfig: NettyClientConfig, + persisterConfig: PersisterConfig ) object ServerConfig {
@@ -72,8 +74,11 @@ object ServerConfig { Config.duration("gc-period") ++ logLevelConfig.nested("log-level").withDefault(LogLevel.Info) ++ commonAwsConfig.nested("aws") ++ - nettyClientConfig.nested("aws-netty") - ).map { case (kvStore, ixStore, metrics, ser, gcPeriod, logLevel, aws, awsNetty) => - ServerConfig(kvStore, ixStore, metrics, ser, gcPeriod, logLevel, aws, awsNetty) + nettyClientConfig.nested("aws-netty") ++ + PersisterConfig.config + .nested("persister") + .withDefault(PersisterConfig.PeriodicSnapshots(afterEvery = Some(100), afterDuration = Some(1.minute))) + ).map { case (kvStore, ixStore, metrics, ser, gcPeriod, logLevel, aws, awsNetty, persisterConfig) => + ServerConfig(kvStore, ixStore, metrics, ser, gcPeriod, logLevel, aws, awsNetty, persisterConfig) } }
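Note: the sketch below is editorial and not part of the patch. The server now decodes a PersisterConfig (nested under "persister", defaulting to periodic snapshots every 100 changes or every minute) and hands it to PersistentState.configured. One plausible shape for that wiring, using only the constructors and layer names that appear in this change, is the following; whether configured does exactly this is an assumption, since its implementation is not shown in this excerpt.

import zio.flow.runtime.PersisterConfig
import zio.flow.runtime.internal.PersistentState

object PersisterWiringSketch {
  // Hypothetical helper: choose a PersistentState layer variant from the decoded config.
  // snapshotOnly and journalAndSnapshot are the two variants exercised by the test suites above.
  def fromConfig(persisterConfig: PersisterConfig) =
    persisterConfig match {
      case PersisterConfig.SnapshotsOnly =>
        PersistentState.snapshotOnly
      case PersisterConfig.PeriodicSnapshots(afterEvery, afterDuration) =>
        PersistentState.journalAndSnapshot(afterEvery, afterDuration)
    }
}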
diff --git a/zio-flow/shared/src/main/scala/zio/flow/runtime/ExecutorError.scala b/zio-flow/shared/src/main/scala/zio/flow/runtime/ExecutorError.scala
index 895e55899..c8c4008e9 100644
--- a/zio-flow/shared/src/main/scala/zio/flow/runtime/ExecutorError.scala
+++ b/zio-flow/shared/src/main/scala/zio/flow/runtime/ExecutorError.scala
@@ -31,6 +31,8 @@ sealed trait ExecutorError { self => case ExecutorError.AwaitedFlowDied(flowId, reason) => s"Awaited flow ($flowId) died: ${reason.toMessage}" case ExecutorError.KeyValueStoreError(operation, reason) => s"Key-value store failure in $operation: ${reason.getMessage}" + case ExecutorError.IndexedStoreError(operation, reason) => + s"Indexed store failure in $operation: ${reason.getMessage}" case ExecutorError.LogError(error) => s"Durable log failure: ${error.toMessage}" case ExecutorError.VariableChangeLogFinished => "Variable change log finished unexpectedly" case ExecutorError.FlowDied => s"Could not evaluate ZFlow"
@@ -60,6 +62,10 @@ object ExecutorError { final case class KeyValueStoreError(operation: String, reason: Throwable) extends ExecutorError { override val cause: Option[Throwable] = Some(reason) } + + final case class IndexedStoreError(operation: String, reason: Throwable) extends ExecutorError { + override val cause: Option[Throwable] = Some(reason) + } final case class LogError(error: DurableLogError) extends ExecutorError { override val cause: Option[Throwable] = error.cause }
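Note: editorial sketch, not part of the patch. The new IndexedStoreError mirrors the existing KeyValueStoreError; a call site inside the executor would presumably wrap indexed-store failures along these lines (the helper name and the "position" operation label are illustrative only):

import zio.ZIO
import zio.flow.runtime.{ExecutorError, IndexedStore}

object IndexedStoreErrorSketch {
  // Wrap the store's Throwable into the typed executor error; toMessage then renders it
  // as "Indexed store failure in position: <message>".
  def currentPosition(topic: String): ZIO[IndexedStore, ExecutorError, IndexedStore.Index] =
    IndexedStore
      .position(topic)
      .mapError(reason => ExecutorError.IndexedStoreError("position", reason))
}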