diff --git a/runtime/drivers/duckdb/transporter/objectStore_to_duckDB.go b/runtime/drivers/duckdb/transporter/objectStore_to_duckDB.go index cf28daac58e..e6b04a03b95 100644 --- a/runtime/drivers/duckdb/transporter/objectStore_to_duckDB.go +++ b/runtime/drivers/duckdb/transporter/objectStore_to_duckDB.go @@ -51,10 +51,10 @@ func (t *objectStoreToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps } fromAthena := reflect.TypeOf(t.from).AssignableTo(reflect.TypeOf(&athena.Connection{})) - sql, hasSQL := src.Properties["sql"].(string) + sql, hasSQL := srcProps["sql"].(string) // if sql is specified use ast rewrite to fill in the downloaded files if hasSQL && !fromAthena { - return t.ingestDuckDBSQL(ctx, sql, iterator, dbSink, opts, p) + return t.ingestDuckDBSQL(ctx, sql, iterator, sinkCfg, opts, p) } p.Target(size, drivers.ProgressUnitByte) diff --git a/runtime/drivers/duckdb/transporter/utils.go b/runtime/drivers/duckdb/transporter/utils.go index ee609e148ec..f23b26fbe3d 100644 --- a/runtime/drivers/duckdb/transporter/utils.go +++ b/runtime/drivers/duckdb/transporter/utils.go @@ -5,8 +5,38 @@ import ( "os" "path/filepath" "strings" + + "github.com/mitchellh/mapstructure" ) +type sourceProperties struct { + Database string `mapstructure:"db"` + SQL string `mapstructure:"sql"` +} + +func parseSourceProperties(props map[string]any) (*sourceProperties, error) { + cfg := &sourceProperties{} + if err := mapstructure.Decode(props, cfg); err != nil { + return nil, fmt.Errorf("failed to parse source properties: %w", err) + } + if cfg.SQL == "" { + return nil, fmt.Errorf("property 'sql' is mandatory") + } + return cfg, nil +} + +type sinkProperties struct { + Table string `mapstructure:"table"` +} + +func parseSinkProperties(props map[string]any) (*sinkProperties, error) { + cfg := &sinkProperties{} + if err := mapstructure.Decode(props, cfg); err != nil { + return nil, fmt.Errorf("failed to parse sink properties: %w", err) + } + return cfg, nil +} + func sourceReader(paths []string, format string, ingestionProps map[string]any, fromAthena bool) (string, error) { // Generate a "read" statement if containsAny(format, []string{".csv", ".tsv", ".txt"}) { diff --git a/web-common/src/features/sources/modal/AddSourceModal.svelte b/web-common/src/features/sources/modal/AddSourceModal.svelte index 2d566b9118d..9be5042f5eb 100644 --- a/web-common/src/features/sources/modal/AddSourceModal.svelte +++ b/web-common/src/features/sources/modal/AddSourceModal.svelte @@ -28,6 +28,7 @@ "local_file", "motherduck", "bigquery", + "athena", ]; const connectors = createRuntimeServiceListConnectors({