Skip to content

Commit

Permalink
finagle-netty4: Track burstiness of cpu usage by netty threads
Browse files Browse the repository at this point in the history
Problem
We need to know the burstiness of cpu utilization, pending tasks and registered channels within every metric collection interval

Solution
Add these metrics:
- cpu-util shows the distribution of cpu utilization within each minutely interval
- pending_tasks shows the distribution of the length of pending tasks within each minutely interval
- all_sockets - shows the distribution of registered sockets

Differential Revision: https://phabricator.twitter.biz/D1177719
  • Loading branch information
mbezoyan authored and jenkins committed Oct 18, 2024
1 parent 6818439 commit 8e36a30
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Runtime Behavior Changes
Added README in integration tests noting that this must exist for integration tests to run. ``PHAB_ID=D1152235``
* finagle-netty4: `EventLoopGroupTracker` (previously named `EventLoopGroupExecutionDelayTracker`) now collects
stats cpu_time_ms and active_sockets per netty worker thread.
* finagle-netty4: `EventLoopGroupTracker` now collects the distribution of cpu utilization by each netty thread
and all_sockets instead of active_sockets. ``PHAB_ID=D1177719``


New Features
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.twitter.finagle.netty4.threading

import com.twitter.finagle.stats.HistogramFormat
import com.twitter.finagle.stats.MetricBuilder
import com.twitter.finagle.stats.MetricBuilder.HistogramType
import com.twitter.finagle.stats.Stat
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.logging.Logger
Expand Down Expand Up @@ -51,11 +54,35 @@ private[threading] class EventLoopGroupTrackingRunnable(
private[this] val threadMXBean = ManagementFactory.getThreadMXBean

private[this] val scopedStatsReceiver = statsReceiver.scope(threadName)
private[this] val activeSocketsStat = scopedStatsReceiver.stat("active_sockets")
private[this] val pendingTasksStat = scopedStatsReceiver.stat(
MetricBuilder(
metricType = HistogramType,
name = Seq("pending_tasks"),
percentiles = Array[Double](0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
histogramFormat = HistogramFormat.FullSummary
)
)
private[this] val allSocketsStat = scopedStatsReceiver.stat(
MetricBuilder(
metricType = HistogramType,
name = Seq("all_sockets"),
percentiles = Array[Double](0.50),
histogramFormat = HistogramFormat.FullSummary
)
)
private[this] val cpuTimeCounter = scopedStatsReceiver.counter("cpu_time_ms")
private[this] val cpuUtilStat = scopedStatsReceiver.stat(
MetricBuilder(
metricType = HistogramType,
name = Seq("cpu_util"),
percentiles = Array[Double](0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
histogramFormat = HistogramFormat.FullSummary
)
)

// Accessed only from within the same netty thread
private[this] var prevCPUTimeMs = 0L
private[this] var prevCPUTimeNs = 0L
private[this] var prevWallTimeNs = 0L

setWatchTask()
executor.scheduleWithFixedDelay(
Expand All @@ -71,31 +98,37 @@ private[threading] class EventLoopGroupTrackingRunnable(
watchTask.get.cancel(false)
}

val executionDelay = Time.now - scheduledExecutionTime
val now = Time.now
val executionDelay = now - scheduledExecutionTime
if (threadDumpEnabled && executionDelay.inMillis > threadDumpThreshold.inMillis) {
dumpLogger.warning(
s"THREAD: $threadName EXECUTION DELAY is greater than ${threadDumpThreshold.inMillis}ms, was ${executionDelay.inMillis}ms"
)
}

delayStat.add(executionDelay.inMillis)
scheduledExecutionTime = Time.now.plus(taskTrackingPeriod)
scheduledExecutionTime = now.plus(taskTrackingPeriod)
setWatchTask()

var numActiveSockets = 0
// This will be nio event loop or epoll event loop.
executor.asInstanceOf[SingleThreadEventLoop].registeredChannelsIterator().forEachRemaining {
channel =>
if (channel.isActive) {
numActiveSockets += 1
}
}
activeSocketsStat.add(numActiveSockets)
val loop = executor.asInstanceOf[SingleThreadEventLoop]
allSocketsStat.add(loop.registeredChannels())
pendingTasksStat.add(loop.pendingTasks())

// `getThreadCPUTime` returns the time in nanoseconds.
val currentCPUTimeMs = threadMXBean.getThreadCpuTime(threadId) / 1000000
cpuTimeCounter.incr(currentCPUTimeMs - prevCPUTimeMs)
prevCPUTimeMs = currentCPUTimeMs
val currCPUTimeNs = threadMXBean.getThreadCpuTime(threadId)
val cpuTime = currCPUTimeNs - prevCPUTimeNs

val currWallTimeNs = System.nanoTime()
val wallTimeNs = currWallTimeNs - prevWallTimeNs
cpuTimeCounter.incr(TimeUnit.NANOSECONDS.toMillis(cpuTime))
if (prevWallTimeNs != 0 && wallTimeNs != 0) {
cpuUtilStat.add(
10000 * cpuTime / wallTimeNs
)
}
prevCPUTimeNs = currCPUTimeNs
prevWallTimeNs = currWallTimeNs
}

private[this] def setWatchTask(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class EventLoopGroupTrackerTest
.get(Seq("finagle_thread_delay_tracking_test-1", "cpu_time_ms")).isDefined)
assert(
statsReceiver.stats
.get(Seq("finagle_thread_delay_tracking_test-1", "active_sockets")).isDefined)
.get(Seq("finagle_thread_delay_tracking_test-1", "all_sockets")).isDefined)

// we should have no threads with the name no_threads_expected
Thread.getAllStackTraces.keySet().asScala.foreach { thread: Thread =>
Expand Down

0 comments on commit 8e36a30

Please sign in to comment.