Skip to content

Commit

Permalink
Implementing per table flush (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jun 8, 2023
1 parent 36cde56 commit 193d3bd
Show file tree
Hide file tree
Showing 15 changed files with 246 additions and 167 deletions.
1 change: 1 addition & 0 deletions lib/db/mock/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mock
import (
"database/sql"
"fmt"

"github.com/artie-labs/transfer/lib/mocks"
)

Expand Down
12 changes: 4 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,35 @@ func main() {
// Loading Telemetry
ctx = metrics.LoadExporter(ctx)
ctx = utils.InjectDwhIntoCtx(utils.DataWarehouse(ctx, nil), ctx)

ctx = models.LoadMemoryDB(ctx)

settings := config.FromContext(ctx)

logger.FromContext(ctx).WithFields(map[string]interface{}{
"flush_interval_seconds": settings.Config.FlushIntervalSeconds,
"buffer_pool_size": settings.Config.BufferRows,
"flush_pool_size (kb)": settings.Config.FlushSizeKb,
}).Info("config is loaded")

flushChan := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
pool.StartPool(ctx, time.Duration(settings.Config.FlushIntervalSeconds)*time.Second, flushChan)
pool.StartPool(ctx, time.Duration(settings.Config.FlushIntervalSeconds)*time.Second)
}()

wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()

switch settings.Config.Queue {
case constants.Kafka:
consumer.StartConsumer(ctx, flushChan)
consumer.StartConsumer(ctx)
break
case constants.PubSub:
consumer.StartSubscriber(ctx, flushChan)
consumer.StartSubscriber(ctx)
break
default:
logger.FromContext(ctx).Fatalf("message queue: %s not supported", settings.Config.Queue)
}

}(ctx)

wg.Wait()
Expand Down
29 changes: 14 additions & 15 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,34 +94,33 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes
return false, errors.New("topicConfig is missing")
}

inMemDB := models.GetMemoryDB(ctx)
inMemDB.Lock()
defer inMemDB.Unlock()

if !e.IsValid() {
return false, errors.New("event not valid")
}

inMemDB := models.GetMemoryDB(ctx)
// Does the table exist?
_, isOk := inMemDB.TableData[e.Table]
if !isOk {
td := inMemDB.GetOrCreateTableData(e.Table)
td.Lock()
defer td.Unlock()
if td.Empty() {
columns := &typing.Columns{}
if e.Columns != nil {
columns = e.Columns
}

inMemDB.TableData[e.Table] = optimization.NewTableData(columns, e.PrimaryKeys(), *topicConfig, e.Table)
td.SetTableData(optimization.NewTableData(columns, e.PrimaryKeys(), *topicConfig, e.Table))
} else {
if e.Columns != nil {
// Iterate over this again just in case.
for _, col := range e.Columns.GetColumns() {
inMemDB.TableData[e.Table].AddInMemoryCol(col)
td.AddInMemoryCol(col)
}
}
}

// Table columns
inMemoryColumns := inMemDB.TableData[e.Table].ReadOnlyInMemoryCols()
inMemoryColumns := td.ReadOnlyInMemoryCols()
// Update col if necessary
sanitizedData := make(map[string]interface{})
for _col, val := range e.Data {
Expand Down Expand Up @@ -166,19 +165,19 @@ func (e *Event) Save(ctx context.Context, topicConfig *kafkalib.TopicConfig, mes
}

// Now we commit the table columns.
inMemDB.TableData[e.Table].SetInMemoryColumns(inMemoryColumns)
td.SetInMemoryColumns(inMemoryColumns)

// Swap out sanitizedData <> data.
e.Data = sanitizedData
inMemDB.TableData[e.Table].InsertRow(e.PrimaryKeyValue(), e.Data)
td.InsertRow(e.PrimaryKeyValue(), e.Data)
// If the message is Kafka, then we only need the latest one
// If it's pubsub, we will store all of them in memory. This is because GCP pub/sub REQUIRES us to ack every single message
if message.Kind() == artie.Kafka {
inMemDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = []artie.Message{message}
td.PartitionsToLastMessage[message.Partition()] = []artie.Message{message}
} else {
inMemDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = append(inMemDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()], message)
td.PartitionsToLastMessage[message.Partition()] = append(td.PartitionsToLastMessage[message.Partition()], message)
}

inMemDB.TableData[e.Table].LatestCDCTs = e.ExecutionTime
return inMemDB.TableData[e.Table].ShouldFlush(ctx), nil
td.LatestCDCTs = e.ExecutionTime
return td.ShouldFlush(ctx), nil
}
33 changes: 20 additions & 13 deletions models/event/event_save_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (e *EventsTestSuite) TestSaveEvent() {
_, err := event.Save(e.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
assert.Nil(e.T(), err)

optimization := models.GetMemoryDB(e.ctx).TableData["foo"]
optimization := models.GetMemoryDB(e.ctx).GetOrCreateTableData("foo")
// Check the in-memory DB columns.
var found int
for _, col := range optimization.ReadOnlyInMemoryCols().GetColumns() {
Expand Down Expand Up @@ -74,9 +74,11 @@ func (e *EventsTestSuite) TestSaveEvent() {
newKafkaMsg := kafka.Message{}
_, err = edgeCaseEvent.Save(e.ctx, topicConfig, artie.NewMessage(&newKafkaMsg, nil, newKafkaMsg.Topic))
assert.NoError(e.T(), err)
inMemoryCol, isOk := models.GetMemoryDB(e.ctx).TableData["foo"].ReadOnlyInMemoryCols().GetColumn(badColumn)

td := models.GetMemoryDB(e.ctx).GetOrCreateTableData("foo")
inMemCol, isOk := td.ReadOnlyInMemoryCols().GetColumn(badColumn)
assert.True(e.T(), isOk)
assert.Equal(e.T(), typing.Invalid, inMemoryCol.KindDetails)
assert.Equal(e.T(), typing.Invalid, inMemCol.KindDetails)
}

func (e *EventsTestSuite) TestEvent_SaveCasing() {
Expand All @@ -96,7 +98,8 @@ func (e *EventsTestSuite) TestEvent_SaveCasing() {
_, err := event.Save(e.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
assert.Nil(e.T(), err)

rowData := models.GetMemoryDB(e.ctx).TableData["foo"].RowsData()[event.PrimaryKeyValue()]
td := models.GetMemoryDB(e.ctx).GetOrCreateTableData("foo")
rowData := td.RowsData()[event.PrimaryKeyValue()]
expectedColumns := []string{"randomcol", "anothercol"}
for _, expectedColumn := range expectedColumns {
_, isOk := rowData[expectedColumn]
Expand Down Expand Up @@ -131,19 +134,20 @@ func (e *EventsTestSuite) TestEventSaveOptionalSchema() {
_, err := event.Save(e.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
assert.Nil(e.T(), err)

column, isOk := models.GetMemoryDB(e.ctx).TableData["foo"].ReadOnlyInMemoryCols().GetColumn("created_at_date_string")
td := models.GetMemoryDB(e.ctx).GetOrCreateTableData("foo")
column, isOk := td.ReadOnlyInMemoryCols().GetColumn("created_at_date_string")
assert.True(e.T(), isOk)
assert.Equal(e.T(), typing.String, column.KindDetails)

column, isOk = models.GetMemoryDB(e.ctx).TableData["foo"].ReadOnlyInMemoryCols().GetColumn("created_at_date_no_schema")
column, isOk = td.ReadOnlyInMemoryCols().GetColumn("created_at_date_no_schema")
assert.True(e.T(), isOk)
assert.Equal(e.T(), ext.Date.Type, column.KindDetails.ExtendedTimeDetails.Type)

column, isOk = models.GetMemoryDB(e.ctx).TableData["foo"].ReadOnlyInMemoryCols().GetColumn("json_object_string")
column, isOk = td.ReadOnlyInMemoryCols().GetColumn("json_object_string")
assert.True(e.T(), isOk)
assert.Equal(e.T(), typing.String, column.KindDetails)

column, isOk = models.GetMemoryDB(e.ctx).TableData["foo"].ReadOnlyInMemoryCols().GetColumn("json_object_no_schema")
column, isOk = td.ReadOnlyInMemoryCols().GetColumn("json_object_no_schema")
assert.True(e.T(), isOk)
assert.Equal(e.T(), typing.Struct, column.KindDetails)
}
Expand All @@ -169,8 +173,9 @@ func (e *EventsTestSuite) TestEvent_SaveColumnsNoData() {
_, err := evt.Save(e.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
assert.NoError(e.T(), err)

td := models.GetMemoryDB(e.ctx).GetOrCreateTableData("non_existent")
var prevKey string
for _, col := range models.GetMemoryDB(e.ctx).TableData["non_existent"].ReadOnlyInMemoryCols().GetColumns() {
for _, col := range td.ReadOnlyInMemoryCols().GetColumns() {
if col.Name == constants.DeleteColumnMarker {
continue
}
Expand Down Expand Up @@ -235,19 +240,21 @@ func (e *EventsTestSuite) TestEventSaveColumns() {
_, err := event.Save(e.ctx, topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
assert.Nil(e.T(), err)

column, isOk := models.GetMemoryDB(e.ctx).TableData["foo"].ReadOnlyInMemoryCols().GetColumn("randomcol")
td := models.GetMemoryDB(e.ctx).GetOrCreateTableData("foo")

column, isOk := td.ReadOnlyInMemoryCols().GetColumn("randomcol")
assert.True(e.T(), isOk)
assert.Equal(e.T(), typing.String, column.KindDetails)

column, isOk = models.GetMemoryDB(e.ctx).TableData["foo"].ReadOnlyInMemoryCols().GetColumn("anothercol")
column, isOk = td.ReadOnlyInMemoryCols().GetColumn("anothercol")
assert.True(e.T(), isOk)
assert.Equal(e.T(), typing.Float, column.KindDetails)

column, isOk = models.GetMemoryDB(e.ctx).TableData["foo"].ReadOnlyInMemoryCols().GetColumn("created_at_date_string")
column, isOk = td.ReadOnlyInMemoryCols().GetColumn("created_at_date_string")
assert.True(e.T(), isOk)
assert.Equal(e.T(), ext.DateKindType, column.KindDetails.ExtendedTimeDetails.Type)

column, isOk = models.GetMemoryDB(e.ctx).TableData["foo"].ReadOnlyInMemoryCols().GetColumn(constants.DeleteColumnMarker)
column, isOk = td.ReadOnlyInMemoryCols().GetColumn(constants.DeleteColumnMarker)
assert.True(e.T(), isOk)
assert.Equal(e.T(), typing.Boolean, column.KindDetails)
}
76 changes: 0 additions & 76 deletions models/flush/flush.go

This file was deleted.

54 changes: 49 additions & 5 deletions models/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,36 @@ import (

const dbKey = "__db"

type DatabaseData struct {
TableData map[string]*optimization.TableData
// TableData wraps *optimization.TableData, pairing the underlying table data with
// its own mutex. The embedded pointer methods on *optimization.TableData are NOT
// lock-aware: callers must acquire this mutex themselves. This is deliberate,
// because different operations require different locking patterns.
type TableData struct {
*optimization.TableData
sync.Mutex
}

// Wipe drops the underlying table data by nilling out the embedded pointer, leaving
// the wrapper (and its mutex) registered so the table slot can be reused. After Wipe,
// Empty reports true. Callers are expected to hold t's lock (see the type comment).
func (t *TableData) Wipe() {
t.TableData = nil
}

// Empty reports whether this wrapper currently holds no table data,
// i.e. it was never populated via SetTableData, or it has been wiped.
func (t *TableData) Empty() bool {
return t.TableData == nil
}

// SetTableData installs td as the underlying table data for this wrapper.
// Callers are expected to hold t's lock (see the TableData type comment).
func (t *TableData) SetTableData(td *optimization.TableData) {
	t.TableData = td
}

// DatabaseData is the in-memory store of per-table data, keyed by table name.
// The embedded RWMutex guards the tableData map itself; each *TableData value
// carries its own mutex for operations on that single table's contents.
type DatabaseData struct {
tableData map[string]*TableData
sync.RWMutex
}

func LoadMemoryDB(ctx context.Context) context.Context {
tableData := make(map[string]*TableData)
return context.WithValue(ctx, dbKey, &DatabaseData{
TableData: map[string]*optimization.TableData{},
tableData: tableData,
})
}

Expand All @@ -35,7 +57,29 @@ func GetMemoryDB(ctx context.Context) *DatabaseData {
return db
}

// GetOrCreateTableData returns the TableData wrapper for tableName, creating and
// registering an empty wrapper on first use. Safe for concurrent callers: the map
// access is serialized by d's lock. The returned wrapper starts Empty; callers
// populate it via SetTableData under the wrapper's own lock.
func (d *DatabaseData) GetOrCreateTableData(tableName string) *TableData {
	d.Lock()
	defer d.Unlock()

	table, exists := d.tableData[tableName]
	if !exists {
		// The zero value of TableData (nil inner pointer, unlocked mutex) is ready to use;
		// no explicit Mutex initialization is needed.
		table = &TableData{}
		d.tableData[tableName] = table
	}

	return table
}

// ClearTableConfig wipes the stored table data for tableName. The wrapper (and its
// mutex) stays registered in the map, so in-flight holders of the *TableData pointer
// remain valid. No-op if the table was never registered.
//
// NOTE(review): Wipe touches the inner pointer without taking the table's own lock;
// callers appear to be expected to lock the table first — confirm at call sites.
func (d *DatabaseData) ClearTableConfig(tableName string) {
	d.Lock()
	defer d.Unlock()

	// Guard the lookup: calling Wipe on the nil *TableData returned for a missing
	// key would panic with a nil-pointer dereference.
	if td, exists := d.tableData[tableName]; exists {
		td.Wipe()
	}
}

// TableData returns the internal map of table name -> *TableData.
// NOTE(review): this hands back the live map without holding d's lock; callers
// iterating it concurrently with GetOrCreateTableData presumably take d.RLock
// themselves — confirm against call sites.
func (d *DatabaseData) TableData() map[string]*TableData {
return d.tableData
}
Loading

0 comments on commit 193d3bd

Please sign in to comment.