Skip to content

Commit

Permalink
also add to source
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anshul committed Dec 19, 2023
1 parent 152970a commit 8800def
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 34 deletions.
39 changes: 5 additions & 34 deletions runtime/reconcilers/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ import (
"encoding/hex"
"errors"
"fmt"
"strings"
"time"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
compilerv1 "github.com/rilldata/rill/runtime/compilers/rillv1"
"github.com/rilldata/rill/runtime/drivers"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -223,11 +221,13 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa
createErr := r.createModel(ctx, self, stagingTableName, !materialize)
if createErr != nil {
createErr = fmt.Errorf("failed to create model: %w", createErr)
} else {

Check failure on line 224 in runtime/reconcilers/model.go

View workflow job for this annotation

GitHub Actions / lint

elseif: can replace 'else {if cond {}}' with 'else if cond {}' (gocritic)
if !r.C.Runtime.AllowHostAccess() {
// temporarily for debugging
logTableNameAndType(ctx, r.C, connector, stagingTableName)
}
}

// temporarily for debugging
r.logModelNameAndType(ctx, self, stagingTableName)

if createErr == nil && stage {
// Rename the staging table to main view/table
err = olapForceRenameTable(ctx, r.C, connector, stagingTableName, !materialize, tableName)
Expand Down Expand Up @@ -465,32 +465,3 @@ func (r *ModelReconciler) createModel(ctx context.Context, self *runtimev1.Resou

return olap.CreateTableAsSelect(ctx, tableName, view, sql)
}

func (r *ModelReconciler) logModelNameAndType(ctx context.Context, self *runtimev1.Resource, name string) {
olap, release, err := r.C.AcquireOLAP(ctx, self.GetModel().Spec.Connector)
if err != nil {
r.C.Logger.Error("ModelReconciler: failed to acquire OLAP", slog.Any("err", err))
return
}
defer release()

res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT column_name, data_type FROM information_schema.columns WHERE table_name=? ORDER BY column_name ASC", Args: []any{name}})
if err != nil {
r.C.Logger.Error("ModelReconciler: failed information_schema.columns", slog.Any("err", err))
return
}
defer res.Close()

colTyp := make([]string, 0)
var col, typ string
for res.Next() {
err = res.Scan(&col, &typ)
if err != nil {
r.C.Logger.Error("ModelReconciler: failed scan", slog.Any("err", err))
return
}
colTyp = append(colTyp, fmt.Sprintf("%s:%s", col, typ))
}

r.C.Logger.Info("ModelReconciler: ", slog.String("name", name), slog.String("schema", strings.Join(colTyp, ", ")))
}
6 changes: 6 additions & 0 deletions runtime/reconcilers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,13 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN
ingestErr := r.ingestSource(ctx, src.Spec, stagingTableName)
if ingestErr != nil {
ingestErr = fmt.Errorf("failed to ingest source: %w", ingestErr)
} else {

Check failure on line 182 in runtime/reconcilers/source.go

View workflow job for this annotation

GitHub Actions / lint

elseif: can replace 'else {if cond {}}' with 'else if cond {}' (gocritic)
if !r.C.Runtime.AllowHostAccess() {
// temporarily for debugging
logTableNameAndType(ctx, r.C, connector, stagingTableName)
}
}

if ingestErr == nil && src.Spec.StageChanges {
// Rename staging table to main table
err = olapForceRenameTable(ctx, r.C, connector, stagingTableName, false, tableName)
Expand Down
30 changes: 30 additions & 0 deletions runtime/reconcilers/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
"github.com/robfig/cron/v3"
"golang.org/x/exp/slog"
)

// checkRefs checks that all refs exist, are idle, and have no errors.
Expand Down Expand Up @@ -154,3 +155,32 @@ func safeSQLName(name string) string {
}
return fmt.Sprintf("\"%s\"", strings.ReplaceAll(name, "\"", "\"\""))
}

func logTableNameAndType(ctx context.Context, c *runtime.Controller, connector, name string) {
olap, release, err := c.AcquireOLAP(ctx, connector)
if err != nil {
c.Logger.Error("LogTableNameAndType: failed to acquire OLAP", slog.Any("err", err))
return
}
defer release()

res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT column_name, data_type FROM information_schema.columns WHERE table_name=? ORDER BY column_name ASC", Args: []any{name}})
if err != nil {
c.Logger.Error("LogTableNameAndType: failed information_schema.columns", slog.Any("err", err))
return
}
defer res.Close()

colTyp := make([]string, 0)
var col, typ string
for res.Next() {
err = res.Scan(&col, &typ)
if err != nil {
c.Logger.Error("LogTableNameAndType: failed scan", slog.Any("err", err))
return
}
colTyp = append(colTyp, fmt.Sprintf("%s:%s", col, typ))
}

c.Logger.Info("LogTableNameAndType: ", slog.String("name", name), slog.String("schema", strings.Join(colTyp, ", ")))
}

0 comments on commit 8800def

Please sign in to comment.