Skip to content

Commit

Permalink
[FLINK-36240][runtime/metrics] Fix incorrect Port display in the Prom…
Browse files Browse the repository at this point in the history
…etheusReporter constructor
  • Loading branch information
eon2208 authored and ferenc-csaky committed Sep 19, 2024
1 parent 8302e12 commit 20e9826
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.util.PortRange;
import org.apache.flink.util.Preconditions;

import io.prometheus.client.exporter.HTTPServer;
Expand All @@ -43,9 +44,10 @@ int getPort() {
return port;
}

PrometheusReporter(Iterator<Integer> ports) {
while (ports.hasNext()) {
port = ports.next();
PrometheusReporter(PortRange portRange) {
Iterator<Integer> portsIterator = portRange.getPortsIterator();
while (portsIterator.hasNext()) {
port = portsIterator.next();
try {
httpServer = new HTTPServer(new InetSocketAddress(port), this.registry);
log.info("Started PrometheusReporter HTTP server on port {}.", port);
Expand All @@ -58,7 +60,7 @@ int getPort() {
if (httpServer == null) {
throw new RuntimeException(
"Could not start PrometheusReporter HTTP server on any configured port. Ports: "
+ ports);
+ portRange);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@

import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.PortRange;

import java.util.Iterator;
import java.util.Properties;

/** {@link MetricReporterFactory} for {@link PrometheusReporter}. */
Expand All @@ -34,8 +33,8 @@ public class PrometheusReporterFactory implements MetricReporterFactory {
public PrometheusReporter createMetricReporter(Properties properties) {
MetricConfig metricConfig = (MetricConfig) properties;
String portsConfig = metricConfig.getString(ARG_PORT, DEFAULT_PORT);
Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
PortRange portRange = new PortRange(portsConfig);

return new PrometheusReporter(ports);
return new PrometheusReporter(portRange);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.util.TestHistogram;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.PortRange;

import com.mashape.unirest.http.exceptions.UnirestException;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -59,7 +59,7 @@ class PrometheusReporterTaskScopeTest {

@BeforeEach
void setupReporter() {
reporter = new PrometheusReporter(NetUtils.getPortRangeFromString("9400-9500"));
reporter = new PrometheusReporter(new PortRange("9400-9500"));
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.util.TestHistogram;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.util.NetUtils;

import org.apache.flink.shaded.curator5.com.google.common.collect.Iterators;
import org.apache.flink.util.PortRange;

import com.mashape.unirest.http.HttpResponse;
import com.mashape.unirest.http.Unirest;
Expand All @@ -39,9 +37,7 @@
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

Expand Down Expand Up @@ -70,7 +66,8 @@ class PrometheusReporterTest {

@BeforeEach
void setupReporter() {
reporter = new PrometheusReporter(portRangeProvider.next());
PortRange portRange = new PortRange(portRangeProvider.nextRange());
reporter = new PrometheusReporter(portRange);

metricGroup =
TestUtils.createTestMetricGroup(
Expand Down Expand Up @@ -227,18 +224,17 @@ void registeringSameMetricTwiceDoesNotThrowException() {

@Test
void cannotStartTwoReportersOnSamePort() {
assertThatThrownBy(
() ->
new PrometheusReporter(
Collections.singleton(reporter.getPort()).iterator()))
.isInstanceOf(Exception.class);
PortRange portRange = new PortRange(reporter.getPort());
assertThatThrownBy(() -> new PrometheusReporter(portRange))
.isInstanceOf(RuntimeException.class)
.hasMessageMatching(
"^Could not start PrometheusReporter HTTP server on any configured port. Ports: \\d+(, \\d+)*");
}

@Test
void canStartTwoReportersWhenUsingPortRange() {
final Iterator<Integer> portRange =
Iterators.concat(
Iterators.singletonIterator(reporter.getPort()), portRangeProvider.next());
String ports = reporter.getPort() + ", " + portRangeProvider.nextRange();
PortRange portRange = new PortRange(ports);
new PrometheusReporter(portRange).close();
}

Expand All @@ -262,28 +258,27 @@ private static String createExpectedPollResponse(
}

/** Utility class providing distinct port ranges. */
private static class PortRangeProvider implements Iterator<Iterator<Integer>> {
private static class PortRangeProvider {

private int base = 9000;

@Override
public boolean hasNext() {
return base < 14000; // arbitrary limit that should be sufficient for test purposes
}

/**
* Returns the next port range containing exactly 100 ports.
* Returns the next port range containing exactly 100 ports as string.
*
* @return next port range
*/
public Iterator<Integer> next() {
public String nextRange() {
if (!hasNext()) {
throw new NoSuchElementException();
}
int lowEnd = base;
int highEnd = base + 99;
base += 100;
return NetUtils.getPortRangeFromString(lowEnd + "-" + highEnd);
return lowEnd + "-" + highEnd;
}

private boolean hasNext() {
return base < 14000; // arbitrary limit that should be sufficient for test purposes
}
}
}

0 comments on commit 20e9826

Please sign in to comment.