Skip to content

Commit

Permalink
bulker: batches: operate only with columns provided in current batch …
Browse files Browse the repository at this point in the history
…events

bulker: clickhouse: ignore cluster parameter for clickhouse.cloud hosts
  • Loading branch information
absorbb committed Nov 19, 2024
1 parent 90a4cd3 commit 483cf06
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 17 deletions.
18 changes: 13 additions & 5 deletions bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ type AbstractSQLStream struct {

unmappedDataColumn string

// initial columns count in the destination table
initialColumnsCount int
// columns added to the destination table during processing.
addedColumns int

startTime time.Time
}

Expand Down Expand Up @@ -168,7 +173,7 @@ func (ps *AbstractSQLStream) init(ctx context.Context) error {
// object values that can't be cast will be added to the '_unmapped_data' column of JSON type as a JSON object
// returns true if new column was added to the currentTable as a result of this function call
func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable, desiredTable *Table, values types.Object) bool {
columnsAdded := false
columnsAdded := 0
current := currentTable.Columns
unmappedObj := map[string]any{}
for el := desiredTable.Columns.Front(); el != nil; el = el.Next() {
Expand All @@ -182,7 +187,7 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
if !ok {
existingCol, ok = current.Get(name)
if !ok {
if ps.schemaFreeze || current.Len() >= ps.maxColumnsCount {
if ps.schemaFreeze || ps.initialColumnsCount+ps.addedColumns+columnsAdded >= ps.maxColumnsCount {
// when schemaFreeze=true all new column values go to _unmapped_data
v, ok := values.Get(name)
if ok {
Expand All @@ -196,7 +201,7 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
newCol.New = true
}
current.Set(name, newCol)
columnsAdded = true
columnsAdded++
continue
}
}
Expand Down Expand Up @@ -255,15 +260,18 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
if len(unmappedObj) > 0 {
jsonSQLType, _ := ps.sqlAdapter.GetSQLType(types.JSON)
added := current.SetIfAbsent(ps.unmappedDataColumn, types.SQLColumn{DataType: types.JSON, Type: jsonSQLType})
columnsAdded = columnsAdded || added
if added {
columnsAdded++
}
if ps.sqlAdapter.StringifyObjects() {
b, _ := jsoniter.Marshal(unmappedObj)
values.Set(ps.unmappedDataColumn, string(b))
} else {
values.Set(ps.unmappedDataColumn, unmappedObj)
}
}
return columnsAdded
ps.addedColumns += columnsAdded
return columnsAdded > 0
}

func (ps *AbstractSQLStream) updateRepresentationTable(table *Table) {
Expand Down
15 changes: 11 additions & 4 deletions bulkerlib/implementations/sql/autocommit_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s
return
}
table, processedObject, err := ps.preprocess(object)
if ps.schemaFromOptions != nil {
//just to convert values to schema data types
ps.adjustTableColumnTypes(table, nil, ps.schemaFromOptions, object)
}
if err != nil {
return
}
Expand All @@ -59,6 +55,13 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s
return
}
if existingTable.Exists() {
if ps.initialColumnsCount == 0 {
ps.initialColumnsCount = existingTable.ColumnsCount()
}
if ps.schemaFromOptions != nil {
//just to convert values to schema data types
ps.adjustTableColumnTypes(table, existingTable, ps.schemaFromOptions, object)
}
currentTable := existingTable.CloneIfNeeded()
currentTable.PKFields = table.PKFields
columnsAdded := ps.adjustTableColumnTypes(currentTable, existingTable, table, processedObject)
Expand Down Expand Up @@ -94,6 +97,10 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s
ps.updateRepresentationTable(currentTable)
err = ps.sqlAdapter.Insert(ctx, currentTable, ps.merge, processedObject)
} else {
if ps.schemaFromOptions != nil {
//just to convert values to schema data types
ps.adjustTableColumnTypes(table, nil, ps.schemaFromOptions, object)
}
existingTable, err = ps.sqlAdapter.TableHelper().EnsureTableWithCaching(ctx, ps.sqlAdapter, ps.id, table)
if err != nil {
err = errorj.Decorate(err, "failed to ensure table")
Expand Down
7 changes: 7 additions & 0 deletions bulkerlib/implementations/sql/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const (
)

var (
clickHouseCloudHost = regexp.MustCompile(`^.+\.clickhouse\.cloud(?::\d+$|$)`)

clickhouseTypes = map[types.DataType][]string{
types.STRING: {"String", "%String%", "%CHAR%", "%TEXT%", "%BLOB%", "%Enum%", "%UUID%"},
types.INT64: {"Int64", "Int", "LowCardinality(Int"},
Expand Down Expand Up @@ -176,6 +178,11 @@ func NewClickHouse(bulkerConfig bulkerlib.Config) (bulkerlib.Bulker, error) {
if config.Protocol == ClickHouseProtocolHTTP || config.Protocol == ClickHouseProtocolHTTPS {
httpMode = true
}
chCloud := clickHouseCloudHost.MatchString(config.Hosts[0])
if chCloud {
// ClickHouse Cloud doesn't need the cluster parameter
config.Cluster = ""
}
if config.Parameters == nil {
config.Parameters = map[string]string{}
}
Expand Down
9 changes: 5 additions & 4 deletions bulkerlib/implementations/sql/replacepartition_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream
}
ps.partitionId = partitionId
ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.namespace, ps.tableName)
ps.initialColumnsCount = ps.existingTable.ColumnsCount()
ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable.Clone(), tableForObject.WithoutColumns())
ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
tmpTable := tableForObject.WithoutColumns()
ps.adjustTableColumnTypes(tmpTable, ps.existingTable, tableForObject, object)
if ps.schemaFromOptions != nil {
ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
ps.adjustTableColumnTypes(tmpTable, ps.existingTable, ps.schemaFromOptions, object)
}
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(ps.tableName, 47), time.Now().Format("060102150405"))
return &Table{
Namespace: p.TmpNamespace(ps.namespace),
Name: tmpTableName,
Columns: dstTable.Columns,
Columns: tmpTable.Columns,
Temporary: true,
TimestampColumn: tableForObject.TimestampColumn,
}
Expand Down
24 changes: 24 additions & 0 deletions bulkerlib/implementations/sql/schema_freeze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ func TestSchemaFreeze(t *testing.T) {
configIds: utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
streamOptions: []bulker.StreamOption{WithSchemaFreeze()},
},
{
name: "added_columns_partial",
tableName: "schema_freeze_test",
modes: []bulker.BulkMode{bulker.Batch, bulker.Stream},
leaveResultingTable: true,
dataFile: "test_data/columns_added_partial2.ndjson",
expectedTable: ExpectedTable{
Columns: justColumns("_timestamp", "id", "name", "column1", "column2", "column3", "_unmapped_data"),
},
expectedRows: []map[string]any{
{"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "column2": nil, "column3": nil, "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "column2": "data", "column3": nil, "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "column2": nil, "column3": nil, "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "column2": "data", "column3": "data", "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 7, "name": "test", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": "{\"column4\": \"data\"}"},
{"_timestamp": constantTime, "id": 8, "name": "test2", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": "{\"column5\": \"data\"}"},
{"_timestamp": constantTime, "id": 9, "name": "test9", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": "{\"column6\": \"data\"}"},
{"_timestamp": constantTime, "id": 10, "name": "test10", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": "{\"column7\": \"data\"}"},
},
configIds: utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
streamOptions: []bulker.StreamOption{WithSchemaFreeze()},
},
{
name: "dummy_test_table_cleanup",
tableName: "schema_freeze_test",
Expand Down
3 changes: 3 additions & 0 deletions bulkerlib/implementations/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,5 +243,8 @@ func (t *Table) ToSimpleMap() *types2.OrderedMap[string, any] {
}

// ColumnsCount reports how many columns the table currently has.
// A nil receiver is treated as a non-existent (empty) table and yields 0,
// so callers may invoke it without checking the pointer first.
func (t *Table) ColumnsCount() int {
	if t != nil {
		return t.Columns.Len()
	}
	return 0
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"_timestamp": "2022-08-18T14:17:22.375Z", "id": 9, "name": "test9","column6": "data"}
{"_timestamp": "2022-08-18T14:17:22.375Z", "id": 10, "name": "test10", "column7": "data"}
9 changes: 5 additions & 4 deletions bulkerlib/implementations/sql/transactional_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt
return nil, err
}
ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.namespace, ps.tableName)
ps.initialColumnsCount = ps.existingTable.ColumnsCount()
ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable.Clone(), tableForObject.WithoutColumns())
ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
tmpTable := tableForObject.WithoutColumns()
ps.adjustTableColumnTypes(tmpTable, ps.existingTable, tableForObject, object)
if ps.schemaFromOptions != nil {
ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
ps.adjustTableColumnTypes(tmpTable, ps.existingTable, ps.schemaFromOptions, object)
}
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(ps.tableName, 47), time.Now().Format("060102150405"))
return &Table{
Namespace: p.TmpNamespace(ps.namespace),
Name: tmpTableName,
Columns: dstTable.Columns,
Columns: tmpTable.Columns,
Temporary: true,
TimestampColumn: tableForObject.TimestampColumn,
}
Expand Down

0 comments on commit 483cf06

Please sign in to comment.