Skip to content

Commit

Permalink
Remove the handlingLoop subcontexts
Browse files Browse the repository at this point in the history
We should use the stream's and server's contexts directly.
The subcontexts made the code obscure.
This aims to make clearer that forceful stop cancels the streams
contexts while the graceful stop allows the RPCs to finish their current
activities on their own before stopping the loops.

This method derived two context ojects mirroring the structure found in
the Server type: `gCtx` for graceful stop and `ctx` for "brute forcing".

They were arrenged in such a way that the local `gCtx` is child of
`ctx`. If the server own `ctx` is cancelled to force a stop, depending
on the moment that happens it could be interpreted by this method as a
graceful stop (because it only checks `gCtx` and that can be cancelled
due its parent being cancelled).

The whole point here is that `server.Stop()` is assumed in tests to
cause `server.Serve()` to return a non-nil error. But that can only
happen if the handling loop objects fail or perceive a brute-force
cancellation. If `handlingLooop[Command].run()` perceives `server.Stop()`
as a graceful cancellation, then they won't return errors.

Co-authored-by: Didier Roche-Tolomelli <[email protected]>
  • Loading branch information
CarlosNihelton and didrocks committed Aug 28, 2024
1 parent 1bc86bd commit 400acf7
Showing 1 changed file with 11 additions and 19 deletions.
30 changes: 11 additions & 19 deletions wsl-pro-service/internal/streams/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ type Server struct {

done chan struct{}

// This context will be the parent of the streams's context
ctx context.Context
cancel context.CancelFunc

// This context will be used for graceful stopping the server, i.e. waiting the streams to finish their current activities.
gracefulCtx context.Context
gracefulCancel context.CancelFunc
}
Expand Down Expand Up @@ -82,13 +84,15 @@ func NewServer(ctx context.Context, sys *system.System, conn *grpc.ClientConn) *
// Stop stops the server and the underlying connection immediately.
// It blocks until the server finishes its teardown.
func (s *Server) Stop() {
// Since this cancellation also cancels the streams's context, this should be an immediate stop.
s.cancel()
<-s.done
}

// GracefulStop stops the server as soon as all active unary calls finish.
// It blocks until the server finishes its teardown.
func (s *Server) GracefulStop() {
// Since this cancellation won't affect the streams directly, it allows the streams to finish their current activities before stopping the server loop.
s.gracefulCancel()
<-s.done
}
Expand Down Expand Up @@ -178,26 +182,21 @@ type handlingLoop[Command any] struct {
}

func (h *handlingLoop[Command]) run(s *Server, client *multiClient) error {
// Use this context to log onto the stream, and to cancel with server.Stop
ctx, cancel := cancelWith(h.stream.Context(), s.ctx)
defer cancel()

// Use this context to log onto the stream, but cancel with server.GracefulStop
gCtx, cancel := cancelWith(ctx, s.gracefulCtx)
defer cancel()

// We deliberately use the stream's context for logging, running the handler callback and acquiring system info.
ctx := h.stream.Context()
for {
// Graceful stop
select {
case <-gCtx.Done():
case <-s.gracefulCtx.Done():
log.Debugf(ctx, "Stopping serving %s requests", reflect.TypeFor[Command]())
return nil
default:
}

log.Debugf(ctx, "Started serving %s requests", reflect.TypeFor[Command]())

// Handle a single command
msg, ok, err := receiveWithContext(gCtx, h.stream.Recv)
// Handle a single command responsive to the cancellation of s.gracefulCtx.
msg, ok, err := receiveWithContext(s.gracefulCtx, h.stream.Recv)
if err != nil {
return fmt.Errorf("could not receive ProAttachCmd: %w", err)
} else if !ok {
Expand All @@ -218,18 +217,11 @@ func (h *handlingLoop[Command]) run(s *Server, client *multiClient) error {
}

if err = client.SendInfo(info); err != nil {
log.Warningf(ctx, "Streamserver: could not stream back info after command completion")
log.Warningf(ctx, "Streamserver: could not stream back info after command completion: %v", err)
}
}
}

// cancelWith creates a child context that is cancelled when with is done.
func cancelWith(ctx, with context.Context) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(ctx)
context.AfterFunc(with, cancel)
return ctx, cancel
}

// Receive with context calls the recv receiver asyncronously.
// Returns (message, message error) if recv returned.
// Returns (nil, context error) if the context was cancelled.
Expand Down

0 comments on commit 400acf7

Please sign in to comment.