Skip to content

Commit

Permalink
Merge branch 'master' into nv/always-uppercase-escaped-snowflake-names
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored May 2, 2024
2 parents 3df0774 + a152157 commit 8a27a8f
Show file tree
Hide file tree
Showing 38 changed files with 195 additions and 382 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
<img height="150px" src="https://github.com/artie-labs/transfer/assets/4412200/238df0c7-6087-4ddc-b83b-24638212af6a"/>
<h3>Artie Transfer</h3>
<p><b>⚡️ Blazing fast data replication between OLTP and OLAP databases ⚡️</b></p>
<a href="https://artie.so/slack"><img src="https://img.shields.io/badge/[email protected]?logo=slack"/></a>
<a href="https://docs.artie.so/running-transfer/overview"><img src="https://user-images.githubusercontent.com/4412200/226736695-6b8b9abd-c227-41c7-89a1-805a04c90d08.png"/></a>
<a href="https://artie.com/slack"><img src="https://img.shields.io/badge/[email protected]?logo=slack"/></a>
<a href="https://docs.artie.com/running-transfer/overview"><img src="https://user-images.githubusercontent.com/4412200/226736695-6b8b9abd-c227-41c7-89a1-805a04c90d08.png"/></a>
<a href="https://github.com/artie-labs/transfer/blob/master/LICENSE.txt"><img src="https://user-images.githubusercontent.com/4412200/201544613-a7197bc4-8b61-4fc5-bf09-68ee10133fd7.svg"/></a>
<img src="https://github.com/artie-labs/transfer/actions/workflows/gha-go-test.yml/badge.svg"/>
<br/>
<b><a target="_blank" href="https://artie.so" >Learn more »</a></b>
<b><a target="_blank" href="https://artie.com" >Learn more »</a></b>
</div>
<br/>

Expand Down Expand Up @@ -51,7 +51,7 @@ To run Artie Transfer's stack locally, please refer to the [examples folder](htt

## <a name="getting-started"></a>Getting started

[Getting started guide](https://docs.artie.so/running-transfer/overview)
[Getting started guide](https://docs.artie.com/running-transfer/overview)

## What is currently supported?
Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Currently Transfer supports:
Expand All @@ -60,14 +60,14 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur
- Kafka (default)
- Google Pub/Sub

- [Destinations](https://docs.artie.so/real-time-destinations/overview):
- [Destinations](https://docs.artie.com/real-time-destinations/overview):
- Snowflake
- BigQuery
- Redshift
- Microsoft SQL Server
- S3

- [Sources](https://docs.artie.so/real-time-sources/overview):
- [Sources](https://docs.artie.com/real-time-sources/overview):
- MongoDB
- DocumentDB
- PostgreSQL
Expand All @@ -77,15 +77,15 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur
_If the database you are using is not on the list, feel free to file for a [feature request](https://github.com/artie-labs/transfer/issues/new)._

## Configuration File
* [Artie Transfer configuration file guide](https://docs.artie.so/running-transfer/options)
* [Examples of configuration files](https://docs.artie.so/running-transfer/examples)
* [Artie Transfer configuration file guide](https://docs.artie.com/running-transfer/options)
* [Examples of configuration files](https://docs.artie.com/running-transfer/examples)

## Telemetry

[Artie Transfer's telemetry guide](https://docs.artie.so/telemetry/overview)
[Artie Transfer's telemetry guide](https://docs.artie.com/telemetry/overview)

## Tests
Transfer is written in Go and uses [counterfeiter](https://github.com/maxbrunsfeld/counterfeiter) to mock.
Transfer is written in Go and uses [counterfeiter](https://github.com/maxbrunsfeld/counterfeiter) to mock.
To run the tests, run the following commands:

```sh
Expand Down
6 changes: 3 additions & 3 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
for attempts := 0; attempts < backfillMaxRetries; attempts++ {
backfillErr = BackfillColumn(cfg, dwh, col, tableID)
if backfillErr == nil {
tableConfig.Columns().UpsertColumn(col.RawName(), columns.UpsertColumnArg{
tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{
Backfilled: ptr.ToBool(true),
})
break
Expand All @@ -110,7 +110,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
}

if backfillErr != nil {
return fmt.Errorf("failed to backfill col: %s, default value: %v, err: %w", col.RawName(), col.RawDefaultValue(), backfillErr)
return fmt.Errorf("failed to backfill col: %s, default value: %v, err: %w", col.Name(), col.RawDefaultValue(), backfillErr)
}
}

Expand All @@ -123,7 +123,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, cfg
TableID: tableID,
SubQuery: subQuery,
IdempotentKey: tableData.TopicConfig().IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys(dwh.Dialect()),
PrimaryKeys: tableData.PrimaryKeys(),
Columns: tableData.ReadOnlyInMemoryCols(),
SoftDelete: tableData.TopicConfig().SoftDelete,
DestKind: dwh.Label(),
Expand Down
4 changes: 2 additions & 2 deletions clients/shared/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col
return fmt.Errorf("failed to escape default value: %w", err)
}

escapedCol := column.Name(dwh.Dialect())
escapedCol := dwh.Dialect().QuoteIdentifier(column.Name())

// TODO: This is added because `default` is not technically a column that requires escaping, but it is required when it's in the where clause.
// Once we escape everything by default, we can remove this patch of code.
Expand All @@ -45,7 +45,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col
tableID.FullyQualifiedName(), escapedCol, defaultVal, additionalEscapedCol,
)
slog.Info("Backfilling column",
slog.String("colName", column.RawName()),
slog.String("colName", column.Name()),
slog.String("query", query),
slog.String("table", tableID.FullyQualifiedName()),
)
Expand Down
10 changes: 5 additions & 5 deletions clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *SnowflakeTestSuite) TestMutateColumnsWithMemoryCacheDeletions() {
nameCol := columns.NewColumn("name", typing.String)
tc := s.stageStore.configMap.TableConfig(tableID)

val := tc.ShouldDeleteColumn(nameCol.RawName(), time.Now().Add(-1*6*time.Hour), true)
val := tc.ShouldDeleteColumn(nameCol.Name(), time.Now().Add(-1*6*time.Hour), true)
assert.False(s.T(), val, "should not try to delete this column")
assert.Equal(s.T(), len(s.stageStore.configMap.TableConfig(tableID).ReadOnlyColumnsToDelete()), 1)

Expand All @@ -68,23 +68,23 @@ func (s *SnowflakeTestSuite) TestShouldDeleteColumn() {

nameCol := columns.NewColumn("name", typing.String)
// Let's try to delete name.
allowed := s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(),
allowed := s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(),
time.Now().Add(-1*(6*time.Hour)), true)

assert.Equal(s.T(), allowed, false, "should not be allowed to delete")

// Process tried to delete, but it's lagged.
allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(),
allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(),
time.Now().Add(-1*(6*time.Hour)), true)

assert.Equal(s.T(), allowed, false, "should not be allowed to delete")

// Process now caught up, and is asking if we can delete, should still be no.
allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(), time.Now(), true)
allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(), time.Now(), true)
assert.Equal(s.T(), allowed, false, "should not be allowed to delete still")

// Process is finally ahead, has permission to delete now.
allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.RawName(),
allowed = s.stageStore.configMap.TableConfig(tableID).ShouldDeleteColumn(nameCol.Name(),
time.Now().Add(2*constants.DeletionConfidencePadding), true)

assert.Equal(s.T(), allowed, true, "should now be allowed to delete")
Expand Down
5 changes: 1 addition & 4 deletions clients/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,7 @@ func (s *Store) reestablishConnection() error {
}

func (s *Store) generateDedupeQueries(tableID, stagingTableID types.TableIdentifier, primaryKeys []string, topicConfig kafkalib.TopicConfig) []string {
var primaryKeysEscaped []string
for _, pk := range primaryKeys {
primaryKeysEscaped = append(primaryKeysEscaped, s.Dialect().QuoteIdentifier(pk))
}
primaryKeysEscaped := sql.QuoteIdentifiers(primaryKeys, s.Dialect())

orderColsToIterate := primaryKeysEscaped
if topicConfig.IncludeArtieUpdatedAt {
Expand Down
3 changes: 2 additions & 1 deletion clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/values"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableCo
// COPY the CSV file (in Snowflake) into a table
copyCommand := fmt.Sprintf("COPY INTO %s (%s) FROM (SELECT %s FROM @%s)",
tempTableID.FullyQualifiedName(),
strings.Join(tableData.ReadOnlyInMemoryCols().GetEscapedColumnsToUpdate(s.Dialect()), ","),
strings.Join(sql.QuoteIdentifiers(tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), s.Dialect()), ","),
escapeColumns(tableData.ReadOnlyInMemoryCols(), ","), addPrefixToTableName(tempTableID, "%"))

if additionalSettings.AdditionalCopyClause != "" {
Expand Down
4 changes: 2 additions & 2 deletions examples/mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ docker-compose -f docker-compose.yaml exec mongodb bash -c '/usr/local/bin/init-
# Now, if you want to connect to the Mongo shell and insert more data, go right ahead
docker-compose -f docker-compose.yaml exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory'
db.customers.insert([
{ _id : NumberLong("1020"), first_name : 'Robin',
last_name : 'Tang', email : 'robin@artie.so', unique_id : UUID(),
{ _id : NumberLong("1020"), first_name : 'Robin',
last_name : 'Tang', email : 'robin@example.com', unique_id : UUID(),
test_bool_false: false, test_bool_true: true, new_id: ObjectId(),
test_decimal: NumberDecimal("13.37"), test_int: NumberInt("1337"),
test_decimal_2: 13.37, test_list: [1, 2, 3, 4, "hello"], test_null: null, test_ts: Timestamp(42, 1), test_nested_object: {a: { b: { c: "hello"}}}}
Expand Down
4 changes: 2 additions & 2 deletions examples/pubsub_postgres/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Postgres Example

This example requires additional configuration on the Pub/Sub side.
This example requires additional configuration on the Pub/Sub side.

Please see https://docs.artie.so/tutorials/setting-up-pub-sub for further details.
Please see https://docs.artie.com/tutorials/setting-up-pub-sub for further details.
4 changes: 2 additions & 2 deletions lib/cdc/mongo/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (p *MongoTestSuite) TestMongoDBEventCustomer() {
"schema": {},
"payload": {
"before": null,
"after": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"robin@artie.so\", \"nested\": {\"object\": \"foo\"}}",
"after": "{\"_id\": {\"$numberLong\": \"1003\"},\"first_name\": \"Robin\",\"last_name\": \"Tang\",\"email\": \"robin@example.com\", \"nested\": {\"object\": \"foo\"}}",
"patch": null,
"filter": null,
"updateDescription": null,
Expand Down Expand Up @@ -176,7 +176,7 @@ func (p *MongoTestSuite) TestMongoDBEventCustomer() {
assert.Equal(p.T(), evtData["_id"], 1003)
assert.Equal(p.T(), evtData["first_name"], "Robin")
assert.Equal(p.T(), evtData["last_name"], "Tang")
assert.Equal(p.T(), evtData["email"], "robin@artie.so")
assert.Equal(p.T(), evtData["email"], "robin@example.com")

evtDataWithIncludedAt, err := evt.GetData(map[string]any{"_id": 1003}, &kafkalib.TopicConfig{})
assert.NoError(p.T(), err)
Expand Down
4 changes: 2 additions & 2 deletions lib/cdc/mysql/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,14 @@ func (m *MySQLTestSuite) TestGetEventFromBytes() {

col, isOk := cols.GetColumn("abcdef")
assert.True(m.T(), isOk)
assert.Equal(m.T(), "abcdef", col.RawName())
assert.Equal(m.T(), "abcdef", col.Name())
for key := range evtData {
if strings.Contains(key, constants.ArtiePrefix) {
continue
}

col, isOk = cols.GetColumn(strings.ToLower(key))
assert.Equal(m.T(), true, isOk, key)
assert.Equal(m.T(), typing.Invalid, col.KindDetails, fmt.Sprintf("colName: %v, evtData key: %v", col.RawName(), key))
assert.Equal(m.T(), typing.Invalid, col.KindDetails, fmt.Sprintf("colName: %v, evtData key: %v", col.Name(), key))
}
}
Loading

0 comments on commit 8a27a8f

Please sign in to comment.