Skip to content

Commit

Permalink
[stm][bench] Ref log optimization + benchmarks (#885)
Browse files Browse the repository at this point in the history
Introduces an initial set of benchmarks for the STM module and optimizes
ref log usage. I was surprised by how well optimized ZIO's STM is :)

[Benchmark
results](https://jmh.morethan.io/?source=https://gist.githubusercontent.com/fwbrasil/308dc726e9a4f5913170d7d49f7a2dcf/raw/19d284bd0fff6556f83a408db993d4f814b56a89/jmh-result.json):


![image](https://github.com/user-attachments/assets/bd9de6ea-ef6a-4cdd-bb75-4868df583a26)
  • Loading branch information
fwbrasil authored Dec 3, 2024
1 parent 310b168 commit 9b435a9
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 55 deletions.
44 changes: 23 additions & 21 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ lazy val `kyo-bench` =
.enablePlugins(JmhPlugin)
.dependsOn(`kyo-core`)
.dependsOn(`kyo-sttp`)
.dependsOn(`kyo-stm`)
.dependsOn(`kyo-scheduler-zio`)
.disablePlugins(MimaPlugin)
.settings(
Expand Down Expand Up @@ -484,27 +485,28 @@ lazy val `kyo-bench` =
)
}
},
libraryDependencies += "dev.zio" %% "izumi-reflect" % "2.3.10",
libraryDependencies += "org.typelevel" %% "cats-effect" % catsVersion,
libraryDependencies += "org.typelevel" %% "log4cats-core" % "2.7.0",
libraryDependencies += "org.typelevel" %% "log4cats-slf4j" % "2.7.0",
libraryDependencies += "org.typelevel" %% "cats-mtl" % "1.5.0",
libraryDependencies += "org.typelevel" %% "cats-mtl" % "1.5.0",
libraryDependencies += "com.47deg" %% "fetch" % "3.1.2",
libraryDependencies += "dev.zio" %% "zio-logging" % "2.4.0",
libraryDependencies += "dev.zio" %% "zio-logging-slf4j2" % "2.4.0",
libraryDependencies += "dev.zio" %% "zio" % zioVersion,
libraryDependencies += "dev.zio" %% "zio-concurrent" % zioVersion,
libraryDependencies += "dev.zio" %% "zio-query" % "0.7.6",
libraryDependencies += "dev.zio" %% "zio-prelude" % "1.0.0-RC35",
libraryDependencies += "com.softwaremill.ox" %% "core" % "0.0.25",
libraryDependencies += "co.fs2" %% "fs2-core" % "3.11.0",
libraryDependencies += "org.http4s" %% "http4s-ember-client" % "0.23.29",
libraryDependencies += "org.http4s" %% "http4s-dsl" % "0.23.29",
libraryDependencies += "dev.zio" %% "zio-http" % "3.0.1",
libraryDependencies += "io.vertx" % "vertx-core" % "5.0.0.CR2",
libraryDependencies += "io.vertx" % "vertx-web" % "5.0.0.CR2",
libraryDependencies += "org.scalatest" %% "scalatest" % scalaTestVersion % Test
libraryDependencies += "dev.zio" %% "izumi-reflect" % "2.3.10",
libraryDependencies += "org.typelevel" %% "cats-effect" % catsVersion,
libraryDependencies += "org.typelevel" %% "log4cats-core" % "2.7.0",
libraryDependencies += "org.typelevel" %% "log4cats-slf4j" % "2.7.0",
libraryDependencies += "org.typelevel" %% "cats-mtl" % "1.5.0",
libraryDependencies += "org.typelevel" %% "cats-mtl" % "1.5.0",
libraryDependencies += "io.github.timwspence" %% "cats-stm" % "0.13.5",
libraryDependencies += "com.47deg" %% "fetch" % "3.1.2",
libraryDependencies += "dev.zio" %% "zio-logging" % "2.4.0",
libraryDependencies += "dev.zio" %% "zio-logging-slf4j2" % "2.4.0",
libraryDependencies += "dev.zio" %% "zio" % zioVersion,
libraryDependencies += "dev.zio" %% "zio-concurrent" % zioVersion,
libraryDependencies += "dev.zio" %% "zio-query" % "0.7.6",
libraryDependencies += "dev.zio" %% "zio-prelude" % "1.0.0-RC35",
libraryDependencies += "com.softwaremill.ox" %% "core" % "0.0.25",
libraryDependencies += "co.fs2" %% "fs2-core" % "3.11.0",
libraryDependencies += "org.http4s" %% "http4s-ember-client" % "0.23.29",
libraryDependencies += "org.http4s" %% "http4s-dsl" % "0.23.29",
libraryDependencies += "dev.zio" %% "zio-http" % "3.0.1",
libraryDependencies += "io.vertx" % "vertx-core" % "5.0.0.CR2",
libraryDependencies += "io.vertx" % "vertx-web" % "5.0.0.CR2",
libraryDependencies += "org.scalatest" %% "scalatest" % scalaTestVersion % Test
)

lazy val rewriteReadmeFile = taskKey[Unit]("Rewrite README file")
Expand Down
72 changes: 72 additions & 0 deletions kyo-bench/src/main/scala/kyo/bench/TMapMultiKeyBench.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package kyo.bench

class TMapMultiKeyBench(parallelism: Int) extends Bench.ForkOnly(parallelism):

def this() = this(Runtime.getRuntime().availableProcessors() * 2)

def catsBench() =
import cats.effect.*
import cats.syntax.all.*
import io.github.timwspence.cats.stm.*

STM.runtime[IO].flatMap { stm =>
for
ref <- stm.commit(stm.TVar.of(Map.empty[Int, Int]))
_ <-
(0 until parallelism).map { i =>
stm.commit {
for
map <- ref.get
_ <- ref.set(map.updated(i, map.getOrElse(i, 0) + 1))
yield ()
}
}.toList.parSequence_
results <- stm.commit(ref.get.map(_.values.sum))
yield results
}
end catsBench

override def kyoBenchFiber() =
import kyo.*

for
map <- TMap.initNow[Int, Int]()
_ <-
Async.parallelUnbounded(
(0 until parallelism).map { i =>
STM.run {
for
current <- map.get(i)
_ <- map.put(i, current.getOrElse(0) + 1)
yield ()
}
}
)
results <- STM.run(map.snapshot)
yield results.values.sum
end for
end kyoBenchFiber

def zioBench() =
import zio.*
import zio.stm.*

for
map <- TMap.empty[Int, Int].commit
_ <-
ZIO.collectAllParDiscard(
(0 until parallelism).map { i =>
STM.atomically {
for
current <- map.get(i)
_ <- map.put(i, current.getOrElse(0) + 1)
yield ()
}
}
)
results <- map.toMap.commit
yield results.values.sum
end for
end zioBench

end TMapMultiKeyBench
70 changes: 70 additions & 0 deletions kyo-bench/src/main/scala/kyo/bench/TMapSingleKeyBench.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kyo.bench

class TMapSingleKeyBench(parallelism: Int) extends Bench.ForkOnly(parallelism):

def this() = this(Runtime.getRuntime().availableProcessors() * 2)

def catsBench() =
import cats.effect.*
import cats.syntax.all.*
import io.github.timwspence.cats.stm.*

STM.runtime[IO].flatMap { stm =>
for
ref <- stm.commit(stm.TVar.of(Map.empty[Int, Int]))
_ <- Seq.fill(parallelism)(
stm.commit {
for
map <- ref.get
current = map.getOrElse(0, 0)
_ <- ref.set(map.updated(0, current + 1))
yield ()
}
).parSequence_
result <- stm.commit(ref.get.map(_.getOrElse(0, 0)))
yield result
}
end catsBench

override def kyoBenchFiber() =
import kyo.*

for
map <- TMap.initNow[Int, Int]()
_ <- Async.parallelUnbounded(
Seq.fill(parallelism)(
STM.run {
for
current <- map.get(0)
_ <- map.put(0, current.getOrElse(0) + 1)
yield ()
}
)
)
result <- STM.run(map.get(0))
yield result.getOrElse(0)
end for
end kyoBenchFiber

def zioBench() =
import zio.*
import zio.stm.*

for
map <- TMap.empty[Int, Int].commit
_ <- ZIO.collectAllParDiscard(
Seq.fill(parallelism)(
STM.atomically {
for
current <- map.get(0)
_ <- map.put(0, current.getOrElse(0) + 1)
yield ()
}
)
)
result <- map.get(0).commit
yield result.getOrElse(0)
end for
end zioBench

end TMapSingleKeyBench
45 changes: 45 additions & 0 deletions kyo-bench/src/main/scala/kyo/bench/TRefMultiBench.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package kyo.bench

import java.util.concurrent.locks.LockSupport

class TRefMultiBench(parallelism: Int) extends Bench.ForkOnly(parallelism):

def this() = this(Runtime.getRuntime().availableProcessors() * 2)

def catsBench() =
import cats.effect.*
import cats.syntax.all.*
import io.github.timwspence.cats.stm.*

STM.runtime[IO].flatMap { stm =>
for
refs <- stm.commit(Seq.fill(parallelism)(stm.TVar.of(0)).sequence)
_ <- refs.map(ref => stm.commit(ref.modify(_ + 1))).parSequence_
result <- stm.commit(refs.traverse(_.get).map(_.sum))
yield result
}
end catsBench

override def kyoBenchFiber() =
import kyo.*

for
refs <- Kyo.fill(parallelism)(TRef.initNow(0))
_ <- Async.parallelUnbounded(refs.map(ref => STM.run(ref.update(_ + 1))))
result <- STM.run(Kyo.foreach(refs)(_.get).map(_.sum))
yield result
end for
end kyoBenchFiber

def zioBench() =
import zio.*
import zio.stm.*

for
refs <- ZIO.collectAll(Seq.fill(parallelism)(TRef.make(0).commit))
_ <- ZIO.collectAllParDiscard(refs.map(_.update(_ + 1).commit))
result <- STM.collectAll(refs.map(_.get)).map(_.sum).commit
yield result
end for
end zioBench
end TRefMultiBench
47 changes: 47 additions & 0 deletions kyo-bench/src/main/scala/kyo/bench/TRefSingleBench.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package kyo.bench

import java.util.concurrent.locks.LockSupport

class TRefSingleBench(parallelism: Int) extends Bench.ForkOnly(parallelism):

def this() = this(Runtime.getRuntime().availableProcessors() * 2)

def catsBench() =
import cats.effect.*
import cats.syntax.all.*
import io.github.timwspence.cats.stm.*

STM.runtime[IO].flatMap { stm =>
for
ref <- stm.commit(stm.TVar.of(0))
_ <- Seq.fill(parallelism)(
stm.commit(ref.modify(_ + 1))
).parSequence_
result <- stm.commit(ref.get)
yield result
}
end catsBench

override def kyoBenchFiber() =
import kyo.*

for
ref <- TRef.initNow(0)
_ <- Async.parallelUnbounded(Seq.fill(parallelism)(STM.run(ref.update(_ + 1))))
result <- STM.run(ref.get)
yield result
end for
end kyoBenchFiber

def zioBench() =
import zio.*
import zio.stm.*

for
ref <- TRef.make(0).commit
_ <- ZIO.collectAllParDiscard(Seq.fill(parallelism)(ref.update(_ + 1).commit))
result <- ref.get.commit
yield result
end for
end zioBench
end TRefSingleBench
6 changes: 3 additions & 3 deletions kyo-core/shared/src/test/scala/kyo/ClockTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class ClockTest extends Test:
fiber <- clock.sleep(5.millis)
_ <- fiber.get
elapsed <- stopwatch.elapsed
yield assert(elapsed >= 3.millis && elapsed < 20.millis)
yield assert(elapsed >= 3.millis && elapsed < 100.millis)
}

"multiple sequential sleeps" in run {
Expand All @@ -249,8 +249,8 @@ class ClockTest extends Test:
_ <- fiber2.get
end <- stopwatch.elapsed
yield
assert(mid >= 3.millis && mid < 30.millis)
assert(end >= 8.millis && end < 50.millis)
assert(mid >= 3.millis)
assert(end >= 8.millis)
}

"sleep with zero duration" in run {
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/test/scala/kyo/FiberTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ class FiberTest extends Test:
fiber <- Fiber.gather(2)(Seq(
latch1.release.andThen(1),
latch2.release.andThen(2),
Async.delay(1.millis)(3)
Async.delay(50.millis)(3)
))
_ <- latch1.await
_ <- latch2.await
Expand Down
8 changes: 4 additions & 4 deletions kyo-stm/shared/src/main/scala/kyo/STM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ case class FailedTransaction(frame: Frame) extends Exception(frame.position.show
* fine-grained locking strategy means that transactions only conflict if they actually touch the same references, allowing for high
* concurrency when different transactions operate on different refs.
*/
opaque type STM <: (Var[RefLog] & Abort[FailedTransaction] & Async) =
Var[RefLog] & Abort[FailedTransaction] & Async
opaque type STM <: (Var[TRefLog] & Abort[FailedTransaction] & Async) =
Var[TRefLog] & Abort[FailedTransaction] & Async

object STM:

Expand Down Expand Up @@ -108,7 +108,7 @@ object STM:
// New transaction without a parent, use regular commit flow
Retry[FailedTransaction](retrySchedule) {
TID.useNew { tid =>
Var.runWith(RefLog.empty)(v) { (log, result) =>
TRefLog.runWith(v) { (log, result) =>
IO.Unsafe {
// Attempt to acquire locks and commit the transaction
val (locked, unlocked) =
Expand All @@ -135,7 +135,7 @@ object STM:
// Nested transaction inherits parent's transaction context but isolates RefLog.
// On success: changes propagate to parent. On failure: changes are rolled back
// without affecting parent's state.
val result = Var.isolate.update[RefLog].run(v)
val result = TRefLog.isolate(v)

// Can't return `result` directly since it has a pending STM effect
// but it's safe to cast because, if there's a parent transaction,
Expand Down
Loading

0 comments on commit 9b435a9

Please sign in to comment.