Skip to content

Commit

Permalink
add backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anshul committed Dec 10, 2024
1 parent 1f1ddfa commit 584bfba
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 3 deletions.
2 changes: 1 addition & 1 deletion runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (d Driver) Open(instanceID string, cfgMap map[string]any, st *storage.Clien
if err != nil {
// Check for another process currently accessing the DB
if strings.Contains(err.Error(), "Could not set lock on file") {
return nil, fmt.Errorf("failed to open database (is Rill already running?): %w", err)
panic(fmt.Errorf("failed to open database (is Rill already running?): %w", err))
}
return nil, err
}
Expand Down
46 changes: 45 additions & 1 deletion runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"errors"
"fmt"
"net"
"net/url"
"strings"

"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/duckdbsql"
Expand Down Expand Up @@ -123,7 +125,8 @@ func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *d
safeTempTable := safeName(sinkProps.Table + "__temp__")
switch t.externalDBType {
case "mysql":
initSQL = append(initSQL, "INSTALL 'MYSQL'; LOAD 'MYSQL';", fmt.Sprintf("ATTACH %s AS %s (TYPE mysql, READ_ONLY)", safeSQLString(srcProps.Database), safeDBName))
dsn := rewriteMySQLDSN(srcProps.Database)
initSQL = append(initSQL, "INSTALL 'MYSQL'; LOAD 'MYSQL';", fmt.Sprintf("ATTACH %s AS %s (TYPE mysql, READ_ONLY)", safeSQLString(dsn), safeDBName))
case "postgres":
initSQL = append(initSQL, "INSTALL 'POSTGRES'; LOAD 'POSTGRES';", fmt.Sprintf("ATTACH %s AS %s (TYPE postgres, READ_ONLY)", safeSQLString(srcProps.Database), safeDBName))
case "duckdb":
Expand Down Expand Up @@ -209,3 +212,44 @@ func rewriteLocalPaths(ast *duckdbsql.AST, basePath string, allowHostAccess bool

return ast.Format()
}

// rewriteMySQLDSN rewrites a MySQL DSN to a format that DuckDB expects.
// DuckDB does not support the URI based DSN format yet. It expects the DSN to be in the form of key=value pairs.
// This function parses the MySQL URI based DSN and converts it to the key=value format. It only converts the common parameters.
// For more advanced parameters like SSL configs, the user should manually convert the DSN to the key=value format.
// If there is an error parsing the DSN, it returns the DSN as is.
func rewriteMySQLDSN(dsn string) string {
cfg, err := mysql.ParseDSN(dsn)
if err != nil {
// If we can't parse the DSN, just return it as is. May be it is already in the form duckdb expects.
return dsn
}

var sb strings.Builder

if cfg.User != "" {
sb.WriteString(fmt.Sprintf("user=%s ", cfg.User))
}
if cfg.Passwd != "" {
sb.WriteString(fmt.Sprintf("password=%s ", cfg.Passwd))
}
if cfg.DBName != "" {
sb.WriteString(fmt.Sprintf("database=%s ", cfg.DBName))
}
switch cfg.Net {
case "unix":
sb.WriteString(fmt.Sprintf("socket=%s ", cfg.Addr))
case "tcp", "tcp6":
host, port, err := net.SplitHostPort(cfg.Addr)
if err != nil {
return dsn
}
sb.WriteString(fmt.Sprintf("host=%s ", host))
if port != "" {
sb.WriteString(fmt.Sprintf("port=%s ", port))
}
default:
return dsn
}
return sb.String()
}
2 changes: 1 addition & 1 deletion runtime/drivers/duckdb/transporter_mysql_to_duckDB_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestMySQLToDuckDBTransfer(t *testing.T) {
defer db.Close()

t.Run("AllDataTypes", func(t *testing.T) {
allMySQLDataTypesTest(t, db, fmt.Sprintf("host=%s user=myuser password=mypassword port=%v database=mydb", host, port.Int()))
allMySQLDataTypesTest(t, db, dsn)
})
}

Expand Down
9 changes: 9 additions & 0 deletions runtime/pkg/rduckdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ func NewDB(ctx context.Context, opts *DBOptions) (DB, error) {
}
return nil, err
}

// We want to prevent multiple rill process accessing same db files.
// All the files are accessed in read-only mode so it is possible for multiple rill process to access same db files.
// To prevent this we attach a dummy db file to the main in-memory db in write mode.
// This is required for local rill only but since there is no way to determine it in this package so we do it for all.
_, err = db.dbHandle.ExecContext(ctx, fmt.Sprintf("ATTACH %s AS __ymmud__", safeSQLString(filepath.Join(db.localPath, "main.db"))))
if err != nil {
return nil, err
}
go db.localDBMonitor()
return db, nil
}
Expand Down

0 comments on commit 584bfba

Please sign in to comment.