From 140dd8bb572196d44a1b2d9775cdf9bdbeb28c9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Fri, 8 Sep 2023 12:39:24 +0200 Subject: [PATCH 1/3] Fix comparison toplist time range nil checks (#3044) --- runtime/queries/metricsview_comparison_toplist.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/runtime/queries/metricsview_comparison_toplist.go b/runtime/queries/metricsview_comparison_toplist.go index f823434ade1..10da6b291b5 100644 --- a/runtime/queries/metricsview_comparison_toplist.go +++ b/runtime/queries/metricsview_comparison_toplist.go @@ -71,11 +71,11 @@ func (q *MetricsViewComparisonToplist) Resolve(ctx context.Context, rt *runtime. return fmt.Errorf("not available for dialect '%s'", olap.Dialect()) } - if q.MetricsView.TimeDimension == "" && (q.BaseTimeRange != nil || q.ComparisonTimeRange != nil) { + if q.MetricsView.TimeDimension == "" && (!isTimeRangeNil(q.BaseTimeRange) || !isTimeRangeNil(q.ComparisonTimeRange)) { return fmt.Errorf("metrics view '%s' does not have a time dimension", q.MetricsViewName) } - if q.ComparisonTimeRange != nil { + if !isTimeRangeNil(q.ComparisonTimeRange) { return q.executeComparisonToplist(ctx, olap, q.MetricsView, priority, q.ResolvedMVSecurity) } @@ -561,3 +561,7 @@ func validateSort(sorts []*runtimev1.MetricsViewComparisonSort) error { } return nil } + +func isTimeRangeNil(tr *runtimev1.TimeRange) bool { + return tr == nil || (tr.Start == nil && tr.End == nil) +} From ca2fc7baf1bd6fe995914aa987239c93f4397be6 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Fri, 8 Sep 2023 16:43:15 +0530 Subject: [PATCH 2/3] BigQuery storage api credentials fix (#3043) * bq storage api fix * credentials should be present in all cases * small fix --- runtime/drivers/bigquery/api.go | 14 ++++++++++++-- runtime/drivers/bigquery/bigquery.go | 10 +++------- runtime/drivers/bigquery/sql_store.go | 11 ++++++++--- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/runtime/drivers/bigquery/api.go b/runtime/drivers/bigquery/api.go index 59fdce53ede..7f7c0879847 100644 --- a/runtime/drivers/bigquery/api.go +++ b/runtime/drivers/bigquery/api.go @@ -11,7 +11,12 @@ import ( const defaultPageSize = 20 func (c *Connection) ListDatasets(ctx context.Context, req *runtimev1.BigQueryListDatasetsRequest) ([]string, string, error) { - client, err := c.createClient(ctx, &sourceProperties{ProjectID: bigquery.DetectProjectID}) + opts, err := c.clientOption(ctx) + if err != nil { + return nil, "", err + } + + client, err := bigquery.NewClient(ctx, bigquery.DetectProjectID, opts...) if err != nil { return nil, "", err } @@ -36,7 +41,12 @@ func (c *Connection) ListDatasets(ctx context.Context, req *runtimev1.BigQueryLi } func (c *Connection) ListTables(ctx context.Context, req *runtimev1.BigQueryListTablesRequest) ([]string, string, error) { - client, err := c.createClient(ctx, &sourceProperties{ProjectID: bigquery.DetectProjectID}) + opts, err := c.clientOption(ctx) + if err != nil { + return nil, "", err + } + + client, err := bigquery.NewClient(ctx, bigquery.DetectProjectID, opts...) if err != nil { return nil, "", err } diff --git a/runtime/drivers/bigquery/bigquery.go b/runtime/drivers/bigquery/bigquery.go index b2a274249b5..9f7a89f1fbc 100644 --- a/runtime/drivers/bigquery/bigquery.go +++ b/runtime/drivers/bigquery/bigquery.go @@ -2,7 +2,6 @@ package bigquery import ( "context" - "errors" "fmt" "os" "strings" @@ -219,13 +218,10 @@ func parseSourceProperties(props map[string]any) (*sourceProperties, error) { return conf, err } -func (c *Connection) createClient(ctx context.Context, props *sourceProperties) (*bigquery.Client, error) { +func (c *Connection) clientOption(ctx context.Context) ([]option.ClientOption, error) { creds, err := gcputil.Credentials(ctx, c.config.SecretJSON, c.config.AllowHostAccess) if err != nil { - if !errors.Is(err, gcputil.ErrNoCredentials) { - return nil, err - } - return bigquery.NewClient(ctx, props.ProjectID) + return nil, err } - return bigquery.NewClient(ctx, props.ProjectID, option.WithCredentials(creds)) + return []option.ClientOption{option.WithCredentials(creds)}, nil } diff --git a/runtime/drivers/bigquery/sql_store.go b/runtime/drivers/bigquery/sql_store.go index a972dd3d2ce..93922a5a291 100644 --- a/runtime/drivers/bigquery/sql_store.go +++ b/runtime/drivers/bigquery/sql_store.go @@ -37,7 +37,12 @@ func (c *Connection) QueryAsFiles(ctx context.Context, props map[string]any, sql return nil, err } - client, err := c.createClient(ctx, srcProps) + opts, err := c.clientOption(ctx) + if err != nil { + return nil, err + } + + client, err := bigquery.NewClient(ctx, srcProps.ProjectID, opts...) if err != nil { if strings.Contains(err.Error(), "unable to detect projectID") { return nil, fmt.Errorf("projectID not detected in credentials. Please set `project_id` in source yaml") @@ -45,7 +50,7 @@ func (c *Connection) QueryAsFiles(ctx context.Context, props map[string]any, sql return nil, fmt.Errorf("failed to create bigquery client: %w", err) } - if err := client.EnableStorageReadClient(ctx); err != nil { + if err := client.EnableStorageReadClient(ctx, opts...); err != nil { client.Close() return nil, err } @@ -60,7 +65,7 @@ func (c *Connection) QueryAsFiles(ctx context.Context, props map[string]any, sql // the query results are always cached in a temporary table that storage api can use // there are some exceptions when results aren't cached // so we also try without storage api - client, err = c.createClient(ctx, srcProps) + client, err = bigquery.NewClient(ctx, srcProps.ProjectID, opts...) if err != nil { return nil, fmt.Errorf("failed to create bigquery client: %w", err) } From 9461ca795c71df29664f95843d22228779681fbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Fri, 8 Sep 2023 15:18:05 +0200 Subject: [PATCH 3/3] Set DuckDB pool size to 4 on local (#3045) --- admin/deployments.go | 29 +++++++----- cli/pkg/local/app.go | 9 +++- runtime/drivers/drivers_test.go | 22 ++++----- runtime/drivers/duckdb/config.go | 67 ++++++++++++++------------- runtime/drivers/duckdb/config_test.go | 38 +++++++++------ runtime/drivers/duckdb/duckdb.go | 45 ++++++++---------- runtime/drivers/duckdb/duckdb_test.go | 16 +++---- runtime/drivers/duckdb/olap_test.go | 2 +- runtime/server/queries_test.go | 2 +- 9 files changed, 121 insertions(+), 109 deletions(-) diff --git a/admin/deployments.go b/admin/deployments.go index 38568920a53..7c6391c5be8 100644 --- a/admin/deployments.go +++ b/admin/deployments.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "path" + "strconv" "strings" "time" @@ -57,23 +58,24 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp // Build instance config instanceID := strings.ReplaceAll(uuid.New().String(), "-", "") olapDriver := opts.ProdOLAPDriver - olapDSN := opts.ProdOLAPDSN + olapConfig := map[string]string{} var embedCatalog bool var ingestionLimit int64 - if olapDriver == "duckdb" { - if olapDSN != "" { + switch olapDriver { + case "duckdb": + if opts.ProdOLAPDSN != "" { return nil, fmt.Errorf("passing a DSN is not allowed for driver 'duckdb'") } if opts.ProdSlots == 0 { return nil, fmt.Errorf("slot count can't be 0 for driver 'duckdb'") } + olapConfig["dsn"] = fmt.Sprintf("%s.db?max_memory=%dGB", path.Join(alloc.DataDir, instanceID), alloc.MemoryGB) + olapConfig["pool_size"] = strconv.Itoa(alloc.CPU) embedCatalog = true ingestionLimit = alloc.StorageBytes - - olapDSN = fmt.Sprintf("%s.db?rill_pool_size=%d&max_memory=%dGB", path.Join(alloc.DataDir, instanceID), alloc.CPU, alloc.MemoryGB) - } else if olapDriver == "duckdb-vip" { - if olapDSN != "" { + case "duckdb-vip": + if opts.ProdOLAPDSN != "" { return nil, fmt.Errorf("passing a DSN is not allowed for driver 'duckdb-vip'") } if opts.ProdSlots == 0 { @@ -81,12 +83,15 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp } // NOTE: Rewriting to a "duckdb" driver without CPU, memory, or storage limits - + olapDriver = "duckdb" + olapConfig["dsn"] = fmt.Sprintf("%s.db", path.Join(alloc.DataDir, instanceID)) + olapConfig["pool_size"] = "8" embedCatalog = true ingestionLimit = 0 - - olapDriver = "duckdb" - olapDSN = fmt.Sprintf("%s.db?rill_pool_size=8", path.Join(alloc.DataDir, instanceID)) + default: + olapConfig["dsn"] = opts.ProdOLAPDSN + embedCatalog = false + ingestionLimit = 0 } // Open a runtime client @@ -109,7 +114,7 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp { Name: "olap", Type: olapDriver, - Config: map[string]string{"dsn": olapDSN}, + Config: olapConfig, }, { Name: "repo", diff --git a/cli/pkg/local/app.go b/cli/pkg/local/app.go index 343d1ce3ed1..593e35ff1ba 100644 --- a/cli/pkg/local/app.go +++ b/cli/pkg/local/app.go @@ -126,6 +126,13 @@ func NewApp(ctx context.Context, ver config.Version, verbose, reset bool, olapDr return nil, fmt.Errorf("failed to clean OLAP: %w", err) } } + + // Set default DuckDB pool size to 4 + olapCfg := map[string]string{"dsn": olapDSN} + if olapDriver == "duckdb" { + olapCfg["pool_size"] = "4" + } + // Create instance with its repo set to the project directory inst := &drivers.Instance{ ID: DefaultInstanceID, @@ -143,7 +150,7 @@ func NewApp(ctx context.Context, ver config.Version, verbose, reset bool, olapDr { Type: olapDriver, Name: "olap", - Config: map[string]string{"dsn": olapDSN}, + Config: olapCfg, }, }, } diff --git a/runtime/drivers/drivers_test.go b/runtime/drivers/drivers_test.go index befb425381c..e83a940ba40 100644 --- a/runtime/drivers/drivers_test.go +++ b/runtime/drivers/drivers_test.go @@ -20,7 +20,7 @@ import ( // This should be the only "real" test in the package. Other tests should be added // as subtests of TestAll. func TestAll(t *testing.T) { - var matrix = []func(t *testing.T, fn func(driver string, shared bool, dsn string)) error{ + var matrix = []func(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error{ withDuckDB, withFile, withPostgres, @@ -29,9 +29,9 @@ func TestAll(t *testing.T) { } for _, withDriver := range matrix { - err := withDriver(t, func(driver string, shared bool, dsn string) { + err := withDriver(t, func(driver string, shared bool, cfg map[string]any) { // Open - conn, err := drivers.Open(driver, map[string]any{"dsn": dsn}, shared, activity.NewNoopClient(), zap.NewNop()) + conn, err := drivers.Open(driver, cfg, shared, activity.NewNoopClient(), zap.NewNop()) require.NoError(t, err) require.NotNil(t, conn) @@ -63,26 +63,26 @@ func TestAll(t *testing.T) { } } -func withDuckDB(t *testing.T, fn func(driver string, shared bool, dsn string)) error { - fn("duckdb", false, "?access_mode=read_write&rill_pool_size=4") +func withDuckDB(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error { + fn("duckdb", false, map[string]any{"dsn": "?access_mode=read_write", "pool_size": 4}) return nil } -func withFile(t *testing.T, fn func(driver string, shared bool, dsn string)) error { +func withFile(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error { dsn := t.TempDir() - fn("file", false, dsn) + fn("file", false, map[string]any{"dsn": dsn}) return nil } -func withPostgres(t *testing.T, fn func(driver string, shared bool, dsn string)) error { +func withPostgres(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error { pg := pgtestcontainer.New(t) defer pg.Terminate(t) - fn("postgres", true, pg.DatabaseURL) + fn("postgres", true, map[string]any{"dsn": pg.DatabaseURL}) return nil } -func withSQLite(t *testing.T, fn func(driver string, shared bool, dsn string)) error { - fn("sqlite", true, ":memory:") +func withSQLite(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error { + fn("sqlite", true, map[string]any{"dsn": ":memory:"}) return nil } diff --git a/runtime/drivers/duckdb/config.go b/runtime/drivers/duckdb/config.go index 89fd2c72004..9760be34f4f 100644 --- a/runtime/drivers/duckdb/config.go +++ b/runtime/drivers/duckdb/config.go @@ -5,27 +5,30 @@ import ( "net/url" "strconv" - "github.com/rilldata/rill/runtime/pkg/activity" + "github.com/mitchellh/mapstructure" ) -const poolSizeKey = "rill_pool_size" - -// config represents the Driver config, extracted from the DSN +// config represents the DuckDB driver config type config struct { - // DSN for DuckDB - DSN string + // DSN is the connection string + DSN string `mapstructure:"dsn"` // PoolSize is the number of concurrent connections and queries allowed - PoolSize int - // DBFilePath is the path where database is stored - DBFilePath string - // Activity client - Activity activity.Client + PoolSize int `mapstructure:"pool_size"` + // DBFilePath is the path where the database is stored. It is inferred from the DSN (can't be provided by user). + DBFilePath string `mapstructure:"-"` } -// activityDims and client are allowed to be nil, in this case DuckDB stats are not emitted -func newConfig(dsn string, client activity.Client) (*config, error) { +func newConfig(cfgMap map[string]any) (*config, error) { + cfg := &config{ + PoolSize: 1, // Default value + } + err := mapstructure.WeakDecode(cfgMap, cfg) + if err != nil { + return nil, fmt.Errorf("could not decode config: %w", err) + } + // Parse DSN as URL - uri, err := url.Parse(dsn) + uri, err := url.Parse(cfg.DSN) if err != nil { return nil, fmt.Errorf("could not parse dsn: %w", err) } @@ -34,31 +37,29 @@ func newConfig(dsn string, client activity.Client) (*config, error) { return nil, fmt.Errorf("could not parse dsn: %w", err) } - // If poolSizeKey is in the DSN, parse and remove it - poolSize := 1 - if qry.Has(poolSizeKey) { + // Infer DBFilePath + cfg.DBFilePath = uri.Path + + // We also support overriding the pool size via the DSN by setting "rill_pool_size" as a query argument. + if qry.Has("rill_pool_size") { // Parse as integer - poolSize, err = strconv.Atoi(qry.Get(poolSizeKey)) + cfg.PoolSize, err = strconv.Atoi(qry.Get("rill_pool_size")) if err != nil { - return nil, fmt.Errorf("duckdb Driver: %s is not an integer", poolSizeKey) + return nil, fmt.Errorf("could not parse dsn: 'rill_pool_size' is not an integer") } + // Remove from query string (so not passed into DuckDB config) - qry.Del(poolSizeKey) - } - if poolSize < 1 { - return nil, fmt.Errorf("%s must be >= 1", poolSizeKey) - } + qry.Del("rill_pool_size") - // Rebuild DuckDB DSN (which should be "path?key=val&...") - uri.RawQuery = qry.Encode() - dsn = uri.String() + // Rebuild DuckDB DSN (which should be "path?key=val&...") + uri.RawQuery = qry.Encode() + cfg.DSN = uri.String() + } - // Return config - cfg := &config{ - DSN: dsn, - PoolSize: poolSize, - DBFilePath: uri.Path, - Activity: client, + // Check pool size + if cfg.PoolSize < 1 { + return nil, fmt.Errorf("duckdb pool size must be >= 1") } + return cfg, nil } diff --git a/runtime/drivers/duckdb/config_test.go b/runtime/drivers/duckdb/config_test.go index 25810a4029d..633cd39154a 100644 --- a/runtime/drivers/duckdb/config_test.go +++ b/runtime/drivers/duckdb/config_test.go @@ -3,50 +3,58 @@ package duckdb import ( "testing" - "github.com/rilldata/rill/runtime/pkg/activity" "github.com/stretchr/testify/require" ) func TestConfig(t *testing.T) { - cfg, err := newConfig("", nil) + cfg, err := newConfig(map[string]any{}) require.NoError(t, err) require.Equal(t, "", cfg.DSN) require.Equal(t, 1, cfg.PoolSize) - cfg, err = newConfig("path/to/duck.db", nil) + cfg, err = newConfig(map[string]any{"dsn": "path/to/duck.db"}) require.NoError(t, err) require.Equal(t, "path/to/duck.db", cfg.DSN) require.Equal(t, "path/to/duck.db", cfg.DBFilePath) require.Equal(t, 1, cfg.PoolSize) - cfg, err = newConfig("path/to/duck.db?rill_pool_size=10", nil) + cfg, err = newConfig(map[string]any{"dsn": "path/to/duck.db", "pool_size": 10}) require.NoError(t, err) require.Equal(t, "path/to/duck.db", cfg.DSN) require.Equal(t, "path/to/duck.db", cfg.DBFilePath) require.Equal(t, 10, cfg.PoolSize) - cfg, err = newConfig("path/to/duck.db?rill_pool_size=10&hello=world", nil) + cfg, err = newConfig(map[string]any{"dsn": "path/to/duck.db", "pool_size": "10"}) require.NoError(t, err) - require.Equal(t, "path/to/duck.db?hello=world", cfg.DSN) + require.Equal(t, 10, cfg.PoolSize) + + cfg, err = newConfig(map[string]any{"dsn": "path/to/duck.db?rill_pool_size=4", "pool_size": "10"}) + require.NoError(t, err) + require.Equal(t, 4, cfg.PoolSize) + + cfg, err = newConfig(map[string]any{"dsn": "path/to/duck.db?rill_pool_size=10"}) + require.NoError(t, err) + require.Equal(t, "path/to/duck.db", cfg.DSN) + require.Equal(t, "path/to/duck.db", cfg.DBFilePath) + require.Equal(t, 10, cfg.PoolSize) + + cfg, err = newConfig(map[string]any{"dsn": "path/to/duck.db?max_memory=4GB&rill_pool_size=10"}) + require.NoError(t, err) + require.Equal(t, "path/to/duck.db?max_memory=4GB", cfg.DSN) require.Equal(t, 10, cfg.PoolSize) require.Equal(t, "path/to/duck.db", cfg.DBFilePath) - _, err = newConfig("path/to/duck.db?rill_pool_size=abc&hello=world", nil) + _, err = newConfig(map[string]any{"dsn": "path/to/duck.db?max_memory=4GB", "pool_size": "abc"}) require.Error(t, err) - _, err = newConfig("path/to/duck.db?rill_pool_size=0&hello=world", nil) + _, err = newConfig(map[string]any{"dsn": "path/to/duck.db?max_memory=4GB", "pool_size": 0}) require.Error(t, err) - cfg, err = newConfig("duck.db", nil) + cfg, err = newConfig(map[string]any{"dsn": "duck.db"}) require.NoError(t, err) require.Equal(t, "duck.db", cfg.DBFilePath) - cfg, err = newConfig("duck.db?rill_pool_size=10", nil) + cfg, err = newConfig(map[string]any{"dsn": "duck.db?rill_pool_size=10"}) require.NoError(t, err) require.Equal(t, "duck.db", cfg.DBFilePath) - - client := activity.NewNoopClient() - cfg, err = newConfig("path/to/duck.db", client) - require.NoError(t, err) - require.Equal(t, client, cfg.Activity) } diff --git a/runtime/drivers/duckdb/duckdb.go b/runtime/drivers/duckdb/duckdb.go index e8b7bebbb25..6583566ecbb 100644 --- a/runtime/drivers/duckdb/duckdb.go +++ b/runtime/drivers/duckdb/duckdb.go @@ -55,16 +55,12 @@ type Driver struct { name string } -func (d Driver) Open(config map[string]any, shared bool, client activity.Client, logger *zap.Logger) (drivers.Handle, error) { +func (d Driver) Open(cfgMap map[string]any, shared bool, ac activity.Client, logger *zap.Logger) (drivers.Handle, error) { if shared { return nil, fmt.Errorf("duckdb driver can't be shared") } - dsn, ok := config["dsn"].(string) - if !ok { - return nil, fmt.Errorf("require dsn to open duckdb connection") - } - cfg, err := newConfig(dsn, client) + cfg, err := newConfig(cfgMap) if err != nil { return nil, err } @@ -79,10 +75,11 @@ func (d Driver) Open(config map[string]any, shared bool, client activity.Client, c := &connection{ config: cfg, logger: logger, + activity: ac, metaSem: semaphore.NewWeighted(1), olapSem: priorityqueue.NewSemaphore(olapSemSize), dbCond: sync.NewCond(&sync.Mutex{}), - driverConfig: config, + driverConfig: cfgMap, driverName: d.name, shared: shared, ctx: ctx, @@ -111,13 +108,8 @@ func (d Driver) Open(config map[string]any, shared bool, client activity.Client, return c, nil } -func (d Driver) Drop(config map[string]any, logger *zap.Logger) error { - dsn, ok := config["dsn"].(string) - if !ok { - return fmt.Errorf("require dsn to drop duckdb connection") - } - - cfg, err := newConfig(dsn, nil) +func (d Driver) Drop(cfgMap map[string]any, logger *zap.Logger) error { + cfg, err := newConfig(cfgMap) if err != nil { return err } @@ -148,8 +140,9 @@ type connection struct { driverConfig map[string]any driverName string // config is parsed configs - config *config - logger *zap.Logger + config *config + logger *zap.Logger + activity activity.Client // This driver may issue both OLAP and "meta" queries (like catalog info) against DuckDB. // Meta queries are usually fast, but OLAP queries may take a long time. To enable predictable parallel performance, // we gate queries with semaphores that limits the number of concurrent queries of each type. @@ -439,7 +432,7 @@ func (c *connection) checkErr(err error) error { // Periodically collects stats using pragma_database_size() and emits as activity events func (c *connection) periodicallyEmitStats(d time.Duration) { - if c.config.Activity == nil { + if c.activity == nil { // Activity client isn't set, there is no need to report stats return } @@ -473,37 +466,37 @@ func (c *connection) periodicallyEmitStats(d time.Duration) { if err != nil { c.logger.Error("couldn't convert duckdb size to bytes", zap.Error(err)) } else { - c.config.Activity.Emit(c.ctx, "duckdb_size_bytes", dbSize, commonDims...) + c.activity.Emit(c.ctx, "duckdb_size_bytes", dbSize, commonDims...) } walSize, err := humanReadableSizeToBytes(stat.WalSize) if err != nil { c.logger.Error("couldn't convert duckdb wal size to bytes", zap.Error(err)) } else { - c.config.Activity.Emit(c.ctx, "duckdb_wal_size_bytes", walSize, commonDims...) + c.activity.Emit(c.ctx, "duckdb_wal_size_bytes", walSize, commonDims...) } memoryUsage, err := humanReadableSizeToBytes(stat.MemoryUsage) if err != nil { c.logger.Error("couldn't convert duckdb memory usage to bytes", zap.Error(err)) } else { - c.config.Activity.Emit(c.ctx, "duckdb_memory_usage_bytes", memoryUsage, commonDims...) + c.activity.Emit(c.ctx, "duckdb_memory_usage_bytes", memoryUsage, commonDims...) } memoryLimit, err := humanReadableSizeToBytes(stat.MemoryLimit) if err != nil { c.logger.Error("couldn't convert duckdb memory limit to bytes", zap.Error(err)) } else { - c.config.Activity.Emit(c.ctx, "duckdb_memory_limit_bytes", memoryLimit, commonDims...) + c.activity.Emit(c.ctx, "duckdb_memory_limit_bytes", memoryLimit, commonDims...) } - c.config.Activity.Emit(c.ctx, "duckdb_block_size_bytes", float64(stat.BlockSize), commonDims...) - c.config.Activity.Emit(c.ctx, "duckdb_total_blocks", float64(stat.TotalBlocks), commonDims...) - c.config.Activity.Emit(c.ctx, "duckdb_free_blocks", float64(stat.FreeBlocks), commonDims...) - c.config.Activity.Emit(c.ctx, "duckdb_used_blocks", float64(stat.UsedBlocks), commonDims...) + c.activity.Emit(c.ctx, "duckdb_block_size_bytes", float64(stat.BlockSize), commonDims...) + c.activity.Emit(c.ctx, "duckdb_total_blocks", float64(stat.TotalBlocks), commonDims...) + c.activity.Emit(c.ctx, "duckdb_free_blocks", float64(stat.FreeBlocks), commonDims...) + c.activity.Emit(c.ctx, "duckdb_used_blocks", float64(stat.UsedBlocks), commonDims...) estimatedDBSize, _ := c.EstimateSize() - c.config.Activity.Emit(c.ctx, "duckdb_estimated_size_bytes", float64(estimatedDBSize)) + c.activity.Emit(c.ctx, "duckdb_estimated_size_bytes", float64(estimatedDBSize)) case <-c.ctx.Done(): statTicker.Stop() diff --git a/runtime/drivers/duckdb/duckdb_test.go b/runtime/drivers/duckdb/duckdb_test.go index f2512d6e5f6..8828bc302cf 100644 --- a/runtime/drivers/duckdb/duckdb_test.go +++ b/runtime/drivers/duckdb/duckdb_test.go @@ -17,9 +17,9 @@ import ( func TestOpenDrop(t *testing.T) { path := filepath.Join(t.TempDir(), "tmp.db") walpath := path + ".wal" - dsn := path + "?rill_pool_size=2" + dsn := path - handle, err := Driver{}.Open(map[string]any{"dsn": dsn}, false, activity.NewNoopClient(), zap.NewNop()) + handle, err := Driver{}.Open(map[string]any{"dsn": dsn, "pool_size": 2}, false, activity.NewNoopClient(), zap.NewNop()) require.NoError(t, err) olap, ok := handle.AsOLAP("") @@ -42,10 +42,9 @@ func TestOpenDrop(t *testing.T) { func TestFatalErr(t *testing.T) { // NOTE: Using this issue to create a fatal error: https://github.com/duckdb/duckdb/issues/7905 - path := filepath.Join(t.TempDir(), "tmp.db") - dsn := path + "?rill_pool_size=2" + dsn := filepath.Join(t.TempDir(), "tmp.db") - handle, err := Driver{}.Open(map[string]any{"dsn": dsn}, false, activity.NewNoopClient(), zap.NewNop()) + handle, err := Driver{}.Open(map[string]any{"dsn": dsn, "pool_size": 2}, false, activity.NewNoopClient(), zap.NewNop()) require.NoError(t, err) olap, ok := handle.AsOLAP("") @@ -105,10 +104,9 @@ func TestFatalErr(t *testing.T) { func TestFatalErrConcurrent(t *testing.T) { // NOTE: Using this issue to create a fatal error: https://github.com/duckdb/duckdb/issues/7905 - path := filepath.Join(t.TempDir(), "tmp.db") - dsn := path + "?rill_pool_size=3" + dsn := filepath.Join(t.TempDir(), "tmp.db") - handle, err := Driver{}.Open(map[string]any{"dsn": dsn}, false, activity.NewNoopClient(), zap.NewNop()) + handle, err := Driver{}.Open(map[string]any{"dsn": dsn, "pool_size": 3}, false, activity.NewNoopClient(), zap.NewNop()) require.NoError(t, err) olap, ok := handle.AsOLAP("") @@ -185,7 +183,7 @@ func TestFatalErrConcurrent(t *testing.T) { wg.Done() }() - // Func 3 acquires conn after 250ms and runs query immediately. It will be enqueued (because the OLAP conns limit is rill_pool_size-1 = 2). + // Func 3 acquires conn after 250ms and runs query immediately. It will be enqueued (because the OLAP conns limit is pool_size-1 = 2). // By the time it's dequeued, the DB will have been invalidated, and it will wait for the reopen before returning a conn. So the query should succeed. wg.Add(1) var err3 error diff --git a/runtime/drivers/duckdb/olap_test.go b/runtime/drivers/duckdb/olap_test.go index 12be8a3f824..485efb7cb73 100644 --- a/runtime/drivers/duckdb/olap_test.go +++ b/runtime/drivers/duckdb/olap_test.go @@ -204,7 +204,7 @@ func TestClose(t *testing.T) { } func prepareConn(t *testing.T) drivers.Handle { - conn, err := Driver{}.Open(map[string]any{"dsn": "?access_mode=read_write&rill_pool_size=4"}, false, activity.NewNoopClient(), zap.NewNop()) + conn, err := Driver{}.Open(map[string]any{"dsn": "?access_mode=read_write", "pool_size": 4}, false, activity.NewNoopClient(), zap.NewNop()) require.NoError(t, err) olap, ok := conn.AsOLAP("") diff --git a/runtime/server/queries_test.go b/runtime/server/queries_test.go index 2e62e75026c..edc41bb40b4 100644 --- a/runtime/server/queries_test.go +++ b/runtime/server/queries_test.go @@ -134,7 +134,7 @@ func TestServer_UpdateLimit_UNION(t *testing.T) { } func prepareOLAPStore(t *testing.T) drivers.OLAPStore { - conn, err := drivers.Open("duckdb", map[string]any{"dsn": "?access_mode=read_write&rill_pool_size=4"}, false, activity.NewNoopClient(), zap.NewNop()) + conn, err := drivers.Open("duckdb", map[string]any{"dsn": "?access_mode=read_write", "pool_size": 4}, false, activity.NewNoopClient(), zap.NewNop()) require.NoError(t, err) olap, ok := conn.AsOLAP("") require.True(t, ok)