Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime: duckdb sqlite extensions based connector for sqlite #3018

Merged
merged 14 commits into from
Sep 26, 2023
Merged
2 changes: 2 additions & 0 deletions runtime/compilers/rillv1/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func driverSourceForAnonAccessCheck(connector string, src *runtimev1.SourceSpec)
}
case "motherduck":
return &drivers.DatabaseSource{}
case "sqlite":
return &drivers.DatabaseSource{}
default:
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions runtime/compilers/rillv1beta/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func source(connector string, src *runtimev1.Source) drivers.Source {
}
case "motherduck":
return &drivers.DatabaseSource{}
case "sqlite":
return &drivers.DatabaseSource{}
case "bigquery":
return &drivers.DatabaseSource{
Props: props,
Expand Down
3 changes: 3 additions & 0 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ func (c *connection) AsTransporter(from, to drivers.Handle) (drivers.Transporter
if from == to {
return transporter.NewDuckDBToDuckDB(olap, c.logger), true
}
if from.Driver() == "sqlite" {
return transporter.NewSQLExtensionToDuckDB(from, olap, c.logger), true
}
if from.Driver() == "motherduck" {
return transporter.NewMotherduckToDuckDB(from, olap, c.logger), true
}
Expand Down
50 changes: 50 additions & 0 deletions runtime/drivers/duckdb/transporter/sqlextension_to_duckDB.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package transporter

import (
"context"
"fmt"
"strings"

"github.com/rilldata/rill/runtime/drivers"
"go.uber.org/zap"
)

// sqlextensionToDuckDB is a drivers.Transporter that loads data into DuckDB
// by running a user-supplied query through a DuckDB SQL extension.
type sqlextensionToDuckDB struct {
	// from is the source handle; its Driver() name is used as the extension name.
	from drivers.Handle
	// to is the OLAP store on which the extension is loaded and the table created.
	to drivers.OLAPStore
	// logger is stored at construction time.
	logger *zap.Logger
}

// Compile-time check that sqlextensionToDuckDB implements drivers.Transporter.
var _ drivers.Transporter = (*sqlextensionToDuckDB)(nil)

// NewSQLExtensionToDuckDB builds a transporter that moves data into DuckDB
// from a database reachable through one of DuckDB's SQL extensions.
// Only the sqlite extension is supported at the moment; Postgres is left out
// because of licensing issues.
func NewSQLExtensionToDuckDB(from drivers.Handle, to drivers.OLAPStore, logger *zap.Logger) drivers.Transporter {
	tr := new(sqlextensionToDuckDB)
	tr.from = from
	tr.to = to
	tr.logger = logger
	return tr
}

func (t *sqlextensionToDuckDB) Transfer(ctx context.Context, source drivers.Source, sink drivers.Sink, opts *drivers.TransferOpts, p drivers.Progress) error {
src, ok := source.DatabaseSource()
if !ok {
return fmt.Errorf("type of source should be `drivers.DatabaseSource`")
}
fSink, ok := sink.DatabaseSink()
if !ok {
return fmt.Errorf("type of source should be `drivers.DatabaseSink`")
}

extensionName := t.from.Driver()
if err := t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("INSTALL '%s'; LOAD '%s';", extensionName, extensionName)}); err != nil {
return fmt.Errorf("failed to load %s extension %w", extensionName, err)
}

userQuery := strings.TrimSpace(src.SQL)
userQuery, _ = strings.CutSuffix(userQuery, ";") // trim trailing semi colon
query := fmt.Sprintf("CREATE OR REPLACE TABLE %s AS (%s);", safeName(fSink.Table), userQuery)
k-anshul marked this conversation as resolved.
Show resolved Hide resolved

return t.to.Exec(ctx, &drivers.Statement{Query: query, Priority: 1})
}
54 changes: 54 additions & 0 deletions runtime/drivers/duckdb/transporter/sqlextension_to_duckDB_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package transporter

import (
	"context"
	"database/sql"
	"fmt"
	"path/filepath"
	"testing"

	"github.com/rilldata/rill/runtime/drivers"
	_ "github.com/rilldata/rill/runtime/drivers/sqlite"
	"github.com/rilldata/rill/runtime/pkg/activity"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	_ "modernc.org/sqlite"
)

// Test_sqlextensionToDuckDB_Transfer creates a small SQLite database, transfers
// its table into DuckDB through the sqlite extension and verifies the row count.
func Test_sqlextensionToDuckDB_Transfer(t *testing.T) {
	// Create the database file inside the test's temp dir so it is removed
	// together with the directory when the test finishes.
	dbPath := filepath.Join(t.TempDir(), "test.db")
	db, err := sql.Open("sqlite", dbPath)
	require.NoError(t, err)

	_, err = db.Exec(`
	drop table if exists t;
	create table t(i);
	insert into t values(42), (314);
	`)
	require.NoError(t, err)
	require.NoError(t, db.Close())

	from, err := drivers.Open("sqlite", map[string]any{"dsn": ""}, false, activity.NewNoopClient(), zap.NewNop())
	require.NoError(t, err)
	to, err := drivers.Open("duckdb", map[string]any{"dsn": ""}, false, activity.NewNoopClient(), zap.NewNop())
	require.NoError(t, err)
	olap, ok := to.AsOLAP("")
	require.True(t, ok)

	tr := &sqlextensionToDuckDB{
		to:     olap,
		from:   from,
		logger: zap.NewNop(),
	}
	query := fmt.Sprintf("SELECT * FROM sqlite_scan('%s', 't');", dbPath)
	err = tr.Transfer(context.Background(), &drivers.DatabaseSource{SQL: query}, &drivers.DatabaseSink{Table: "test"}, &drivers.TransferOpts{}, drivers.NoOpProgress{})
	require.NoError(t, err)

	res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT count(*) from test"})
	require.NoError(t, err)
	// Fail clearly if the count query produced no row instead of scanning nothing.
	require.True(t, res.Next())
	var count int
	err = res.Scan(&count)
	require.NoError(t, err)
	require.Equal(t, 2, count)
}
17 changes: 16 additions & 1 deletion runtime/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

func init() {
drivers.Register("sqlite", driver{})
drivers.RegisterAsConnector("sqlite", driver{})
}

type driver struct{}
Expand Down Expand Up @@ -55,7 +56,21 @@ func (d driver) Drop(config map[string]any, logger *zap.Logger) error {
}

func (d driver) Spec() drivers.Spec {
return drivers.Spec{}
return drivers.Spec{
DisplayName: "SQLite",
Description: "Import data from SQLite table to DuckDB.",
SourceProperties: []drivers.PropertySchema{
{
Key: "sql",
Type: drivers.StringPropertyType,
Required: true,
DisplayName: "SQL",
Description: "Query to extract data from SQLite",
Placeholder: "SELECT * FROM sqlite_scan('sqlite.db', 'film');",
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
Hint: "https://duckdb.org/docs/extensions/sqlite_scanner#querying-individual-tables",
},
},
}
}

func (d driver) HasAnonymousSourceAccess(ctx context.Context, src drivers.Source, logger *zap.Logger) (bool, error) {
Expand Down
9 changes: 9 additions & 0 deletions runtime/reconcilers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,15 @@ func driversSource(conn drivers.Handle, propsPB *structpb.Struct) (drivers.Sourc
SQL: query,
Database: db,
}, nil
case "sqlite":
query, ok := props["sql"].(string)
if !ok {
return nil, fmt.Errorf("property \"sql\" is mandatory for connector \"sqlite\"")
}

return &drivers.DatabaseSource{
SQL: query,
}, nil
case "duckdb":
query, ok := props["sql"].(string)
if !ok {
Expand Down
11 changes: 11 additions & 0 deletions runtime/services/catalog/migrator/sources/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,15 @@ func source(connector string, src *runtimev1.Source) (drivers.Source, error) {
SQL: query,
Props: props,
}, nil
case "sqlite":
query, ok := props["sql"].(string)
if !ok {
return nil, fmt.Errorf("property \"sql\" is mandatory for connector \"sqlite\"")
}

return &drivers.DatabaseSource{
SQL: query,
}, nil
default:
return nil, fmt.Errorf("connector %v not supported", connector)
}
Expand Down Expand Up @@ -434,6 +443,8 @@ func connectorVariables(src *runtimev1.Source, env map[string]string, repoRoot s
case "motherduck":
vars["token"] = env["token"]
vars["dsn"] = ""
case "sqlite":
vars["dsn"] = ""
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
case "local_file":
vars["dsn"] = repoRoot
case "bigquery":
Expand Down
3 changes: 2 additions & 1 deletion web-common/src/features/sources/modal/AddSourceModal.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"https",
"local_file",
"motherduck",
"sqlite",
"bigquery",
];

Expand Down Expand Up @@ -91,7 +92,7 @@
</TabGroup>
</div>
<div class="flex-grow overflow-y-auto">
{#if selectedConnector?.name === "gcs" || selectedConnector?.name === "s3" || selectedConnector?.name === "https" || selectedConnector?.name === "motherduck" || selectedConnector?.name === "bigquery"}
{#if selectedConnector?.name === "gcs" || selectedConnector?.name === "s3" || selectedConnector?.name === "https" || selectedConnector?.name === "motherduck" || selectedConnector?.name === "bigquery" || selectedConnector?.name === "sqlite"}
{#key selectedConnector}
<RemoteSourceForm connector={selectedConnector} on:close />
{/key}
Expand Down
11 changes: 11 additions & 0 deletions web-common/src/features/sources/modal/yupSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ export function getYupSchema(connector: V1ConnectorSpec) {
)
.required("Source name is required"),
});
case "sqlite":
return yup.object().shape({
sql: yup.string().required("sql is required"),
sourceName: yup
.string()
.matches(
/^[a-zA-Z_][a-zA-Z0-9_]*$/,
"Source name must start with a letter or underscore and contain only letters, numbers, and underscores"
)
.required("Source name is required"),
});
case "bigquery":
return yup.object().shape({
sql: yup.string().required("sql is required"),
Expand Down
Loading