From 8732a16ab8de10003224a3fe3dfe0ae9c70273ee Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 19 Jul 2024 17:28:24 -0700 Subject: [PATCH] Upgrading Artie Transfer (#439) --- go.mod | 2 +- go.sum | 2 ++ writers/transfer/writer.go | 1 + writers/transfer/writer_test.go | 24 +++++++++++++++++------- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 3d0c601b..e0b86dbb 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22.4 require ( github.com/DataDog/datadog-go/v5 v5.5.0 - github.com/artie-labs/transfer v1.25.39 + github.com/artie-labs/transfer v1.25.42 github.com/aws/aws-sdk-go v1.44.327 github.com/aws/aws-sdk-go-v2 v1.18.1 github.com/cockroachdb/apd/v3 v3.2.1 diff --git a/go.sum b/go.sum index 5c0524f4..df25cfc5 100644 --- a/go.sum +++ b/go.sum @@ -97,6 +97,8 @@ github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo= github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q= github.com/artie-labs/transfer v1.25.39 h1:hFK1BhlXt4JCxcZF42MgLkpjjWW6wcSzrpKEly1qeOA= github.com/artie-labs/transfer v1.25.39/go.mod h1:BClIu43kgZgqyx0Rq/vrm9EP7tdz61ou9a3adHrzcOc= +github.com/artie-labs/transfer v1.25.42 h1:PXMR0D840yM0zs3M/nk589/TK2tjXS7n+c4n67mx/qc= +github.com/artie-labs/transfer v1.25.42/go.mod h1:BClIu43kgZgqyx0Rq/vrm9EP7tdz61ou9a3adHrzcOc= github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY= github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= diff --git a/writers/transfer/writer.go b/writers/transfer/writer.go index f9b17f31..0249aa89 100644 --- a/writers/transfer/writer.go +++ b/writers/transfer/writer.go @@ -209,6 +209,7 @@ func (w *Writer) flush(reason string) error { tableData.InMemoryColumns().DeleteColumn(constants.DeleteColumnMarker) } + tableData.InMemoryColumns().DeleteColumn(constants.OnlySetDeleteColumnMarker) if err = w.destination.Append(tableData.TableData, isBigQuery(w.destination)); err != nil { tags["what"] = "merge_fail" tags["retryable"] = fmt.Sprint(w.destination.IsRetryableError(err)) diff --git a/writers/transfer/writer_test.go b/writers/transfer/writer_test.go index 6e4115cf..99849d14 100644 --- a/writers/transfer/writer_test.go +++ b/writers/transfer/writer_test.go @@ -36,15 +36,25 @@ func TestWriter_MessageToEvent(t *testing.T) { CDCKeyFormat: kafkalib.JSONKeyFmt, }, } + evtOut, err := writer.messageToEvent(message) assert.NoError(t, err) - assert.Equal(t, map[string]any{ - "__artie_delete": false, - "_id": "507f1f77bcf86cd799439011", - "double": 3.14159, - "int64": int32(1234567890), - "string": "Hello, world!", - }, evtOut.Data) + for expectedKey, expectedValue := range map[string]any{ + "__artie_delete": false, + "__artie_only_set_delete": false, + "_id": "507f1f77bcf86cd799439011", + "double": 3.14159, + "int64": int32(1234567890), + "string": "Hello, world!", + } { + actualValue, isOk := evtOut.Data[expectedKey] + assert.True(t, isOk, expectedKey) + assert.Equal(t, expectedValue, actualValue, expectedKey) + + delete(evtOut.Data, expectedKey) + } + + assert.Empty(t, evtOut.Data) assert.Equal(t, map[string]any{"_id": objId.Hex()}, evtOut.PrimaryKeyMap) }