Skip to content

Commit

Permalink
Ensure CloseAndExhaust never blocks forever.
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Oct 30, 2023
1 parent 973d04f commit 2635d15
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 8 deletions.
36 changes: 28 additions & 8 deletions pkg/util/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,45 @@

package util

import (
"errors"
"time"
)

var ErrCloseAndExhaustTimedOut = errors.New("timed out waiting to exhaust stream after calling CloseSend, will continue exhausting stream in background")

type Stream[T any] interface {
CloseSend() error
Recv() (T, error)
}

// CloseAndExhaust closes and exhausts stream to ensure:
// - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
// - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
// CloseAndExhaust closes and then tries to exhaust stream. This ensures:
// - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
// - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
//
// Note that this method may block if the stream has not already been exhausted (successfully or otherwise).
// Note that this method may block for up to three seconds if the stream has not already been exhausted.
// If the stream has not been exhausted after this time, it will return ErrCloseAndExhaustTimedOut and continue exhausting the stream in the background.
func CloseAndExhaust[T any](stream Stream[T]) error {
err := stream.CloseSend() //nolint:forbidigo // This is the one place we want to call this method.
if err != nil {
return err
}

for err == nil {
_, err = stream.Recv()
}
done := make(chan struct{})

return nil
go func() {
for {
if _, err := stream.Recv(); err != nil {
close(done)
return
}
}
}()

select {
case <-done:
return nil
case <-time.After(3 * time.Second):
return ErrCloseAndExhaustTimedOut
}
}
71 changes: 71 additions & 0 deletions pkg/util/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// SPDX-License-Identifier: AGPL-3.0-only

package util

import (
"errors"
"io"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestCloseAndExhaust(t *testing.T) {
t.Run("CloseSend returns an error", func(t *testing.T) {
expectedErr := errors.New("something went wrong")
stream := &mockStream{closeSendError: expectedErr}

actualErr := CloseAndExhaust[string](stream)
require.Equal(t, expectedErr, actualErr)
})

t.Run("Recv returns error immediately", func(t *testing.T) {
stream := &mockStream{recvErrors: []error{io.EOF}}
err := CloseAndExhaust[string](stream)
require.NoError(t, err, "CloseAndExhaust should ignore errors from Recv()")
})

t.Run("Recv returns error after multiple calls", func(t *testing.T) {
stream := &mockStream{recvErrors: []error{nil, nil, io.EOF}}
err := CloseAndExhaust[string](stream)
require.NoError(t, err, "CloseAndExhaust should ignore errors from Recv()")
})

t.Run("Recv blocks forever", func(t *testing.T) {
stream := &mockStream{}
returned := make(chan error)

go func() {
returned <- CloseAndExhaust[string](stream)
}()

select {
case err := <-returned:
require.Equal(t, ErrCloseAndExhaustTimedOut, err)
case <-time.After(5 * time.Second):
require.FailNow(t, "expected CloseAndExhaust to time out waiting for Recv() to return, but it did not")
}
})
}

type mockStream struct {
closeSendError error
recvErrors []error
}

func (m *mockStream) CloseSend() error {
return m.closeSendError
}

func (m *mockStream) Recv() (string, error) {
if len(m.recvErrors) == 0 {
// Block forever.
<-make(chan struct{})
}

err := m.recvErrors[0]
m.recvErrors = m.recvErrors[1:]

return "", err
}

0 comments on commit 2635d15

Please sign in to comment.