diff --git a/go.mod b/go.mod index 27d01ffe..ac7eb33b 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.0 require ( github.com/DataDog/datadog-go/v5 v5.5.0 - github.com/artie-labs/transfer v1.27.13 + github.com/artie-labs/transfer v1.27.14 github.com/aws/aws-sdk-go-v2 v1.30.3 github.com/aws/aws-sdk-go-v2/config v1.27.27 github.com/aws/aws-sdk-go-v2/credentials v1.17.27 diff --git a/go.sum b/go.sum index 3c27a86a..a38f7f46 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo= github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q= -github.com/artie-labs/transfer v1.27.13 h1:kTQs/x5TheufGZm1yznLTkUFer+jB8GymAepVPvyAd8= -github.com/artie-labs/transfer v1.27.13/go.mod h1:Lbrj8nz/cCq5BycDR++l3K+kc2GUbEnGRyrVDyA8MfM= +github.com/artie-labs/transfer v1.27.14 h1:wN4tBGJmcCKlvUacFgBgug+1XtuTiW9QbmmwSh6OcDc= +github.com/artie-labs/transfer v1.27.14/go.mod h1:Lbrj8nz/cCq5BycDR++l3K+kc2GUbEnGRyrVDyA8MfM= github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts= github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY= diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index 7533dc11..a3a6b221 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -147,7 +147,7 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error return nil } -func (b *BatchWriter) OnComplete() error { +func (b *BatchWriter) OnComplete(_ context.Context) error { return nil } diff --git a/writers/transfer/writer.go b/writers/transfer/writer.go index 717df136..0f6fd52d 100644 --- a/writers/transfer/writer.go +++ b/writers/transfer/writer.go @@ -105,7 +105,7 @@ func (w *Writer) messageToEvent(message lib.RawMessage) (event.Event, error) { return memoryEvent, nil } -func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error { +func (w *Writer) Write(ctx context.Context, messages []lib.RawMessage) error { if len(messages) == 0 { return nil } @@ -150,7 +150,7 @@ func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error { } if shouldFlush { - if err = w.flush(flushReason); err != nil { + if err = w.flush(ctx, flushReason); err != nil { return err } } @@ -170,7 +170,7 @@ func (w *Writer) getTableData() (string, *models.TableData, error) { return "", nil, fmt.Errorf("expected exactly one table") } -func (w *Writer) flush(reason string) error { +func (w *Writer) flush(ctx context.Context, reason string) error { tableName, tableData, err := w.getTableData() if err != nil { return err @@ -198,7 +198,7 @@ func (w *Writer) flush(reason string) error { tableData.ResetTempTableSuffix() if isMicrosoftSQLServer(w.destination) { // Microsoft SQL Server uses MERGE not append - if err = w.destination.Merge(tableData.TableData); err != nil { + if err = w.destination.Merge(ctx, tableData.TableData); err != nil { tags["what"] = "merge_fail" tags["retryable"] = fmt.Sprint(w.destination.IsRetryableError(err)) return fmt.Errorf("failed to merge data to destination: %w", err) @@ -210,7 +210,7 @@ func (w *Writer) flush(reason string) error { } tableData.InMemoryColumns().DeleteColumn(constants.OnlySetDeleteColumnMarker) - if err = w.destination.Append(tableData.TableData, isBigQuery(w.destination)); err != nil { + if err = w.destination.Append(ctx, tableData.TableData, isBigQuery(w.destination)); err != nil { tags["what"] = "merge_fail" tags["retryable"] = fmt.Sprint(w.destination.IsRetryableError(err)) return fmt.Errorf("failed to append data to destination: %w", err) @@ -221,12 +221,12 @@ func (w *Writer) flush(reason string) error { return nil } -func (w *Writer) OnComplete() error { +func (w *Writer) OnComplete(ctx context.Context) error { if len(w.primaryKeys) == 0 { return fmt.Errorf("primary keys not set") } - if err := w.flush("complete"); err != nil { + if err := w.flush(ctx, "complete"); err != nil { return fmt.Errorf("failed to flush: %w", err) } diff --git a/writers/writer.go b/writers/writer.go index 38013990..e06a7d11 100644 --- a/writers/writer.go +++ b/writers/writer.go @@ -12,7 +12,7 @@ import ( type DestinationWriter interface { Write(ctx context.Context, rawMsgs []lib.RawMessage) error - OnComplete() error + OnComplete(ctx context.Context) error } type Writer struct { @@ -57,7 +57,7 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess // Only run [OnComplete] if we wrote messages out. Otherwise, primary keys may not be loaded. if count > 0 { - if err := w.destinationWriter.OnComplete(); err != nil { + if err := w.destinationWriter.OnComplete(ctx); err != nil { return 0, fmt.Errorf("failed running destination OnComplete: %w", err) } } @@ -65,8 +65,8 @@ func (w *Writer) Write(ctx context.Context, iter iterator.Iterator[[]lib.RawMess return count, nil } -func (w *Writer) OnComplete() error { - if err := w.destinationWriter.OnComplete(); err != nil { +func (w *Writer) OnComplete(ctx context.Context) error { + if err := w.destinationWriter.OnComplete(ctx); err != nil { return fmt.Errorf("failed running destination OnComplete: %w", err) }