From ad190876114f82ce93a53adfa27458c1866b5670 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 27 Sep 2024 15:14:56 -0700 Subject: [PATCH] WIP. --- clients/databricks/store.go | 54 +++++++++++++++++++++++++++++++ lib/config/constants/constants.go | 12 ++++--- lib/destination/utils/load.go | 9 ++++++ 3 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 clients/databricks/store.go diff --git a/clients/databricks/store.go b/clients/databricks/store.go new file mode 100644 index 000000000..88ba04e5a --- /dev/null +++ b/clients/databricks/store.go @@ -0,0 +1,54 @@ +package databricks + +import ( + "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/db" + "github.com/artie-labs/transfer/lib/destination/types" + "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/artie-labs/transfer/lib/optimization" + "github.com/artie-labs/transfer/lib/sql" +) + +type Store struct { + db.Store + cfg config.Config +} + +func (s Store) Merge(tableData *optimization.TableData) error { + //TODO implement me + panic("implement me") +} + +func (s Store) Append(tableData *optimization.TableData, useTempTable bool) error { + //TODO implement me + panic("implement me") +} + +func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier { + //TODO implement me + panic("implement me") +} + +func (s Store) Dialect() sql.Dialect { + //TODO implement me + panic("implement me") +} + +func (s Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error { + //TODO implement me + panic("implement me") +} + +func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) { + //TODO implement me + panic("implement me") +} + +func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { + //TODO implement me + panic("implement me") +} + +func LoadStore(cfg config.Config) (Store, error) { + return Store{cfg: cfg}, nil +} diff --git a/lib/config/constants/constants.go b/lib/config/constants/constants.go index 113039d26..33be18b66 100644 --- a/lib/config/constants/constants.go +++ b/lib/config/constants/constants.go @@ -67,13 +67,15 @@ const ( type DestinationKind string const ( - BigQuery DestinationKind = "bigquery" - MSSQL DestinationKind = "mssql" - Redshift DestinationKind = "redshift" - S3 DestinationKind = "s3" - Snowflake DestinationKind = "snowflake" + BigQuery DestinationKind = "bigquery" + Databricks DestinationKind = "databricks" + MSSQL DestinationKind = "mssql" + Redshift DestinationKind = "redshift" + S3 DestinationKind = "s3" + Snowflake DestinationKind = "snowflake" ) +// TODO: Add Databricks here when it's live. var ValidDestinations = []DestinationKind{ BigQuery, MSSQL, diff --git a/lib/destination/utils/load.go b/lib/destination/utils/load.go index 21bc1d89f..225715ade 100644 --- a/lib/destination/utils/load.go +++ b/lib/destination/utils/load.go @@ -3,6 +3,8 @@ package utils import ( "fmt" + "github.com/artie-labs/transfer/clients/databricks" + "github.com/artie-labs/transfer/clients/bigquery" "github.com/artie-labs/transfer/clients/mssql" "github.com/artie-labs/transfer/clients/redshift" @@ -63,6 +65,13 @@ func LoadDataWarehouse(cfg config.Config, store *db.Store) (destination.DataWare return nil, fmt.Errorf("failed to clean up Redshift: %w", err) } return s, nil + case constants.Databricks: + store, err := databricks.LoadStore(cfg) + if err != nil { + return nil, fmt.Errorf("failed to load Databricks: %w", err) + } + + return store, nil } return nil, fmt.Errorf("invalid data warehouse output source specified: %q", cfg.Output)