athena-driver #3014

Merged Sep 26, 2023 · 46 commits

Changes from 1 commit
2ee73b7  athena-driver (Sep 1, 2023)
328f2ae  athena-driver (Sep 1, 2023)
a047f82  athena-driver (Sep 4, 2023)
dbba228  athena-driver (Sep 4, 2023)
985f195  athena-driver review (Sep 5, 2023)
3db7655  athena-driver review (Sep 5, 2023)
5c66737  athena-driver review (Sep 5, 2023)
605b791  athena-driver review (Sep 5, 2023)
2008155  athena-driver review (Sep 5, 2023)
00d3ec0  athena-driver review (Sep 5, 2023)
dc52056  athena-driver review (Sep 6, 2023)
d7e774b  athena-driver review (Sep 6, 2023)
e68b2af  Merge remote-tracking branch 'origin/main' into athena-connector (Sep 6, 2023)
29b816e  Merge remote-tracking branch 'origin/main' into athena-connector (Sep 6, 2023)
e5f1794  Merge remote-tracking branch 'origin/main' into athena-connector (Sep 6, 2023)
549cf67  Run go mod tidy (begelundmuller, Sep 7, 2023)
e040606  athena-driver review (Sep 11, 2023)
804f03e  athena-driver review (Sep 11, 2023)
844877f  athena-driver review (Sep 12, 2023)
f0bbee9  athena-driver review (Sep 12, 2023)
8d4b69b  Merge remote-tracking branch 'origin/main' into athena-connector (Sep 13, 2023)
03d5c42  Merge remote-tracking branch 'origin/main' into athena-connector (Sep 13, 2023)
a5a5146  Merge remote-tracking branch 'origin/main' into athena-connector (Sep 13, 2023)
b7fc59d  athena-driver review (Sep 13, 2023)
28606aa  athena-driver review (Sep 13, 2023)
c6af926  athena-driver review (Sep 13, 2023)
163788c  Merge remote-tracking branch 'origin/main' into athena-connector (Sep 14, 2023)
6d04765  Merge remote-tracking branch 'origin/main' into athena-connector (Sep 14, 2023)
45efd9a  Merge remote-tracking branch 'origin/main' into athena-connector (Sep 14, 2023)
b594ed4  athena-driver review (Sep 21, 2023)
7ab90bc  Merge remote-tracking branch 'origin/main' into athena-connector (esevastyanov, Sep 22, 2023)
bf2f044  Auto-determine AWS region (esevastyanov, Sep 23, 2023)
28f3e6e  Athena icon (esevastyanov, Sep 23, 2023)
18b423d  Removed the auto-resolving of AWS region (esevastyanov, Sep 25, 2023)
679e85d  Simplified a clean-up process (esevastyanov, Sep 25, 2023)
7e320ac  Merge remote-tracking branch 'origin/main' into athena-connector (esevastyanov, Sep 25, 2023)
3ab245d  Updated according to previously merged changes (esevastyanov, Sep 25, 2023)
804de57  Dash vs underscore (esevastyanov, Sep 25, 2023)
591a4ef  A new line after a query (esevastyanov, Sep 25, 2023)
bc7f95d  Non-nil NextContinuationToken (esevastyanov, Sep 25, 2023)
af69414  ctx cancellation instead of a hardcoded timer (esevastyanov, Sep 25, 2023)
0bd4d25  Format for FileIterator (esevastyanov, Sep 25, 2023)
a0d18da  deferred cleanupFn() (esevastyanov, Sep 25, 2023)
a1d0eb9  Aligned Athena query with a source config (esevastyanov, Sep 25, 2023)
6d72df9  Merge remote-tracking branch 'origin/main' into athena-connector (esevastyanov, Sep 25, 2023)
480aa95  Fixed a merge conflict (esevastyanov, Sep 25, 2023)
Commit 163788c35381cd63853c97bbb69cd4ed702961ae
Merge remote-tracking branch 'origin/main' into athena-connector
Egor Ryashin committed Sep 14, 2023
runtime/drivers/duckdb/transporter/filestore_to_duckDB.go (1 addition, 1 deletion)

@@ -59,7 +59,7 @@ func (t *fileStoreToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps ma
 	}

 	// Ingest data
-	from, err := sourceReader(localPaths, format, ingestionProps, false)
+	from, err := sourceReader(localPaths, format, srcCfg.DuckDB, false)
 	if err != nil {
 		return err
 	}
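
The hunk above swaps the raw ingestionProps map for the parsed srcCfg.DuckDB options. For context, sourceReader (added in utils.go further down) renders the local paths plus DuckDB read options into a table-function call that the transfer embeds in SQL. Below is a minimal, self-contained sketch of that flow; mockSourceReader is a hypothetical stand-in, since the real option rendering is truncated in this condensed diff:

package main

import "fmt"

// mockSourceReader is a hypothetical stand-in for sourceReader: it renders
// paths plus DuckDB read options into a table-function call. The exact
// formatting here is an assumption, not the PR's actual implementation.
func mockSourceReader(paths []string, format string, duckDBProps map[string]any) string {
	opts := ""
	for k, v := range duckDBProps {
		opts += fmt.Sprintf(", %s=%v", k, v)
	}
	return fmt.Sprintf("read_csv_auto(['%s']%s)", paths[0], opts)
}

func main() {
	from := mockSourceReader([]string{"/tmp/data.csv"}, ".csv", map[string]any{"delim": "'|'"})
	// A transfer would embed the generated clause in a statement such as:
	fmt.Println("CREATE OR REPLACE TABLE src AS SELECT * FROM " + from)
}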
runtime/drivers/duckdb/transporter/objectStore_to_duckDB.go (3 additions, 5 deletions)

@@ -54,8 +54,8 @@ func (t *objectStoreToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps

 	sql, hasSQL := srcProps["sql"].(string)
 	// if sql is specified use ast rewrite to fill in the downloaded files
-	if hasSQL {
-		return t.ingestDuckDBSQL(ctx, sql, iterator, sinkCfg, opts, p)
+	if srcCfg.SQL != "" {
+		return t.ingestDuckDBSQL(ctx, srcCfg.SQL, iterator, sinkCfg, opts, p)
 	}

 	p.Target(size, drivers.ProgressUnitByte)

@@ -94,9 +94,7 @@ func (t *objectStoreToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps
 			return err
 		}
 	} else {
-		var from string
-		var err error
-		from, err = sourceReader(files, format, ingestionProps, false)
+		from, err := sourceReader(files, format, srcCfg.DuckDB, false)
 		if err != nil {
 			return err
 		}
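
Both hunks follow the same pattern: instead of pulling untyped values out of srcProps with map lookups and type assertions, Transfer now reads fields from a config struct decoded up front (see parseFileSourceProperties in utils.go below). A minimal sketch of that decode step, trimmed to the two fields this diff uses:

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Trimmed copy of the fileSourceProperties struct from utils.go, keeping
// only the fields this diff touches.
type fileSourceProperties struct {
	SQL    string         `mapstructure:"sql"`
	DuckDB map[string]any `mapstructure:"duckdb"`
}

func main() {
	srcProps := map[string]any{
		"sql":    "SELECT * FROM read_parquet('s3://bucket/data/*.parquet')",
		"duckdb": map[string]any{"union_by_name": true},
	}
	srcCfg := &fileSourceProperties{}
	if err := mapstructure.Decode(srcProps, srcCfg); err != nil {
		panic(err)
	}
	// Transfer now branches on the typed field rather than a raw lookup:
	// if srcCfg.SQL != "" { return t.ingestDuckDBSQL(...) }
	fmt.Println(srcCfg.SQL != "", srcCfg.DuckDB["union_by_name"])
}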
runtime/drivers/duckdb/transporter/utils.go (152 additions, 0 deletions; every line below is new)

@@ -22,6 +22,158 @@ func parseSinkProperties(props map[string]any) (*sinkProperties, error) {
	return cfg, nil
}

type dbSourceProperties struct {
	Database string `mapstructure:"db"`
	SQL      string `mapstructure:"sql"`
}

func parseDBSourceProperties(props map[string]any) (*dbSourceProperties, error) {
	cfg := &dbSourceProperties{}
	if err := mapstructure.Decode(props, cfg); err != nil {
		return nil, fmt.Errorf("failed to parse source properties: %w", err)
	}
	if cfg.SQL == "" {
		return nil, fmt.Errorf("property 'sql' is mandatory")
	}
	return cfg, nil
}

type fileSourceProperties struct {
	SQL                   string         `mapstructure:"sql"`
	DuckDB                map[string]any `mapstructure:"duckdb"`
	Format                string         `mapstructure:"format"`
	AllowSchemaRelaxation bool           `mapstructure:"allow_schema_relaxation"`
	BatchSize             string         `mapstructure:"batch_size"`
	BatchSizeBytes        int64          `mapstructure:"-"` // Inferred from BatchSize

	// Backwards compatibility
	HivePartitioning            *bool  `mapstructure:"hive_partitioning"`
	CSVDelimiter                string `mapstructure:"csv.delimiter"`
	IngestAllowSchemaRelaxation *bool  `mapstructure:"ingest.allow_schema_relaxation"`
}

func parseFileSourceProperties(props map[string]any) (*fileSourceProperties, error) {
	cfg := &fileSourceProperties{}
	if err := mapstructure.Decode(props, cfg); err != nil {
		return nil, fmt.Errorf("failed to parse source properties: %w", err)
	}

	if cfg.DuckDB == nil {
		cfg.DuckDB = map[string]any{}
	}

	if cfg.HivePartitioning != nil {
		cfg.DuckDB["hive_partitioning"] = *cfg.HivePartitioning
		cfg.HivePartitioning = nil
	}

	if cfg.CSVDelimiter != "" {
		cfg.DuckDB["delim"] = fmt.Sprintf("'%v'", cfg.CSVDelimiter)
		cfg.CSVDelimiter = ""
	}

	if cfg.IngestAllowSchemaRelaxation != nil {
		cfg.AllowSchemaRelaxation = *cfg.IngestAllowSchemaRelaxation
		cfg.IngestAllowSchemaRelaxation = nil
	}

	if cfg.AllowSchemaRelaxation {
		if val, ok := cfg.DuckDB["union_by_name"].(bool); ok && !val {
			return nil, fmt.Errorf("can't set `union_by_name` and `allow_schema_relaxation` at the same time")
		}

		if hasKey(cfg.DuckDB, "columns", "types", "dtypes") {
			return nil, fmt.Errorf("if any of `columns`,`types`,`dtypes` is set `allow_schema_relaxation` must be disabled")
		}
	}

	if cfg.BatchSize != "" {
		b, err := datasize.ParseString(cfg.BatchSize)
		if err != nil {
			return nil, err
		}
		cfg.BatchSizeBytes = int64(b.Bytes())
	}

	return cfg, nil
}

func sourceReader(paths []string, format string, ingestionProps map[string]any, fromAthena bool) (string, error) {
	// Generate a "read" statement
	if containsAny(format, []string{".csv", ".tsv", ".txt"}) {
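
The rest of sourceReader is truncated by the condensed view. As a usage sketch for the parsing code above (assuming it sits in the same package), the legacy property keys are folded into the DuckDB options map exactly as parseFileSourceProperties implements; the property values below are illustrative, not taken from the PR:

// Usage sketch for parseFileSourceProperties above.
func exampleParseFileSourceProperties() error {
	props := map[string]any{
		"format":                         ".csv",
		"csv.delimiter":                  "|",  // legacy key
		"hive_partitioning":              true, // legacy key
		"ingest.allow_schema_relaxation": true, // legacy key
		"batch_size":                     "512MB",
	}
	cfg, err := parseFileSourceProperties(props)
	if err != nil {
		return err
	}
	// The legacy keys are rewritten into the duckdb options map:
	//   cfg.DuckDB["delim"] == "'|'"
	//   cfg.DuckDB["hive_partitioning"] == true
	// while cfg.AllowSchemaRelaxation == true and cfg.BatchSizeBytes holds
	// the byte count parsed from "512MB".
	fmt.Printf("%+v %+v\n", cfg, cfg.DuckDB)
	return nil
}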
runtime/services/catalog/artifacts/yaml/objects.go (24 additions, 23 deletions)

@@ -23,29 +23,30 @@ import (
 /*
  * This file contains the mapping from CatalogObject to Yaml files
  */
 type Source struct {
-	Type string
-	Path string `yaml:"path,omitempty"`
-	CsvDelimiter string `yaml:"csv.delimiter,omitempty" mapstructure:"csv.delimiter,omitempty"`
-	URI string `yaml:"uri,omitempty"`
-	Region string `yaml:"region,omitempty" mapstructure:"region,omitempty"`
-	S3Endpoint string `yaml:"endpoint,omitempty" mapstructure:"endpoint,omitempty"`
-	GlobMaxTotalSize int64 `yaml:"glob.max_total_size,omitempty" mapstructure:"glob.max_total_size,omitempty"`
-	GlobMaxObjectsMatched int `yaml:"glob.max_objects_matched,omitempty" mapstructure:"glob.max_objects_matched,omitempty"`
-	GlobMaxObjectsListed int64 `yaml:"glob.max_objects_listed,omitempty" mapstructure:"glob.max_objects_listed,omitempty"`
-	GlobPageSize int `yaml:"glob.page_size,omitempty" mapstructure:"glob.page_size,omitempty"`
-	BatchSize string `yaml:"batch_size,omitempty" mapstructure:"batch_size,omitempty"`
-	HivePartition *bool `yaml:"hive_partitioning,omitempty" mapstructure:"hive_partitioning,omitempty"`
-	Timeout int32 `yaml:"timeout,omitempty"`
-	Format string `yaml:"format,omitempty" mapstructure:"format,omitempty"`
-	Extract map[string]any `yaml:"extract,omitempty" mapstructure:"extract,omitempty"`
-	DuckDBProps map[string]any `yaml:"duckdb,omitempty" mapstructure:"duckdb,omitempty"`
-	Headers map[string]any `yaml:"headers,omitempty" mapstructure:"headers,omitempty"`
-	AllowSchemaRelaxation *bool `yaml:"ingest.allow_schema_relaxation,omitempty" mapstructure:"allow_schema_relaxation,omitempty"`
-	SQL string `yaml:"sql,omitempty" mapstructure:"sql,omitempty"`
-	DB string `yaml:"db,omitempty" mapstructure:"db,omitempty"`
-	ProjectID string `yaml:"project_id,omitempty" mapstructure:"project_id,omitempty"`
-	AthenaOutputLocation string `yaml:"athena_output_location,omitempty" mapstructure:"athena_output_location,omitempty"`
-	AthenaWorkgroup string `yaml:"athena_workgroup,omitempty" mapstructure:"athena_workgroup,omitempty"`
+	Type string
+	Path string `yaml:"path,omitempty"`
+	CsvDelimiter string `yaml:"csv.delimiter,omitempty" mapstructure:"csv.delimiter,omitempty"`
+	URI string `yaml:"uri,omitempty"`
+	Region string `yaml:"region,omitempty" mapstructure:"region,omitempty"`
+	S3Endpoint string `yaml:"endpoint,omitempty" mapstructure:"endpoint,omitempty"`
+	GlobMaxTotalSize int64 `yaml:"glob.max_total_size,omitempty" mapstructure:"glob.max_total_size,omitempty"`
+	GlobMaxObjectsMatched int `yaml:"glob.max_objects_matched,omitempty" mapstructure:"glob.max_objects_matched,omitempty"`
+	GlobMaxObjectsListed int64 `yaml:"glob.max_objects_listed,omitempty" mapstructure:"glob.max_objects_listed,omitempty"`
+	GlobPageSize int `yaml:"glob.page_size,omitempty" mapstructure:"glob.page_size,omitempty"`
+	BatchSize string `yaml:"batch_size,omitempty" mapstructure:"batch_size,omitempty"`
+	HivePartition *bool `yaml:"hive_partitioning,omitempty" mapstructure:"hive_partitioning,omitempty"`
+	Timeout int32 `yaml:"timeout,omitempty"`
+	Format string `yaml:"format,omitempty" mapstructure:"format,omitempty"`
+	Extract map[string]any `yaml:"extract,omitempty" mapstructure:"extract,omitempty"`
+	DuckDBProps map[string]any `yaml:"duckdb,omitempty" mapstructure:"duckdb,omitempty"`
+	Headers map[string]any `yaml:"headers,omitempty" mapstructure:"headers,omitempty"`
+	AllowSchemaRelaxation *bool `yaml:"allow_schema_relaxation,omitempty" mapstructure:"allow_schema_relaxation,omitempty"`
+	IngestAllowSchemaRelaxation *bool `yaml:"ingest.allow_schema_relaxation,omitempty" mapstructure:"ingest.allow_schema_relaxation,omitempty"`
+	SQL string `yaml:"sql,omitempty" mapstructure:"sql,omitempty"`
+	DB string `yaml:"db,omitempty" mapstructure:"db,omitempty"`
+	ProjectID string `yaml:"project_id,omitempty" mapstructure:"project_id,omitempty"`
+	AthenaOutputLocation string `yaml:"athena_output_location,omitempty" mapstructure:"athena_output_location,omitempty"`
+	AthenaWorkgroup string `yaml:"athena_workgroup,omitempty" mapstructure:"athena_workgroup,omitempty"`
 }

type MetricsView struct {
You are viewing a condensed version of this merge commit.
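
The semantic change in the Source struct above is that the legacy yaml key ingest.allow_schema_relaxation now decodes into its own field (IngestAllowSchemaRelaxation) instead of sharing a field with allow_schema_relaxation. A minimal decoding sketch, assuming gopkg.in/yaml.v3 and trimming the struct to the relevant fields:

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Trimmed copy of the Source struct, keeping only the fields involved in
// the allow_schema_relaxation change.
type Source struct {
	Path                        string `yaml:"path,omitempty"`
	AllowSchemaRelaxation       *bool  `yaml:"allow_schema_relaxation,omitempty"`
	IngestAllowSchemaRelaxation *bool  `yaml:"ingest.allow_schema_relaxation,omitempty"`
}

func main() {
	doc := []byte("path: data/events.csv\ningest.allow_schema_relaxation: true\n")
	var src Source
	if err := yaml.Unmarshal(doc, &src); err != nil {
		panic(err)
	}
	// The legacy spelling now lands in its own field; AllowSchemaRelaxation
	// stays nil unless the new key is used.
	fmt.Println(src.AllowSchemaRelaxation == nil, *src.IngestAllowSchemaRelaxation)
}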