Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify connector source and sink props #3057

Merged
merged 5 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
487 changes: 155 additions & 332 deletions proto/gen/rill/runtime/v1/catalog.pb.go

Large diffs are not rendered by default.

139 changes: 0 additions & 139 deletions proto/gen/rill/runtime/v1/catalog.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 9 additions & 39 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2266,6 +2266,13 @@ paths:
tags:
- ConnectorService
definitions:
BucketExtractPolicyStrategy:
type: string
enum:
- STRATEGY_UNSPECIFIED
- STRATEGY_HEAD
- STRATEGY_TAIL
default: STRATEGY_UNSPECIFIED
ColumnTimeSeriesRequestBasicMeasure:
type: object
properties:
Expand Down Expand Up @@ -2487,33 +2494,6 @@ definitions:
items:
type: string
title: Dimension/measure level access condition
SourceExtractPolicy:
type: object
properties:
rowsStrategy:
$ref: '#/definitions/SourceExtractPolicyStrategy'
title: strategy for selecting rows in a file
rowsLimitBytes:
type: string
format: uint64
title: |-
could in future add: uint64 rows_limit = n;
limit on data fetched in bytes
filesStrategy:
$ref: '#/definitions/SourceExtractPolicyStrategy'
title: strategy for selecting files
filesLimit:
type: string
format: uint64
title: limit on number of files
title: Extract policy for glob connectors
SourceExtractPolicyStrategy:
type: string
enum:
- STRATEGY_UNSPECIFIED
- STRATEGY_HEAD
- STRATEGY_TAIL
default: STRATEGY_UNSPECIFIED
StructTypeField:
type: object
properties:
Expand Down Expand Up @@ -2618,22 +2598,15 @@ definitions:
type: object
properties:
rowsStrategy:
$ref: '#/definitions/v1BucketExtractPolicyStrategy'
$ref: '#/definitions/BucketExtractPolicyStrategy'
rowsLimitBytes:
type: string
format: uint64
filesStrategy:
$ref: '#/definitions/v1BucketExtractPolicyStrategy'
$ref: '#/definitions/BucketExtractPolicyStrategy'
filesLimit:
type: string
format: uint64
v1BucketExtractPolicyStrategy:
type: string
enum:
- STRATEGY_UNSPECIFIED
- STRATEGY_HEAD
- STRATEGY_TAIL
default: STRATEGY_UNSPECIFIED
v1BucketPlanner:
type: object
properties:
Expand Down Expand Up @@ -4526,9 +4499,6 @@ definitions:
schema:
$ref: '#/definitions/v1StructType'
title: Detected schema of the source
policy:
$ref: '#/definitions/SourceExtractPolicy'
title: extraction policy for the source
timeoutSeconds:
type: integer
format: int32
Expand Down
19 changes: 0 additions & 19 deletions proto/rill/runtime/v1/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,6 @@ message Source {
google.protobuf.Struct properties = 3;
// Detected schema of the source
StructType schema = 5;
// Extract policy for glob connectors
message ExtractPolicy {
enum Strategy {
STRATEGY_UNSPECIFIED = 0;
STRATEGY_HEAD = 1;
STRATEGY_TAIL = 2;
}
// strategy for selecting rows in a file
Strategy rows_strategy = 1;
// could in future add: uint64 rows_limit = n;
// limit on data fetched in bytes
uint64 rows_limit_bytes = 2;
// strategy for selecting files
Strategy files_strategy = 3;
// limit on number of files
uint64 files_limit = 4;
}
// extraction policy for the source
ExtractPolicy policy = 6;
// timeout for source ingestion in seconds
int32 timeout_seconds = 7;
}
Expand Down
29 changes: 1 addition & 28 deletions runtime/compilers/rillv1/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -64,7 +63,7 @@ func (p *Parser) AnalyzeConnectors(ctx context.Context) ([]*Connector, error) {
break
}
// Poll for anon access
res, _ := connector.HasAnonymousSourceAccess(ctx, driverSourceForAnonAccessCheck(driver, r.SourceSpec), zap.NewNop())
res, _ := connector.HasAnonymousSourceAccess(ctx, r.SourceSpec.Properties.AsMap(), zap.NewNop())
if !res {
anonAccess = false
break
Expand Down Expand Up @@ -106,29 +105,3 @@ func (p *Parser) connectorForName(name string) (string, drivers.Driver, error) {
}
return driver, connector, nil
}

func driverSourceForAnonAccessCheck(connector string, src *runtimev1.SourceSpec) drivers.Source {
props := src.Properties.AsMap()
switch connector {
case "s3":
return &drivers.BucketSource{
Properties: props,
}
case "gcs":
return &drivers.BucketSource{
Properties: props,
}
case "https":
return &drivers.FileSource{
Properties: props,
}
case "local_file":
return &drivers.FileSource{
Properties: props,
}
case "motherduck":
return &drivers.DatabaseSource{}
default:
return nil
}
}
32 changes: 1 addition & 31 deletions runtime/compilers/rillv1beta/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ExtractConnectors(ctx context.Context, projectPath string) ([]*Connector, e
}
// ignoring error since failure to resolve this should not break the deployment flow
// this can fail under cases such as full or host/bucket of URI is a variable
access, _ := connector.HasAnonymousSourceAccess(ctx, source(src.Connector, src), zap.NewNop())
access, _ := connector.HasAnonymousSourceAccess(ctx, src.Properties.AsMap(), zap.NewNop())
c := key{Name: src.Connector, Type: src.Connector, AnonymousAccess: access}
srcs, ok := connectorMap[c]
if !ok {
Expand Down Expand Up @@ -158,33 +158,3 @@ type key struct {
Type string
AnonymousAccess bool
}

func source(connector string, src *runtimev1.Source) drivers.Source {
props := src.Properties.AsMap()
switch connector {
case "s3":
return &drivers.BucketSource{
Properties: props,
}
case "gcs":
return &drivers.BucketSource{
Properties: props,
}
case "https":
return &drivers.FileSource{
Properties: props,
}
case "local_file":
return &drivers.FileSource{
Properties: props,
}
case "motherduck":
return &drivers.DatabaseSource{}
case "bigquery":
return &drivers.DatabaseSource{
Props: props,
}
default:
return nil
}
}
2 changes: 1 addition & 1 deletion runtime/connection_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (*mockDriver) Drop(config map[string]any, logger *zap.Logger) error {
}

// HasAnonymousSourceAccess implements drivers.Driver.
func (*mockDriver) HasAnonymousSourceAccess(ctx context.Context, src drivers.Source, logger *zap.Logger) (bool, error) {
func (*mockDriver) HasAnonymousSourceAccess(ctx context.Context, src map[string]any, logger *zap.Logger) (bool, error) {
panic("unimplemented")
}

Expand Down
6 changes: 5 additions & 1 deletion runtime/drivers/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (d driver) Spec() drivers.Spec {
return spec
}

func (d driver) HasAnonymousSourceAccess(ctx context.Context, src drivers.Source, logger *zap.Logger) (bool, error) {
func (d driver) HasAnonymousSourceAccess(ctx context.Context, src map[string]any, logger *zap.Logger) (bool, error) {
// gcp provides public access to the data via a project
return false, nil
}
Expand Down Expand Up @@ -204,6 +204,7 @@ func (c *Connection) AsFileStore() (drivers.FileStore, bool) {

type sourceProperties struct {
ProjectID string `mapstructure:"project_id"`
SQL string `mapstructure:"sql"`
}

func parseSourceProperties(props map[string]any) (*sourceProperties, error) {
Expand All @@ -212,6 +213,9 @@ func parseSourceProperties(props map[string]any) (*sourceProperties, error) {
if err != nil {
return nil, err
}
if conf.SQL == "" {
return nil, fmt.Errorf("property 'sql' is mandatory for connector \"bigquery\"")
}
if conf.ProjectID == "" {
conf.ProjectID = bigquery.DetectProjectID
}
Expand Down
Loading
Loading