Skip to content

Commit

Permalink
external duckdb and motherduck
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anshul committed Dec 14, 2023
1 parent 7c667ed commit c0dd282
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 18 deletions.
23 changes: 10 additions & 13 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,24 @@ var spec = drivers.Spec{
Description: "DuckDB SQL query.",
Placeholder: "select * from read_csv('data/file.csv', header=true);",
},
{
Key: "db",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "DB",
Description: "Path to external DuckDB database. Use md:<dbname> for motherduckb.",
Placeholder: "/path/to/main.db or md:main.db(for motherduck)",
},
},
ConfigProperties: []drivers.PropertySchema{
{
Key: "dsn",
Key: "dsn",
Secret: true,
},
},
}

var motherduckSpec = drivers.Spec{
DisplayName: "MotherDuck",
Description: "Import data from MotherDuck.",
SourceProperties: []drivers.PropertySchema{
{
Key: "sql",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "SQL",
Description: "Query to extract data from MotherDuck.",
Placeholder: "select * from my_db.my_table;",
},
},
ConfigProperties: []drivers.PropertySchema{
{
Key: "token",
Expand Down
55 changes: 55 additions & 0 deletions runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package duckdb

import (
"context"
"database/sql"
"errors"
"fmt"
"net/url"
"path/filepath"
"strings"

"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/duckdbsql"
Expand Down Expand Up @@ -37,6 +40,10 @@ func (t *duckDBToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps map[s
return err
}

if srcCfg.Database != "" { // query to be run against an external DB
return t.transferFromExternalDB(ctx, srcCfg, sinkCfg, opts)
}

// We can't just pass the SQL statement to DuckDB outright.
// We need to do some rewriting for certain table references (currently object stores and local files).

Expand Down Expand Up @@ -100,6 +107,54 @@ func (t *duckDBToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps map[s
return t.to.CreateTableAsSelect(ctx, sinkCfg.Table, false, srcCfg.SQL)
}

func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *dbSourceProperties, sinkProps *sinkProperties, opts *drivers.TransferOptions) error {
return t.to.WithConnection(ctx, 1, true, false, func(ctx, ensuredCtx context.Context, _ *sql.Conn) error {
res, err := t.to.Execute(ctx, &drivers.Statement{Query: "SELECT current_database(),current_schema();"})
if err != nil {
return err
}
defer res.Close()

res.Next()
var localDB, localSchema string
if err := res.Scan(&localDB, &localSchema); err != nil {
return err
}

// duckdb considers everything before first . as db name
// alternative solution can be to query `show databases()` before and after to identify db name
dbName, _, _ := strings.Cut(filepath.Base(srcProps.Database), ".")
if dbName == "main" {
return fmt.Errorf("`main` is a reserved db name")
}

if err = t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("ATTACH %s AS %s", safeSQLString(srcProps.Database), safeSQLName(dbName))}); err != nil {
return fmt.Errorf("failed to attach db %q: %w", srcProps.Database, err)
}

defer func() {
if err = t.to.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("DETACH %s;", safeSQLName(dbName))}); err != nil {
t.logger.Error("failed to detach db", zap.Error(err))
}
}()

if err = t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("USE %s;", safeName(dbName))}); err != nil {

Check failure on line 141 in runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go

View workflow job for this annotation

GitHub Actions / lint

sloppyReassign: re-assignment to `err` can be replaced with `err := t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("USE %s;", safeName(dbName))})` (gocritic)
return err
}

defer func() { // revert back to localdb
if err = t.to.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("USE %s.%s;", safeName(localDB), safeName(localSchema))}); err != nil {
t.logger.Error("failed to switch to local database", zap.Error(err))
}
}()

userQuery := strings.TrimSpace(srcProps.SQL)
userQuery, _ = strings.CutSuffix(userQuery, ";") // trim trailing semi colon
query := fmt.Sprintf("CREATE OR REPLACE TABLE %s.%s.%s AS (%s\n);", safeName(localDB), safeName(localSchema), safeName(sinkProps.Table), userQuery)
return t.to.Exec(ctx, &drivers.Statement{Query: query})
})
}

// rewriteLocalPaths rewrites a DuckDB SQL statement such that relative paths become absolute paths relative to the basePath,
// and if allowHostAccess is false, returns an error if any of the paths resolve to a path outside of the basePath.
func rewriteLocalPaths(ast *duckdbsql.AST, basePath string, allowHostAccess bool) (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (t *motherduckToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps m
}

// get token
token := config["token"]
token, _ := config["token"].(string)
if token == "" && config["allow_host_access"].(bool) {
token = os.Getenv("motherduck_token")
}
Expand Down
6 changes: 3 additions & 3 deletions web-common/src/features/sources/modal/AddSourceModal.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import Https from "../../../components/icons/connectors/HTTPS.svelte";
import LocalFile from "../../../components/icons/connectors/LocalFile.svelte";
import MicrosoftAzureBlobStorage from "../../../components/icons/connectors/MicrosoftAzureBlobStorage.svelte";
import MotherDuck from "../../../components/icons/connectors/MotherDuck.svelte";
import DuckDB from "../../../components/icons/connectors/DuckDB.svelte";
import Postgres from "../../../components/icons/connectors/Postgres.svelte";
import Snowflake from "../../../components/icons/connectors/Snowflake.svelte";
import SQLite from "../../../components/icons/connectors/SQLite.svelte";
Expand Down Expand Up @@ -40,7 +40,7 @@
// duckdb
"bigquery",
"athena",
"motherduck",
"duckdb",
"postgres",
"sqlite",
"snowflake",
Expand All @@ -55,7 +55,7 @@
// duckdb: DuckDB,
bigquery: GoogleBigQuery,
athena: AmazonAthena,
motherduck: MotherDuck,
duckdb: DuckDB,
postgres: Postgres,
sqlite: SQLite,
snowflake: Snowflake,
Expand Down
3 changes: 2 additions & 1 deletion web-common/src/features/sources/modal/yupSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ export function getYupSchema(connector: V1ConnectorSpec) {
)
.required("Source name is required"),
});
case "motherduck":
case "duckdb":
return yup.object().shape({
sql: yup.string().required("sql is required"),
db: yup.string().required("db is required"),
sourceName: yup
.string()
.matches(
Expand Down
7 changes: 7 additions & 0 deletions web-common/src/features/sources/sourceUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ export function compileCreateSourceYAML(
delete values.db;
delete values.table;
break;
case "duckdb":
var db = values.db as string
if (db.startsWith("md:")) {
connectorName = "motherduck";
values.db = db.replace("md:", "")
}
break;
}

const compiledKeyValues = Object.entries(values)
Expand Down

0 comments on commit c0dd282

Please sign in to comment.