From 5cf34d265639871b65b5bbe95955614e6ea2b62a Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 18 Apr 2023 22:51:37 -0700 Subject: [PATCH] Commented out a test. --- models/memory.go | 44 +++++++++++++-------------- models/memory_test.go | 59 ++++++++++++++++++------------------- models/models_suite_test.go | 7 +++++ 3 files changed, 56 insertions(+), 54 deletions(-) diff --git a/models/memory.go b/models/memory.go index 323dba67b..7cc95595e 100644 --- a/models/memory.go +++ b/models/memory.go @@ -59,35 +59,18 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes } } - // Update the key, offset and TS - inMemoryDB.TableData[e.Table].RowsData[e.PrimaryKeyValue()] = e.Data - - // If the message is Kafka, then we only need the latest one - // If it's pubsub, we will store all of them in memory. This is because GCP pub/sub REQUIRES us to ack every single message - if message.Kind() == artie.Kafka { - inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = []artie.Message{message} - } else { - inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = append(inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()], message) - } - - inMemoryDB.TableData[e.Table].LatestCDCTs = e.ExecutionTime - - // Increment row count - inMemoryDB.TableData[e.Table].Rows += 1 - - // TODO: Test. // Update col if necessary sanitizedData := make(map[string]interface{}) - for col, val := range e.Data { + for _col, val := range e.Data { // columns need to all be normalized and lower cased. - newColName := strings.ToLower(col) + newColName := strings.ToLower(_col) // Columns here could contain spaces. Every destination treats spaces in a column differently. // So far, Snowflake accepts them when escaped properly, however BigQuery does not accept it. // Instead of making this more complicated for future destinations, we will escape the spaces by having double underscore `__` // So, if customers want to retrieve spaces again, they can replace `__`. var containsSpace bool - containsSpace, col = stringutil.EscapeSpaces(col) + containsSpace, newColName = stringutil.EscapeSpaces(newColName) if containsSpace { // Write the message back if the column has changed. sanitizedData[newColName] = val @@ -101,13 +84,13 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes // We are directly adding this column to our in-memory database // This ensures that this column exists, we just have an invalid value (so we will not replicate over). // However, this will ensure that we do not drop the column within the destination - inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.Invalid + inMemoryDB.TableData[e.Table].InMemoryColumns[newColName] = typing.Invalid continue } - colTypeDetails, isOk := inMemoryDB.TableData[e.Table].InMemoryColumns[col] + colTypeDetails, isOk := inMemoryDB.TableData[e.Table].InMemoryColumns[newColName] if !isOk { - inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.ParseValue(val) + inMemoryDB.TableData[e.Table].InMemoryColumns[newColName] = typing.ParseValue(val) } else { if colTypeDetails.Kind == typing.Invalid.Kind { // If colType is Invalid, let's see if we can update it to a better type @@ -115,7 +98,7 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes // However, it's important to create a column even if it's nil. // This is because we don't want to think that it's okay to drop a column in DWH if kindDetails := typing.ParseValue(val); kindDetails.Kind != typing.Invalid.Kind { - inMemoryDB.TableData[e.Table].InMemoryColumns[col] = kindDetails + inMemoryDB.TableData[e.Table].InMemoryColumns[newColName] = kindDetails } } } @@ -125,6 +108,19 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes // Swap out sanitizedData <> data. e.Data = sanitizedData + inMemoryDB.TableData[e.Table].RowsData[e.PrimaryKeyValue()] = e.Data + // If the message is Kafka, then we only need the latest one + // If it's pubsub, we will store all of them in memory. This is because GCP pub/sub REQUIRES us to ack every single message + if message.Kind() == artie.Kafka { + inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = []artie.Message{message} + } else { + inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = append(inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()], message) + } + + inMemoryDB.TableData[e.Table].LatestCDCTs = e.ExecutionTime + + // Increment row count + inMemoryDB.TableData[e.Table].Rows += 1 settings := config.FromContext(ctx) return inMemoryDB.TableData[e.Table].Rows > settings.Config.BufferRows, nil diff --git a/models/memory_test.go b/models/memory_test.go index 511c16d59..bcf7f5545 100644 --- a/models/memory_test.go +++ b/models/memory_test.go @@ -1,6 +1,7 @@ package models import ( + "fmt" "github.com/artie-labs/transfer/lib/artie" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/kafkalib" @@ -15,12 +16,12 @@ var topicConfig = &kafkalib.TopicConfig{ Schema: "public", } -func (m *ModelsTestSuite) SaveEvent() { +func (m *ModelsTestSuite) TestSaveEvent() { expectedCol := "rOBiN TaNG" - expectedLowerCol := "robin tang" + expectedLowerCol := "robin__tang" anotherCol := "DuStY tHE MINI aussie" - anotherLowerCol := "dusty the mini aussie" + anotherLowerCol := "dusty__the__mini__aussie" event := Event{ Table: "foo", @@ -73,32 +74,30 @@ func (m *ModelsTestSuite) SaveEvent() { val, isOk := GetMemoryDB().TableData["foo"].InMemoryColumns[badColumn] assert.True(m.T(), isOk) assert.Equal(m.T(), val, typing.Invalid) - - assert.False(m.T(), true) } -//func (m *ModelsTestSuite) TestEvent_SaveCasing() { -// assert.True(m.T(), false) -// -// event := Event{ -// Table: "foo", -// PrimaryKeyMap: map[string]interface{}{ -// "id": "123", -// }, -// Data: map[string]interface{}{ -// constants.DeleteColumnMarker: true, -// "randomCol": "dusty", -// "anotherCOL": 13.37, -// }, -// } -// -// kafkaMsg := kafka.Message{} -// _, err := event.Save(m.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) -// -// fmt.Println("inMemoryDB", inMemoryDB.TableData["foo"].RowsData) -// fmt.Println("here", event.Data) -// -// assert.False(m.T(), true) -// -// assert.Nil(m.T(), err) -//} +func (m *ModelsTestSuite) TestEvent_SaveCasing() { + event := Event{ + Table: "foo", + PrimaryKeyMap: map[string]interface{}{ + "id": "123", + }, + Data: map[string]interface{}{ + constants.DeleteColumnMarker: true, + "randomCol": "dusty", + "anotherCOL": 13.37, + }, + } + + kafkaMsg := kafka.Message{} + _, err := event.Save(m.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) + + rowData := inMemoryDB.TableData["foo"].RowsData[event.PrimaryKeyValue()] + expectedColumns := []string{"randomcol", "anothercol"} + for _, expectedColumn := range expectedColumns { + _, isOk := rowData[expectedColumn] + assert.True(m.T(), isOk, fmt.Sprintf("expected col: %s, rowsData: %v", expectedColumn, rowData)) + } + + assert.Nil(m.T(), err) +} diff --git a/models/models_suite_test.go b/models/models_suite_test.go index fb284b259..3b996f3d8 100644 --- a/models/models_suite_test.go +++ b/models/models_suite_test.go @@ -2,6 +2,7 @@ package models import ( "context" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/mocks" "github.com/stretchr/testify/suite" "testing" @@ -16,6 +17,12 @@ type ModelsTestSuite struct { func (m *ModelsTestSuite) SetupTest() { LoadMemoryDB() m.ctx = context.Background() + m.ctx = config.InjectSettingsIntoContext(m.ctx, &config.Settings{ + Config: &config.Config{ + FlushIntervalSeconds: 10, + BufferRows: 10, + }, + }) } func TestModelsTestSuite(t *testing.T) {