diff --git a/internal/channel/channel.go b/internal/channel/channel.go index 2bd525d00..d69b2435b 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -29,7 +29,7 @@ func (c *Channel) StartPlugin(pluginDir string) error { c.mu.Lock() defer c.mu.Unlock() - if c.cmd != nil { + if c.cmd != nil && c.rpc.Err() == nil { return nil } diff --git a/pkg/rpc/rpc.go b/pkg/rpc/rpc.go index 9da3364af..6b062a7dd 100644 --- a/pkg/rpc/rpc.go +++ b/pkg/rpc/rpc.go @@ -45,9 +45,10 @@ type RPC struct { lastRequestId uint64 requestsMu sync.Mutex - errChannel chan struct{} // never transports a value, only closed through setErr() to signal an occurred error - err *Error // only initialised via setErr(), if a rpc (Fatal/non-recoverable) error has occurred - errMu sync.Mutex + errChannel chan struct{} // never transports a value, only closed through setErr() to signal an occurred error + err *Error // only initialised via setErr(), if a rpc (Fatal/non-recoverable) error has occurred + errMu sync.Mutex + requestedShutdown bool } // NewRPC creates and returns an RPC instance @@ -121,6 +122,7 @@ func (r *RPC) Close() error { r.encoderMu.Lock() defer r.encoderMu.Unlock() + r.requestedShutdown = true return r.writer.Close() } @@ -129,6 +131,13 @@ func (r *RPC) setErr(err error) { defer r.errMu.Unlock() if r.err == nil { + pendingReqMsg := fmt.Sprintf("cancelling %d pending request(s)", len(r.pendingRequests)) + if r.requestedShutdown { + r.logger.Infof("Plugin shutdown triggered: %s", pendingReqMsg) + } else { + r.logger.Warnf("Plugin terminated unexpectedly: %s", pendingReqMsg) + } + r.err = &Error{cause: err} close(r.errChannel) } @@ -137,21 +146,12 @@ func (r *RPC) setErr(err error) { // processResponses sends responses to its channel (identified by response.id) // In case of any error, all pending requests are dropped func (r *RPC) processResponses() { - defer func() { - r.requestsMu.Lock() - r.logger.Infof("dropping %d pending request(s)", len(r.pendingRequests)) - r.pendingRequests = nil - r.requestsMu.Unlock() - }() - for r.Err() == nil { var response Response err := r.decoder.Decode(&response) if err != nil { - if !errors.Is(err, io.EOF) { // not a plugin shutdown request - r.setErr(fmt.Errorf("failed to decode json response: %w", err)) - } + r.setErr(fmt.Errorf("failed to read json response: %w", err)) return }