Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry endpoint after the initial connection was refused #230

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (nc *NATSCollector) Collect(ch chan<- prometheus.Metric) {
// initMetricsFromServers builds the configuration
// For each NATS Metrics endpoint (/*z) get the first URL
// to determine the list of possible metrics.
func (nc *NATSCollector) initMetricsFromServers(namespace string) {
func (nc *NATSCollector) initMetricsFromServers(namespace string, retryTime time.Duration) {
var response map[string]interface{}

nc.Stats = make(map[string]metric)
Expand All @@ -302,6 +302,13 @@ func (nc *NATSCollector) initMetricsFromServers(namespace string) {
}
}

if response == nil && retryTime > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will only retry once, which seems may solve the problem sometimes but in general I would think we should do it in a loop to wait until we've got some response.

I think it may be good to add an option like e.g. --max-retries to be able to bail out, but that can be done in a separate PR since it owuld also apply to other places where opts.RetryInterval is used.

Debugf("Retrying NATS connection for %s in %s", namespace, retryTime)
time.Sleep(retryTime)
nc.initMetricsFromServers(namespace, 0)
return
}

nc.objectToMetrics(response, namespace)
}

Expand Down Expand Up @@ -374,7 +381,7 @@ func (nc *NATSCollector) objectToMetrics(response map[string]interface{}, namesp
}
}

func newNatsCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector {
func newNatsCollector(system, endpoint string, servers []*CollectedServer, retryTime time.Duration) prometheus.Collector {
// TODO: Potentially add TLS config in the transport.
tr := &http.Transport{}
hc := &http.Client{Transport: tr}
Expand All @@ -394,7 +401,7 @@ func newNatsCollector(system, endpoint string, servers []*CollectedServer) prome
}
}

nc.initMetricsFromServers(system)
nc.initMetricsFromServers(system, retryTime)

return nc
}
Expand All @@ -415,7 +422,9 @@ func boolToFloat(b bool) float64 {

// NewCollector creates a new NATS Collector from a list of monitoring URLs.
// Each URL should be to a specific endpoint (e.g. varz, connz, healthz, subsz, or routez)
func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) prometheus.Collector {
// Some endpoints might not be available right away so a retry time is set after
// which the endpoint is tried again.
func NewCollector(system, endpoint, prefix string, servers []*CollectedServer, retryTime time.Duration) prometheus.Collector {
if isStreamingEndpoint(system, endpoint) {
return newStreamingCollector(getSystem(system, prefix), endpoint, servers)
}
Expand All @@ -437,5 +446,5 @@ func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) p
if isJszEndpoint(system) {
return newJszCollector(getSystem(system, prefix), endpoint, servers)
}
return newNatsCollector(getSystem(system, prefix), endpoint, servers)
return newNatsCollector(getSystem(system, prefix), endpoint, servers, retryTime)
}
14 changes: 7 additions & 7 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func verifyCollector(system, url string, endpoint string, cases map[string]float
ID: "id",
URL: url,
}
coll := NewCollector(system, endpoint, "", servers)
coll := NewCollector(system, endpoint, "", servers, time.Microsecond)

// now collect the metrics
c := make(chan prometheus.Metric)
Expand Down Expand Up @@ -74,7 +74,7 @@ func verifyStreamingCollector(url string, endpoint string, cases map[string]floa
ID: "id",
URL: url,
}
coll := NewCollector(StreamingSystem, endpoint, "", servers)
coll := NewCollector(StreamingSystem, endpoint, "", servers, time.Microsecond)

// now collect the metrics
c := make(chan prometheus.Metric)
Expand Down Expand Up @@ -152,7 +152,7 @@ func getLabelValues(system, url, endpoint string, metricNames []string) (map[str
ID: "id",
URL: url,
}
coll := NewCollector(system, endpoint, "", servers)
coll := NewCollector(system, endpoint, "", servers, time.Microsecond)
coll.Collect(metrics)
close(metrics)

Expand Down Expand Up @@ -264,10 +264,10 @@ func TestRegister(t *testing.T) {
// check duplicates do not panic
servers = append(servers, cs)

NewCollector("test", "varz", "", servers)
NewCollector("test", "varz", "", servers, time.Microsecond)

// test idenpotency.
nc := NewCollector("test", "varz", "", servers)
nc := NewCollector("test", "varz", "", servers, time.Microsecond)

// test without a server (no error).
if err := prometheus.Register(nc); err != nil {
Expand All @@ -283,7 +283,7 @@ func TestRegister(t *testing.T) {
defer s.Shutdown()

// test collect with a server
nc = NewCollector("test", "varz", "", servers)
nc = NewCollector("test", "varz", "", servers, time.Microsecond)
if err := prometheus.Register(nc); err != nil {
t.Fatal("Failed to register collector:", err)
}
Expand All @@ -293,7 +293,7 @@ func TestRegister(t *testing.T) {
prometheus.Unregister(nc)

// test collect with an invalid endpoint
nc = NewCollector("test", "GARBAGE", "", servers)
nc = NewCollector("test", "GARBAGE", "", servers, time.Microsecond)
if err := prometheus.Register(nc); err != nil {
t.Fatal("Failed to register collector:", err)
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (ne *NATSExporter) createCollector(system, endpoint string) {
ne.registerCollector(system, endpoint,
collector.NewCollector(system, endpoint,
ne.opts.Prefix,
ne.servers))
ne.servers, ne.opts.RetryInterval))
}

func (ne *NATSExporter) registerCollector(system, endpoint string, nc prometheus.Collector) {
Expand Down