Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

Commit

Permalink
http-output fix empty body in retry again (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexAkulov authored and vjeantet committed Apr 1, 2018
1 parent 8974e94 commit 84578e2
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
35 changes: 17 additions & 18 deletions processors/output-http/httpoutput.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (b *batch) Fire(notifier muster.Notifier) {
writer := bufio.NewWriter(&body)
enc, err := b.p.opt.Codec.NewEncoder(writer)
if err != nil {
b.p.Logger.Errorf("%d events lost. codec error: %v", len(b.Items), err)
b.p.Logger.Errorf("Lost %d messages. Codec failed with: %d", len(b.Items), err)
return
}
for i := range b.Items {
Expand All @@ -196,30 +196,21 @@ func (b *batch) Fire(notifier muster.Notifier) {
}
}
if err := writer.Flush(); err != nil {
b.p.Logger.Errorf("%d events lost with error: %v", len(b.Items), err)
b.p.Logger.Errorf("Lost %d messages with: %v", len(b.Items), err)
return
}
req, err := http.NewRequest(b.p.opt.HTTPMethod, *b.url, bytes.NewBuffer(body.Bytes()))
if err != nil {
b.p.Logger.Errorf("Create request failed with: %v Lost %d messages", err, len(b.Items))
return
}
defer req.Body.Close()
for hName, hValue := range b.headers {
req.Header.Set(hName, hValue)
}

for {
retry, err := b.p.send(req)
retry, err := b.send(body.Bytes())
if err == nil {
b.p.Logger.Debugf("Successfully sent %d messages", len(b.Items))
return
}
if !retry {
b.p.Logger.Errorf("Lost %d messages with %v", len(b.Items), err)
b.p.Logger.Errorf("Lost %d messages. %v", len(b.Items), err)
return
}
b.p.Logger.Warnf("Can't sent %d messages with: %v. Retry after %d seconds", len(b.Items), err, b.p.opt.RetryInterval)
b.p.Logger.Warnf("Can't sent %d messages. %v. Retry after %d seconds", len(b.Items), err, b.p.opt.RetryInterval)
select {
case <-b.p.shutdown:
b.p.Logger.Errorf("Shutdown. Lost %d messages", len(b.Items))
Expand All @@ -230,15 +221,23 @@ func (b *batch) Fire(notifier muster.Notifier) {
}
}

func (p *processor) send(req *http.Request) (retry bool, err error) {
resp, err := p.httpClient.Do(req)
func (b *batch) send(body []byte) (retry bool, err error) {
req, err := http.NewRequest(b.p.opt.HTTPMethod, *b.url, bytes.NewBuffer(body))
if err != nil {
return false, fmt.Errorf("Create request failed with: %v", err)
}
defer req.Body.Close()
for hName, hValue := range b.headers {
req.Header.Set(hName, hValue)
}
resp, err := b.p.httpClient.Do(req)
if err != nil {
return true, fmt.Errorf("Send request failed with: %v", err)
}
defer resp.Body.Close()

io.Copy(ioutil.Discard, resp.Body)
for _, ignoreCode := range p.opt.IgnorableCodes {
for _, ignoreCode := range b.p.opt.IgnorableCodes {
if resp.StatusCode == ignoreCode {
return false, nil
}
Expand All @@ -247,7 +246,7 @@ func (p *processor) send(req *http.Request) (retry bool, err error) {
return false, nil
}

for _, retryCode := range p.opt.RetryableCodes {
for _, retryCode := range b.p.opt.RetryableCodes {
if resp.StatusCode == retryCode {
return true, fmt.Errorf("Server returned %s", resp.Status)
}
Expand Down
3 changes: 2 additions & 1 deletion processors/output-http/httpoutput_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,17 @@ func TestRetry(t *testing.T) {
c := make(chan string, 1)
counter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
counter++
if counter == 1 {
ioutil.ReadAll(r.Body)
w.WriteHeader(http.StatusInternalServerError)
c <- "500"
} else {
body, _ := ioutil.ReadAll(r.Body)
c <- string(body)
w.WriteHeader(http.StatusOK)
}
r.Body.Close()
}))
defer ts.Close()
p := New().(*processor)
Expand Down

0 comments on commit 84578e2

Please sign in to comment.