Skip to content

Commit

Permalink
Simplify connector source and sink props (#3057)
Browse files Browse the repository at this point in the history
* Simplify connector source and sink props

* Fix test

* Add extract policy

* Review comment

* Self review
  • Loading branch information
begelundmuller authored Sep 12, 2023
1 parent a6f7bb7 commit d44bd03
Show file tree
Hide file tree
Showing 48 changed files with 601 additions and 1,378 deletions.
487 changes: 155 additions & 332 deletions proto/gen/rill/runtime/v1/catalog.pb.go

Large diffs are not rendered by default.

139 changes: 0 additions & 139 deletions proto/gen/rill/runtime/v1/catalog.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 9 additions & 39 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2266,6 +2266,13 @@ paths:
tags:
- ConnectorService
definitions:
BucketExtractPolicyStrategy:
type: string
enum:
- STRATEGY_UNSPECIFIED
- STRATEGY_HEAD
- STRATEGY_TAIL
default: STRATEGY_UNSPECIFIED
ColumnTimeSeriesRequestBasicMeasure:
type: object
properties:
Expand Down Expand Up @@ -2487,33 +2494,6 @@ definitions:
items:
type: string
title: Dimension/measure level access condition
SourceExtractPolicy:
type: object
properties:
rowsStrategy:
$ref: '#/definitions/SourceExtractPolicyStrategy'
title: strategy for selecting rows in a file
rowsLimitBytes:
type: string
format: uint64
title: |-
could in future add: uint64 rows_limit = n;
limit on data fetched in bytes
filesStrategy:
$ref: '#/definitions/SourceExtractPolicyStrategy'
title: strategy for selecting files
filesLimit:
type: string
format: uint64
title: limit on number of files
title: Extract policy for glob connectors
SourceExtractPolicyStrategy:
type: string
enum:
- STRATEGY_UNSPECIFIED
- STRATEGY_HEAD
- STRATEGY_TAIL
default: STRATEGY_UNSPECIFIED
StructTypeField:
type: object
properties:
Expand Down Expand Up @@ -2618,22 +2598,15 @@ definitions:
type: object
properties:
rowsStrategy:
$ref: '#/definitions/v1BucketExtractPolicyStrategy'
$ref: '#/definitions/BucketExtractPolicyStrategy'
rowsLimitBytes:
type: string
format: uint64
filesStrategy:
$ref: '#/definitions/v1BucketExtractPolicyStrategy'
$ref: '#/definitions/BucketExtractPolicyStrategy'
filesLimit:
type: string
format: uint64
v1BucketExtractPolicyStrategy:
type: string
enum:
- STRATEGY_UNSPECIFIED
- STRATEGY_HEAD
- STRATEGY_TAIL
default: STRATEGY_UNSPECIFIED
v1BucketPlanner:
type: object
properties:
Expand Down Expand Up @@ -4526,9 +4499,6 @@ definitions:
schema:
$ref: '#/definitions/v1StructType'
title: Detected schema of the source
policy:
$ref: '#/definitions/SourceExtractPolicy'
title: extraction policy for the source
timeoutSeconds:
type: integer
format: int32
Expand Down
19 changes: 0 additions & 19 deletions proto/rill/runtime/v1/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,6 @@ message Source {
google.protobuf.Struct properties = 3;
// Detected schema of the source
StructType schema = 5;
// ExtractPolicy is the extract policy for glob connectors.
message ExtractPolicy {
// Strategy selects which end of a collection to take: head or tail
// (or unspecified).
enum Strategy {
STRATEGY_UNSPECIFIED = 0;
STRATEGY_HEAD = 1;
STRATEGY_TAIL = 2;
}
// Strategy for selecting rows in a file.
Strategy rows_strategy = 1;
// Limit on data fetched, in bytes.
// A row-count limit (uint64 rows_limit = n) could be added in the future.
uint64 rows_limit_bytes = 2;
// Strategy for selecting files.
Strategy files_strategy = 3;
// Limit on the number of files.
uint64 files_limit = 4;
}
// extraction policy for the source
ExtractPolicy policy = 6;
// timeout for source ingestion in seconds
int32 timeout_seconds = 7;
}
Expand Down
29 changes: 1 addition & 28 deletions runtime/compilers/rillv1/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -64,7 +63,7 @@ func (p *Parser) AnalyzeConnectors(ctx context.Context) ([]*Connector, error) {
break
}
// Poll for anon access
res, _ := connector.HasAnonymousSourceAccess(ctx, driverSourceForAnonAccessCheck(driver, r.SourceSpec), zap.NewNop())
res, _ := connector.HasAnonymousSourceAccess(ctx, r.SourceSpec.Properties.AsMap(), zap.NewNop())
if !res {
anonAccess = false
break
Expand Down Expand Up @@ -106,29 +105,3 @@ func (p *Parser) connectorForName(name string) (string, drivers.Driver, error) {
}
return driver, connector, nil
}

// driverSourceForAnonAccessCheck wraps the properties of src in the
// drivers.Source value that corresponds to the named connector, so it can be
// handed to the anonymous-access check. Connectors that are not listed here
// yield nil.
func driverSourceForAnonAccessCheck(connector string, src *runtimev1.SourceSpec) drivers.Source {
	properties := src.Properties.AsMap()

	switch connector {
	case "s3", "gcs":
		// Both object stores are represented as a bucket source.
		return &drivers.BucketSource{Properties: properties}
	case "https", "local_file":
		return &drivers.FileSource{Properties: properties}
	case "motherduck":
		// No properties are forwarded for motherduck.
		return &drivers.DatabaseSource{}
	}
	return nil
}
32 changes: 1 addition & 31 deletions runtime/compilers/rillv1beta/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ExtractConnectors(ctx context.Context, projectPath string) ([]*Connector, e
}
// ignoring error since failure to resolve this should not break the deployment flow
// this can fail under cases such as full or host/bucket of URI is a variable
access, _ := connector.HasAnonymousSourceAccess(ctx, source(src.Connector, src), zap.NewNop())
access, _ := connector.HasAnonymousSourceAccess(ctx, src.Properties.AsMap(), zap.NewNop())
c := key{Name: src.Connector, Type: src.Connector, AnonymousAccess: access}
srcs, ok := connectorMap[c]
if !ok {
Expand Down Expand Up @@ -158,33 +158,3 @@ type key struct {
Type string
AnonymousAccess bool
}

// source wraps the properties of src in the drivers.Source value that
// corresponds to the named connector. Connectors that are not listed here
// yield nil.
func source(connector string, src *runtimev1.Source) drivers.Source {
	properties := src.Properties.AsMap()

	switch connector {
	case "s3", "gcs":
		// Both object stores are represented as a bucket source.
		return &drivers.BucketSource{Properties: properties}
	case "https", "local_file":
		return &drivers.FileSource{Properties: properties}
	case "motherduck":
		// No properties are forwarded for motherduck.
		return &drivers.DatabaseSource{}
	case "bigquery":
		return &drivers.DatabaseSource{Props: properties}
	}
	return nil
}
2 changes: 1 addition & 1 deletion runtime/connection_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (*mockDriver) Drop(config map[string]any, logger *zap.Logger) error {
}

// HasAnonymousSourceAccess implements drivers.Driver.
func (*mockDriver) HasAnonymousSourceAccess(ctx context.Context, src drivers.Source, logger *zap.Logger) (bool, error) {
func (*mockDriver) HasAnonymousSourceAccess(ctx context.Context, src map[string]any, logger *zap.Logger) (bool, error) {
panic("unimplemented")
}

Expand Down
6 changes: 5 additions & 1 deletion runtime/drivers/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (d driver) Spec() drivers.Spec {
return spec
}

func (d driver) HasAnonymousSourceAccess(ctx context.Context, src drivers.Source, logger *zap.Logger) (bool, error) {
func (d driver) HasAnonymousSourceAccess(ctx context.Context, src map[string]any, logger *zap.Logger) (bool, error) {
// gcp provides public access to the data via a project
return false, nil
}
Expand Down Expand Up @@ -204,6 +204,7 @@ func (c *Connection) AsFileStore() (drivers.FileStore, bool) {

// sourceProperties holds the source-level properties returned by
// parseSourceProperties.
type sourceProperties struct {
// ProjectID maps to the "project_id" property. parseSourceProperties
// replaces an empty value with bigquery.DetectProjectID.
ProjectID string `mapstructure:"project_id"`
// SQL maps to the "sql" property. parseSourceProperties returns an error
// when it is empty.
SQL string `mapstructure:"sql"`
}

func parseSourceProperties(props map[string]any) (*sourceProperties, error) {
Expand All @@ -212,6 +213,9 @@ func parseSourceProperties(props map[string]any) (*sourceProperties, error) {
if err != nil {
return nil, err
}
if conf.SQL == "" {
return nil, fmt.Errorf("property 'sql' is mandatory for connector \"bigquery\"")
}
if conf.ProjectID == "" {
conf.ProjectID = bigquery.DetectProjectID
}
Expand Down
Loading

1 comment on commit d44bd03

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.