Skip to content

Commit

Permalink
[DDL] Moving DDL queries to ddl.go (#1053)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 19, 2024
1 parent 98abe01 commit d9f8720
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 136 deletions.
33 changes: 33 additions & 0 deletions clients/bigquery/dialect/ddl.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,47 @@
package dialect

import (
"fmt"
"strings"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (BigQueryDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))

if temporary {
return fmt.Sprintf(
`%s OPTIONS (expiration_timestamp = TIMESTAMP("%s"))`,
query,
BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)),
)
} else {
return query
}
}

func (bd BigQueryDialect) BuildAddColumnQuery(tableID sql.TableIdentifier, sqlPart string) string {
return bd.buildAlterColumnQuery(tableID, constants.Add, sqlPart)
}

func (bd BigQueryDialect) BuildDropColumnQuery(tableID sql.TableIdentifier, colName string) string {
return bd.buildAlterColumnQuery(tableID, constants.Delete, colName)
}

func (BigQueryDialect) buildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (BigQueryDialect) BuildDescribeTableQuery(tableID sql.TableIdentifier) (string, []interface{}, error) {
bqTableID, err := typing.AssertType[TableIdentifier](tableID)
if err != nil {
return "", nil, err
}

query := fmt.Sprintf("SELECT column_name, data_type, description FROM `%s.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` WHERE table_name = ?;", bqTableID.Dataset())
return query, []any{bqTableID.Table()}, nil
}
28 changes: 0 additions & 28 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,34 +41,6 @@ func (BigQueryDialect) IsTableDoesNotExistErr(_ error) bool {
return false
}

func (BigQueryDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))

if temporary {
return fmt.Sprintf(
`%s OPTIONS (expiration_timestamp = TIMESTAMP("%s"))`,
query,
BQExpiresDate(time.Now().UTC().Add(constants.TemporaryTableTTL)),
)
} else {
return query
}
}

func (BigQueryDialect) buildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (BigQueryDialect) BuildDescribeTableQuery(tableID sql.TableIdentifier) (string, []interface{}, error) {
bqTableID, err := typing.AssertType[TableIdentifier](tableID)
if err != nil {
return "", nil, err
}

query := fmt.Sprintf("SELECT column_name, data_type, description FROM `%s.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` WHERE table_name = ?;", bqTableID.Dataset())
return query, []any{bqTableID.Table()}, nil
}

func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
colName := sql.QuoteTableAliasColumn(tableAlias, column, bd)
if column.KindDetails == typing.Struct {
Expand Down
16 changes: 16 additions & 0 deletions clients/databricks/dialect/ddl.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
package dialect

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
)

func (DatabricksDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string {
// Databricks doesn't have a concept of temporary tables.
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ", "))
}

func (d DatabricksDialect) BuildAddColumnQuery(tableID sql.TableIdentifier, sqlPart string) string {
return d.buildAlterColumnQuery(tableID, constants.Add, sqlPart)
}

func (d DatabricksDialect) BuildDropColumnQuery(tableID sql.TableIdentifier, colName string) string {
return d.buildAlterColumnQuery(tableID, constants.Delete, colName)
}

func (DatabricksDialect) BuildDescribeTableQuery(tableID sql.TableIdentifier) (string, []any, error) {
return fmt.Sprintf("DESCRIBE TABLE %s", tableID.FullyQualifiedName()), nil, nil
}

func (DatabricksDialect) buildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}
13 changes: 0 additions & 13 deletions clients/databricks/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,6 @@ func (DatabricksDialect) IsTableDoesNotExistErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "[TABLE_OR_VIEW_NOT_FOUND]")
}

func (DatabricksDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string {
// Databricks doesn't have a concept of temporary tables.
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ", "))
}

func (DatabricksDialect) BuildDescribeTableQuery(tableID sql.TableIdentifier) (string, []any, error) {
return fmt.Sprintf("DESCRIBE TABLE %s", tableID.FullyQualifiedName()), nil, nil
}

func (DatabricksDialect) buildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (d DatabricksDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
colName := sql.QuoteTableAliasColumn(tableAlias, column, d)
if column.KindDetails == typing.Struct {
Expand Down
40 changes: 40 additions & 0 deletions clients/mssql/dialect/ddl.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,54 @@
package dialect

import (
"fmt"
"strings"

mssql "github.com/microsoft/go-mssqldb"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (MSSQLDialect) BuildDescribeTableQuery(tableID sql.TableIdentifier) (string, []any, error) {
mssqlTableID, err := typing.AssertType[TableIdentifier](tableID)
if err != nil {
return "", nil, err
}

return `
SELECT
COLUMN_NAME,
CASE
WHEN DATA_TYPE = 'numeric' THEN
'numeric(' + COALESCE(CAST(NUMERIC_PRECISION AS VARCHAR), '') + ',' + COALESCE(CAST(NUMERIC_SCALE AS VARCHAR), '') + ')'
ELSE
DATA_TYPE
END AS DATA_TYPE,
CHARACTER_MAXIMUM_LENGTH,
COLUMN_DEFAULT AS DEFAULT_VALUE
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
LOWER(TABLE_NAME) = LOWER(?) AND LOWER(TABLE_SCHEMA) = LOWER(?);`, []any{mssql.VarChar(mssqlTableID.Table()), mssql.VarChar(mssqlTableID.Schema())}, nil
}

func (md MSSQLDialect) BuildAddColumnQuery(tableID sql.TableIdentifier, sqlPart string) string {
return md.buildAlterColumnQuery(tableID, constants.Add, sqlPart)
}

func (md MSSQLDialect) BuildDropColumnQuery(tableID sql.TableIdentifier, colName string) string {
return md.buildAlterColumnQuery(tableID, constants.Delete, colName)
}

func (MSSQLDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string {
// Microsoft SQL Server uses the same syntax for temporary and permanent tables.
// Microsoft SQL Server doesn't support IF NOT EXISTS
return fmt.Sprintf("CREATE TABLE %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))
}

func (MSSQLDialect) buildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
// Microsoft SQL Server doesn't support the COLUMN keyword
return fmt.Sprintf("ALTER TABLE %s %s %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}
34 changes: 0 additions & 34 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,40 +43,6 @@ func (MSSQLDialect) IsTableDoesNotExistErr(err error) bool {
return false
}

func (MSSQLDialect) BuildDescribeTableQuery(tableID sql.TableIdentifier) (string, []any, error) {
mssqlTableID, err := typing.AssertType[TableIdentifier](tableID)
if err != nil {
return "", nil, err
}

return `
SELECT
COLUMN_NAME,
CASE
WHEN DATA_TYPE = 'numeric' THEN
'numeric(' + COALESCE(CAST(NUMERIC_PRECISION AS VARCHAR), '') + ',' + COALESCE(CAST(NUMERIC_SCALE AS VARCHAR), '') + ')'
ELSE
DATA_TYPE
END AS DATA_TYPE,
CHARACTER_MAXIMUM_LENGTH,
COLUMN_DEFAULT AS DEFAULT_VALUE
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
LOWER(TABLE_NAME) = LOWER(?) AND LOWER(TABLE_SCHEMA) = LOWER(?);`, []any{mssql.VarChar(mssqlTableID.Table()), mssql.VarChar(mssqlTableID.Schema())}, nil
}

func (MSSQLDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string {
// Microsoft SQL Server uses the same syntax for temporary and permanent tables.
// Microsoft SQL Server doesn't support IF NOT EXISTS
return fmt.Sprintf("CREATE TABLE %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))
}

func (MSSQLDialect) buildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
// Microsoft SQL Server doesn't support the COLUMN keyword
return fmt.Sprintf("ALTER TABLE %s %s %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (md MSSQLDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
colName := sql.QuoteTableAliasColumn(tableAlias, column, md)
// Microsoft SQL Server doesn't allow boolean expressions to be in the COALESCE statement.
Expand Down
44 changes: 44 additions & 0 deletions clients/redshift/dialect/ddl.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,58 @@
package dialect

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (RedshiftDialect) BuildDescribeTableQuery(tableID sql.TableIdentifier) (string, []any, error) {
redshiftTableID, err := typing.AssertType[TableIdentifier](tableID)
if err != nil {
return "", nil, err
}

// This query is a modified fork from: https://gist.github.com/alexanderlz/7302623
return fmt.Sprintf(`
SELECT
c.column_name,
CASE
WHEN c.data_type = 'numeric' THEN
'numeric(' || COALESCE(CAST(c.numeric_precision AS VARCHAR), '') || ',' || COALESCE(CAST(c.numeric_scale AS VARCHAR), '') || ')'
ELSE
c.data_type
END AS data_type,
c.%s,
d.description
FROM
INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN
PG_CLASS c1 ON c.table_name = c1.relname
LEFT JOIN
PG_CATALOG.PG_NAMESPACE n ON c.table_schema = n.nspname AND c1.relnamespace = n.oid
LEFT JOIN
PG_CATALOG.PG_DESCRIPTION d ON d.objsubid = c.ordinal_position AND d.objoid = c1.oid
WHERE
LOWER(c.table_schema) = LOWER($1) AND LOWER(c.table_name) = LOWER($2);
`, constants.StrPrecisionCol), []any{redshiftTableID.Schema(), redshiftTableID.Table()}, nil
}

func (rd RedshiftDialect) BuildAddColumnQuery(tableID sql.TableIdentifier, sqlPart string) string {
return rd.buildAlterColumnQuery(tableID, constants.Add, sqlPart)
}

func (rd RedshiftDialect) BuildDropColumnQuery(tableID sql.TableIdentifier, colName string) string {
return rd.buildAlterColumnQuery(tableID, constants.Delete, colName)
}

func (RedshiftDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string {
// Redshift uses the same syntax for temporary and permanent tables.
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))
}

func (RedshiftDialect) buildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}
40 changes: 0 additions & 40 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,46 +30,6 @@ func (RedshiftDialect) IsTableDoesNotExistErr(_ error) bool {
return false
}

func (RedshiftDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, _ bool, colSQLParts []string) string {
// Redshift uses the same syntax for temporary and permanent tables.
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))
}

func (RedshiftDialect) buildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}

func (RedshiftDialect) BuildDescribeTableQuery(tableID sql.TableIdentifier) (string, []any, error) {
redshiftTableID, err := typing.AssertType[TableIdentifier](tableID)
if err != nil {
return "", nil, err
}

// This query is a modified fork from: https://gist.github.com/alexanderlz/7302623
return fmt.Sprintf(`
SELECT
c.column_name,
CASE
WHEN c.data_type = 'numeric' THEN
'numeric(' || COALESCE(CAST(c.numeric_precision AS VARCHAR), '') || ',' || COALESCE(CAST(c.numeric_scale AS VARCHAR), '') || ')'
ELSE
c.data_type
END AS data_type,
c.%s,
d.description
FROM
INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN
PG_CLASS c1 ON c.table_name = c1.relname
LEFT JOIN
PG_CATALOG.PG_NAMESPACE n ON c.table_schema = n.nspname AND c1.relnamespace = n.oid
LEFT JOIN
PG_CATALOG.PG_DESCRIPTION d ON d.objsubid = c.ordinal_position AND d.objoid = c1.oid
WHERE
LOWER(c.table_schema) = LOWER($1) AND LOWER(c.table_name) = LOWER($2);
`, constants.StrPrecisionCol), []any{redshiftTableID.Schema(), redshiftTableID.Table()}, nil
}

func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
colName := sql.QuoteTableAliasColumn(tableAlias, column, rd)
if column.KindDetails == typing.Struct {
Expand Down
24 changes: 24 additions & 0 deletions clients/snowflake/dialect/ddl.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
package dialect

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
)

func (SnowflakeDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", tableID.FullyQualifiedName(), strings.Join(colSQLParts, ","))

if temporary {
// TEMPORARY Table syntax - https://docs.snowflake.com/en/sql-reference/sql/create-table
// PURGE syntax - https://docs.snowflake.com/en/sql-reference/sql/copy-into-table#purging-files-after-loading
// FIELD_OPTIONALLY_ENCLOSED_BY - is needed because CSV will try to escape any values that have `"`
return query + ` STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`
} else {
return query
}
}

func (sd SnowflakeDialect) BuildAddColumnQuery(tableID sql.TableIdentifier, sqlPart string) string {
return sd.buildAlterColumnQuery(tableID, constants.Add, sqlPart)
}

func (sd SnowflakeDialect) BuildDropColumnQuery(tableID sql.TableIdentifier, colName string) string {
return sd.buildAlterColumnQuery(tableID, constants.Delete, colName)
}

func (SnowflakeDialect) BuildDescribeTableQuery(tableID sql.TableIdentifier) (string, []any, error) {
return fmt.Sprintf("DESC TABLE %s", tableID.FullyQualifiedName()), nil, nil
}

func (SnowflakeDialect) buildAlterColumnQuery(tableID sql.TableIdentifier, columnOp constants.ColumnOperation, colSQLPart string) string {
return fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", tableID.FullyQualifiedName(), columnOp, colSQLPart)
}
Loading

0 comments on commit d9f8720

Please sign in to comment.