Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anshul committed Dec 13, 2024
1 parent f44cbcd commit 058c21a
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 77 deletions.
49 changes: 0 additions & 49 deletions runtime/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,55 +507,6 @@ select 1
testruntime.RequireIsView(t, olap, "bar", true)
}

func TestModelCTE(t *testing.T) {
// Create a model that references a source
rt, id := testruntime.NewInstance(t)
testruntime.PutFiles(t, rt, id, map[string]string{
"/data/foo.csv": `a,b,c,d,e
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
`,
"/sources/foo.yaml": `
connector: local_file
path: data/foo.csv
`,
"/models/bar.sql": `SELECT * FROM foo`,
})
testruntime.ReconcileParserAndWait(t, rt, id)
testruntime.RequireReconcileState(t, rt, id, 3, 0, 0)
model, modelRes := newModel("SELECT * FROM foo", "bar", "foo")
testruntime.RequireResource(t, rt, id, modelRes)
testruntime.RequireOLAPTable(t, rt, id, "bar")

// Update model to have a CTE with alias different from the source
testruntime.PutFiles(t, rt, id, map[string]string{
"/models/bar.sql": `with CTEAlias as (select * from foo) select * from CTEAlias`,
})
testruntime.ReconcileParserAndWait(t, rt, id)
testruntime.RequireReconcileState(t, rt, id, 3, 0, 0)
model.Spec.InputProperties = must(structpb.NewStruct(map[string]any{"sql": `with CTEAlias as (select * from foo) select * from CTEAlias`}))
testruntime.RequireResource(t, rt, id, modelRes)
testruntime.RequireOLAPTable(t, rt, id, "bar")

// TODO :: Not sure how this can be tested
// The query will succeed when creating model (foo is attached in default schema so memory.foo will work)
// But when querying foo is attached in non default schema (memory.main_x.foo) so memory.foo will not work

// Update model to have a CTE with alias same as the source
testruntime.PutFiles(t, rt, id, map[string]string{
"/models/bar.sql": `with foo as (select * from memory.foo) select * from foo`,
})
testruntime.ReconcileParserAndWait(t, rt, id)
testruntime.RequireReconcileState(t, rt, id, 3, 0, 0)
model.Spec.InputProperties = must(structpb.NewStruct(map[string]any{"sql": `with foo as (select * from memory.foo) select * from foo`}))
modelRes.Meta.Refs = []*runtimev1.ResourceName{}
testruntime.RequireResource(t, rt, id, modelRes)
// Refs are removed but the model is valid.
// TODO: is this expected?
// testruntime.RequireOLAPTable(t, rt, id, "bar")
}

func TestRename(t *testing.T) {
// Rename model A to B and model B to A, verify success
// Rename model A to B and source B to A, verify success
Expand Down
8 changes: 2 additions & 6 deletions runtime/drivers/duckdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ const (

// config represents the DuckDB driver config
type config struct {
// DataDir is the path to directory where duckdb files will be created.
DataDir string `mapstructure:"data_dir"`
// PoolSize is the number of concurrent connections and queries allowed
PoolSize int `mapstructure:"pool_size"`
// AllowHostAccess denotes whether to limit access to the local environment and file system
Expand All @@ -36,10 +34,8 @@ type config struct {
LogQueries bool `mapstructure:"log_queries"`
}

func newConfig(cfgMap map[string]any, dataDir string) (*config, error) {
cfg := &config{
DataDir: dataDir,
}
func newConfig(cfgMap map[string]any) (*config, error) {
cfg := &config{}
err := mapstructure.WeakDecode(cfgMap, cfg)
if err != nil {
return nil, fmt.Errorf("could not decode config: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions runtime/drivers/duckdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ import (
)

func TestConfig(t *testing.T) {
cfg, err := newConfig(map[string]any{}, "")
cfg, err := newConfig(map[string]any{})
require.NoError(t, err)
require.Equal(t, 2, cfg.PoolSize)

cfg, err = newConfig(map[string]any{"dsn": "", "cpu": 2}, "")
cfg, err = newConfig(map[string]any{"dsn": "", "cpu": 2})
require.NoError(t, err)
require.Equal(t, "2", cfg.readSettings()["threads"])
require.Subset(t, cfg.writeSettings(), map[string]string{"custom_user_agent": "rill"})
require.Equal(t, 2, cfg.PoolSize)

cfg, err = newConfig(map[string]any{"pool_size": 10}, "path/to")
cfg, err = newConfig(map[string]any{"pool_size": 10})
require.NoError(t, err)
require.Equal(t, 10, cfg.PoolSize)

cfg, err = newConfig(map[string]any{"dsn": "duck.db", "memory_limit_gb": "8", "cpu": "2"}, "path/to")
cfg, err = newConfig(map[string]any{"dsn": "duck.db", "memory_limit_gb": "8", "cpu": "2"})
require.NoError(t, err)
require.Equal(t, "2", cfg.readSettings()["threads"])
require.Equal(t, "", cfg.writeSettings()["threads"])
Expand Down
24 changes: 15 additions & 9 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,7 @@ func (d Driver) Open(instanceID string, cfgMap map[string]any, st *storage.Clien
logger.Warn("failed to install embedded DuckDB extensions, let DuckDB download them", zap.Error(err))
}

dataDir, err := st.DataDir()
if err != nil {
return nil, err
}

cfg, err := newConfig(cfgMap, dataDir)
cfg, err := newConfig(cfgMap)
if err != nil {
return nil, err
}
Expand All @@ -163,6 +158,7 @@ func (d Driver) Open(instanceID string, cfgMap map[string]any, st *storage.Clien
config: cfg,
logger: logger,
activity: ac,
storage: st,
metaSem: semaphore.NewWeighted(1),
olapSem: priorityqueue.NewSemaphore(olapSemSize),
longRunningSem: semaphore.NewWeighted(1), // Currently hard-coded to 1
Expand Down Expand Up @@ -191,9 +187,12 @@ func (d Driver) Open(instanceID string, cfgMap map[string]any, st *storage.Clien
// Open the DB
err = c.reopenDB(context.Background())
if err != nil {
if remote != nil {
_ = remote.Close()
}
// Check for another process currently accessing the DB
if strings.Contains(err.Error(), "Could not set lock on file") {
panic(fmt.Errorf("failed to open database (is Rill already running?): %w", err))
return nil, fmt.Errorf("failed to open database (is Rill already running?): %w", err)
}
return nil, err
}
Expand Down Expand Up @@ -269,6 +268,7 @@ type connection struct {
config *config
logger *zap.Logger
activity *activity.Client
storage *storage.Client
remote *blob.Bucket
// This driver may issue both OLAP and "meta" queries (like catalog info) against DuckDB.
// Meta queries are usually fast, but OLAP queries may take a long time. To enable predictable parallel performance,
Expand Down Expand Up @@ -330,6 +330,9 @@ func (c *connection) Config() map[string]any {
func (c *connection) Close() error {
c.cancel()
_ = c.registration.Unregister()
if c.remote != nil {
_ = c.remote.Close()
}
if c.db != nil {
return c.db.Close()
}
Expand Down Expand Up @@ -493,12 +496,15 @@ func (c *connection) reopenDB(ctx context.Context) error {
}

// Create new DB
dataDir, err := c.storage.DataDir()
if err != nil {
return err
}
logger := slog.New(zapslog.NewHandler(c.logger.Core(), &zapslog.HandlerOptions{
AddSource: true,
}))
var err error
c.db, err = rduckdb.NewDB(ctx, &rduckdb.DBOptions{
LocalPath: c.config.DataDir,
LocalPath: dataDir,
Remote: c.remote,
ReadSettings: c.config.readSettings(),
WriteSettings: c.config.writeSettings(),
Expand Down
10 changes: 1 addition & 9 deletions runtime/pkg/rduckdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func NewDB(ctx context.Context, opts *DBOptions) (DB, error) {
opts.Logger,
)

db.dbHandle, err = db.openDBAndAttach(ctx, "", "", true)
db.dbHandle, err = db.openDBAndAttach(ctx, filepath.Join(db.localPath, "main.db"), "", true)
if err != nil {
if strings.Contains(err.Error(), "Symbol not found") {
fmt.Printf("Your version of macOS is not supported. Please upgrade to the latest major release of macOS. See this link for details: https://support.apple.com/en-in/macos/upgrade")
Expand All @@ -275,14 +275,6 @@ func NewDB(ctx context.Context, opts *DBOptions) (DB, error) {
return nil, err
}

// We want to prevent multiple rill process accessing same db files.
// All the files are accessed in read-only mode so it is possible for multiple rill process to access same db files.
// To prevent this we attach a dummy db file to the main in-memory db in write mode.
// This is required for local rill only but since there is no way to determine it in this package so we do it for all.
_, err = db.dbHandle.ExecContext(ctx, fmt.Sprintf("ATTACH %s AS __ymmud__", safeSQLString(filepath.Join(db.localPath, "main.db"))))
if err != nil {
return nil, err
}
go db.localDBMonitor()
return db, nil
}
Expand Down

0 comments on commit 058c21a

Please sign in to comment.