From c9cf389f392c4267d68bb73101031c1e5b89b4b3 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Fri, 27 Sep 2024 17:04:20 -0700
Subject: [PATCH] WIP.

---
 clients/databricks/dialect/dialect.go | 183 ++++++++++++++++++++++++++
 clients/databricks/store.go           |  50 +++++---
 lib/config/destination_types.go       |  13 +-
 lib/config/destinations.go            |  11 ++
 lib/config/types.go                   |  11 +-
 5 files changed, 247 insertions(+), 21 deletions(-)
 create mode 100644 clients/databricks/dialect/dialect.go

diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go
new file mode 100644
index 000000000..2bd001564
--- /dev/null
+++ b/clients/databricks/dialect/dialect.go
@@ -0,0 +1,183 @@
+package dialect
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/artie-labs/transfer/lib/typing/ext"
+
+	"github.com/artie-labs/transfer/lib/config/constants"
+	"github.com/artie-labs/transfer/lib/sql"
+	"github.com/artie-labs/transfer/lib/typing"
+	"github.com/artie-labs/transfer/lib/typing/columns"
+)
+
+type DatabricksDialect struct{}
+
+func (d DatabricksDialect) QuoteIdentifier(identifier string) string {
+	return fmt.Sprintf("`%s`", identifier)
+}
+
+func (d DatabricksDialect) EscapeStruct(value string) string {
+	// Structs land in a VARIANT column, so the JSON payload gets parsed rather than escaped.
+	return fmt.Sprintf("PARSE_JSON(%s)", sql.QuoteLiteral(value))
+}
+
+func (d DatabricksDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string {
+	// https://docs.databricks.com/en/sql/language-manual/sql-ref-datatypes.html
+	switch kd.Kind {
+	case typing.String.Kind:
+		return "STRING"
+	case typing.Integer.Kind:
+		return "INT"
+	case typing.Float.Kind:
+		return "FLOAT"
+	case typing.EDecimal.Kind:
+		// TODO: This reuses the Snowflake mapping for now; Databricks should get its own DECIMAL(p, s) mapping.
+		return kd.ExtendedDecimalDetails.SnowflakeKind()
+	case typing.Boolean.Kind:
+		return "BOOLEAN"
+	case typing.ETime.Kind:
+		switch kd.ExtendedTimeDetails.Type {
+		case ext.TimestampTzKindType:
+			return "TIMESTAMP"
+		case ext.DateKindType:
+			return "DATE"
+		case ext.TimeKindType:
+			// Databricks doesn't have an explicit TIME type, so we use STRING instead.
+			return "STRING"
+		}
+	case typing.Struct.Kind:
+		return "VARIANT"
+	case typing.Array.Kind:
+		// Databricks requires typing the element of an array (similar to BigQuery), so default the element to string.
+ return "ARRAY" + } + + return kd.Kind +} + +func (d DatabricksDialect) KindForDataType(_type string, _ string) (typing.KindDetails, error) { + // Implement the reverse mapping from Databricks data types to KindDetails + switch strings.ToUpper(_type) { + case "STRING": + return typing.String, nil + case "INT": + return typing.Integer, nil + case "FLOAT": + return typing.Float, nil + case "BOOLEAN": + return typing.Boolean, nil + case "VARIANT": + return typing.Struct, nil + } + + return typing.KindDetails{}, fmt.Errorf("unsupported data type: %q", _type) +} + +func (d DatabricksDialect) IsColumnAlreadyExistsErr(err error) bool { + // Implement the logic to check if the error is a "column already exists" error + return strings.Contains(err.Error(), "already exists") +} + +func (d DatabricksDialect) IsTableDoesNotExistErr(err error) bool { + // Implement the logic to check if the error is a "table does not exist" error + return strings.Contains(err.Error(), "does not exist") +} + +func (d DatabricksDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string { + temp := "" + if temporary { + temp = "TEMPORARY " + } + return fmt.Sprintf("CREATE %sTABLE %s (%s)", temp, tableID.FullyQualifiedName(), strings.Join(colSQLParts, ", ")) +} + +func (d DatabricksDialect) BuildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string { + return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart) +} + +func (d DatabricksDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string { + return fmt.Sprintf("%s.%s IS NOT NULL", tableAlias, column.Name) +} + +func (d DatabricksDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { + return fmt.Sprintf("DELETE FROM %s WHERE ROWID NOT IN (SELECT MAX(ROWID) FROM %s GROUP BY %s)", tableID.FullyQualifiedName(), tableID.FullyQualifiedName(), strings.Join(primaryKeys, ", ")) +} + +func (d DatabricksDialect) BuildDedupeQueries(tableID, stagingTableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) []string { + var queries []string + queries = append(queries, fmt.Sprintf("CREATE OR REPLACE TEMPORARY VIEW %s AS SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s) = 1", + stagingTableID.FullyQualifiedName(), tableID.FullyQualifiedName(), strings.Join(primaryKeys, ", "), "updated_at DESC")) + queries = append(queries, fmt.Sprintf("DELETE FROM %s WHERE EXISTS (SELECT 1 FROM %s WHERE %s)", tableID.FullyQualifiedName(), stagingTableID.FullyQualifiedName(), strings.Join(primaryKeys, " AND "))) + queries = append(queries, fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", tableID.FullyQualifiedName(), stagingTableID.FullyQualifiedName())) + return queries +} + +func (d DatabricksDialect) BuildMergeQueries( + tableID sql.TableIdentifier, + subQuery string, + primaryKeys []columns.Column, + additionalEqualityStrings []string, + cols []columns.Column, + softDelete bool, + containsHardDeletes bool, +) ([]string, error) { + equalitySQLParts := sql.BuildColumnComparisons(primaryKeys, constants.TargetAlias, constants.StagingAlias, sql.Equal, d) + if len(additionalEqualityStrings) > 0 { + equalitySQLParts = append(equalitySQLParts, additionalEqualityStrings...) 
+	}
+
+	baseQuery := fmt.Sprintf(`
+MERGE INTO %s %s USING ( %s ) AS %s ON %s`,
+		tableID.FullyQualifiedName(), constants.TargetAlias, subQuery, constants.StagingAlias, strings.Join(equalitySQLParts, " AND "),
+	)
+
+	cols, err := columns.RemoveOnlySetDeleteColumnMarker(cols)
+	if err != nil {
+		return []string{}, err
+	}
+
+	if softDelete {
+		return []string{baseQuery + fmt.Sprintf(`
+WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s
+WHEN MATCHED AND IFNULL(%s, false) = true THEN UPDATE SET %s
+WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);`,
+			sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d), sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d),
+			sql.GetQuotedOnlySetDeleteColumnMarker(constants.StagingAlias, d), sql.BuildColumnsUpdateFragment([]columns.Column{columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean)}, constants.StagingAlias, constants.TargetAlias, d),
+			strings.Join(sql.QuoteColumns(cols, d), ","),
+			strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, d), ","),
+		)}, nil
+	}
+
+	cols, err = columns.RemoveDeleteColumnMarker(cols)
+	if err != nil {
+		return []string{}, err
+	}
+
+	return []string{baseQuery + fmt.Sprintf(`
+WHEN MATCHED AND %s THEN DELETE
+WHEN MATCHED AND IFNULL(%s, false) = false THEN UPDATE SET %s
+WHEN NOT MATCHED AND IFNULL(%s, false) = false THEN INSERT (%s) VALUES (%s);`,
+		sql.QuotedDeleteColumnMarker(constants.StagingAlias, d),
+		sql.QuotedDeleteColumnMarker(constants.StagingAlias, d), sql.BuildColumnsUpdateFragment(cols, constants.StagingAlias, constants.TargetAlias, d),
+		sql.QuotedDeleteColumnMarker(constants.StagingAlias, d), strings.Join(sql.QuoteColumns(cols, d), ","),
+		strings.Join(sql.QuoteTableAliasColumns(constants.StagingAlias, cols, d), ","),
+	)}, nil
+}
+
+func (d DatabricksDialect) GetDefaultValueStrategy() sql.DefaultValueStrategy {
+	return sql.Backfill
+}
diff --git a/clients/databricks/store.go b/clients/databricks/store.go
index 88ba04e5a..0a77bb17c 100644
--- a/clients/databricks/store.go
+++ b/clients/databricks/store.go
@@ -1,6 +1,8 @@
 package databricks
 
 import (
+	"github.com/artie-labs/transfer/clients/databricks/dialect"
+	"github.com/artie-labs/transfer/clients/shared"
 	"github.com/artie-labs/transfer/lib/config"
 	"github.com/artie-labs/transfer/lib/db"
 	"github.com/artie-labs/transfer/lib/destination/types"
@@ -15,40 +17,58 @@ type Store struct {
 }
 
 func (s Store) Merge(tableData *optimization.TableData) error {
-	//TODO implement me
-	panic("implement me")
+	return shared.Merge(s, tableData, types.MergeOpts{})
 }
 
 func (s Store) Append(tableData *optimization.TableData, useTempTable bool) error {
-	//TODO implement me
-	panic("implement me")
+	return shared.Append(s, tableData, types.AdditionalSettings{UseTempTable: useTempTable})
 }
 
 func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier {
-	//TODO implement me
-	panic("implement me")
+	// TODO: Return NewTableIdentifier(topicConfig.Schema, table) once the identifier type lands.
+	panic("implement me")
 }
 
 func (s Store) Dialect() sql.Dialect {
-	//TODO implement me
-	panic("implement me")
+	return dialect.DatabricksDialect{}
 }
 
 func (s Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
-	//TODO implement me
-	panic("implement me")
+	// TODO: Execute dialect.BuildDedupeQueries(...) inside a transaction.
+	panic("implement me")
 }
 
 func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
-	//TODO implement me
-	panic("implement me")
+	tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
+
+	// TODO: describeTableQuery(tableID) still needs to be written; placeholder query/args keep this compiling.
+	var query string
+	var args []any
+	return shared.GetTableCfgArgs{
+		Dwh:                   s,
+		TableID:               tableID,
+		ConfigMap:             s.configMap,
+		Query:                 query,
+		Args:                  args,
+		ColumnNameForName:     "column_name",
+		ColumnNameForDataType: "data_type",
+		ColumnNameForComment:  "description",
+		DropDeletedColumns:    tableData.TopicConfig().DropDeletedColumns,
+	}.GetTableConfig()
 }
 
 func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error {
-	//TODO implement me
-	panic("implement me")
+	// TODO: Create the temp table and load the staged rows into it.
+	panic("implement me")
 }
 
 func LoadStore(cfg config.Config) (Store, error) {
-	return Store{cfg: cfg}, nil
+	store, err := db.Open("databricks", cfg.Databricks.DSN())
+	if err != nil {
+		return Store{}, err
+	}
+
+	return Store{
+		Store: store,
+		cfg:   cfg,
+	}, nil
 }
diff --git a/lib/config/destination_types.go b/lib/config/destination_types.go
index 47cd94304..b7c8a72c7 100644
--- a/lib/config/destination_types.go
+++ b/lib/config/destination_types.go
@@ -1,6 +1,8 @@
 package config
 
-import "github.com/artie-labs/transfer/lib/config/constants"
+import (
+	"github.com/artie-labs/transfer/lib/config/constants"
+)
 
 type BigQuery struct {
 	// PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var
@@ -51,3 +53,12 @@ type Snowflake struct {
 	Host        string `yaml:"host"`
 	Application string `yaml:"application"`
 }
+
+type Databricks struct {
+	Host     string `yaml:"host"`
+	Port     int    `yaml:"port"`
+	Username string `yaml:"username"`
+	Password string `yaml:"password"`
+	Database string `yaml:"database"`
+	Protocol string `yaml:"protocol"`
+}
diff --git a/lib/config/destinations.go b/lib/config/destinations.go
index af04a0038..b164290b1 100644
--- a/lib/config/destinations.go
+++ b/lib/config/destinations.go
@@ -72,3 +72,14 @@ func (s Snowflake) ToConfig() (*gosnowflake.Config, error) {
 
 	return cfg, nil
 }
+
+func (d Databricks) DSN() string {
+	return fmt.Sprintf("%s://%s:%s@%s:%d/%s",
+		d.Protocol,
+		url.QueryEscape(d.Username),
+		url.QueryEscape(d.Password),
+		d.Host,
+		d.Port,
+		d.Database,
+	)
+}
diff --git a/lib/config/types.go b/lib/config/types.go
index d9247b22f..8727f7070 100644
--- a/lib/config/types.go
+++ b/lib/config/types.go
@@ -61,11 +61,12 @@ type Config struct {
 	Kafka *Kafka `yaml:"kafka,omitempty"`
 
 	// Supported destinations
-	MSSQL     *MSSQL      `yaml:"mssql,omitempty"`
-	BigQuery  *BigQuery   `yaml:"bigquery,omitempty"`
-	Snowflake *Snowflake  `yaml:"snowflake,omitempty"`
-	Redshift  *Redshift   `yaml:"redshift,omitempty"`
-	S3        *S3Settings `yaml:"s3,omitempty"`
+	BigQuery   *BigQuery   `yaml:"bigquery,omitempty"`
+	Databricks *Databricks `yaml:"databricks,omitempty"`
+	MSSQL      *MSSQL      `yaml:"mssql,omitempty"`
+	Redshift   *Redshift   `yaml:"redshift,omitempty"`
+	S3         *S3Settings `yaml:"s3,omitempty"`
+	Snowflake  *Snowflake  `yaml:"snowflake,omitempty"`
 
 	SharedDestinationSettings SharedDestinationSettings `yaml:"sharedDestinationSettings"`
 	Reporting                 Reporting                 `yaml:"reporting"`
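
Reviewer note: here is roughly what the hard-delete branch of BuildMergeQueries renders. This is a sketch, assuming a target table db.schema.customers with primary key id, one payload column name, and a placeholder staging subquery; the tgt/stg aliases and the __artie_delete marker spelling are illustrative stand-ins for the actual values defined in lib/config/constants:

MERGE INTO db.schema.customers tgt USING ( SELECT * FROM staging_rows ) AS stg ON tgt.`id` = stg.`id`
WHEN MATCHED AND stg.`__artie_delete` THEN DELETE
WHEN MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN UPDATE SET `id` = stg.`id`, `name` = stg.`name`
WHEN NOT MATCHED AND IFNULL(stg.`__artie_delete`, false) = false THEN INSERT (`id`,`name`) VALUES (stg.`id`,stg.`name`);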
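
Separately, a minimal sketch of how the new Databricks config block gets consumed, assuming this patch applies as-is; the host, credentials, and database values below are hypothetical. Note that lib/config/destinations.go will also need fmt and net/url imports for DSN() if they are not already present:

package main

import (
	"fmt"

	"github.com/artie-labs/transfer/lib/config"
)

func main() {
	// Hypothetical connection details; the field set mirrors the Databricks
	// struct added in lib/config/destination_types.go.
	dest := config.Databricks{
		Protocol: "token",
		Username: "artie",
		Password: "s3cr3t!",
		Host:     "dbc-12345.cloud.databricks.com",
		Port:     443,
		Database: "analytics",
	}

	// DSN() percent-encodes the credentials, so reserved characters are safe:
	// token://artie:s3cr3t%21@dbc-12345.cloud.databricks.com:443/analytics
	fmt.Println(dest.DSN())
}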