Skip to content

Commit

Permalink
Close TCP, TLS connections gracefully to avoid data loss (#123)
Browse files Browse the repository at this point in the history
TCP connections, including TLS connections, acknowledge received data.

Although a simple `net.Conn.Close()` will put all previously written
data on the network, the receiving server may disregard data that it
can't successfully acknowledge.

Graceful acknowledgement and closure can be facilitated by the client
closing writes first, and reading any available data before fully
closing the connection.
  • Loading branch information
chrisberkhout authored Nov 11, 2024
1 parent ab7b158 commit df7b22c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .changelog/123.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
TCP, TLS outputs: Close connections gracefully to avoid data loss.
```
31 changes: 26 additions & 5 deletions internal/output/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package tcp

import (
"context"
"errors"
"io"
"net"
"time"

Expand All @@ -18,7 +20,7 @@ func init() {

type Output struct {
opts *output.Options
conn net.Conn
conn *net.TCPConn
}

func New(opts *output.Options) (output.Output, error) {
Expand All @@ -33,7 +35,7 @@ func (o *Output) DialContext(ctx context.Context) error {
return err
}

o.conn = conn
o.conn = conn.(*net.TCPConn)
return nil
}

Expand All @@ -42,10 +44,29 @@ func (o *Output) Conn() net.Conn {
}

func (o *Output) Close() error {
if o.conn == nil {
return nil
if o.conn != nil {
if err := o.conn.CloseWrite(); err != nil {
return err
}

// drain to facilitate graceful close on the other side
deadline := time.Now().Add(5 * time.Second)
if err := o.conn.SetReadDeadline(deadline); err != nil {
return err
}
buffer := make([]byte, 1024)
for {
_, err := o.conn.Read(buffer)
if errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
}

return o.conn.Close()
}
return o.conn.Close()
return nil
}

func (o *Output) Write(b []byte) (int, error) {
Expand Down
25 changes: 23 additions & 2 deletions internal/output/tls/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package tcp
import (
"context"
"crypto/tls"
"errors"
"io"
"net"
"time"

Expand All @@ -19,7 +21,7 @@ func init() {

type Output struct {
opts *output.Options
conn net.Conn
conn *tls.Conn
}

func New(opts *output.Options) (output.Output, error) {
Expand All @@ -39,12 +41,31 @@ func (o *Output) DialContext(ctx context.Context) error {
return err
}

o.conn = conn
o.conn = conn.(*tls.Conn)
return nil
}

func (o *Output) Close() error {
if o.conn != nil {
if err := o.conn.CloseWrite(); err != nil {
return err
}

// drain to facilitate graceful close on the other side
deadline := time.Now().Add(5 * time.Second)
if err := o.conn.SetReadDeadline(deadline); err != nil {
return err
}
buffer := make([]byte, 1024)
for {
_, err := o.conn.Read(buffer)
if errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
}

return o.conn.Close()
}
return nil
Expand Down

0 comments on commit df7b22c

Please sign in to comment.