From 0cc0b954e277b755ba88ff7d0595976aed684e4a Mon Sep 17 00:00:00 2001 From: Reinier Schoof Date: Sat, 20 May 2023 10:52:16 +0200 Subject: [PATCH] Retry endpoint after the initial connection was refused Signed-off-by: Reinier Schoof --- collector/collector.go | 19 ++++++++++++++----- collector/collector_test.go | 14 +++++++------- exporter/exporter.go | 2 +- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/collector/collector.go b/collector/collector.go index c269676..e7c9f8e 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -278,7 +278,7 @@ func (nc *NATSCollector) Collect(ch chan<- prometheus.Metric) { // initMetricsFromServers builds the configuration // For each NATS Metrics endpoint (/*z) get the first URL // to determine the list of possible metrics. -func (nc *NATSCollector) initMetricsFromServers(namespace string) { +func (nc *NATSCollector) initMetricsFromServers(namespace string, retryTime time.Duration) { var response map[string]interface{} nc.Stats = make(map[string]metric) @@ -302,6 +302,13 @@ func (nc *NATSCollector) initMetricsFromServers(namespace string) { } } + if response == nil && retryTime > 0 { + Debugf("Retrying NATS connection for %s in %s", namespace, retryTime) + time.Sleep(retryTime) + nc.initMetricsFromServers(namespace, 0) + return + } + nc.objectToMetrics(response, namespace) } @@ -374,7 +381,7 @@ func (nc *NATSCollector) objectToMetrics(response map[string]interface{}, namesp } } -func newNatsCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector { +func newNatsCollector(system, endpoint string, servers []*CollectedServer, retryTime time.Duration) prometheus.Collector { // TODO: Potentially add TLS config in the transport. tr := &http.Transport{} hc := &http.Client{Transport: tr} @@ -394,7 +401,7 @@ func newNatsCollector(system, endpoint string, servers []*CollectedServer) prome } } - nc.initMetricsFromServers(system) + nc.initMetricsFromServers(system, retryTime) return nc } @@ -415,7 +422,9 @@ func boolToFloat(b bool) float64 { // NewCollector creates a new NATS Collector from a list of monitoring URLs. // Each URL should be to a specific endpoint (e.g. varz, connz, healthz, subsz, or routez) -func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) prometheus.Collector { +// Some endpoints might not be available right away so a retry time is set after +// which the endpoint is tried again. +func NewCollector(system, endpoint, prefix string, servers []*CollectedServer, retryTime time.Duration) prometheus.Collector { if isStreamingEndpoint(system, endpoint) { return newStreamingCollector(getSystem(system, prefix), endpoint, servers) } @@ -437,5 +446,5 @@ func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) p if isJszEndpoint(system) { return newJszCollector(getSystem(system, prefix), endpoint, servers) } - return newNatsCollector(getSystem(system, prefix), endpoint, servers) + return newNatsCollector(getSystem(system, prefix), endpoint, servers, retryTime) } diff --git a/collector/collector_test.go b/collector/collector_test.go index b4af926..306fbd3 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -39,7 +39,7 @@ func verifyCollector(system, url string, endpoint string, cases map[string]float ID: "id", URL: url, } - coll := NewCollector(system, endpoint, "", servers) + coll := NewCollector(system, endpoint, "", servers, time.Microsecond) // now collect the metrics c := make(chan prometheus.Metric) @@ -74,7 +74,7 @@ func verifyStreamingCollector(url string, endpoint string, cases map[string]floa ID: "id", URL: url, } - coll := NewCollector(StreamingSystem, endpoint, "", servers) + coll := NewCollector(StreamingSystem, endpoint, "", servers, time.Microsecond) // now collect the metrics c := make(chan prometheus.Metric) @@ -152,7 +152,7 @@ func getLabelValues(system, url, endpoint string, metricNames []string) (map[str ID: "id", URL: url, } - coll := NewCollector(system, endpoint, "", servers) + coll := NewCollector(system, endpoint, "", servers, time.Microsecond) coll.Collect(metrics) close(metrics) @@ -264,10 +264,10 @@ func TestRegister(t *testing.T) { // check duplicates do not panic servers = append(servers, cs) - NewCollector("test", "varz", "", servers) + NewCollector("test", "varz", "", servers, time.Microsecond) // test idenpotency. - nc := NewCollector("test", "varz", "", servers) + nc := NewCollector("test", "varz", "", servers, time.Microsecond) // test without a server (no error). if err := prometheus.Register(nc); err != nil { @@ -283,7 +283,7 @@ func TestRegister(t *testing.T) { defer s.Shutdown() // test collect with a server - nc = NewCollector("test", "varz", "", servers) + nc = NewCollector("test", "varz", "", servers, time.Microsecond) if err := prometheus.Register(nc); err != nil { t.Fatal("Failed to register collector:", err) } @@ -293,7 +293,7 @@ func TestRegister(t *testing.T) { prometheus.Unregister(nc) // test collect with an invalid endpoint - nc = NewCollector("test", "GARBAGE", "", servers) + nc = NewCollector("test", "GARBAGE", "", servers, time.Microsecond) if err := prometheus.Register(nc); err != nil { t.Fatal("Failed to register collector:", err) } diff --git a/exporter/exporter.go b/exporter/exporter.go index 62f72b5..5ed0337 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -126,7 +126,7 @@ func (ne *NATSExporter) createCollector(system, endpoint string) { ne.registerCollector(system, endpoint, collector.NewCollector(system, endpoint, ne.opts.Prefix, - ne.servers)) + ne.servers, ne.opts.RetryInterval)) } func (ne *NATSExporter) registerCollector(system, endpoint string, nc prometheus.Collector) {