From c0dd282d32e40760dfbe8075de4970cdd1a595c0 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Thu, 14 Dec 2023 17:31:37 +0530 Subject: [PATCH 1/8] external duckdb and motherduck --- runtime/drivers/duckdb/duckdb.go | 23 ++++---- .../duckdb/transporter_duckDB_to_duckDB.go | 55 +++++++++++++++++++ .../transporter_motherduck_to_duckDB.go | 2 +- .../sources/modal/AddSourceModal.svelte | 6 +- .../src/features/sources/modal/yupSchemas.ts | 3 +- .../src/features/sources/sourceUtils.ts | 7 +++ 6 files changed, 78 insertions(+), 18 deletions(-) diff --git a/runtime/drivers/duckdb/duckdb.go b/runtime/drivers/duckdb/duckdb.go index cf045549d3b..ddec3340d48 100644 --- a/runtime/drivers/duckdb/duckdb.go +++ b/runtime/drivers/duckdb/duckdb.go @@ -50,27 +50,24 @@ var spec = drivers.Spec{ Description: "DuckDB SQL query.", Placeholder: "select * from read_csv('data/file.csv', header=true);", }, + { + Key: "db", + Type: drivers.StringPropertyType, + Required: true, + DisplayName: "DB", + Description: "Path to external DuckDB database. Use md: for motherduckb.", + Placeholder: "/path/to/main.db or md:main.db(for motherduck)", + }, }, ConfigProperties: []drivers.PropertySchema{ { - Key: "dsn", + Key: "dsn", + Secret: true, }, }, } var motherduckSpec = drivers.Spec{ - DisplayName: "MotherDuck", - Description: "Import data from MotherDuck.", - SourceProperties: []drivers.PropertySchema{ - { - Key: "sql", - Type: drivers.StringPropertyType, - Required: true, - DisplayName: "SQL", - Description: "Query to extract data from MotherDuck.", - Placeholder: "select * from my_db.my_table;", - }, - }, ConfigProperties: []drivers.PropertySchema{ { Key: "token", diff --git a/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go index 50d5b2db51c..5ddc904a703 100644 --- a/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go +++ b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go @@ -2,9 +2,12 @@ package duckdb import ( "context" + "database/sql" "errors" "fmt" "net/url" + "path/filepath" + "strings" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/duckdbsql" @@ -37,6 +40,10 @@ func (t *duckDBToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps map[s return err } + if srcCfg.Database != "" { // query to be run against an external DB + return t.transferFromExternalDB(ctx, srcCfg, sinkCfg, opts) + } + // We can't just pass the SQL statement to DuckDB outright. // We need to do some rewriting for certain table references (currently object stores and local files). @@ -100,6 +107,54 @@ func (t *duckDBToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps map[s return t.to.CreateTableAsSelect(ctx, sinkCfg.Table, false, srcCfg.SQL) } +func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *dbSourceProperties, sinkProps *sinkProperties, opts *drivers.TransferOptions) error { + return t.to.WithConnection(ctx, 1, true, false, func(ctx, ensuredCtx context.Context, _ *sql.Conn) error { + res, err := t.to.Execute(ctx, &drivers.Statement{Query: "SELECT current_database(),current_schema();"}) + if err != nil { + return err + } + defer res.Close() + + res.Next() + var localDB, localSchema string + if err := res.Scan(&localDB, &localSchema); err != nil { + return err + } + + // duckdb considers everything before first . as db name + // alternative solution can be to query `show databases()` before and after to identify db name + dbName, _, _ := strings.Cut(filepath.Base(srcProps.Database), ".") + if dbName == "main" { + return fmt.Errorf("`main` is a reserved db name") + } + + if err = t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("ATTACH %s AS %s", safeSQLString(srcProps.Database), safeSQLName(dbName))}); err != nil { + return fmt.Errorf("failed to attach db %q: %w", srcProps.Database, err) + } + + defer func() { + if err = t.to.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("DETACH %s;", safeSQLName(dbName))}); err != nil { + t.logger.Error("failed to detach db", zap.Error(err)) + } + }() + + if err = t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("USE %s;", safeName(dbName))}); err != nil { + return err + } + + defer func() { // revert back to localdb + if err = t.to.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("USE %s.%s;", safeName(localDB), safeName(localSchema))}); err != nil { + t.logger.Error("failed to switch to local database", zap.Error(err)) + } + }() + + userQuery := strings.TrimSpace(srcProps.SQL) + userQuery, _ = strings.CutSuffix(userQuery, ";") // trim trailing semi colon + query := fmt.Sprintf("CREATE OR REPLACE TABLE %s.%s.%s AS (%s\n);", safeName(localDB), safeName(localSchema), safeName(sinkProps.Table), userQuery) + return t.to.Exec(ctx, &drivers.Statement{Query: query}) + }) +} + // rewriteLocalPaths rewrites a DuckDB SQL statement such that relative paths become absolute paths relative to the basePath, // and if allowHostAccess is false, returns an error if any of the paths resolve to a path outside of the basePath. func rewriteLocalPaths(ast *duckdbsql.AST, basePath string, allowHostAccess bool) (string, error) { diff --git a/runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go b/runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go index b97198fe127..b481c57509d 100644 --- a/runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go +++ b/runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go @@ -70,7 +70,7 @@ func (t *motherduckToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps m } // get token - token := config["token"] + token, _ := config["token"].(string) if token == "" && config["allow_host_access"].(bool) { token = os.Getenv("motherduck_token") } diff --git a/web-common/src/features/sources/modal/AddSourceModal.svelte b/web-common/src/features/sources/modal/AddSourceModal.svelte index 5c599d6d805..bc6f96c6d9f 100644 --- a/web-common/src/features/sources/modal/AddSourceModal.svelte +++ b/web-common/src/features/sources/modal/AddSourceModal.svelte @@ -12,7 +12,7 @@ import Https from "../../../components/icons/connectors/HTTPS.svelte"; import LocalFile from "../../../components/icons/connectors/LocalFile.svelte"; import MicrosoftAzureBlobStorage from "../../../components/icons/connectors/MicrosoftAzureBlobStorage.svelte"; - import MotherDuck from "../../../components/icons/connectors/MotherDuck.svelte"; + import DuckDB from "../../../components/icons/connectors/DuckDB.svelte"; import Postgres from "../../../components/icons/connectors/Postgres.svelte"; import Snowflake from "../../../components/icons/connectors/Snowflake.svelte"; import SQLite from "../../../components/icons/connectors/SQLite.svelte"; @@ -40,7 +40,7 @@ // duckdb "bigquery", "athena", - "motherduck", + "duckdb", "postgres", "sqlite", "snowflake", @@ -55,7 +55,7 @@ // duckdb: DuckDB, bigquery: GoogleBigQuery, athena: AmazonAthena, - motherduck: MotherDuck, + duckdb: DuckDB, postgres: Postgres, sqlite: SQLite, snowflake: Snowflake, diff --git a/web-common/src/features/sources/modal/yupSchemas.ts b/web-common/src/features/sources/modal/yupSchemas.ts index 108f23d7b97..6412c5c3fb5 100644 --- a/web-common/src/features/sources/modal/yupSchemas.ts +++ b/web-common/src/features/sources/modal/yupSchemas.ts @@ -46,9 +46,10 @@ export function getYupSchema(connector: V1ConnectorSpec) { ) .required("Source name is required"), }); - case "motherduck": + case "duckdb": return yup.object().shape({ sql: yup.string().required("sql is required"), + db: yup.string().required("db is required"), sourceName: yup .string() .matches( diff --git a/web-common/src/features/sources/sourceUtils.ts b/web-common/src/features/sources/sourceUtils.ts index 7fec9dd0350..083c4d223c0 100644 --- a/web-common/src/features/sources/sourceUtils.ts +++ b/web-common/src/features/sources/sourceUtils.ts @@ -39,6 +39,13 @@ export function compileCreateSourceYAML( delete values.db; delete values.table; break; + case "duckdb": + var db = values.db as string + if (db.startsWith("md:")) { + connectorName = "motherduck"; + values.db = db.replace("md:", "") + } + break; } const compiledKeyValues = Object.entries(values) From 503f33b454e6a2ca0df70e539e6a8931e9e9b5bf Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Thu, 14 Dec 2023 18:18:54 +0530 Subject: [PATCH 2/8] adding test and lint fix --- .../duckdb/transporter_duckDB_to_duckDB.go | 6 +- .../transporter_duckDB_to_duckDB_test.go | 65 +++++++++++++++++++ .../src/features/sources/sourceUtils.ts | 4 +- 3 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 runtime/drivers/duckdb/transporter_duckDB_to_duckDB_test.go diff --git a/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go index 5ddc904a703..dba839c925d 100644 --- a/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go +++ b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go @@ -97,11 +97,11 @@ func (t *duckDBToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps map[s // If the path is a local file reference, rewrite to a safe and repo-relative path. if uri.Scheme == "" && uri.Host == "" { - sql, err := rewriteLocalPaths(ast, opts.RepoRoot, opts.AllowHostAccess) + rewrittenSQL, err := rewriteLocalPaths(ast, opts.RepoRoot, opts.AllowHostAccess) if err != nil { return fmt.Errorf("invalid local path: %w", err) } - srcCfg.SQL = sql + srcCfg.SQL = rewrittenSQL } return t.to.CreateTableAsSelect(ctx, sinkCfg.Table, false, srcCfg.SQL) @@ -138,7 +138,7 @@ func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *d } }() - if err = t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("USE %s;", safeName(dbName))}); err != nil { + if err := t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("USE %s;", safeName(dbName))}); err != nil { return err } diff --git a/runtime/drivers/duckdb/transporter_duckDB_to_duckDB_test.go b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB_test.go new file mode 100644 index 00000000000..a750ec637c9 --- /dev/null +++ b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB_test.go @@ -0,0 +1,65 @@ +package duckdb + +import ( + "context" + "fmt" + "path/filepath" + "testing" + + "github.com/rilldata/rill/runtime/drivers" + activity "github.com/rilldata/rill/runtime/pkg/activity" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestDuckDBToDuckDBTransfer(t *testing.T) { + tempDir := t.TempDir() + conn, err := Driver{}.Open(map[string]any{"dsn": fmt.Sprintf("%s.db?access_mode=read_write", filepath.Join(tempDir, "tranfser"))}, false, activity.NewNoopClient(), zap.NewNop()) + require.NoError(t, err) + + olap, ok := conn.AsOLAP("") + require.True(t, ok) + + err = olap.Exec(context.Background(), &drivers.Statement{ + Query: "CREATE TABLE foo(bar VARCHAR, baz INTEGER)", + }) + require.NoError(t, err) + + err = olap.Exec(context.Background(), &drivers.Statement{ + Query: "INSERT INTO foo VALUES ('a', 1), ('a', 2), ('b', 3), ('c', 4)", + }) + require.NoError(t, err) + require.NoError(t, conn.Close()) + + to, err := Driver{}.Open(map[string]any{"dsn": ""}, false, activity.NewNoopClient(), zap.NewNop()) + require.NoError(t, err) + + olap, _ = to.AsOLAP("") + + tr := NewDuckDBToDuckDB(olap, zap.NewNop()) + + // transfer once + err = tr.Transfer(context.Background(), map[string]any{"sql": "SELECT * FROM foo", "db": filepath.Join(tempDir, "tranfser.db")}, map[string]any{"table": "test"}, &drivers.TransferOptions{Progress: drivers.NoOpProgress{}}) + require.NoError(t, err) + + rows, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT COUNT(*) FROM test"}) + require.NoError(t, err) + + var count int + rows.Next() + require.NoError(t, rows.Scan(&count)) + require.Equal(t, 4, count) + require.NoError(t, rows.Close()) + + // transfer again + err = tr.Transfer(context.Background(), map[string]any{"sql": "SELECT * FROM foo", "db": filepath.Join(tempDir, "tranfser.db")}, map[string]any{"table": "test"}, &drivers.TransferOptions{Progress: drivers.NoOpProgress{}}) + require.NoError(t, err) + + rows, err = olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT COUNT(*) FROM test"}) + require.NoError(t, err) + + rows.Next() + require.NoError(t, rows.Scan(&count)) + require.Equal(t, 4, count) + require.NoError(t, rows.Close()) +} diff --git a/web-common/src/features/sources/sourceUtils.ts b/web-common/src/features/sources/sourceUtils.ts index 083c4d223c0..ec9959e1afe 100644 --- a/web-common/src/features/sources/sourceUtils.ts +++ b/web-common/src/features/sources/sourceUtils.ts @@ -40,10 +40,10 @@ export function compileCreateSourceYAML( delete values.table; break; case "duckdb": - var db = values.db as string + var db = values.db as string; if (db.startsWith("md:")) { connectorName = "motherduck"; - values.db = db.replace("md:", "") + values.db = db.replace("md:", ""); } break; } From 499df49df5c8e5f4e3ac9e02190dd3b25473f54b Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Thu, 14 Dec 2023 18:27:21 +0530 Subject: [PATCH 3/8] lint fix --- runtime/drivers/duckdb/duckdb.go | 1 - web-common/src/features/sources/sourceUtils.ts | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/runtime/drivers/duckdb/duckdb.go b/runtime/drivers/duckdb/duckdb.go index ddec3340d48..3e9efedaf0d 100644 --- a/runtime/drivers/duckdb/duckdb.go +++ b/runtime/drivers/duckdb/duckdb.go @@ -62,7 +62,6 @@ var spec = drivers.Spec{ ConfigProperties: []drivers.PropertySchema{ { Key: "dsn", - Secret: true, }, }, } diff --git a/web-common/src/features/sources/sourceUtils.ts b/web-common/src/features/sources/sourceUtils.ts index ec9959e1afe..674984f816f 100644 --- a/web-common/src/features/sources/sourceUtils.ts +++ b/web-common/src/features/sources/sourceUtils.ts @@ -40,7 +40,7 @@ export function compileCreateSourceYAML( delete values.table; break; case "duckdb": - var db = values.db as string; + let db = values.db as string; if (db.startsWith("md:")) { connectorName = "motherduck"; values.db = db.replace("md:", ""); From 5958dc1e5f223745e895e3432ae554da5737afaa Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Thu, 14 Dec 2023 18:28:40 +0530 Subject: [PATCH 4/8] formatting fix --- runtime/drivers/duckdb/duckdb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/drivers/duckdb/duckdb.go b/runtime/drivers/duckdb/duckdb.go index 3e9efedaf0d..db4d660ef07 100644 --- a/runtime/drivers/duckdb/duckdb.go +++ b/runtime/drivers/duckdb/duckdb.go @@ -61,7 +61,7 @@ var spec = drivers.Spec{ }, ConfigProperties: []drivers.PropertySchema{ { - Key: "dsn", + Key: "dsn", }, }, } From 16659b91ee24d0820d384bfe7355603a87c7a631 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Thu, 14 Dec 2023 18:38:05 +0530 Subject: [PATCH 5/8] formatting fix --- web-common/src/features/sources/sourceUtils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web-common/src/features/sources/sourceUtils.ts b/web-common/src/features/sources/sourceUtils.ts index 674984f816f..0b0e31f9cf9 100644 --- a/web-common/src/features/sources/sourceUtils.ts +++ b/web-common/src/features/sources/sourceUtils.ts @@ -40,7 +40,7 @@ export function compileCreateSourceYAML( delete values.table; break; case "duckdb": - let db = values.db as string; + const db = values.db as string; if (db.startsWith("md:")) { connectorName = "motherduck"; values.db = db.replace("md:", ""); From d1f910565fbcaaf42ffa54ba8e8faf29f6ac2b87 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Thu, 14 Dec 2023 18:55:28 +0530 Subject: [PATCH 6/8] lint fix --- web-common/src/features/sources/sourceUtils.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/web-common/src/features/sources/sourceUtils.ts b/web-common/src/features/sources/sourceUtils.ts index 0b0e31f9cf9..d96091fe266 100644 --- a/web-common/src/features/sources/sourceUtils.ts +++ b/web-common/src/features/sources/sourceUtils.ts @@ -39,13 +39,14 @@ export function compileCreateSourceYAML( delete values.db; delete values.table; break; - case "duckdb": + case "duckdb": { const db = values.db as string; if (db.startsWith("md:")) { connectorName = "motherduck"; values.db = db.replace("md:", ""); } break; + } } const compiledKeyValues = Object.entries(values) From 9086c99a696d78cd635e068524d01bc6f45609fd Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Fri, 15 Dec 2023 17:56:07 +0530 Subject: [PATCH 7/8] review comments - fix defer res.Close() --- runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go index dba839c925d..3dccbb0028a 100644 --- a/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go +++ b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go @@ -113,13 +113,15 @@ func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *d if err != nil { return err } - defer res.Close() - res.Next() var localDB, localSchema string - if err := res.Scan(&localDB, &localSchema); err != nil { - return err + for res.Next() { + if err := res.Scan(&localDB, &localSchema); err != nil { + res.Close() + return err + } } + res.Close() // duckdb considers everything before first . as db name // alternative solution can be to query `show databases()` before and after to identify db name From 05e8dd7d603ba5b09ff5b5aee247f664736bc297 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Fri, 15 Dec 2023 17:59:23 +0530 Subject: [PATCH 8/8] fix defer res.Close() in other places --- runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go | 4 ++-- .../drivers/duckdb/transporter_motherduck_to_duckDB.go | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go index 3dccbb0028a..1e8e1331cd0 100644 --- a/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go +++ b/runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go @@ -117,11 +117,11 @@ func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *d var localDB, localSchema string for res.Next() { if err := res.Scan(&localDB, &localSchema); err != nil { - res.Close() + _ = res.Close() return err } } - res.Close() + _ = res.Close() // duckdb considers everything before first . as db name // alternative solution can be to query `show databases()` before and after to identify db name diff --git a/runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go b/runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go index b481c57509d..20d32d4cbc7 100644 --- a/runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go +++ b/runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go @@ -61,13 +61,15 @@ func (t *motherduckToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps m if err != nil { return err } - defer res.Close() - res.Next() var localDB, localSchema string - if err := res.Scan(&localDB, &localSchema); err != nil { - return err + for res.Next() { + if err := res.Scan(&localDB, &localSchema); err != nil { + _ = res.Close() + return err + } } + _ = res.Close() // get token token, _ := config["token"].(string)