Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime: duckdb sqlite extensions based connector for sqlite #3018

Merged
merged 14 commits into from
Sep 26, 2023
Merged
4 changes: 4 additions & 0 deletions runtime/compilers/rillv1/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func driverSourceForAnonAccessCheck(connector string, src *runtimev1.SourceSpec)
}
case "motherduck":
return &drivers.DatabaseSource{}
case "postgres_ext":
return &drivers.DatabaseSource{}
case "sqlite_ext":
return &drivers.DatabaseSource{}
default:
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions runtime/compilers/rillv1beta/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ func source(connector string, src *runtimev1.Source) drivers.Source {
}
case "motherduck":
return &drivers.DatabaseSource{}
case "postgres_ext":
return &drivers.DatabaseSource{}
case "sqlite_ext":
return &drivers.DatabaseSource{}
case "bigquery":
return &drivers.DatabaseSource{
Props: props,
Expand Down
72 changes: 55 additions & 17 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,62 @@ import (
func init() {
drivers.Register("duckdb", Driver{name: "duckdb"})
drivers.Register("motherduck", Driver{name: "motherduck"})
drivers.Register("postgres_ext", Driver{name: "postgres_ext"})
drivers.Register("sqlite_ext", Driver{name: "sqlite_ext"})
drivers.RegisterAsConnector("motherduck", Driver{name: "motherduck"})
drivers.RegisterAsConnector("postgres_ext", Driver{name: "postgres_ext"})
drivers.RegisterAsConnector("sqlite_ext", Driver{name: "sqlite_ext"})
}

// spec for duckdb as motherduck connector
var spec = drivers.Spec{
DisplayName: "MotherDuck",
Description: "Import data from MotherDuck.",
SourceProperties: []drivers.PropertySchema{
{
Key: "sql",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "SQL",
Description: "Query to extract data from MotherDuck.",
Placeholder: "select * from my_db.my_table;",
var specs map[string]drivers.Spec = map[string]drivers.Spec{
"motherduck": {
DisplayName: "MotherDuck",
Description: "Import data from MotherDuck.",
SourceProperties: []drivers.PropertySchema{
{
Key: "sql",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "SQL",
Description: "Query to extract data from MotherDuck.",
Placeholder: "select * from my_db.my_table;",
},
},
ConfigProperties: []drivers.PropertySchema{
{
Key: "token",
Secret: true,
},
},
},
"postgres_ext": {
DisplayName: "Postgres",
Description: "Import data from Postgres table to DuckDB.",
SourceProperties: []drivers.PropertySchema{
{
Key: "sql",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "SQL",
Description: "Query to extract data from postgres",
Placeholder: "SELECT * FROM postgres_scan('dbname=postgres user=postgres password=*** host=127.0.0.1', 'public', 'users');",
Hint: "https://duckdb.org/docs/extensions/postgres_scanner.html#querying-individual-tables",
},
},
},
ConfigProperties: []drivers.PropertySchema{
{
Key: "token",
Secret: true,
"sqlite_ext": {
DisplayName: "SQLite",
Description: "Import data from SQLite table to DuckDB.",
SourceProperties: []drivers.PropertySchema{
{
Key: "sql",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "SQL",
Description: "Query to extract data from SQLite",
Placeholder: "SELECT * FROM sqlite_scan('sakila.db', 'film');",
Hint: "https://duckdb.org/docs/extensions/sqlite_scanner#querying-individual-tables",
},
},
},
}
Expand Down Expand Up @@ -144,7 +179,7 @@ func (d Driver) Drop(config map[string]any, logger *zap.Logger) error {
}

func (d Driver) Spec() drivers.Spec {
return spec
return specs[d.name]
}

func (d Driver) HasAnonymousSourceAccess(ctx context.Context, src drivers.Source, logger *zap.Logger) (bool, error) {
Expand Down Expand Up @@ -243,6 +278,9 @@ func (c *connection) AsTransporter(from, to drivers.Handle) (drivers.Transporter
if from == to {
return transporter.NewDuckDBToDuckDB(olap, c.logger), true
}
if from.Driver() == "postgres_ext" || from.Driver() == "sqlite_ext" {
return transporter.NewSQLExtensionToDuckDB(from, olap, c.logger), true
}
if from.Driver() == "motherduck" {
return transporter.NewMotherduckToDuckDB(from, olap, c.logger), true
}
Expand Down
54 changes: 54 additions & 0 deletions runtime/drivers/duckdb/transporter/sqlextension_to_duckDB.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package transporter

import (
"context"
"fmt"
"strings"

"github.com/rilldata/rill/runtime/drivers"
"go.uber.org/zap"
)

type sqlextensionToDuckDB struct {
to drivers.OLAPStore
from drivers.Handle
logger *zap.Logger
}

var _ drivers.Transporter = &sqlextensionToDuckDB{}

// NewSQLExtensionToDuckDB returns a transporter to transfer data to duckdb from a sql extension supported by duckdb.
func NewSQLExtensionToDuckDB(from drivers.Handle, to drivers.OLAPStore, logger *zap.Logger) drivers.Transporter {
return &sqlextensionToDuckDB{
to: to,
from: from,
logger: logger,
}
}

func (t *sqlextensionToDuckDB) Transfer(ctx context.Context, source drivers.Source, sink drivers.Sink, opts *drivers.TransferOpts, p drivers.Progress) error {
src, ok := source.DatabaseSource()
if !ok {
return fmt.Errorf("type of source should be `drivers.DatabaseSource`")
}
fSink, ok := sink.DatabaseSink()
if !ok {
return fmt.Errorf("type of source should be `drivers.DatabaseSink`")
}

extensionName := extName(t.from.Driver())
if err := t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("INSTALL '%s'; LOAD '%s';", extensionName, extensionName)}); err != nil {
return fmt.Errorf("failed to load %s extension %w", extensionName, err)
}

userQuery := strings.TrimSpace(src.SQL)
userQuery, _ = strings.CutSuffix(userQuery, ";") // trim trailing semi colon
query := fmt.Sprintf("CREATE OR REPLACE TABLE %s AS (%s);", safeName(fSink.Table), userQuery)

return t.to.Exec(ctx, &drivers.Statement{Query: query, Priority: 1})
}

func extName(name string) string {
before, _ := strings.CutSuffix(name, "_ext")
return before
}
52 changes: 52 additions & 0 deletions runtime/drivers/duckdb/transporter/sqlextension_to_duckDB_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package transporter

import (
"context"
"database/sql"
"fmt"
"testing"

"github.com/rilldata/rill/runtime/drivers"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
_ "modernc.org/sqlite"
)

func Test_sqlextensionToDuckDB_Transfer(t *testing.T) {
tempDir := t.TempDir()

dbPath := fmt.Sprintf("%s.db", tempDir)
db, err := sql.Open("sqlite", dbPath)
require.NoError(t, err)

_, err = db.Exec(`
drop table if exists t;
create table t(i);
insert into t values(42), (314);
`)
require.NoError(t, err)
db.Close()

from, err := drivers.Open("sqlite_ext", map[string]any{"dsn": ""}, false, zap.NewNop())
require.NoError(t, err)
to, err := drivers.Open("duckdb", map[string]any{"dsn": ""}, false, zap.NewNop())
require.NoError(t, err)
olap, _ := to.AsOLAP("")

tr := &sqlextensionToDuckDB{
to: olap,
from: from,
logger: zap.NewNop(),
}
query := fmt.Sprintf("SELECT * FROM sqlite_scan('%s', 't');", dbPath)
err = tr.Transfer(context.Background(), &drivers.DatabaseSource{SQL: query}, &drivers.DatabaseSink{Table: "test"}, &drivers.TransferOpts{}, drivers.NoOpProgress{})
require.NoError(t, err)

res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT count(*) from test"})
require.NoError(t, err)
res.Next()
var count int
err = res.Scan(&count)
require.NoError(t, err)
require.Equal(t, 2, count)
}
18 changes: 18 additions & 0 deletions runtime/reconcilers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,24 @@ func driversSource(conn drivers.Handle, propsPB *structpb.Struct) (drivers.Sourc
SQL: query,
Database: db,
}, nil
case "postgres_ext":
query, ok := props["sql"].(string)
if !ok {
return nil, fmt.Errorf("property \"sql\" is mandatory for connector \"postgres\"")
}

return &drivers.DatabaseSource{
SQL: query,
}, nil
case "sqlite_ext":
query, ok := props["sql"].(string)
if !ok {
return nil, fmt.Errorf("property \"sql\" is mandatory for connector \"sqlite\"")
}

return &drivers.DatabaseSource{
SQL: query,
}, nil
case "duckdb":
query, ok := props["sql"].(string)
if !ok {
Expand Down
22 changes: 22 additions & 0 deletions runtime/services/catalog/migrator/sources/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,24 @@ func source(connector string, src *runtimev1.Source) (drivers.Source, error) {
SQL: query,
Props: props,
}, nil
case "postgres_ext":
query, ok := props["sql"].(string)
if !ok {
return nil, fmt.Errorf("property \"sql\" is mandatory for connector \"postgres\"")
}

return &drivers.DatabaseSource{
SQL: query,
}, nil
case "sqlite_ext":
query, ok := props["sql"].(string)
if !ok {
return nil, fmt.Errorf("property \"sql\" is mandatory for connector \"sqlite\"")
}

return &drivers.DatabaseSource{
SQL: query,
}, nil
default:
return nil, fmt.Errorf("connector %v not supported", connector)
}
Expand Down Expand Up @@ -433,6 +451,10 @@ func connectorVariables(src *runtimev1.Source, env map[string]string, repoRoot s
case "motherduck":
vars["token"] = env["token"]
vars["dsn"] = ""
case "postgres_ext":
vars["dsn"] = ""
case "sqlite_ext":
vars["dsn"] = ""
case "local_file":
vars["dsn"] = repoRoot
case "bigquery":
Expand Down
4 changes: 3 additions & 1 deletion web-common/src/features/sources/modal/AddSourceModal.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
"https",
"local_file",
"motherduck",
"postgres_ext",
"sqlite_ext",
"bigquery",
];

Expand Down Expand Up @@ -91,7 +93,7 @@
</TabGroup>
</div>
<div class="flex-grow overflow-y-auto">
{#if selectedConnector?.name === "gcs" || selectedConnector?.name === "s3" || selectedConnector?.name === "https" || selectedConnector?.name === "motherduck" || selectedConnector?.name === "bigquery"}
{#if selectedConnector?.name === "gcs" || selectedConnector?.name === "s3" || selectedConnector?.name === "https" || selectedConnector?.name === "motherduck" || selectedConnector?.name === "postgres_ext" || selectedConnector?.name === "sqlite_ext" || selectedConnector?.name === "bigquery"}
{#key selectedConnector}
<RemoteSourceForm connector={selectedConnector} on:close />
{/key}
Expand Down
22 changes: 22 additions & 0 deletions web-common/src/features/sources/modal/yupSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,28 @@ export function getYupSchema(connector: V1ConnectorSpec) {
)
.required("Source name is required"),
});
case "postgres_ext":
return yup.object().shape({
sql: yup.string().required("sql is required"),
sourceName: yup
.string()
.matches(
/^[a-zA-Z_][a-zA-Z0-9_]*$/,
"Source name must start with a letter or underscore and contain only letters, numbers, and underscores"
)
.required("Source name is required"),
});
case "sqlite_ext":
return yup.object().shape({
sql: yup.string().required("sql is required"),
sourceName: yup
.string()
.matches(
/^[a-zA-Z_][a-zA-Z0-9_]*$/,
"Source name must start with a letter or underscore and contain only letters, numbers, and underscores"
)
.required("Source name is required"),
});
case "bigquery":
return yup.object().shape({
sql: yup.string().required("sql is required"),
Expand Down