Skip to content

Commit

Permalink
Runtime: unified duckdb connector for motherduck and external `db…
Browse files Browse the repository at this point in the history
…` files (#3700)

* external duckdb and motherduck

* adding test and lint fix

* lint fix

* formatting fix

* formatting fix

* lint fix

* review comments - fix defer res.Close()

* fix defer res.Close() in other places
  • Loading branch information
k-anshul authored Dec 15, 2023
1 parent 2398551 commit 7f2fef8
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 23 deletions.
20 changes: 8 additions & 12 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ var spec = drivers.Spec{
Description: "DuckDB SQL query.",
Placeholder: "select * from read_csv('data/file.csv', header=true);",
},
{
Key: "db",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "DB",
Description: "Path to external DuckDB database. Use md:<dbname> for motherduckb.",
Placeholder: "/path/to/main.db or md:main.db(for motherduck)",
},
},
ConfigProperties: []drivers.PropertySchema{
{
Expand All @@ -58,18 +66,6 @@ var spec = drivers.Spec{
}

var motherduckSpec = drivers.Spec{
DisplayName: "MotherDuck",
Description: "Import data from MotherDuck.",
SourceProperties: []drivers.PropertySchema{
{
Key: "sql",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "SQL",
Description: "Query to extract data from MotherDuck.",
Placeholder: "select * from my_db.my_table;",
},
},
ConfigProperties: []drivers.PropertySchema{
{
Key: "token",
Expand Down
61 changes: 59 additions & 2 deletions runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package duckdb

import (
"context"
"database/sql"
"errors"
"fmt"
"net/url"
"path/filepath"
"strings"

"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/duckdbsql"
Expand Down Expand Up @@ -37,6 +40,10 @@ func (t *duckDBToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps map[s
return err
}

if srcCfg.Database != "" { // query to be run against an external DB
return t.transferFromExternalDB(ctx, srcCfg, sinkCfg, opts)
}

// We can't just pass the SQL statement to DuckDB outright.
// We need to do some rewriting for certain table references (currently object stores and local files).

Expand Down Expand Up @@ -90,16 +97,66 @@ func (t *duckDBToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps map[s

// If the path is a local file reference, rewrite to a safe and repo-relative path.
if uri.Scheme == "" && uri.Host == "" {
sql, err := rewriteLocalPaths(ast, opts.RepoRoot, opts.AllowHostAccess)
rewrittenSQL, err := rewriteLocalPaths(ast, opts.RepoRoot, opts.AllowHostAccess)
if err != nil {
return fmt.Errorf("invalid local path: %w", err)
}
srcCfg.SQL = sql
srcCfg.SQL = rewrittenSQL
}

return t.to.CreateTableAsSelect(ctx, sinkCfg.Table, false, srcCfg.SQL)
}

func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *dbSourceProperties, sinkProps *sinkProperties, opts *drivers.TransferOptions) error {
return t.to.WithConnection(ctx, 1, true, false, func(ctx, ensuredCtx context.Context, _ *sql.Conn) error {
res, err := t.to.Execute(ctx, &drivers.Statement{Query: "SELECT current_database(),current_schema();"})
if err != nil {
return err
}

var localDB, localSchema string
for res.Next() {
if err := res.Scan(&localDB, &localSchema); err != nil {
_ = res.Close()
return err
}
}
_ = res.Close()

// duckdb considers everything before first . as db name
// alternative solution can be to query `show databases()` before and after to identify db name
dbName, _, _ := strings.Cut(filepath.Base(srcProps.Database), ".")
if dbName == "main" {
return fmt.Errorf("`main` is a reserved db name")
}

if err = t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("ATTACH %s AS %s", safeSQLString(srcProps.Database), safeSQLName(dbName))}); err != nil {
return fmt.Errorf("failed to attach db %q: %w", srcProps.Database, err)
}

defer func() {
if err = t.to.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("DETACH %s;", safeSQLName(dbName))}); err != nil {
t.logger.Error("failed to detach db", zap.Error(err))
}
}()

if err := t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("USE %s;", safeName(dbName))}); err != nil {
return err
}

defer func() { // revert back to localdb
if err = t.to.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("USE %s.%s;", safeName(localDB), safeName(localSchema))}); err != nil {
t.logger.Error("failed to switch to local database", zap.Error(err))
}
}()

userQuery := strings.TrimSpace(srcProps.SQL)
userQuery, _ = strings.CutSuffix(userQuery, ";") // trim trailing semi colon
query := fmt.Sprintf("CREATE OR REPLACE TABLE %s.%s.%s AS (%s\n);", safeName(localDB), safeName(localSchema), safeName(sinkProps.Table), userQuery)
return t.to.Exec(ctx, &drivers.Statement{Query: query})
})
}

// rewriteLocalPaths rewrites a DuckDB SQL statement such that relative paths become absolute paths relative to the basePath,
// and if allowHostAccess is false, returns an error if any of the paths resolve to a path outside of the basePath.
func rewriteLocalPaths(ast *duckdbsql.AST, basePath string, allowHostAccess bool) (string, error) {
Expand Down
65 changes: 65 additions & 0 deletions runtime/drivers/duckdb/transporter_duckDB_to_duckDB_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package duckdb

import (
"context"
"fmt"
"path/filepath"
"testing"

"github.com/rilldata/rill/runtime/drivers"
activity "github.com/rilldata/rill/runtime/pkg/activity"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestDuckDBToDuckDBTransfer(t *testing.T) {
tempDir := t.TempDir()
conn, err := Driver{}.Open(map[string]any{"dsn": fmt.Sprintf("%s.db?access_mode=read_write", filepath.Join(tempDir, "tranfser"))}, false, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

olap, ok := conn.AsOLAP("")
require.True(t, ok)

err = olap.Exec(context.Background(), &drivers.Statement{
Query: "CREATE TABLE foo(bar VARCHAR, baz INTEGER)",
})
require.NoError(t, err)

err = olap.Exec(context.Background(), &drivers.Statement{
Query: "INSERT INTO foo VALUES ('a', 1), ('a', 2), ('b', 3), ('c', 4)",
})
require.NoError(t, err)
require.NoError(t, conn.Close())

to, err := Driver{}.Open(map[string]any{"dsn": ""}, false, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

olap, _ = to.AsOLAP("")

tr := NewDuckDBToDuckDB(olap, zap.NewNop())

// transfer once
err = tr.Transfer(context.Background(), map[string]any{"sql": "SELECT * FROM foo", "db": filepath.Join(tempDir, "tranfser.db")}, map[string]any{"table": "test"}, &drivers.TransferOptions{Progress: drivers.NoOpProgress{}})
require.NoError(t, err)

rows, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT COUNT(*) FROM test"})
require.NoError(t, err)

var count int
rows.Next()
require.NoError(t, rows.Scan(&count))
require.Equal(t, 4, count)
require.NoError(t, rows.Close())

// transfer again
err = tr.Transfer(context.Background(), map[string]any{"sql": "SELECT * FROM foo", "db": filepath.Join(tempDir, "tranfser.db")}, map[string]any{"table": "test"}, &drivers.TransferOptions{Progress: drivers.NoOpProgress{}})
require.NoError(t, err)

rows, err = olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT COUNT(*) FROM test"})
require.NoError(t, err)

rows.Next()
require.NoError(t, rows.Scan(&count))
require.Equal(t, 4, count)
require.NoError(t, rows.Close())
}
12 changes: 7 additions & 5 deletions runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,18 @@ func (t *motherduckToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps m
if err != nil {
return err
}
defer res.Close()

res.Next()
var localDB, localSchema string
if err := res.Scan(&localDB, &localSchema); err != nil {
return err
for res.Next() {
if err := res.Scan(&localDB, &localSchema); err != nil {
_ = res.Close()
return err
}
}
_ = res.Close()

// get token
token := config["token"]
token, _ := config["token"].(string)
if token == "" && config["allow_host_access"].(bool) {
token = os.Getenv("motherduck_token")
}
Expand Down
6 changes: 3 additions & 3 deletions web-common/src/features/sources/modal/AddSourceModal.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import Https from "../../../components/icons/connectors/HTTPS.svelte";
import LocalFile from "../../../components/icons/connectors/LocalFile.svelte";
import MicrosoftAzureBlobStorage from "../../../components/icons/connectors/MicrosoftAzureBlobStorage.svelte";
import MotherDuck from "../../../components/icons/connectors/MotherDuck.svelte";
import DuckDB from "../../../components/icons/connectors/DuckDB.svelte";
import Postgres from "../../../components/icons/connectors/Postgres.svelte";
import Snowflake from "../../../components/icons/connectors/Snowflake.svelte";
import SQLite from "../../../components/icons/connectors/SQLite.svelte";
Expand Down Expand Up @@ -40,7 +40,7 @@
// duckdb
"bigquery",
"athena",
"motherduck",
"duckdb",
"postgres",
"sqlite",
"snowflake",
Expand All @@ -55,7 +55,7 @@
// duckdb: DuckDB,
bigquery: GoogleBigQuery,
athena: AmazonAthena,
motherduck: MotherDuck,
duckdb: DuckDB,
postgres: Postgres,
sqlite: SQLite,
snowflake: Snowflake,
Expand Down
3 changes: 2 additions & 1 deletion web-common/src/features/sources/modal/yupSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ export function getYupSchema(connector: V1ConnectorSpec) {
)
.required("Source name is required"),
});
case "motherduck":
case "duckdb":
return yup.object().shape({
sql: yup.string().required("sql is required"),
db: yup.string().required("db is required"),
sourceName: yup
.string()
.matches(
Expand Down
8 changes: 8 additions & 0 deletions web-common/src/features/sources/sourceUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ export function compileCreateSourceYAML(
delete values.db;
delete values.table;
break;
case "duckdb": {
const db = values.db as string;
if (db.startsWith("md:")) {
connectorName = "motherduck";
values.db = db.replace("md:", "");
}
break;
}
}

const compiledKeyValues = Object.entries(values)
Expand Down

1 comment on commit 7f2fef8

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.