diff --git a/runtime/reconcilers/source.go b/runtime/reconcilers/source.go index a78db2e5272..473b0b89eed 100644 --- a/runtime/reconcilers/source.go +++ b/runtime/reconcilers/source.go @@ -11,6 +11,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" + "github.com/rilldata/rill/runtime/compilers/rillv1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/pbutil" "go.opentelemetry.io/otel/attribute" @@ -176,7 +177,7 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN // Execute ingestion r.C.Logger.Info("Ingesting source data", slog.String("name", n.Name), slog.String("connector", connector)) - ingestErr := r.ingestSource(ctx, src.Spec, stagingTableName) + ingestErr := r.ingestSource(ctx, self, stagingTableName) if ingestErr != nil { ingestErr = fmt.Errorf("failed to ingest source: %w", ingestErr) } else if !r.C.Runtime.AllowHostAccess() { @@ -312,7 +313,9 @@ func (r *SourceReconciler) setTriggerFalse(ctx context.Context, n *runtimev1.Res // ingestSource ingests the source into a table with tableName. // It does NOT drop the table if ingestion fails after the table has been created. // It will return an error if the sink connector is not an OLAP. -func (r *SourceReconciler) ingestSource(ctx context.Context, src *runtimev1.SourceSpec, tableName string) (outErr error) { +func (r *SourceReconciler) ingestSource(ctx context.Context, self *runtimev1.Resource, tableName string) (outErr error) { + src := self.GetSource().Spec + // Get connections and transporter srcConn, release1, err := r.C.AcquireConn(ctx, src.SourceConnector) if err != nil { @@ -333,7 +336,7 @@ func (r *SourceReconciler) ingestSource(ctx context.Context, src *runtimev1.Sour } // Get source and sink configs - srcConfig, err := driversSource(srcConn, src.Properties) + srcConfig, err := r.driversSource(ctx, self, src.Properties) if err != nil { return err } @@ -386,9 +389,14 @@ func (r *SourceReconciler) ingestSource(ctx context.Context, src *runtimev1.Sour return err } -func driversSource(conn drivers.Handle, propsPB *structpb.Struct) (map[string]any, error) { - props := propsPB.AsMap() - return props, nil +func (r *SourceReconciler) driversSource(ctx context.Context, self *runtimev1.Resource, propsPB *structpb.Struct) (map[string]any, error) { + tself := rillv1.TemplateResource{ + Meta: self.Meta, + Spec: self.GetSource().Spec, + State: self.GetSource().State, + } + + return resolveTemplatedProps(ctx, r.C, tself, propsPB.AsMap()) } func driversSink(conn drivers.Handle, tableName string) (map[string]any, error) { diff --git a/runtime/reconcilers/util.go b/runtime/reconcilers/util.go index a0c34ff6a39..baf5835170e 100644 --- a/runtime/reconcilers/util.go +++ b/runtime/reconcilers/util.go @@ -9,6 +9,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" + compilerv1 "github.com/rilldata/rill/runtime/compilers/rillv1" "github.com/rilldata/rill/runtime/drivers" "github.com/robfig/cron/v3" "golang.org/x/exp/slog" @@ -184,3 +185,75 @@ func logTableNameAndType(ctx context.Context, c *runtime.Controller, connector, c.Logger.Info("LogTableNameAndType: ", slog.String("name", name), slog.String("schema", strings.Join(colTyp, ", "))) } + +func resolveTemplatedProps(ctx context.Context, c *runtime.Controller, self compilerv1.TemplateResource, props map[string]any) (map[string]any, error) { + inst, err := c.Runtime.Instance(ctx, c.InstanceID) + if err != nil { + return nil, err + } + vars := inst.ResolveVariables() + + templateData := compilerv1.TemplateData{ + User: map[string]interface{}{}, + Variables: vars, + ExtraProps: map[string]interface{}{}, + Self: self, + Resolve: func(ref compilerv1.ResourceName) (string, error) { + return safeSQLName(ref.Name), nil + }, + Lookup: func(name compilerv1.ResourceName) (compilerv1.TemplateResource, error) { + if name.Kind == compilerv1.ResourceKindUnspecified { + return compilerv1.TemplateResource{}, fmt.Errorf("can't resolve name %q without kind specified", name.Name) + } + res, err := c.Get(ctx, resourceNameFromCompiler(name), false) + if err != nil { + return compilerv1.TemplateResource{}, err + } + return compilerv1.TemplateResource{ + Meta: res.Meta, + Spec: res.Resource.(*runtimev1.Resource_Model).Model.Spec, + State: res.Resource.(*runtimev1.Resource_Model).Model.State, + }, nil + }, + } + + for key, value := range props { + res, err := convert(value, &templateData) + if err != nil { + return nil, fmt.Errorf("failed to convert property %q: %w", key, err) + } + props[key] = res + } + return props, nil +} + +func convert(value any, templateData *compilerv1.TemplateData) (res any, err error) { + switch v := value.(type) { + case string: + res, err = compilerv1.ResolveTemplate(v, *templateData) + if err != nil { + return nil, fmt.Errorf("failed to resolve template: %w", err) + } + case map[string]any: + for key, item := range v { + item, err = convert(item, templateData) + if err != nil { + return nil, err + } + v[key] = item + } + res = v + case []any: + for i, item := range v { + item, err = convert(item, templateData) + if err != nil { + return nil, err + } + v[i] = item + } + res = v + default: + res = v + } + return +}