From 3050d2c957835932654a556edb2c4dd71f25d66b Mon Sep 17 00:00:00 2001 From: gjohar Date: Thu, 13 Oct 2022 14:23:08 +0530 Subject: [PATCH 1/2] Adding support for comma separate multiple FQDN Signed-off-by: gjohar --- README.md | 10 ++++++ cmd/client/main.go | 74 ++++++++++++++++++++++++++--------------- cmd/client/main_test.go | 6 ++-- 3 files changed, 60 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 4fb194f..7c77b76 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,16 @@ On every target machine run the client, pointing it at the proxy: ./pushprox-client --proxy-url=http://proxy:8080/ ``` +Passing custom FQDN: +``` +./pushprox-client --proxy-url=http://proxy:8080/ --fqdn=foo +``` + +Passing custom FQDN list: (Here client continue polling for all specified FQDNs) +``` +./pushprox-client --proxy-url=http://proxy:8080/ --fqdnList=foo,bar +``` + In Prometheus, use the proxy as a `proxy_url`: ``` diff --git a/cmd/client/main.go b/cmd/client/main.go index ffc8600..35616b4 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -20,18 +20,19 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "github.com/Showmax/go-fqdn" + "github.com/loki/vendor_bkp/github.com/cenkalti/backoff/v4" "io/ioutil" "net" "net/http" "net/url" "os" "strings" + "sync" "time" kingpin "gopkg.in/alecthomas/kingpin.v2" - "github.com/Showmax/go-fqdn" - "github.com/cenkalti/backoff/v4" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -44,6 +45,7 @@ import ( var ( myFqdn = kingpin.Flag("fqdn", "FQDN to register with").Default(fqdn.Get()).String() + fqdnList = kingpin.Flag("fqdnList", "Specify comma separated FQDN values, if running for more than one FQDN").String() proxyURL = kingpin.Flag("proxy-url", "Push proxy to talk to.").Required().String() caCertFile = kingpin.Flag("tls.cacert", " CA certificate to verify peer against").String() tlsCert = kingpin.Flag("tls.cert", " Client certificate file").String() @@ -55,23 +57,26 @@ var ( ) var ( - scrapeErrorCounter = prometheus.NewCounter( + counterLabelFqdn = "fqdn" + counterLabelList = []string{counterLabelFqdn} + + scrapeErrorCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "pushprox_client_scrape_errors_total", Help: "Number of scrape errors", - }, + }, counterLabelList, ) - pushErrorCounter = prometheus.NewCounter( + pushErrorCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "pushprox_client_push_errors_total", Help: "Number of push errors", - }, + }, counterLabelList, ) - pollErrorCounter = prometheus.NewCounter( + pollErrorCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "pushprox_client_poll_errors_total", Help: "Number of poll errors", - }, + }, counterLabelList, ) ) @@ -93,27 +98,27 @@ type Coordinator struct { logger log.Logger } -func (c *Coordinator) handleErr(request *http.Request, client *http.Client, err error) { +func (c *Coordinator) handleErr(request *http.Request, client *http.Client, err error, newFqdn string) { level.Error(c.logger).Log("err", err) - scrapeErrorCounter.Inc() + scrapeErrorCounter.With(prometheus.Labels{counterLabelFqdn: newFqdn}).Inc() resp := &http.Response{ StatusCode: http.StatusInternalServerError, Body: ioutil.NopCloser(strings.NewReader(err.Error())), Header: http.Header{}, } if err = c.doPush(resp, request, client); err != nil { - pushErrorCounter.Inc() + pushErrorCounter.With(prometheus.Labels{counterLabelFqdn: newFqdn}).Inc() level.Warn(c.logger).Log("msg", "Failed to push failed scrape response:", "err", err) return } level.Info(c.logger).Log("msg", "Pushed failed scrape response") } -func (c *Coordinator) doScrape(request *http.Request, client *http.Client) { +func (c *Coordinator) doScrape(request *http.Request, client *http.Client, newFqdn string) { logger := log.With(c.logger, "scrape_id", request.Header.Get("id")) timeout, err := util.GetHeaderTimeout(request.Header) if err != nil { - c.handleErr(request, client, err) + c.handleErr(request, client, err, newFqdn) return } ctx, cancel := context.WithTimeout(request.Context(), timeout) @@ -128,20 +133,20 @@ func (c *Coordinator) doScrape(request *http.Request, client *http.Client) { request.URL.RawQuery = params.Encode() } - if request.URL.Hostname() != *myFqdn { - c.handleErr(request, client, errors.New("scrape target doesn't match client fqdn")) + if request.URL.Hostname() != newFqdn { + c.handleErr(request, client, errors.New("scrape target doesn't match client fqdn"), newFqdn) return } scrapeResp, err := client.Do(request) if err != nil { msg := fmt.Sprintf("failed to scrape %s", request.URL.String()) - c.handleErr(request, client, errors.Wrap(err, msg)) + c.handleErr(request, client, errors.Wrap(err, msg), newFqdn) return } level.Info(logger).Log("msg", "Retrieved scrape response") if err = c.doPush(scrapeResp, request, client); err != nil { - pushErrorCounter.Inc() + pushErrorCounter.With(prometheus.Labels{counterLabelFqdn: newFqdn}).Inc() level.Warn(logger).Log("msg", "Failed to push scrape response:", "err", err) return } @@ -181,7 +186,7 @@ func (c *Coordinator) doPush(resp *http.Response, origRequest *http.Request, cli return nil } -func (c *Coordinator) doPoll(client *http.Client) error { +func (c *Coordinator) doPoll(client *http.Client, newFqdn string) error { base, err := url.Parse(*proxyURL) if err != nil { level.Error(c.logger).Log("msg", "Error parsing url:", "err", err) @@ -193,35 +198,35 @@ func (c *Coordinator) doPoll(client *http.Client) error { return errors.Wrap(err, "error parsing url poll") } url := base.ResolveReference(u) - resp, err := client.Post(url.String(), "", strings.NewReader(*myFqdn)) + resp, err := client.Post(url.String(), "", strings.NewReader(newFqdn)) if err != nil { - level.Error(c.logger).Log("msg", "Error polling:", "err", err) + level.Error(c.logger).Log("msg", "Error polling:", "Using-FQDN", newFqdn, "err", err) return errors.Wrap(err, "error polling") } defer resp.Body.Close() request, err := http.ReadRequest(bufio.NewReader(resp.Body)) if err != nil { - level.Error(c.logger).Log("msg", "Error reading request:", "err", err) + level.Error(c.logger).Log("msg", "Error reading request:", "Using-FQDN", newFqdn, "err", err) return errors.Wrap(err, "error reading request") } level.Info(c.logger).Log("msg", "Got scrape request", "scrape_id", request.Header.Get("id"), "url", request.URL) request.RequestURI = "" - go c.doScrape(request, client) + go c.doScrape(request, client, newFqdn) return nil } -func (c *Coordinator) loop(bo backoff.BackOff, client *http.Client) { +func (c *Coordinator) loop(bo backoff.BackOff, client *http.Client, newFqdn string) { op := func() error { - return c.doPoll(client) + return c.doPoll(client, newFqdn) } for { if err := backoff.RetryNotify(op, bo, func(err error, _ time.Duration) { - pollErrorCounter.Inc() + pollErrorCounter.With(prometheus.Labels{counterLabelFqdn: newFqdn}).Inc() }); err != nil { level.Error(c.logger).Log("err", err) } @@ -242,7 +247,7 @@ func main() { } // Make sure proxyURL ends with a single '/' *proxyURL = strings.TrimRight(*proxyURL, "/") + "/" - level.Info(coordinator.logger).Log("msg", "URL and FQDN info", "proxy_url", *proxyURL, "fqdn", *myFqdn) + level.Info(coordinator.logger).Log("msg", "URL info", "proxy_url", *proxyURL) tlsConfig := &tls.Config{} if *tlsCert != "" { @@ -297,5 +302,20 @@ func main() { client := &http.Client{Transport: transport} - coordinator.loop(newBackOffFromFlags(), client) + //Check if FQDN list is not passed - use single FDQN + if *fqdnList == "" { + coordinator.loop(newBackOffFromFlags(), client, *myFqdn) + } else { + fqdnLister := strings.Split(*fqdnList, ",") + fqdnCount := len(fqdnLister) + var wg sync.WaitGroup + level.Info(coordinator.logger).Log("msg", "Polling in parallel for all FQDN", "FQDN Count", fqdnCount) + for i := 0; i < fqdnCount; i++ { + wg.Add(1) + go coordinator.loop(newBackOffFromFlags(), client, fqdnLister[i]) + + } + wg.Wait() + } + } diff --git a/cmd/client/main_test.go b/cmd/client/main_test.go index 0adeec2..3e7b6e9 100644 --- a/cmd/client/main_test.go +++ b/cmd/client/main_test.go @@ -49,7 +49,7 @@ func TestDoScrape(t *testing.T) { } req.Header.Add("X-Prometheus-Scrape-Timeout-Seconds", "10.0") *myFqdn = ts.URL - c.doScrape(req, ts.Client()) + c.doScrape(req, ts.Client(), *myFqdn) } func TestHandleErr(t *testing.T) { @@ -60,13 +60,13 @@ func TestHandleErr(t *testing.T) { if err != nil { t.Fatal(err) } - c.handleErr(req, ts.Client(), errors.New("test error")) + c.handleErr(req, ts.Client(), errors.New("test error"), *myFqdn) } func TestLoop(t *testing.T) { ts, c := prepareTest() defer ts.Close() - if err := c.doPoll(ts.Client()); err != nil { + if err := c.doPoll(ts.Client(), *myFqdn); err != nil { t.Fatal(err) } } From 15f7735e1065f401a15ad70383f56090b96b6d8c Mon Sep 17 00:00:00 2001 From: gjohar Date: Thu, 13 Oct 2022 15:30:06 +0530 Subject: [PATCH 2/2] Adding support for comma separate multiple FQDN Signed-off-by: gjohar --- cmd/client/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index 35616b4..529dccd 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -21,7 +21,7 @@ import ( "crypto/x509" "fmt" "github.com/Showmax/go-fqdn" - "github.com/loki/vendor_bkp/github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v4" "io/ioutil" "net" "net/http"