Skip to content

Commit

Permalink
source templating
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anshul committed Dec 18, 2023
1 parent 7f2fef8 commit 7e218fd
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
23 changes: 19 additions & 4 deletions runtime/reconcilers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/compilers/rillv1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/pbutil"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -176,7 +177,7 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN

// Execute ingestion
r.C.Logger.Info("Ingesting source data", slog.String("name", n.Name), slog.String("connector", connector))
ingestErr := r.ingestSource(ctx, src.Spec, stagingTableName)
ingestErr := r.ingestSource(ctx, self, stagingTableName)
if ingestErr != nil {
ingestErr = fmt.Errorf("failed to ingest source: %w", ingestErr)
}
Expand Down Expand Up @@ -308,7 +309,8 @@ func (r *SourceReconciler) setTriggerFalse(ctx context.Context, n *runtimev1.Res
// ingestSource ingests the source into a table with tableName.
// It does NOT drop the table if ingestion fails after the table has been created.
// It will return an error if the sink connector is not an OLAP.
func (r *SourceReconciler) ingestSource(ctx context.Context, src *runtimev1.SourceSpec, tableName string) (outErr error) {
func (r *SourceReconciler) ingestSource(ctx context.Context, self *runtimev1.Resource, tableName string) (outErr error) {
src := self.Resource.(*runtimev1.Resource_Source).Source.GetSpec()
// Get connections and transporter
srcConn, release1, err := r.C.AcquireConn(ctx, src.SourceConnector)
if err != nil {
Expand All @@ -329,7 +331,7 @@ func (r *SourceReconciler) ingestSource(ctx context.Context, src *runtimev1.Sour
}

// Get source and sink configs
srcConfig, err := driversSource(srcConn, src.Properties)
srcConfig, err := r.driversSource(ctx, self, src.Properties)
if err != nil {
return err
}
Expand Down Expand Up @@ -382,8 +384,21 @@ func (r *SourceReconciler) ingestSource(ctx context.Context, src *runtimev1.Sour
return err
}

func driversSource(conn drivers.Handle, propsPB *structpb.Struct) (map[string]any, error) {
func (r *SourceReconciler) driversSource(ctx context.Context, self *runtimev1.Resource, propsPB *structpb.Struct) (map[string]any, error) {
props := propsPB.AsMap()
spec := self.Resource.(*runtimev1.Resource_Source).Source.Spec
state := self.Resource.(*runtimev1.Resource_Source).Source.State

var err error
props, err = resolveTemplateProps(ctx, props, rillv1.TemplateResource{
Meta: self.Meta,
Spec: spec,
State: state,
}, r.C)
if err != nil {
return nil, err
}

return props, nil
}

Expand Down
46 changes: 46 additions & 0 deletions runtime/reconcilers/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
compilerv1 "github.com/rilldata/rill/runtime/compilers/rillv1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/robfig/cron/v3"
)
Expand Down Expand Up @@ -154,3 +155,48 @@ func safeSQLName(name string) string {
}
return fmt.Sprintf("\"%s\"", strings.ReplaceAll(name, "\"", "\"\""))
}

func resolveTemplateProps(ctx context.Context, props map[string]any, self compilerv1.TemplateResource, c *runtime.Controller) (map[string]any, error) {
inst, err := c.Runtime.Instance(ctx, c.InstanceID)
if err != nil {
return nil, err
}
vars := inst.ResolveVariables()

for key, value := range props {
strValue, ok := value.(string)
if !ok {
continue
}

strValue, err = compilerv1.ResolveTemplate(strValue, compilerv1.TemplateData{
User: map[string]interface{}{},
Variables: vars,
ExtraProps: map[string]interface{}{},
Self: self,
Resolve: func(ref compilerv1.ResourceName) (string, error) {
return safeSQLName(ref.Name), nil
},
Lookup: func(name compilerv1.ResourceName) (compilerv1.TemplateResource, error) {
if name.Kind == compilerv1.ResourceKindUnspecified {
return compilerv1.TemplateResource{}, fmt.Errorf("can't resolve name %q without kind specified", name.Name)
}
res, err := c.Get(ctx, resourceNameFromCompiler(name), false)
if err != nil {
return compilerv1.TemplateResource{}, err
}
return compilerv1.TemplateResource{
Meta: res.Meta,
Spec: res.Resource.(*runtimev1.Resource_Model).Model.Spec,
State: res.Resource.(*runtimev1.Resource_Model).Model.State,
}, nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to resolve template: %w", err)
}

props[key] = strValue
}
return props, nil
}

0 comments on commit 7e218fd

Please sign in to comment.