From 5d0d88fdf6cdebeed7e1d47110e300893046b1dc Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 26 Feb 2024 23:34:13 -0800 Subject: [PATCH] scheduler improvements --- .../src/main/scala/kyo/scheduler/Flag.scala | 9 +++-- .../main/scala/kyo/scheduler/Scheduler.scala | 39 ++++++++++++++++++- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/kyo-core/jvm/src/main/scala/kyo/scheduler/Flag.scala b/kyo-core/jvm/src/main/scala/kyo/scheduler/Flag.scala index 5f1a31629..034c17a80 100644 --- a/kyo-core/jvm/src/main/scala/kyo/scheduler/Flag.scala +++ b/kyo-core/jvm/src/main/scala/kyo/scheduler/Flag.scala @@ -4,10 +4,11 @@ private object Flag: abstract class Reader[T]: def apply(s: String): T object Reader: - given Reader[Int] = Integer.parseInt(_) - given Reader[String] = identity(_) - given Reader[Long] = java.lang.Long.parseLong(_) - given Reader[Double] = java.lang.Double.parseDouble(_) + given Reader[Int] = Integer.parseInt(_) + given Reader[String] = identity(_) + given Reader[Long] = java.lang.Long.parseLong(_) + given Reader[Double] = java.lang.Double.parseDouble(_) + given Reader[Boolean] = java.lang.Boolean.parseBoolean(_) given listReader[T](using r: Reader[T]): Reader[List[T]] = (s: String) => s.split(",").toList.map(r(_)) end Reader diff --git a/kyo-core/jvm/src/main/scala/kyo/scheduler/Scheduler.scala b/kyo-core/jvm/src/main/scala/kyo/scheduler/Scheduler.scala index 50e2b76b2..cb34abe36 100644 --- a/kyo-core/jvm/src/main/scala/kyo/scheduler/Scheduler.scala +++ b/kyo-core/jvm/src/main/scala/kyo/scheduler/Scheduler.scala @@ -1,6 +1,7 @@ package kyo.scheduler import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import kyo.* import org.jctools.queues.MpmcUnboundedXaddArrayQueue @@ -8,9 +9,11 @@ import scala.annotation.tailrec private[kyo] object Scheduler: + private val printStatus = Flag("printStatus", false) + private val coreWorkers = Flag( "coreWorkers", - Math.ceil(Runtime.getRuntime().availableProcessors().toDouble / 2).intValue() + Math.min(1, Math.ceil(Runtime.getRuntime().availableProcessors().toDouble / 2).intValue()) ) @volatile @@ -21,8 +24,18 @@ private[kyo] object Scheduler: private val pool = Executors.newCachedThreadPool(Threads("kyo-worker", new Worker(_))) startWorkers() + Coordinator.load() + if printStatus then + discard(Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( + (() => println(status())): Runnable, + 1, + 1, + TimeUnit.SECONDS + )) + end if + def removeWorker(): Unit = if concurrencyLimit > coreWorkers then concurrencyLimit = Math.max(1, concurrency.get() - 1) @@ -67,6 +80,8 @@ private[kyo] object Scheduler: end schedule def steal(thief: Worker): IOTask[?] = + if Worker.all.size() <= 1 then + return null // p2c load stealing var r: IOTask[?] = null var w0: Worker = randomWorker(thief) @@ -116,6 +131,28 @@ private[kyo] object Scheduler: w end randomWorker + def status(): String = + val sb = new StringBuilder + + sb.append("===== Kyo Scheduler =====\n") + sb.append(f"${"Load"}%-8s ${"Workers"}%-8s ${"Limit"}%-8s\n") + sb.append(f"${loadAvg()}%-8.2f ${concurrency.get()}%-8d ${concurrencyLimit}%-8d\n") + sb.append("\n") + + sb.append( + f"${"Thread"}%-20s ${"Load"}%-5s ${"State"}%-15s ${"Frame"}%-30s\n" + ) + + Worker.all.iterator().forEachRemaining { worker => + sb.append( + f"${worker.getName}%-20s ${worker.load()}%-5.2f ${worker.getState}%-15s ${worker.getStackTrace()(0)}%-30s\n" + ) + } + sb.append("=========================\n") + + sb.toString() + end status + override def toString = s"Scheduler(loadAvg=${loadAvg()},concurrency=$concurrency,limit=$concurrencyLimit)" end Scheduler