diff --git a/runtime/pkg/rduckdb/README.md b/runtime/pkg/rduckdb/README.md new file mode 100644 index 00000000000..00694e81b9a --- /dev/null +++ b/runtime/pkg/rduckdb/README.md @@ -0,0 +1,18 @@ +# rduckdb + +## Motivation +1. As an embedded database, DuckDB does not inherently provide the same isolation for ETL and serving workloads that other OLAP databases offer. +2. We have observed significant degradation in query performance during data ingestion. +3. In a Kubernetes environment, it is recommended to use local disks instead of network disks, necessitating separate local disk backups. + +## Features +1. Utilizes separate DuckDB handles for reading and writing, each with distinct CPU and memory resources. +2. Automatically backs up writes to GCS in real-time. +3. Automatically restores from backups when starting with an empty local disk. + +## Examples +1. Refer to `examples/main.go` for a usage example. + +## Future Work +1. Enable writes and reads to be executed on separate machines. +2. Limit read operations to specific tables to support ephemeral tables (intermediate tables required only for writes). diff --git a/runtime/pkg/rduckdb/catalog.go b/runtime/pkg/rduckdb/catalog.go new file mode 100644 index 00000000000..09ea358cf24 --- /dev/null +++ b/runtime/pkg/rduckdb/catalog.go @@ -0,0 +1,242 @@ +package rduckdb + +import ( + "fmt" + "log/slog" + "sync" +) + +// Represents one table and its versions currently present in the local cache. +type table struct { + name string + deleted bool + currentVersion string + versionReferenceCounts map[string]int + versionMeta map[string]*tableMeta +} + +// Represents a snapshot of table versions. +// The table versions referenced by the snapshot are guaranteed to exist for as long as the snapshot is acquired. +type snapshot struct { + id int + referenceCount int + tables []*tableMeta + // if snapshot is ready to be served then ready will be marked true + ready bool +} + +// Represents a catalog of available table versions. 
+// It is thread-safe and supports acquiring a snapshot of table versions which will not be mutated or removed for as long as the snapshot is held. +type catalog struct { + mu sync.Mutex + tables map[string]*table + snapshots map[int]*snapshot + currentSnapshotID int + + removeVersionFunc func(string, string) + removeSnapshotFunc func(int) + + logger *slog.Logger +} + +// newCatalog creates a new catalog. +// The removeSnapshotFunc func will be called exactly once for each snapshot ID when it is no longer the current snapshot and is no longer held by any readers. +// The removeVersionFunc func will be called exactly once for each table version when it is no longer the current version and is no longer used by any active snapshots. +func newCatalog(removeVersionFunc func(string, string), removeSnapshotFunc func(int), tables []*tableMeta, logger *slog.Logger) *catalog { + c := &catalog{ + tables: make(map[string]*table), + snapshots: make(map[int]*snapshot), + removeVersionFunc: removeVersionFunc, + removeSnapshotFunc: removeSnapshotFunc, + logger: logger, + } + for _, meta := range tables { + c.tables[meta.Name] = &table{ + name: meta.Name, + currentVersion: meta.Version, + versionReferenceCounts: map[string]int{}, + versionMeta: map[string]*tableMeta{meta.Version: meta}, + } + c.acquireVersion(c.tables[meta.Name], meta.Version) + } + _ = c.acquireSnapshotUnsafe() + return c +} + +func (c *catalog) tableMeta(name string) (*tableMeta, error) { + c.mu.Lock() + defer c.mu.Unlock() + + t, ok := c.tables[name] + if !ok || t.deleted { + return nil, errNotFound + } + meta, ok := t.versionMeta[t.currentVersion] + if !ok { + panic(fmt.Errorf("internal error: meta for table %q and version %q not found", name, t.currentVersion)) + } + return meta, nil +} + +// addTableVersion registers a new version of a table. +// If the table name has not been seen before, it is added to the catalog. 
+func (c *catalog) addTableVersion(name string, meta *tableMeta) { + c.mu.Lock() + defer c.mu.Unlock() + + t, ok := c.tables[name] + if !ok { + t = &table{ + name: name, + versionReferenceCounts: make(map[string]int), + versionMeta: make(map[string]*tableMeta), + } + c.tables[name] = t + } + + oldVersion := t.currentVersion + t.deleted = false // In case the table was deleted previously, but a snapshot still references it. + t.currentVersion = meta.Version + t.versionMeta[meta.Version] = meta + c.acquireVersion(t, t.currentVersion) + if oldVersion != "" { + c.releaseVersion(t, oldVersion) + } + + c.currentSnapshotID++ + c.acquireSnapshotUnsafe() + if c.currentSnapshotID > 1 { + c.releaseSnapshotUnsafe(c.snapshots[c.currentSnapshotID-1]) + } +} + +// removeTable removes a table from the catalog. +// If the table is currently used by a snapshot, it will stay in the catalog but marked with deleted=true. +// When the last snapshot referencing the table is released, the table will be removed completely. +func (c *catalog) removeTable(name string) { + c.mu.Lock() + defer c.mu.Unlock() + + t, ok := c.tables[name] + if !ok { + c.logger.Debug("table not found in rduckdb catalog", slog.String("name", name)) + return + } + + oldVersion := t.currentVersion + t.deleted = true + t.currentVersion = "" + + c.currentSnapshotID++ + c.acquireSnapshotUnsafe() + c.releaseVersion(t, oldVersion) + if c.currentSnapshotID > 1 { + c.releaseSnapshotUnsafe(c.snapshots[c.currentSnapshotID-1]) + } +} + +// listTables returns tableMeta for all active tables present in the catalog. 
+func (c *catalog) listTables() []*tableMeta {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	tables := make([]*tableMeta, 0)
+	for _, t := range c.tables {
+		if t.deleted {
+			continue
+		}
+		meta, ok := t.versionMeta[t.currentVersion]
+		if !ok {
+			// Skip the table instead of appending a nil meta: callers dereference
+			// the returned entries (e.g. attachTables reads meta.Type) and a nil
+			// element would panic there.
+			c.logger.Error("internal error: meta for table not found in catalog", slog.String("name", t.name), slog.String("version", t.currentVersion))
+			continue
+		}
+		tables = append(tables, meta)
+	}
+	return tables
+}
+
+// acquireSnapshot acquires a snapshot of the current table versions.
+func (c *catalog) acquireSnapshot() *snapshot {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.acquireSnapshotUnsafe()
+}
+
+// acquireSnapshotUnsafe acquires the current snapshot, creating it and pinning
+// its table versions on first acquisition. It must be called while holding the
+// catalog mutex.
+func (c *catalog) acquireSnapshotUnsafe() *snapshot {
+	s, ok := c.snapshots[c.currentSnapshotID]
+	if ok {
+		s.referenceCount++
+		return s
+	}
+	// first acquire
+	s = &snapshot{
+		id:             c.currentSnapshotID,
+		referenceCount: 1,
+		tables:         make([]*tableMeta, 0),
+	}
+	for _, t := range c.tables {
+		if t.deleted {
+			continue
+		}
+
+		meta, ok := t.versionMeta[t.currentVersion]
+		if !ok {
+			panic(fmt.Errorf("internal error: meta for table %q version %q not found in catalog", t.name, t.currentVersion))
+		}
+		s.tables = append(s.tables, meta)
+		c.acquireVersion(t, t.currentVersion)
+	}
+	c.snapshots[c.currentSnapshotID] = s
+	return s
+}
+
+// releaseSnapshot releases a snapshot of table versions.
+func (c *catalog) releaseSnapshot(s *snapshot) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.releaseSnapshotUnsafe(s)
+}
+
+// releaseSnapshotUnsafe decrements the snapshot's reference count and, once it
+// reaches zero, releases the pinned table versions and notifies
+// removeSnapshotFunc. It must be called while holding the catalog mutex.
+func (c *catalog) releaseSnapshotUnsafe(s *snapshot) {
+	s.referenceCount--
+	if s.referenceCount > 0 {
+		return
+	}
+
+	for _, meta := range s.tables {
+		t, ok := c.tables[meta.Name]
+		if !ok {
+			panic(fmt.Errorf("internal error: table %q not found in catalog", meta.Name))
+		}
+		c.releaseVersion(t, meta.Version)
+	}
+	delete(c.snapshots, s.id)
+	c.removeSnapshotFunc(s.id)
+}
+
+// acquireVersion increments the reference count of a table version.
+// It must be called while holding the catalog mutex. 
+func (c *catalog) acquireVersion(t *table, version string) { + referenceCount := t.versionReferenceCounts[version] + referenceCount++ + t.versionReferenceCounts[version] = referenceCount +} + +// releaseVersion decrements the reference count of a table version. +// If the reference count reaches zero and the version is no longer the current version, it is removec. +func (c *catalog) releaseVersion(t *table, version string) { + referenceCount, ok := t.versionReferenceCounts[version] + if !ok { + panic(fmt.Errorf("internal error: version %q of table %q not found in catalog", t.currentVersion, t.name)) + } + referenceCount-- + if referenceCount > 0 { + t.versionReferenceCounts[version] = referenceCount + return + } + + delete(t.versionReferenceCounts, version) + if t.deleted && len(t.versionReferenceCounts) == 0 { + delete(c.tables, t.name) + } + c.removeVersionFunc(t.name, version) +} diff --git a/runtime/pkg/rduckdb/db.go b/runtime/pkg/rduckdb/db.go new file mode 100644 index 00000000000..76d8e494e75 --- /dev/null +++ b/runtime/pkg/rduckdb/db.go @@ -0,0 +1,1139 @@ +package rduckdb + +import ( + "context" + "database/sql" + "database/sql/driver" + "encoding/json" + "errors" + "fmt" + "io/fs" + "log/slog" + "net/url" + "os" + "path/filepath" + "regexp" + "slices" + "strconv" + "strings" + "time" + + "github.com/XSAM/otelsql" + "github.com/jmoiron/sqlx" + "github.com/marcboeker/go-duckdb" + "github.com/mitchellh/mapstructure" + "go.opentelemetry.io/otel/attribute" + "gocloud.dev/blob" + "golang.org/x/sync/semaphore" +) + +var errNotFound = errors.New("rduckdb: not found") + +type DB interface { + // Close closes the database. + Close() error + + // AcquireReadConnection returns a connection to the database for reading. + // Once done the connection should be released by calling the release function. + // This connection must only be used for select queries or for creating and working with temporary tables. 
+ AcquireReadConnection(ctx context.Context) (conn *sqlx.Conn, release func() error, err error) + + // Size returns the size of the database in bytes. + // It is currently implemented as sum of the size of all serving `.db` files. + Size() int64 + + // CRUD APIs + + // CreateTableAsSelect creates a new table by name from the results of the given SQL query. + CreateTableAsSelect(ctx context.Context, name string, sql string, opts *CreateTableOptions) error + + // MutateTable allows mutating a table in the database by calling the mutateFn. + MutateTable(ctx context.Context, name string, mutateFn func(ctx context.Context, conn *sqlx.Conn) error) error + + // DropTable removes a table from the database. + DropTable(ctx context.Context, name string) error + + // RenameTable renames a table in the database. + RenameTable(ctx context.Context, oldName, newName string) error +} + +type DBOptions struct { + // LocalPath is the path where local db files will be stored. Should be unique for each database. + LocalPath string + // Remote is the blob storage bucket where the database files will be stored. This is the source of truth. + // The local db will be eventually synced with the remote. + Remote *blob.Bucket + + // ReadSettings are settings applied the read duckDB handle. + ReadSettings map[string]string + // WriteSettings are settings applied the write duckDB handle. + WriteSettings map[string]string + // InitQueries are the queries to run when the database is first created. 
+ InitQueries []string + + Logger *slog.Logger + OtelAttributes []attribute.KeyValue +} + +func (d *DBOptions) ValidateSettings() error { + read := &settings{} + err := mapstructure.Decode(d.ReadSettings, read) + if err != nil { + return fmt.Errorf("read settings: %w", err) + } + + write := &settings{} + err = mapstructure.Decode(d.WriteSettings, write) + if err != nil { + return fmt.Errorf("write settings: %w", err) + } + + // no memory limits defined + // divide memory equally between read and write + if read.MaxMemory == "" && write.MaxMemory == "" { + connector, err := duckdb.NewConnector("", nil) + if err != nil { + return fmt.Errorf("unable to create duckdb connector: %w", err) + } + defer connector.Close() + db := sql.OpenDB(connector) + defer db.Close() + + row := db.QueryRow("SELECT value FROM duckdb_settings() WHERE name = 'max_memory'") + var maxMemory string + err = row.Scan(&maxMemory) + if err != nil { + return fmt.Errorf("unable to get max_memory: %w", err) + } + + bytes, err := humanReadableSizeToBytes(maxMemory) + if err != nil { + return fmt.Errorf("unable to parse max_memory: %w", err) + } + + read.MaxMemory = fmt.Sprintf("%d bytes", int64(bytes)/2) + write.MaxMemory = fmt.Sprintf("%d bytes", int64(bytes)/2) + } + + if read.MaxMemory == "" != (write.MaxMemory == "") { + // only one is defined + var mem string + if read.MaxMemory != "" { + mem = read.MaxMemory + } else { + mem = write.MaxMemory + } + + bytes, err := humanReadableSizeToBytes(mem) + if err != nil { + return fmt.Errorf("unable to parse max_memory: %w", err) + } + + read.MaxMemory = fmt.Sprintf("%d bytes", int64(bytes)/2) + write.MaxMemory = fmt.Sprintf("%d bytes", int64(bytes)/2) + } + + var readThread, writeThread int + if read.Threads != "" { + readThread, err = strconv.Atoi(read.Threads) + if err != nil { + return fmt.Errorf("unable to parse read threads: %w", err) + } + } + if write.Threads != "" { + writeThread, err = strconv.Atoi(write.Threads) + if err != nil { + return 
fmt.Errorf("unable to parse write threads: %w", err) + } + } + + if readThread == 0 && writeThread == 0 { + connector, err := duckdb.NewConnector("", nil) + if err != nil { + return fmt.Errorf("unable to create duckdb connector: %w", err) + } + defer connector.Close() + db := sql.OpenDB(connector) + defer db.Close() + + row := db.QueryRow("SELECT value FROM duckdb_settings() WHERE name = 'threads'") + var threads int + err = row.Scan(&threads) + if err != nil { + return fmt.Errorf("unable to get threads: %w", err) + } + + read.Threads = strconv.Itoa((threads + 1) / 2) + write.Threads = strconv.Itoa(threads / 2) + } + + if readThread == 0 != (writeThread == 0) { + // only one is defined + var threads int + if readThread != 0 { + threads = readThread + } else { + threads = writeThread + } + + read.Threads = strconv.Itoa((threads + 1) / 2) + if threads <= 3 { + write.Threads = "1" + } else { + write.Threads = strconv.Itoa(threads / 2) + } + } + + err = mapstructure.WeakDecode(read, &d.ReadSettings) + if err != nil { + return fmt.Errorf("failed to update read settings: %w", err) + } + + err = mapstructure.WeakDecode(write, &d.WriteSettings) + if err != nil { + return fmt.Errorf("failed to update write settings: %w", err) + } + return nil +} + +type CreateTableOptions struct { + // View specifies whether the created table is a view. + View bool + // If BeforeCreateFn is set, it will be executed before the create query is executed. + BeforeCreateFn func(ctx context.Context, conn *sqlx.Conn) error + // If AfterCreateFn is set, it will be executed after the create query is executed. + AfterCreateFn func(ctx context.Context, conn *sqlx.Conn) error +} + +// NewDB creates a new DB instance. +// dbIdentifier is a unique identifier for the database reported in metrics. 
+func NewDB(ctx context.Context, opts *DBOptions) (DB, error) { + err := opts.ValidateSettings() + if err != nil { + return nil, err + } + + bgctx, cancel := context.WithCancel(context.Background()) + db := &db{ + opts: opts, + localPath: opts.LocalPath, + remote: opts.Remote, + writeSem: semaphore.NewWeighted(1), + metaSem: semaphore.NewWeighted(1), + localDirty: true, + logger: opts.Logger, + ctx: bgctx, + cancel: cancel, + } + // create local path + err = os.MkdirAll(db.localPath, fs.ModePerm) + if err != nil { + return nil, fmt.Errorf("unable to create local path: %w", err) + } + + // sync local data + err = db.pullFromRemote(ctx, false) + if err != nil { + return nil, err + } + + // collect all tables + var tables []*tableMeta + _ = db.iterateLocalTables(false, func(name string, meta *tableMeta) error { + tables = append(tables, meta) + return nil + }) + + // catalog + db.catalog = newCatalog( + func(name, version string) { + go func() { + err = db.removeTableVersion(bgctx, name, version) + if err != nil && !errors.Is(err, context.Canceled) { + db.logger.Error("error in removing table version", slog.String("name", name), slog.String("version", version), slog.String("error", err.Error())) + } + }() + }, + func(i int) { + go func() { + err = db.removeSnapshot(bgctx, i) + if err != nil && !errors.Is(err, context.Canceled) { + db.logger.Error("error in removing snapshot", slog.Int("id", i), slog.String("error", err.Error())) + } + }() + }, + tables, + opts.Logger, + ) + + db.dbHandle, err = db.openDBAndAttach(ctx, "", "", true) + if err != nil { + if strings.Contains(err.Error(), "Symbol not found") { + fmt.Printf("Your version of macOS is not supported. Please upgrade to the latest major release of macOS. 
See this link for details: https://support.apple.com/en-in/macos/upgrade") + os.Exit(1) + } + return nil, err + } + go db.localDBMonitor() + return db, nil +} + +type db struct { + opts *DBOptions + + localPath string + remote *blob.Bucket + + // dbHandle serves executes meta queries and serves read queries + dbHandle *sqlx.DB + // writeSem ensures only one write operation is allowed at a time + writeSem *semaphore.Weighted + // metaSem enures only one meta operation can run on a duckb handle. + // Meta operations are attach, detach, create view queries done on the db handle + metaSem *semaphore.Weighted + // localDirty is set to true when a change is committed to the remote but not yet reflected in the local db + localDirty bool + catalog *catalog + + logger *slog.Logger + + // ctx and cancel to cancel background operations + ctx context.Context + cancel context.CancelFunc +} + +var _ DB = &db{} + +func (d *db) Close() error { + // close background operations + d.cancel() + return d.dbHandle.Close() +} + +func (d *db) AcquireReadConnection(ctx context.Context) (*sqlx.Conn, func() error, error) { + snapshot := d.catalog.acquireSnapshot() + + conn, err := d.dbHandle.Connx(ctx) + if err != nil { + return nil, nil, err + } + + err = d.prepareSnapshot(ctx, conn, snapshot) + if err != nil { + _ = conn.Close() + return nil, nil, err + } + + release := func() error { + d.catalog.releaseSnapshot(snapshot) + return conn.Close() + } + return conn, release, nil +} + +func (d *db) CreateTableAsSelect(ctx context.Context, name, query string, opts *CreateTableOptions) error { + d.logger.Debug("create: create table", slog.String("name", name), slog.Bool("view", opts.View)) + err := d.writeSem.Acquire(ctx, 1) + if err != nil { + return err + } + defer d.writeSem.Release(1) + + // pull latest changes from remote + err = d.pullFromRemote(ctx, true) + if err != nil { + return err + } + + // check if some older version exists + oldMeta, _ := d.catalog.tableMeta(name) + if oldMeta != 
nil { + d.logger.Debug("create: old version", slog.String("table", name), slog.String("version", oldMeta.Version)) + } + + // create new version directory + newVersion := newVersion() + newMeta := &tableMeta{ + Name: name, + Version: newVersion, + CreatedVersion: newVersion, + } + var dsn string + if opts.View { + dsn = "" + newMeta.SQL = query + err = d.initLocalTable(name, "") + if err != nil { + return fmt.Errorf("create: unable to create dir %q: %w", name, err) + } + } else { + err = d.initLocalTable(name, newVersion) + if err != nil { + return fmt.Errorf("create: unable to create dir %q: %w", name, err) + } + dsn = d.localDBPath(name, newVersion) + } + + // need to attach existing table so that any views dependent on this table are correctly attached + conn, release, err := d.acquireWriteConn(ctx, dsn, name, true) + if err != nil { + return err + } + defer func() { + _ = release() + }() + + safeName := safeSQLName(name) + var typ string + if opts.View { + typ = "VIEW" + } else { + typ = "TABLE" + } + newMeta.Type = typ + if opts.BeforeCreateFn != nil { + err = opts.BeforeCreateFn(ctx, conn) + if err != nil { + return fmt.Errorf("create: BeforeCreateFn returned error: %w", err) + } + } + // ingest data + _, err = conn.ExecContext(ctx, fmt.Sprintf("CREATE OR REPLACE %s %s AS (%s\n)", typ, safeName, query), nil) + if err != nil { + return fmt.Errorf("create: create %s %q failed: %w", typ, name, err) + } + if opts.AfterCreateFn != nil { + err = opts.AfterCreateFn(ctx, conn) + if err != nil { + return fmt.Errorf("create: AfterCreateFn returned error: %w", err) + } + } + + // close write handle before syncing local so that temp files or wal files are removed + err = release() + if err != nil { + return err + } + + // update remote data and metadata + if err := d.pushToRemote(ctx, name, oldMeta, newMeta); err != nil { + return fmt.Errorf("create: replicate failed: %w", err) + } + d.logger.Debug("create: remote table updated", slog.String("name", name)) + // no errors 
after this point since background goroutine will eventually sync the local db + + // update local metadata + err = d.writeTableMeta(name, newMeta) + if err != nil { + d.logger.Debug("create: error in writing table meta", slog.String("name", name), slog.String("error", err.Error())) + return nil + } + + d.catalog.addTableVersion(name, newMeta) + d.localDirty = false + return nil +} + +func (d *db) MutateTable(ctx context.Context, name string, mutateFn func(ctx context.Context, conn *sqlx.Conn) error) error { + d.logger.Debug("mutate table", slog.String("name", name)) + err := d.writeSem.Acquire(ctx, 1) + if err != nil { + return err + } + defer d.writeSem.Release(1) + + // pull latest changes from remote + err = d.pullFromRemote(ctx, true) + if err != nil { + return err + } + + oldMeta, err := d.catalog.tableMeta(name) + if err != nil { + if errors.Is(err, errNotFound) { + return fmt.Errorf("mutate: Table %q not found", name) + } + return fmt.Errorf("mutate: unable to get table meta: %w", err) + } + + // create new version directory + newVersion := newVersion() + err = copyDir(d.localTableDir(name, newVersion), d.localTableDir(name, oldMeta.Version)) + if err != nil { + return fmt.Errorf("mutate: copy table failed: %w", err) + } + + // acquire write connection + // need to ignore attaching table since it is already present in the db file + conn, release, err := d.acquireWriteConn(ctx, d.localDBPath(name, newVersion), name, false) + if err != nil { + return err + } + + err = mutateFn(ctx, conn) + if err != nil { + _ = release() + return fmt.Errorf("mutate: mutate failed: %w", err) + } + + // push to remote + err = release() + if err != nil { + return fmt.Errorf("mutate: failed to close connection: %w", err) + } + meta := &tableMeta{ + Name: name, + Version: newVersion, + CreatedVersion: oldMeta.CreatedVersion, + Type: oldMeta.Type, + SQL: oldMeta.SQL, + } + err = d.pushToRemote(ctx, name, oldMeta, meta) + if err != nil { + return fmt.Errorf("mutate: replicate failed: 
%w", err) + } + // no errors after this point since background goroutine will eventually sync the local db + + // update local meta + err = d.writeTableMeta(name, meta) + if err != nil { + d.logger.Debug("mutate: error in writing table meta", slog.String("name", name), slog.String("error", err.Error())) + return nil + } + + d.catalog.addTableVersion(name, meta) + d.localDirty = false + return nil +} + +// DropTable implements DB. +func (d *db) DropTable(ctx context.Context, name string) error { + d.logger.Debug("drop table", slog.String("name", name)) + err := d.writeSem.Acquire(ctx, 1) + if err != nil { + return err + } + defer d.writeSem.Release(1) + + // pull latest changes from remote + err = d.pullFromRemote(ctx, true) + if err != nil { + return fmt.Errorf("drop: unable to pull from remote: %w", err) + } + + // check if table exists + _, err = d.catalog.tableMeta(name) + if err != nil { + if errors.Is(err, errNotFound) { + return fmt.Errorf("drop: Table %q not found", name) + } + return fmt.Errorf("drop: unable to get table meta: %w", err) + } + + // drop the table from remote + d.localDirty = true + err = d.deleteRemote(ctx, name, "") + if err != nil { + return fmt.Errorf("drop: unable to drop table %q from remote: %w", name, err) + } + // no errors after this point since background goroutine will eventually sync the local db + + d.catalog.removeTable(name) + d.localDirty = false + return nil +} + +func (d *db) RenameTable(ctx context.Context, oldName, newName string) error { + d.logger.Debug("rename table", slog.String("from", oldName), slog.String("to", newName)) + if strings.EqualFold(oldName, newName) { + return fmt.Errorf("rename: Table with name %q already exists", newName) + } + err := d.writeSem.Acquire(ctx, 1) + if err != nil { + return err + } + defer d.writeSem.Release(1) + + // pull latest changes from remote + err = d.pullFromRemote(ctx, true) + if err != nil { + return fmt.Errorf("rename: unable to pull from remote: %w", err) + } + + oldMeta, 
err := d.catalog.tableMeta(oldName) + if err != nil { + if errors.Is(err, errNotFound) { + return fmt.Errorf("rename: Table %q not found", oldName) + } + return fmt.Errorf("rename: unable to get table meta: %w", err) + } + + // copy the old table to new table + newVersion := newVersion() + if oldMeta.Type == "TABLE" { + err = copyDir(d.localTableDir(newName, newVersion), d.localTableDir(oldName, oldMeta.Version)) + if err != nil { + return fmt.Errorf("rename: copy table failed: %w", err) + } + + // rename the underlying table + err = renameTable(ctx, d.localDBPath(newName, newVersion), oldName, newName) + if err != nil { + return fmt.Errorf("rename: rename table failed: %w", err) + } + } else { + err = copyDir(d.localTableDir(newName, ""), d.localTableDir(oldName, "")) + if err != nil { + return fmt.Errorf("rename: copy view failed: %w", err) + } + } + + // sync the new table and new version + meta := &tableMeta{ + Name: newName, + Version: newVersion, + CreatedVersion: newVersion, + Type: oldMeta.Type, + SQL: oldMeta.SQL, + } + if err := d.pushToRemote(ctx, newName, oldMeta, meta); err != nil { + return fmt.Errorf("rename: unable to replicate new table: %w", err) + } + + // TODO :: fix this + // at this point db is inconsistent + // has both old table and new table + + // drop the old table in remote + err = d.deleteRemote(ctx, oldName, "") + if err != nil { + return fmt.Errorf("rename: unable to delete old table %q from remote: %w", oldName, err) + } + + // no errors after this point since background goroutine will eventually sync the local db + + // update local meta for new table + err = d.writeTableMeta(newName, meta) + if err != nil { + d.logger.Debug("rename: error in writing table meta", slog.String("name", newName), slog.String("error", err.Error())) + return nil + } + + // remove old table from local db + d.catalog.removeTable(oldName) + d.catalog.addTableVersion(newName, meta) + d.localDirty = false + return nil +} + +func (d *db) localDBMonitor() { + 
ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-d.ctx.Done(): + return + case <-ticker.C: + err := d.writeSem.Acquire(d.ctx, 1) + if err != nil { + if !errors.Is(err, context.Canceled) { + d.logger.Error("localDBMonitor: error in acquiring write sem", slog.String("error", err.Error())) + } + continue + } + if !d.localDirty { + d.writeSem.Release(1) + // all good + continue + } + err = d.pullFromRemote(d.ctx, true) + if err != nil && !errors.Is(err, context.Canceled) { + d.logger.Error("localDBMonitor: error in pulling from remote", slog.String("error", err.Error())) + } + d.writeSem.Release(1) + } + } +} + +func (d *db) Size() int64 { + var paths []string + _ = d.iterateLocalTables(false, func(name string, meta *tableMeta) error { + // this is to avoid counting temp tables during source ingestion + // in certain cases we only want to compute the size of the serving db files + if !strings.HasPrefix(name, "__rill_tmp_") { + paths = append(paths, d.localDBPath(meta.Name, meta.Version)) + } + return nil + }) + return fileSize(paths) +} + +// acquireWriteConn syncs the write database, initializes the write handle and returns a write connection. +// The release function should be called to release the connection. +// It should be called with the writeMu locked. 
+func (d *db) acquireWriteConn(ctx context.Context, dsn, table string, attachExisting bool) (*sqlx.Conn, func() error, error) { + var ignoreTable string + if !attachExisting { + ignoreTable = table + } + db, err := d.openDBAndAttach(ctx, dsn, ignoreTable, false) + if err != nil { + return nil, nil, err + } + conn, err := db.Connx(ctx) + if err != nil { + _ = db.Close() + return nil, nil, err + } + + if attachExisting { + _, err = conn.ExecContext(ctx, "DROP VIEW IF EXISTS "+safeSQLName(table)) + if err != nil { + _ = conn.Close() + _ = db.Close() + return nil, nil, err + } + } + + return conn, func() error { + _ = conn.Close() + err = db.Close() + return err + }, nil +} + +func (d *db) openDBAndAttach(ctx context.Context, uri, ignoreTable string, read bool) (*sqlx.DB, error) { + d.logger.Debug("open db", slog.Bool("read", read), slog.String("uri", uri)) + // open the db + var settings map[string]string + dsn, err := url.Parse(uri) + if err != nil { + return nil, err + } + if read { + settings = d.opts.ReadSettings + } else { + settings = d.opts.WriteSettings + } + query := dsn.Query() + for k, v := range settings { + query.Set(k, v) + } + // Rebuild DuckDB DSN (which should be "path?key=val&...") + // this is required since spaces and other special characters are valid in db file path but invalid and hence encoded in URL + connector, err := duckdb.NewConnector(generateDSN(dsn.Path, query.Encode()), func(execer driver.ExecerContext) error { + for _, qry := range d.opts.InitQueries { + _, err := execer.ExecContext(context.Background(), qry, nil) + if err != nil && strings.Contains(err.Error(), "Failed to download extension") { + // Retry using another mirror. 
Based on: https://github.com/duckdb/duckdb/issues/9378 + _, err = execer.ExecContext(context.Background(), qry+" FROM 'http://nightly-extensions.duckdb.org'", nil) + } + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return nil, err + } + + db := sqlx.NewDb(otelsql.OpenDB(connector), "duckdb") + err = otelsql.RegisterDBStatsMetrics(db.DB, otelsql.WithAttributes(d.opts.OtelAttributes...)) + if err != nil { + return nil, fmt.Errorf("registering db stats metrics: %w", err) + } + + conn, err := db.Connx(ctx) + if err != nil { + db.Close() + return nil, err + } + + tables := d.catalog.listTables() + err = d.attachTables(ctx, conn, tables, ignoreTable) + if err != nil { + conn.Close() + db.Close() + return nil, err + } + if err := conn.Close(); err != nil { + return nil, err + } + + // 2023-12-11: Hail mary for solving this issue: https://github.com/duckdblabs/rilldata/issues/6. + // Forces DuckDB to create catalog entries for the information schema up front (they are normally created lazily). + // Can be removed if the issue persists. 
+ _, err = db.ExecContext(context.Background(), ` + select + coalesce(t.table_catalog, current_database()) as "database", + t.table_schema as "schema", + t.table_name as "name", + t.table_type as "type", + array_agg(c.column_name order by c.ordinal_position) as "column_names", + array_agg(c.data_type order by c.ordinal_position) as "column_types", + array_agg(c.is_nullable = 'YES' order by c.ordinal_position) as "column_nullable" + from information_schema.tables t + join information_schema.columns c on t.table_schema = c.table_schema and t.table_name = c.table_name + group by 1, 2, 3, 4 + order by 1, 2, 3, 4 + `) + if err != nil { + db.Close() + return nil, err + } + + return db, nil +} + +func (d *db) attachTables(ctx context.Context, conn *sqlx.Conn, tables []*tableMeta, ignoreTable string) error { + // sort tables by created_version + // this is to ensure that views/tables on which other views depend are attached first + slices.SortFunc(tables, func(a, b *tableMeta) int { + // all tables should be attached first and can be attached in any order + if a.Type == "TABLE" && b.Type == "TABLE" { + return 0 + } + if a.Type == "TABLE" { + return -1 + } + if b.Type == "TABLE" { + return 1 + } + // any order for views + return strings.Compare(a.CreatedVersion, b.CreatedVersion) + }) + + var failedViews []*tableMeta + // attach database files + for _, table := range tables { + if table.Name == ignoreTable { + continue + } + safeTable := safeSQLName(table.Name) + if table.Type == "VIEW" { + _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE OR REPLACE VIEW %s AS %s", safeTable, table.SQL)) + if err != nil { + if errors.Is(err, context.Canceled) { + return err + } + failedViews = append(failedViews, table) + } + continue + } + safeDBName := safeSQLName(dbName(table.Name, table.Version)) + _, err := conn.ExecContext(ctx, fmt.Sprintf("ATTACH IF NOT EXISTS %s AS %s (READ_ONLY)", safeSQLString(d.localDBPath(table.Name, table.Version)), safeDBName)) + if err != nil { + return 
fmt.Errorf("failed to attach table %q: %w", table.Name, err) + } + _, err = conn.ExecContext(ctx, fmt.Sprintf("CREATE OR REPLACE VIEW %s AS SELECT * FROM %s.%s", safeTable, safeDBName, safeTable)) + if err != nil { + return err + } + } + + // retry creating views + // views may depend on other views, without building a dependency graph we can not recreate them in correct order + // so we recreate all failed views and collect the ones that failed + // once a view is created successfully, it may be possible that other views that depend on it can be created in the next iteration + // if in a iteration no views are created successfully, it means either all views are invalid or there is a circular dependency + for len(failedViews) > 0 { + allViewsFailed := true + size := len(failedViews) + for i := 0; i < size; i++ { + table := failedViews[0] + failedViews = failedViews[1:] + safeTable := safeSQLName(table.Name) + _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE OR REPLACE VIEW %s AS %s", safeTable, table.SQL)) + if err != nil { + if errors.Is(err, context.Canceled) { + return err + } + failedViews = append(failedViews, table) + continue + } + // successfully created view + allViewsFailed = false + } + if !allViewsFailed { + continue + } + + // create views that return error on querying + // may be the view is incompatible with the underlying data due to schema changes + for i := 0; i < len(failedViews); i++ { + table := failedViews[i] + safeTable := safeSQLName(table.Name) + // capture the error in creating the view + _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE OR REPLACE VIEW %s AS %s", safeTable, table.SQL)) + if err == nil { + // not possible but just to be safe + continue + } + safeErr := strings.Trim(safeSQLString(err.Error()), "'") + _, err = conn.ExecContext(ctx, fmt.Sprintf("CREATE OR REPLACE VIEW %s AS SELECT error('View %s is incompatible with the underlying data: %s')", safeTable, safeTable, safeErr)) + if err != nil { + return err + } + } + break 
+ } + return nil +} + +func (d *db) tableMeta(name string) (*tableMeta, error) { + contents, err := os.ReadFile(d.localMetaPath(name)) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil, errNotFound + } + return nil, err + } + m := &tableMeta{} + err = json.Unmarshal(contents, m) + if err != nil { + return nil, err + } + + // this is required because release version does not delete table directory as of now + _, err = os.Stat(d.localTableDir(name, m.Version)) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil, errNotFound + } + return nil, err + } + return m, nil +} + +func (d *db) writeTableMeta(name string, meta *tableMeta) error { + metaBytes, err := json.Marshal(meta) + if err != nil { + return fmt.Errorf("create: marshal meta failed: %w", err) + } + err = os.WriteFile(d.localMetaPath(name), metaBytes, fs.ModePerm) + if err != nil { + return fmt.Errorf("create: write meta failed: %w", err) + } + return nil +} + +func (d *db) localTableDir(name, version string) string { + var path string + if version == "" { + path = filepath.Join(d.localPath, name) + } else { + path = filepath.Join(d.localPath, name, version) + } + return path +} + +func (d *db) localMetaPath(table string) string { + return filepath.Join(d.localPath, table, "meta.json") +} + +func (d *db) localDBPath(table, version string) string { + return filepath.Join(d.localPath, table, version, "data.db") +} + +// initLocalTable creates a directory for the table in the local path. +// If version is provided, a version directory is also created. +func (d *db) initLocalTable(name, version string) error { + err := os.MkdirAll(d.localTableDir(name, version), fs.ModePerm) + if err != nil { + return fmt.Errorf("create: unable to create dir %q: %w", name, err) + } + return nil +} + +// removeTableVersion removes the table version from the catalog and deletes the local table files. 
+func (d *db) removeTableVersion(ctx context.Context, name, version string) error { + err := d.metaSem.Acquire(ctx, 1) + if err != nil { + return err + } + defer d.metaSem.Release(1) + + _, err = d.dbHandle.ExecContext(ctx, "DETACH DATABASE IF EXISTS "+dbName(name, version)) + if err != nil { + return err + } + return d.deleteLocalTableFiles(name, version) +} + +// deleteLocalTableFiles delete table files for the given table name. If version is provided, only that version is deleted. +func (d *db) deleteLocalTableFiles(name, version string) error { + return os.RemoveAll(d.localTableDir(name, version)) +} + +func (d *db) iterateLocalTables(removeInvalidTable bool, fn func(name string, meta *tableMeta) error) error { + entries, err := os.ReadDir(d.localPath) + if err != nil { + return err + } + for _, entry := range entries { + if !entry.IsDir() { + continue + } + meta, err := d.tableMeta(entry.Name()) + if err != nil { + if !removeInvalidTable { + continue + } + err = d.deleteLocalTableFiles(entry.Name(), "") + if err != nil { + return err + } + continue + } + err = fn(entry.Name(), meta) + if err != nil { + return err + } + } + return nil +} + +func (d *db) prepareSnapshot(ctx context.Context, conn *sqlx.Conn, s *snapshot) error { + err := d.metaSem.Acquire(ctx, 1) + if err != nil { + return err + } + defer d.metaSem.Release(1) + + if s.ready { + _, err = conn.ExecContext(ctx, "USE "+schemaName(s.id)) + return err + } + + _, err = conn.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName(s.id)) + if err != nil { + return err + } + + _, err = conn.ExecContext(ctx, "USE "+schemaName(s.id)) + if err != nil { + return err + } + + err = d.attachTables(ctx, conn, s.tables, "") + if err != nil { + return err + } + s.ready = true + return nil +} + +func (d *db) removeSnapshot(ctx context.Context, id int) error { + err := d.metaSem.Acquire(ctx, 1) + if err != nil { + return err + } + defer d.metaSem.Release(1) + + _, err = d.dbHandle.Exec(fmt.Sprintf("DROP SCHEMA %s 
CASCADE", schemaName(id))) + return err +} + +type tableMeta struct { + Name string `json:"name"` + Version string `json:"version"` + CreatedVersion string `json:"created_version"` + Type string `json:"type"` // either TABLE or VIEW + SQL string `json:"sql"` // populated for views +} + +func renameTable(ctx context.Context, dbFile, old, newName string) error { + db, err := sql.Open("duckdb", dbFile) + if err != nil { + return err + } + defer db.Close() + + // TODO :: create temporary views when attaching tables to write connection to avoid left views in .db file + // In that case this will not be required. + _, err = db.ExecContext(ctx, fmt.Sprintf("DROP VIEW IF EXISTS %s", safeSQLName(newName))) + if err != nil { + return err + } + + _, err = db.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", safeSQLName(old), safeSQLName(newName))) + return err +} + +func newVersion() string { + return strconv.FormatInt(time.Now().UnixMilli(), 10) +} + +func dbName(table, version string) string { + return fmt.Sprintf("%s__%s__db", table, version) +} + +type settings struct { + MaxMemory string `mapstructure:"max_memory"` + Threads string `mapstructure:"threads"` + // Can be more settings +} + +// Regex to parse human-readable size returned by DuckDB +// nolint +var humanReadableSizeRegex = regexp.MustCompile(`^([\d.]+)\s*(\S+)$`) + +// Reversed logic of StringUtil::BytesToHumanReadableString +// see https://github.com/cran/duckdb/blob/master/src/duckdb/src/common/string_util.cpp#L157 +// Examples: 1 bytes, 2 bytes, 1KB, 1MB, 1TB, 1PB +// nolint +func humanReadableSizeToBytes(sizeStr string) (float64, error) { + var multiplier float64 + + match := humanReadableSizeRegex.FindStringSubmatch(sizeStr) + + if match == nil { + return 0, fmt.Errorf("invalid size format: '%s'", sizeStr) + } + + sizeFloat, err := strconv.ParseFloat(match[1], 64) + if err != nil { + return 0, err + } + + switch match[2] { + case "byte", "bytes": + multiplier = 1 + case "KB": + multiplier = 1000 
+ case "MB": + multiplier = 1000 * 1000 + case "GB": + multiplier = 1000 * 1000 * 1000 + case "TB": + multiplier = 1000 * 1000 * 1000 * 1000 + case "PB": + multiplier = 1000 * 1000 * 1000 * 1000 * 1000 + case "KiB": + multiplier = 1024 + case "MiB": + multiplier = 1024 * 1024 + case "GiB": + multiplier = 1024 * 1024 * 1024 + case "TiB": + multiplier = 1024 * 1024 * 1024 * 1024 + case "PiB": + multiplier = 1024 * 1024 * 1024 * 1024 * 1024 + default: + return 0, fmt.Errorf("unknown size unit '%s' in '%s'", match[2], sizeStr) + } + + return sizeFloat * multiplier, nil +} + +func schemaName(gen int) string { + return fmt.Sprintf("main_%v", gen) +} + +func generateDSN(path, encodedQuery string) string { + if encodedQuery == "" { + return path + } + return path + "?" + encodedQuery +} diff --git a/runtime/pkg/rduckdb/db_test.go b/runtime/pkg/rduckdb/db_test.go new file mode 100644 index 00000000000..ca712974ce8 --- /dev/null +++ b/runtime/pkg/rduckdb/db_test.go @@ -0,0 +1,532 @@ +package rduckdb + +import ( + "context" + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "testing" + + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/require" + "gocloud.dev/blob/fileblob" +) + +func TestDB(t *testing.T) { + db, _, _ := prepareDB(t) + ctx := context.Background() + // create table + err := db.CreateTableAsSelect(ctx, "test", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{}) + require.NoError(t, err) + + // query table + var ( + id int + country string + ) + conn, release, err := db.AcquireReadConnection(ctx) + require.NoError(t, err) + + conn.QueryRowContext(ctx, "SELECT id, country FROM test").Scan(&id, &country) + require.Equal(t, 1, id) + require.Equal(t, "India", country) + require.NoError(t, release()) + + // rename table + err = db.RenameTable(ctx, "test", "test2") + require.NoError(t, err) + + // drop old table + err = db.DropTable(ctx, "test") + require.Error(t, err) + + // insert into table + err = db.MutateTable(ctx, "test2", func(ctx 
context.Context, conn *sqlx.Conn) error { + _, err := conn.ExecContext(ctx, "INSERT INTO test2 (id, country) VALUES (2, 'USA')") + return err + }) + require.NoError(t, err) + + // query table + conn, release, err = db.AcquireReadConnection(ctx) + require.NoError(t, err) + err = conn.QueryRowxContext(ctx, "SELECT id, country FROM test2 where id = 2").Scan(&id, &country) + require.NoError(t, err) + require.Equal(t, 2, id) + require.Equal(t, "USA", country) + require.NoError(t, release()) + + // Add column + err = db.MutateTable(ctx, "test2", func(ctx context.Context, conn *sqlx.Conn) error { + _, err := conn.ExecContext(ctx, "ALTER TABLE test2 ADD COLUMN city TEXT") + return err + }) + require.NoError(t, err) + + // drop table + err = db.DropTable(ctx, "test2") + require.NoError(t, err) + require.NoError(t, db.Close()) +} + +func TestCreateTable(t *testing.T) { + db, _, _ := prepareDB(t) + ctx := context.Background() + err := db.CreateTableAsSelect(ctx, "test", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}}) + + // replace table + err = db.CreateTableAsSelect(ctx, "test", "SELECT 2 AS id, 'USA' AS country", &CreateTableOptions{}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 2, Country: "USA"}}) + + // create another table that ingests from first table + err = db.CreateTableAsSelect(ctx, "test2", "SELECT * FROM test", &CreateTableOptions{}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test2", []testData{{ID: 2, Country: "USA"}}) + + // create view + err = db.CreateTableAsSelect(ctx, "test_view", "SELECT * FROM test", &CreateTableOptions{View: true}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test_view", []testData{{ID: 2, Country: "USA"}}) + + // view on top of view + err = db.CreateTableAsSelect(ctx, "pest_view", "SELECT * FROM 
test_view", &CreateTableOptions{View: true}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM pest_view", []testData{{ID: 2, Country: "USA"}}) + + // replace underlying table + err = db.CreateTableAsSelect(ctx, "test", "SELECT 3 AS id, 'UK' AS country", &CreateTableOptions{}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 3, Country: "UK"}}) + + // view should reflect the change + verifyTable(t, db, "SELECT id, country FROM test_view", []testData{{ID: 3, Country: "UK"}}) + + // create table that was previously view + err = db.CreateTableAsSelect(ctx, "test_view", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test_view", []testData{{ID: 1, Country: "India"}}) + + // create view that was previously table + err = db.CreateTableAsSelect(ctx, "test", "SELECT * FROM test_view", &CreateTableOptions{View: true}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}}) + require.NoError(t, db.Close()) +} + +func TestDropTable(t *testing.T) { + db, _, _ := prepareDB(t) + ctx := context.Background() + + // create table + err := db.CreateTableAsSelect(ctx, "test", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}}) + + // create view + err = db.CreateTableAsSelect(ctx, "test_view", "SELECT * FROM test", &CreateTableOptions{View: true}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test_view", []testData{{ID: 1, Country: "India"}}) + + // drop view + err = db.DropTable(ctx, "test_view") + require.NoError(t, err) + + // verify table data is still there + verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}}) + + // drop table + err = db.DropTable(ctx, "test") + require.NoError(t, 
err) + require.NoError(t, db.Close()) +} + +func TestMutateTable(t *testing.T) { + db, _, _ := prepareDB(t) + ctx := context.Background() + + // create table + err := db.CreateTableAsSelect(ctx, "test", "SELECT 1 AS id, 'Delhi' AS city", &CreateTableOptions{}) + require.NoError(t, err) + + // create dependent view + err = db.CreateTableAsSelect(ctx, "test_view", "SELECT * FROM test", &CreateTableOptions{View: true}) + require.NoError(t, err) + + // insert into table + err = db.MutateTable(ctx, "test", func(ctx context.Context, conn *sqlx.Conn) error { + _, err := conn.ExecContext(ctx, "INSERT INTO test (id, city) VALUES (2, 'NY')") + return err + }) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, city FROM test", []testData{{ID: 1, City: "Delhi"}, {ID: 2, City: "NY"}}) + + // add column and update existing entries in parallel query existing table + alterDone := make(chan struct{}) + queryDone := make(chan struct{}) + testDone := make(chan struct{}) + + go func() { + db.MutateTable(ctx, "test", func(ctx context.Context, conn *sqlx.Conn) error { + _, err := conn.ExecContext(ctx, "ALTER TABLE test ADD COLUMN country TEXT") + require.NoError(t, err) + _, err = conn.ExecContext(ctx, "UPDATE test SET country = 'USA' WHERE id = 2") + require.NoError(t, err) + _, err = conn.ExecContext(ctx, "UPDATE test SET country = 'India' WHERE id = 1") + require.NoError(t, err) + + close(alterDone) + <-queryDone + return nil + }) + close(testDone) + }() + + go func() { + <-alterDone + verifyTable(t, db, "SELECT * FROM test", []testData{{ID: 1, City: "Delhi"}, {ID: 2, City: "NY"}}) + close(queryDone) + }() + + <-testDone + verifyTable(t, db, "SELECT * FROM test", []testData{{ID: 1, City: "Delhi", Country: "India"}, {ID: 2, City: "NY", Country: "USA"}}) + require.NoError(t, db.Close()) +} + +func TestResetLocal(t *testing.T) { + db, localDir, remoteDir := prepareDB(t) + ctx := context.Background() + + // create table + err := db.CreateTableAsSelect(ctx, "test", "SELECT 1 AS 
id, 'India' AS country", &CreateTableOptions{}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}}) + + // reset local + require.NoError(t, db.Close()) + require.NoError(t, os.RemoveAll(localDir)) + + logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + bucket, err := fileblob.OpenBucket(remoteDir, nil) + require.NoError(t, err) + db, err = NewDB(ctx, &DBOptions{ + LocalPath: localDir, + Remote: bucket, + ReadSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + WriteSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + InitQueries: []string{"SET autoinstall_known_extensions=true", "SET autoload_known_extensions=true"}, + Logger: logger, + }) + require.NoError(t, err) + + // acquire connection + conn, release, err := db.AcquireReadConnection(ctx) + require.NoError(t, err) + + // drop table + err = db.DropTable(ctx, "test") + require.NoError(t, err) + + // verify table is still accessible + verifyTableForConn(t, conn, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}}) + require.NoError(t, release()) + + // verify table is now dropped + err = db.DropTable(ctx, "test") + require.ErrorContains(t, err, "not found") + + require.NoError(t, db.Close()) +} + +func TestResetSelectiveLocal(t *testing.T) { + db, localDir, remoteDir := prepareDB(t) + ctx := context.Background() + + // create table + err := db.CreateTableAsSelect(ctx, "test", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{}) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}}) + + // create two views on this + err = db.CreateTableAsSelect(ctx, "test_view", "SELECT * FROM test", &CreateTableOptions{View: true}) + require.NoError(t, err) + err = db.CreateTableAsSelect(ctx, "test_view2", "SELECT * FROM test", &CreateTableOptions{View: true}) + require.NoError(t, err) + + 
// create another table + err = db.CreateTableAsSelect(ctx, "test2", "SELECT 2 AS id, 'USA' AS country", &CreateTableOptions{}) + require.NoError(t, err) + + // create views on this + err = db.CreateTableAsSelect(ctx, "test2_view", "SELECT * FROM test2", &CreateTableOptions{View: true}) + require.NoError(t, err) + + // reset local for some tables + require.NoError(t, db.Close()) + require.NoError(t, os.RemoveAll(filepath.Join(localDir, "test2"))) + require.NoError(t, os.RemoveAll(filepath.Join(localDir, "test_view2"))) + + logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + bucket, err := fileblob.OpenBucket(remoteDir, nil) + require.NoError(t, err) + db, err = NewDB(ctx, &DBOptions{ + LocalPath: localDir, + Remote: bucket, + ReadSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + WriteSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + InitQueries: []string{"SET autoinstall_known_extensions=true", "SET autoload_known_extensions=true"}, + Logger: logger, + }) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test2_view", []testData{{ID: 2, Country: "USA"}}) + verifyTable(t, db, "SELECT id, country FROM test_view2", []testData{{ID: 1, Country: "India"}}) + require.NoError(t, db.Close()) +} + +func TestResetTablesRemote(t *testing.T) { + db, localDir, remoteDir := prepareDB(t) + ctx := context.Background() + + // create table + err := db.CreateTableAsSelect(ctx, "test", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{}) + require.NoError(t, err) + + require.NoError(t, db.Close()) + + // remove remote data + require.NoError(t, os.RemoveAll(remoteDir)) + + logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + bucket, err := fileblob.OpenBucket(remoteDir, &fileblob.Options{CreateDir: true}) + require.NoError(t, err) + db, err = NewDB(ctx, &DBOptions{ + LocalPath: localDir, + Remote: bucket, + 
ReadSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + WriteSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + InitQueries: []string{"SET autoinstall_known_extensions=true", "SET autoload_known_extensions=true"}, + Logger: logger, + }) + require.NoError(t, err) + require.ErrorContains(t, db.DropTable(ctx, "test"), "not found") + require.NoError(t, db.Close()) +} + +func TestResetSelectiveTablesRemote(t *testing.T) { + db, localDir, remoteDir := prepareDB(t) + ctx := context.Background() + + // create table + err := db.CreateTableAsSelect(ctx, "test", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{}) + require.NoError(t, err) + + // create two views on this + err = db.CreateTableAsSelect(ctx, "test_view", "SELECT * FROM test", &CreateTableOptions{View: true}) + require.NoError(t, err) + err = db.CreateTableAsSelect(ctx, "test_view2", "SELECT * FROM test", &CreateTableOptions{View: true}) + require.NoError(t, err) + + // create another table + err = db.CreateTableAsSelect(ctx, "test2", "SELECT 2 AS id, 'USA' AS country", &CreateTableOptions{}) + require.NoError(t, err) + + // create views on this + err = db.CreateTableAsSelect(ctx, "test2_view", "SELECT * FROM test2", &CreateTableOptions{View: true}) + require.NoError(t, err) + + require.NoError(t, db.Close()) + + // remove remote data for some tables + require.NoError(t, os.RemoveAll(filepath.Join(remoteDir, "test2"))) + require.NoError(t, os.RemoveAll(filepath.Join(remoteDir, "test_view2"))) + + logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + bucket, err := fileblob.OpenBucket(remoteDir, nil) + require.NoError(t, err) + db, err = NewDB(ctx, &DBOptions{ + LocalPath: localDir, + Remote: bucket, + ReadSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + WriteSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + InitQueries: []string{"SET autoinstall_known_extensions=true", "SET 
autoload_known_extensions=true"}, + Logger: logger, + }) + require.NoError(t, err) + verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}}) + verifyTable(t, db, "SELECT id, country FROM test_view", []testData{{ID: 1, Country: "India"}}) + require.NoError(t, db.Close()) +} + +func TestConcurrentReads(t *testing.T) { + testDB, _, _ := prepareDB(t) + ctx := context.Background() + + // create table + err := testDB.CreateTableAsSelect(ctx, "pest", "SELECT 2 AS id, 'USA' AS country", &CreateTableOptions{}) + require.NoError(t, err) + + // create test table + err = testDB.CreateTableAsSelect(ctx, "test", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{}) + require.NoError(t, err) + + // acquire connection + conn1, release1, err1 := testDB.AcquireReadConnection(ctx) + require.NoError(t, err1) + + // replace with a view + err = testDB.CreateTableAsSelect(ctx, "test", "SELECT * FROM pest", &CreateTableOptions{View: true}) + require.NoError(t, err) + + // acquire connection + conn2, release2, err2 := testDB.AcquireReadConnection(ctx) + require.NoError(t, err2) + + // drop table + err = testDB.DropTable(ctx, "test") + + // verify both tables are still accessible + verifyTableForConn(t, conn1, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}}) + require.NoError(t, release1()) + verifyTableForConn(t, conn2, "SELECT id, country FROM test", []testData{{ID: 2, Country: "USA"}}) + require.NoError(t, release2()) + + // acquire connection to see that table is now dropped + conn3, release3, err3 := testDB.AcquireReadConnection(ctx) + require.NoError(t, err3) + var id int + var country string + err = conn3.QueryRowContext(ctx, "SELECT id, country FROM test").Scan(&id, &country) + require.Error(t, err) + require.NoError(t, release3()) +} + +func TestInconsistentSchema(t *testing.T) { + testDB, _, _ := prepareDB(t) + ctx := context.Background() + + // create table + err := testDB.CreateTableAsSelect(ctx, "test", "SELECT 2 AS 
id, 'USA' AS country", &CreateTableOptions{}) + require.NoError(t, err) + + // create view + err = testDB.CreateTableAsSelect(ctx, "test_view", "SELECT id, country FROM test", &CreateTableOptions{View: true}) + require.NoError(t, err) + verifyTable(t, testDB, "SELECT * FROM test_view", []testData{{ID: 2, Country: "USA"}}) + + // replace underlying table + err = testDB.CreateTableAsSelect(ctx, "test", "SELECT 20 AS id, 'USB' AS city", &CreateTableOptions{}) + require.NoError(t, err) + + conn, release, err := testDB.AcquireReadConnection(ctx) + require.NoError(t, err) + defer release() + + var ( + id int + country string + ) + err = conn.QueryRowxContext(ctx, "SELECT * FROM test_view").Scan(&id, &country) + require.Error(t, err) + + // but querying from table should work + err = conn.QueryRowxContext(ctx, "SELECT * FROM test").Scan(&id, &country) + require.NoError(t, err) + require.Equal(t, 20, id) + require.Equal(t, "USB", country) +} + +func TestViews(t *testing.T) { + testDB, _, _ := prepareDB(t) + ctx := context.Background() + + // create view + err := testDB.CreateTableAsSelect(ctx, "parent_view", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{View: true}) + require.NoError(t, err) + + // create dependent view + err = testDB.CreateTableAsSelect(ctx, "child_view", "SELECT * FROM parent_view", &CreateTableOptions{View: true}) + require.NoError(t, err) + verifyTable(t, testDB, "SELECT id, country FROM child_view", []testData{{ID: 1, Country: "India"}}) + + // replace parent view + err = testDB.CreateTableAsSelect(ctx, "parent_view", "SELECT 2 AS id, 'USA' AS country", &CreateTableOptions{View: true}) + require.NoError(t, err) + verifyTable(t, testDB, "SELECT id, country FROM child_view", []testData{{ID: 2, Country: "USA"}}) + + // rename child view + err = testDB.RenameTable(ctx, "child_view", "view0") + require.NoError(t, err) + verifyTable(t, testDB, "SELECT id, country FROM view0", []testData{{ID: 2, Country: "USA"}}) + + // old child view does not 
exist + err = testDB.DropTable(ctx, "child_view") + require.Error(t, err) + + // create a chain of views + for i := 1; i <= 10; i++ { + err = testDB.CreateTableAsSelect(ctx, fmt.Sprintf("view%d", i), fmt.Sprintf("SELECT * FROM view%d", i-1), &CreateTableOptions{View: true}) + require.NoError(t, err) + } + verifyTable(t, testDB, "SELECT id, country FROM view10", []testData{{ID: 2, Country: "USA"}}) + + require.NoError(t, testDB.Close()) +} + +func prepareDB(t *testing.T) (db DB, localDir, remoteDir string) { + localDir = t.TempDir() + ctx := context.Background() + logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + remoteDir = t.TempDir() + bucket, err := fileblob.OpenBucket(remoteDir, nil) + require.NoError(t, err) + db, err = NewDB(ctx, &DBOptions{ + LocalPath: localDir, + Remote: bucket, + ReadSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + WriteSettings: map[string]string{"memory_limit": "2GB", "threads": "1"}, + InitQueries: []string{"SET autoinstall_known_extensions=true", "SET autoload_known_extensions=true"}, + Logger: logger, + }) + require.NoError(t, err) + return +} + +func verifyTable(t *testing.T, db DB, query string, data []testData) { + ctx := context.Background() + conn, release, err := db.AcquireReadConnection(ctx) + require.NoError(t, err) + defer release() + + var scannedData []testData + err = conn.SelectContext(ctx, &scannedData, query) + require.NoError(t, err) + require.Equal(t, data, scannedData) +} + +func verifyTableForConn(t *testing.T, conn *sqlx.Conn, query string, data []testData) { + ctx := context.Background() + var scannedData []testData + err := conn.SelectContext(ctx, &scannedData, query) + require.NoError(t, err) + require.Equal(t, data, scannedData) +} + +type testData struct { + ID int `db:"id"` + Country string `db:"country"` + City string `db:"city"` +} diff --git a/runtime/pkg/rduckdb/io.go b/runtime/pkg/rduckdb/io.go new file mode 100644 index 
// copyDir recursively copies the contents of src into dst, creating dst if
// needed. Files with the same name in the destination directory are overwritten.
func copyDir(dst, src string) error {
	// Create the destination directory
	if err := os.MkdirAll(dst, os.ModePerm); err != nil {
		return err
	}
	// Read the contents of the source directory
	entries, err := os.ReadDir(src)
	if err != nil {
		return err
	}

	// Copy the contents of the source directory
	for _, entry := range entries {
		srcPath := filepath.Join(src, entry.Name())
		dstPath := filepath.Join(dst, entry.Name())

		if entry.IsDir() {
			if err := copyDir(dstPath, srcPath); err != nil {
				return err
			}
		} else if err := copyFile(dstPath, srcPath); err != nil {
			return err
		}
	}
	return nil
}

// copyFile copies a single file from src to dst, overwriting dst if it exists.
func copyFile(dst, src string) error {
	// Open the source file
	srcFile, err := os.Open(src)
	if err != nil {
		return err
	}
	defer srcFile.Close()

	// Create the destination file
	dstFile, err := os.Create(dst)
	if err != nil {
		return err
	}

	// Copy the content from source to destination
	_, err = io.Copy(dstFile, srcFile)
	// Fix: propagate the Close error. Buffered write failures (e.g. disk full)
	// may only be reported on close, and were previously swallowed by a
	// deferred Close.
	if cerr := dstFile.Close(); err == nil {
		err = cerr
	}
	return err
}

// fileSize returns the combined size in bytes of the given paths.
// Paths that cannot be stat'ed (e.g. missing files) contribute zero.
func fileSize(paths []string) int64 {
	var size int64
	for _, path := range paths {
		if info, err := os.Stat(path); err == nil { // ignoring error since only error possible is *PathError
			size += info.Size()
		}
	}
	return size
}
+ +func TestCopyDirEmptyDir(t *testing.T) { + src := t.TempDir() + dest := t.TempDir() + err := os.RemoveAll(dest) + require.NoError(t, err) + require.NoDirExists(t, dest) + + err = copyDir(dest, src) + require.NoError(t, err) + + require.DirExists(t, dest) + require.DirExists(t, src) +} + +func TestCopyDirEmptyNestedDir(t *testing.T) { + src := t.TempDir() + dest := t.TempDir() + err := os.RemoveAll(dest) + require.NoError(t, err) + require.NoDirExists(t, dest) + + err = os.MkdirAll(filepath.Join(src, "nested1", "nested"), os.ModePerm) + require.NoError(t, err) + + err = os.MkdirAll(filepath.Join(src, "nested2"), os.ModePerm) + require.NoError(t, err) + + err = copyDir(dest, src) + require.NoError(t, err) + + require.DirExists(t, dest) + require.DirExists(t, filepath.Join(dest, "nested1")) + require.DirExists(t, filepath.Join(dest, "nested2")) + require.DirExists(t, filepath.Join(dest, "nested1", "nested")) +} + +func TestCopyDirWithFile(t *testing.T) { + src := t.TempDir() + dest := t.TempDir() + require.NoError(t, os.Mkdir(filepath.Join(dest, "existing"), os.ModePerm)) + + err := os.MkdirAll(filepath.Join(src, "nested1", "nested"), os.ModePerm) + require.NoError(t, err) + + require.NoError(t, os.WriteFile(filepath.Join(src, "nested1", "file.txt"), []byte("nested1"), os.ModePerm)) + require.NoError(t, os.WriteFile(filepath.Join(src, "nested1", "nested", "file.txt"), []byte("nested1-nested"), os.ModePerm)) + + err = os.MkdirAll(filepath.Join(src, "nested2"), os.ModePerm) + require.NoError(t, os.WriteFile(filepath.Join(src, "nested2", "file.txt"), []byte("nested2"), os.ModePerm)) + require.NoError(t, err) + + err = copyDir(dest, src) + require.NoError(t, err) + + contents, err := os.ReadFile(filepath.Join(dest, "nested1", "file.txt")) + require.NoError(t, err) + require.Equal(t, "nested1", string(contents)) + + contents, err = os.ReadFile(filepath.Join(dest, "nested1", "nested", "file.txt")) + require.NoError(t, err) + require.Equal(t, "nested1-nested", 
string(contents)) + + contents, err = os.ReadFile(filepath.Join(dest, "nested2", "file.txt")) + require.NoError(t, err) + require.Equal(t, "nested2", string(contents)) + + require.DirExists(t, filepath.Join(dest, "existing")) +} diff --git a/runtime/pkg/rduckdb/remote.go b/runtime/pkg/rduckdb/remote.go new file mode 100644 index 00000000000..a4bf9eaf659 --- /dev/null +++ b/runtime/pkg/rduckdb/remote.go @@ -0,0 +1,311 @@ +package rduckdb + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "os" + "path" + "path/filepath" + "strings" + "time" + + "gocloud.dev/blob" + "gocloud.dev/gcerrors" + "golang.org/x/sync/errgroup" +) + +// pullFromRemote updates local data with the latest data from remote. +// This is not safe for concurrent calls. +func (d *db) pullFromRemote(ctx context.Context, updateCatalog bool) error { + if !d.localDirty { + // optimisation to skip sync if write was already synced + return nil + } + d.logger.Debug("syncing from remote") + // Create an errgroup for background downloads with limited concurrency. 
+ g, gctx := errgroup.WithContext(ctx) + g.SetLimit(8) + + objects := d.remote.List(&blob.ListOptions{ + Delimiter: "/", // only list directories with a trailing slash and IsDir set to true + }) + + remoteTables := make(map[string]*tableMeta) + for { + // Stop the loop if the ctx was cancelled + var stop bool + select { + case <-gctx.Done(): + stop = true + default: + // don't break + } + if stop { + break // can't use break inside the select + } + + obj, err := objects.Next(gctx) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + if !obj.IsDir { + continue + } + + table := strings.TrimSuffix(obj.Key, "/") + d.logger.Debug("SyncWithObjectStorage: discovered table", slog.String("table", table)) + + // get version of the table + var b []byte + err = retry(gctx, func() error { + res, err := d.remote.ReadAll(gctx, path.Join(table, "meta.json")) + if err != nil { + return err + } + b = res + return nil + }) + if err != nil { + if gcerrors.Code(err) == gcerrors.NotFound { + // invalid table directory + d.logger.Debug("SyncWithObjectStorage: invalid table directory", slog.String("table", table)) + continue + } + return err + } + remoteMeta := &tableMeta{} + err = json.Unmarshal(b, remoteMeta) + if err != nil { + d.logger.Debug("SyncWithObjectStorage: failed to unmarshal table metadata", slog.String("table", table), slog.Any("error", err)) + continue + } + remoteTables[table] = remoteMeta + + // check if table is locally present + meta, _ := d.tableMeta(table) + if meta != nil && meta.Version == remoteMeta.Version { + d.logger.Debug("SyncWithObjectStorage: local table is not present in catalog", slog.String("table", table)) + continue + } + if err := d.initLocalTable(table, remoteMeta.Version); err != nil { + return err + } + + tblIter := d.remote.List(&blob.ListOptions{Prefix: path.Join(table, remoteMeta.Version)}) + // download all objects in the table and current version + for { + obj, err := tblIter.Next(gctx) + if err != nil { + if 
errors.Is(err, io.EOF) { + break + } + return err + } + g.Go(func() error { + return retry(gctx, func() error { + file, err := os.Create(filepath.Join(d.localPath, obj.Key)) + if err != nil { + return err + } + defer file.Close() + + rdr, err := d.remote.NewReader(gctx, obj.Key, nil) + if err != nil { + return err + } + defer rdr.Close() + + _, err = io.Copy(file, rdr) + return err + }) + }) + } + } + + // Wait for all outstanding downloads to complete + err := g.Wait() + if err != nil { + return err + } + + // Update table versions(updates even if local is same as remote) + for table, meta := range remoteTables { + err = d.writeTableMeta(table, meta) + if err != nil { + return err + } + } + + if !updateCatalog { + // delete all local tables which are not present in remote + _ = d.iterateLocalTables(true, func(name string, meta *tableMeta) error { + if _, ok := remoteTables[name]; !ok { + return d.deleteLocalTableFiles(name, "") + } + return nil + }) + return nil + } + + // iterate over all remote tables and update catalog + for table, remoteMeta := range remoteTables { + meta, err := d.catalog.tableMeta(table) + if err != nil { + if errors.Is(err, errNotFound) { + // table not found in catalog + d.catalog.addTableVersion(table, remoteMeta) + } + return err + } + // table is present in catalog but has version mismatch + if meta.Version != remoteMeta.Version { + d.catalog.addTableVersion(table, remoteMeta) + } + } + + // iterate over local entries and remove if not present in remote + _ = d.iterateLocalTables(false, func(name string, meta *tableMeta) error { + if _, ok := remoteTables[name]; ok { + // table is present in remote + return nil + } + // check if table is present in catalog + _, err := d.catalog.tableMeta(name) + if err != nil { + return d.deleteLocalTableFiles(name, "") + } + // remove table from catalog + d.catalog.removeTable(name) + return nil + }) + return nil +} + +// pushToRemote syncs the remote location with the local path for given table. 
+// If oldVersion is specified, it is deleted after successful sync. +func (d *db) pushToRemote(ctx context.Context, table string, oldMeta, meta *tableMeta) error { + if meta.Type == "TABLE" { + localPath := d.localTableDir(table, meta.Version) + entries, err := os.ReadDir(localPath) + if err != nil { + return err + } + + for _, entry := range entries { + d.logger.Debug("replicating file", slog.String("file", entry.Name()), slog.String("path", localPath)) + // no directory should exist as of now + if entry.IsDir() { + d.logger.Debug("found directory in path which should not exist", slog.String("file", entry.Name()), slog.String("path", localPath)) + continue + } + + wr, err := os.Open(filepath.Join(localPath, entry.Name())) + if err != nil { + return err + } + + // upload to cloud storage + err = retry(ctx, func() error { + return d.remote.Upload(ctx, path.Join(table, meta.Version, entry.Name()), wr, &blob.WriterOptions{ + ContentType: "application/octet-stream", + }) + }) + _ = wr.Close() + if err != nil { + return err + } + } + } + + // update table meta + // todo :: also use etag to avoid concurrent writer conflicts + d.localDirty = true + m, err := json.Marshal(meta) + if err != nil { + return fmt.Errorf("failed to marshal table metadata: %w", err) + } + err = retry(ctx, func() error { + return d.remote.WriteAll(ctx, path.Join(table, "meta.json"), m, nil) + }) + if err != nil { + d.logger.Error("failed to update meta.json in remote", slog.String("table", table), slog.Any("error", err)) + } + + // success -- remove old version + if oldMeta != nil { + _ = d.deleteRemote(ctx, table, oldMeta.Version) + } + return err +} + +// deleteRemote deletes remote. +// If table is specified, only that table is deleted. +// If table and version is specified, only that version of the table is deleted. 
+func (d *db) deleteRemote(ctx context.Context, table, version string) error { + if table == "" && version != "" { + return fmt.Errorf("table must be specified if version is specified") + } + var prefix string + if table != "" { + if version != "" { + prefix = path.Join(table, version) + "/" + } else { + prefix = table + "/" + // delete meta.json first + err := retry(ctx, func() error { return d.remote.Delete(ctx, "meta.json") }) + if err != nil && gcerrors.Code(err) != gcerrors.NotFound { + d.logger.Error("failed to delete meta.json in remote", slog.String("table", table), slog.Any("error", err)) + return err + } + } + } + // ignore errors since meta.json is already removed + + iter := d.remote.List(&blob.ListOptions{Prefix: prefix}) + for { + obj, err := iter.Next(ctx) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + d.logger.Debug("failed to list object", slog.String("table", table), slog.Any("error", err)) + } + err = retry(ctx, func() error { return d.remote.Delete(ctx, obj.Key) }) + if err != nil { + d.logger.Debug("failed to delete object", slog.String("table", table), slog.String("object", obj.Key), slog.Any("error", err)) + } + } + return nil +} + +func retry(ctx context.Context, fn func() error) error { + var err error + for i := 0; i < _maxRetries; i++ { + err = fn() + if err == nil { + return nil // success + } + if !strings.Contains(err.Error(), "stream error: stream ID") { + break // break and return error + } + + select { + case <-ctx.Done(): + return ctx.Err() // return on context cancellation + case <-time.After(_retryDelay): + } + } + return err +} + +const ( + _maxRetries = 5 + _retryDelay = 10 * time.Second +) diff --git a/runtime/pkg/rduckdb/sqlutil.go b/runtime/pkg/rduckdb/sqlutil.go new file mode 100644 index 00000000000..28710d676be --- /dev/null +++ b/runtime/pkg/rduckdb/sqlutil.go @@ -0,0 +1,17 @@ +package rduckdb + +import ( + "fmt" + "strings" +) + +func safeSQLString(s string) string { + return fmt.Sprintf("'%s'", 
strings.ReplaceAll(s, "'", "''")) +} + +func safeSQLName(ident string) string { + if ident == "" { + return ident + } + return fmt.Sprintf("\"%s\"", strings.ReplaceAll(ident, "\"", "\"\"")) // nolint:gocritic // Because SQL escaping is different +}