Skip to content

Commit

Permalink
Handle unexpected termination of plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
sukhwinder33445 committed Oct 10, 2023
1 parent 22b95fa commit 4185f90
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
2 changes: 1 addition & 1 deletion internal/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (c *Channel) StartPlugin(pluginDir string) error {
c.mu.Lock()
defer c.mu.Unlock()

if c.cmd != nil {
if c.cmd != nil && c.rpc.Err() == nil {
return nil
}

Expand Down
26 changes: 13 additions & 13 deletions pkg/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ type RPC struct {
lastRequestId uint64
requestsMu sync.Mutex

errChannel chan struct{} // never transports a value, only closed through setErr() to signal an occurred error
err *Error // only initialised via setErr(), if a rpc (Fatal/non-recoverable) error has occurred
errMu sync.Mutex
errChannel chan struct{} // never transports a value, only closed through setErr() to signal an occurred error
err *Error // only initialised via setErr(), if a rpc (Fatal/non-recoverable) error has occurred
errMu sync.Mutex
requestedShutdown bool
}

// NewRPC creates and returns an RPC instance
Expand Down Expand Up @@ -121,6 +122,7 @@ func (r *RPC) Close() error {
r.encoderMu.Lock()
defer r.encoderMu.Unlock()

r.requestedShutdown = true
return r.writer.Close()
}

Expand All @@ -129,6 +131,13 @@ func (r *RPC) setErr(err error) {
defer r.errMu.Unlock()

if r.err == nil {
pendingReqMsg := fmt.Sprintf("cancelling %d pending request(s)", len(r.pendingRequests))
if r.requestedShutdown {
r.logger.Infof("Plugin shutdown triggered: %s", pendingReqMsg)
} else {
r.logger.Warnf("Plugin terminated unexpectedly: %s", pendingReqMsg)
}

r.err = &Error{cause: err}
close(r.errChannel)
}
Expand All @@ -137,21 +146,12 @@ func (r *RPC) setErr(err error) {
// processResponses sends responses to its channel (identified by response.id)
// In case of any error, all pending requests are dropped
func (r *RPC) processResponses() {
defer func() {
r.requestsMu.Lock()
r.logger.Infof("dropping %d pending request(s)", len(r.pendingRequests))
r.pendingRequests = nil
r.requestsMu.Unlock()
}()

for r.Err() == nil {
var response Response
err := r.decoder.Decode(&response)

if err != nil {
if !errors.Is(err, io.EOF) { // not a plugin shutdown request
r.setErr(fmt.Errorf("failed to decode json response: %w", err))
}
r.setErr(fmt.Errorf("failed to read json response: %w", err))

return
}
Expand Down

0 comments on commit 4185f90

Please sign in to comment.