Skip to content

Commit

Permalink
Flush rules to be cognizant of in-memory database size (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored May 8, 2023
1 parent d0a0955 commit 4975601
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 38 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@ build:
.PHONY: release
release:
goreleaser release --clean

.PHONY: bench_size
bench_size:
go test ./lib/size -bench=Bench -benchtime=10s
go test ./lib/optimization -bench=Bench -benchtime=10s
45 changes: 30 additions & 15 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

const (
defaultFlushTimeSeconds = 10
defaultFlushSizeKb = 25 * 1024 // 25 mb

flushIntervalSecondsStart = 5
flushIntervalSecondsEnd = 6 * 60 * 60
Expand Down Expand Up @@ -71,13 +72,19 @@ func (k *Kafka) String() string {
}

type Config struct {
Output constants.DestinationKind `yaml:"outputSource"`
Queue constants.QueueKind `yaml:"queue"`
FlushIntervalSeconds int `yaml:"flushIntervalSeconds"`
BufferRows uint `yaml:"bufferRows"`
Pubsub *Pubsub
Kafka *Kafka
Output constants.DestinationKind `yaml:"outputSource"`
Queue constants.QueueKind `yaml:"queue"`

// Flush rules
FlushIntervalSeconds int `yaml:"flushIntervalSeconds"`
FlushSizeKb int `yaml:"flushSizeKb"`
BufferRows uint `yaml:"bufferRows"`

// Supported message queues
Pubsub *Pubsub
Kafka *Kafka

// Supported destinations
BigQuery *BigQuery `yaml:"bigquery"`
Snowflake *Snowflake `yaml:"snowflake"`

Expand Down Expand Up @@ -126,6 +133,10 @@ func readFileToConfig(pathToConfig string) (*Config, error) {
config.BufferRows = bufferPoolSizeEnd
}

if config.FlushSizeKb == 0 {
config.FlushSizeKb = defaultFlushSizeKb
}

return &config, nil
}

Expand All @@ -137,50 +148,54 @@ func (c *Config) Validate() error {
return fmt.Errorf("config is nil")
}

if c.FlushSizeKb <= 0 {
return fmt.Errorf("config is invalid, flush size pool has to be a positive number, current value: %v", c.FlushSizeKb)
}

if !numbers.BetweenEq(flushIntervalSecondsStart, flushIntervalSecondsEnd, c.FlushIntervalSeconds) {
return fmt.Errorf("flush interval is outside of our range, seconds: %v, expected start: %v, end: %v",
return fmt.Errorf("config is invalid, flush interval is outside of our range, seconds: %v, expected start: %v, end: %v",
c.FlushIntervalSeconds, flushIntervalSecondsStart, flushIntervalSecondsEnd)
}

if !numbers.BetweenEq(bufferPoolSizeStart, bufferPoolSizeEnd, int(c.BufferRows)) {
return fmt.Errorf("buffer pool is outside of our range: %v, expected start: %v, end: %v",
return fmt.Errorf("config is invalid, buffer pool is outside of our range: %v, expected start: %v, end: %v",
c.BufferRows, bufferPoolSizeStart, bufferPoolSizeEnd)
}

if !constants.IsValidDestination(c.Output) {
return fmt.Errorf("output: %s is invalid", c.Output)
return fmt.Errorf("config is invalid, output: %s is invalid", c.Output)
}

if c.Queue == constants.Kafka {
if c.Kafka == nil || len(c.Kafka.TopicConfigs) == 0 {
return fmt.Errorf("no kafka topic configs, kafka: %v", c.Kafka)
return fmt.Errorf("config is invalid, no kafka topic configs, kafka: %v", c.Kafka)
}

for _, topicConfig := range c.Kafka.TopicConfigs {
if valid := topicConfig.Valid(); !valid {
return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String())
return fmt.Errorf("config is invalid, topic config is invalid, tc: %s", topicConfig.String())
}
}

// Username and password are not required (if it's within the same VPC or connecting locally
if array.Empty([]string{c.Kafka.GroupID, c.Kafka.BootstrapServer}) {
return fmt.Errorf("kafka settings is invalid, kafka: %s", c.Kafka.String())
return fmt.Errorf("config is invalid, kafka settings is invalid, kafka: %s", c.Kafka.String())
}
}

if c.Queue == constants.PubSub {
if c.Pubsub == nil || len(c.Pubsub.TopicConfigs) == 0 {
return fmt.Errorf("no pubsub topic configs, pubsub: %v", c.Pubsub)
return fmt.Errorf("config is invalid, no pubsub topic configs, pubsub: %v", c.Pubsub)
}

for _, topicConfig := range c.Pubsub.TopicConfigs {
if valid := topicConfig.Valid(); !valid {
return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String())
return fmt.Errorf("config is invalid, topic config is invalid, tc: %s", topicConfig.String())
}
}

if array.Empty([]string{c.Pubsub.ProjectID, c.Pubsub.PathToCredentials}) {
return fmt.Errorf("pubsub settings is invalid, pubsub: %s", c.Pubsub.String())
return fmt.Errorf("config is invalid, pubsub settings is invalid, pubsub: %s", c.Pubsub.String())
}
}

Expand Down
7 changes: 7 additions & 0 deletions lib/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func TestConfig_Validate(t *testing.T) {

cfg.Output = constants.Snowflake
cfg.Queue = constants.PubSub
cfg.FlushSizeKb = 5
assert.Contains(t, cfg.Validate().Error(), "no pubsub topic configs")

pubsub.TopicConfigs = []*kafkalib.TopicConfig{
Expand Down Expand Up @@ -448,4 +449,10 @@ func TestConfig_Validate(t *testing.T) {
cfg.BufferRows = 500
cfg.FlushIntervalSeconds = 600
assert.Nil(t, cfg.Validate())

for _, num := range []int{-500, -300, -5, 0} {
cfg.FlushSizeKb = num
assert.Contains(t, cfg.Validate().Error(), "config is invalid, flush size pool has to be a positive number")
}

}
22 changes: 22 additions & 0 deletions lib/optimization/event.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package optimization

import (
"context"
"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/size"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/ext"
"strings"
Expand All @@ -22,6 +25,7 @@ type TableData struct {

// This is used for the automatic schema detection
LatestCDCTs time.Time
approxSize int
}

func NewTableData(inMemoryColumns *typing.Columns, primaryKeys []string, topicConfig kafkalib.TopicConfig) *TableData {
Expand All @@ -35,7 +39,20 @@ func NewTableData(inMemoryColumns *typing.Columns, primaryKeys []string, topicCo

}

// InsertRow creates a single entrypoint for how rows get added to TableData
// This is important to avoid concurrent r/w, but also the ability for us to add or decrement row size by keeping a running total
// With this, we are able to reduce the latency by 500x+ on a 5k row table. See event_bench_test.go vs. size_bench_test.go
func (t *TableData) InsertRow(pk string, rowData map[string]interface{}) {
	newRowSize := size.GetApproxSize(rowData)

	// If the primary key already exists, the new row replaces it, so the old
	// row's approximate size must be subtracted from the running total.
	var prevRowSize int
	if prevRow, isOk := t.rowsData[pk]; isOk {
		prevRowSize = size.GetApproxSize(prevRow)
	}

	// If prevRow doesn't exist, prevRowSize is 0, which is a no-op.
	t.approxSize += newRowSize - prevRowSize
	t.rowsData[pk] = rowData
}
Expand All @@ -58,6 +75,11 @@ func (t *TableData) Rows() uint {
return uint(len(t.rowsData))
}

// ShouldFlush returns true when the in-memory table has exceeded either of the
// configured flush rules: the row-count cap (BufferRows) or the size cap
// (FlushSizeKb, converted KB -> bytes to compare against approxSize).
func (t *TableData) ShouldFlush(ctx context.Context) bool {
	settings := config.FromContext(ctx)
	return t.Rows() > settings.Config.BufferRows || t.approxSize > settings.Config.FlushSizeKb*1024
}

// UpdateInMemoryColumnsFromDestination - When running Transfer, we will have 2 column types.
// 1) TableData (constructed in-memory)
// 2) TableConfig (coming from the SQL DESCRIBE or equivalent statement) from the destination
Expand Down
56 changes: 56 additions & 0 deletions lib/optimization/event_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package optimization

import (
"fmt"
"github.com/artie-labs/transfer/lib/kafkalib"
"testing"
"time"
)

// BenchmarkTableData_ApproxSize_TallTable measures InsertRow over many rows
// that each carry only a few small columns.
func BenchmarkTableData_ApproxSize_TallTable(b *testing.B) {
	tableData := NewTableData(nil, nil, kafkalib.TopicConfig{})
	for i := 0; i < b.N; i++ {
		row := map[string]interface{}{
			"id":   i,
			"name": "Robin",
			"dog":  "dusty the mini aussie",
		}
		tableData.InsertRow(fmt.Sprint(i), row)
	}
}

// BenchmarkTableData_ApproxSize_WideTable measures InsertRow over rows with
// many columns of mixed types (strings, slices, nested maps, timestamps).
func BenchmarkTableData_ApproxSize_WideTable(b *testing.B) {
	tableData := NewTableData(nil, nil, kafkalib.TopicConfig{})
	for i := 0; i < b.N; i++ {
		row := map[string]interface{}{
			"id":                 i,
			"name":               "Robin",
			"dog":                "dusty the mini aussie",
			"favorite_fruits":    []string{"strawberry", "kiwi", "oranges"},
			"random":             false,
			"team":               []string{"charlie", "jacqueline"},
			"email":              "[email protected]",
			"favorite_languages": []string{"go", "sql"},
			"favorite_databases": []string{"postgres", "bigtable"},
			"created_at":         time.Now(),
			"updated_at":         time.Now(),
			"negative_number":    -500,
			"nestedObject": map[string]interface{}{
				"foo": "bar",
				"abc": "def",
			},
			"array_of_objects": []map[string]interface{}{
				{
					"foo": "bar",
				},
				{
					"foo_nested": map[string]interface{}{
						"foo_foo": "bar_bar",
					},
				},
			},
			"is_deleted":  false,
			"lorem_ipsum": "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec elementum aliquet mi at efficitur. Praesent at erat ac elit faucibus convallis. Donec fermentum tellus eu nunc ornare, non convallis justo facilisis. In hac habitasse platea dictumst. Praesent eu ante vitae erat semper finibus eget ac mauris. Duis gravida cursus enim, nec sagittis arcu placerat sed. Integer semper orci justo, nec rhoncus libero convallis sed.",
			"lorem_ipsum2": "Fusce vitae elementum tortor. Vestibulum consectetur ante id nibh ullamcorper, quis sodales turpis tempor. Duis pellentesque suscipit nibh porta posuere. In libero massa, efficitur at ultricies sit amet, vulputate ac ante. In euismod erat eget nulla blandit pretium. Ut tempor ante vel congue venenatis. Vestibulum at metus nec nibh iaculis consequat suscipit ac leo. Maecenas vitae rutrum nulla, quis ultrices justo. Aliquam ipsum ex, luctus ac diam eget, tempor tempor risus.",
		}
		tableData.InsertRow(fmt.Sprint(i), row)
	}
}
84 changes: 84 additions & 0 deletions lib/optimization/event_insert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package optimization

import (
"fmt"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/size"
"github.com/stretchr/testify/assert"
"testing"
)

// TestTableData_InsertRow verifies that rows only count when added via
// InsertRow, and that writing into the map returned by RowsData does not
// register as an insert (presumably RowsData returns a copy — the assertions
// below pin that behavior).
func TestTableData_InsertRow(t *testing.T) {
	td := NewTableData(nil, nil, kafkalib.TopicConfig{})
	assert.Equal(t, 0, int(td.Rows()))

	// Writing directly to the returned map should not change the row count.
	td.RowsData()["foo"] = map[string]interface{}{"foo": "bar"}
	assert.Equal(t, 0, int(td.Rows()))

	// Inserting through the proper entrypoint should.
	td.InsertRow("foo", map[string]interface{}{"foo": "bar"})
	assert.Equal(t, 1, int(td.Rows()))
}

// TestTableData_InsertRowApproxSize inserts 1000 rows, then overwrites some and
// "deletes" some (via a tombstone row), and checks that the running approxSize
// total matches a from-scratch recomputation over RowsData().
// Iterating a map exploits Go's nondeterministic map ordering, so different
// keys get updated/deleted on each run.
func TestTableData_InsertRowApproxSize(t *testing.T) {
	td := NewTableData(nil, nil, kafkalib.TopicConfig{})
	numInsertRows := 1000
	numUpdateRows := 420
	numDeleteRows := 250

	for i := 0; i < numInsertRows; i++ {
		td.InsertRow(fmt.Sprint(i), map[string]interface{}{
			"foo":     "bar",
			"array":   []int{1, 2, 3, 4, 5},
			"boolean": true,
			"nested_object": map[string]interface{}{
				"nested": map[string]interface{}{
					"foo":  "bar",
					"true": false,
				},
			},
		})
	}

	// Overwrite numUpdateRows rows with a smaller payload.
	var updateCount int
	for updateKey := range td.RowsData() {
		updateCount += 1
		td.InsertRow(updateKey, map[string]interface{}{
			"foo": "foo",
			"bar": "bar",
		})

		// >= so exactly numUpdateRows rows are touched (was >, off by one).
		if updateCount >= numUpdateRows {
			break
		}
	}

	// "Delete" numDeleteRows rows by replacing them with a tombstone payload.
	var deleteCount int
	for deleteKey := range td.RowsData() {
		deleteCount += 1
		td.InsertRow(deleteKey, map[string]interface{}{
			"__artie_deleted": true,
		})

		// >= so exactly numDeleteRows rows are touched (was >, off by one).
		if deleteCount >= numDeleteRows {
			break
		}
	}

	// The running total must equal a full recomputation over the current rows.
	var actualSize int
	for _, rowData := range td.RowsData() {
		actualSize += size.GetApproxSize(rowData)
	}

	assert.Equal(t, td.approxSize, actualSize)
}
Loading

0 comments on commit 4975601

Please sign in to comment.