DuckDB with backup on GCS #6006

Merged
85 commits merged on Dec 13, 2024
Changes from 2 commits

Commits
41fbea9
initial commit
k-anshul Oct 29, 2024
854a666
use latest replicator version
k-anshul Oct 30, 2024
c86ed09
transporter changes
k-anshul Nov 1, 2024
c7fd731
fixed transporters
k-anshul Nov 1, 2024
c411c9f
set backup directory
k-anshul Nov 4, 2024
9330a7a
test fixes
k-anshul Nov 4, 2024
a1d4a10
lint fixes
k-anshul Nov 5, 2024
b27121c
postgres tests fix
k-anshul Nov 5, 2024
089d417
self review
k-anshul Nov 5, 2024
1a4c2b2
Merge remote-tracking branch 'origin/main' into duckdb_gcs
k-anshul Nov 6, 2024
279d207
Import
begelundmuller Nov 8, 2024
05a0603
Remove go.mod
begelundmuller Nov 8, 2024
13653fd
use single local directory
k-anshul Nov 11, 2024
e9a8c6c
use metadata.json for each table
k-anshul Nov 15, 2024
0acae1c
use semaphore instead of mutex for write locks
k-anshul Nov 18, 2024
9a6f9e6
local db monitor
k-anshul Nov 18, 2024
eac6d1b
small fixes
k-anshul Nov 18, 2024
6bd2177
Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs
k-anshul Nov 18, 2024
ad95df0
add data bucket
k-anshul Nov 19, 2024
11203b4
fix snowflake
k-anshul Nov 19, 2024
09424ba
non blocking read handle updates
k-anshul Nov 20, 2024
3b0eee7
use tableMeta plus minor fix
k-anshul Nov 20, 2024
50660ce
small cleanups
k-anshul Nov 21, 2024
365f484
use a catalog to manage table lifecyle
k-anshul Nov 25, 2024
2c3b485
use catalog to check if table exists
k-anshul Nov 25, 2024
c7ebb0b
Merge remote-tracking branch 'origin/main' into begelundmuller/import…
k-anshul Nov 25, 2024
c369e09
add concurrent access unit tests
k-anshul Nov 25, 2024
1f59235
minor tweaks
k-anshul Nov 26, 2024
a390746
data bucket for persisting data to gcs
k-anshul Nov 26, 2024
5325252
test fix
k-anshul Nov 26, 2024
953247d
also prefix with driver
k-anshul Nov 26, 2024
b10cfab
merge with main
k-anshul Nov 26, 2024
5585e3b
close bucket plus directory prefix
k-anshul Nov 26, 2024
d3bfbb6
bucket is closed when prefixed so need to open new data bucket all times
k-anshul Nov 26, 2024
3d5fe90
Merge remote-tracking branch 'origin/main' into begelundmuller/import…
k-anshul Nov 27, 2024
e5094c4
Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs
k-anshul Nov 27, 2024
f51a2b1
Update runtime/pkg/rduckdb/db.go
k-anshul Nov 28, 2024
5732e6e
remove ctx cancellation from catalog
k-anshul Nov 29, 2024
154ed02
close fix
k-anshul Nov 29, 2024
e3ccead
small renames
k-anshul Nov 29, 2024
86aaf2a
view fix
k-anshul Nov 29, 2024
5fcc934
add a storage client and remove preset data_dir
k-anshul Dec 2, 2024
539b481
lint fixes
k-anshul Dec 2, 2024
3f86e55
small refactor
k-anshul Dec 2, 2024
1907725
Apply suggestions from code review
k-anshul Dec 2, 2024
ea8040c
name in connection cache
k-anshul Dec 2, 2024
e4b216f
fix build errors
k-anshul Dec 2, 2024
2566537
transporters fixed
k-anshul Dec 2, 2024
bbc4625
Merge branch 'data_bucket' into duckdb_gcs
k-anshul Dec 2, 2024
214242a
Merge remote-tracking branch 'origin/main' into duckdb_gcs
k-anshul Dec 2, 2024
f1caa8f
Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs
k-anshul Dec 2, 2024
2ec4780
rename fix
k-anshul Dec 3, 2024
dabed2c
Merge remote-tracking branch 'origin/main' into begelundmuller/import…
k-anshul Dec 3, 2024
ebfd0ba
dsn fix
k-anshul Dec 3, 2024
f2631a2
Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs
k-anshul Dec 3, 2024
e6d30cd
write should acquire snapshot
k-anshul Dec 3, 2024
e90409c
Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs
k-anshul Dec 3, 2024
b462a96
missing withprefix
k-anshul Dec 3, 2024
ccc5729
storage APIs also create directories
k-anshul Dec 3, 2024
84fcf1c
Merge remote-tracking branch 'origin/main' into data_bucket
k-anshul Dec 3, 2024
84b4f59
fix and add unit test
k-anshul Dec 3, 2024
590dee8
pullFromRemote fix and other review comments
k-anshul Dec 4, 2024
385652f
some more tests
k-anshul Dec 4, 2024
ed7fd81
remove invalid tables
k-anshul Dec 4, 2024
952651a
use unique directory in temp directory
k-anshul Dec 4, 2024
d46a38f
Merge branch 'begelundmuller/import-duckdb-replicator' into duckdb_gcs
k-anshul Dec 4, 2024
754c3ba
Merge branch 'data_bucket' into duckdb_gcs
k-anshul Dec 4, 2024
788a459
interim commit
k-anshul Dec 4, 2024
6befdc8
Merge remote-tracking branch 'origin/main' into duckdb_gcs
k-anshul Dec 4, 2024
43e4b4f
Merge remote-tracking branch 'origin/main' into duckdb_gcs
k-anshul Dec 4, 2024
c601937
remove isview param
k-anshul Dec 5, 2024
d863e2c
Merge remote-tracking branch 'origin/main' into duckdb_gcs
k-anshul Dec 5, 2024
c609613
fix some more tests
k-anshul Dec 5, 2024
5b4148b
Merge remote-tracking branch 'origin/main' into duckdb_gcs
k-anshul Dec 5, 2024
770f759
reopen db in a separate goroutine
k-anshul Dec 6, 2024
da86f4f
fix more tests
k-anshul Dec 7, 2024
2af27c0
Merge remote-tracking branch 'origin/main' into duckdb_gcs
k-anshul Dec 7, 2024
8842b70
more cleanups
k-anshul Dec 9, 2024
5e1b2cb
Merge remote-tracking branch 'origin/main' into duckdb_gcs
k-anshul Dec 9, 2024
1f1ddfa
small background task fix
k-anshul Dec 9, 2024
584bfba
add backward compatibility
k-anshul Dec 10, 2024
f44cbcd
Merge remote-tracking branch 'origin/main' into duckdb_gcs
k-anshul Dec 10, 2024
058c21a
review comments
k-anshul Dec 13, 2024
56f4e0b
custom temp and secret directory
k-anshul Dec 13, 2024
f45c379
remove custom temp directory
k-anshul Dec 13, 2024
49 changes: 0 additions & 49 deletions runtime/controller_test.go
@@ -507,55 +507,6 @@ select 1
testruntime.RequireIsView(t, olap, "bar", true)
}

func TestModelCTE(t *testing.T) {
// Create a model that references a source
rt, id := testruntime.NewInstance(t)
testruntime.PutFiles(t, rt, id, map[string]string{
"/data/foo.csv": `a,b,c,d,e
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
`,
"/sources/foo.yaml": `
connector: local_file
path: data/foo.csv
`,
"/models/bar.sql": `SELECT * FROM foo`,
})
testruntime.ReconcileParserAndWait(t, rt, id)
testruntime.RequireReconcileState(t, rt, id, 3, 0, 0)
model, modelRes := newModel("SELECT * FROM foo", "bar", "foo")
testruntime.RequireResource(t, rt, id, modelRes)
testruntime.RequireOLAPTable(t, rt, id, "bar")

// Update model to have a CTE with alias different from the source
testruntime.PutFiles(t, rt, id, map[string]string{
"/models/bar.sql": `with CTEAlias as (select * from foo) select * from CTEAlias`,
})
testruntime.ReconcileParserAndWait(t, rt, id)
testruntime.RequireReconcileState(t, rt, id, 3, 0, 0)
model.Spec.InputProperties = must(structpb.NewStruct(map[string]any{"sql": `with CTEAlias as (select * from foo) select * from CTEAlias`}))
testruntime.RequireResource(t, rt, id, modelRes)
testruntime.RequireOLAPTable(t, rt, id, "bar")

// TODO :: Not sure how this can be tested
// The query will succeed when creating model (foo is attached in default schema so memory.foo will work)
// But when querying foo is attached in non default schema (memory.main_x.foo) so memory.foo will not work

// Update model to have a CTE with alias same as the source
testruntime.PutFiles(t, rt, id, map[string]string{
"/models/bar.sql": `with foo as (select * from memory.foo) select * from foo`,
})
testruntime.ReconcileParserAndWait(t, rt, id)
testruntime.RequireReconcileState(t, rt, id, 3, 0, 0)
model.Spec.InputProperties = must(structpb.NewStruct(map[string]any{"sql": `with foo as (select * from memory.foo) select * from foo`}))
modelRes.Meta.Refs = []*runtimev1.ResourceName{}
testruntime.RequireResource(t, rt, id, modelRes)
// Refs are removed but the model is valid.
// TODO: is this expected?
// testruntime.RequireOLAPTable(t, rt, id, "bar")
}

func TestRename(t *testing.T) {
// Rename model A to B and model B to A, verify success
// Rename model A to B and source B to A, verify success
8 changes: 2 additions & 6 deletions runtime/drivers/duckdb/config.go
@@ -14,8 +14,6 @@ const (

// config represents the DuckDB driver config
type config struct {
// DataDir is the path to directory where duckdb files will be created.
DataDir string `mapstructure:"data_dir"`
// PoolSize is the number of concurrent connections and queries allowed
PoolSize int `mapstructure:"pool_size"`
// AllowHostAccess denotes whether to limit access to the local environment and file system
@@ -36,10 +34,8 @@ type config struct {
LogQueries bool `mapstructure:"log_queries"`
}

func newConfig(cfgMap map[string]any, dataDir string) (*config, error) {
cfg := &config{
DataDir: dataDir,
}
func newConfig(cfgMap map[string]any) (*config, error) {
cfg := &config{}
err := mapstructure.WeakDecode(cfgMap, cfg)
if err != nil {
return nil, fmt.Errorf("could not decode config: %w", err)
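The config.go hunk above drops the `dataDir` parameter from `newConfig`, leaving it a pure decode of the settings map (the directory now comes from the storage client at open time). A minimal sketch of the new shape — with a hand-rolled weak decode of `pool_size` standing in for the `mapstructure.WeakDecode` call, and the default of 2 assumed from the tests below:

```go
package main

import (
	"fmt"
	"strconv"
)

// config mirrors only the field exercised here; the real struct has more.
type config struct {
	PoolSize int `mapstructure:"pool_size"`
}

// newConfig decodes a generic settings map into a config.
// The real implementation uses mapstructure.WeakDecode; this sketch
// coerces pool_size by hand so it stays dependency-free.
func newConfig(cfgMap map[string]any) (*config, error) {
	cfg := &config{PoolSize: 2} // assumed default, matching the tests below
	if v, ok := cfgMap["pool_size"]; ok {
		switch t := v.(type) {
		case int:
			cfg.PoolSize = t
		case string: // "weak" decoding: accept numeric strings too
			n, err := strconv.Atoi(t)
			if err != nil {
				return nil, fmt.Errorf("could not decode config: %w", err)
			}
			cfg.PoolSize = n
		default:
			return nil, fmt.Errorf("could not decode config: unsupported type %T", v)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := newConfig(map[string]any{"pool_size": "10"})
	fmt.Println(cfg.PoolSize, err) // 10 <nil>
}
```

The point of the signature change is testability and separation of concerns: decoding user settings no longer depends on where the storage layer decides to put files.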
8 changes: 4 additions & 4 deletions runtime/drivers/duckdb/config_test.go
@@ -16,21 +16,21 @@ import (
)

func TestConfig(t *testing.T) {
cfg, err := newConfig(map[string]any{}, "")
cfg, err := newConfig(map[string]any{})
require.NoError(t, err)
require.Equal(t, 2, cfg.PoolSize)

cfg, err = newConfig(map[string]any{"dsn": "", "cpu": 2}, "")
cfg, err = newConfig(map[string]any{"dsn": "", "cpu": 2})
require.NoError(t, err)
require.Equal(t, "2", cfg.readSettings()["threads"])
require.Subset(t, cfg.writeSettings(), map[string]string{"custom_user_agent": "rill"})
require.Equal(t, 2, cfg.PoolSize)

cfg, err = newConfig(map[string]any{"pool_size": 10}, "path/to")
cfg, err = newConfig(map[string]any{"pool_size": 10})
require.NoError(t, err)
require.Equal(t, 10, cfg.PoolSize)

cfg, err = newConfig(map[string]any{"dsn": "duck.db", "memory_limit_gb": "8", "cpu": "2"}, "path/to")
cfg, err = newConfig(map[string]any{"dsn": "duck.db", "memory_limit_gb": "8", "cpu": "2"})
require.NoError(t, err)
require.Equal(t, "2", cfg.readSettings()["threads"])
require.Equal(t, "", cfg.writeSettings()["threads"])
36 changes: 26 additions & 10 deletions runtime/drivers/duckdb/duckdb.go
@@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"net/url"
"path/filepath"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -141,12 +142,7 @@ func (d Driver) Open(instanceID string, cfgMap map[string]any, st *storage.Clien
logger.Warn("failed to install embedded DuckDB extensions, let DuckDB download them", zap.Error(err))
}

dataDir, err := st.DataDir()
if err != nil {
return nil, err
}

cfg, err := newConfig(cfgMap, dataDir)
cfg, err := newConfig(cfgMap)
if err != nil {
return nil, err
}
@@ -163,6 +159,7 @@
config: cfg,
logger: logger,
activity: ac,
storage: st,
metaSem: semaphore.NewWeighted(1),
olapSem: priorityqueue.NewSemaphore(olapSemSize),
longRunningSem: semaphore.NewWeighted(1), // Currently hard-coded to 1
@@ -191,9 +188,12 @@
// Open the DB
err = c.reopenDB(context.Background())
if err != nil {
if remote != nil {
_ = remote.Close()
}
// Check for another process currently accessing the DB
if strings.Contains(err.Error(), "Could not set lock on file") {
panic(fmt.Errorf("failed to open database (is Rill already running?): %w", err))
return nil, fmt.Errorf("failed to open database (is Rill already running?): %w", err)
}
return nil, err
}
@@ -269,6 +269,7 @@ type connection struct {
config *config
logger *zap.Logger
activity *activity.Client
storage *storage.Client
remote *blob.Bucket
// This driver may issue both OLAP and "meta" queries (like catalog info) against DuckDB.
// Meta queries are usually fast, but OLAP queries may take a long time. To enable predictable parallel performance,
@@ -330,6 +331,9 @@ func (c *connection) Config() map[string]any {
func (c *connection) Close() error {
c.cancel()
_ = c.registration.Unregister()
if c.remote != nil {
_ = c.remote.Close()
}
if c.db != nil {
return c.db.Close()
}
@@ -481,10 +485,23 @@ func (c *connection) reopenDB(ctx context.Context) error {
"SET old_implicit_casting = true", // Implicit Cast to VARCHAR
)

dataDir, err := c.storage.DataDir()
if err != nil {
return err
}

// We want to set preserve_insertion_order=false in hosted environments only (where source data is never viewed directly). Setting it reduces batch data ingestion time by ~40%.
// Hack: Using AllowHostAccess as a proxy indicator for a hosted environment.
if !c.config.AllowHostAccess {
bootQueries = append(bootQueries, "SET preserve_insertion_order TO false")
tempDir, err := c.storage.TempDir()
if err != nil {
return err
}
bootQueries = append(bootQueries,
"SET preserve_insertion_order TO false",
fmt.Sprintf("SET temp_directory = %s", safeSQLString(tempDir)),
fmt.Sprintf("SET secret_directory = %s", safeSQLString(filepath.Join(dataDir, ".duckdb", "secrets"))),
Review thread on this line —

Contributor: Will this interfere with external table storage, or will it just be ignored?

Author (Member): Good point. These queries are executed for every db file, so setting secret_directory per database is what is expected. But there could be issues if all databases share the same temp directory, so for now removing temp_directory seems to be the safe option. More important was to set a unique secret_directory for each instance's database.
)
}

// Add init SQL if provided
@@ -496,9 +513,8 @@ func (c *connection) reopenDB(ctx context.Context) error {
logger := slog.New(zapslog.NewHandler(c.logger.Core(), &zapslog.HandlerOptions{
AddSource: true,
}))
var err error
c.db, err = rduckdb.NewDB(ctx, &rduckdb.DBOptions{
LocalPath: c.config.DataDir,
LocalPath: dataDir,
Remote: c.remote,
ReadSettings: c.config.readSettings(),
WriteSettings: c.config.writeSettings(),
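Both new `SET` statements in the reopenDB hunk interpolate filesystem paths through `safeSQLString`. That helper is defined elsewhere in the repo; a standard single-quote-doubling sketch of what such a function typically does (an assumption for illustration, not the repo's exact code):

```go
package main

import (
	"fmt"
	"strings"
)

// safeSQLString quotes a value as a SQL string literal, doubling any
// embedded single quotes so a path like /tmp/o'brien cannot break out
// of the literal.
func safeSQLString(s string) string {
	return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}

func main() {
	stmt := fmt.Sprintf("SET temp_directory = %s", safeSQLString("/tmp/o'brien/duckdb"))
	fmt.Println(stmt) // SET temp_directory = '/tmp/o''brien/duckdb'
}
```

Quoting at the point of interpolation matters here because the paths come from the storage client and may contain arbitrary characters on user machines.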
10 changes: 1 addition & 9 deletions runtime/pkg/rduckdb/db.go
@@ -266,7 +266,7 @@ func NewDB(ctx context.Context, opts *DBOptions) (DB, error) {
opts.Logger,
)

db.dbHandle, err = db.openDBAndAttach(ctx, "", "", true)
db.dbHandle, err = db.openDBAndAttach(ctx, filepath.Join(db.localPath, "main.db"), "", true)
if err != nil {
if strings.Contains(err.Error(), "Symbol not found") {
fmt.Printf("Your version of macOS is not supported. Please upgrade to the latest major release of macOS. See this link for details: https://support.apple.com/en-in/macos/upgrade")
@@ -275,14 +275,6 @@
return nil, err
}

// We want to prevent multiple rill process accessing same db files.
// All the files are accessed in read-only mode so it is possible for multiple rill process to access same db files.
// To prevent this we attach a dummy db file to the main in-memory db in write mode.
// This is required for local rill only but since there is no way to determine it in this package so we do it for all.
_, err = db.dbHandle.ExecContext(ctx, fmt.Sprintf("ATTACH %s AS __ymmud__", safeSQLString(filepath.Join(db.localPath, "main.db"))))
if err != nil {
return nil, err
}
go db.localDBMonitor()
return db, nil
}