Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into sort-by-context-columns
Browse files Browse the repository at this point in the history
  • Loading branch information
bcolloran committed Sep 8, 2023
2 parents 5481f0b + 9461ca7 commit da7bbc9
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 123 deletions.
29 changes: 17 additions & 12 deletions admin/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"path"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -57,36 +58,40 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp
// Build instance config
instanceID := strings.ReplaceAll(uuid.New().String(), "-", "")
olapDriver := opts.ProdOLAPDriver
olapDSN := opts.ProdOLAPDSN
olapConfig := map[string]string{}
var embedCatalog bool
var ingestionLimit int64
if olapDriver == "duckdb" {
if olapDSN != "" {
switch olapDriver {
case "duckdb":
if opts.ProdOLAPDSN != "" {
return nil, fmt.Errorf("passing a DSN is not allowed for driver 'duckdb'")
}
if opts.ProdSlots == 0 {
return nil, fmt.Errorf("slot count can't be 0 for driver 'duckdb'")
}

olapConfig["dsn"] = fmt.Sprintf("%s.db?max_memory=%dGB", path.Join(alloc.DataDir, instanceID), alloc.MemoryGB)
olapConfig["pool_size"] = strconv.Itoa(alloc.CPU)
embedCatalog = true
ingestionLimit = alloc.StorageBytes

olapDSN = fmt.Sprintf("%s.db?rill_pool_size=%d&max_memory=%dGB", path.Join(alloc.DataDir, instanceID), alloc.CPU, alloc.MemoryGB)
} else if olapDriver == "duckdb-vip" {
if olapDSN != "" {
case "duckdb-vip":
if opts.ProdOLAPDSN != "" {
return nil, fmt.Errorf("passing a DSN is not allowed for driver 'duckdb-vip'")
}
if opts.ProdSlots == 0 {
return nil, fmt.Errorf("slot count can't be 0 for driver 'duckdb-vip'")
}

// NOTE: Rewriting to a "duckdb" driver without CPU, memory, or storage limits

olapDriver = "duckdb"
olapConfig["dsn"] = fmt.Sprintf("%s.db", path.Join(alloc.DataDir, instanceID))
olapConfig["pool_size"] = "8"
embedCatalog = true
ingestionLimit = 0

olapDriver = "duckdb"
olapDSN = fmt.Sprintf("%s.db?rill_pool_size=8", path.Join(alloc.DataDir, instanceID))
default:
olapConfig["dsn"] = opts.ProdOLAPDSN
embedCatalog = false
ingestionLimit = 0
}

// Open a runtime client
Expand All @@ -109,7 +114,7 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp
{
Name: "olap",
Type: olapDriver,
Config: map[string]string{"dsn": olapDSN},
Config: olapConfig,
},
{
Name: "repo",
Expand Down
9 changes: 8 additions & 1 deletion cli/pkg/local/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ func NewApp(ctx context.Context, ver config.Version, verbose, reset bool, olapDr
return nil, fmt.Errorf("failed to clean OLAP: %w", err)
}
}

// Set default DuckDB pool size to 4
olapCfg := map[string]string{"dsn": olapDSN}
if olapDriver == "duckdb" {
olapCfg["pool_size"] = "4"
}

// Create instance with its repo set to the project directory
inst := &drivers.Instance{
ID: DefaultInstanceID,
Expand All @@ -143,7 +150,7 @@ func NewApp(ctx context.Context, ver config.Version, verbose, reset bool, olapDr
{
Type: olapDriver,
Name: "olap",
Config: map[string]string{"dsn": olapDSN},
Config: olapCfg,
},
},
}
Expand Down
14 changes: 12 additions & 2 deletions runtime/drivers/bigquery/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ import (
const defaultPageSize = 20

func (c *Connection) ListDatasets(ctx context.Context, req *runtimev1.BigQueryListDatasetsRequest) ([]string, string, error) {
client, err := c.createClient(ctx, &sourceProperties{ProjectID: bigquery.DetectProjectID})
opts, err := c.clientOption(ctx)
if err != nil {
return nil, "", err
}

client, err := bigquery.NewClient(ctx, bigquery.DetectProjectID, opts...)
if err != nil {
return nil, "", err
}
Expand All @@ -36,7 +41,12 @@ func (c *Connection) ListDatasets(ctx context.Context, req *runtimev1.BigQueryLi
}

func (c *Connection) ListTables(ctx context.Context, req *runtimev1.BigQueryListTablesRequest) ([]string, string, error) {
client, err := c.createClient(ctx, &sourceProperties{ProjectID: bigquery.DetectProjectID})
opts, err := c.clientOption(ctx)
if err != nil {
return nil, "", err
}

client, err := bigquery.NewClient(ctx, bigquery.DetectProjectID, opts...)
if err != nil {
return nil, "", err
}
Expand Down
10 changes: 3 additions & 7 deletions runtime/drivers/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bigquery

import (
"context"
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -219,13 +218,10 @@ func parseSourceProperties(props map[string]any) (*sourceProperties, error) {
return conf, err
}

func (c *Connection) createClient(ctx context.Context, props *sourceProperties) (*bigquery.Client, error) {
func (c *Connection) clientOption(ctx context.Context) ([]option.ClientOption, error) {
creds, err := gcputil.Credentials(ctx, c.config.SecretJSON, c.config.AllowHostAccess)
if err != nil {
if !errors.Is(err, gcputil.ErrNoCredentials) {
return nil, err
}
return bigquery.NewClient(ctx, props.ProjectID)
return nil, err
}
return bigquery.NewClient(ctx, props.ProjectID, option.WithCredentials(creds))
return []option.ClientOption{option.WithCredentials(creds)}, nil
}
11 changes: 8 additions & 3 deletions runtime/drivers/bigquery/sql_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,20 @@ func (c *Connection) QueryAsFiles(ctx context.Context, props map[string]any, sql
return nil, err
}

client, err := c.createClient(ctx, srcProps)
opts, err := c.clientOption(ctx)
if err != nil {
return nil, err
}

client, err := bigquery.NewClient(ctx, srcProps.ProjectID, opts...)
if err != nil {
if strings.Contains(err.Error(), "unable to detect projectID") {
return nil, fmt.Errorf("projectID not detected in credentials. Please set `project_id` in source yaml")
}
return nil, fmt.Errorf("failed to create bigquery client: %w", err)
}

if err := client.EnableStorageReadClient(ctx); err != nil {
if err := client.EnableStorageReadClient(ctx, opts...); err != nil {
client.Close()
return nil, err
}
Expand All @@ -60,7 +65,7 @@ func (c *Connection) QueryAsFiles(ctx context.Context, props map[string]any, sql
// the query results are always cached in a temporary table that storage api can use
// there are some exceptions when results aren't cached
// so we also try without storage api
client, err = c.createClient(ctx, srcProps)
client, err = bigquery.NewClient(ctx, srcProps.ProjectID, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create bigquery client: %w", err)
}
Expand Down
22 changes: 11 additions & 11 deletions runtime/drivers/drivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// This should be the only "real" test in the package. Other tests should be added
// as subtests of TestAll.
func TestAll(t *testing.T) {
var matrix = []func(t *testing.T, fn func(driver string, shared bool, dsn string)) error{
var matrix = []func(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error{
withDuckDB,
withFile,
withPostgres,
Expand All @@ -29,9 +29,9 @@ func TestAll(t *testing.T) {
}

for _, withDriver := range matrix {
err := withDriver(t, func(driver string, shared bool, dsn string) {
err := withDriver(t, func(driver string, shared bool, cfg map[string]any) {
// Open
conn, err := drivers.Open(driver, map[string]any{"dsn": dsn}, shared, activity.NewNoopClient(), zap.NewNop())
conn, err := drivers.Open(driver, cfg, shared, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)
require.NotNil(t, conn)

Expand Down Expand Up @@ -63,26 +63,26 @@ func TestAll(t *testing.T) {
}
}

func withDuckDB(t *testing.T, fn func(driver string, shared bool, dsn string)) error {
fn("duckdb", false, "?access_mode=read_write&rill_pool_size=4")
func withDuckDB(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error {
fn("duckdb", false, map[string]any{"dsn": "?access_mode=read_write", "pool_size": 4})
return nil
}

func withFile(t *testing.T, fn func(driver string, shared bool, dsn string)) error {
func withFile(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error {
dsn := t.TempDir()
fn("file", false, dsn)
fn("file", false, map[string]any{"dsn": dsn})
return nil
}

func withPostgres(t *testing.T, fn func(driver string, shared bool, dsn string)) error {
func withPostgres(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error {
pg := pgtestcontainer.New(t)
defer pg.Terminate(t)

fn("postgres", true, pg.DatabaseURL)
fn("postgres", true, map[string]any{"dsn": pg.DatabaseURL})
return nil
}

func withSQLite(t *testing.T, fn func(driver string, shared bool, dsn string)) error {
fn("sqlite", true, ":memory:")
func withSQLite(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error {
fn("sqlite", true, map[string]any{"dsn": ":memory:"})
return nil
}
67 changes: 34 additions & 33 deletions runtime/drivers/duckdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,30 @@ import (
"net/url"
"strconv"

"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/mitchellh/mapstructure"
)

const poolSizeKey = "rill_pool_size"

// config represents the Driver config, extracted from the DSN
// config represents the DuckDB driver config
type config struct {
// DSN for DuckDB
DSN string
// DSN is the connection string
DSN string `mapstructure:"dsn"`
// PoolSize is the number of concurrent connections and queries allowed
PoolSize int
// DBFilePath is the path where database is stored
DBFilePath string
// Activity client
Activity activity.Client
PoolSize int `mapstructure:"pool_size"`
// DBFilePath is the path where the database is stored. It is inferred from the DSN (can't be provided by user).
DBFilePath string `mapstructure:"-"`
}

// activityDims and client are allowed to be nil, in this case DuckDB stats are not emitted
func newConfig(dsn string, client activity.Client) (*config, error) {
func newConfig(cfgMap map[string]any) (*config, error) {
cfg := &config{
PoolSize: 1, // Default value
}
err := mapstructure.WeakDecode(cfgMap, cfg)
if err != nil {
return nil, fmt.Errorf("could not decode config: %w", err)
}

// Parse DSN as URL
uri, err := url.Parse(dsn)
uri, err := url.Parse(cfg.DSN)
if err != nil {
return nil, fmt.Errorf("could not parse dsn: %w", err)
}
Expand All @@ -34,31 +37,29 @@ func newConfig(dsn string, client activity.Client) (*config, error) {
return nil, fmt.Errorf("could not parse dsn: %w", err)
}

// If poolSizeKey is in the DSN, parse and remove it
poolSize := 1
if qry.Has(poolSizeKey) {
// Infer DBFilePath
cfg.DBFilePath = uri.Path

// We also support overriding the pool size via the DSN by setting "rill_pool_size" as a query argument.
if qry.Has("rill_pool_size") {
// Parse as integer
poolSize, err = strconv.Atoi(qry.Get(poolSizeKey))
cfg.PoolSize, err = strconv.Atoi(qry.Get("rill_pool_size"))
if err != nil {
return nil, fmt.Errorf("duckdb Driver: %s is not an integer", poolSizeKey)
return nil, fmt.Errorf("could not parse dsn: 'rill_pool_size' is not an integer")
}

// Remove from query string (so not passed into DuckDB config)
qry.Del(poolSizeKey)
}
if poolSize < 1 {
return nil, fmt.Errorf("%s must be >= 1", poolSizeKey)
}
qry.Del("rill_pool_size")

// Rebuild DuckDB DSN (which should be "path?key=val&...")
uri.RawQuery = qry.Encode()
dsn = uri.String()
// Rebuild DuckDB DSN (which should be "path?key=val&...")
uri.RawQuery = qry.Encode()
cfg.DSN = uri.String()
}

// Return config
cfg := &config{
DSN: dsn,
PoolSize: poolSize,
DBFilePath: uri.Path,
Activity: client,
// Check pool size
if cfg.PoolSize < 1 {
return nil, fmt.Errorf("duckdb pool size must be >= 1")
}

return cfg, nil
}
Loading

0 comments on commit da7bbc9

Please sign in to comment.