Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gateway/rest): use cluster status instead of manifest #243

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions gateway/rest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ctxCon

// POST /lease/<lease-id>/shell
lrouter.HandleFunc("/shell",
leaseShellHandler(log, pclient.Manifest(), pclient.Cluster()))
leaseShellHandler(log, pclient.Cluster(), ctxConfig))

return router
}
Expand Down Expand Up @@ -306,24 +306,20 @@ type leaseShellResponse struct {
Message string `json:"message,omitempty"`
}

func leaseShellHandler(log log.Logger, mclient pmanifest.Client, cclient cluster.Client) http.HandlerFunc {
func leaseShellHandler(log log.Logger, cclient cluster.Client, clusterSettings map[any]any) http.HandlerFunc {
return func(rw http.ResponseWriter, req *http.Request) {
leaseID := requestLeaseID(req)

ctx := fromctx.ApplyToContext(req.Context(), clusterSettings)

// check if deployment actually exists in the first place before querying kubernetes
active, err := mclient.IsActive(req.Context(), leaseID.DeploymentID())
status, err := cclient.LeaseStatus(ctx, leaseID)
if err != nil {
log.Error("failed checking deployment activity", "err", err)
rw.WriteHeader(http.StatusInternalServerError)
return
}

if !active {
log.Info("no active deployment", "lease", leaseID)
rw.WriteHeader(http.StatusNotFound)
return
}

localLog := log.With("lease", leaseID.String(), "action", "shell")

vars := req.URL.Query()
Expand Down Expand Up @@ -357,6 +353,19 @@ func leaseShellHandler(log log.Logger, mclient pmanifest.Client, cclient cluster
return
}

svc, active := status[service]
if !active {
log.Info("no active deployment", "lease", leaseID)
rw.WriteHeader(http.StatusNotFound)
return
}

if svc.Available == 0 {
log.Info("deployment does not have active replicas", "lease", leaseID)
rw.WriteHeader(http.StatusNotFound)
return
}

stdin := vars.Get("stdin")
if 0 == len(stdin) {
localLog.Error("missing parameter stdin")
Expand Down
57 changes: 0 additions & 57 deletions manifest/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 2 additions & 40 deletions manifest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type StatusClient interface {
//go:generate mockery --name Client
type Client interface {
Submit(context.Context, dtypes.DeploymentID, manifest.Manifest) error
IsActive(context.Context, dtypes.DeploymentID) (bool, error)
}

// Service is the interface that includes StatusClient and Handler interfaces. It also wraps Done method
Expand All @@ -82,7 +81,6 @@ func NewService(ctx context.Context, session session.Session, bus pubsub.Bus, ho
sub: sub,
statusch: make(chan chan<- *Status),
mreqch: make(chan manifestRequest),
activeCheckCh: make(chan isActiveCheck),
managers: make(map[string]*manager),
managerch: make(chan *manager),
lc: lifecycle.New(),
Expand All @@ -107,9 +105,8 @@ type service struct {
sub pubsub.Subscriber
lc lifecycle.Lifecycle

statusch chan chan<- *Status
mreqch chan manifestRequest
activeCheckCh chan isActiveCheck
statusch chan chan<- *Status
mreqch chan manifestRequest

managers map[string]*manager
managerch chan *manager
Expand All @@ -131,38 +128,6 @@ func (s *service) updateGauges() {
manifestWatchdogGauge.Set(float64(len(s.managers)))
}

type isActiveCheck struct {
ch chan<- bool
Deployment dtypes.DeploymentID
}

func (s *service) IsActive(ctx context.Context, dID dtypes.DeploymentID) (bool, error) {
ch := make(chan bool, 1)
req := isActiveCheck{
Deployment: dID,
ch: ch,
}

select {
case <-ctx.Done():
return false, ctx.Err()
case s.activeCheckCh <- req:
case <-s.lc.ShuttingDown():
return false, ErrNotRunning
case <-s.lc.Done():
return false, ErrNotRunning
}

select {
case <-ctx.Done():
return false, ctx.Err()
case <-s.lc.Done():
return false, ErrNotRunning
case result := <-ch:
return result, nil
}
}

// Submit incoming manifest request.
func (s *service) Submit(ctx context.Context, did dtypes.DeploymentID, mani manifest.Manifest) error {
// This needs to be buffered because the goroutine writing to this may get the result
Expand Down Expand Up @@ -287,9 +252,6 @@ loop:
manager.removeLease(ev.ID)
}
}
case check := <-s.activeCheckCh:
_, ok := s.managers[dquery.DeploymentPath(check.Deployment)]
check.ch <- ok
case req := <-s.mreqch:
// Cancel the watchdog (if it exists), since a manifest has been received
s.maybeRemoveWatchdog(req.value.Deployment)
Expand Down
Loading