Skip to content

Commit

Permalink
scheduler improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Feb 27, 2024
1 parent b09f399 commit 5d0d88f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
9 changes: 5 additions & 4 deletions kyo-core/jvm/src/main/scala/kyo/scheduler/Flag.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ private object Flag:
abstract class Reader[T]:
def apply(s: String): T
object Reader:
given Reader[Int] = Integer.parseInt(_)
given Reader[String] = identity(_)
given Reader[Long] = java.lang.Long.parseLong(_)
given Reader[Double] = java.lang.Double.parseDouble(_)
given Reader[Int] = Integer.parseInt(_)
given Reader[String] = identity(_)
given Reader[Long] = java.lang.Long.parseLong(_)
given Reader[Double] = java.lang.Double.parseDouble(_)
given Reader[Boolean] = java.lang.Boolean.parseBoolean(_)
given listReader[T](using r: Reader[T]): Reader[List[T]] =
(s: String) => s.split(",").toList.map(r(_))
end Reader
Expand Down
39 changes: 38 additions & 1 deletion kyo-core/jvm/src/main/scala/kyo/scheduler/Scheduler.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package kyo.scheduler

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kyo.*
import org.jctools.queues.MpmcUnboundedXaddArrayQueue
import scala.annotation.tailrec

private[kyo] object Scheduler:

private val printStatus = Flag("printStatus", false)

private val coreWorkers = Flag(
"coreWorkers",
Math.ceil(Runtime.getRuntime().availableProcessors().toDouble / 2).intValue()
Math.min(1, Math.ceil(Runtime.getRuntime().availableProcessors().toDouble / 2).intValue())
)

@volatile
Expand All @@ -21,8 +24,18 @@ private[kyo] object Scheduler:
private val pool = Executors.newCachedThreadPool(Threads("kyo-worker", new Worker(_)))

startWorkers()

Coordinator.load()

if printStatus then
discard(Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
(() => println(status())): Runnable,
1,
1,
TimeUnit.SECONDS
))
end if

def removeWorker(): Unit =
if concurrencyLimit > coreWorkers then
concurrencyLimit = Math.max(1, concurrency.get() - 1)
Expand Down Expand Up @@ -67,6 +80,8 @@ private[kyo] object Scheduler:
end schedule

def steal(thief: Worker): IOTask[?] =
if Worker.all.size() <= 1 then
return null
// p2c load stealing
var r: IOTask[?] = null
var w0: Worker = randomWorker(thief)
Expand Down Expand Up @@ -116,6 +131,28 @@ private[kyo] object Scheduler:
w
end randomWorker

def status(): String =
val sb = new StringBuilder

sb.append("===== Kyo Scheduler =====\n")
sb.append(f"${"Load"}%-8s ${"Workers"}%-8s ${"Limit"}%-8s\n")
sb.append(f"${loadAvg()}%-8.2f ${concurrency.get()}%-8d ${concurrencyLimit}%-8d\n")
sb.append("\n")

sb.append(
f"${"Thread"}%-20s ${"Load"}%-5s ${"State"}%-15s ${"Frame"}%-30s\n"
)

Worker.all.iterator().forEachRemaining { worker =>
sb.append(
f"${worker.getName}%-20s ${worker.load()}%-5.2f ${worker.getState}%-15s ${worker.getStackTrace()(0)}%-30s\n"
)
}
sb.append("=========================\n")

sb.toString()
end status

override def toString =
s"Scheduler(loadAvg=${loadAvg()},concurrency=$concurrency,limit=$concurrencyLimit)"
end Scheduler

0 comments on commit 5d0d88f

Please sign in to comment.