diff --git a/README.md b/README.md
index 896669539..ea1c4f8d9 100644
--- a/README.md
+++ b/README.md
@@ -2,12 +2,12 @@
Artie Transfer
⚡️ Blazing fast data replication between OLTP and OLAP databases ⚡️
-
-
+
+
- Learn more »
+ Learn more »
@@ -51,7 +51,7 @@ To run Artie Transfer's stack locally, please refer to the [examples folder](htt
## Getting started
-[Getting started guide](https://docs.artie.so/running-transfer/overview)
+[Getting started guide](https://docs.artie.com/running-transfer/overview)
## What is currently supported?
Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Currently Transfer supports:
@@ -60,14 +60,14 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur
- Kafka (default)
- Google Pub/Sub
-- [Destinations](https://docs.artie.so/real-time-destinations/overview):
+- [Destinations](https://docs.artie.com/real-time-destinations/overview):
- Snowflake
- BigQuery
- Redshift
- Microsoft SQL Server
- S3
-- [Sources](https://docs.artie.so/real-time-sources/overview):
+- [Sources](https://docs.artie.com/real-time-sources/overview):
- MongoDB
- DocumentDB
- PostgreSQL
@@ -77,15 +77,15 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur
_If the database you are using is not on the list, feel free to file for a [feature request](https://github.com/artie-labs/transfer/issues/new)._
## Configuration File
-* [Artie Transfer configuration file guide](https://docs.artie.so/running-transfer/options)
-* [Examples of configuration files](https://docs.artie.so/running-transfer/examples)
+* [Artie Transfer configuration file guide](https://docs.artie.com/running-transfer/options)
+* [Examples of configuration files](https://docs.artie.com/running-transfer/examples)
## Telemetry
-[Artie Transfer's telemetry guide](https://docs.artie.so/telemetry/overview)
+[Artie Transfer's telemetry guide](https://docs.artie.com/telemetry/overview)
## Tests
-Transfer is written in Go and uses [counterfeiter](https://github.com/maxbrunsfeld/counterfeiter) to mock.
+Transfer is written in Go and uses [counterfeiter](https://github.com/maxbrunsfeld/counterfeiter) to mock.
To run the tests, run the following commands:
```sh
diff --git a/clients/shared/merge.go b/clients/shared/merge.go
index 754c3f140..7b3f06927 100644
--- a/clients/shared/merge.go
+++ b/clients/shared/merge.go
@@ -93,7 +93,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
for attempts := 0; attempts < backfillMaxRetries; attempts++ {
backfillErr = BackfillColumn(cfg, dwh, col, tableID)
if backfillErr == nil {
- tableConfig.Columns().UpsertColumn(col.RawName(), columns.UpsertColumnArg{
+ tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{
Backfilled: ptr.ToBool(true),
})
break
@@ -110,7 +110,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
}
if backfillErr != nil {
- return fmt.Errorf("failed to backfill col: %s, default value: %v, err: %w", col.RawName(), col.RawDefaultValue(), backfillErr)
+ return fmt.Errorf("failed to backfill col: %s, default value: %v, err: %w", col.Name(), col.RawDefaultValue(), backfillErr)
}
}
@@ -123,7 +123,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
TableID: tableID,
SubQuery: subQuery,
IdempotentKey: tableData.TopicConfig().IdempotentKey,
- PrimaryKeys: tableData.PrimaryKeys(dwh.Dialect()),
+ PrimaryKeys: tableData.PrimaryKeys(),
Columns: tableData.ReadOnlyInMemoryCols(),
SoftDelete: tableData.TopicConfig().SoftDelete,
DestKind: dwh.Label(),
diff --git a/clients/shared/utils.go b/clients/shared/utils.go
index 664fcf7ff..45e971ccf 100644
--- a/clients/shared/utils.go
+++ b/clients/shared/utils.go
@@ -30,7 +30,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col
return fmt.Errorf("failed to escape default value: %w", err)
}
- escapedCol := column.Name(dwh.Dialect())
+ escapedCol := dwh.Dialect().QuoteIdentifier(column.Name())
// TODO: This is added because `default` is not technically a column that requires escaping, but it is required when it's in the where clause.
// Once we escape everything by default, we can remove this patch of code.
@@ -45,7 +45,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col
tableID.FullyQualifiedName(), escapedCol, defaultVal, additionalEscapedCol,
)
slog.Info("Backfilling column",
- slog.String("colName", column.RawName()),
+ slog.String("colName", column.Name()),
slog.String("query", query),
slog.String("table", tableID.FullyQualifiedName()),
)
diff --git a/clients/snowflake/ddl_test.go b/clients/snowflake/ddl_test.go
index c2e8a3e77..61c607576 100644
--- a/clients/snowflake/ddl_test.go
+++ b/clients/snowflake/ddl_test.go
@@ -41,7 +41,7 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {
nameCol := columns.NewColumn("name", typing.String)
tc := s.stageStore.configMap.TableConfig(tableID)
- val := tc.ShouldDeleteColumn(nameCol.RawName(), time.Now().Add(-1*6*time.Hour), true)
+ val := tc.ShouldDeleteColumn(nameCol.Name(), time.Now().Add(-1*6*time.Hour), true)
assert.False(s.T(), val, "should not try to delete this column")
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(tableID).ReadOnlyColumnsToDelete()), 1)
@@ -68,23 +68,23 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {
nameCol := columns.NewColumn("name", typing.String)
// Let's try to delete name.
- allowed := s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(),
+ allowed := s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(),
time.Now().Add(-1*(6*time.Hour)), true)
assert.Equal(s.T(), allowed, false, "should not be allowed to delete")
// Process tried to delete, but it's lagged.
- allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(),
+ allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(),
time.Now().Add(-1*(6*time.Hour)), true)
assert.Equal(s.T(), allowed, false, "should not be allowed to delete")
// Process now caught up, and is asking if we can delete, should still be no.
- allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(), time.Now(), true)
+ allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(), time.Now(), true)
assert.Equal(s.T(), allowed, false, "should not be allowed to delete still")
// Process is finally ahead, has permission to delete now.
- allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(),
+ allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(),
time.Now().Add(2*constants.DeletionConfidencePadding), true)
assert.Equal(s.T(), allowed, true, "should now be allowed to delete")
diff --git a/clients/snowflake/snowflake.go b/clients/snowflake/snowflake.go
index 7a2b4f903..155251fa9 100644
--- a/clients/snowflake/snowflake.go
+++ b/clients/snowflake/snowflake.go
@@ -128,10 +128,7 @@ func (s *Store) reestablishConnection() error {
}
func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
- var primaryKeysEscaped []string
- for _, pk := range primaryKeys {
- primaryKeysEscaped = append(primaryKeysEscaped, s.Dialect().QuoteIdentifier(pk))
- }
+ primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, s.Dialect())
orderColsToIterate := primaryKeysEscaped
if topicConfig.IncludeArtieUpdatedAt {
diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go
index 2528dfae4..bac39ef83 100644
--- a/clients/snowflake/staging.go
+++ b/clients/snowflake/staging.go
@@ -12,6 +12,7 @@ import (
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
+ "github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/values"
@@ -83,7 +84,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
// COPY the CSV file (in Snowflake) into a table
copyCommand := fmt.Sprintf("COPY INTO %s (%s) FROM (SELECT %s FROM @%s)",
tempTableID.FullyQualifiedName(),
- strings.Join(tableData.ReadOnlyInMemoryCols().GetEscapedColumnsToUpdate(s.Dialect()), ","),
+ strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","),
escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableID, "%"))
if additionalSettings.AdditionalCopyClause != "" {
diff --git a/examples/mongodb/README.md b/examples/mongodb/README.md
index ec7c92e57..cd3730feb 100644
--- a/examples/mongodb/README.md
+++ b/examples/mongodb/README.md
@@ -30,8 +30,8 @@ docker-compose -f docker-compose.yaml exec mongodb bash -c '/usr/local/bin/init-
# Now, if you want to connect to the Mongo shell and insert more data, go right ahead
docker-compose -f docker-compose.yaml exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory'
db.customers.insert([
- { _id : NumberLong("1020"), first_name : 'Robin',
- last_name : 'Tang', email : 'robin@artie.so', unique_id : UUID(),
+ { _id : NumberLong("1020"), first_name : 'Robin',
+ last_name : 'Tang', email : 'robin@example.com', unique_id : UUID(),
test_bool_false: false, test_bool_true: true, new_id: ObjectId(),
test_decimal: NumberDecimal("13.37"), test_int: NumberInt("1337"),
test_decimal_2: 13.37, test_list: [1, 2, 3, 4, "hello"], test_null: null, test_ts: Timestamp(42, 1), test_nested_object: {a: { b: { c: "hello"}}}}
diff --git a/examples/pubsub_postgres/README.md b/examples/pubsub_postgres/README.md
index 8ae4e77b4..bd56163c0 100644
--- a/examples/pubsub_postgres/README.md
+++ b/examples/pubsub_postgres/README.md
@@ -1,5 +1,5 @@
# Postgres Example
-This example requires additional configuration on the Pub/Sub side.
+This example requires additional configuration on the Pub/Sub side.
-Please see https://docs.artie.so/tutorials/setting-up-pub-sub for further details.
+Please see https://docs.artie.com/tutorials/setting-up-pub-sub for further details.
diff --git a/lib/cdc/mongo/debezium_test.go b/lib/cdc/mongo/debezium_test.go
index d01e21726..eba0dc9a2 100644
--- a/lib/cdc/mongo/debezium_test.go
+++ b/lib/cdc/mongo/debezium_test.go
@@ -142,7 +142,7 @@ func (p *MongoTestSuite) TestMongoDBEventCustomer() {
"schema": {},
"payload": {
"before": null,
- "after": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"robin@artie.so\", \"nested\": {\"object\": \"foo\"}}",
+ "after": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"robin@example.com\", \"nested\": {\"object\": \"foo\"}}",
"patch": null,
"filter": null,
"updateDescription": null,
@@ -176,7 +176,7 @@ func (p *MongoTestSuite) TestMongoDBEventCustomer() {
assert.Equal(p.T(), evtData["_id"], 1003)
assert.Equal(p.T(), evtData["first_name"], "Robin")
assert.Equal(p.T(), evtData["last_name"], "Tang")
- assert.Equal(p.T(), evtData["email"], "robin@artie.so")
+ assert.Equal(p.T(), evtData["email"], "robin@example.com")
evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
assert.NoError(p.T(), err)
diff --git a/lib/cdc/mysql/debezium_test.go b/lib/cdc/mysql/debezium_test.go
index 783ab83c8..49f6fb760 100644
--- a/lib/cdc/mysql/debezium_test.go
+++ b/lib/cdc/mysql/debezium_test.go
@@ -353,7 +353,7 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() {
col, isOk := cols.GetColumn("abcdef")
assert.True(m.T(), isOk)
- assert.Equal(m.T(), "abcdef", col.RawName())
+ assert.Equal(m.T(), "abcdef", col.Name())
for key := range evtData {
if strings.Contains(key, constants.ArtiePrefix) {
continue
@@ -361,6 +361,6 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() {
col, isOk = cols.GetColumn(strings.ToLower(key))
assert.Equal(m.T(), true, isOk, key)
- assert.Equal(m.T(), typing.Invalid, col.KindDetails, fmt.Sprintf("colName: %v, evtData key: %v", col.RawName(), key))
+ assert.Equal(m.T(), typing.Invalid, col.KindDetails, fmt.Sprintf("colName: %v, evtData key: %v", col.Name(), key))
}
}
diff --git a/lib/cdc/util/relational_data_test.go b/lib/cdc/util/relational_data_test.go
index beeb05186..f85253e13 100644
--- a/lib/cdc/util/relational_data_test.go
+++ b/lib/cdc/util/relational_data_test.go
@@ -3,7 +3,7 @@ package util
const (
MySQLDelete = `{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":null,"source":{"version":"2.0.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1711381272000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":569,"row":0,"thread":11,"query":null},"op":"d","ts_ms":1711381272702,"transaction":null}}`
MySQLUpdate = `{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"after":{"id":1003,"first_name":"Dusty","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.0.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1711381320000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":912,"row":0,"thread":11,"query":null},"op":"u","ts_ms":1711381320962,"transaction":null}}`
- MySQLInsert = `{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"The Dust","last_name":"Tang","email":"dusty@artie.so"},"source":{"version":"2.0.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1711381357000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1276,"row":0,"thread":11,"query":null},"op":"c","ts_ms":1711381357622,"transaction":null}}`
+ MySQLInsert = `{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"The Dust","last_name":"Tang","email":"dusty@example.com"},"source":{"version":"2.0.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1711381357000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1276,"row":0,"thread":11,"query":null},"op":"c","ts_ms":1711381357622,"transaction":null}}`
PostgresDelete = `{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":null,"source":{"version":"2.5.0.Final","connector":"postgresql","name":"dbserver1","ts_ms":1711381709158,"snapshot":"false","db":"postgres","sequence":"[null,\"36450928\"]","schema":"inventory","table":"customers","txId":792,"lsn":36450928,"xmin":null},"op":"d","ts_ms":1711381709586,"transaction":null}}`
PostgresUpdate = `{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"boolean","optional":true,"field":"boolean_test"},{"type":"boolean","optional":true,"field":"bool_test"},{"type":"boolean","optional":true,"field":"bit_test"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"scale"},{"type":"bytes","optional":false,"field":"value"}],"optional":true,"name":"io.debezium.data.VariableScaleDecimal","version":1,"doc":"Variable scaled decimal","field":"numeric_test"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"5"},"field":"numeric_5"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"5"},"field":"numeric_5_2"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"6","connect.decimal.precision":"5"},"field":"numeric_5_6"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"5"},"field":"numeric_5_0"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"39"},"field":"numeric_39_0"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"39"},"field":"numeric_39_2"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"6","connect.decimal.precision":"39"},"field":"numeric_39_6"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"boolean","optional":true,"field":"boolean_test"},{"type":"boolean","optional":true,"field":"bool_test"},{"type":"boolean","optional":true,"field":"bit_test"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"scale"},{"type":"bytes","optional":false,"field":"value"}],"optional":true,"name":"io.debezium.data.VariableScaleDecimal","version":1,"doc":"Variable scaled decimal","field":"numeric_test"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"5"},"field":"numeric_5"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"5"},"field":"numeric_5_2"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"6","connect.decimal.precision":"5"},"field":"numeric_5_6"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"5"},"field":"numeric_5_0"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"0","connect.decimal.precision":"39"},"field":"numeric_39_0"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"39"},"field":"numeric_39_2"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"6","connect.decimal.precision":"39"},"field":"numeric_39_6"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com","boolean_test":true,"bool_test":false,"bit_test":false,"numeric_test":null,"numeric_5":null,"numeric_5_2":null,"numeric_5_6":null,"numeric_5_0":null,"numeric_39_0":null,"numeric_39_2":null,"numeric_39_6":null},"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com","boolean_test":true,"bool_test":false,"bit_test":false,"numeric_test":{"scale":3,"value":"B1vNFQ=="},"numeric_5":"BNI=","numeric_5_2":"AOHJ","numeric_5_6":"W6A=","numeric_5_0":"BQ==","numeric_39_0":"LA//uAAAAAAAAAAAAAAAAA==","numeric_39_2":"LBAD0S5LtA8eEEfNAAAAFg==","numeric_39_6":"HOB1x8wbGatWdikB4kA="},"source":{"version":"2.5.0.Final","connector":"postgresql","name":"dbserver1","ts_ms":1711381838401,"snapshot":"false","db":"postgres","sequence":"[\"37133376\",\"37158360\"]","schema":"inventory","table":"customers","txId":806,"lsn":37158360,"xmin":null},"op":"u","ts_ms":1711381838845,"transaction":null}}`
diff --git a/lib/cdc/util/relational_event_test.go b/lib/cdc/util/relational_event_test.go
index e5a421ed7..72d903864 100644
--- a/lib/cdc/util/relational_event_test.go
+++ b/lib/cdc/util/relational_event_test.go
@@ -74,8 +74,8 @@ func TestSource_GetOptionalSchema(t *testing.T) {
for _, _col := range cols.GetColumns() {
// All the other columns do not have a default value.
- if _col.RawName() != "boolean_column" {
- assert.Nil(t, _col.RawDefaultValue(), _col.RawName())
+ if _col.Name() != "boolean_column" {
+ assert.Nil(t, _col.RawDefaultValue(), _col.Name())
}
}
}
diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go
index 72c193811..675dc5941 100644
--- a/lib/destination/ddl/ddl.go
+++ b/lib/destination/ddl/ddl.go
@@ -91,7 +91,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error {
}
if a.ColumnOp == constants.Delete {
- if !a.Tc.ShouldDeleteColumn(col.RawName(), a.CdcTime, a.ContainOtherOperations) {
+ if !a.Tc.ShouldDeleteColumn(col.Name(), a.CdcTime, a.ContainOtherOperations) {
continue
}
}
@@ -99,7 +99,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error {
mutateCol = append(mutateCol, col)
switch a.ColumnOp {
case constants.Add:
- colName := col.Name(a.Dwh.Dialect())
+ colName := a.Dwh.Dialect().QuoteIdentifier(col.Name())
if col.PrimaryKey() && a.Mode != config.History {
// Don't create a PK for history mode because it's append-only, so the primary key should not be enforced.
@@ -108,7 +108,7 @@ func (a AlterTableArgs) AlterTable(cols ...columns.Column) error {
colSQLParts = append(colSQLParts, fmt.Sprintf(`%s %s`, colName, typing.KindToDWHType(col.KindDetails, a.Dwh.Label(), col.PrimaryKey())))
case constants.Delete:
- colSQLParts = append(colSQLParts, col.Name(a.Dwh.Dialect()))
+ colSQLParts = append(colSQLParts, a.Dwh.Dialect().QuoteIdentifier(col.Name()))
}
}
diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go
index e756d0a1f..babea1977 100644
--- a/lib/destination/ddl/ddl_bq_test.go
+++ b/lib/destination/ddl/ddl_bq_test.go
@@ -86,7 +86,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
assert.NoError(d.T(), alterTableArgs.AlterTable(column))
query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx)
- assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s drop COLUMN %s", fqName, column.Name(d.bigQueryStore.Dialect())), query)
+ assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s drop COLUMN %s", fqName, d.bigQueryStore.Dialect().QuoteIdentifier(column.Name())), query)
callIdx += 1
}
@@ -143,7 +143,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {
assert.NoError(d.T(), alterTableArgs.AlterTable(col))
query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx)
- assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, col.Name(d.bigQueryStore.Dialect()),
+ assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(col.Name()),
typing.KindToDWHType(kind, d.bigQueryStore.Label(), false)), query)
callIdx += 1
}
@@ -152,10 +152,10 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {
assert.Equal(d.T(), newColsLen+existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns())
// Check by iterating over the columns
for _, column := range d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns().GetColumns() {
- existingCol, isOk := existingCols.GetColumn(column.RawName())
+ existingCol, isOk := existingCols.GetColumn(column.Name())
if !isOk {
// Check new cols?
- existingCol.KindDetails, isOk = newCols[column.RawName()]
+ existingCol.KindDetails, isOk = newCols[column.Name()]
}
assert.True(d.T(), isOk)
@@ -202,7 +202,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
assert.NoError(d.T(), alterTableArgs.AlterTable(column))
query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx)
- assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, column.Name(d.bigQueryStore.Dialect()),
+ assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(column.Name()),
typing.KindToDWHType(column.KindDetails, d.bigQueryStore.Label(), false)), query)
callIdx += 1
}
@@ -211,7 +211,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
assert.Equal(d.T(), existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns())
// Check by iterating over the columns
for _, column := range d.bigQueryStore.GetConfigMap().TableConfig(tableID).Columns().GetColumns() {
- existingCol, isOk := existingCols.GetColumn(column.RawName())
+ existingCol, isOk := existingCols.GetColumn(column.Name())
assert.True(d.T(), isOk)
assert.Equal(d.T(), column.KindDetails, existingCol.KindDetails)
}
diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go
index 80c2b5ded..deae9dfd8 100644
--- a/lib/destination/ddl/ddl_sflk_test.go
+++ b/lib/destination/ddl/ddl_sflk_test.go
@@ -42,7 +42,7 @@ func (d *DDLTestSuite) TestAlterComplexObjects() {
for i := 0; i < len(cols); i++ {
execQuery, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i)
assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s add COLUMN %s %s", `shop.public."COMPLEX_COLUMNS"`,
- cols[i].Name(d.snowflakeStagesStore.Dialect()),
+ d.snowflakeStagesStore.Dialect().QuoteIdentifier(cols[i].Name()),
typing.KindToDWHType(cols[i].KindDetails, d.snowflakeStagesStore.Label(), false)), execQuery)
}
@@ -108,15 +108,15 @@ func (d *DDLTestSuite) TestAlterTableAdd() {
for _, column := range tableConfig.Columns().GetColumns() {
var found bool
for _, expCol := range cols {
- if found = column.RawName() == expCol.RawName(); found {
- assert.Equal(d.T(), column.KindDetails, expCol.KindDetails, fmt.Sprintf("wrong col kind, col: %s", column.RawName()))
+ if found = column.Name() == expCol.Name(); found {
+ assert.Equal(d.T(), column.KindDetails, expCol.KindDetails, fmt.Sprintf("wrong col kind, col: %s", column.Name()))
break
}
}
assert.True(d.T(), found,
fmt.Sprintf("Col not found: %s, actual list: %v, expected list: %v",
- column.RawName(), tableConfig.Columns(), cols))
+ column.Name(), tableConfig.Columns(), cols))
}
}
@@ -150,7 +150,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() {
for col := range tableConfig.ReadOnlyColumnsToDelete() {
var found bool
for _, expCol := range cols {
- if found = col == expCol.RawName(); found {
+ if found = col == expCol.Name(); found {
break
}
}
@@ -161,7 +161,7 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() {
}
for i := 0; i < len(cols); i++ {
- colToActuallyDelete := cols[i].RawName()
+ colToActuallyDelete := cols[i].Name()
// Now let's check the timestamp
assert.True(d.T(), tableConfig.ReadOnlyColumnsToDelete()[colToActuallyDelete].After(time.Now()))
// Now let's actually try to dial the time back, and it should actually try to delete.
@@ -172,7 +172,8 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() {
execArg, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i)
assert.Equal(d.T(), execArg, fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", `shop.public."USERS"`, constants.Delete,
- cols[i].Name(d.snowflakeStagesStore.Dialect())))
+ d.snowflakeStagesStore.Dialect().QuoteIdentifier(cols[i].Name()),
+ ))
}
}
@@ -214,7 +215,7 @@ func (d *DDLTestSuite) TestAlterTableDelete() {
for col := range tableConfig.ReadOnlyColumnsToDelete() {
var found bool
for _, expCol := range cols {
- if found = col == expCol.RawName(); found {
+ if found = col == expCol.Name(); found {
break
}
}
diff --git a/lib/destination/dml/merge.go b/lib/destination/dml/merge.go
index 2ca9cb393..2c5b67eb2 100644
--- a/lib/destination/dml/merge.go
+++ b/lib/destination/dml/merge.go
@@ -3,6 +3,7 @@ package dml
import (
"errors"
"fmt"
+ "slices"
"strings"
"github.com/artie-labs/transfer/lib/array"
@@ -17,7 +18,7 @@ type MergeArgument struct {
TableID types.TableIdentifier
SubQuery string
IdempotentKey string
- PrimaryKeys []columns.Wrapper
+ PrimaryKeys []columns.Column
// AdditionalEqualityStrings is used for handling BigQuery partitioned table merges
AdditionalEqualityStrings []string
@@ -65,6 +66,12 @@ func (m *MergeArgument) Valid() error {
return nil
}
+func removeDeleteColumnMarker(columns []string) ([]string, bool) {
+ origLength := len(columns)
+ columns = slices.DeleteFunc(columns, func(col string) bool { return col == constants.DeleteColumnMarker })
+ return columns, len(columns) != origLength
+}
+
func (m *MergeArgument) GetParts() ([]string, error) {
if err := m.Valid(); err != nil {
return nil, err
@@ -94,28 +101,30 @@ func (m *MergeArgument) GetParts() ([]string, error) {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
- equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName())
+ quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name())
+ equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey)
equalitySQLParts = append(equalitySQLParts, equalitySQL)
}
- cols := m.Columns.GetEscapedColumnsToUpdate(m.Dialect)
+ columns := m.Columns.GetColumnsToUpdate()
if m.SoftDelete {
return []string{
// INSERT
fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`,
// insert into target (col1, col2, col3)
- m.TableID.FullyQualifiedName(), strings.Join(cols, ","),
+ m.TableID.FullyQualifiedName(), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
// SELECT cc.col1, cc.col2, ... FROM staging as CC
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
- Vals: cols,
+ Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Prefix: "cc.",
}), m.SubQuery,
// LEFT JOIN table on pk(s)
m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "),
// Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts)
- m.PrimaryKeys[0].EscapedName()),
+ m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].Name()),
+ ),
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s;`,
// UPDATE table set col1 = cc. col1
@@ -128,38 +137,32 @@ func (m *MergeArgument) GetParts() ([]string, error) {
// We also need to remove __artie flags since it does not exist in the destination table
var removed bool
- for idx, col := range cols {
- if col == m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker) {
- cols = append(cols[:idx], cols[idx+1:]...)
- removed = true
- break
- }
- }
-
+ columns, removed = removeDeleteColumnMarker(columns)
if !removed {
return nil, errors.New("artie delete flag doesn't exist")
}
var pks []string
for _, pk := range m.PrimaryKeys {
- pks = append(pks, pk.EscapedName())
+ pks = append(pks, m.Dialect.QuoteIdentifier(pk.Name()))
}
parts := []string{
// INSERT
fmt.Sprintf(`INSERT INTO %s (%s) SELECT %s FROM %s as cc LEFT JOIN %s as c on %s WHERE c.%s IS NULL;`,
// insert into target (col1, col2, col3)
- m.TableID.FullyQualifiedName(), strings.Join(cols, ","),
+ m.TableID.FullyQualifiedName(), strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
// SELECT cc.col1, cc.col2, ... FROM staging as CC
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
- Vals: cols,
+ Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Prefix: "cc.",
}), m.SubQuery,
// LEFT JOIN table on pk(s)
m.TableID.FullyQualifiedName(), strings.Join(equalitySQLParts, " and "),
// Where PK is NULL (we only need to specify one primary key since it's covered with equalitySQL parts)
- m.PrimaryKeys[0].EscapedName()),
+ m.Dialect.QuoteIdentifier(m.PrimaryKeys[0].Name()),
+ ),
// UPDATE
fmt.Sprintf(`UPDATE %s as c SET %s FROM %s as cc WHERE %s%s AND COALESCE(cc.%s, false) = false;`,
// UPDATE table set col1 = cc. col1
@@ -207,15 +210,17 @@ func (m *MergeArgument) GetStatement() (string, error) {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
- equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName())
- pkCol, isOk := m.Columns.GetColumn(primaryKey.RawName())
+ quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name())
+
+ equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey)
+ pkCol, isOk := m.Columns.GetColumn(primaryKey.Name())
if !isOk {
- return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.RawName(), m.Columns)
+ return "", fmt.Errorf("column: %s does not exist in columnToType: %v", primaryKey.Name(), m.Columns)
}
if m.DestKind == constants.BigQuery && pkCol.KindDetails.Kind == typing.Struct.Kind {
// BigQuery requires special casting to compare two JSON objects.
- equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey.EscapedName(), primaryKey.EscapedName())
+ equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", quotedPrimaryKey, quotedPrimaryKey)
}
equalitySQLParts = append(equalitySQLParts, equalitySQL)
@@ -230,7 +235,7 @@ func (m *MergeArgument) GetStatement() (string, error) {
equalitySQLParts = append(equalitySQLParts, m.AdditionalEqualityStrings...)
}
- cols := m.Columns.GetEscapedColumnsToUpdate(m.Dialect)
+ columns := m.Columns.GetColumnsToUpdate()
if m.SoftDelete {
return fmt.Sprintf(`
@@ -241,9 +246,9 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
// Update + Soft Deletion
idempotentClause, m.Columns.UpdateQuery(m.Dialect, false),
// Insert
- constants.DeleteColumnMarker, strings.Join(cols, ","),
+ constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
- Vals: cols,
+ Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Prefix: "cc.",
})), nil
@@ -251,14 +256,7 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
// We also need to remove __artie flags since it does not exist in the destination table
var removed bool
- for idx, col := range cols {
- if col == m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker) {
- cols = append(cols[:idx], cols[idx+1:]...)
- removed = true
- break
- }
- }
-
+ columns, removed = removeDeleteColumnMarker(columns)
if !removed {
return "", errors.New("artie delete flag doesn't exist")
}
@@ -274,9 +272,9 @@ WHEN NOT MATCHED AND IFNULL(cc.%s, false) = false THEN INSERT (%s) VALUES (%s);`
// Update
constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true),
// Insert
- constants.DeleteColumnMarker, strings.Join(cols, ","),
+ constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
- Vals: cols,
+ Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Prefix: "cc.",
})), nil
@@ -295,11 +293,12 @@ func (m *MergeArgument) GetMSSQLStatement() (string, error) {
var equalitySQLParts []string
for _, primaryKey := range m.PrimaryKeys {
// We'll need to escape the primary key as well.
- equalitySQL := fmt.Sprintf("c.%s = cc.%s", primaryKey.EscapedName(), primaryKey.EscapedName())
+ quotedPrimaryKey := m.Dialect.QuoteIdentifier(primaryKey.Name())
+ equalitySQL := fmt.Sprintf("c.%s = cc.%s", quotedPrimaryKey, quotedPrimaryKey)
equalitySQLParts = append(equalitySQLParts, equalitySQL)
}
- cols := m.Columns.GetEscapedColumnsToUpdate(m.Dialect)
+ columns := m.Columns.GetColumnsToUpdate()
if m.SoftDelete {
return fmt.Sprintf(`
@@ -311,9 +310,9 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
// Update + Soft Deletion
idempotentClause, m.Columns.UpdateQuery(m.Dialect, false),
// Insert
- constants.DeleteColumnMarker, strings.Join(cols, ","),
+ constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
- Vals: cols,
+ Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Prefix: "cc.",
})), nil
@@ -321,14 +320,7 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 0) = 0 THEN INSERT (%s) VALUES (%s);`,
// We also need to remove __artie flags since it does not exist in the destination table
var removed bool
- for idx, col := range cols {
- if col == m.Dialect.QuoteIdentifier(constants.DeleteColumnMarker) {
- cols = append(cols[:idx], cols[idx+1:]...)
- removed = true
- break
- }
- }
-
+ columns, removed = removeDeleteColumnMarker(columns)
if !removed {
return "", errors.New("artie delete flag doesn't exist")
}
@@ -345,9 +337,9 @@ WHEN NOT MATCHED AND COALESCE(cc.%s, 1) = 0 THEN INSERT (%s) VALUES (%s);`,
// Update
constants.DeleteColumnMarker, idempotentClause, m.Columns.UpdateQuery(m.Dialect, true),
// Insert
- constants.DeleteColumnMarker, strings.Join(cols, ","),
+ constants.DeleteColumnMarker, strings.Join(sql.QuoteIdentifiers(columns, m.Dialect), ","),
array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{
- Vals: cols,
+ Vals: sql.QuoteIdentifiers(columns, m.Dialect),
Separator: ",",
Prefix: "cc.",
})), nil
diff --git a/lib/destination/dml/merge_bigquery_test.go b/lib/destination/dml/merge_bigquery_test.go
index 726ffa439..355c8229e 100644
--- a/lib/destination/dml/merge_bigquery_test.go
+++ b/lib/destination/dml/merge_bigquery_test.go
@@ -19,7 +19,7 @@ func TestMergeStatement_TempTable(t *testing.T) {
mergeArg := &MergeArgument{
TableID: MockTableIdentifier{"customers.orders"},
SubQuery: "customers.orders_tmp",
- PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_id", typing.Invalid), sql.BigQueryDialect{})},
+ PrimaryKeys: []columns.Column{columns.NewColumn("order_id", typing.Invalid)},
Columns: &cols,
DestKind: constants.BigQuery,
Dialect: sql.BigQueryDialect{},
@@ -41,7 +41,7 @@ func TestMergeStatement_JSONKey(t *testing.T) {
mergeArg := &MergeArgument{
TableID: MockTableIdentifier{"customers.orders"},
SubQuery: "customers.orders_tmp",
- PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("order_oid", typing.Invalid), sql.BigQueryDialect{})},
+ PrimaryKeys: []columns.Column{columns.NewColumn("order_oid", typing.Invalid)},
Columns: &cols,
DestKind: constants.BigQuery,
Dialect: sql.BigQueryDialect{},
diff --git a/lib/destination/dml/merge_mssql_test.go b/lib/destination/dml/merge_mssql_test.go
index 505d333da..15613eb7f 100644
--- a/lib/destination/dml/merge_mssql_test.go
+++ b/lib/destination/dml/merge_mssql_test.go
@@ -44,7 +44,7 @@ func Test_GetMSSQLStatement(t *testing.T) {
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "",
- PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), sql.MSSQLDialect{})},
+ PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)},
Columns: &_cols,
DestKind: constants.MSSQL,
Dialect: sql.MSSQLDialect{},
diff --git a/lib/destination/dml/merge_parts_test.go b/lib/destination/dml/merge_parts_test.go
index 1b7038a70..7c7f3ce28 100644
--- a/lib/destination/dml/merge_parts_test.go
+++ b/lib/destination/dml/merge_parts_test.go
@@ -25,7 +25,7 @@ func TestMergeStatementPartsValidation(t *testing.T) {
}
type result struct {
- PrimaryKeys []columns.Wrapper
+ PrimaryKeys []columns.Column
ColumnsToTypes columns.Columns
}
@@ -47,11 +47,11 @@ func getBasicColumnsForTest(compositeKey bool) result {
cols.AddColumn(textToastCol)
cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))
- var pks []columns.Wrapper
- pks = append(pks, columns.NewWrapper(idCol, sql.MSSQLDialect{}))
+ var pks []columns.Column
+ pks = append(pks, idCol)
if compositeKey {
- pks = append(pks, columns.NewWrapper(emailCol, sql.MSSQLDialect{}))
+ pks = append(pks, emailCol)
}
return result{
diff --git a/lib/destination/dml/merge_test.go b/lib/destination/dml/merge_test.go
index 2cd93ae9e..0f0081bf3 100644
--- a/lib/destination/dml/merge_test.go
+++ b/lib/destination/dml/merge_test.go
@@ -32,6 +32,39 @@ func (m MockTableIdentifier) FullyQualifiedName() string {
return m.fqName
}
+func TestRemoveDeleteColumnMarker(t *testing.T) {
+ {
+ columns, removed := removeDeleteColumnMarker([]string{})
+ assert.Empty(t, columns)
+ assert.False(t, removed)
+ }
+ {
+ columns, removed := removeDeleteColumnMarker([]string{"a"})
+ assert.Equal(t, []string{"a"}, columns)
+ assert.False(t, removed)
+ }
+ {
+ columns, removed := removeDeleteColumnMarker([]string{"a", "b"})
+ assert.Equal(t, []string{"a", "b"}, columns)
+ assert.False(t, removed)
+ }
+ {
+ columns, removed := removeDeleteColumnMarker([]string{constants.DeleteColumnMarker})
+ assert.True(t, removed)
+ assert.Empty(t, columns)
+ }
+ {
+ columns, removed := removeDeleteColumnMarker([]string{"a", constants.DeleteColumnMarker, "b"})
+ assert.True(t, removed)
+ assert.Equal(t, []string{"a", "b"}, columns)
+ }
+ {
+ columns, removed := removeDeleteColumnMarker([]string{"a", constants.DeleteColumnMarker, "b", constants.DeleteColumnMarker, "c"})
+ assert.True(t, removed)
+ assert.Equal(t, []string{"a", "b", "c"}, columns)
+ }
+}
+
func TestMergeStatementSoftDelete(t *testing.T) {
// No idempotent key
fqTable := "database.schema.table"
@@ -56,16 +89,15 @@ func TestMergeStatementSoftDelete(t *testing.T) {
_cols.AddColumn(columns.NewColumn("id", typing.String))
_cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))
- dialect := sql.SnowflakeDialect{}
for _, idempotentKey := range []string{"", "updated_at"} {
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: idempotentKey,
- PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)},
+ PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)},
Columns: &_cols,
DestKind: constants.Snowflake,
- Dialect: dialect,
+ Dialect: sql.SnowflakeDialect{},
SoftDelete: true,
}
@@ -107,15 +139,14 @@ func TestMergeStatement(t *testing.T) {
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))
- dialect := sql.SnowflakeDialect{}
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "",
- PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)},
+ PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)},
Columns: &_cols,
DestKind: constants.Snowflake,
- Dialect: dialect,
+ Dialect: sql.SnowflakeDialect{},
SoftDelete: false,
}
@@ -156,15 +187,14 @@ func TestMergeStatementIdempotentKey(t *testing.T) {
_cols.AddColumn(columns.NewColumn("id", typing.String))
_cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))
- dialect := sql.SnowflakeDialect{}
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "updated_at",
- PrimaryKeys: []columns.Wrapper{columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect)},
+ PrimaryKeys: []columns.Column{columns.NewColumn("id", typing.Invalid)},
Columns: &_cols,
DestKind: constants.Snowflake,
- Dialect: dialect,
+ Dialect: sql.SnowflakeDialect{},
SoftDelete: false,
}
@@ -199,18 +229,17 @@ func TestMergeStatementCompositeKey(t *testing.T) {
_cols.AddColumn(columns.NewColumn("another_id", typing.String))
_cols.AddColumn(columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))
- dialect := sql.SnowflakeDialect{}
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "updated_at",
- PrimaryKeys: []columns.Wrapper{
- columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect),
- columns.NewWrapper(columns.NewColumn("another_id", typing.Invalid), dialect),
+ PrimaryKeys: []columns.Column{
+ columns.NewColumn("id", typing.Invalid),
+ columns.NewColumn("another_id", typing.Invalid),
},
Columns: &_cols,
DestKind: constants.Snowflake,
- Dialect: dialect,
+ Dialect: sql.SnowflakeDialect{},
SoftDelete: false,
}
@@ -249,18 +278,17 @@ func TestMergeStatementEscapePrimaryKeys(t *testing.T) {
subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)",
strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ","))
- dialect := sql.SnowflakeDialect{}
mergeArg := MergeArgument{
TableID: MockTableIdentifier{fqTable},
SubQuery: subQuery,
IdempotentKey: "",
- PrimaryKeys: []columns.Wrapper{
- columns.NewWrapper(columns.NewColumn("id", typing.Invalid), dialect),
- columns.NewWrapper(columns.NewColumn("group", typing.Invalid), dialect),
+ PrimaryKeys: []columns.Column{
+ columns.NewColumn("id", typing.Invalid),
+ columns.NewColumn("group", typing.Invalid),
},
Columns: &_cols,
DestKind: constants.Snowflake,
- Dialect: dialect,
+ Dialect: sql.SnowflakeDialect{},
SoftDelete: false,
}
diff --git a/lib/destination/dml/merge_valid_test.go b/lib/destination/dml/merge_valid_test.go
index 71101833b..fc7114762 100644
--- a/lib/destination/dml/merge_valid_test.go
+++ b/lib/destination/dml/merge_valid_test.go
@@ -13,8 +13,8 @@ import (
)
func TestMergeArgument_Valid(t *testing.T) {
- primaryKeys := []columns.Wrapper{
- columns.NewWrapper(columns.NewColumn("id", typing.Integer), sql.SnowflakeDialect{}),
+ primaryKeys := []columns.Column{
+ columns.NewColumn("id", typing.Integer),
}
var cols columns.Columns
diff --git a/lib/destination/types/table_config.go b/lib/destination/types/table_config.go
index 755f5b66c..79d6515a0 100644
--- a/lib/destination/types/table_config.go
+++ b/lib/destination/types/table_config.go
@@ -64,15 +64,15 @@ func (d *DwhTableConfig) MutateInMemoryColumns(createTable bool, columnOp consta
for _, col := range cols {
d.columns.AddColumn(col)
// Delete from the permissions table, if exists.
- delete(d.columnsToDelete, col.RawName())
+ delete(d.columnsToDelete, col.Name())
}
d.createTable = createTable
case constants.Delete:
for _, col := range cols {
// Delete from the permissions and in-memory table
- d.columns.DeleteColumn(col.RawName())
- delete(d.columnsToDelete, col.RawName())
+ d.columns.DeleteColumn(col.Name())
+ delete(d.columnsToDelete, col.Name())
}
}
}
@@ -91,7 +91,7 @@ func (d *DwhTableConfig) AuditColumnsToDelete(colsToDelete []columns.Column) {
for colName := range d.columnsToDelete {
var found bool
for _, col := range colsToDelete {
- if found = col.RawName() == colName; found {
+ if found = col.Name() == colName; found {
break
}
}
diff --git a/lib/optimization/event_bench_test.go b/lib/optimization/event_bench_test.go
index 20a6d45d2..fc7987918 100644
--- a/lib/optimization/event_bench_test.go
+++ b/lib/optimization/event_bench_test.go
@@ -31,7 +31,7 @@ func BenchmarkTableData_ApproxSize_WideTable(b *testing.B) {
"favorite_fruits": []string{"strawberry", "kiwi", "oranges"},
"random": false,
"team": []string{"charlie", "jacqueline"},
- "email": "robin@artie.so",
+ "email": "robin@example.com",
"favorite_languages": []string{"go", "sql"},
"favorite_databases": []string{"postgres", "bigtable"},
"created_at": time.Now(),
diff --git a/lib/optimization/event_update_test.go b/lib/optimization/event_update_test.go
index 2d5fa5e78..a61b04e2e 100644
--- a/lib/optimization/event_update_test.go
+++ b/lib/optimization/event_update_test.go
@@ -74,16 +74,16 @@ func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) {
// Testing backfill
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
- assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
+ assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.Name())
}
backfilledCol := columns.NewColumn("bool_backfill", typing.Boolean)
backfilledCol.SetBackfilled(true)
assert.NoError(t, tableData.MergeColumnsFromDestination(backfilledCol))
for _, inMemoryCol := range tableData.inMemoryColumns.GetColumns() {
- if inMemoryCol.RawName() == backfilledCol.RawName() {
- assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
+ if inMemoryCol.Name() == backfilledCol.Name() {
+ assert.True(t, inMemoryCol.Backfilled(), inMemoryCol.Name())
} else {
- assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.RawName())
+ assert.False(t, inMemoryCol.Backfilled(), inMemoryCol.Name())
}
}
diff --git a/lib/optimization/table_data.go b/lib/optimization/table_data.go
index 39402b00e..09017077e 100644
--- a/lib/optimization/table_data.go
+++ b/lib/optimization/table_data.go
@@ -10,7 +10,6 @@ import (
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/size"
- "github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
@@ -67,10 +66,10 @@ func (t *TableData) ContainOtherOperations() bool {
return t.containOtherOperations
}
-func (t *TableData) PrimaryKeys(dialect sql.Dialect) []columns.Wrapper {
- var pks []columns.Wrapper
+func (t *TableData) PrimaryKeys() []columns.Column {
+ var pks []columns.Column
for _, pk := range t.primaryKeys {
- pks = append(pks, columns.NewWrapper(columns.NewColumn(pk, typing.Invalid), dialect))
+ pks = append(pks, columns.NewColumn(pk, typing.Invalid))
}
return pks
@@ -258,9 +257,9 @@ func (t *TableData) MergeColumnsFromDestination(destCols ...columns.Column) erro
var foundColumn columns.Column
var found bool
for _, destCol := range destCols {
- if destCol.RawName() == strings.ToLower(inMemoryCol.RawName()) {
+ if destCol.Name() == strings.ToLower(inMemoryCol.Name()) {
if destCol.KindDetails.Kind == typing.Invalid.Kind {
- return fmt.Errorf("column %q is invalid", destCol.RawName())
+ return fmt.Errorf("column %q is invalid", destCol.Name())
}
foundColumn = destCol
diff --git a/lib/optimization/table_data_test.go b/lib/optimization/table_data_test.go
index 74cea7f52..5a4e8f449 100644
--- a/lib/optimization/table_data_test.go
+++ b/lib/optimization/table_data_test.go
@@ -145,7 +145,7 @@ func TestTableData_UpdateInMemoryColumns(t *testing.T) {
assert.True(t, isOk)
extCol.KindDetails.ExtendedTimeDetails.Format = time.RFC3339Nano
- tableData.inMemoryColumns.UpdateColumn(columns.NewColumn(extCol.RawName(), extCol.KindDetails))
+ tableData.inMemoryColumns.UpdateColumn(columns.NewColumn(extCol.Name(), extCol.KindDetails))
for name, colKindDetails := range map[string]typing.KindDetails{
"foo": typing.String,
diff --git a/lib/parquetutil/generate_schema.go b/lib/parquetutil/generate_schema.go
index dfbbf1490..8f02657d2 100644
--- a/lib/parquetutil/generate_schema.go
+++ b/lib/parquetutil/generate_schema.go
@@ -12,7 +12,7 @@ func GenerateJSONSchema(columns []columns.Column) (string, error) {
var fields []typing.Field
for _, column := range columns {
// We don't need to escape the column name here.
- field, err := column.KindDetails.ParquetAnnotation(column.RawName())
+ field, err := column.KindDetails.ParquetAnnotation(column.Name())
if err != nil {
return "", err
}
diff --git a/lib/size/size_bench_test.go b/lib/size/size_bench_test.go
index 423800c15..5031cd8f7 100644
--- a/lib/size/size_bench_test.go
+++ b/lib/size/size_bench_test.go
@@ -31,7 +31,7 @@ func BenchmarkGetApproxSize_WideTable(b *testing.B) {
"favorite_fruits": []string{"strawberry", "kiwi", "oranges"},
"random": false,
"team": []string{"charlie", "jacqueline"},
- "email": "robin@artie.so",
+ "email": "robin@example.com",
"favorite_languages": []string{"go", "sql"},
"favorite_databases": []string{"postgres", "bigtable"},
"created_at": time.Now(),
diff --git a/lib/sql/util.go b/lib/sql/util.go
index 9a8150f89..b7dab812e 100644
--- a/lib/sql/util.go
+++ b/lib/sql/util.go
@@ -13,3 +13,11 @@ import (
func QuoteLiteral(value string) string {
return fmt.Sprintf("'%s'", strings.ReplaceAll(stringutil.EscapeBackslashes(value), "'", `\'`))
}
+
+func QuoteIdentifiers(identifiers []string, dialect Dialect) []string {
+ result := make([]string, len(identifiers))
+ for i, identifier := range identifiers {
+ result[i] = dialect.QuoteIdentifier(identifier)
+ }
+ return result
+}
diff --git a/lib/sql/util_test.go b/lib/sql/util_test.go
index 89ea11320..1be71dfb9 100644
--- a/lib/sql/util_test.go
+++ b/lib/sql/util_test.go
@@ -38,3 +38,8 @@ func TestQuoteLiteral(t *testing.T) {
assert.Equal(t, testCase.expected, QuoteLiteral(testCase.colVal), testCase.name)
}
}
+
+func TestQuoteIdentifiers(t *testing.T) {
+ assert.Equal(t, []string{}, QuoteIdentifiers([]string{}, BigQueryDialect{}))
+ assert.Equal(t, []string{"`a`", "`b`", "`c`"}, QuoteIdentifiers([]string{"a", "b", "c"}, BigQueryDialect{}))
+}
diff --git a/lib/telemetry/README.md b/lib/telemetry/README.md
index e2a5de654..9fbb4541d 100644
--- a/lib/telemetry/README.md
+++ b/lib/telemetry/README.md
@@ -1,3 +1,3 @@
# Telemetry
-Artie Transfer's docs have moved! Please visit this link to see documentation regarding Transfer's telemetry package. https://docs.artie.so/telemetry/overview
+Artie Transfer's docs have moved! Please visit this link to see documentation regarding Transfer's telemetry package. https://docs.artie.com/telemetry/overview
diff --git a/lib/typing/columns/columns.go b/lib/typing/columns/columns.go
index a6b752a5c..19ffef0c3 100644
--- a/lib/typing/columns/columns.go
+++ b/lib/typing/columns/columns.go
@@ -79,15 +79,10 @@ func (c *Column) ShouldBackfill() bool {
return c.defaultValue != nil && !c.backfilled
}
-func (c *Column) RawName() string {
+func (c *Column) Name() string {
return c.name
}
-// Name will give you c.name and escape it if necessary.
-func (c *Column) Name(dialect sql.Dialect) string {
- return dialect.QuoteIdentifier(c.name)
-}
-
type Columns struct {
columns []Column
sync.RWMutex
@@ -188,29 +183,7 @@ func (c *Columns) GetColumnsToUpdate() []string {
continue
}
- cols = append(cols, col.RawName())
- }
-
- return cols
-}
-
-// GetEscapedColumnsToUpdate will filter all the `Invalid` columns so that we do not update it.
-// It will escape the returned columns.
-func (c *Columns) GetEscapedColumnsToUpdate(dialect sql.Dialect) []string {
- if c == nil {
- return []string{}
- }
-
- c.RLock()
- defer c.RUnlock()
-
- var cols []string
- for _, col := range c.columns {
- if col.KindDetails == typing.Invalid {
- continue
- }
-
- cols = append(cols, col.Name(dialect))
+ cols = append(cols, col.Name())
}
return cols
@@ -263,11 +236,11 @@ func (c *Columns) UpdateQuery(dialect sql.Dialect, skipDeleteCol bool) string {
}
// skipDeleteCol is useful because we don't want to copy the deleted column over to the source table if we're doing a hard row delete.
- if skipDeleteCol && column.RawName() == constants.DeleteColumnMarker {
+ if skipDeleteCol && column.Name() == constants.DeleteColumnMarker {
continue
}
- colName := column.Name(dialect)
+ colName := dialect.QuoteIdentifier(column.Name())
if column.ToastColumn {
if column.KindDetails == typing.Struct {
cols = append(cols, processToastStructCol(colName, dialect))
diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go
index 80e392b47..0ed03b152 100644
--- a/lib/typing/columns/columns_test.go
+++ b/lib/typing/columns/columns_test.go
@@ -132,49 +132,6 @@ func TestColumn_ShouldBackfill(t *testing.T) {
}
}
-func TestColumn_Name(t *testing.T) {
- type _testCase struct {
- colName string
- expectedName string
- // Snowflake
- expectedNameEsc string
- // BigQuery
- expectedNameEscBq string
- }
-
- testCases := []_testCase{
- {
- colName: "start",
- expectedName: "start",
- expectedNameEsc: `"START"`, // since this is a reserved word.
- expectedNameEscBq: "`start`", // BQ escapes via backticks.
- },
- {
- colName: "foo",
- expectedName: "foo",
- expectedNameEsc: `"FOO"`,
- expectedNameEscBq: "`foo`",
- },
- {
- colName: "bar",
- expectedName: "bar",
- expectedNameEsc: `"BAR"`,
- expectedNameEscBq: "`bar`",
- },
- }
-
- for _, testCase := range testCases {
- col := &Column{
- name: testCase.colName,
- }
-
- assert.Equal(t, testCase.expectedName, col.RawName(), testCase.colName)
-
- assert.Equal(t, testCase.expectedNameEsc, col.Name(sql.SnowflakeDialect{}), testCase.colName)
- assert.Equal(t, testCase.expectedNameEscBq, col.Name(sql.BigQueryDialect{}), testCase.colName)
- }
-}
-
func TestColumns_GetColumnsToUpdate(t *testing.T) {
type _testCase struct {
name string
@@ -229,64 +186,6 @@ func TestColumns_GetColumnsToUpdate(t *testing.T) {
}
}
-func TestColumns_GetEscapedColumnsToUpdate(t *testing.T) {
- type _testCase struct {
- name string
- cols []Column
- expectedColsEsc []string
- expectedColsEscBq []string
- }
-
- var (
- happyPathCols = []Column{
- {
- name: "hi",
- KindDetails: typing.String,
- },
- {
- name: "bye",
- KindDetails: typing.String,
- },
- {
- name: "start",
- KindDetails: typing.String,
- },
- }
- )
-
- extraCols := happyPathCols
- for i := 0; i < 100; i++ {
- extraCols = append(extraCols, Column{
- name: fmt.Sprintf("hello_%v", i),
- KindDetails: typing.Invalid,
- })
- }
-
- testCases := []_testCase{
- {
- name: "happy path",
- cols: happyPathCols,
- expectedColsEsc: []string{`"HI"`, `"BYE"`, `"START"`},
- expectedColsEscBq: []string{"`hi`", "`bye`", "`start`"},
- },
- {
- name: "happy path + extra col",
- cols: extraCols,
- expectedColsEsc: []string{`"HI"`, `"BYE"`, `"START"`},
- expectedColsEscBq: []string{"`hi`", "`bye`", "`start`"},
- },
- }
-
- for _, testCase := range testCases {
- cols := &Columns{
- columns: testCase.cols,
- }
-
- assert.Equal(t, testCase.expectedColsEsc, cols.GetEscapedColumnsToUpdate(sql.SnowflakeDialect{}), testCase.name)
- assert.Equal(t, testCase.expectedColsEscBq, cols.GetEscapedColumnsToUpdate(sql.BigQueryDialect{}), testCase.name)
- }
-}
-
func TestColumns_UpsertColumns(t *testing.T) {
keys := []string{"a", "b", "c", "d", "e"}
var cols Columns
diff --git a/lib/typing/columns/diff.go b/lib/typing/columns/diff.go
index ac22c27d5..f717bca1f 100644
--- a/lib/typing/columns/diff.go
+++ b/lib/typing/columns/diff.go
@@ -40,7 +40,7 @@ func Diff(columnsInSource *Columns, columnsInDestination *Columns, softDelete bo
targ := CloneColumns(columnsInDestination)
var colsToDelete []Column
for _, col := range src.GetColumns() {
- _, isOk := targ.GetColumn(col.RawName())
+ _, isOk := targ.GetColumn(col.Name())
if isOk {
colsToDelete = append(colsToDelete, col)
@@ -49,13 +49,13 @@ func Diff(columnsInSource *Columns, columnsInDestination *Columns, softDelete bo
// We cannot delete inside a for-loop that is iterating over src.GetColumns() because we are messing up the array order.
for _, colToDelete := range colsToDelete {
- src.DeleteColumn(colToDelete.RawName())
- targ.DeleteColumn(colToDelete.RawName())
+ src.DeleteColumn(colToDelete.Name())
+ targ.DeleteColumn(colToDelete.Name())
}
var targetColumnsMissing Columns
for _, col := range src.GetColumns() {
- if shouldSkipColumn(col.RawName(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) {
+ if shouldSkipColumn(col.Name(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) {
continue
}
@@ -64,7 +64,7 @@ func Diff(columnsInSource *Columns, columnsInDestination *Columns, softDelete bo
var sourceColumnsMissing Columns
for _, col := range targ.GetColumns() {
- if shouldSkipColumn(col.RawName(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) {
+ if shouldSkipColumn(col.Name(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) {
continue
}
diff --git a/lib/typing/columns/diff_test.go b/lib/typing/columns/diff_test.go
index 93a24e90e..8609cedeb 100644
--- a/lib/typing/columns/diff_test.go
+++ b/lib/typing/columns/diff_test.go
@@ -226,7 +226,7 @@ func TestDiffDeterministic(t *testing.T) {
var key string
for _, targetKeyMissing := range targetKeysMissing {
- key += targetKeyMissing.RawName()
+ key += targetKeyMissing.Name()
}
retMap[key] = false
diff --git a/lib/typing/columns/wrapper.go b/lib/typing/columns/wrapper.go
deleted file mode 100644
index 2d79d4845..000000000
--- a/lib/typing/columns/wrapper.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package columns
-
-import (
- "github.com/artie-labs/transfer/lib/sql"
-)
-
-type Wrapper struct {
- name string
- escapedName string
-}
-
-func NewWrapper(col Column, dialect sql.Dialect) Wrapper {
- return Wrapper{
- name: col.name,
- escapedName: col.Name(dialect),
- }
-}
-
-func (w Wrapper) EscapedName() string {
- return w.escapedName
-}
-
-func (w Wrapper) RawName() string {
- return w.name
-}
diff --git a/lib/typing/columns/wrapper_test.go b/lib/typing/columns/wrapper_test.go
deleted file mode 100644
index 95a54a649..000000000
--- a/lib/typing/columns/wrapper_test.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package columns
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-
- "github.com/artie-labs/transfer/lib/sql"
-
- "github.com/artie-labs/transfer/lib/typing"
-)
-
-func TestWrapper_Complete(t *testing.T) {
- type _testCase struct {
- name string
- expectedRawName string
- expectedEscapedName string
- expectedEscapedNameBQ string
- }
-
- testCases := []_testCase{
- {
- name: "happy",
- expectedRawName: "happy",
- expectedEscapedName: `"HAPPY"`,
- expectedEscapedNameBQ: "`happy`",
- },
- {
- name: "user_id",
- expectedRawName: "user_id",
- expectedEscapedName: `"USER_ID"`,
- expectedEscapedNameBQ: "`user_id`",
- },
- {
- name: "group",
- expectedRawName: "group",
- expectedEscapedName: `"GROUP"`,
- expectedEscapedNameBQ: "`group`",
- },
- }
-
- for _, testCase := range testCases {
- // Snowflake escape
- w := NewWrapper(NewColumn(testCase.name, typing.Invalid), sql.SnowflakeDialect{})
-
- assert.Equal(t, testCase.expectedEscapedName, w.EscapedName(), testCase.name)
- assert.Equal(t, testCase.expectedRawName, w.RawName(), testCase.name)
-
- // BigQuery escape
- w = NewWrapper(NewColumn(testCase.name, typing.Invalid), sql.BigQueryDialect{})
-
- assert.Equal(t, testCase.expectedEscapedNameBQ, w.EscapedName(), testCase.name)
- assert.Equal(t, testCase.expectedRawName, w.RawName(), testCase.name)
-
- {
- w = NewWrapper(NewColumn(testCase.name, typing.Invalid), sql.SnowflakeDialect{})
- assert.Equal(t, testCase.expectedRawName, w.RawName(), testCase.name)
- }
- {
- w = NewWrapper(NewColumn(testCase.name, typing.Invalid), sql.BigQueryDialect{})
- assert.Equal(t, testCase.expectedRawName, w.RawName(), testCase.name)
- }
-
- }
-}
diff --git a/models/event/event_save_test.go b/models/event/event_save_test.go
index d37b7f2d0..6c7c98f01 100644
--- a/models/event/event_save_test.go
+++ b/models/event/event_save_test.go
@@ -48,7 +48,7 @@ func (e *EventsTestSuite) TestSaveEvent() {
// Check the in-memory DB columns.
var found int
for _, col := range optimization.ReadOnlyInMemoryCols().GetColumns() {
- if col.RawName() == expectedLowerCol || col.RawName() == anotherLowerCol {
+ if col.Name() == expectedLowerCol || col.Name() == anotherLowerCol {
found += 1
}
@@ -183,16 +183,16 @@ func (e *EventsTestSuite) TestEvent_SaveColumnsNoData() {
td := e.db.GetOrCreateTableData("non_existent")
var prevKey string
for _, col := range td.ReadOnlyInMemoryCols().GetColumns() {
- if col.RawName() == constants.DeleteColumnMarker {
+ if col.Name() == constants.DeleteColumnMarker {
continue
}
if prevKey == "" {
- prevKey = col.RawName()
+ prevKey = col.Name()
continue
}
- currentKeyParsed, err := strconv.Atoi(col.RawName())
+ currentKeyParsed, err := strconv.Atoi(col.Name())
assert.NoError(e.T(), err)
prevKeyParsed, err := strconv.Atoi(prevKey)
@@ -206,7 +206,7 @@ func (e *EventsTestSuite) TestEvent_SaveColumnsNoData() {
evt.Columns.AddColumn(columns.NewColumn("foo", typing.Invalid))
var index int
for idx, col := range evt.Columns.GetColumns() {
- if col.RawName() == "foo" {
+ if col.Name() == "foo" {
index = idx
}
}