From 003c7207c2f58263a65c04586b315115a4b3ff9a Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 11 Apr 2023 19:47:16 -0700 Subject: [PATCH] Allow specifying flush interval and pool size (#74) --- lib/config/config.go | 45 +++++++++++++++++++++++++----- lib/config/config_test.go | 29 ++++++++++++++++++- lib/config/constants/constants.go | 5 ---- lib/numbers/numbers.go | 6 ++++ lib/numbers/numbers_test.go | 27 ++++++++++++++++++ main.go | 9 ++++-- models/flush/flush_suite_test.go | 3 +- models/flush/flush_test.go | 11 +++++--- models/memory.go | 10 ++++--- models/memory_test.go | 4 +-- models/models_suite_test.go | 3 ++ processes/consumer/process.go | 2 +- processes/consumer/process_test.go | 10 ++++++- 13 files changed, 136 insertions(+), 28 deletions(-) create mode 100644 lib/numbers/numbers.go create mode 100644 lib/numbers/numbers_test.go diff --git a/lib/config/config.go b/lib/config/config.go index b7580deb3..12bfd30c4 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -2,16 +2,28 @@ package config import ( "fmt" + "github.com/artie-labs/transfer/lib/numbers" + "gopkg.in/yaml.v3" "io" "os" - "gopkg.in/yaml.v3" - "github.com/artie-labs/transfer/lib/array" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/kafkalib" ) +const ( + defaultFlushTimeSeconds = 10 + + flushIntervalSecondsStart = 5 + flushIntervalSecondsEnd = 6 * 60 * 60 + + bufferPoolSizeStart = 5 + // Snowflake has a limit of 2^14 elements within an expression. + // https://github.com/snowflakedb/snowflake-connector-python/issues/37 + bufferPoolSizeEnd = 15000 +) + type Sentry struct { DSN string `yaml:"dsn"` } @@ -42,11 +54,12 @@ func (k *Kafka) String() string { } type Config struct { - Output constants.DestinationKind `yaml:"outputSource"` - Queue constants.QueueKind `yaml:"queue"` - - Pubsub *Pubsub - Kafka *Kafka + Output constants.DestinationKind `yaml:"outputSource"` + Queue constants.QueueKind `yaml:"queue"` + FlushIntervalSeconds int `yaml:"flushIntervalSeconds"` + BufferRows uint `yaml:"bufferRows"` + Pubsub *Pubsub + Kafka *Kafka BigQuery struct { // PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var @@ -102,6 +115,14 @@ func readFileToConfig(pathToConfig string) (*Config, error) { config.Queue = constants.Kafka } + if config.FlushIntervalSeconds == 0 { + config.FlushIntervalSeconds = defaultFlushTimeSeconds + } + + if config.BufferRows == 0 { + config.BufferRows = bufferPoolSizeEnd + } + return &config, nil } @@ -113,6 +134,16 @@ func (c *Config) Validate() error { return fmt.Errorf("config is nil") } + if !numbers.BetweenEq(flushIntervalSecondsStart, flushIntervalSecondsEnd, c.FlushIntervalSeconds) { + return fmt.Errorf("flush interval is outside of our range, seconds: %v, expected start: %v, end: %v", + c.FlushIntervalSeconds, flushIntervalSecondsStart, flushIntervalSecondsEnd) + } + + if !numbers.BetweenEq(bufferPoolSizeStart, bufferPoolSizeEnd, int(c.BufferRows)) { + return fmt.Errorf("buffer pool is outside of our range: %v, expected start: %v, end: %v", + c.BufferRows, bufferPoolSizeStart, bufferPoolSizeEnd) + } + if !constants.IsValidDestination(c.Output) { return fmt.Errorf("output: %s is invalid", c.Output) } diff --git a/lib/config/config_test.go b/lib/config/config_test.go index 8d128bf18..3a2659570 100644 --- a/lib/config/config_test.go +++ b/lib/config/config_test.go @@ -66,6 +66,8 @@ func TestOutputSourceValid(t *testing.T) { _, err = io.WriteString(file, fmt.Sprintf( ` outputSource: snowflake +flushIntervalSeconds: 15 +bufferRows: 10 %s `, validKafkaTopic)) assert.Nil(t, err) @@ -73,6 +75,9 @@ outputSource: snowflake config, err := readFileToConfig(randomFile) assert.Nil(t, err) + assert.Equal(t, config.FlushIntervalSeconds, 15) + assert.Equal(t, int(config.BufferRows), 10) + assert.Nil(t, config.Validate()) } @@ -181,6 +186,9 @@ kafka: config, err = readFileToConfig(randomFile) assert.Nil(t, err) + assert.Equal(t, config.FlushIntervalSeconds, defaultFlushTimeSeconds) + assert.Equal(t, int(config.BufferRows), bufferPoolSizeEnd) + validErr = config.Validate() assert.Error(t, validErr) assert.True(t, strings.Contains(validErr.Error(), "kafka settings is invalid"), validErr.Error()) @@ -346,7 +354,9 @@ reporting: func TestConfig_Validate(t *testing.T) { pubsub := &Pubsub{} cfg := &Config{ - Pubsub: pubsub, + Pubsub: pubsub, + FlushIntervalSeconds: 5, + BufferRows: 500, } assert.Contains(t, cfg.Validate().Error(), "is invalid") @@ -372,4 +382,21 @@ func TestConfig_Validate(t *testing.T) { pubsub.ProjectID = "project_id" pubsub.PathToCredentials = "/tmp/abc" assert.Nil(t, cfg.Validate()) + + // Test the various flush error settings. + for _, count := range []int{0, 5000000} { + // Reset buffer rows. + cfg.BufferRows = 500 + cfg.FlushIntervalSeconds = count + assert.Contains(t, cfg.Validate().Error(), "flush interval is outside of our range") + + // Reset Flush + cfg.FlushIntervalSeconds = 20 + cfg.BufferRows = uint(count) + assert.Contains(t, cfg.Validate().Error(), "buffer pool is outside of our range") + } + + cfg.BufferRows = 500 + cfg.FlushIntervalSeconds = 600 + assert.Nil(t, cfg.Validate()) } diff --git a/lib/config/constants/constants.go b/lib/config/constants/constants.go index 914270700..b11ffbdb2 100644 --- a/lib/config/constants/constants.go +++ b/lib/config/constants/constants.go @@ -7,11 +7,6 @@ const ( DeleteColumnMarker = ArtiePrefix + "_delete" DeletionConfidencePadding = 4 * time.Hour - // SnowflakeArraySize is used because Snowflake has a max of 16,384 elements in an expression, - // https://github.com/snowflakedb/snowflake-connector-python/issues/37 - SnowflakeArraySize = 15000 - FlushTimeInterval = 10 * time.Second - // DBZPostgresFormat is the only supported CDC format right now DBZPostgresFormat = "debezium.postgres" DBZPostgresAltFormat = "debezium.postgres.wal2json" diff --git a/lib/numbers/numbers.go b/lib/numbers/numbers.go new file mode 100644 index 000000000..16deb5825 --- /dev/null +++ b/lib/numbers/numbers.go @@ -0,0 +1,6 @@ +package numbers + +// BetweenEq - Looks something like this. start <= number <= end +func BetweenEq(start, end, number int) bool { + return number >= start && number <= end +} diff --git a/lib/numbers/numbers_test.go b/lib/numbers/numbers_test.go new file mode 100644 index 000000000..c7419c2b3 --- /dev/null +++ b/lib/numbers/numbers_test.go @@ -0,0 +1,27 @@ +package numbers + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBetweenEq(t *testing.T) { + type testCase struct { + result bool + start int + end int + number int + } + + cases := []testCase{ + {result: true, start: 5, end: 500, number: 100}, + {result: true, start: 5, end: 500, number: 5}, + {result: true, start: 5, end: 500, number: 500}, + {result: false, start: 5, end: 500, number: 501}, + {result: false, start: 5, end: 500, number: 4}, + } + + for _, _case := range cases { + assert.Equal(t, _case.result, BetweenEq(_case.start, _case.end, _case.number), _case) + } +} diff --git a/main.go b/main.go index 1b4d216d3..92591d1c4 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "os" "sync" + "time" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" @@ -25,16 +26,20 @@ func main() { ctx = utils.InjectDwhIntoCtx(utils.DataWarehouse(ctx, nil), ctx) models.LoadMemoryDB() + settings := config.FromContext(ctx) + logger.FromContext(ctx).WithFields(map[string]interface{}{ + "flush_interval_seconds": settings.Config.FlushIntervalSeconds, + "buffer_pool_size": settings.Config.BufferRows, + }).Info("config is loaded") flushChan := make(chan bool) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - pool.StartPool(ctx, constants.FlushTimeInterval, flushChan) + pool.StartPool(ctx, time.Duration(settings.Config.FlushIntervalSeconds)*time.Second, flushChan) }() - settings := config.FromContext(ctx) wg.Add(1) go func(ctx context.Context) { defer wg.Done() diff --git a/models/flush/flush_suite_test.go b/models/flush/flush_suite_test.go index fbd861604..4539f88c3 100644 --- a/models/flush/flush_suite_test.go +++ b/models/flush/flush_suite_test.go @@ -29,7 +29,8 @@ func (f *FlushTestSuite) SetupTest() { f.ctx = config.InjectSettingsIntoContext(f.ctx, &config.Settings{ Config: &config.Config{ - Output: "snowflake", + Output: "snowflake", + BufferRows: 500, }, VerboseLogging: false, }) diff --git a/models/flush/flush_test.go b/models/flush/flush_test.go index 5783b5e2b..4905fb67e 100644 --- a/models/flush/flush_test.go +++ b/models/flush/flush_test.go @@ -3,6 +3,7 @@ package flush import ( "fmt" "github.com/artie-labs/transfer/lib/artie" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" @@ -33,7 +34,7 @@ func (f *FlushTestSuite) TestMemoryBasic() { } kafkaMsg := kafka.Message{Partition: 1, Offset: 1} - _, err := event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) + _, err := event.Save(f.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) assert.Nil(f.T(), err) assert.Equal(f.T(), len(models.GetMemoryDB().TableData["foo"].RowsData), i+1) } @@ -41,7 +42,9 @@ func (f *FlushTestSuite) TestMemoryBasic() { func (f *FlushTestSuite) TestShouldFlush() { var flush bool - for i := 0; i < constants.SnowflakeArraySize*1.5; i++ { + cfg := config.FromContext(f.ctx) + + for i := 0; i < int(float64(cfg.Config.BufferRows)*1.5); i++ { event := models.Event{ Table: "postgres", PrimaryKeyName: "id", @@ -56,7 +59,7 @@ func (f *FlushTestSuite) TestShouldFlush() { var err error kafkaMsg := kafka.Message{Partition: 1, Offset: int64(i)} - flush, err = event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) + flush, err = event.Save(f.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) assert.Nil(f.T(), err) if flush { @@ -90,7 +93,7 @@ func (f *FlushTestSuite) TestMemoryConcurrency() { } kafkaMsg := kafka.Message{Partition: 1, Offset: int64(i)} - _, err := event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) + _, err := event.Save(f.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) assert.Nil(f.T(), err) } diff --git a/models/memory.go b/models/memory.go index 92196d94a..696e38ab6 100644 --- a/models/memory.go +++ b/models/memory.go @@ -1,10 +1,11 @@ package models import ( + "context" "errors" "fmt" "github.com/artie-labs/transfer/lib/artie" - "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/stringutil" @@ -35,7 +36,7 @@ func (d *DatabaseData) ClearTableConfig(tableName string) { // The boolean signifies whether we should flush immediately or not. This is because Snowflake has a constraint // On the number of elements within an expression. // The other, error - is returned to see if anything went awry. -func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message artie.Message) (bool, error) { +func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, message artie.Message) (bool, error) { if topicConfig == nil { return false, errors.New("topicConfig is missing") } @@ -104,7 +105,7 @@ func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message artie.Message) ( inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.Invalid continue } - + colTypeDetails, isOk := inMemoryDB.TableData[e.Table].InMemoryColumns[col] if !isOk { inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.ParseValue(val) @@ -121,7 +122,8 @@ func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message artie.Message) ( } } - return inMemoryDB.TableData[e.Table].Rows > constants.SnowflakeArraySize, nil + settings := config.FromContext(ctx) + return inMemoryDB.TableData[e.Table].Rows > settings.Config.BufferRows, nil } func LoadMemoryDB() { diff --git a/models/memory_test.go b/models/memory_test.go index 99567f0a2..4827e6218 100644 --- a/models/memory_test.go +++ b/models/memory_test.go @@ -33,7 +33,7 @@ func (m *ModelsTestSuite) SaveEvent() { } kafkaMsg := kafka.Message{} - _, err := event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) + _, err := event.Save(m.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)) assert.Nil(m.T(), err) optimization := GetMemoryDB().TableData["foo"] @@ -64,7 +64,7 @@ func (m *ModelsTestSuite) SaveEvent() { } newKafkaMsg := kafka.Message{} - _, err = edgeCaseEvent.Save(topicConfig, artie.NewMessage(&newKafkaMsg, nil, newKafkaMsg.Topic)) + _, err = edgeCaseEvent.Save(m.ctx, topicConfig, artie.NewMessage(&newKafkaMsg, nil, newKafkaMsg.Topic)) assert.NoError(m.T(), err) val, isOk := GetMemoryDB().TableData["foo"].InMemoryColumns[badColumn] assert.True(m.T(), isOk) diff --git a/models/models_suite_test.go b/models/models_suite_test.go index d83eedc55..fb284b259 100644 --- a/models/models_suite_test.go +++ b/models/models_suite_test.go @@ -1,6 +1,7 @@ package models import ( + "context" "github.com/artie-labs/transfer/lib/mocks" "github.com/stretchr/testify/suite" "testing" @@ -9,10 +10,12 @@ import ( type ModelsTestSuite struct { suite.Suite fakeStore *mocks.FakeStore + ctx context.Context } func (m *ModelsTestSuite) SetupTest() { LoadMemoryDB() + m.ctx = context.Background() } func TestModelsTestSuite(t *testing.T) { diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 13f9aea00..097875c31 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -43,7 +43,7 @@ func processMessage(ctx context.Context, msg artie.Message, topicToConfigFmtMap } evt := models.ToMemoryEvent(ctx, event, pkName, pkValue, topicConfig.tc) - shouldFlush, err = evt.Save(topicConfig.tc, msg) + shouldFlush, err = evt.Save(ctx, topicConfig.tc, msg) if err != nil { tags["what"] = "save_fail" err = fmt.Errorf("event failed to save, err: %v", err) diff --git a/processes/consumer/process_test.go b/processes/consumer/process_test.go index 350bd5c33..ccbc3dc7c 100644 --- a/processes/consumer/process_test.go +++ b/processes/consumer/process_test.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/artie-labs/transfer/lib/artie" "github.com/artie-labs/transfer/lib/cdc/mongo" + "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/models" @@ -19,6 +20,14 @@ func TestProcessMessageFailures(t *testing.T) { models.LoadMemoryDB() ctx := context.Background() + ctx = config.InjectSettingsIntoContext(ctx, &config.Settings{ + Config: &config.Config{ + FlushIntervalSeconds: 10, + BufferRows: 10, + }, + VerboseLogging: false, + }) + kafkaMsg := kafka.Message{ Topic: "foo", Partition: 0, @@ -128,5 +137,4 @@ func TestProcessMessageFailures(t *testing.T) { shouldFlush, err = processMessage(ctx, msg, topicToConfigFmtMap, "foo") assert.False(t, shouldFlush) assert.Error(t, err) - }