From 49756018d5244d5d611ca35e2fdc2047249fe535 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 8 May 2023 16:39:54 -0700 Subject: [PATCH] Flush rules to be cognizant of in-memory database size (#89) --- Makefile | 5 ++ lib/config/config.go | 45 +++++++++----- lib/config/config_test.go | 7 +++ lib/optimization/event.go | 22 +++++++ lib/optimization/event_bench_test.go | 56 ++++++++++++++++++ lib/optimization/event_insert_test.go | 84 +++++++++++++++++++++++++++ lib/optimization/event_test.go | 76 ++++++++++++++++++------ lib/size/size.go | 14 +++++ lib/size/size_bench_test.go | 63 ++++++++++++++++++++ lib/size/size_test.go | 34 +++++++++++ main.go | 1 + models/memory.go | 5 +- processes/consumer/process_test.go | 1 + 13 files changed, 375 insertions(+), 38 deletions(-) create mode 100644 lib/optimization/event_bench_test.go create mode 100644 lib/optimization/event_insert_test.go create mode 100644 lib/size/size.go create mode 100644 lib/size/size_bench_test.go create mode 100644 lib/size/size_test.go diff --git a/Makefile b/Makefile index 6c11ebb55..eec83b40e 100644 --- a/Makefile +++ b/Makefile @@ -24,3 +24,8 @@ build: .PHONY: release release: goreleaser release --clean + +.PHONY: bench_size +bench_size: + go test ./lib/size -bench=Bench -benchtime=10s + go test ./lib/optimization -bench=Bench -benchtime=10s diff --git a/lib/config/config.go b/lib/config/config.go index cd8a80b32..6f889e745 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -14,6 +14,7 @@ import ( const ( defaultFlushTimeSeconds = 10 + defaultFlushSizeKb = 25 * 1024 // 25 mb flushIntervalSecondsStart = 5 flushIntervalSecondsEnd = 6 * 60 * 60 @@ -71,13 +72,19 @@ func (k *Kafka) String() string { } type Config struct { - Output constants.DestinationKind `yaml:"outputSource"` - Queue constants.QueueKind `yaml:"queue"` - FlushIntervalSeconds int `yaml:"flushIntervalSeconds"` - BufferRows uint `yaml:"bufferRows"` - Pubsub *Pubsub - Kafka *Kafka + Output constants.DestinationKind `yaml:"outputSource"` + Queue constants.QueueKind `yaml:"queue"` + // Flush rules + FlushIntervalSeconds int `yaml:"flushIntervalSeconds"` + FlushSizeKb int `yaml:"flushSizeKb"` + BufferRows uint `yaml:"bufferRows"` + + // Supported message queues + Pubsub *Pubsub + Kafka *Kafka + + // Supported destinations BigQuery *BigQuery `yaml:"bigquery"` Snowflake *Snowflake `yaml:"snowflake"` @@ -126,6 +133,10 @@ func readFileToConfig(pathToConfig string) (*Config, error) { config.BufferRows = bufferPoolSizeEnd } + if config.FlushSizeKb == 0 { + config.FlushSizeKb = defaultFlushSizeKb + } + return &config, nil } @@ -137,50 +148,54 @@ func (c *Config) Validate() error { return fmt.Errorf("config is nil") } + if c.FlushSizeKb <= 0 { + return fmt.Errorf("config is invalid, flush size pool has to be a positive number, current value: %v", c.FlushSizeKb) + } + if !numbers.BetweenEq(flushIntervalSecondsStart, flushIntervalSecondsEnd, c.FlushIntervalSeconds) { - return fmt.Errorf("flush interval is outside of our range, seconds: %v, expected start: %v, end: %v", + return fmt.Errorf("config is invalid, flush interval is outside of our range, seconds: %v, expected start: %v, end: %v", c.FlushIntervalSeconds, flushIntervalSecondsStart, flushIntervalSecondsEnd) } if !numbers.BetweenEq(bufferPoolSizeStart, bufferPoolSizeEnd, int(c.BufferRows)) { - return fmt.Errorf("buffer pool is outside of our range: %v, expected start: %v, end: %v", + return fmt.Errorf("config is invalid, buffer pool is outside of our range: %v, expected start: %v, end: %v", c.BufferRows, bufferPoolSizeStart, bufferPoolSizeEnd) } if !constants.IsValidDestination(c.Output) { - return fmt.Errorf("output: %s is invalid", c.Output) + return fmt.Errorf("config is invalid, output: %s is invalid", c.Output) } if c.Queue == constants.Kafka { if c.Kafka == nil || len(c.Kafka.TopicConfigs) == 0 { - return fmt.Errorf("no kafka topic configs, kafka: %v", c.Kafka) + return fmt.Errorf("config is invalid, no kafka topic configs, kafka: %v", c.Kafka) } for _, topicConfig := range c.Kafka.TopicConfigs { if valid := topicConfig.Valid(); !valid { - return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String()) + return fmt.Errorf("config is invalid, topic config is invalid, tc: %s", topicConfig.String()) } } // Username and password are not required (if it's within the same VPC or connecting locally if array.Empty([]string{c.Kafka.GroupID, c.Kafka.BootstrapServer}) { - return fmt.Errorf("kafka settings is invalid, kafka: %s", c.Kafka.String()) + return fmt.Errorf("config is invalid, kafka settings is invalid, kafka: %s", c.Kafka.String()) } } if c.Queue == constants.PubSub { if c.Pubsub == nil || len(c.Pubsub.TopicConfigs) == 0 { - return fmt.Errorf("no pubsub topic configs, pubsub: %v", c.Pubsub) + return fmt.Errorf("config is invalid, no pubsub topic configs, pubsub: %v", c.Pubsub) } for _, topicConfig := range c.Pubsub.TopicConfigs { if valid := topicConfig.Valid(); !valid { - return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String()) + return fmt.Errorf("config is invalid, topic config is invalid, tc: %s", topicConfig.String()) } } if array.Empty([]string{c.Pubsub.ProjectID, c.Pubsub.PathToCredentials}) { - return fmt.Errorf("pubsub settings is invalid, pubsub: %s", c.Pubsub.String()) + return fmt.Errorf("config is invalid, pubsub settings is invalid, pubsub: %s", c.Pubsub.String()) } } diff --git a/lib/config/config_test.go b/lib/config/config_test.go index d83706099..30b363bac 100644 --- a/lib/config/config_test.go +++ b/lib/config/config_test.go @@ -412,6 +412,7 @@ func TestConfig_Validate(t *testing.T) { cfg.Output = constants.Snowflake cfg.Queue = constants.PubSub + cfg.FlushSizeKb = 5 assert.Contains(t, cfg.Validate().Error(), "no pubsub topic configs") pubsub.TopicConfigs = []*kafkalib.TopicConfig{ @@ -448,4 +449,10 @@ func TestConfig_Validate(t *testing.T) { cfg.BufferRows = 500 cfg.FlushIntervalSeconds = 600 assert.Nil(t, cfg.Validate()) + + for _, num := range []int{-500, -300, -5, 0} { + cfg.FlushSizeKb = num + assert.Contains(t, cfg.Validate().Error(), "config is invalid, flush size pool has to be a positive number") + } + } diff --git a/lib/optimization/event.go b/lib/optimization/event.go index 2ab20f39a..24e95bff4 100644 --- a/lib/optimization/event.go +++ b/lib/optimization/event.go @@ -1,8 +1,11 @@ package optimization import ( + "context" "github.com/artie-labs/transfer/lib/artie" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/artie-labs/transfer/lib/size" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" "strings" @@ -22,6 +25,7 @@ type TableData struct { // This is used for the automatic schema detection LatestCDCTs time.Time + approxSize int } func NewTableData(inMemoryColumns *typing.Columns, primaryKeys []string, topicConfig kafkalib.TopicConfig) *TableData { @@ -35,7 +39,20 @@ func NewTableData(inMemoryColumns *typing.Columns, primaryKeys []string, topicCo } +// InsertRow creates a single entrypoint for how rows get added to TableData +// This is important to avoid concurrent r/w, but also the ability for us to add or decrement row size by keeping a running total +// With this, we are able to reduce the latency by 500x+ on a 5k row table. See event_bench_test.go vs. size_bench_test.go func (t *TableData) InsertRow(pk string, rowData map[string]interface{}) { + newRowSize := size.GetApproxSize(rowData) + prevRow, isOk := t.rowsData[pk] + var prevRowSize int + if isOk { + // Since the new row is taking over, let's update the approx size. + prevRowSize = size.GetApproxSize(prevRow) + } + + // If prevRow doesn't exist, it'll be 0, which is a no-op. + t.approxSize += newRowSize - prevRowSize t.rowsData[pk] = rowData return } @@ -58,6 +75,11 @@ func (t *TableData) Rows() uint { return uint(len(t.rowsData)) } +func (t *TableData) ShouldFlush(ctx context.Context) bool { + settings := config.FromContext(ctx) + return t.Rows() > settings.Config.BufferRows || t.approxSize > settings.Config.FlushSizeKb * 1024 +} + // UpdateInMemoryColumnsFromDestination - When running Transfer, we will have 2 column types. // 1) TableData (constructed in-memory) // 2) TableConfig (coming from the SQL DESCRIBE or equivalent statement) from the destination diff --git a/lib/optimization/event_bench_test.go b/lib/optimization/event_bench_test.go new file mode 100644 index 000000000..52840d5ed --- /dev/null +++ b/lib/optimization/event_bench_test.go @@ -0,0 +1,56 @@ +package optimization + +import ( + "fmt" + "github.com/artie-labs/transfer/lib/kafkalib" + "testing" + "time" +) + +func BenchmarkTableData_ApproxSize_TallTable(b *testing.B) { + td := NewTableData(nil, nil, kafkalib.TopicConfig{}) + for n := 0; n < b.N; n++ { + td.InsertRow(fmt.Sprint(n), map[string]interface{}{ + "id": n, + "name": "Robin", + "dog": "dusty the mini aussie", + }) + } +} + +func BenchmarkTableData_ApproxSize_WideTable(b *testing.B) { + td := NewTableData(nil, nil, kafkalib.TopicConfig{}) + for n := 0; n < b.N; n++ { + td.InsertRow(fmt.Sprint(n), map[string]interface{}{ + "id": n, + "name": "Robin", + "dog": "dusty the mini aussie", + "favorite_fruits": []string{"strawberry", "kiwi", "oranges"}, + "random": false, + "team": []string{"charlie", "jacqueline"}, + "email": "robin@artie.so", + "favorite_languages": []string{"go", "sql"}, + "favorite_databases": []string{"postgres", "bigtable"}, + "created_at": time.Now(), + "updated_at": time.Now(), + "negative_number": -500, + "nestedObject": map[string]interface{}{ + "foo": "bar", + "abc": "def", + }, + "array_of_objects": []map[string]interface{}{ + { + "foo": "bar", + }, + { + "foo_nested": map[string]interface{}{ + "foo_foo": "bar_bar", + }, + }, + }, + "is_deleted": false, + "lorem_ipsum": "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec elementum aliquet mi at efficitur. Praesent at erat ac elit faucibus convallis. Donec fermentum tellus eu nunc ornare, non convallis justo facilisis. In hac habitasse platea dictumst. Praesent eu ante vitae erat semper finibus eget ac mauris. Duis gravida cursus enim, nec sagittis arcu placerat sed. Integer semper orci justo, nec rhoncus libero convallis sed.", + "lorem_ipsum2": "Fusce vitae elementum tortor. Vestibulum consectetur ante id nibh ullamcorper, quis sodales turpis tempor. Duis pellentesque suscipit nibh porta posuere. In libero massa, efficitur at ultricies sit amet, vulputate ac ante. In euismod erat eget nulla blandit pretium. Ut tempor ante vel congue venenatis. Vestibulum at metus nec nibh iaculis consequat suscipit ac leo. Maecenas vitae rutrum nulla, quis ultrices justo. Aliquam ipsum ex, luctus ac diam eget, tempor tempor risus.", + }) + } +} diff --git a/lib/optimization/event_insert_test.go b/lib/optimization/event_insert_test.go new file mode 100644 index 000000000..b426a6cb3 --- /dev/null +++ b/lib/optimization/event_insert_test.go @@ -0,0 +1,84 @@ +package optimization + +import ( + "fmt" + "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/artie-labs/transfer/lib/size" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestTableData_InsertRow(t *testing.T) { + td := NewTableData(nil, nil, kafkalib.TopicConfig{}) + assert.Equal(t, 0, int(td.Rows())) + + // See if we can add rows to the private method. + td.RowsData()["foo"] = map[string]interface{}{ + "foo": "bar", + } + + assert.Equal(t, 0, int(td.Rows())) + + // Now insert the right way. + td.InsertRow("foo", map[string]interface{}{ + "foo": "bar", + }) + + assert.Equal(t, 1, int(td.Rows())) +} + +func TestTableData_InsertRowApproxSize(t *testing.T) { + // In this test, we'll insert 1000 rows, update X and then delete Y + // Does the size then match up? We will iterate over a map to take advantage of the in-deterministic ordering of a map + // So we can test multiple updates, deletes, etc. + td := NewTableData(nil, nil, kafkalib.TopicConfig{}) + numInsertRows := 1000 + numUpdateRows := 420 + numDeleteRows := 250 + + for i := 0; i < numInsertRows; i ++ { + td.InsertRow(fmt.Sprint(i), map[string]interface{}{ + "foo": "bar", + "array": []int{1, 2, 3, 4, 5}, + "boolean": true, + "nested_object": map[string]interface{}{ + "nested": map[string]interface{}{ + "foo": "bar", + "true": false, + }, + }, + }) + } + + var updateCount int + for updateKey := range td.RowsData() { + updateCount += 1 + td.InsertRow(updateKey, map[string]interface{}{ + "foo": "foo", + "bar": "bar", + }) + + if updateCount > numUpdateRows { + break + } + } + + var deleteCount int + for deleteKey := range td.RowsData() { + deleteCount += 1 + td.InsertRow(deleteKey, map[string]interface{}{ + "__artie_deleted": true, + }) + + if deleteCount > numDeleteRows { + break + } + } + + var actualSize int + for _, rowData := range td.RowsData() { + actualSize += size.GetApproxSize(rowData) + } + + assert.Equal(t, td.approxSize, actualSize) +} diff --git a/lib/optimization/event_test.go b/lib/optimization/event_test.go index f3f6cc2dd..a2e8e19ca 100644 --- a/lib/optimization/event_test.go +++ b/lib/optimization/event_test.go @@ -1,6 +1,9 @@ package optimization import ( + "context" + "fmt" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -9,25 +12,6 @@ import ( "time" ) -func TestTableData_InsertRow(t *testing.T) { - td := NewTableData(nil, nil, kafkalib.TopicConfig{}) - assert.Equal(t, 0, int(td.Rows())) - - // See if we can add rows to the private method. - td.RowsData()["foo"] = map[string]interface{}{ - "foo": "bar", - } - - assert.Equal(t, 0, int(td.Rows())) - - // Now insert the right way. - td.InsertRow("foo", map[string]interface{}{ - "foo": "bar", - }) - - assert.Equal(t, 1, int(td.Rows())) -} - func TestTableData_UpdateInMemoryColumns(t *testing.T) { var _cols typing.Columns for colName, colKind := range map[string]typing.KindDetails{ @@ -88,3 +72,57 @@ func TestTableData_UpdateInMemoryColumns(t *testing.T) { assert.Equal(t, col.KindDetails.ExtendedTimeDetails.Type, ext.DateTimeKindType, "correctly mapped type") assert.Equal(t, col.KindDetails.ExtendedTimeDetails.Format, time.RFC3339Nano, "format has been preserved") } + +func TestTableData_ShouldFlushRowLength(t *testing.T) { + ctx := context.Background() + ctx = config.InjectSettingsIntoContext(ctx, &config.Settings{Config: &config.Config{ + FlushSizeKb: 500, + BufferRows: 2, + }}) + + // Insert 3 rows and confirm that we need to flush. + td := NewTableData(nil, nil, kafkalib.TopicConfig{}) + for i := 0; i < 3; i ++ { + assert.False(t, td.ShouldFlush(ctx)) + td.InsertRow(fmt.Sprint(i), map[string]interface{}{ + "foo": "bar", + }) + } + + assert.True(t, td.ShouldFlush(ctx)) +} + +func TestTableData_ShouldFlushRowSize(t *testing.T) { + ctx := context.Background() + ctx = config.InjectSettingsIntoContext(ctx, &config.Settings{Config: &config.Config{ + FlushSizeKb: 5, + BufferRows: 20000, + }}) + + // Insert 3 rows and confirm that we need to flush. + td := NewTableData(nil, nil, kafkalib.TopicConfig{}) + for i := 0; i < 45; i ++ { + assert.False(t, td.ShouldFlush(ctx)) + td.InsertRow(fmt.Sprint(i), map[string]interface{}{ + "foo": "bar", + "array": []string{"foo", "bar", "dusty", "the aussie", "robin", "jacqueline", "charlie"}, + "true": true, + "false": false, + "nested": map[string]interface{}{ + "foo": "bar", + }, + }) + } + + td.InsertRow("33333", map[string]interface{}{ + "foo": "bar", + "array": []string{"foo", "bar", "dusty", "the aussie", "robin", "jacqueline", "charlie"}, + "true": true, + "false": false, + "nested": map[string]interface{}{ + "foo": "bar", + }, + }) + + assert.True(t, td.ShouldFlush(ctx)) +} diff --git a/lib/size/size.go b/lib/size/size.go new file mode 100644 index 000000000..7491b9705 --- /dev/null +++ b/lib/size/size.go @@ -0,0 +1,14 @@ +package size + +import ( + "fmt" +) + +// GetApproxSize will encode the actual variable into bytes and then check the length (approx) by using string encoding. +// We chose not to use unsafe.SizeOf or reflect.Type.Size (both are akin) because they do not do recursive traversal. +// We also chose not to use gob.NewEncoder because it does not work for all data types and had a huge computational overhead. +// Another bonus is that there is no possible way for this to error out. +func GetApproxSize(v interface{}) int { + valString := fmt.Sprint(v) + return len([]byte(valString)) +} diff --git a/lib/size/size_bench_test.go b/lib/size/size_bench_test.go new file mode 100644 index 000000000..4a91acf8a --- /dev/null +++ b/lib/size/size_bench_test.go @@ -0,0 +1,63 @@ +package size + +import ( + "fmt" + "testing" + "time" +) + +func BenchmarkGetApproxSize_TallTable(b *testing.B) { + rowsData := make(map[string]map[string]interface{}) + for i := 0; i < 5000; i ++ { + rowsData[fmt.Sprint(i)] = map[string]interface{}{ + "id": i, + "name": "Robin", + "dog": "dusty the mini aussie", + } + } + + for n := 0; n < b.N; n ++ { + GetApproxSize(rowsData) + } +} + +func BenchmarkGetApproxSize_WideTable(b *testing.B) { + rowsData := make(map[string]map[string]interface{}) + for i := 0; i < 5000; i ++ { + rowsData[fmt.Sprint(i)] = map[string]interface{}{ + "id": i, + "name": "Robin", + "dog": "dusty the mini aussie", + "favorite_fruits": []string{"strawberry", "kiwi", "oranges"}, + "random": false, + "team": []string{"charlie", "jacqueline"}, + "email": "robin@artie.so", + "favorite_languages": []string{"go", "sql"}, + "favorite_databases": []string{"postgres", "bigtable"}, + "created_at": time.Now(), + "updated_at": time.Now(), + "negative_number": -500, + "nestedObject": map[string]interface{}{ + "foo": "bar", + "abc": "def", + }, + "array_of_objects": []map[string]interface{}{ + { + "foo": "bar", + }, + { + "foo_nested": map[string]interface{}{ + "foo_foo": "bar_bar", + }, + }, + }, + "is_deleted": false, + "lorem_ipsum": "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec elementum aliquet mi at efficitur. Praesent at erat ac elit faucibus convallis. Donec fermentum tellus eu nunc ornare, non convallis justo facilisis. In hac habitasse platea dictumst. Praesent eu ante vitae erat semper finibus eget ac mauris. Duis gravida cursus enim, nec sagittis arcu placerat sed. Integer semper orci justo, nec rhoncus libero convallis sed.", + "lorem_ipsum2": "Fusce vitae elementum tortor. Vestibulum consectetur ante id nibh ullamcorper, quis sodales turpis tempor. Duis pellentesque suscipit nibh porta posuere. In libero massa, efficitur at ultricies sit amet, vulputate ac ante. In euismod erat eget nulla blandit pretium. Ut tempor ante vel congue venenatis. Vestibulum at metus nec nibh iaculis consequat suscipit ac leo. Maecenas vitae rutrum nulla, quis ultrices justo. Aliquam ipsum ex, luctus ac diam eget, tempor tempor risus.", + } + } + + for n := 0; n < b.N; n ++ { + GetApproxSize(rowsData) + } +} diff --git a/lib/size/size_test.go b/lib/size/size_test.go new file mode 100644 index 000000000..c473ac353 --- /dev/null +++ b/lib/size/size_test.go @@ -0,0 +1,34 @@ +package size + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestVariableToBytes(t *testing.T) { + filePath := "/tmp/size_test" + assert.NoError(t, os.RemoveAll(filePath)) + + rowsData := make(map[string]map[string]interface{}) // pk -> { col -> val } + for i := 0; i < 500; i ++ { + rowsData[fmt.Sprintf("key-%v", i)] = map[string]interface{}{ + "id": fmt.Sprintf("key-%v", i), + "artie": "transfer", + "dusty": "the mini aussie", + "next_puppy": true, + "team": []string{"charlie", "robin", "jacqueline"}, + } + } + + err := os.WriteFile(filePath, []byte(fmt.Sprint(rowsData)), os.ModePerm) + assert.NoError(t, err) + + stat, err := os.Stat(filePath) + assert.NoError(t, err) + + size := GetApproxSize(rowsData) + assert.NoError(t, err) + assert.Equal(t, int(stat.Size()), size) +} diff --git a/main.go b/main.go index 92591d1c4..0da17223a 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,7 @@ func main() { logger.FromContext(ctx).WithFields(map[string]interface{}{ "flush_interval_seconds": settings.Config.FlushIntervalSeconds, "buffer_pool_size": settings.Config.BufferRows, + "flush_pool_size (kb)": settings.Config.FlushSizeKb, }).Info("config is loaded") flushChan := make(chan bool) diff --git a/models/memory.go b/models/memory.go index 85def75b1..ed46b75f4 100644 --- a/models/memory.go +++ b/models/memory.go @@ -4,7 +4,6 @@ import ( "context" "errors" "github.com/artie-labs/transfer/lib/artie" - "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/stringutil" @@ -133,9 +132,7 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes } inMemoryDB.TableData[e.Table].LatestCDCTs = e.ExecutionTime - - settings := config.FromContext(ctx) - return inMemoryDB.TableData[e.Table].Rows() > settings.Config.BufferRows, nil + return inMemoryDB.TableData[e.Table].ShouldFlush(ctx), nil } func LoadMemoryDB() { diff --git a/processes/consumer/process_test.go b/processes/consumer/process_test.go index 0638658cc..6b66a71f4 100644 --- a/processes/consumer/process_test.go +++ b/processes/consumer/process_test.go @@ -24,6 +24,7 @@ func TestProcessMessageFailures(t *testing.T) { Config: &config.Config{ FlushIntervalSeconds: 10, BufferRows: 10, + FlushSizeKb: 900, }, VerboseLogging: false, })