diff --git a/inputs/prometheus/consul.go b/inputs/prometheus/consul.go index 981bdf46..64187c88 100644 --- a/inputs/prometheus/consul.go +++ b/inputs/prometheus/consul.go @@ -7,7 +7,6 @@ import ( "log" "net/url" "strings" - "sync" "text/template" "time" @@ -23,7 +22,6 @@ type ConsulConfig struct { Agent string `toml:"agent"` QueryInterval config.Duration `toml:"query_interval"` Queries []*ConsulQuery `toml:"query"` - Catalog *api.Catalog `toml:"-"` } // One Consul service discovery query @@ -51,152 +49,98 @@ type ConsulQuery struct { lastQueryFailed bool } -func (ins *Instance) InitConsulClient() error { +func (ins *Instance) InitConsulClient(ctx context.Context) error { consulAPIConfig := api.DefaultConfig() if ins.ConsulConfig.Agent != "" { consulAPIConfig.Address = ins.ConsulConfig.Agent } + consul, err := api.NewClient(consulAPIConfig) + if err != nil { + return fmt.Errorf("cannot connect to the Consul agent: %w", err) + } + i := 0 // Parse the template for metrics URL, drop queries with template parse errors - for i := range ins.ConsulConfig.Queries { + for _, q := range ins.ConsulConfig.Queries { serviceURLTemplate, err := template.New("URL").Parse(ins.ConsulConfig.Queries[i].ServiceURL) if err != nil { return fmt.Errorf("failed to parse the Consul query URL template (%s): %s", ins.ConsulConfig.Queries[i].ServiceURL, err) } - ins.ConsulConfig.Queries[i].serviceURLTemplate = serviceURLTemplate + q.serviceURLTemplate = serviceURLTemplate // Allow to use join function in tags templateFunctions := template.FuncMap{"join": strings.Join} // Parse the tag value templates - ins.ConsulConfig.Queries[i].serviceExtraTagsTemplate = make(map[string]*template.Template) + q.serviceExtraTagsTemplate = make(map[string]*template.Template) for tagName, tagTemplateString := range ins.ConsulConfig.Queries[i].ServiceExtraTags { tagTemplate, err := template.New(tagName).Funcs(templateFunctions).Parse(tagTemplateString) if err != nil { - return fmt.Errorf("failed to parse the Consul 
query Extra Tag template (%s): %s", tagTemplateString, err) + log.Println("failed to parse the Consul query Extra Tag template (%s): %s", tagTemplateString, err) + continue } - ins.ConsulConfig.Queries[i].serviceExtraTagsTemplate[tagName] = tagTemplate + q.serviceExtraTagsTemplate[tagName] = tagTemplate } + ins.ConsulConfig.Queries[i] = q + i++ } // Prevent memory leak by erasing truncated values - // for j := i; j < len(ins.ConsulConfig.Queries); j++ { - // ins.ConsulConfig.Queries[j] = nil - // } - // ins.ConsulConfig.Queries = ins.ConsulConfig.Queries[:i] - - consul, err := api.NewClient(consulAPIConfig) - if err != nil { - return fmt.Errorf("failed to connect the Consul agent(%s): %v", consulAPIConfig.Address, err) - } - - ins.ConsulConfig.Catalog = consul.Catalog() - - return nil -} - -func (ins *Instance) UrlsFromConsul(ctx context.Context) ([]ScrapeUrl, error) { - if !ins.ConsulConfig.Enabled { - return []ScrapeUrl{}, nil + for j := i; j < len(ins.ConsulConfig.Queries); j++ { + ins.ConsulConfig.Queries[j] = nil } + ins.ConsulConfig.Queries = ins.ConsulConfig.Queries[:i] - if ins.DebugMod { - log.Println("D! get urls from consul:", ins.ConsulConfig.Agent) - } - - urlset := map[string]struct{}{} - var returls []ScrapeUrl - - for _, q := range ins.ConsulConfig.Queries { - queryOptions := api.QueryOptions{} - if q.ServiceDc != "" { - queryOptions.Datacenter = q.ServiceDc - } + catalog := consul.Catalog() - // Request services from Consul - consulServices, _, err := ins.ConsulConfig.Catalog.Service(q.ServiceName, q.ServiceTag, &queryOptions) + ins.wg.Add(1) + go func() { + // Store last error status and change log level depending on repeated occurrence + var refreshFailed = false + defer ins.wg.Done() + err := ins.refreshConsulServices(catalog) if err != nil { - return nil, err + refreshFailed = true + log.Printf("Unable to refresh Consul services: %v", err) } - - if len(consulServices) == 0 { - if ins.DebugMod { - log.Println("D! 
query consul did not find any instances, service:", q.ServiceName, " tag:", q.ServiceTag) + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(ins.ConsulConfig.QueryInterval)): + err := ins.refreshConsulServices(catalog) + if err != nil { + message := fmt.Sprintf("Unable to refresh Consul services: %v", err) + if refreshFailed { + log.Println("E!", message) + } else { + log.Println("W!", message) + } + refreshFailed = true + } else if refreshFailed { + refreshFailed = false + log.Println("Successfully refreshed Consul services after previous errors") + } } - continue } + }() - if ins.DebugMod { - log.Println("D! query consul found", len(consulServices), "instances, service:", q.ServiceName, " tag:", q.ServiceTag) - } - - for _, consulService := range consulServices { - su, err := ins.getConsulServiceURL(q, consulService) - if err != nil { - return nil, fmt.Errorf("unable to get scrape URLs from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, err) - } - - if _, has := urlset[su.URL.String()]; has { - continue - } - - urlset[su.URL.String()] = struct{}{} - returls = append(returls, *su) - } - } + return nil +} - if ins.firstRun { - var wg sync.WaitGroup - consulAPIConfig := api.DefaultConfig() - if ins.ConsulConfig.Agent != "" { - consulAPIConfig.Address = ins.ConsulConfig.Agent - } +func (ins *Instance) UrlsFromConsul() ([]*ScrapeUrl, error) { + ins.lock.Lock() + defer ins.lock.Unlock() - consul, err := api.NewClient(consulAPIConfig) - if err != nil { - return []ScrapeUrl{}, fmt.Errorf("cannot connect to the Consul agent: %w", err) - } - catalog := consul.Catalog() - - wg.Add(1) - go func() { - // Store last error status and change log level depending on repeated occurrence - var refreshFailed = false - defer wg.Done() - err := ins.refreshConsulServices(catalog) - if err != nil { - refreshFailed = true - log.Printf("Unable to refresh Consul services: %v\n", err) - } - for { - select { - case <-ctx.Done(): - return - case 
<-time.After(time.Duration(ins.ConsulConfig.QueryInterval)): - err := ins.refreshConsulServices(catalog) - if err != nil { - message := fmt.Sprintf("Unable to refresh Consul services: %v", err) - if refreshFailed { - log.Println("E!", message) - } else { - log.Println("W!", message) - } - refreshFailed = true - } else if refreshFailed { - refreshFailed = false - log.Println("Successfully refreshed Consul services after previous errors") - } - } - } - }() - ins.firstRun = false - wg.Wait() + urls := make([]*ScrapeUrl, 0, len(ins.consulServices)) + for _, u := range ins.consulServices { + urls = append(urls, u) } - return returls, nil + return urls, nil } func (ins *Instance) refreshConsulServices(c *api.Catalog) error { - consulServiceURLs := make(map[string]ScrapeUrl) + consulServiceURLs := make(map[string]*ScrapeUrl) if ins.DebugMod { log.Println("Refreshing Consul services") @@ -240,14 +184,12 @@ func (ins *Instance) refreshConsulServices(c *api.Catalog) error { } q.lastQueryFailed = false log.Printf("Adding scrape URL from Consul for Service (%s, %s): %s\n", q.ServiceName, q.ServiceTag, uaa.URL.String()) - consulServiceURLs[uaa.URL.String()] = *uaa + consulServiceURLs[uaa.URL.String()] = uaa } } ins.lock.Lock() - for _, u := range consulServiceURLs { - ins.URLs = append(ins.URLs, u.URL.String()) - } + ins.consulServices = consulServiceURLs ins.lock.Unlock() return nil diff --git a/inputs/prometheus/prometheus.go b/inputs/prometheus/prometheus.go index 31d3446c..6339b2d3 100644 --- a/inputs/prometheus/prometheus.go +++ b/inputs/prometheus/prometheus.go @@ -45,9 +45,11 @@ type Instance struct { ignoreLabelKeysFilter filter.Filter cancel context.CancelFunc lock sync.Mutex - firstRun bool tls.ClientConfig client *http.Client + + wg sync.WaitGroup + consulServices map[string]*ScrapeUrl } func (ins *Instance) Empty() bool { @@ -67,8 +69,11 @@ func (ins *Instance) Init() error { return types.ErrInstancesEmpty } + var ctx context.Context + ctx, ins.cancel = 
context.WithCancel(context.Background()) + if ins.ConsulConfig.Enabled && len(ins.ConsulConfig.Queries) > 0 { - if err := ins.InitConsulClient(); err != nil { + if err := ins.InitConsulClient(ctx); err != nil { return err } } @@ -80,7 +85,6 @@ func (ins *Instance) Init() error { if ins.Timeout <= 0 { ins.Timeout = config.Duration(time.Second * 3) } - ins.firstRun = true client, err := ins.createHTTPClient() if err != nil { @@ -156,8 +160,13 @@ func (p *Prometheus) GetInstances() []inputs.Instance { return ret } +func (p *Prometheus) Drop() { + for _, ins := range p.Instances { + ins.Drop() + } +} + func (ins *Instance) Gather(slist *types.SampleList) { - var ctx context.Context urlwg := new(sync.WaitGroup) defer urlwg.Wait() @@ -170,11 +179,10 @@ func (ins *Instance) Gather(slist *types.SampleList) { urlwg.Add(1) - go ins.gatherUrl(urlwg, slist, ScrapeUrl{URL: u, Tags: map[string]string{}}) + go ins.gatherUrl(urlwg, slist, &ScrapeUrl{URL: u, Tags: map[string]string{}}) } - ctx, ins.cancel = context.WithCancel(context.Background()) - urls, err := ins.UrlsFromConsul(ctx) + urls, err := ins.UrlsFromConsul() if err != nil { log.Println("E! failed to query urls from consul:", err) return @@ -186,7 +194,7 @@ func (ins *Instance) Gather(slist *types.SampleList) { } } -func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *types.SampleList, uri ScrapeUrl) { +func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *types.SampleList, uri *ScrapeUrl) { defer urlwg.Done() u := uri.URL @@ -271,4 +279,5 @@ func (ins *Instance) setHeaders(req *http.Request) { func (ins *Instance) Drop() { ins.cancel() + ins.wg.Wait() }