From 84578e21ea2619ee30de33abd6fde72a1322778b Mon Sep 17 00:00:00 2001 From: Alexander Akulov Date: Sun, 1 Apr 2018 18:41:11 +0500 Subject: [PATCH] http-output fix empty body in retry again (#76) --- processors/output-http/httpoutput.go | 35 +++++++++++------------ processors/output-http/httpoutput_test.go | 3 +- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/processors/output-http/httpoutput.go b/processors/output-http/httpoutput.go index e34d0923..25db5f88 100644 --- a/processors/output-http/httpoutput.go +++ b/processors/output-http/httpoutput.go @@ -187,7 +187,7 @@ func (b *batch) Fire(notifier muster.Notifier) { writer := bufio.NewWriter(&body) enc, err := b.p.opt.Codec.NewEncoder(writer) if err != nil { - b.p.Logger.Errorf("%d events lost. codec error: %v", len(b.Items), err) + b.p.Logger.Errorf("Lost %d messages. Codec failed with: %d", len(b.Items), err) return } for i := range b.Items { @@ -196,30 +196,21 @@ func (b *batch) Fire(notifier muster.Notifier) { } } if err := writer.Flush(); err != nil { - b.p.Logger.Errorf("%d events lost with error: %v", len(b.Items), err) + b.p.Logger.Errorf("Lost %d messages with: %v", len(b.Items), err) return } - req, err := http.NewRequest(b.p.opt.HTTPMethod, *b.url, bytes.NewBuffer(body.Bytes())) - if err != nil { - b.p.Logger.Errorf("Create request failed with: %v Lost %d messages", err, len(b.Items)) - return - } - defer req.Body.Close() - for hName, hValue := range b.headers { - req.Header.Set(hName, hValue) - } for { - retry, err := b.p.send(req) + retry, err := b.send(body.Bytes()) if err == nil { b.p.Logger.Debugf("Successfully sent %d messages", len(b.Items)) return } if !retry { - b.p.Logger.Errorf("Lost %d messages with %v", len(b.Items), err) + b.p.Logger.Errorf("Lost %d messages. %v", len(b.Items), err) return } - b.p.Logger.Warnf("Can't sent %d messages with: %v. Retry after %d seconds", len(b.Items), err, b.p.opt.RetryInterval) + b.p.Logger.Warnf("Can't sent %d messages. %v. Retry after %d seconds", len(b.Items), err, b.p.opt.RetryInterval) select { case <-b.p.shutdown: b.p.Logger.Errorf("Shutdown. Lost %d messages", len(b.Items)) @@ -230,15 +221,23 @@ func (b *batch) Fire(notifier muster.Notifier) { } } -func (p *processor) send(req *http.Request) (retry bool, err error) { - resp, err := p.httpClient.Do(req) +func (b *batch) send(body []byte) (retry bool, err error) { + req, err := http.NewRequest(b.p.opt.HTTPMethod, *b.url, bytes.NewBuffer(body)) + if err != nil { + return false, fmt.Errorf("Create request failed with: %v", err) + } + defer req.Body.Close() + for hName, hValue := range b.headers { + req.Header.Set(hName, hValue) + } + resp, err := b.p.httpClient.Do(req) if err != nil { return true, fmt.Errorf("Send request failed with: %v", err) } defer resp.Body.Close() io.Copy(ioutil.Discard, resp.Body) - for _, ignoreCode := range p.opt.IgnorableCodes { + for _, ignoreCode := range b.p.opt.IgnorableCodes { if resp.StatusCode == ignoreCode { return false, nil } @@ -247,7 +246,7 @@ func (p *processor) send(req *http.Request) (retry bool, err error) { return false, nil } - for _, retryCode := range p.opt.RetryableCodes { + for _, retryCode := range b.p.opt.RetryableCodes { if resp.StatusCode == retryCode { return true, fmt.Errorf("Server returned %s", resp.Status) } diff --git a/processors/output-http/httpoutput_test.go b/processors/output-http/httpoutput_test.go index 9d02d940..3b059126 100644 --- a/processors/output-http/httpoutput_test.go +++ b/processors/output-http/httpoutput_test.go @@ -86,9 +86,9 @@ func TestRetry(t *testing.T) { c := make(chan string, 1) counter := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() counter++ if counter == 1 { + ioutil.ReadAll(r.Body) w.WriteHeader(http.StatusInternalServerError) c <- "500" } else { @@ -96,6 +96,7 @@ func TestRetry(t *testing.T) { c <- string(body) w.WriteHeader(http.StatusOK) } + r.Body.Close() })) defer ts.Close() p := New().(*processor)