-
Notifications
You must be signed in to change notification settings - Fork 543
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* Fix typo in comment. * Use more efficient `SpanLogger.DebugLog()` method in more places. * Add linting rule to forbid the direct use of CloseSend(). * Add documentation to CloseAndExhaust. * Update changelog entry. * Report correct error when closing loop fails * Avoid reading entire store-gateway query stream response when one or more calls fail. * Silence linting warnings. * Ensure CloseAndExhaust never blocks forever. * Fix race in ingester streaming tests: there's no guarantee we'll close the stream synchronously after encountering an error * Address PR feedback: reduce timeout.
- Loading branch information
1 parent
0f74525
commit 0103407
Showing
10 changed files
with
128 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
|
||
package util | ||
|
||
import ( | ||
"errors" | ||
"io" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestCloseAndExhaust(t *testing.T) { | ||
t.Run("CloseSend returns an error", func(t *testing.T) { | ||
expectedErr := errors.New("something went wrong") | ||
stream := &mockStream{closeSendError: expectedErr} | ||
|
||
actualErr := CloseAndExhaust[string](stream) | ||
require.Equal(t, expectedErr, actualErr) | ||
}) | ||
|
||
t.Run("Recv returns error immediately", func(t *testing.T) { | ||
stream := &mockStream{recvErrors: []error{io.EOF}} | ||
err := CloseAndExhaust[string](stream) | ||
require.NoError(t, err, "CloseAndExhaust should ignore errors from Recv()") | ||
}) | ||
|
||
t.Run("Recv returns error after multiple calls", func(t *testing.T) { | ||
stream := &mockStream{recvErrors: []error{nil, nil, io.EOF}} | ||
err := CloseAndExhaust[string](stream) | ||
require.NoError(t, err, "CloseAndExhaust should ignore errors from Recv()") | ||
}) | ||
|
||
t.Run("Recv blocks forever", func(t *testing.T) { | ||
stream := &mockStream{} | ||
returned := make(chan error) | ||
|
||
go func() { | ||
returned <- CloseAndExhaust[string](stream) | ||
}() | ||
|
||
select { | ||
case err := <-returned: | ||
require.Equal(t, ErrCloseAndExhaustTimedOut, err) | ||
case <-time.After(500 * time.Millisecond): | ||
require.FailNow(t, "expected CloseAndExhaust to time out waiting for Recv() to return, but it did not") | ||
} | ||
}) | ||
} | ||
|
||
type mockStream struct { | ||
closeSendError error | ||
recvErrors []error | ||
} | ||
|
||
func (m *mockStream) CloseSend() error { | ||
return m.closeSendError | ||
} | ||
|
||
func (m *mockStream) Recv() (string, error) { | ||
if len(m.recvErrors) == 0 { | ||
// Block forever. | ||
<-make(chan struct{}) | ||
} | ||
|
||
err := m.recvErrors[0] | ||
m.recvErrors = m.recvErrors[1:] | ||
|
||
return "", err | ||
} |