Skip to content

Commit

Permalink
Refactor spectator metrics
Browse files Browse the repository at this point in the history
__Problem__

Currently the listener does not expose metrics to specific events but instead does internal calculations and expose custom gauges. This is limiting as it gives less information.

__Modification__

Modified the metrics to represent the actual events.

__Result__

More flexible insights.
  • Loading branch information
NiteshKant committed Feb 7, 2017
1 parent be12144 commit c0b3387
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 699 deletions.

This file was deleted.

1 change: 0 additions & 1 deletion rxnetty-spectator-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ dependencies {
compile project(':rxnetty-http')
compile project(':rxnetty-spectator-tcp')
compile project(':rxnetty-common')
compile 'com.netflix.spectator:spectator-api:0.40.0'
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,32 @@

package io.reactivex.netty.spectator.http;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import io.reactivex.netty.protocol.http.client.events.HttpClientEventsListener;
import io.reactivex.netty.spectator.http.internal.ResponseCodesHolder;
import io.reactivex.netty.spectator.internal.LatencyMetrics;
import io.reactivex.netty.spectator.internal.EventMetric;
import io.reactivex.netty.spectator.tcp.TcpClientListener;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static io.reactivex.netty.spectator.internal.SpectatorUtils.*;

/**
* HttpClientListener.
*/
public class HttpClientListener extends HttpClientEventsListener {

private final AtomicInteger requestBacklog;
private final AtomicInteger inflightRequests;
private final Counter processedRequests;
private final Counter requestWriteFailed;
private final Counter failedResponses;
private final EventMetric requestWrite;
private final EventMetric requestProcessing;
private final EventMetric response;

private final ResponseCodesHolder responseCodesHolder;
private final LatencyMetrics requestWriteTimes;
private final LatencyMetrics responseReadTimes;
private final LatencyMetrics requestProcessingTimes;
private final TcpClientListener tcpDelegate;

public HttpClientListener(Registry registry, String monitorId) {
requestBacklog = newGauge(registry, "requestBacklog", monitorId, new AtomicInteger());
inflightRequests = newGauge(registry, "inflightRequests", monitorId, new AtomicInteger());
requestWriteTimes = new LatencyMetrics("requestWriteTimes", monitorId, registry);
responseReadTimes = new LatencyMetrics("responseReadTimes", monitorId, registry);
processedRequests = newCounter(registry, "processedRequests", monitorId);
requestWriteFailed = newCounter(registry, "requestWriteFailed", monitorId);
failedResponses = newCounter(registry, "failedResponses", monitorId);
requestProcessingTimes = new LatencyMetrics("requestProcessingTimes", monitorId, registry);
requestWrite = new EventMetric(registry, "request", monitorId, "action", "write");
requestProcessing = new EventMetric(registry, "request", monitorId, "action", "processing");
response = new EventMetric(registry, "response", monitorId, "action", "read");

responseCodesHolder = new ResponseCodesHolder(registry, monitorId);
tcpDelegate = new TcpClientListener(registry, monitorId);
}
Expand All @@ -63,49 +51,9 @@ public HttpClientListener(String monitorId) {
this(Spectator.globalRegistry(), monitorId);
}

public long getRequestBacklog() {
return requestBacklog.get();
}

public long getInflightRequests() {
return inflightRequests.get();
}

public long getProcessedRequests() {
return processedRequests.count();
}

public long getRequestWriteFailed() {
return requestWriteFailed.count();
}

public long getFailedResponses() {
return failedResponses.count();
}

public long getResponse1xx() {
return responseCodesHolder.getResponse1xx();
}

public long getResponse2xx() {
return responseCodesHolder.getResponse2xx();
}

public long getResponse3xx() {
return responseCodesHolder.getResponse3xx();
}

public long getResponse4xx() {
return responseCodesHolder.getResponse4xx();
}

public long getResponse5xx() {
return responseCodesHolder.getResponse5xx();
}

@Override
public void onRequestProcessingComplete(long duration, TimeUnit timeUnit) {
requestProcessingTimes.record(duration, timeUnit);
requestProcessing.success(duration, timeUnit);
}

@Override
Expand All @@ -115,38 +63,32 @@ public void onResponseHeadersReceived(int responseCode, long duration, TimeUnit

@Override
public void onResponseReceiveComplete(long duration, TimeUnit timeUnit) {
inflightRequests.decrementAndGet();
processedRequests.increment();
responseReadTimes.record(duration, timeUnit);
response.success(duration, timeUnit);
}

@Override
public void onRequestWriteStart() {
requestBacklog.decrementAndGet();
requestWrite.start();
}

@Override
public void onResponseFailed(Throwable throwable) {
inflightRequests.decrementAndGet();
processedRequests.increment();
failedResponses.increment();
response.failure();
}

@Override
public void onRequestWriteComplete(long duration, TimeUnit timeUnit) {
requestWriteTimes.record(duration, timeUnit);
requestWrite.success(duration, timeUnit);
}

@Override
public void onRequestWriteFailed(long duration, TimeUnit timeUnit, Throwable throwable) {
inflightRequests.decrementAndGet();
requestWriteFailed.increment();
requestWrite.failure(duration, timeUnit);
}

@Override
public void onRequestSubmitted() {
requestBacklog.incrementAndGet();
inflightRequests.incrementAndGet();
requestProcessing.start();
}

@Override
Expand Down Expand Up @@ -256,84 +198,4 @@ public void onConnectSuccess(long duration, TimeUnit timeUnit) {
public void onConnectStart() {
tcpDelegate.onConnectStart();
}

public long getLiveConnections() {
return tcpDelegate.getLiveConnections();
}

public long getConnectionCount() {
return tcpDelegate.getConnectionCount();
}

public long getPendingConnects() {
return tcpDelegate.getPendingConnects();
}

public long getFailedConnects() {
return tcpDelegate.getFailedConnects();
}

public long getPendingConnectionClose() {
return tcpDelegate.getPendingConnectionClose();
}

public long getFailedConnectionClose() {
return tcpDelegate.getFailedConnectionClose();
}

public long getPendingPoolAcquires() {
return tcpDelegate.getPendingPoolAcquires();
}

public long getFailedPoolAcquires() {
return tcpDelegate.getFailedPoolAcquires();
}

public long getPendingPoolReleases() {
return tcpDelegate.getPendingPoolReleases();
}

public long getFailedPoolReleases() {
return tcpDelegate.getFailedPoolReleases();
}

public long getPoolEvictions() {
return tcpDelegate.getPoolEvictions();
}

public long getPoolReuse() {
return tcpDelegate.getPoolReuse();
}

public long getPendingWrites() {
return tcpDelegate.getPendingWrites();
}

public long getPendingFlushes() {
return tcpDelegate.getPendingFlushes();
}

public long getBytesWritten() {
return tcpDelegate.getBytesWritten();
}

public long getBytesRead() {
return tcpDelegate.getBytesRead();
}

public long getFailedWrites() {
return tcpDelegate.getFailedWrites();
}

public long getFailedFlushes() {
return tcpDelegate.getFailedFlushes();
}

public long getPoolAcquires() {
return tcpDelegate.getPoolAcquires();
}

public long getPoolReleases() {
return tcpDelegate.getPoolReleases();
}
}
Loading

0 comments on commit c0b3387

Please sign in to comment.