From 1a4434f12e5dd3010c3554f2da6be3b03156b2b0 Mon Sep 17 00:00:00 2001 From: Andrei Pechkurov <37772591+puzpuzpuz@users.noreply.github.com> Date: Thu, 29 Jun 2023 21:29:27 +0300 Subject: [PATCH] chore(client): add pending message validation to Flush call (#16) --- .github/workflows/build.yml | 16 +++++++++------- sender.go | 25 ++++++++++++++++++++----- sender_test.go | 19 +++++++++++++++++++ 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3bfff0a..55f9e41 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -6,7 +6,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [1.17.x] + go-version: [1.20.x] name: Build with Go ${{ matrix.go-version }} steps: - name: Checkout repository @@ -21,13 +21,15 @@ jobs: stable: false go-version: ${{ matrix.go-version }} - - name: Install staticcheck - run: go get honnef.co/go/tools/cmd/staticcheck@2022.1.3 + - name: Run vet + run: go vet ./... - - name: Run linters - run: | - go vet ./... - staticcheck ./... + - name: Run staticcheck + uses: dominikh/staticcheck-action@v1.3.0 + with: + version: "2023.1.2" + install-go: false + cache-key: ${{ matrix.go-version }} - name: Run tests run: go test -v ./... diff --git a/sender.go b/sender.go index 345d155..af6c832 100644 --- a/sender.go +++ b/sender.go @@ -735,14 +735,15 @@ func (s *LineSender) At(ctx context.Context, ts int64) error { err := s.lastErr s.lastErr = nil if err != nil { - // Discard the partially written message. - s.buf.Truncate(s.lastMsgPos) + s.discardPendingMsg() return err } if !s.hasTable { + s.discardPendingMsg() return fmt.Errorf("table name was not provided: %w", ErrInvalidMsg) } if !s.hasTags && !s.hasFields { + s.discardPendingMsg() return fmt.Errorf("no symbols or columns were provided: %w", ErrInvalidMsg) } @@ -753,9 +754,7 @@ func (s *LineSender) At(ctx context.Context, ts int64) error { s.buf.WriteByte('\n') s.lastMsgPos = s.buf.Len() - s.hasTable = false - s.hasTags = false - s.hasFields = false + s.resetMsgFlags() if s.buf.Len() > s.bufCap { return s.Flush(ctx) @@ -776,8 +775,13 @@ func (s *LineSender) Flush(ctx context.Context) error { err := s.lastErr s.lastErr = nil if err != nil { + s.discardPendingMsg() return err } + if s.hasTable { + s.discardPendingMsg() + return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush") + } if err = ctx.Err(); err != nil { return err @@ -804,6 +808,17 @@ func (s *LineSender) Flush(ctx context.Context) error { return nil } +func (s *LineSender) discardPendingMsg() { + s.buf.Truncate(s.lastMsgPos) + s.resetMsgFlags() +} + +func (s *LineSender) resetMsgFlags() { + s.hasTable = false + s.hasTags = false + s.hasFields = false +} + // Messages returns a copy of accumulated ILP messages that are not // flushed to the TCP connection yet. Useful for debugging purposes. func (s *LineSender) Messages() string { diff --git a/sender_test.go b/sender_test.go index 43ae175..97bc2f3 100644 --- a/sender_test.go +++ b/sender_test.go @@ -309,6 +309,7 @@ func TestErrorOnLengthyNames(t *testing.T) { err = tc.writerFn(sender) assert.ErrorContains(t, err, tc.expectedErrMsg) + assert.Empty(t, sender.Messages()) sender.Close() srv.close() @@ -519,6 +520,24 @@ func TestErrorOnSymbolCallAfterColumn(t *testing.T) { } } +func TestErrorOnFlushWhenMessageIsPending(t *testing.T) { + ctx := context.Background() + + srv, err := newTestServer(readAndDiscard) + assert.NoError(t, err) + defer srv.close() + + sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr)) + assert.NoError(t, err) + defer sender.Close() + + sender.Table(testTable) + err = sender.Flush(ctx) + + assert.ErrorContains(t, err, "pending ILP message must be finalized with At or AtNow before calling Flush") + assert.Empty(t, sender.Messages()) +} + func TestInvalidMessageGetsDiscarded(t *testing.T) { ctx := context.Background()