Commented out a test.
Tang8330 committed Apr 19, 2023
1 parent 04b8f73 commit 5cf34d2
Showing 3 changed files with 56 additions and 54 deletions.
44 changes: 20 additions & 24 deletions models/memory.go
@@ -59,35 +59,18 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes
}
}

// Update the key, offset and TS
inMemoryDB.TableData[e.Table].RowsData[e.PrimaryKeyValue()] = e.Data

// If the message is Kafka, then we only need the latest one
// If it's pubsub, we will store all of them in memory. This is because GCP pub/sub REQUIRES us to ack every single message
if message.Kind() == artie.Kafka {
inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = []artie.Message{message}
} else {
inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = append(inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()], message)
}

inMemoryDB.TableData[e.Table].LatestCDCTs = e.ExecutionTime

// Increment row count
inMemoryDB.TableData[e.Table].Rows += 1

// TODO: Test.
// Update col if necessary
sanitizedData := make(map[string]interface{})
for col, val := range e.Data {
for _col, val := range e.Data {
// Column names all need to be normalized and lowercased.
newColName := strings.ToLower(col)
newColName := strings.ToLower(_col)

// Column names here could contain spaces, and every destination treats spaces in a column name differently.
// So far, Snowflake accepts them when escaped properly, but BigQuery does not.
// Instead of making this more complicated for future destinations, we escape each space with a double underscore `__`,
// so if customers want the spaces back, they can replace `__` with a space.
var containsSpace bool
containsSpace, col = stringutil.EscapeSpaces(col)
containsSpace, newColName = stringutil.EscapeSpaces(newColName)
if containsSpace {
// Write the message back if the column has changed.
sanitizedData[newColName] = val
@@ -101,21 +84,21 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes
// We are directly adding this column to our in-memory database.
// This ensures that the column exists; it just has an invalid value (so we will not replicate it over).
// It also ensures that we do not drop the column in the destination.
inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.Invalid
inMemoryDB.TableData[e.Table].InMemoryColumns[newColName] = typing.Invalid
continue
}

colTypeDetails, isOk := inMemoryDB.TableData[e.Table].InMemoryColumns[col]
colTypeDetails, isOk := inMemoryDB.TableData[e.Table].InMemoryColumns[newColName]
if !isOk {
inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.ParseValue(val)
inMemoryDB.TableData[e.Table].InMemoryColumns[newColName] = typing.ParseValue(val)
} else {
if colTypeDetails.Kind == typing.Invalid.Kind {
// If colType is Invalid, let's see if we can update it to a better type.
// If every value is nil, there is no better type to infer; even so, we keep the column as nil
// because we don't want the destination (DWH) to think it's okay to drop it.
if kindDetails := typing.ParseValue(val); kindDetails.Kind != typing.Invalid.Kind {
inMemoryDB.TableData[e.Table].InMemoryColumns[col] = kindDetails
inMemoryDB.TableData[e.Table].InMemoryColumns[newColName] = kindDetails
}
}
}
@@ -125,6 +108,19 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes

// Swap out sanitizedData <> data.
e.Data = sanitizedData
inMemoryDB.TableData[e.Table].RowsData[e.PrimaryKeyValue()] = e.Data
// If the message is Kafka, then we only need the latest one
// If it's pubsub, we will store all of them in memory. This is because GCP pub/sub REQUIRES us to ack every single message
if message.Kind() == artie.Kafka {
inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = []artie.Message{message}
} else {
inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = append(inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()], message)
}

inMemoryDB.TableData[e.Table].LatestCDCTs = e.ExecutionTime

// Increment row count
inMemoryDB.TableData[e.Table].Rows += 1

settings := config.FromContext(ctx)
return inMemoryDB.TableData[e.Table].Rows > settings.Config.BufferRows, nil
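
The comments in the hunk above describe the column-typing rule: a column whose values have only ever been nil is recorded as typing.Invalid so the destination never drops it, and the first non-nil value upgrades it to a concrete kind. Below is a minimal standalone sketch of that rule, using stand-in types rather than the repository's typing package (whose ParseValue is far more involved).

package main

import "fmt"

// kind is a stand-in for the repository's typing kinds; only the behavior
// described in the comments above is modeled here.
type kind string

const (
	invalidKind kind = "invalid"
	stringKind  kind = "string"
	floatKind   kind = "float"
	boolKind    kind = "bool"
)

// parseValue is a toy stand-in for typing.ParseValue.
func parseValue(val interface{}) kind {
	switch val.(type) {
	case nil:
		return invalidKind
	case string:
		return stringKind
	case float32, float64, int:
		return floatKind
	case bool:
		return boolKind
	default:
		return invalidKind
	}
}

func main() {
	inMemoryColumns := map[string]kind{}

	upsert := func(col string, val interface{}) {
		existing, isOk := inMemoryColumns[col]
		if !isOk {
			// First sighting: record whatever we can infer, even if it's Invalid,
			// so the column is never dropped downstream.
			inMemoryColumns[col] = parseValue(val)
			return
		}
		if existing == invalidKind {
			// Upgrade an Invalid column once a typed value shows up.
			if k := parseValue(val); k != invalidKind {
				inMemoryColumns[col] = k
			}
		}
	}

	upsert("randomcol", nil)     // column exists, but only as Invalid
	upsert("randomcol", "dusty") // upgraded to a concrete kind
	fmt.Println(inMemoryColumns) // map[randomcol:string]
}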
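The Save path also tracks messages per partition differently by source, as the comments above explain: for Kafka, committing the latest offset is sufficient, so only the newest message per partition is kept; for GCP Pub/Sub, every message must be acked individually, so all of them are retained. A small sketch of that rule, with a simplified stand-in for artie.Message:

package main

import "fmt"

// msg is a simplified stand-in for artie.Message.
type msg struct {
	partition int
	offset    int
}

func main() {
	partitionsToLastMessage := map[int][]msg{}

	track := func(m msg, isKafka bool) {
		if isKafka {
			// Kafka: offset commits are cumulative per partition, so keeping
			// only the most recent message is enough.
			partitionsToLastMessage[m.partition] = []msg{m}
		} else {
			// Pub/Sub: every message must be acked individually, so keep them all.
			partitionsToLastMessage[m.partition] = append(partitionsToLastMessage[m.partition], m)
		}
	}

	track(msg{partition: 0, offset: 1}, true)
	track(msg{partition: 0, offset: 2}, true)
	fmt.Println(len(partitionsToLastMessage[0])) // 1: only the latest Kafka message survives

	track(msg{partition: 1, offset: 1}, false)
	track(msg{partition: 1, offset: 2}, false)
	fmt.Println(len(partitionsToLastMessage[1])) // 2: every Pub/Sub message is retained
}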
59 changes: 29 additions & 30 deletions models/memory_test.go
@@ -1,6 +1,7 @@
package models

import (
"fmt"
"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
@@ -15,12 +16,12 @@ var topicConfig = &kafkalib.TopicConfig{
Schema: "public",
}

func (m *ModelsTestSuite) SaveEvent() {
func (m *ModelsTestSuite) TestSaveEvent() {
expectedCol := "rOBiN TaNG"
expectedLowerCol := "robin tang"
expectedLowerCol := "robin__tang"

anotherCol := "DuStY tHE MINI aussie"
anotherLowerCol := "dusty the mini aussie"
anotherLowerCol := "dusty__the__mini__aussie"

event := Event{
Table: "foo",
@@ -73,32 +74,30 @@ func (m *ModelsTestSuite) SaveEvent() {
val, isOk := GetMemoryDB().TableData["foo"].InMemoryColumns[badColumn]
assert.True(m.T(), isOk)
assert.Equal(m.T(), val, typing.Invalid)

assert.False(m.T(), true)
}

//func (m *ModelsTestSuite) TestEvent_SaveCasing() {
// assert.True(m.T(), false)
//
// event := Event{
// Table: "foo",
// PrimaryKeyMap: map[string]interface{}{
// "id": "123",
// },
// Data: map[string]interface{}{
// constants.DeleteColumnMarker: true,
// "randomCol": "dusty",
// "anotherCOL": 13.37,
// },
// }
//
// kafkaMsg := kafka.Message{}
// _, err := event.Save(m.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
//
// fmt.Println("inMemoryDB", inMemoryDB.TableData["foo"].RowsData)
// fmt.Println("here", event.Data)
//
// assert.False(m.T(), true)
//
// assert.Nil(m.T(), err)
//}
func (m *ModelsTestSuite) TestEvent_SaveCasing() {
event := Event{
Table: "foo",
PrimaryKeyMap: map[string]interface{}{
"id": "123",
},
Data: map[string]interface{}{
constants.DeleteColumnMarker: true,
"randomCol": "dusty",
"anotherCOL": 13.37,
},
}

kafkaMsg := kafka.Message{}
_, err := event.Save(m.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))

rowData := inMemoryDB.TableData["foo"].RowsData[event.PrimaryKeyValue()]
expectedColumns := []string{"randomcol", "anothercol"}
for _, expectedColumn := range expectedColumns {
_, isOk := rowData[expectedColumn]
assert.True(m.T(), isOk, fmt.Sprintf("expected col: %s, rowsData: %v", expectedColumn, rowData))
}

assert.Nil(m.T(), err)
}
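
The new TestEvent_SaveCasing above expects the sanitized row keys `randomcol` and `anothercol`, and the constants at the top of the file expect `rOBiN TaNG` to become `robin__tang`. Here is a standalone sketch of that sanitization step, with a hypothetical escapeSpaces helper standing in for stringutil.EscapeSpaces (only its two-value return shape is taken from the diff).

package main

import (
	"fmt"
	"strings"
)

// escapeSpaces is a hypothetical stand-in for stringutil.EscapeSpaces: it reports
// whether the column name contained spaces and returns the name with each space
// replaced by a double underscore.
func escapeSpaces(col string) (bool, string) {
	return strings.Contains(col, " "), strings.ReplaceAll(col, " ", "__")
}

func main() {
	for _, col := range []string{"rOBiN TaNG", "DuStY tHE MINI aussie", "anotherCOL"} {
		// Lowercase first, then escape spaces, matching the order Save applies.
		_, sanitized := escapeSpaces(strings.ToLower(col))
		fmt.Printf("%q -> %q\n", col, sanitized)
	}
	// "rOBiN TaNG" -> "robin__tang"
	// "DuStY tHE MINI aussie" -> "dusty__the__mini__aussie"
	// "anotherCOL" -> "anothercol"
}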
7 changes: 7 additions & 0 deletions models/models_suite_test.go
@@ -2,6 +2,7 @@ package models

import (
"context"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/stretchr/testify/suite"
"testing"
@@ -16,6 +17,12 @@ type ModelsTestSuite struct {
func (m *ModelsTestSuite) SetupTest() {
LoadMemoryDB()
m.ctx = context.Background()
m.ctx = config.InjectSettingsIntoContext(m.ctx, &config.Settings{
Config: &config.Config{
FlushIntervalSeconds: 10,
BufferRows: 10,
},
})
}

func TestModelsTestSuite(t *testing.T) {
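SetupTest now injects a Settings value into the test context so that Save can read it back through config.FromContext and decide whether to flush (memory.go returns true once Rows exceeds BufferRows). A minimal sketch of that context-carried-settings pattern, with simplified stand-ins for the real config package:

package main

import (
	"context"
	"fmt"
)

// settings is a simplified stand-in for config.Settings.
type settings struct {
	BufferRows           int
	FlushIntervalSeconds int
}

type ctxKey struct{}

// injectSettings and fromContext mimic the shape of config.InjectSettingsIntoContext
// and config.FromContext as used in the diff; the real implementations differ.
func injectSettings(ctx context.Context, s *settings) context.Context {
	return context.WithValue(ctx, ctxKey{}, s)
}

func fromContext(ctx context.Context) *settings {
	return ctx.Value(ctxKey{}).(*settings)
}

func main() {
	ctx := injectSettings(context.Background(), &settings{BufferRows: 10, FlushIntervalSeconds: 10})

	rows := 11
	// Same flush rule as memory.go: signal a flush once the buffered row count
	// exceeds the configured BufferRows.
	shouldFlush := rows > fromContext(ctx).BufferRows
	fmt.Println(shouldFlush) // true
}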