Skip to content

Commit

Permalink
chore(client): add pending message validation to Flush call (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz authored Jun 29, 2023
1 parent 464ba3e commit 1a4434f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 12 deletions.
16 changes: 9 additions & 7 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.17.x]
go-version: [1.20.x]
name: Build with Go ${{ matrix.go-version }}
steps:
- name: Checkout repository
Expand All @@ -21,13 +21,15 @@ jobs:
stable: false
go-version: ${{ matrix.go-version }}

- name: Install staticcheck
run: go get honnef.co/go/tools/cmd/[email protected]
- name: Run vet
run: go vet ./...

- name: Run linters
run: |
go vet ./...
staticcheck ./...
- name: Run staticcheck
uses: dominikh/[email protected]
with:
version: "2023.1.2"
install-go: false
cache-key: ${{ matrix.go-version }}

- name: Run tests
run: go test -v ./...
25 changes: 20 additions & 5 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,14 +735,15 @@ func (s *LineSender) At(ctx context.Context, ts int64) error {
err := s.lastErr
s.lastErr = nil
if err != nil {
// Discard the partially written message.
s.buf.Truncate(s.lastMsgPos)
s.discardPendingMsg()
return err
}
if !s.hasTable {
s.discardPendingMsg()
return fmt.Errorf("table name was not provided: %w", ErrInvalidMsg)
}
if !s.hasTags && !s.hasFields {
s.discardPendingMsg()
return fmt.Errorf("no symbols or columns were provided: %w", ErrInvalidMsg)
}

Expand All @@ -753,9 +754,7 @@ func (s *LineSender) At(ctx context.Context, ts int64) error {
s.buf.WriteByte('\n')

s.lastMsgPos = s.buf.Len()
s.hasTable = false
s.hasTags = false
s.hasFields = false
s.resetMsgFlags()

if s.buf.Len() > s.bufCap {
return s.Flush(ctx)
Expand All @@ -776,8 +775,13 @@ func (s *LineSender) Flush(ctx context.Context) error {
err := s.lastErr
s.lastErr = nil
if err != nil {
s.discardPendingMsg()
return err
}
if s.hasTable {
s.discardPendingMsg()
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
}

if err = ctx.Err(); err != nil {
return err
Expand All @@ -804,6 +808,17 @@ func (s *LineSender) Flush(ctx context.Context) error {
return nil
}

func (s *LineSender) discardPendingMsg() {
s.buf.Truncate(s.lastMsgPos)
s.resetMsgFlags()
}

func (s *LineSender) resetMsgFlags() {
s.hasTable = false
s.hasTags = false
s.hasFields = false
}

// Messages returns a copy of accumulated ILP messages that are not
// flushed to the TCP connection yet. Useful for debugging purposes.
func (s *LineSender) Messages() string {
Expand Down
19 changes: 19 additions & 0 deletions sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func TestErrorOnLengthyNames(t *testing.T) {

err = tc.writerFn(sender)
assert.ErrorContains(t, err, tc.expectedErrMsg)
assert.Empty(t, sender.Messages())

sender.Close()
srv.close()
Expand Down Expand Up @@ -519,6 +520,24 @@ func TestErrorOnSymbolCallAfterColumn(t *testing.T) {
}
}

func TestErrorOnFlushWhenMessageIsPending(t *testing.T) {
ctx := context.Background()

srv, err := newTestServer(readAndDiscard)
assert.NoError(t, err)
defer srv.close()

sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr))
assert.NoError(t, err)
defer sender.Close()

sender.Table(testTable)
err = sender.Flush(ctx)

assert.ErrorContains(t, err, "pending ILP message must be finalized with At or AtNow before calling Flush")
assert.Empty(t, sender.Messages())
}

func TestInvalidMessageGetsDiscarded(t *testing.T) {
ctx := context.Background()

Expand Down

0 comments on commit 1a4434f

Please sign in to comment.