Skip to content

Commit

Permalink
fix merge errors
Browse files Browse the repository at this point in the history
  • Loading branch information
pjain1 committed Dec 6, 2023
1 parent ffe983d commit d447718
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 132 deletions.
7 changes: 0 additions & 7 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,13 +855,6 @@ func (c *connection) logLimits(conn *sqlx.Conn) {
c.logger.Info("duckdb limits", zap.String("memory", memory), zap.String("threads", threads))
}

// fatalInternalError logs a critical internal error and exits the process.
// This is used for errors that are completely unrecoverable.
// Ideally, we should refactor to cleanup/reopen/rebuild so that we don't need this.
func (c *connection) fatalInternalError(err error) {
c.logger.Fatal("duckdb: critical internal error", zap.Error(err))
}

// Regex to parse human-readable size returned by DuckDB
// nolint
var humanReadableSizeRegex = regexp.MustCompile(`^([\d.]+)\s*(\S+)$`)
Expand Down
92 changes: 0 additions & 92 deletions runtime/drivers/duckdb/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,98 +631,6 @@ func (c *connection) execWithLimits(parentCtx context.Context, stmt *drivers.Sta
return err
}

// convertToEnum converts a varchar col in table to an enum type.
// Generally to be used for low cardinality varchar columns although not enforced here.
func (c *connection) convertToEnum(ctx context.Context, table, col string) error {
if !c.config.ExtTableStorage {
return fmt.Errorf("`cast_to_enum` is only supported when `external_table_storage` is enabled")
}
c.logger.Info("convert column to enum", zap.String("table", table), zap.String("col", col))

version, exist, err := c.tableVersion(table)
if err != nil {
return err
}

if !exist {
return fmt.Errorf("table %q does not exist", table)
}

dbName := dbName(table, version)
enum := fmt.Sprintf("%s_enum", col)

err = c.WithConnection(ctx, 100, true, false, func(ctx, ensuredCtx context.Context, _ *dbsql.Conn) error {
// check that atleast one non nil value exists in the column
res, err := c.Execute(ctx, &drivers.Statement{Query: fmt.Sprintf("SELECT (SELECT count(%s) FROM %s.default WHERE %s IS NOT NULL) > 0 AS cnt", safeSQLName(col), safeSQLName(dbName), safeSQLName(col))})
if err != nil {
return err
}

var exists bool
if res.Next() {
if err := res.Scan(&exists); err != nil {
_ = res.Close()
return err
}
}
_ = res.Close()
if !exists {
return fmt.Errorf("column %q can't be converted to enum, has zero non null values", col)
}

// scan current db and current schema
res, err = c.Execute(ctx, &drivers.Statement{Query: "SELECT current_database(), current_schema()"})
if err != nil {
return err
}

var currentDB, currentSchema string
if res.Next() {
if err := res.Scan(&currentDB, &currentSchema); err != nil {
_ = res.Close()
return err
}
}
_ = res.Close()

// switch to source db
// this is only required since duckdb has bugs around db scoped custom types
// TODO: remove this when https://github.com/duckdb/duckdb/pull/9622 is released
err = c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("USE %s", dbName)})
if err != nil {
return fmt.Errorf("failed switch db %q: %w", dbName, err)
}
defer func() {
// switch to original db, notice `db.schema` just doing USE db switches context to `main` schema in the current db if doing `USE main`
// we want to switch to original db and schema
err = c.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("USE %s.%s", safeSQLName(currentDB), safeSQLName(currentSchema))})
if err != nil {
// This should NEVER happen
c.fatalInternalError(fmt.Errorf("failed to switch back from db %q: %w", dbName, err))
}
}()

err = c.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("CREATE TYPE %s AS ENUM (SELECT DISTINCT %s FROM \"default\" WHERE %s IS NOT NULL)", safeSQLName(enum), safeSQLName(col), safeSQLName(col))})
if err != nil {
return fmt.Errorf("failed to create enum %q: %w", enum, err)
}

err = c.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("ALTER TABLE \"default\" ALTER COLUMN %s SET TYPE %s", safeSQLName(col), safeSQLName(enum))})
if err != nil {
return fmt.Errorf("failed to alter table %q: %w", table, err)
}

// recreate view to propagate schema changes
// NOTE :: db name need to be appened in the view query else query fails when switching to main db
return c.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("CREATE OR REPLACE VIEW %s.%s.%s AS SELECT * FROM %s.default", safeSQLName(currentDB), safeSQLName(currentSchema), safeSQLName(table), safeSQLName(dbName))})
})
if err != nil {
return err
}

return nil
}

func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) {
if r == nil {
return nil, nil
Expand Down
30 changes: 0 additions & 30 deletions runtime/drivers/duckdb/olap_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,36 +383,6 @@ func Test_connection_InsertTableAsSelectLimits(t *testing.T) {
require.ErrorIs(t, err, drivers.ErrStorageLimitExceeded)
}

func Test_connection_CastEnum(t *testing.T) {
temp := t.TempDir()
os.Mkdir(temp, fs.ModePerm)

dbPath := filepath.Join(temp, "view.db")
handle, err := Driver{}.Open(map[string]any{"dsn": dbPath, "external_table_storage": true}, false, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)
c := handle.(*connection)
require.NoError(t, c.Migrate(context.Background()))
c.AsOLAP("default")

err = c.CreateTableAsSelect(context.Background(), "test", false, "select 'hello' as name")
require.NoError(t, err)

err = c.InsertTableAsSelect(context.Background(), "test", false, "select 'world'")
require.NoError(t, err)

err = c.convertToEnum(context.Background(), "test", "name")
require.NoError(t, err)

res, err := c.Execute(context.Background(), &drivers.Statement{Query: "SELECT data_type FROM information_schema.columns WHERE column_name='name'"})
require.NoError(t, err)

var typ string
require.True(t, res.Next())
require.NoError(t, res.Scan(&typ))
require.Equal(t, "ENUM('hello', 'world')", typ)
require.NoError(t, res.Close())
}

func Test_connection_CreateTableAsSelectWithComments(t *testing.T) {
temp := t.TempDir()
require.NoError(t, os.Mkdir(filepath.Join(temp, "default"), fs.ModePerm))
Expand Down
16 changes: 13 additions & 3 deletions runtime/drivers/duckdb/transporter/sqlstore_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,12 @@ func (s *sqlStoreToDuckDB) transferFromRowIterator(ctx context.Context, iter dri
s.logger.Info("records to be ingested", zap.Uint64("rows", total))
p.Target(int64(total), drivers.ProgressUnitRecord)
}
// create table
qry, err := createTableQuery(schema, table)
// we first ingest data in a temporary table in the main db
// and then copy it to the final table to ensure that the final table is always created using CRUD APIs which takes care
// whether table goes in main db or in separate table specific db
tmpTable := fmt.Sprintf("__%s_tmp_postgres", table)
// generate create table query
qry, err := createTableQuery(schema, tmpTable)
if err != nil {
return err
}
Expand All @@ -148,7 +152,7 @@ func (s *sqlStoreToDuckDB) transferFromRowIterator(ctx context.Context, iter dri
err = s.to.WithConnection(ctx, 1, true, false, func(ctx, ensuredCtx context.Context, conn *sql.Conn) error {
// append data using appender API
return rawConn(conn, func(conn driver.Conn) error {
a, err := duckdb.NewAppenderFromConn(conn, "", table)
a, err := duckdb.NewAppenderFromConn(conn, "", tmpTable)
if err != nil {
return err
}
Expand Down Expand Up @@ -188,6 +192,12 @@ func (s *sqlStoreToDuckDB) transferFromRowIterator(ctx context.Context, iter dri
}
})
})
if err != nil {
return err
}

// copy data from temp table to target table
return s.to.CreateTableAsSelect(ctx, table, false, fmt.Sprintf("SELECT * FROM %s", tmpTable))
}

func createTableQuery(schema *runtimev1.StructType, name string) (string, error) {
Expand Down

1 comment on commit d447718

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉 Published on https://ui.rilldata.com as production
🚀 Deployed on https://657082035eb8750a828d1cd7--rill-ui.netlify.app

Please sign in to comment.