Skip to content

Commit

Permalink
Allow specifying flush interval and pool size (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Apr 12, 2023
1 parent 953a9ab commit 003c720
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 28 deletions.
45 changes: 38 additions & 7 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,28 @@ package config

import (
"fmt"
"github.com/artie-labs/transfer/lib/numbers"
"gopkg.in/yaml.v3"
"io"
"os"

"gopkg.in/yaml.v3"

"github.com/artie-labs/transfer/lib/array"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
)

const (
defaultFlushTimeSeconds = 10

flushIntervalSecondsStart = 5
flushIntervalSecondsEnd = 6 * 60 * 60

bufferPoolSizeStart = 5
// Snowflake has a limit of 2^14 elements within an expression.
// https://github.com/snowflakedb/snowflake-connector-python/issues/37
bufferPoolSizeEnd = 15000
)

type Sentry struct {
DSN string `yaml:"dsn"`
}
Expand Down Expand Up @@ -42,11 +54,12 @@ func (k *Kafka) String() string {
}

type Config struct {
Output constants.DestinationKind `yaml:"outputSource"`
Queue constants.QueueKind `yaml:"queue"`

Pubsub *Pubsub
Kafka *Kafka
Output constants.DestinationKind `yaml:"outputSource"`
Queue constants.QueueKind `yaml:"queue"`
FlushIntervalSeconds int `yaml:"flushIntervalSeconds"`
BufferRows uint `yaml:"bufferRows"`
Pubsub *Pubsub
Kafka *Kafka

BigQuery struct {
// PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var
Expand Down Expand Up @@ -102,6 +115,14 @@ func readFileToConfig(pathToConfig string) (*Config, error) {
config.Queue = constants.Kafka
}

if config.FlushIntervalSeconds == 0 {
config.FlushIntervalSeconds = defaultFlushTimeSeconds
}

if config.BufferRows == 0 {
config.BufferRows = bufferPoolSizeEnd
}

return &config, nil
}

Expand All @@ -113,6 +134,16 @@ func (c *Config) Validate() error {
return fmt.Errorf("config is nil")
}

if !numbers.BetweenEq(flushIntervalSecondsStart, flushIntervalSecondsEnd, c.FlushIntervalSeconds) {
return fmt.Errorf("flush interval is outside of our range, seconds: %v, expected start: %v, end: %v",
c.FlushIntervalSeconds, flushIntervalSecondsStart, flushIntervalSecondsEnd)
}

if !numbers.BetweenEq(bufferPoolSizeStart, bufferPoolSizeEnd, int(c.BufferRows)) {
return fmt.Errorf("buffer pool is outside of our range: %v, expected start: %v, end: %v",
c.BufferRows, bufferPoolSizeStart, bufferPoolSizeEnd)
}

if !constants.IsValidDestination(c.Output) {
return fmt.Errorf("output: %s is invalid", c.Output)
}
Expand Down
29 changes: 28 additions & 1 deletion lib/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,18 @@ func TestOutputSourceValid(t *testing.T) {
_, err = io.WriteString(file, fmt.Sprintf(
`
outputSource: snowflake
flushIntervalSeconds: 15
bufferRows: 10
%s
`, validKafkaTopic))
assert.Nil(t, err)

config, err := readFileToConfig(randomFile)
assert.Nil(t, err)

assert.Equal(t, config.FlushIntervalSeconds, 15)
assert.Equal(t, int(config.BufferRows), 10)

assert.Nil(t, config.Validate())
}

Expand Down Expand Up @@ -181,6 +186,9 @@ kafka:
config, err = readFileToConfig(randomFile)
assert.Nil(t, err)

assert.Equal(t, config.FlushIntervalSeconds, defaultFlushTimeSeconds)
assert.Equal(t, int(config.BufferRows), bufferPoolSizeEnd)

validErr = config.Validate()
assert.Error(t, validErr)
assert.True(t, strings.Contains(validErr.Error(), "kafka settings is invalid"), validErr.Error())
Expand Down Expand Up @@ -346,7 +354,9 @@ reporting:
func TestConfig_Validate(t *testing.T) {
pubsub := &Pubsub{}
cfg := &Config{
Pubsub: pubsub,
Pubsub: pubsub,
FlushIntervalSeconds: 5,
BufferRows: 500,
}

assert.Contains(t, cfg.Validate().Error(), "is invalid")
Expand All @@ -372,4 +382,21 @@ func TestConfig_Validate(t *testing.T) {
pubsub.ProjectID = "project_id"
pubsub.PathToCredentials = "/tmp/abc"
assert.Nil(t, cfg.Validate())

// Test the various flush error settings.
for _, count := range []int{0, 5000000} {
// Reset buffer rows.
cfg.BufferRows = 500
cfg.FlushIntervalSeconds = count
assert.Contains(t, cfg.Validate().Error(), "flush interval is outside of our range")

// Reset Flush
cfg.FlushIntervalSeconds = 20
cfg.BufferRows = uint(count)
assert.Contains(t, cfg.Validate().Error(), "buffer pool is outside of our range")
}

cfg.BufferRows = 500
cfg.FlushIntervalSeconds = 600
assert.Nil(t, cfg.Validate())
}
5 changes: 0 additions & 5 deletions lib/config/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ const (
DeleteColumnMarker = ArtiePrefix + "_delete"
DeletionConfidencePadding = 4 * time.Hour

// SnowflakeArraySize is used because Snowflake has a max of 16,384 elements in an expression,
// https://github.com/snowflakedb/snowflake-connector-python/issues/37
SnowflakeArraySize = 15000
FlushTimeInterval = 10 * time.Second

// DBZPostgresFormat is the only supported CDC format right now
DBZPostgresFormat = "debezium.postgres"
DBZPostgresAltFormat = "debezium.postgres.wal2json"
Expand Down
6 changes: 6 additions & 0 deletions lib/numbers/numbers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package numbers

// BetweenEq - Looks something like this. start <= number <= end
func BetweenEq(start, end, number int) bool {
return number >= start && number <= end
}
27 changes: 27 additions & 0 deletions lib/numbers/numbers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package numbers

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestBetweenEq(t *testing.T) {
type testCase struct {
result bool
start int
end int
number int
}

cases := []testCase{
{result: true, start: 5, end: 500, number: 100},
{result: true, start: 5, end: 500, number: 5},
{result: true, start: 5, end: 500, number: 500},
{result: false, start: 5, end: 500, number: 501},
{result: false, start: 5, end: 500, number: 4},
}

for _, _case := range cases {
assert.Equal(t, _case.result, BetweenEq(_case.start, _case.end, _case.number), _case)
}
}
9 changes: 7 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"sync"
"time"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
Expand All @@ -25,16 +26,20 @@ func main() {
ctx = utils.InjectDwhIntoCtx(utils.DataWarehouse(ctx, nil), ctx)

models.LoadMemoryDB()
settings := config.FromContext(ctx)
logger.FromContext(ctx).WithFields(map[string]interface{}{
"flush_interval_seconds": settings.Config.FlushIntervalSeconds,
"buffer_pool_size": settings.Config.BufferRows,
}).Info("config is loaded")

flushChan := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
pool.StartPool(ctx, constants.FlushTimeInterval, flushChan)
pool.StartPool(ctx, time.Duration(settings.Config.FlushIntervalSeconds)*time.Second, flushChan)
}()

settings := config.FromContext(ctx)
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
Expand Down
3 changes: 2 additions & 1 deletion models/flush/flush_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func (f *FlushTestSuite) SetupTest() {

f.ctx = config.InjectSettingsIntoContext(f.ctx, &config.Settings{
Config: &config.Config{
Output: "snowflake",
Output: "snowflake",
BufferRows: 500,
},
VerboseLogging: false,
})
Expand Down
11 changes: 7 additions & 4 deletions models/flush/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flush
import (
"fmt"
"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -33,15 +34,17 @@ func (f *FlushTestSuite) TestMemoryBasic() {
}

kafkaMsg := kafka.Message{Partition: 1, Offset: 1}
_, err := event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
_, err := event.Save(f.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
assert.Nil(f.T(), err)
assert.Equal(f.T(), len(models.GetMemoryDB().TableData["foo"].RowsData), i+1)
}
}

func (f *FlushTestSuite) TestShouldFlush() {
var flush bool
for i := 0; i < constants.SnowflakeArraySize*1.5; i++ {
cfg := config.FromContext(f.ctx)

for i := 0; i < int(float64(cfg.Config.BufferRows)*1.5); i++ {
event := models.Event{
Table: "postgres",
PrimaryKeyName: "id",
Expand All @@ -56,7 +59,7 @@ func (f *FlushTestSuite) TestShouldFlush() {

var err error
kafkaMsg := kafka.Message{Partition: 1, Offset: int64(i)}
flush, err = event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
flush, err = event.Save(f.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
assert.Nil(f.T(), err)

if flush {
Expand Down Expand Up @@ -90,7 +93,7 @@ func (f *FlushTestSuite) TestMemoryConcurrency() {
}

kafkaMsg := kafka.Message{Partition: 1, Offset: int64(i)}
_, err := event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
_, err := event.Save(f.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
assert.Nil(f.T(), err)
}

Expand Down
10 changes: 6 additions & 4 deletions models/memory.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package models

import (
"context"
"errors"
"fmt"
"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/stringutil"
Expand Down Expand Up @@ -35,7 +36,7 @@ func (d *DatabaseData) ClearTableConfig(tableName string) {
// The boolean signifies whether we should flush immediately or not. This is because Snowflake has a constraint
// On the number of elements within an expression.
// The other, error - is returned to see if anything went awry.
func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message artie.Message) (bool, error) {
func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, message artie.Message) (bool, error) {
if topicConfig == nil {
return false, errors.New("topicConfig is missing")
}
Expand Down Expand Up @@ -104,7 +105,7 @@ func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message artie.Message) (
inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.Invalid
continue
}

colTypeDetails, isOk := inMemoryDB.TableData[e.Table].InMemoryColumns[col]
if !isOk {
inMemoryDB.TableData[e.Table].InMemoryColumns[col] = typing.ParseValue(val)
Expand All @@ -121,7 +122,8 @@ func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message artie.Message) (
}
}

return inMemoryDB.TableData[e.Table].Rows > constants.SnowflakeArraySize, nil
settings := config.FromContext(ctx)
return inMemoryDB.TableData[e.Table].Rows > settings.Config.BufferRows, nil
}

func LoadMemoryDB() {
Expand Down
4 changes: 2 additions & 2 deletions models/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (m *ModelsTestSuite) SaveEvent() {
}

kafkaMsg := kafka.Message{}
_, err := event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
_, err := event.Save(m.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
assert.Nil(m.T(), err)

optimization := GetMemoryDB().TableData["foo"]
Expand Down Expand Up @@ -64,7 +64,7 @@ func (m *ModelsTestSuite) SaveEvent() {
}

newKafkaMsg := kafka.Message{}
_, err = edgeCaseEvent.Save(topicConfig, artie.NewMessage(&newKafkaMsg, nil, newKafkaMsg.Topic))
_, err = edgeCaseEvent.Save(m.ctx, topicConfig, artie.NewMessage(&newKafkaMsg, nil, newKafkaMsg.Topic))
assert.NoError(m.T(), err)
val, isOk := GetMemoryDB().TableData["foo"].InMemoryColumns[badColumn]
assert.True(m.T(), isOk)
Expand Down
3 changes: 3 additions & 0 deletions models/models_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models

import (
"context"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/stretchr/testify/suite"
"testing"
Expand All @@ -9,10 +10,12 @@ import (
type ModelsTestSuite struct {
suite.Suite
fakeStore *mocks.FakeStore
ctx context.Context
}

func (m *ModelsTestSuite) SetupTest() {
LoadMemoryDB()
m.ctx = context.Background()
}

func TestModelsTestSuite(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion processes/consumer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func processMessage(ctx context.Context, msg artie.Message, topicToConfigFmtMap
}

evt := models.ToMemoryEvent(ctx, event, pkName, pkValue, topicConfig.tc)
shouldFlush, err = evt.Save(topicConfig.tc, msg)
shouldFlush, err = evt.Save(ctx, topicConfig.tc, msg)
if err != nil {
tags["what"] = "save_fail"
err = fmt.Errorf("event failed to save, err: %v", err)
Expand Down
10 changes: 9 additions & 1 deletion processes/consumer/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/cdc/mongo"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/models"
Expand All @@ -19,6 +20,14 @@ func TestProcessMessageFailures(t *testing.T) {
models.LoadMemoryDB()

ctx := context.Background()
ctx = config.InjectSettingsIntoContext(ctx, &config.Settings{
Config: &config.Config{
FlushIntervalSeconds: 10,
BufferRows: 10,
},
VerboseLogging: false,
})

kafkaMsg := kafka.Message{
Topic: "foo",
Partition: 0,
Expand Down Expand Up @@ -128,5 +137,4 @@ func TestProcessMessageFailures(t *testing.T) {
shouldFlush, err = processMessage(ctx, msg, topicToConfigFmtMap, "foo")
assert.False(t, shouldFlush)
assert.Error(t, err)

}

0 comments on commit 003c720

Please sign in to comment.