Skip to content

Commit

Permalink
Async reconcile integration (#3092)
Browse files Browse the repository at this point in the history
* Create controllers for all instances; instance cache; Refactor package runtime

* Adapt runtime/queries/ to use MetricsViewSpec

* Old catalog backwards compatibility

* Remove TriggerSync API

* Implement wait until idle and ready

* Move project parser config to instance

* Add reconcile logging

* Implement CreateTrigger

* Backwards compatibility for PutXXXAndReconcile

* Fix test error

* Fix tests part 1

* Fix a bunch of race conditions

* Better test utils

* Source test

* More extensive source testing

* Fix linter

* Fix more tests

* Basic complete test working

* More complete test

* Fix test utils

* Fix runtime package tests

* Fix oversized SQL test failure

* Misc fixes

* Make rill start work

* Increase sleep

* Fix bug in catalog flush

* Add missing timeline timer reset

* Make catalog case insensitive

* Hide kind qualifier from logs

* Fix redundant delayed materializations

* Ensure hydrating... log printed at top

* Cleaner logging

* Fix log ordering

* Check ctx in catalog funcs

* Fixed renames not getting enqueued

* Misc fixes

* Use every instead of ticker for refresh schedule yaml

* Fix merge conflict

* Handle init part 1

* Init part 2

* Rename dag2 -> dag

* Fix linter

* Fix model renames not turning into delete+create

* Insert instead of upsert resources

* Fix source model collision not removing model

* Fix name collision restoration in Reparse

* Add diff test

* Add another reparse collision test case

* Deduplicate buffered watcher paths

* Don't use LongRunning for model views

* Async reconcile UI Integration (#3118)

* Adding name selector and updating sources

* Updating models

* Updating remaining references in sources and models

* entity => resource

* Updating dashboards to use the new API

* Adding global invalidations

* Updated selectors

* Adding basic create/update of a source

* Update create model and dashboard from source

* Moving resource init into the store

* Updating models to use the new API

* Using parse errors

* Initial commit for dashboard integration

* Address previous PR comments

* Improving the resource/file watchers

* Fixing various issues

* Adding wait for dashboard creation

* Fixing a few more issues

* Fixing basic actions

* Fixing model and sources tests

* Fixing errored dashboard navigation

* Fixing the tests to a degree

* Fixing the final tests

* Fixing prettier

* Fixing unit tests

* Deterministic file watcher test

* Use StateUpdatedOn in query cache key

* Fix scheduling block on concurrent delete and rename

* Fix subscriber close race condition

* Adityahegde/async reconcile unit tests (#3148)

* Adding some tests

* Adding rename tests

* Removing prints

* Adding more rename tests

* Adding remaining tests

* Update tests

* Remove unused code

* Fix test import cycle

* Fix a few more edge cases

* Avoid ambiguous use of WaitUntilIdle

* Fixing dashboard e2e tests

* partner dashboard fixes

* Async reconcile: admin and CLI integration (#3157)

* Async reconcile: admin integration

* Fix prettier

* Fix return

* Fixes

* Async reconcile integration cloud fixes (#3163)

* Fixing project listings

* Some edge cases with errors

* Fix undefined path in project errors

* Fix prettier

* Misc fixes

* Make tests pass with -race

* Adding a rudimentary temporary spinner for reconciling resources

* Fixing linters

* Fix prettier

* Updating the reconciling message to include the item

* Fix wait for initial hydration

* Fixing view-as redirecting to editor on reset

* Improving reconciling message

* Remove some rillv1beta deps

* Misc UI fixes

* Treat self-referencing resources as cyclic

* Log error for cyclic deps

* Fix merge conflict

* Drop V2 from resource kinds

* Fixing errored model during source ingestion

* Fix merge error

* Fix async reconcile integration E2E stability (#3166)

* Fix prettier

* Removing log

* Adding logs

* Adding the correct e2e logs

* Adding a retry to resource network call

* Support keystroke-by-keystroke editing of rill.yaml (#3191)

* Use incrementing ids for file watchers

* Fix merge issues with sqlite

* Fix sqlite catalogv2

* Fix tests

* Extra safety in file watcher

* Fix delayed materialization

* Elaborate on existingTyp

* Millisecond query cache

---------

Co-authored-by: Aditya Hegde <[email protected]>
  • Loading branch information
begelundmuller and AdityaHegde authored Oct 10, 2023
1 parent f6a3715 commit b609031
Show file tree
Hide file tree
Showing 242 changed files with 9,938 additions and 16,651 deletions.
52 changes: 16 additions & 36 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package admin

import (
"context"
"sync"

"github.com/rilldata/rill/admin/database"
"github.com/rilldata/rill/admin/email"
Expand All @@ -18,17 +17,14 @@ type Options struct {
}

type Service struct {
DB database.DB
Provisioner *provisioner.StaticProvisioner
Email *email.Client
Used *usedFlusher
Github Github
Logger *zap.Logger
opts *Options
issuer *auth.Issuer
closeCtx context.Context
closeCtxCancel context.CancelFunc
reconcileWg sync.WaitGroup
DB database.DB
Provisioner *provisioner.StaticProvisioner
Email *email.Client
Used *usedFlusher
Github Github
Logger *zap.Logger
opts *Options
issuer *auth.Issuer
}

func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Issuer, emailClient *email.Client, github Github) (*Service, error) {
Expand Down Expand Up @@ -63,35 +59,19 @@ func New(ctx context.Context, opts *Options, logger *zap.Logger, issuer *auth.Is
return nil, err
}

// Create context that we cancel in Close() (for background reconciles)
ctx, cancel := context.WithCancel(context.Background())

return &Service{
DB: db,
Provisioner: prov,
Email: emailClient,
Github: github,
Used: newUsedFlusher(logger, db),
opts: opts,
Logger: logger,
issuer: issuer,
closeCtx: ctx,
closeCtxCancel: cancel,
DB: db,
Provisioner: prov,
Email: emailClient,
Github: github,
Used: newUsedFlusher(logger, db),
opts: opts,
Logger: logger,
issuer: issuer,
}, nil
}

func (s *Service) Close() error {
s.closeCtxCancel()
s.reconcileWg.Wait()
s.Used.Close()

return s.DB.Close()
}

// UnsafeWaitForReconciles waits for all background reconciles to finish.
// It is unsafe because while it is running, no new reconciles should be started.
// It's a temporary solution until the runtime is able to reconcile asynchronously.
// Unlike s.Close(), it does not cancel currently running reconciles, it just waits for them to finish.
func (s *Service) UnsafeWaitForReconciles() {
s.reconcileWg.Wait()
}
8 changes: 3 additions & 5 deletions admin/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ type DB interface {
InsertProject(ctx context.Context, opts *InsertProjectOptions) (*Project, error)
DeleteProject(ctx context.Context, id string) error
UpdateProject(ctx context.Context, id string, opts *UpdateProjectOptions) (*Project, error)
UpdateProjectVariables(ctx context.Context, id string, variables map[string]string) (*Project, error)
CountProjectsForOrganization(ctx context.Context, orgID string) (int, error)

FindExpiredDeployments(ctx context.Context) ([]*Deployment, error)
Expand All @@ -87,7 +86,7 @@ type DB interface {
FindDeploymentByInstanceID(ctx context.Context, instanceID string) (*Deployment, error)
InsertDeployment(ctx context.Context, opts *InsertDeploymentOptions) (*Deployment, error)
DeleteDeployment(ctx context.Context, id string) error
UpdateDeploymentStatus(ctx context.Context, id string, status DeploymentStatus, logs string) (*Deployment, error)
UpdateDeploymentStatus(ctx context.Context, id string, status DeploymentStatus, msg string) (*Deployment, error)
UpdateDeploymentBranch(ctx context.Context, id, branch string) (*Deployment, error)
UpdateDeploymentUsedOn(ctx context.Context, ids []string) error
CountDeploymentsForOrganization(ctx context.Context, orgID string) (*DeploymentsCount, error)
Expand Down Expand Up @@ -311,7 +310,6 @@ const (
DeploymentStatusUnspecified DeploymentStatus = 0
DeploymentStatusPending DeploymentStatus = 1
DeploymentStatusOK DeploymentStatus = 2
DeploymentStatusReconciling DeploymentStatus = 3
DeploymentStatusError DeploymentStatus = 4
)

Expand All @@ -326,7 +324,7 @@ type Deployment struct {
RuntimeInstanceID string `db:"runtime_instance_id"`
RuntimeAudience string `db:"runtime_audience"`
Status DeploymentStatus `db:"status"`
Logs string `db:"logs"`
StatusMessage string `db:"status_message"`
CreatedOn time.Time `db:"created_on"`
UpdatedOn time.Time `db:"updated_on"`
UsedOn time.Time `db:"used_on"`
Expand All @@ -341,7 +339,7 @@ type InsertDeploymentOptions struct {
RuntimeInstanceID string `validate:"required"`
RuntimeAudience string
Status DeploymentStatus
Logs string
StatusMessage string
}

// RuntimeSlotsUsed is the result of a ResolveRuntimeSlotsUsed query.
Expand Down
1 change: 1 addition & 0 deletions admin/database/postgres/migrations/0014.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE deployments RENAME COLUMN logs TO status_message;
19 changes: 4 additions & 15 deletions admin/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,17 +355,6 @@ func (c *connection) UpdateProject(ctx context.Context, id string, opts *databas
return res, nil
}

func (c *connection) UpdateProjectVariables(ctx context.Context, id string, prodVariables map[string]string) (*database.Project, error) {
res := &database.Project{}
err := c.getDB(ctx).QueryRowxContext(ctx, "UPDATE projects SET prod_variables=$1, updated_on=now() WHERE id=$2 RETURNING *",
prodVariables, id,
).StructScan(res)
if err != nil {
return nil, parseErr("project", err)
}
return res, nil
}

func (c *connection) CountProjectsForOrganization(ctx context.Context, orgID string) (int, error) {
var count int
err := c.getDB(ctx).QueryRowxContext(ctx, "SELECT COUNT(*) FROM projects WHERE org_id = $1", orgID).Scan(&count)
Expand Down Expand Up @@ -423,9 +412,9 @@ func (c *connection) InsertDeployment(ctx context.Context, opts *database.Insert

res := &database.Deployment{}
err := c.getDB(ctx).QueryRowxContext(ctx, `
INSERT INTO deployments (project_id, slots, branch, runtime_host, runtime_instance_id, runtime_audience, status, logs)
INSERT INTO deployments (project_id, slots, branch, runtime_host, runtime_instance_id, runtime_audience, status, status_message)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING *`,
opts.ProjectID, opts.Slots, opts.Branch, opts.RuntimeHost, opts.RuntimeInstanceID, opts.RuntimeAudience, opts.Status, opts.Logs,
opts.ProjectID, opts.Slots, opts.Branch, opts.RuntimeHost, opts.RuntimeInstanceID, opts.RuntimeAudience, opts.Status, opts.StatusMessage,
).StructScan(res)
if err != nil {
return nil, parseErr("deployment", err)
Expand All @@ -438,9 +427,9 @@ func (c *connection) DeleteDeployment(ctx context.Context, id string) error {
return checkDeleteRow("deployment", res, err)
}

func (c *connection) UpdateDeploymentStatus(ctx context.Context, id string, status database.DeploymentStatus, logs string) (*database.Deployment, error) {
func (c *connection) UpdateDeploymentStatus(ctx context.Context, id string, status database.DeploymentStatus, message string) (*database.Deployment, error) {
res := &database.Deployment{}
err := c.getDB(ctx).QueryRowxContext(ctx, "UPDATE deployments SET status=$1, logs=$2, updated_on=now() WHERE id=$3 RETURNING *", status, logs, id).StructScan(res)
err := c.getDB(ctx).QueryRowxContext(ctx, "UPDATE deployments SET status=$1, status_message=$2, updated_on=now() WHERE id=$3 RETURNING *", status, message, id).StructScan(res)
if err != nil {
return nil, parseErr("deployment", err)
}
Expand Down
65 changes: 13 additions & 52 deletions admin/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,9 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp

// Create the instance
_, err = rt.CreateInstance(ctx, &runtimev1.CreateInstanceRequest{
InstanceId: instanceID,
OlapConnector: olapDriver,
RepoConnector: "repo",
EmbedCatalog: embedCatalog,
Variables: opts.ProdVariables,
IngestionLimitBytes: ingestionLimit,
Annotations: opts.Annotations.toMap(),
InstanceId: instanceID,
OlapConnector: olapDriver,
RepoConnector: "repo",
Connectors: []*runtimev1.Connector{
{
Name: olapDriver,
Expand All @@ -136,6 +132,11 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp
Config: map[string]string{"dsn": repoDSN},
},
},
Variables: opts.ProdVariables,
Annotations: opts.Annotations.toMap(),
EmbedCatalog: embedCatalog,
IngestionLimitBytes: ingestionLimit,
StageChanges: true,
})
if err != nil {
return nil, err
Expand All @@ -149,8 +150,7 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp
RuntimeHost: alloc.Host,
RuntimeInstanceID: instanceID,
RuntimeAudience: alloc.Audience,
Status: database.DeploymentStatusPending,
Logs: "",
Status: database.DeploymentStatusOK,
})
if err != nil {
_, err2 := rt.DeleteInstance(ctx, &runtimev1.DeleteInstanceRequest{
Expand All @@ -169,7 +169,7 @@ type updateDeploymentOptions struct {
Subpath string
Branch string
Variables map[string]string
Annotations *deploymentAnnotations
Annotations deploymentAnnotations
}

func (s *Service) updateDeployment(ctx context.Context, depl *database.Deployment, opts *updateDeploymentOptions) error {
Expand All @@ -192,6 +192,7 @@ func (s *Service) updateDeployment(ctx context.Context, depl *database.Deploymen
if err != nil {
return err
}

connectors := res.Instance.Connectors
for _, c := range connectors {
if c.Name == "repo" {
Expand All @@ -203,14 +204,11 @@ func (s *Service) updateDeployment(ctx context.Context, depl *database.Deploymen
}
}

var annotations map[string]string
if opts.Annotations != nil { // annotations changed
annotations = opts.Annotations.toMap()
}
_, err = rt.EditInstance(ctx, &runtimev1.EditInstanceRequest{
InstanceId: depl.RuntimeInstanceID,
Connectors: connectors,
Annotations: annotations,
Annotations: opts.Annotations.toMap(),
Variables: opts.Variables,
})
if err != nil {
return err
Expand All @@ -227,10 +225,6 @@ func (s *Service) updateDeployment(ctx context.Context, depl *database.Deploymen
depl.UpdatedOn = newDepl.UpdatedOn
}

if err := s.triggerReconcile(ctx, depl); err != nil {
s.Logger.Error("failed to trigger reconcile", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
return err
}
return nil
}

Expand All @@ -248,11 +242,6 @@ func (s *Service) HibernateDeployments(ctx context.Context) error {
s.Logger.Info("hibernate: starting", zap.Int("deployments", len(depls)))

for _, depl := range depls {
if depl.Status == database.DeploymentStatusReconciling && time.Since(depl.UpdatedOn) < 30*time.Minute {
s.Logger.Info("hibernate: skipping deployment because it is reconciling", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
continue
}

proj, err := s.DB.FindProject(ctx, depl.ProjectID)
if err != nil {
s.Logger.Error("hibernate: find project error", zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.Error(err), observability.ZapCtx(ctx))
Expand Down Expand Up @@ -291,34 +280,6 @@ func (s *Service) HibernateDeployments(ctx context.Context) error {
return nil
}

func (s *Service) updateDeplVariables(ctx context.Context, depl *database.Deployment, variables map[string]string) error {
rt, err := s.openRuntimeClientForDeployment(depl)
if err != nil {
return err
}
defer rt.Close()

_, err = rt.EditInstanceVariables(ctx, &runtimev1.EditInstanceVariablesRequest{
InstanceId: depl.RuntimeInstanceID,
Variables: variables,
})
return err
}

func (s *Service) updateDeplAnnotations(ctx context.Context, depl *database.Deployment, annotations deploymentAnnotations) error {
rt, err := s.openRuntimeClientForDeployment(depl)
if err != nil {
return err
}
defer rt.Close()

_, err = rt.EditInstanceAnnotations(ctx, &runtimev1.EditInstanceAnnotationsRequest{
InstanceId: depl.RuntimeInstanceID,
Annotations: annotations.toMap(),
})
return err
}

func (s *Service) teardownDeployment(ctx context.Context, proj *database.Project, depl *database.Deployment) error {
// Connect to the deployment's runtime
rt, err := s.openRuntimeClientForDeployment(depl)
Expand Down
Loading

1 comment on commit b609031

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.