DuckDB with backup on GCS #6006

Merged (85 commits, Dec 13, 2024)
Changes from 10 commits

Commits
41fbea9 initial commit (k-anshul, Oct 29, 2024)
854a666 use latest replicator version (k-anshul, Oct 30, 2024)
c86ed09 transporter changes (k-anshul, Nov 1, 2024)
c7fd731 fixed transporters (k-anshul, Nov 1, 2024)
c411c9f set backup directory (k-anshul, Nov 4, 2024)
9330a7a test fixes (k-anshul, Nov 4, 2024)
a1d4a10 lint fixes (k-anshul, Nov 5, 2024)
b27121c postgres tests fix (k-anshul, Nov 5, 2024)
089d417 self review (k-anshul, Nov 5, 2024)
1a4c2b2 Merge remote-tracking branch 'origin/main' into duckdb_gcs (k-anshul, Nov 6, 2024)
279d207 Import (begelundmuller, Nov 8, 2024)
05a0603 Remove go.mod (begelundmuller, Nov 8, 2024)
13653fd use single local directory (k-anshul, Nov 11, 2024)
e9a8c6c use metadata.json for each table (k-anshul, Nov 15, 2024)
0acae1c use semaphore instead of mutex for write locks (k-anshul, Nov 18, 2024)
9a6f9e6 local db monitor (k-anshul, Nov 18, 2024)
eac6d1b small fixes (k-anshul, Nov 18, 2024)
6bd2177 Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs (k-anshul, Nov 18, 2024)
ad95df0 add data bucket (k-anshul, Nov 19, 2024)
11203b4 fix snowflake (k-anshul, Nov 19, 2024)
09424ba non blocking read handle updates (k-anshul, Nov 20, 2024)
3b0eee7 use tableMeta plus minor fix (k-anshul, Nov 20, 2024)
50660ce small cleanups (k-anshul, Nov 21, 2024)
365f484 use a catalog to manage table lifecyle (k-anshul, Nov 25, 2024)
2c3b485 use catalog to check if table exists (k-anshul, Nov 25, 2024)
c7ebb0b Merge remote-tracking branch 'origin/main' into begelundmuller/import… (k-anshul, Nov 25, 2024)
c369e09 add concurrent access unit tests (k-anshul, Nov 25, 2024)
1f59235 minor tweaks (k-anshul, Nov 26, 2024)
a390746 data bucket for persisting data to gcs (k-anshul, Nov 26, 2024)
5325252 test fix (k-anshul, Nov 26, 2024)
953247d also prefix with driver (k-anshul, Nov 26, 2024)
b10cfab merge with main (k-anshul, Nov 26, 2024)
5585e3b close bucket plus directory prefix (k-anshul, Nov 26, 2024)
d3bfbb6 bucket is closed when prefixed so need to open new data bucket all times (k-anshul, Nov 26, 2024)
3d5fe90 Merge remote-tracking branch 'origin/main' into begelundmuller/import… (k-anshul, Nov 27, 2024)
e5094c4 Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs (k-anshul, Nov 27, 2024)
f51a2b1 Update runtime/pkg/rduckdb/db.go (k-anshul, Nov 28, 2024)
5732e6e remove ctx cancellation from catalog (k-anshul, Nov 29, 2024)
154ed02 close fix (k-anshul, Nov 29, 2024)
e3ccead small renames (k-anshul, Nov 29, 2024)
86aaf2a view fix (k-anshul, Nov 29, 2024)
5fcc934 add a storage client and remove preset data_dir (k-anshul, Dec 2, 2024)
539b481 lint fixes (k-anshul, Dec 2, 2024)
3f86e55 small refactor (k-anshul, Dec 2, 2024)
1907725 Apply suggestions from code review (k-anshul, Dec 2, 2024)
ea8040c name in connection cache (k-anshul, Dec 2, 2024)
e4b216f fix build errors (k-anshul, Dec 2, 2024)
2566537 transporters fixed (k-anshul, Dec 2, 2024)
bbc4625 Merge branch 'data_bucket' into duckdb_gcs (k-anshul, Dec 2, 2024)
214242a Merge remote-tracking branch 'origin/main' into duckdb_gcs (k-anshul, Dec 2, 2024)
f1caa8f Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs (k-anshul, Dec 2, 2024)
2ec4780 rename fix (k-anshul, Dec 3, 2024)
dabed2c Merge remote-tracking branch 'origin/main' into begelundmuller/import… (k-anshul, Dec 3, 2024)
ebfd0ba dsn fix (k-anshul, Dec 3, 2024)
f2631a2 Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs (k-anshul, Dec 3, 2024)
e6d30cd write should acquire snapshot (k-anshul, Dec 3, 2024)
e90409c Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs (k-anshul, Dec 3, 2024)
b462a96 missing withprefix (k-anshul, Dec 3, 2024)
ccc5729 storage APIs also create directories (k-anshul, Dec 3, 2024)
84fcf1c Merge remote-tracking branch 'origin/main' into data_bucket (k-anshul, Dec 3, 2024)
84b4f59 fix and add unit test (k-anshul, Dec 3, 2024)
590dee8 pullFromRemote fix and other review comments (k-anshul, Dec 4, 2024)
385652f some more tests (k-anshul, Dec 4, 2024)
ed7fd81 remove invalid tables (k-anshul, Dec 4, 2024)
952651a use unique directory in temp directory (k-anshul, Dec 4, 2024)
d46a38f Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs (k-anshul, Dec 4, 2024)
754c3ba Merge branch 'data_bucket' into duckdb_gcs (k-anshul, Dec 4, 2024)
788a459 interim commit (k-anshul, Dec 4, 2024)
6befdc8 Merge remote-tracking branch 'origin/main' into duckdb_gcs (k-anshul, Dec 4, 2024)
43e4b4f Merge remote-tracking branch 'origin/main' into duckdb_gcs (k-anshul, Dec 4, 2024)
c601937 remove isview param (k-anshul, Dec 5, 2024)
d863e2c Merge remote-tracking branch 'origin/main' into duckdb_gcs (k-anshul, Dec 5, 2024)
c609613 fix some more tests (k-anshul, Dec 5, 2024)
5b4148b Merge remote-tracking branch 'origin/main' into duckdb_gcs (k-anshul, Dec 5, 2024)
770f759 reopen db in a separate goroutine (k-anshul, Dec 6, 2024)
da86f4f fix more tests (k-anshul, Dec 7, 2024)
2af27c0 Merge remote-tracking branch 'origin/main' into duckdb_gcs (k-anshul, Dec 7, 2024)
8842b70 more cleanups (k-anshul, Dec 9, 2024)
5e1b2cb Merge remote-tracking branch 'origin/main' into duckdb_gcs (k-anshul, Dec 9, 2024)
1f1ddfa small background task fix (k-anshul, Dec 9, 2024)
584bfba add backward compatibility (k-anshul, Dec 10, 2024)
f44cbcd Merge remote-tracking branch 'origin/main' into duckdb_gcs (k-anshul, Dec 10, 2024)
058c21a review comments (k-anshul, Dec 13, 2024)
56f4e0b custom temp and secret directory (k-anshul, Dec 13, 2024)
f45c379 remove custom temp directory (k-anshul, Dec 13, 2024)
54 changes: 23 additions & 31 deletions runtime/drivers/duckdb/config.go
@@ -14,8 +14,7 @@ const (

 // config represents the DuckDB driver config
 type config struct {
-    // DataDir is the path to directory where duckdb file named `main.db` will be created. In case of external table storage all the files will also be present in DataDir's subdirectories.
-    // If path is set then DataDir is ignored.
+    // DataDir is the path to directory where duckdb files will be created.
     DataDir string `mapstructure:"data_dir"`
     // PoolSize is the number of concurrent connections and queries allowed
     PoolSize int `mapstructure:"pool_size"`
@@ -35,9 +34,6 @@ type config struct {
     InitSQL string `mapstructure:"init_sql"`
     // LogQueries controls whether to log the raw SQL passed to OLAP.Execute. (Internal queries will not be logged.)
     LogQueries bool `mapstructure:"log_queries"`
-
-    ReadSettings  map[string]string `mapstructure:"-"`
-    WriteSettings map[string]string `mapstructure:"-"`
 }

 func newConfig(cfgMap map[string]any) (*config, error) {
@@ -47,27 +43,9 @@ func newConfig(cfgMap map[string]any) (*config, error) {
         return nil, fmt.Errorf("could not decode config: %w", err)
     }

-    // Set memory limit
-    cfg.ReadSettings = make(map[string]string)
-    cfg.WriteSettings = make(map[string]string)
-    if cfg.MemoryLimitGB > 0 {
-        cfg.ReadSettings["max_memory"] = fmt.Sprintf("%dGB", cfg.MemoryLimitGB)
-    }
-    if cfg.MemoryLimitGBWrite > 0 {
-        cfg.WriteSettings["max_memory"] = fmt.Sprintf("%dGB", cfg.MemoryLimitGB)
-    }
-
-    // Set threads limit
-    var threads int
-    if cfg.CPU > 0 {
-        cfg.ReadSettings["threads"] = strconv.Itoa(cfg.CPU)
-    }
-    if cfg.CPUWrite > 0 {
-        cfg.WriteSettings["threads"] = strconv.Itoa(cfg.CPUWrite)
-    }
-
     // Set pool size
     poolSize := cfg.PoolSize
+    threads := cfg.CPU
     if poolSize == 0 && threads != 0 {
         poolSize = threads
         if cfg.CPU != 0 && cfg.CPU < poolSize {
@@ -77,15 +55,29 @@ func newConfig(cfgMap map[string]any) (*config, error) {
     }
     poolSize = max(poolSizeMin, poolSize) // Always enforce min pool size
     cfg.PoolSize = poolSize

-    // useful for motherduck but safe to pass at initial connect
-    cfg.WriteSettings["custom_user_agent"] = "rill"
     return cfg, nil
 }

-func generateDSN(path, encodedQuery string) string {
-    if encodedQuery == "" {
-        return path
-    }
-    return path + "?" + encodedQuery
-}
+func (c *config) readSettings() map[string]string {
+    readSettings := make(map[string]string)
+    if c.MemoryLimitGB > 0 {
+        readSettings["max_memory"] = fmt.Sprintf("%dGB", c.MemoryLimitGB)
+    }
+    if c.CPU > 0 {
+        readSettings["threads"] = strconv.Itoa(c.CPU)
+    }
+    return readSettings
+}
+
+func (c *config) writeSettings() map[string]string {
+    writeSettings := make(map[string]string)
+    if c.MemoryLimitGBWrite > 0 {
+        writeSettings["max_memory"] = fmt.Sprintf("%dGB", c.MemoryLimitGBWrite)
+    }
+    if c.CPUWrite > 0 {
+        writeSettings["threads"] = strconv.Itoa(c.CPUWrite)
+    }
+    // useful for motherduck but safe to pass at initial connect
+    writeSettings["custom_user_agent"] = "rill"
+    return writeSettings
+}
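
The PR does not render db.go, where these settings maps are consumed, but conceptually each map ends up as DuckDB settings applied to the corresponding read or write handle. A minimal hypothetical sketch of that idea (applySettings is not a function from this PR; only the setting names "max_memory", "threads", and "custom_user_agent" come from the diff above, and DuckDB generally accepts string-valued SET statements such as SET max_memory='4GB'):

import (
    "context"
    "database/sql"
    "fmt"
)

// applySettings shows how a map produced by readSettings() or writeSettings()
// could be applied to an open DuckDB handle via SET statements.
func applySettings(ctx context.Context, db *sql.DB, settings map[string]string) error {
    for k, v := range settings {
        // Keys and values come from the config above; values are passed as strings.
        if _, err := db.ExecContext(ctx, fmt.Sprintf("SET %s='%s'", k, v)); err != nil {
            return fmt.Errorf("failed to set %q: %w", k, err)
        }
    }
    return nil
}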
15 changes: 7 additions & 8 deletions runtime/drivers/duckdb/config_test.go
@@ -17,18 +17,17 @@ func TestConfig(t *testing.T) {

 	cfg, err = newConfig(map[string]any{"dsn": "", "memory_limit_gb": "1", "cpu": 2})
 	require.NoError(t, err)
-	require.Equal(t, "1", cfg.ReadSettings["threads"])
-	require.Equal(t, "1", cfg.WriteSettings["threads"])
+	require.Equal(t, "1", cfg.readSettings()["threads"])
+	require.Equal(t, "1", cfg.writeSettings()["threads"])
 	require.Equal(t, 2, cfg.PoolSize)

 	cfg, err = newConfig(map[string]any{"data_dir": "path/to"})
 	require.NoError(t, err)
-	require.Subset(t, cfg.WriteSettings, map[string]string{"custom_user_agent": "rill"})
+	require.Subset(t, cfg.writeSettings(), map[string]string{"custom_user_agent": "rill"})
 	require.Equal(t, 2, cfg.PoolSize)

 	cfg, err = newConfig(map[string]any{"data_dir": "path/to", "pool_size": 10})
 	require.NoError(t, err)
-	require.Subset(t, cfg.WriteSettings, map[string]string{"custom_user_agent": "rill"})
 	require.Equal(t, 10, cfg.PoolSize)

 	_, err = newConfig(map[string]any{"dsn": "path/to/duck.db?max_memory=4GB", "pool_size": "abc"})
@@ -42,9 +41,9 @@ func TestConfig(t *testing.T) {

 	cfg, err = newConfig(map[string]any{"dsn": "duck.db", "memory_limit_gb": "8", "cpu": "2"})
 	require.NoError(t, err)
-	require.Equal(t, "1", cfg.ReadSettings["threads"])
-	require.Equal(t, "1", cfg.WriteSettings["threads"])
-	require.Equal(t, "4", cfg.ReadSettings["max_memory"])
-	require.Equal(t, "4", cfg.WriteSettings["max_memory"])
+	require.Equal(t, "1", cfg.readSettings()["threads"])
+	require.Equal(t, "1", cfg.writeSettings()["threads"])
+	require.Equal(t, "4", cfg.readSettings()["max_memory"])
+	require.Equal(t, "4", cfg.writeSettings()["max_memory"])
 	require.Equal(t, 2, cfg.PoolSize)
 }
4 changes: 2 additions & 2 deletions runtime/drivers/duckdb/duckdb.go
@@ -506,8 +506,8 @@ func (c *connection) reopenDB(ctx context.Context, clean bool) error {
 	c.db, err = rduckdb.NewDB(ctx, &rduckdb.DBOptions{
 		LocalPath:      c.config.DataDir,
 		Remote:         c.data,
-		ReadSettings:   c.config.ReadSettings,
-		WriteSettings:  c.config.WriteSettings,
+		ReadSettings:   c.config.readSettings(),
+		WriteSettings:  c.config.writeSettings(),
 		InitQueries:    bootQueries,
 		Logger:         logger,
 		OtelAttributes: []attribute.KeyValue{attribute.String("instance_id", c.instanceID)},
253 changes: 253 additions & 0 deletions runtime/pkg/rduckdb/catalog.go
@@ -0,0 +1,253 @@
/*
Example init logic:
- Sync remote files with the local cache
- Create a catalog
- Traverse the local files and call addTableVersion for each table
Example write logic:
- Call addTableVersion after adding a new table version
- Call removeTable when deleting a table
Example read logic:
- Call acquireSnapshot when starting a read
- If it doesn't already exist, create a schema for the snapshot ID with views for all the table versions in the snapshot
- Call releaseSnapshot when done reading the snapshot
Example removeFunc logic:
- Detach the version
- Remove the version file
- If there are no files left in it, remove the table folder
*/
package rduckdb

import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
)

// Represents one table and its versions currently present in the local cache.
type table struct {
    name                   string
    deleted                bool
    currentVersion         string
    versionReferenceCounts map[string]int
    versionMeta            map[string]*tableMeta
}

// Represents a snapshot of table versions.
// The table versions referenced by the snapshot are guaranteed to exist for as long as the snapshot is acquired.
type snapshot struct {
    id             int
    referenceCount int
    tables         []*tableMeta
    // ready is set to true once the snapshot is ready to be served
    ready bool
}

// Represents a catalog of available table versions.
// It is thread-safe and supports acquiring a snapshot of table versions which will not be mutated or removed for as long as the snapshot is held.
type catalog struct {
    sem               *semaphore.Weighted
    tables            map[string]*table
    snapshots         map[int]*snapshot
    currentSnapshotID int

    removeVersionFunc  func(context.Context, string, string) error
    removeSnapshotFunc func(context.Context, int) error
}

// newCatalog creates a new catalog.
// The removeSnapshotFunc func will be called exactly once for each snapshot ID when it is no longer the current snapshot and is no longer held by any readers.
// The removeVersionFunc func will be called exactly once for each table version when it is no longer the current version and is no longer used by any active snapshots.
func newCatalog(removeVersionFunc func(context.Context, string, string) error, removeSnapshotFunc func(context.Context, int) error) *catalog {
    return &catalog{
        sem:                semaphore.NewWeighted(1),
        tables:             make(map[string]*table),
        snapshots:          make(map[int]*snapshot),
        removeVersionFunc:  removeVersionFunc,
        removeSnapshotFunc: removeSnapshotFunc,
    }
}

func (c *catalog) tableMeta(ctx context.Context, name string) (*tableMeta, error) {
    err := c.sem.Acquire(ctx, 1)
    if err != nil {
        return nil, err
    }
    defer c.sem.Release(1)

    t, ok := c.tables[name]
    if !ok || t.deleted {
        return nil, errNotFound
    }
    meta, ok := t.versionMeta[t.currentVersion]
    if !ok {
        return nil, fmt.Errorf("internal error: meta for version %q not found", t.currentVersion)
    }
    return meta, nil
}

// addTableVersion registers a new version of a table.
// If the table name has not been seen before, it is added to the catalog.
func (c *catalog) addTableVersion(ctx context.Context, name string, meta *tableMeta) error {
    err := c.sem.Acquire(ctx, 1)
    if err != nil {
        return err
    }
    defer c.sem.Release(1)

    t, ok := c.tables[name]
    if !ok {
        t = &table{
            name:                   name,
            versionReferenceCounts: make(map[string]int),
            versionMeta:            make(map[string]*tableMeta),
        }
        c.tables[name] = t
    }

    oldVersion := t.currentVersion
    t.deleted = false // In case the table was deleted previously, but a snapshot still references it.
    t.currentVersion = meta.Version
    t.versionMeta[meta.Version] = meta
    c.acquireVersion(t, t.currentVersion)
    if oldVersion != "" {
        _ = c.releaseVersion(ctx, t, oldVersion)
    }

    c.currentSnapshotID++
    return nil
}

// removeTable removes a table from the catalog.
// If the table is currently used by a snapshot, it will stay in the catalog but marked with deleted=true.
// When the last snapshot referencing the table is released, the table will be removed completely.
func (c *catalog) removeTable(ctx context.Context, name string) error {
    err := c.sem.Acquire(ctx, 1)
    if err != nil {
        return err
    }
    defer c.sem.Release(1)

    t, ok := c.tables[name]
    if !ok {
        return fmt.Errorf("table %q not found", name)
    }

    oldVersion := t.currentVersion
    t.deleted = true
    t.currentVersion = ""
    return c.releaseVersion(ctx, t, oldVersion)
}

// listTables returns tableMeta for all active tables present in the catalog.
func (c *catalog) listTables(ctx context.Context) ([]*tableMeta, error) {
    err := c.sem.Acquire(ctx, 1)
    if err != nil {
        return nil, err
    }
    defer c.sem.Release(1)

    tables := make([]*tableMeta, 0)
    for _, t := range c.tables {
        if t.deleted {
            continue
        }
        meta, ok := t.versionMeta[t.currentVersion]
        if !ok {
            return nil, fmt.Errorf("internal error: meta for version %q not found", t.currentVersion)
        }
        tables = append(tables, meta)
    }
    return tables, nil
}

// acquireSnapshot acquires a snapshot of the current table versions.
func (c *catalog) acquireSnapshot(ctx context.Context) (*snapshot, error) {
    err := c.sem.Acquire(ctx, 1)
    if err != nil {
        return nil, err
    }
    defer c.sem.Release(1)

    s, ok := c.snapshots[c.currentSnapshotID]
    if ok {
        s.referenceCount++
        return s, nil
    }
    // first acquire
    s = &snapshot{
        id:             c.currentSnapshotID,
        referenceCount: 1,
        tables:         make([]*tableMeta, 0),
    }
    for _, t := range c.tables {
        if t.deleted {
            continue
        }

        meta, ok := t.versionMeta[t.currentVersion]
        if !ok {
            return nil, fmt.Errorf("internal error: meta for version %q not found", t.currentVersion)
        }
        s.tables = append(s.tables, meta)
        c.acquireVersion(t, t.currentVersion)
    }
    c.snapshots[c.currentSnapshotID] = s
    return s, nil
}

// releaseSnapshot releases a snapshot of table versions.
func (c *catalog) releaseSnapshot(ctx context.Context, s *snapshot) error {
    err := c.sem.Acquire(ctx, 1)
    if err != nil {
        return err
    }
    defer c.sem.Release(1)

    s.referenceCount--
    if s.referenceCount > 0 {
        return nil
    }

    for _, meta := range s.tables {
        t, ok := c.tables[meta.Name]
        if !ok {
            return fmt.Errorf("internal error: table %q not found", meta.Name)
        }
        if err := c.releaseVersion(ctx, t, meta.Version); err != nil {
            return err
        }
    }

    delete(c.snapshots, s.id)
    return c.removeSnapshotFunc(ctx, s.id)
}

// acquireVersion increments the reference count of a table version.
// It must be called while holding the catalog semaphore.
func (c *catalog) acquireVersion(t *table, version string) {
    referenceCount := t.versionReferenceCounts[version]
    referenceCount++
    t.versionReferenceCounts[version] = referenceCount
}

// releaseVersion decrements the reference count of a table version.
// If the reference count reaches zero and the version is no longer the current version, it is removed.
func (c *catalog) releaseVersion(ctx context.Context, t *table, version string) error {
    referenceCount, ok := t.versionReferenceCounts[version]
    if !ok {
        return fmt.Errorf("version %q of table %q not found", version, t.name)
    }
    referenceCount--
    if referenceCount > 0 {
        t.versionReferenceCounts[version] = referenceCount
        return nil
    }

    delete(t.versionReferenceCounts, version)
    if t.deleted && len(t.versionReferenceCounts) == 0 {
        delete(c.tables, t.name)
    }

    return c.removeVersionFunc(ctx, t.name, version)
}
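
To make the lifecycle above concrete, here is a hypothetical usage sketch written as if inside the rduckdb package. exampleCatalogUsage is not part of the PR; it assumes tableMeta has at least Name and Version fields (both are referenced by the catalog code), and the remove callbacks just print instead of touching disk.

// exampleCatalogUsage walks one writer/reader interaction with the catalog.
func exampleCatalogUsage(ctx context.Context) error {
    c := newCatalog(
        // called when a table version has no more references
        func(ctx context.Context, table, version string) error {
            fmt.Printf("delete files for %s/%s\n", table, version)
            return nil
        },
        // called when a snapshot has no more readers
        func(ctx context.Context, snapshotID int) error {
            fmt.Printf("drop schema for snapshot %d\n", snapshotID)
            return nil
        },
    )

    // Writer publishes v1; it becomes the current version (refcount 1).
    if err := c.addTableVersion(ctx, "test", &tableMeta{Name: "test", Version: "v1"}); err != nil {
        return err
    }

    // Reader pins the current snapshot; v1's refcount rises to 2.
    s, err := c.acquireSnapshot(ctx)
    if err != nil {
        return err
    }

    // Writer publishes v2. v1 is released by the writer but survives
    // (refcount 1) because the snapshot still references it.
    if err := c.addTableVersion(ctx, "test", &tableMeta{Name: "test", Version: "v2"}); err != nil {
        return err
    }

    // Releasing the snapshot drops v1's last reference, so the catalog
    // invokes removeVersionFunc("test", "v1") and then removeSnapshotFunc.
    return c.releaseSnapshot(ctx, s)
}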
328 changes: 167 additions & 161 deletions runtime/pkg/rduckdb/db.go

Large diffs are not rendered by default.

54 changes: 53 additions & 1 deletion runtime/pkg/rduckdb/db_test.go
@@ -57,10 +57,11 @@ func TestDB(t *testing.T) {
 	require.NoError(t, release())

 	// Add column
-	db.MutateTable(ctx, "test2", func(ctx context.Context, conn *sqlx.Conn) error {
+	err = db.MutateTable(ctx, "test2", func(ctx context.Context, conn *sqlx.Conn) error {
 		_, err := conn.ExecContext(ctx, "ALTER TABLE test2 ADD COLUMN city TEXT")
 		return err
 	})
+	require.NoError(t, err)

 	// drop table
 	err = db.DropTable(ctx, "test2")
@@ -224,6 +225,49 @@ func TestResetLocal(t *testing.T) {
 	verifyTable(t, db, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}})
 }

+func TestConcurrentReads(t *testing.T) {
+	testDB, _, _ := prepareDB(t)
+	ctx := context.Background()
+
+	// create table
+	err := testDB.CreateTableAsSelect(ctx, "pest", "SELECT 2 AS id, 'USA' AS country", &CreateTableOptions{})
+	require.NoError(t, err)
+
+	// create test table
+	err = testDB.CreateTableAsSelect(ctx, "test", "SELECT 1 AS id, 'India' AS country", &CreateTableOptions{})
+	require.NoError(t, err)
+
+	// acquire connection
+	conn1, release1, err1 := testDB.AcquireReadConnection(ctx)
+	require.NoError(t, err1)
+
+	// replace with a view
+	err = testDB.CreateTableAsSelect(ctx, "test", "SELECT * FROM pest", &CreateTableOptions{View: true})
+	require.NoError(t, err)
+
+	// acquire connection
+	conn2, release2, err2 := testDB.AcquireReadConnection(ctx)
+	require.NoError(t, err2)
+
+	// drop table
+	err = testDB.DropTable(ctx, "test")
+	require.NoError(t, err)
+
+	// verify both tables are still accessible
+	verifyTableForConn(t, conn1, "SELECT id, country FROM test", []testData{{ID: 1, Country: "India"}})
+	require.NoError(t, release1())
+	verifyTableForConn(t, conn2, "SELECT id, country FROM test", []testData{{ID: 2, Country: "USA"}})
+	require.NoError(t, release2())
+
+	// acquire connection to see that table is now dropped
+	conn3, release3, err3 := testDB.AcquireReadConnection(ctx)
+	require.NoError(t, err3)
+	var id int
+	var country string
+	err = conn3.QueryRowContext(ctx, "SELECT id, country FROM test").Scan(&id, &country)
+	require.Error(t, err)
+	require.NoError(t, release3())
+}
+
 func prepareDB(t *testing.T) (db DB, localDir, remoteDir string) {
 	localDir = t.TempDir()
 	ctx := context.Background()
@@ -257,6 +301,14 @@ func verifyTable(t *testing.T, db DB, query string, data []testData) {
 	require.Equal(t, data, scannedData)
 }

+func verifyTableForConn(t *testing.T, conn *sqlx.Conn, query string, data []testData) {
+	ctx := context.Background()
+	var scannedData []testData
+	err := conn.SelectContext(ctx, &scannedData, query)
+	require.NoError(t, err)
+	require.Equal(t, data, scannedData)
+}
+
 type testData struct {
 	ID      int    `db:"id"`
 	Country string `db:"country"`
42 changes: 25 additions & 17 deletions runtime/pkg/rduckdb/remote.go
@@ -27,7 +27,7 @@ func (d *db) pullFromRemote(ctx context.Context) error {
 	}
 	d.logger.Debug("syncing from remote")
 	// Create an errgroup for background downloads with limited concurrency.
-	g, ctx := errgroup.WithContext(ctx)
+	g, gctx := errgroup.WithContext(ctx)
 	g.SetLimit(8)

 	objects := d.remote.List(&blob.ListOptions{
@@ -39,7 +39,7 @@ func (d *db) pullFromRemote(ctx context.Context) error {
 		// Stop the loop if the ctx was cancelled
 		var stop bool
 		select {
-		case <-ctx.Done():
+		case <-gctx.Done():
 			stop = true
 		default:
 			// don't break
@@ -48,7 +48,7 @@ func (d *db) pullFromRemote(ctx context.Context) error {
 			break // can't use break inside the select
 		}

-		obj, err := objects.Next(ctx)
+		obj, err := objects.Next(gctx)
 		if err != nil {
 			if errors.Is(err, io.EOF) {
 				break
@@ -64,8 +64,8 @@ func (d *db) pullFromRemote(ctx context.Context) error {

 		// get version of the table
 		var b []byte
-		err = retry(ctx, func() error {
-			res, err := d.remote.ReadAll(ctx, path.Join(table, "meta.json"))
+		err = retry(gctx, func() error {
+			res, err := d.remote.ReadAll(gctx, path.Join(table, "meta.json"))
 			if err != nil {
 				return err
 			}
@@ -87,36 +87,44 @@ func (d *db) pullFromRemote(ctx context.Context) error {
 			continue
 		}

-		// check with current version
-		meta, _ := d.tableMeta(table)
+		// check if table in catalog is already up to date
+		meta, _ := d.catalog.tableMeta(gctx, table)
 		if meta != nil && meta.Version == backedUpMeta.Version {
 			d.logger.Debug("SyncWithObjectStorage: table is already up to date", slog.String("table", table))
 			continue
 		}
 		tblMetas[table] = backedUpMeta

+		// check if table is locally present but not added to catalog yet
+		meta, _ = d.tableMeta(table)
+		if meta != nil && meta.Version == backedUpMeta.Version {
+			d.logger.Debug("SyncWithObjectStorage: local table is not present in catalog", slog.String("table", table))
+			tblMetas[table] = backedUpMeta
+			continue
+		}
 		if err := os.MkdirAll(filepath.Join(d.localPath, table, backedUpMeta.Version), os.ModePerm); err != nil {
 			return err
 		}

 		tblIter := d.remote.List(&blob.ListOptions{Prefix: path.Join(table, backedUpMeta.Version)})
 		// download all objects in the table and current version
 		for {
-			obj, err := tblIter.Next(ctx)
+			obj, err := tblIter.Next(gctx)
 			if err != nil {
 				if errors.Is(err, io.EOF) {
 					break
 				}
 				return err
 			}
 			g.Go(func() error {
-				return retry(ctx, func() error {
+				return retry(gctx, func() error {
 					file, err := os.Create(filepath.Join(d.localPath, obj.Key))
 					if err != nil {
 						return err
 					}
 					defer file.Close()

-					rdr, err := d.remote.NewReader(ctx, obj.Key, nil)
+					rdr, err := d.remote.NewReader(gctx, obj.Key, nil)
 					if err != nil {
 						return err
 					}
@@ -141,6 +149,10 @@ func (d *db) pullFromRemote(ctx context.Context) error {
 		if err != nil {
 			return err
 		}
+		err = d.catalog.addTableVersion(ctx, table, meta)
+		if err != nil {
+			return err
+		}
 	}

 	// mark tables that are not in remote for delete later
@@ -155,14 +167,10 @@ func (d *db) pullFromRemote(ctx context.Context) error {
 		if _, ok := tblMetas[entry.Name()]; ok {
 			continue
 		}
-		// get current meta
-		meta, _ := d.tableMeta(entry.Name())
-		if meta == nil {
-			// cleanup ??
-			continue
+		err = d.catalog.removeTable(ctx, entry.Name())
+		if err != nil {
+			return err
 		}
-		meta.Deleted = true
-		_ = d.writeTableMeta(entry.Name(), meta)
 	}
 	return nil
 }
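
The retry helper used throughout this file sits outside the rendered diff. Given its call shape, retry(ctx, func() error { ... }), a plausible sketch is a bounded retry loop with a context-aware delay; the attempt count and delay below are illustrative assumptions, not rduckdb's actual values.

import (
    "context"
    "time"
)

// retry runs fn until it succeeds, the attempts are exhausted, or ctx is done.
func retry(ctx context.Context, fn func() error) error {
    const (
        maxAttempts = 5
        delay       = 100 * time.Millisecond
    )
    var err error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        err = fn()
        if err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(delay):
            // fall through to the next attempt
        }
    }
    return err
}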