Skip to content

Commit

Permalink
Push metrics to Prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Nov 9, 2023
1 parent f7712e0 commit cad6df7
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 37 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_pushgateway</artifactId>
<version>0.16.0</version>
</dependency>
</dependencies>

<build>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/oltpbenchmark/DBWorkload.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public static void main(String[] args) throws Exception {
intervalMonitor = Integer.parseInt(argsLine.getOptionValue("im"));
}

// Run the Prometheus metrics pusher
if (argsLine.hasOption("ma")) {
PrometheusMetrics.run(argsLine.getOptionValue("ma"));
}

// -------------------------------------------------------------------
// GET PLUGIN LIST
// -------------------------------------------------------------------
Expand Down Expand Up @@ -505,6 +510,7 @@ private static Options buildOptions(XMLConfiguration pluginConfig) {
options.addOption("d", "directory", true, "Base directory for the result files, default is current directory");
options.addOption(null, "dialects-export", true, "Export benchmark SQL to a dialects file");
options.addOption("jh", "json-histograms", true, "Export histograms to JSON file");
options.addOption("ma", "metrics-address", true, "Prometheus PushGateway address");
return options;
}

Expand Down
58 changes: 58 additions & 0 deletions src/main/java/com/oltpbenchmark/PrometheusMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.oltpbenchmark;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import io.prometheus.client.exporter.PushGateway;

public class PrometheusMetrics {
private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetrics.class);

private static final CollectorRegistry registry = new CollectorRegistry();

public static final Counter TXNS = Counter.build()
.name("benchbase_txn_total")
.help("Total number of transactions")
.labelNames("benchmark", "type", "status")
.register(registry);

public static final Histogram TXN_DURATION = Histogram.build()
.name("benchbase_txn_duration_seconds")
.help("Transaction duration in seconds")
.labelNames("benchmark", "type")
.buckets(0.000_001, 0.000_010, 0.000_100, // 1 us, 10 us, 100 us
0.001_000, 0.010_000, 0.100_000, // 1 ms, 10 ms, 100 ms
1.0, 10.0, 100.0) // 1 s, 10 s, 100 s
.register(registry);

public static final Histogram STATEMENT_DURATION = Histogram.build()
.name("benchbase_statement_duration_seconds")
.help("Statement duration in seconds")
.labelNames("benchmark", "type", "name")
.buckets(0.000_001, 0.000_010, 0.000_100, // 1 us, 10 us, 100 us
0.001_000, 0.010_000, 0.100_000, // 1 ms, 10 ms, 100 ms
1.0, 10.0, 100.0) // 1 s, 10 s, 100 s
.register(registry);

public static void run(String pushGatewayAddress) {
new Thread() {
@Override
public void run() {
PushGateway pushGateway = new PushGateway(pushGatewayAddress);
while (true) {
try {
pushGateway.pushAdd(registry, "benchbase");
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.start();

LOG.info("Started Prometheus metrics pusher to {}", pushGatewayAddress);
}
}
14 changes: 13 additions & 1 deletion src/main/java/com/oltpbenchmark/api/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import com.oltpbenchmark.types.State;
import com.oltpbenchmark.types.TransactionStatus;
import com.oltpbenchmark.util.Histogram;

import io.prometheus.client.Histogram.Timer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -310,6 +313,10 @@ public final void run() {
if (preState == MEASURE && postPhase.getId() == prePhase.getId()) {
latencies.addLatency(transactionType.getId(), start, end, this.id, prePhase.getId());
intervalRequests.incrementAndGet();

PrometheusMetrics.TXN_DURATION.labels(this.benchmark.getBenchmarkName(),
transactionType.getName())
.observe((end - start) / 1_000_000_000.0);
}
if (prePhase.isLatencyRun()) {
workloadState.startColdQuery();
Expand Down Expand Up @@ -429,7 +436,10 @@ protected final void doWork(DatabaseType databaseType, TransactionType transacti
LOG.debug(String.format("%s %s committing...", this, transactionType));
}

conn.commit();
try (Timer timer = PrometheusMetrics.STATEMENT_DURATION.labels(this.benchmark.getBenchmarkName(),
transactionType.getName(), "commit").startTimer()) {
conn.commit();
}

break;

Expand Down Expand Up @@ -482,6 +492,8 @@ protected final void doWork(DatabaseType databaseType, TransactionType transacti
case ERROR -> this.txnErrors.put(transactionType);
}

PrometheusMetrics.TXNS.labels(this.benchmark.getBenchmarkName(), transactionType.getName(),
status.name()).inc();
}

}
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/oltpbenchmark/benchmarks/hot/HOTBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public class HOTBenchmark extends BenchmarkModule {
/**
* The length in characters of each field
*/
protected final int fieldSize;
protected final int region;
protected final int hot;
protected final int keysPerTxn;
protected final int maxScanCount;
protected final boolean loadAll;
final int fieldSize;
final int region;
final int hot;
final int keysPerTxn;
final int maxScanCount;
final boolean loadAll;

public HOTBenchmark(WorkloadConfiguration workConf) {
super(workConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,13 @@ private void finalizePartitions(Connection conn) throws SQLException {
continue;
}
String maxKeySql = String.format("SELECT MAX(ycsb_key) FROM %s_%s;", HOTConstants.TABLE_NAME, p.getId());
try (PreparedStatement stmt = conn.prepareStatement(maxKeySql)) {
try (ResultSet res = stmt.executeQuery()) {
int maxKey = 0;
while (res.next()) {
maxKey = res.getInt(1);
}
p.setInsertCounterStartFromMaxKey(numInsertionSlots, maxKey);
try (PreparedStatement stmt = conn.prepareStatement(maxKeySql);
ResultSet res = stmt.executeQuery()) {
int maxKey = 0;
while (res.next()) {
maxKey = res.getInt(1);
}
p.setInsertCounterStartFromMaxKey(numInsertionSlots, maxKey);
}
}
this.partitions = Collections.unmodifiableList(this.partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package com.oltpbenchmark.benchmarks.hot.procedures;

import com.oltpbenchmark.PrometheusMetrics;
import com.oltpbenchmark.api.Procedure;
import com.oltpbenchmark.api.SQLStmt;

import io.prometheus.client.Histogram.Timer;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -31,53 +34,72 @@

class BasicProcedures extends Procedure {
protected void insert(Connection conn, Key key, String[] vals) throws SQLException {
try (PreparedStatement stmt = this.prepareInsertStmt(conn, key, vals)) {
try (Timer timer = PrometheusMetrics.STATEMENT_DURATION.labels(
"hot",
this.getProcedureName(),
"insert").startTimer();
PreparedStatement stmt = this.prepareInsertStmt(conn, key, vals)) {
stmt.executeUpdate();
}
}

protected void read(Connection conn, Key key, String[] results) throws SQLException {
try (PreparedStatement stmt = this.prepareReadStmt(conn, key)) {
try (ResultSet r = stmt.executeQuery()) {
while (r.next()) {
for (int i = 0; i < results.length; i++) {
results[i] = r.getString(i + 1);
}
try (Timer timer = PrometheusMetrics.STATEMENT_DURATION.labels(
"hot",
this.getProcedureName(),
"read").startTimer();
PreparedStatement stmt = this.prepareReadStmt(conn, key);
ResultSet r = stmt.executeQuery()) {
while (r.next()) {
for (int i = 0; i < results.length; i++) {
results[i] = r.getString(i + 1);
}
}
}
}

protected void update(Connection conn, Key key, String[] fields) throws SQLException {
try (PreparedStatement stmt = this.prepareUpdateStmt(conn, key, fields)) {
try (Timer timer = PrometheusMetrics.STATEMENT_DURATION.labels(
"hot",
this.getProcedureName(),
"update").startTimer();
PreparedStatement stmt = this.prepareUpdateStmt(conn, key, fields)) {
stmt.executeUpdate();
}
}

protected void scan(Connection conn, Key start, int count, List<String[]> results) throws SQLException {
try (PreparedStatement stmt = this.prepareScanStmt(conn, start, count)) {
try (ResultSet r = stmt.executeQuery()) {
while (r.next()) {
ResultSetMetaData meta = r.getMetaData();
String[] data = new String[meta.getColumnCount()];
for (int i = 0; i < data.length; i++) {
data[i] = r.getString(i + 1);
}
results.add(data);
try (Timer timer = PrometheusMetrics.STATEMENT_DURATION.labels(
"hot",
this.getProcedureName(),
"scan").startTimer();
PreparedStatement stmt = this.prepareScanStmt(conn, start, count);
ResultSet r = stmt.executeQuery()) {
while (r.next()) {
ResultSetMetaData meta = r.getMetaData();
String[] data = new String[meta.getColumnCount()];
for (int i = 0; i < data.length; i++) {
data[i] = r.getString(i + 1);
}
results.add(data);
}
}

}

protected void readModifyWrite(Connection conn, Key key, String[] fields,
String[] results)
throws SQLException {
try (PreparedStatement stmt = this.prepareReadStmt(conn, key)) {
try (ResultSet r = stmt.executeQuery()) {
while (r.next()) {
for (int i = 0; i < results.length; i++) {
results[i] = r.getString(i + 1);
}
try (
Timer timer = PrometheusMetrics.STATEMENT_DURATION.labels(
"hot",
this.getProcedureName(),
"read-modify-write").startTimer();
PreparedStatement stmt = this.prepareReadStmt(conn, key);
ResultSet r = stmt.executeQuery()) {
while (r.next()) {
for (int i = 0; i < results.length; i++) {
results[i] = r.getString(i + 1);
}
}
}
Expand Down

0 comments on commit cad6df7

Please sign in to comment.