Skip to content

Commit

Permalink
Merge branch 'master' into minor-improvement-bigquery-cast-col
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored May 2, 2024
2 parents 3091fef + a152157 commit 54aafa1
Show file tree
Hide file tree
Showing 70 changed files with 664 additions and 1,160 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
<img height="150px" src="https://github.com/artie-labs/transfer/assets/4412200/238df0c7-6087-4ddc-b83b-24638212af6a"/>
<h3>Artie Transfer</h3>
<p><b>⚡️ Blazing fast data replication between OLTP and OLAP databases ⚡️</b></p>
<a href="https://artie.so/slack"><img src="https://img.shields.io/badge/slack-@artie-blue.svg?logo=slack"/></a>
<a href="https://docs.artie.so/running-transfer/overview"><img src="https://user-images.githubusercontent.com/4412200/226736695-6b8b9abd-c227-41c7-89a1-805a04c90d08.png"/></a>
<a href="https://artie.com/slack"><img src="https://img.shields.io/badge/slack-@artie-blue.svg?logo=slack"/></a>
<a href="https://docs.artie.com/running-transfer/overview"><img src="https://user-images.githubusercontent.com/4412200/226736695-6b8b9abd-c227-41c7-89a1-805a04c90d08.png"/></a>
<a href="https://github.com/artie-labs/transfer/blob/master/LICENSE.txt"><img src="https://user-images.githubusercontent.com/4412200/201544613-a7197bc4-8b61-4fc5-bf09-68ee10133fd7.svg"/></a>
<img src="https://github.com/artie-labs/transfer/actions/workflows/gha-go-test.yml/badge.svg"/>
<br/>
<b><a target="_blank" href="https://artie.so" >Learn more »</a></b>
<b><a target="_blank" href="https://artie.com" >Learn more »</a></b>
</div>
<br/>

Expand Down Expand Up @@ -51,7 +51,7 @@ To run Artie Transfer's stack locally, please refer to the [examples folder](htt

## <a name="getting-started"></a>Getting started

[Getting started guide](https://docs.artie.so/running-transfer/overview)
[Getting started guide](https://docs.artie.com/running-transfer/overview)

## What is currently supported?
Transfer is aiming to provide coverage across all OLTP and OLAP databases. Currently, Transfer supports:
Expand All @@ -60,14 +60,14 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur
- Kafka (default)
- Google Pub/Sub

- [Destinations](https://docs.artie.so/real-time-destinations/overview):
- [Destinations](https://docs.artie.com/real-time-destinations/overview):
- Snowflake
- BigQuery
- Redshift
- Microsoft SQL Server
- S3

- [Sources](https://docs.artie.so/real-time-sources/overview):
- [Sources](https://docs.artie.com/real-time-sources/overview):
- MongoDB
- DocumentDB
- PostgreSQL
Expand All @@ -77,15 +77,15 @@ Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Cur
_If the database you are using is not on the list, feel free to file a [feature request](https://github.com/artie-labs/transfer/issues/new)._

## Configuration File
* [Artie Transfer configuration file guide](https://docs.artie.so/running-transfer/options)
* [Examples of configuration files](https://docs.artie.so/running-transfer/examples)
* [Artie Transfer configuration file guide](https://docs.artie.com/running-transfer/options)
* [Examples of configuration files](https://docs.artie.com/running-transfer/examples)

## Telemetry

[Artie Transfer's telemetry guide](https://docs.artie.so/telemetry/overview)
[Artie Transfer's telemetry guide](https://docs.artie.com/telemetry/overview)

## Tests
Transfer is written in Go and uses [counterfeiter](https://github.com/maxbrunsfeld/counterfeiter) to mock.
Transfer is written in Go and uses [counterfeiter](https://github.com/maxbrunsfeld/counterfeiter) to mock.
To run the tests, run the following commands:

```sh
Expand Down
12 changes: 0 additions & 12 deletions clients/bigquery/append.go

This file was deleted.

23 changes: 11 additions & 12 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,20 @@ type Store struct {
db.Store
}

func (s *Store) Append(tableData *optimization.TableData) error {
return shared.Append(s, tableData, types.AdditionalSettings{})
}

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
Expand Down Expand Up @@ -119,10 +122,6 @@ func (s *Store) Dialect() sql.Dialect {
return sql.BigQueryDialect{}
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return false
}

func (s *Store) GetClient(ctx context.Context) *bigquery.Client {
client, err := bigquery.NewClient(ctx, s.config.BigQuery.ProjectID)
if err != nil {
Expand Down
16 changes: 7 additions & 9 deletions clients/mssql/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@ import (
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
Expand Down
9 changes: 2 additions & 7 deletions clients/mssql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,15 @@ func (s *Store) Label() constants.DestinationKind {
}

func (s *Store) Dialect() sql.Dialect {
return sql.DefaultDialect{}
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return false
return sql.MSSQLDialect{}
}

func (s *Store) Merge(tableData *optimization.TableData) error {
return shared.Merge(s, tableData, s.config, types.MergeOpts{})
}

func (s *Store) Append(tableData *optimization.TableData) error {
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
return shared.Append(s, tableData, types.AppendOpts{TempTableID: tableID})
return shared.Append(s, tableData, types.AdditionalSettings{})
}

// specificIdentifierFor returns a MS SQL [TableIdentifier] for a [TopicConfig] + table name.
Expand Down
2 changes: 1 addition & 1 deletion clients/mssql/tableid.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/artie-labs/transfer/lib/sql"
)

var dialect = sql.DefaultDialect{}
var dialect = sql.MSSQLDialect{}

type TableIdentifier struct {
schema string
Expand Down
17 changes: 13 additions & 4 deletions clients/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ type Store struct {
db.Store
}

func (s *Store) Append(tableData *optimization.TableData) error {
return shared.Append(s, tableData, types.AdditionalSettings{})
}

func (s *Store) Merge(tableData *optimization.TableData) error {
return shared.Merge(s, tableData, s.config, types.MergeOpts{
UseMergeParts: true,
// We are adding SELECT DISTINCT here for the temporary table as an extra guardrail.
// Redshift does not enforce any row uniqueness and there could be potential LOAD errors which will cause duplicate rows to arise.
SubQueryDedupe: true,
})
}

func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) types.TableIdentifier {
return NewTableIdentifier(topicConfig.Schema, table)
}
Expand All @@ -50,10 +63,6 @@ func (s *Store) Dialect() sql.Dialect {
return sql.RedshiftDialect{}
}

func (s *Store) ShouldUppercaseEscapedNames() bool {
return false
}

func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
const (
describeNameCol = "column_name"
Expand Down
29 changes: 14 additions & 15 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,24 @@ import (
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/s3lib"
)

func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, _ bool) error {
// Redshift always creates a temporary table.
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: ptr.ToBool(s.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
}
func (s *Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID types.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
TableID: tempTableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
Mode: tableData.Mode(),
}

if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
if err := tempAlterTableArgs.AlterTable(tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table: %w", err)
}
}

fp, err := s.loadTemporaryTable(tableData, tempTableID)
Expand Down
34 changes: 0 additions & 34 deletions clients/redshift/writes.go

This file was deleted.

43 changes: 24 additions & 19 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,38 @@ import (
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AppendOpts) error {
func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, opts types.AdditionalSettings) error {
if tableData.ShouldSkipUpdate() {
return nil
}

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
tableConfig, err := dwh.GetTableConfig(tableData)
if err != nil {
return fmt.Errorf("failed to get table config: %w", err)
}

// We don't care about srcKeysMissing because we don't drop columns when we append.
_, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(),
tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode())
_, targetKeysMissing := columns.Diff(
tableData.ReadOnlyInMemoryCols(),
tableConfig.Columns(),
tableData.TopicConfig().SoftDelete,
tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt,
tableData.Mode(),
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
createAlterTableArgs := ddl.AlterTableArgs{
Dwh: dwh,
Tc: tableConfig,
TableID: tableID,
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
UppercaseEscNames: ptr.ToBool(dwh.ShouldUppercaseEscapedNames()),
Mode: tableData.Mode(),
Dwh: dwh,
Tc: tableConfig,
TableID: tableID,
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}

// Keys that exist in CDC stream, but not in DWH
Expand All @@ -48,9 +51,11 @@ func Append(dwh destination.DataWarehouse, tableData *optimization.TableData, op
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

additionalSettings := types.AdditionalSettings{
AdditionalCopyClause: opts.AdditionalCopyClause,
}

return dwh.PrepareTemporaryTable(tableData, tableConfig, opts.TempTableID, additionalSettings, false)
return dwh.PrepareTemporaryTable(
tableData,
tableConfig,
tableID,
opts,
false,
)
}
Loading

0 comments on commit 54aafa1

Please sign in to comment.