From 7e218fd23f433bc4b048573114ced31f813c78d0 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Mon, 18 Dec 2023 12:45:03 +0530 Subject: [PATCH] source templating --- runtime/reconcilers/source.go | 23 +++++++++++++++--- runtime/reconcilers/util.go | 46 +++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/runtime/reconcilers/source.go b/runtime/reconcilers/source.go index 442e094d7bc..0cdc2c99221 100644 --- a/runtime/reconcilers/source.go +++ b/runtime/reconcilers/source.go @@ -11,6 +11,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" + "github.com/rilldata/rill/runtime/compilers/rillv1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/pbutil" "go.opentelemetry.io/otel/attribute" @@ -176,7 +177,7 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN // Execute ingestion r.C.Logger.Info("Ingesting source data", slog.String("name", n.Name), slog.String("connector", connector)) - ingestErr := r.ingestSource(ctx, src.Spec, stagingTableName) + ingestErr := r.ingestSource(ctx, self, stagingTableName) if ingestErr != nil { ingestErr = fmt.Errorf("failed to ingest source: %w", ingestErr) } @@ -308,7 +309,8 @@ func (r *SourceReconciler) setTriggerFalse(ctx context.Context, n *runtimev1.Res // ingestSource ingests the source into a table with tableName. // It does NOT drop the table if ingestion fails after the table has been created. // It will return an error if the sink connector is not an OLAP. -func (r *SourceReconciler) ingestSource(ctx context.Context, src *runtimev1.SourceSpec, tableName string) (outErr error) { +func (r *SourceReconciler) ingestSource(ctx context.Context, self *runtimev1.Resource, tableName string) (outErr error) { + src := self.Resource.(*runtimev1.Resource_Source).Source.GetSpec() // Get connections and transporter srcConn, release1, err := r.C.AcquireConn(ctx, src.SourceConnector) if err != nil { @@ -329,7 +331,7 @@ func (r *SourceReconciler) ingestSource(ctx context.Context, src *runtimev1.Sour } // Get source and sink configs - srcConfig, err := driversSource(srcConn, src.Properties) + srcConfig, err := r.driversSource(ctx, self, src.Properties) if err != nil { return err } @@ -382,8 +384,21 @@ func (r *SourceReconciler) ingestSource(ctx context.Context, src *runtimev1.Sour return err } -func driversSource(conn drivers.Handle, propsPB *structpb.Struct) (map[string]any, error) { +func (r *SourceReconciler) driversSource(ctx context.Context, self *runtimev1.Resource, propsPB *structpb.Struct) (map[string]any, error) { props := propsPB.AsMap() + spec := self.Resource.(*runtimev1.Resource_Source).Source.Spec + state := self.Resource.(*runtimev1.Resource_Source).Source.State + + var err error + props, err = resolveTemplateProps(ctx, props, rillv1.TemplateResource{ + Meta: self.Meta, + Spec: spec, + State: state, + }, r.C) + if err != nil { + return nil, err + } + return props, nil } diff --git a/runtime/reconcilers/util.go b/runtime/reconcilers/util.go index d9105d8c334..11f41e593fb 100644 --- a/runtime/reconcilers/util.go +++ b/runtime/reconcilers/util.go @@ -9,6 +9,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" + compilerv1 "github.com/rilldata/rill/runtime/compilers/rillv1" "github.com/rilldata/rill/runtime/drivers" "github.com/robfig/cron/v3" ) @@ -154,3 +155,48 @@ func safeSQLName(name string) string { } return fmt.Sprintf("\"%s\"", strings.ReplaceAll(name, "\"", "\"\"")) } + +func resolveTemplateProps(ctx context.Context, props map[string]any, self compilerv1.TemplateResource, c *runtime.Controller) (map[string]any, error) { + inst, err := c.Runtime.Instance(ctx, c.InstanceID) + if err != nil { + return nil, err + } + vars := inst.ResolveVariables() + + for key, value := range props { + strValue, ok := value.(string) + if !ok { + continue + } + + strValue, err = compilerv1.ResolveTemplate(strValue, compilerv1.TemplateData{ + User: map[string]interface{}{}, + Variables: vars, + ExtraProps: map[string]interface{}{}, + Self: self, + Resolve: func(ref compilerv1.ResourceName) (string, error) { + return safeSQLName(ref.Name), nil + }, + Lookup: func(name compilerv1.ResourceName) (compilerv1.TemplateResource, error) { + if name.Kind == compilerv1.ResourceKindUnspecified { + return compilerv1.TemplateResource{}, fmt.Errorf("can't resolve name %q without kind specified", name.Name) + } + res, err := c.Get(ctx, resourceNameFromCompiler(name), false) + if err != nil { + return compilerv1.TemplateResource{}, err + } + return compilerv1.TemplateResource{ + Meta: res.Meta, + Spec: res.Resource.(*runtimev1.Resource_Model).Model.Spec, + State: res.Resource.(*runtimev1.Resource_Model).Model.State, + }, nil + }, + }) + if err != nil { + return nil, fmt.Errorf("failed to resolve template: %w", err) + } + + props[key] = strValue + } + return props, nil +}