Skip to content

Commit

Permalink
feat: data bucket for persisting data to GCS (#6170)
Browse files Browse the repository at this point in the history
* data bucket for persisting data to gcs

* test fix

* also prefix with driver

* merge with main

* close bucket plus directory prefix

* the bucket is closed when prefixed, so a new data bucket needs to be opened every time

* add a storage client and remove preset data_dir

* lint fixes

* small refactor

* Apply suggestions from code review

Co-authored-by: Benjamin Egelund-Müller <[email protected]>

* name in connection cache

* fix build errors

* missing withprefix

* storage APIs also create directories

* fix and add unit test

* use unique directory in temp directory

* fix setting tempDirPath

---------

Co-authored-by: Benjamin Egelund-Müller <[email protected]>
  • Loading branch information
k-anshul and begelundmuller authored Dec 4, 2024
1 parent a0db342 commit 0154424
Show file tree
Hide file tree
Showing 58 changed files with 575 additions and 231 deletions.
4 changes: 3 additions & 1 deletion cli/cmd/runtime/install_duckdb_extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package runtime

import (
"fmt"
"os"

"github.com/rilldata/rill/cli/pkg/cmdutil"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/storage"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
Expand All @@ -17,7 +19,7 @@ func InstallDuckDBExtensionsCmd(ch *cmdutil.Helper) *cobra.Command {
Use: "install-duckdb-extensions",
RunE: func(cmd *cobra.Command, args []string) error {
cfg := map[string]any{"dsn": ":memory:"} // In-memory
h, err := drivers.Open("duckdb", "default", cfg, activity.NewNoopClient(), zap.NewNop())
h, err := drivers.Open("duckdb", "default", cfg, storage.MustNew(os.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
if err != nil {
return fmt.Errorf("failed to open ephemeral duckdb: %w", err)
}
Expand Down
18 changes: 15 additions & 3 deletions cli/cmd/runtime/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/rilldata/rill/runtime/pkg/observability"
"github.com/rilldata/rill/runtime/pkg/ratelimit"
"github.com/rilldata/rill/runtime/server"
"github.com/rilldata/rill/runtime/storage"
"github.com/spf13/cobra"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -85,6 +86,9 @@ type Config struct {
// DataDir stores data for all instances, such as DuckDB files, temporary downloaded files, etc.
// The data for each instance is stored in a child directory named instance_id
DataDir string `split_words:"true"`
// DataBucket is a common GCS bucket to store data for all instances. This data is expected to be persisted across resets.
DataBucket string `split_words:"true"`
DataBucketCredentialsJSON string `split_words:"true"`
// Sink type of activity client: noop (or empty string), kafka
ActivitySinkType string `default:"" split_words:"true"`
// Kafka brokers of an activity client's sink
Expand Down Expand Up @@ -195,9 +199,18 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
activityClient = activityClient.WithIsDev()
}

// storage client
bucketConfig := map[string]interface{}{
"bucket": conf.DataBucket,
"google_application_credentials_json": conf.DataBucketCredentialsJSON,
}
storage, err := storage.New(conf.DataDir, bucketConfig)
if err != nil {
logger.Fatal("error: could not create storage client", zap.Error(err))
}

// Create ctx that cancels on termination signals
ctx := graceful.WithCancelOnTerminate(context.Background())

// Init runtime
opts := &runtime.Options{
ConnectionCacheSize: conf.ConnectionCacheSize,
Expand All @@ -207,7 +220,6 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
ControllerLogBufferCapacity: conf.LogBufferCapacity,
ControllerLogBufferSizeBytes: conf.LogBufferSizeBytes,
AllowHostAccess: conf.AllowHostAccess,
DataDir: conf.DataDir,
SystemConnectors: []*runtimev1.Connector{
{
Type: conf.MetastoreDriver,
Expand All @@ -216,7 +228,7 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
},
},
}
rt, err := runtime.New(ctx, opts, logger, activityClient, emailClient)
rt, err := runtime.New(ctx, opts, logger, storage, activityClient, emailClient)
if err != nil {
logger.Fatal("error: could not create runtime", zap.Error(err))
}
Expand Down
4 changes: 3 additions & 1 deletion cli/pkg/cmdutil/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package cmdutil

import (
"context"
"os"

"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/storage"
"go.uber.org/zap"

// Ensure file driver is loaded
Expand All @@ -14,7 +16,7 @@ import (
// RepoForProjectPath creates an ad-hoc drivers.RepoStore for a local project file path
func RepoForProjectPath(path string) (drivers.RepoStore, string, error) {
instanceID := "default"
repoHandle, err := drivers.Open("file", instanceID, map[string]any{"dsn": path}, activity.NewNoopClient(), zap.NewNop())
repoHandle, err := drivers.Open("file", instanceID, map[string]any{"dsn": path}, storage.MustNew(os.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
if err != nil {
return nil, "", err
}
Expand Down
9 changes: 6 additions & 3 deletions cli/pkg/local/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/rilldata/rill/runtime/pkg/observability"
"github.com/rilldata/rill/runtime/pkg/ratelimit"
runtimeserver "github.com/rilldata/rill/runtime/server"
"github.com/rilldata/rill/runtime/storage"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
"go.uber.org/zap/buffer"
Expand Down Expand Up @@ -156,19 +157,21 @@ func NewApp(ctx context.Context, opts *AppOptions) (*App, error) {
// if err != nil {
// return nil, fmt.Errorf("failed to create email sender: %w", err)
// }

rtOpts := &runtime.Options{
ConnectionCacheSize: 100,
MetastoreConnector: "metastore",
QueryCacheSizeBytes: int64(datasize.MB * 100),
AllowHostAccess: true,
DataDir: dbDirPath,
SystemConnectors: systemConnectors,
SecurityEngineCacheSize: 1000,
ControllerLogBufferCapacity: 10000,
ControllerLogBufferSizeBytes: int64(datasize.MB * 16),
}
rt, err := runtime.New(ctx, rtOpts, logger, opts.Ch.Telemetry(ctx), email.New(sender))
st, err := storage.New(dbDirPath, nil)
if err != nil {
return nil, err
}
rt, err := runtime.New(ctx, rtOpts, logger, st, opts.Ch.Telemetry(ctx), email.New(sender))
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion runtime/compilers/rillv1/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/storage"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -2057,7 +2058,7 @@ func requireResourcesAndErrors(t testing.TB, p *Parser, wantResources []*Resourc

func makeRepo(t testing.TB, files map[string]string) drivers.RepoStore {
root := t.TempDir()
handle, err := drivers.Open("file", "default", map[string]any{"dsn": root}, activity.NewNoopClient(), zap.NewNop())
handle, err := drivers.Open("file", "default", map[string]any{"dsn": root}, storage.MustNew(root, nil), activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

repo, ok := handle.AsRepoStore("")
Expand Down
15 changes: 7 additions & 8 deletions runtime/connection_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (

// cachedConnectionConfig identifies a connection held in the runtime's connection cache.
// All of its fields contribute to the cache key built by generateKey.
type cachedConnectionConfig struct {
instanceID string // Empty if connection is shared
name string // Connector name; also used (with instanceID) as the storage prefix passed to drivers.Open
driver string // Driver name passed to drivers.Open
config map[string]any // Driver configuration passed to drivers.Open
}
Expand Down Expand Up @@ -66,13 +67,7 @@ func (r *Runtime) newConnectionCache() conncache.Cache {

// getConnection returns a cached connection for the given driver configuration.
// If instanceID is empty, the connection is considered shared (see drivers.Open for details).
func (r *Runtime) getConnection(ctx context.Context, instanceID, driver string, config map[string]any) (drivers.Handle, func(), error) {
cfg := cachedConnectionConfig{
instanceID: instanceID,
driver: driver,
config: config,
}

func (r *Runtime) getConnection(ctx context.Context, cfg cachedConnectionConfig) (drivers.Handle, func(), error) {
handle, release, err := r.connCache.Acquire(ctx, cfg)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -110,7 +105,7 @@ func (r *Runtime) openAndMigrate(ctx context.Context, cfg cachedConnectionConfig
}
}

handle, err := drivers.Open(cfg.driver, cfg.instanceID, cfg.config, activityClient, logger)
handle, err := drivers.Open(cfg.driver, cfg.instanceID, cfg.config, r.storage.WithPrefix(cfg.instanceID, cfg.name), activityClient, logger)
if err == nil && ctx.Err() != nil {
err = fmt.Errorf("timed out while opening driver %q", cfg.driver)
}
Expand All @@ -132,7 +127,11 @@ func (r *Runtime) openAndMigrate(ctx context.Context, cfg cachedConnectionConfig
func generateKey(cfg cachedConnectionConfig) string {
sb := strings.Builder{}
sb.WriteString(cfg.instanceID) // Empty if cfg.shared
sb.WriteString(":")
sb.WriteString(cfg.name)
sb.WriteString(":")
sb.WriteString(cfg.driver)
sb.WriteString(":")
keys := maps.Keys(cfg.config)
slices.Sort(keys)
for _, key := range keys {
Expand Down
19 changes: 12 additions & 7 deletions runtime/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ func (r *Runtime) AcquireSystemHandle(ctx context.Context, connector string) (dr
cfg[strings.ToLower(k)] = v
}
cfg["allow_host_access"] = r.opts.AllowHostAccess
return r.getConnection(ctx, "", c.Type, cfg)
return r.getConnection(ctx, cachedConnectionConfig{
instanceID: "",
name: connector,
driver: c.Type,
config: cfg,
})
}
}
return nil, nil, fmt.Errorf("connector %s doesn't exist", connector)
Expand All @@ -41,7 +46,12 @@ func (r *Runtime) AcquireHandle(ctx context.Context, instanceID, connector strin
// So we take this moment to make sure the ctx gets checked for cancellation at least every once in a while.
return nil, nil, ctx.Err()
}
return r.getConnection(ctx, instanceID, cfg.Driver, cfg.Resolve())
return r.getConnection(ctx, cachedConnectionConfig{
instanceID: instanceID,
name: connector,
driver: cfg.Driver,
config: cfg.Resolve(),
})
}

func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStore, func(), error) {
Expand Down Expand Up @@ -279,11 +289,6 @@ func (r *Runtime) ConnectorConfig(ctx context.Context, instanceID, name string)

// Apply built-in system-wide config
res.setPreset("allow_host_access", strconv.FormatBool(r.opts.AllowHostAccess), true)
// data_dir stores persistent data
res.setPreset("data_dir", r.DataDir(instanceID, name), true)
// temp_dir stores temporary data. The logic that creates any temporary file here should also delete them.
// The contents will also be deleted on runtime restarts.
res.setPreset("temp_dir", r.TempDir(instanceID), true)

// Done
return res, nil
Expand Down
19 changes: 14 additions & 5 deletions runtime/drivers/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/pkg/archive"
"github.com/rilldata/rill/runtime/pkg/ctxsync"
"github.com/rilldata/rill/runtime/storage"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
Expand Down Expand Up @@ -60,10 +61,9 @@ type configProperties struct {
AccessToken string `mapstructure:"access_token"`
ProjectID string `mapstructure:"project_id"`
Branch string `mapstructure:"branch"`
TempDir string `mapstructure:"temp_dir"`
}

func (d driver) Open(instanceID string, config map[string]any, ac *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
func (d driver) Open(instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("admin driver can't be shared")
}
Expand Down Expand Up @@ -105,6 +105,7 @@ func (d driver) TertiarySourceConnectors(ctx context.Context, src map[string]any
type Handle struct {
config *configProperties
logger *zap.Logger
storage *storage.Client
admin *client.Client
repoMu ctxsync.RWMutex
repoSF *singleflight.Group
Expand Down Expand Up @@ -391,7 +392,7 @@ func (h *Handle) checkHandshake(ctx context.Context) error {
}

if h.repoPath == "" {
h.repoPath, err = os.MkdirTemp(h.config.TempDir, "admin_driver_repo")
h.repoPath, err = h.storage.RandomTempDir("admin_driver_repo")
if err != nil {
return err
}
Expand Down Expand Up @@ -577,7 +578,11 @@ func (h *Handle) stashVirtual() error {
return nil
}

dst, err := generateTmpPath(h.config.TempDir, "admin_driver_virtual_stash", "")
tempPath, err := h.storage.TempDir()
if err != nil {
return fmt.Errorf("stash virtual: %w", err)
}
dst, err := generateTmpPath(tempPath, "admin_driver_virtual_stash", "")
if err != nil {
return fmt.Errorf("stash virtual: %w", err)
}
Expand Down Expand Up @@ -622,7 +627,11 @@ func (h *Handle) download() error {
defer cancel()

// generate a temporary file to copy repo tar directory
downloadDst, err := generateTmpPath(h.config.TempDir, "admin_driver_zipped_repo", ".tar.gz")
tempPath, err := h.storage.TempDir()
if err != nil {
return fmt.Errorf("download: %w", err)
}
downloadDst, err := generateTmpPath(tempPath, "admin_driver_zipped_repo", ".tar.gz")
if err != nil {
return fmt.Errorf("download: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion runtime/drivers/athena/athena.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/storage"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -85,7 +86,7 @@ type configProperties struct {
AllowHostAccess bool `mapstructure:"allow_host_access"`
}

func (d driver) Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
func (d driver) Open(instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("athena driver can't be shared")
}
Expand Down
14 changes: 8 additions & 6 deletions runtime/drivers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/storage"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -79,10 +80,9 @@ type configProperties struct {
SASToken string `mapstructure:"azure_storage_sas_token"`
ConnectionString string `mapstructure:"azure_storage_connection_string"`
AllowHostAccess bool `mapstructure:"allow_host_access"`
TempDir string `mapstructure:"temp_dir"`
}

func (d driver) Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
func (d driver) Open(instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("azure driver can't be shared")
}
Expand All @@ -94,8 +94,9 @@ func (d driver) Open(instanceID string, config map[string]any, client *activity.
}

conn := &Connection{
config: conf,
logger: logger,
config: conf,
storage: st,
logger: logger,
}
return conn, nil
}
Expand Down Expand Up @@ -129,8 +130,9 @@ func (d driver) TertiarySourceConnectors(ctx context.Context, src map[string]any
}

type Connection struct {
config *configProperties
logger *zap.Logger
config *configProperties
storage *storage.Client
logger *zap.Logger
}

var _ drivers.Handle = &Connection{}
Expand Down
7 changes: 6 additions & 1 deletion runtime/drivers/azure/object_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ func (c *Connection) DownloadFiles(ctx context.Context, props map[string]any) (d
}
}

tempDir, err := c.storage.TempDir()
if err != nil {
return nil, err
}

// prepare fetch configs
opts := rillblob.Options{
GlobMaxTotalSize: conf.GlobMaxTotalSize,
Expand All @@ -125,7 +130,7 @@ func (c *Connection) DownloadFiles(ctx context.Context, props map[string]any) (d
ExtractPolicy: conf.extractPolicy,
BatchSizeBytes: int64(batchSize.Bytes()),
KeepFilesUntilClose: conf.BatchSize == "-1",
TempDir: c.config.TempDir,
TempDir: tempDir,
}

iter, err := rillblob.NewIterator(ctx, bucketObj, opts, c.logger)
Expand Down
Loading

0 comments on commit 0154424

Please sign in to comment.