From 2230fb6f5fc1ae7ac3ca3f2250f88b0006eaa3f6 Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Thu, 11 Apr 2024 11:10:25 -0400 Subject: [PATCH] MM-57018: support reattaching plugins (#26421) * ProfileImageBytes for EnsureBotOptions * leverage plugintest.NewAPI * fix linting * add UpdateUserRoles to plugin api * MM-57018: support reattaching plugins Expose a local-only API for reattaching plugins: instead of the server starting and managing the process itself, allow the plugin to be launched externally (eg within a unit test) and reattach to an existing server instance to provide the unit test with a fully functional RPC API, sidestepping the need for mocking the plugin API in most cases. In the future, this may become the basis for running plugins in a sidecar container. Fixes: https://mattermost.atlassian.net/browse/MM-57018 * drop unused supervisor.pid * factor out checkMinServerVersion * factor out startPluginServer * restore missing setPluginState on successful reattach * avoid passing around a stale registeredPlugin * inline initializePluginImplementation * have IsValid return an error * explicitly close rpcClient In the case of reattached plugins, the Unix socket won't necessarily disappear leaving the muxBrokers blocked indefinitely. And `Kill()` doesn't do anything if there's no process being managed. * explicitly detachPlugin * emphasize gRPC not being supported --------- Co-authored-by: Mattermost Build --- server/channels/api4/plugin_local.go | 47 ++++++ server/channels/app/app_iface.go | 4 + .../app/opentracing/opentracing_layer.go | 44 ++++++ server/channels/app/plugin_reattach.go | 50 +++++++ server/i18n/en.json | 16 ++ server/public/model/client4.go | 30 ++++ server/public/model/plugin_reattach.go | 59 ++++++++ server/public/plugin/client.go | 62 +++++++- server/public/plugin/environment.go | 141 ++++++++++++++---- server/public/plugin/health_check_test.go | 4 +- .../plugintest/example_reattach_test.go | 85 +++++++++++ server/public/plugin/supervisor.go | 112 +++++++++----- 12 files changed, 582 insertions(+), 72 deletions(-) create mode 100644 server/channels/app/plugin_reattach.go create mode 100644 server/public/model/plugin_reattach.go create mode 100644 server/public/plugin/plugintest/example_reattach_test.go diff --git a/server/channels/api4/plugin_local.go b/server/channels/api4/plugin_local.go index 1aee887062a..88d1e1d6ce7 100644 --- a/server/channels/api4/plugin_local.go +++ b/server/channels/api4/plugin_local.go @@ -3,6 +3,13 @@ package api4 +import ( + "encoding/json" + "net/http" + + "github.com/mattermost/mattermost/server/public/model" +) + func (api *API) InitPluginLocal() { api.BaseRoutes.Plugins.Handle("", api.APILocal(uploadPlugin, handlerParamFileAPI)).Methods("POST") api.BaseRoutes.Plugins.Handle("", api.APILocal(getPlugins)).Methods("GET") @@ -12,4 +19,44 @@ func (api *API) InitPluginLocal() { api.BaseRoutes.Plugin.Handle("/disable", api.APILocal(disablePlugin)).Methods("POST") api.BaseRoutes.Plugins.Handle("/marketplace", api.APILocal(installMarketplacePlugin)).Methods("POST") api.BaseRoutes.Plugins.Handle("/marketplace", api.APILocal(getMarketplacePlugins)).Methods("GET") + api.BaseRoutes.Plugins.Handle("/reattach", api.APILocal(reattachPlugin)).Methods("POST") + api.BaseRoutes.Plugin.Handle("/detach", api.APILocal(detachPlugin)).Methods("POST") +} + +// reattachPlugin allows the server to bind to an existing plugin instance launched elsewhere. +// +// This API is only exposed over a local socket. +func reattachPlugin(c *Context, w http.ResponseWriter, r *http.Request) { + var pluginReattachRequest model.PluginReattachRequest + if err := json.NewDecoder(r.Body).Decode(&pluginReattachRequest); err != nil { + c.Err = model.NewAppError("reattachPlugin", "api4.plugin.reattachPlugin.invalid_request", nil, err.Error(), http.StatusBadRequest) + return + } + + if err := pluginReattachRequest.IsValid(); err != nil { + c.Err = err + return + } + + err := c.App.ReattachPlugin(pluginReattachRequest.Manifest, pluginReattachRequest.PluginReattachConfig) + if err != nil { + c.Err = err + return + } +} + +// detachPlugin detaches a previously reattached plugin. +// +// This API is only exposed over a local socket. +func detachPlugin(c *Context, w http.ResponseWriter, r *http.Request) { + c.RequirePluginId() + if c.Err != nil { + return + } + + err := c.App.DetachPlugin(c.Params.PluginId) + if err != nil { + c.Err = err + return + } } diff --git a/server/channels/app/app_iface.go b/server/channels/app/app_iface.go index d00d4feaba9..edd7252027f 100644 --- a/server/channels/app/app_iface.go +++ b/server/channels/app/app_iface.go @@ -129,6 +129,8 @@ type AppIface interface { // DemoteUserToGuest Convert user's roles and all his membership's roles from // regular user roles to guest roles. DemoteUserToGuest(c request.CTX, user *model.User) *model.AppError + // DetachPlugin allows the server to bind to an existing plugin instance launched elsewhere. + DetachPlugin(pluginId string) *model.AppError // DisablePlugin will set the config for an installed plugin to disabled, triggering deactivation if active. // Notifies cluster peers through config change. DisablePlugin(id string) *model.AppError @@ -303,6 +305,8 @@ type AppIface interface { // PromoteGuestToUser Convert user's roles and all his membership's roles from // guest roles to regular user roles. PromoteGuestToUser(c request.CTX, user *model.User, requestorId string) *model.AppError + // ReattachPlugin allows the server to bind to an existing plugin instance launched elsewhere. + ReattachPlugin(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) *model.AppError // Removes a listener function by the unique ID returned when AddConfigListener was called RemoveConfigListener(id string) // RenameChannel is used to rename the channel Name and the DisplayName fields diff --git a/server/channels/app/opentracing/opentracing_layer.go b/server/channels/app/opentracing/opentracing_layer.go index 4149da59d28..9cf5e7a1496 100644 --- a/server/channels/app/opentracing/opentracing_layer.go +++ b/server/channels/app/opentracing/opentracing_layer.go @@ -3741,6 +3741,28 @@ func (a *OpenTracingAppLayer) DemoteUserToGuest(c request.CTX, user *model.User) return resultVar0 } +func (a *OpenTracingAppLayer) DetachPlugin(pluginId string) *model.AppError { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.DetachPlugin") + + a.ctx = newCtx + a.app.Srv().Store().SetContext(newCtx) + defer func() { + a.app.Srv().Store().SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + resultVar0 := a.app.DetachPlugin(pluginId) + + if resultVar0 != nil { + span.LogFields(spanlog.Error(resultVar0)) + ext.Error.Set(span, true) + } + + return resultVar0 +} + func (a *OpenTracingAppLayer) DisableAutoResponder(rctx request.CTX, userID string, asAdmin bool) *model.AppError { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.DisableAutoResponder") @@ -13945,6 +13967,28 @@ func (a *OpenTracingAppLayer) ReadFile(path string) ([]byte, *model.AppError) { return resultVar0, resultVar1 } +func (a *OpenTracingAppLayer) ReattachPlugin(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) *model.AppError { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.ReattachPlugin") + + a.ctx = newCtx + a.app.Srv().Store().SetContext(newCtx) + defer func() { + a.app.Srv().Store().SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + resultVar0 := a.app.ReattachPlugin(manifest, pluginReattachConfig) + + if resultVar0 != nil { + span.LogFields(spanlog.Error(resultVar0)) + ext.Error.Set(span, true) + } + + return resultVar0 +} + func (a *OpenTracingAppLayer) RecycleDatabaseConnection(rctx request.CTX) { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.RecycleDatabaseConnection") diff --git a/server/channels/app/plugin_reattach.go b/server/channels/app/plugin_reattach.go new file mode 100644 index 00000000000..846c681a5c7 --- /dev/null +++ b/server/channels/app/plugin_reattach.go @@ -0,0 +1,50 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. +package app + +import ( + "net/http" + + "github.com/mattermost/mattermost/server/public/model" +) + +// ReattachPlugin allows the server to bind to an existing plugin instance launched elsewhere. +func (a *App) ReattachPlugin(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) *model.AppError { + return a.ch.ReattachPlugin(manifest, pluginReattachConfig) +} + +// ReattachPlugin allows the server to bind to an existing plugin instance launched elsewhere. +func (ch *Channels) ReattachPlugin(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) *model.AppError { + pluginsEnvironment := ch.GetPluginsEnvironment() + if pluginsEnvironment == nil { + return model.NewAppError("ReattachPlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented) + } + + ch.DetachPlugin(manifest.Id) + + // Reattach to the plugin + if err := pluginsEnvironment.Reattach(manifest, pluginReattachConfig); err != nil { + return model.NewAppError("ReattachPlugin", "app.plugin.reattach.app_error", nil, "", http.StatusInternalServerError).Wrap(err) + } + + return nil +} + +// DetachPlugin allows the server to bind to an existing plugin instance launched elsewhere. +func (a *App) DetachPlugin(pluginId string) *model.AppError { + return a.ch.DetachPlugin(pluginId) +} + +// DetachPlugin allows the server to bind to an existing plugin instance launched elsewhere. +func (ch *Channels) DetachPlugin(pluginID string) *model.AppError { + pluginsEnvironment := ch.GetPluginsEnvironment() + if pluginsEnvironment == nil { + return model.NewAppError("DetachPlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented) + } + + // Deactivate and remove any existing plugin, if present. + pluginsEnvironment.Deactivate(pluginID) + pluginsEnvironment.RemovePlugin(pluginID) + + return nil +} diff --git a/server/i18n/en.json b/server/i18n/en.json index 53c0e7961c7..27abe9f077a 100644 --- a/server/i18n/en.json +++ b/server/i18n/en.json @@ -4614,6 +4614,10 @@ "id": "api.websocket_handler.server_busy.app_error", "translation": "Server is busy, non-critical services are temporarily unavailable." }, + { + "id": "api4.plugin.reattachPlugin.invalid_request", + "translation": "Failed to parse request" + }, { "id": "app.acknowledgement.delete.app_error", "translation": "Unable to delete acknowledgement." @@ -6290,6 +6294,10 @@ "id": "app.plugin.not_installed.app_error", "translation": "Plugin is not installed." }, + { + "id": "app.plugin.reattach.app_error", + "translation": "Failed to reattach plugin" + }, { "id": "app.plugin.remove.app_error", "translation": "Unable to delete plugin." @@ -10142,6 +10150,14 @@ "id": "plugin_api.send_mail.missing_to", "translation": "Missing TO address." }, + { + "id": "plugin_reattach_request.is_valid.manifest.app_error", + "translation": "Missing manifest" + }, + { + "id": "plugin_reattach_request.is_valid.plugin_reattach_config.app_error", + "translation": "Missing plugin reattach config" + }, { "id": "searchengine.bleve.disabled.error", "translation": "Error purging Bleve indexes: engine is disabled" diff --git a/server/public/model/client4.go b/server/public/model/client4.go index 947f9c1468c..f1074186d92 100644 --- a/server/public/model/client4.go +++ b/server/public/model/client4.go @@ -7352,6 +7352,36 @@ func (c *Client4) InstallMarketplacePlugin(ctx context.Context, request *Install return &m, BuildResponse(r), nil } +// ReattachPlugin asks the server to reattach to a plugin launched by another process. +// +// Only available in local mode, and currently only used for testing. +func (c *Client4) ReattachPlugin(ctx context.Context, request *PluginReattachRequest) (*Response, error) { + buf, err := json.Marshal(request) + if err != nil { + return nil, NewAppError("ReattachPlugin", "api.marshal_error", nil, "", http.StatusInternalServerError).Wrap(err) + } + r, err := c.DoAPIPost(ctx, c.pluginsRoute()+"/reattach", string(buf)) + if err != nil { + return BuildResponse(r), err + } + defer closeBody(r) + + return BuildResponse(r), nil +} + +// DetachPlugin detaches a previously reattached plugin. +// +// Only available in local mode, and currently only used for testing. +func (c *Client4) DetachPlugin(ctx context.Context, pluginID string) (*Response, error) { + r, err := c.DoAPIPost(ctx, c.pluginRoute(pluginID)+"/detach", "") + if err != nil { + return BuildResponse(r), err + } + defer closeBody(r) + + return BuildResponse(r), nil +} + // GetPlugins will return a list of plugin manifests for currently active plugins. func (c *Client4) GetPlugins(ctx context.Context) (*PluginsResponse, *Response, error) { r, err := c.DoAPIGet(ctx, c.pluginsRoute(), "") diff --git a/server/public/model/plugin_reattach.go b/server/public/model/plugin_reattach.go new file mode 100644 index 00000000000..77f5552b613 --- /dev/null +++ b/server/public/model/plugin_reattach.go @@ -0,0 +1,59 @@ +package model + +import ( + "net" + "net/http" + + "github.com/hashicorp/go-plugin" +) + +// PluginReattachConfig is a serializable version of go-plugin's ReattachConfig. +type PluginReattachConfig struct { + Protocol string + ProtocolVersion int + Addr net.UnixAddr + Pid int + Test bool +} + +func NewPluginReattachConfig(pluginReattachmentConfig *plugin.ReattachConfig) *PluginReattachConfig { + return &PluginReattachConfig{ + Protocol: string(pluginReattachmentConfig.Protocol), + ProtocolVersion: pluginReattachmentConfig.ProtocolVersion, + Addr: net.UnixAddr{ + Name: pluginReattachmentConfig.Addr.String(), + Net: pluginReattachmentConfig.Addr.Network(), + }, + Pid: pluginReattachmentConfig.Pid, + Test: pluginReattachmentConfig.Test, + } +} + +func (prc *PluginReattachConfig) ToHashicorpPluginReattachmentConfig() *plugin.ReattachConfig { + addr := prc.Addr + + return &plugin.ReattachConfig{ + Protocol: plugin.Protocol(prc.Protocol), + ProtocolVersion: prc.ProtocolVersion, + Addr: &addr, + Pid: prc.Pid, + ReattachFunc: nil, + Test: prc.Test, + } +} + +type PluginReattachRequest struct { + Manifest *Manifest + PluginReattachConfig *PluginReattachConfig +} + +func (prr *PluginReattachRequest) IsValid() *AppError { + if prr.Manifest == nil { + return NewAppError("PluginReattachRequest.IsValid", "plugin_reattach_request.is_valid.manifest.app_error", nil, "", http.StatusBadRequest) + } + if prr.PluginReattachConfig == nil { + return NewAppError("PluginReattachRequest.IsValid", "plugin_reattach_request.is_valid.plugin_reattach_config.app_error", nil, "", http.StatusBadRequest) + } + + return nil +} diff --git a/server/public/plugin/client.go b/server/public/plugin/client.go index c80ab8564e8..56b42f30597 100644 --- a/server/public/plugin/client.go +++ b/server/public/plugin/client.go @@ -4,6 +4,8 @@ package plugin import ( + "context" + "github.com/hashicorp/go-plugin" ) @@ -12,10 +14,51 @@ const ( BotUserKey = InternalKeyPrefix + "botid" ) -// Starts the serving of a Mattermost plugin over net/rpc. gRPC is not yet supported. +// WithTestContext provides a context typically used to terminate a plugin from a unit test. +func WithTestContext(ctx context.Context) func(*plugin.ServeConfig) error { + return func(config *plugin.ServeConfig) error { + if config.Test == nil { + config.Test = &plugin.ServeTestConfig{} + } + + config.Test.Context = ctx + + return nil + } +} + +// WithTestReattachConfigCh configures the channel to receive the ReattachConfig used to reattach +// an externally launched plugin instance with the Mattermost server. +func WithTestReattachConfigCh(reattachConfigCh chan<- *plugin.ReattachConfig) func(*plugin.ServeConfig) error { + return func(config *plugin.ServeConfig) error { + if config.Test == nil { + config.Test = &plugin.ServeTestConfig{} + } + + config.Test.ReattachConfigCh = reattachConfigCh + + return nil + } +} + +// WithTestCloseCh provides a channel that signals when the plugin exits. +func WithTestCloseCh(closeCh chan<- struct{}) func(*plugin.ServeConfig) error { + return func(config *plugin.ServeConfig) error { + if config.Test == nil { + config.Test = &plugin.ServeTestConfig{} + } + + config.Test.CloseCh = closeCh + + return nil + } +} + +// Starts the serving of a Mattermost plugin over net/rpc. gRPC is not supported. // -// Call this when your plugin is ready to start. -func ClientMain(pluginImplementation any) { +// Call this when your plugin is ready to start. Options allow configuring plugins for testing +// scenarios. +func ClientMain(pluginImplementation any, opts ...func(config *plugin.ServeConfig) error) { impl, ok := pluginImplementation.(interface { SetAPI(api API) SetDriver(driver Driver) @@ -30,10 +73,19 @@ func ClientMain(pluginImplementation any) { "hooks": &hooksPlugin{hooks: pluginImplementation}, } - plugin.Serve(&plugin.ServeConfig{ + serveConfig := &plugin.ServeConfig{ HandshakeConfig: handshake, Plugins: pluginMap, - }) + } + + for _, opt := range opts { + err := opt(serveConfig) + if err != nil { + panic("failed to start serving plugin: " + err.Error()) + } + } + + plugin.Serve(serveConfig) } type MattermostPlugin struct { diff --git a/server/public/plugin/environment.go b/server/public/plugin/environment.go index acf808dea39..ccca0466d2c 100644 --- a/server/public/plugin/environment.go +++ b/server/public/plugin/environment.go @@ -11,6 +11,7 @@ import ( "sync" "time" + plugin "github.com/hashicorp/go-plugin" "github.com/pkg/errors" "github.com/mattermost/mattermost/server/public/model" @@ -196,6 +197,15 @@ func (env *Environment) setPluginState(id string, state int) { } } +// setPluginSupervisor records the supervisor for a registered plugin. +func (env *Environment) setPluginSupervisor(id string, supervisor *supervisor) { + if rp, ok := env.registeredPlugins.Load(id); ok { + p := rp.(registeredPlugin) + p.supervisor = supervisor + env.registeredPlugins.Store(id, p) + } +} + // PublicFilesPath returns a path and true if the plugin with the given id is active. // It returns an empty string and false if the path is not set or invalid func (env *Environment) PublicFilesPath(id string) (string, error) { @@ -254,6 +264,46 @@ func (env *Environment) GetManifest(pluginId string) (*model.Manifest, error) { return nil, ErrNotFound } +func checkMinServerVersion(pluginInfo *model.BundleInfo) error { + if pluginInfo.Manifest.MinServerVersion == "" { + return nil + } + + fulfilled, err := pluginInfo.Manifest.MeetMinServerVersion(model.CurrentVersion) + if err != nil { + return fmt.Errorf("%v: %v", err.Error(), pluginInfo.Manifest.Id) + } + if !fulfilled { + return fmt.Errorf("plugin requires Mattermost %v: %v", pluginInfo.Manifest.MinServerVersion, pluginInfo.Manifest.Id) + } + + return nil +} + +func (env *Environment) startPluginServer(pluginInfo *model.BundleInfo, opts ...func(*supervisor, *plugin.ClientConfig) error) error { + sup, err := newSupervisor(pluginInfo, env.newAPIImpl(pluginInfo.Manifest), env.dbDriver, env.logger, env.metrics, opts...) + if err != nil { + return errors.Wrapf(err, "unable to start plugin: %v", pluginInfo.Manifest.Id) + } + + // We pre-emptively set the state to running to prevent re-entrancy issues. + // The plugin's OnActivate hook can in-turn call UpdateConfiguration + // which again calls this method. This method is guarded against multiple calls, + // but fails if it is called recursively. + // + // Therefore, setting the state to running prevents this from happening, + // and in case there is an error, the defer clause will set the proper state anyways. + env.setPluginState(pluginInfo.Manifest.Id, model.PluginStateRunning) + + if err := sup.Hooks().OnActivate(); err != nil { + sup.Shutdown() + return err + } + env.setPluginSupervisor(pluginInfo.Manifest.Id, sup) + + return nil +} + func (env *Environment) Activate(id string) (manifest *model.Manifest, activated bool, reterr error) { defer func() { if reterr != nil { @@ -296,20 +346,16 @@ func (env *Environment) Activate(id string) (manifest *model.Manifest, activated } }() - if pluginInfo.Manifest.MinServerVersion != "" { - fulfilled, err := pluginInfo.Manifest.MeetMinServerVersion(model.CurrentVersion) - if err != nil { - return nil, false, fmt.Errorf("%v: %v", err.Error(), id) - } - if !fulfilled { - return nil, false, fmt.Errorf("plugin requires Mattermost %v: %v", pluginInfo.Manifest.MinServerVersion, id) - } + err = checkMinServerVersion(pluginInfo) + if err != nil { + return nil, false, err } componentActivated := false if pluginInfo.Manifest.HasWebapp() { - updatedManifest, err := env.UnpackWebappBundle(id) + var updatedManifest *model.Manifest + updatedManifest, err = env.UnpackWebappBundle(id) if err != nil { return nil, false, errors.Wrapf(err, "unable to generate webapp bundle: %v", id) } @@ -319,27 +365,10 @@ func (env *Environment) Activate(id string) (manifest *model.Manifest, activated } if pluginInfo.Manifest.HasServer() { - sup, err := newSupervisor(pluginInfo, env.newAPIImpl(pluginInfo.Manifest), env.dbDriver, env.logger, env.metrics) + err = env.startPluginServer(pluginInfo, WithExecutableFromManifest(pluginInfo)) if err != nil { - return nil, false, errors.Wrapf(err, "unable to start plugin: %v", id) - } - - // We pre-emptively set the state to running to prevent re-entrancy issues. - // The plugin's OnActivate hook can in-turn call UpdateConfiguration - // which again calls this method. This method is guarded against multiple calls, - // but fails if it is called recursively. - // - // Therefore, setting the state to running prevents this from happening, - // and in case there is an error, the defer clause will set the proper state anyways. - env.setPluginState(id, model.PluginStateRunning) - - if err := sup.Hooks().OnActivate(); err != nil { - sup.Shutdown() return nil, false, err } - rp.supervisor = sup - env.registeredPlugins.Store(id, rp) - componentActivated = true } @@ -352,6 +381,64 @@ func (env *Environment) Activate(id string) (manifest *model.Manifest, activated return pluginInfo.Manifest, true, nil } +// Reattach allows the server to bind to an existing plugin instance launched elsewhere. +func (env *Environment) Reattach(manifest *model.Manifest, pluginReattachConfig *model.PluginReattachConfig) (reterr error) { + id := manifest.Id + + defer func() { + if reterr != nil { + env.SetPluginError(id, reterr.Error()) + } else { + env.SetPluginError(id, "") + } + }() + + // Check if we are already active + if env.IsActive(id) { + return nil + } + + pluginInfo := &model.BundleInfo{ + Path: "", + Manifest: manifest, + ManifestPath: "", + ManifestError: nil, + } + + rp := newRegisteredPlugin(pluginInfo) + env.registeredPlugins.Store(id, rp) + + defer func() { + if reterr == nil { + env.setPluginState(id, model.PluginStateRunning) + } else { + env.setPluginState(id, model.PluginStateFailedToStart) + } + }() + + err := checkMinServerVersion(pluginInfo) + if err != nil { + return nil + } + + if !pluginInfo.Manifest.HasServer() { + return errors.New("cannot reattach plugin without server component") + } + + if pluginInfo.Manifest.HasWebapp() { + env.logger.Warn("Ignoring webapp for reattached plugin", mlog.String("plugin_id", id)) + } + + err = env.startPluginServer(pluginInfo, WithReattachConfig(pluginReattachConfig)) + if err != nil { + return nil + } + + mlog.Debug("Plugin reattached", mlog.String("plugin_id", pluginInfo.Manifest.Id), mlog.String("version", pluginInfo.Manifest.Version)) + + return nil +} + func (env *Environment) RemovePlugin(id string) { if _, ok := env.registeredPlugins.Load(id); ok { env.registeredPlugins.Delete(id) diff --git a/server/public/plugin/health_check_test.go b/server/public/plugin/health_check_test.go index b05e8be5ca0..c9cf7f2e5c4 100644 --- a/server/public/plugin/health_check_test.go +++ b/server/public/plugin/health_check_test.go @@ -52,7 +52,7 @@ func testPluginHealthCheckSuccess(t *testing.T) { bundle := model.BundleInfoForPath(dir) logger := mlog.CreateConsoleTestLogger(t) - supervisor, err := newSupervisor(bundle, nil, nil, logger, nil) + supervisor, err := newSupervisor(bundle, nil, nil, logger, nil, WithExecutableFromManifest(bundle)) require.NoError(t, err) require.NotNil(t, supervisor) defer supervisor.Shutdown() @@ -93,7 +93,7 @@ func testPluginHealthCheckPanic(t *testing.T) { bundle := model.BundleInfoForPath(dir) logger := mlog.CreateConsoleTestLogger(t) - supervisor, err := newSupervisor(bundle, nil, nil, logger, nil) + supervisor, err := newSupervisor(bundle, nil, nil, logger, nil, WithExecutableFromManifest(bundle)) require.NoError(t, err) require.NotNil(t, supervisor) defer supervisor.Shutdown() diff --git a/server/public/plugin/plugintest/example_reattach_test.go b/server/public/plugin/plugintest/example_reattach_test.go new file mode 100644 index 00000000000..fb4ce08b03d --- /dev/null +++ b/server/public/plugin/plugintest/example_reattach_test.go @@ -0,0 +1,85 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package plugintest_test + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + goPlugin "github.com/hashicorp/go-plugin" + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/plugin" +) + +type UnitTestedPlugin struct { + plugin.MattermostPlugin +} + +// This example demonstrates a plugin that's launched during a unit test and reattached to an +// existing server instance to obtain a real PluginAPI. +func Example_unitTestingPlugins() { + t := &testing.T{} + + // The manifest is usually generated dynamically. + manifest := &model.Manifest{ + Id: "reattach-plugin-test", + } + + // ctx, and specifically cancel, gives us control over the plugin lifecycle + ctx, cancel := context.WithCancel(context.Background()) + + // reattachConfigCh is the means by which we get the Unix socket information to relay back + // to the server and finish the reattachment. + reattachConfigCh := make(chan *goPlugin.ReattachConfig) + + // closeCh tells us when the plugin exits and allows for cleanup. + closeCh := make(chan struct{}) + + // plugin.ClientMain with options allows for reattachment. + go plugin.ClientMain( + &UnitTestedPlugin{}, + plugin.WithTestContext(ctx), + plugin.WithTestReattachConfigCh(reattachConfigCh), + plugin.WithTestCloseCh(closeCh), + ) + + // Make sure the plugin shuts down normally with the test + t.Cleanup(func() { + cancel() + + select { + case <-closeCh: + case <-time.After(5 * time.Second): + panic("plugin failed to close after 5 seconds") + } + }) + + // Wait for the plugin to start and then reattach to the server. + var reattachConfig *goPlugin.ReattachConfig + select { + case reattachConfig = <-reattachConfigCh: + case <-time.After(5 * time.Second): + t.Fatal("failed to get reattach config") + } + + // Reattaching requires a local mode client. + socketPath := os.Getenv("MM_LOCALSOCKETPATH") + if socketPath == "" { + socketPath = model.LocalModeSocketPath + } + + clientLocal := model.NewAPIv4SocketClient(socketPath) + _, err := clientLocal.ReattachPlugin(ctx, &model.PluginReattachRequest{ + Manifest: manifest, + PluginReattachConfig: model.NewPluginReattachConfig(reattachConfig), + }) + require.NoError(t, err) + + // At this point, the plugin is ready for unit testing and will be cleaned up automatically + // with the testing.T instance. +} diff --git a/server/public/plugin/supervisor.go b/server/public/plugin/supervisor.go index fad1712ef17..3ab7add7858 100644 --- a/server/public/plugin/supervisor.go +++ b/server/public/plugin/supervisor.go @@ -23,15 +23,58 @@ import ( ) type supervisor struct { - lock sync.RWMutex - client *plugin.Client - hooks Hooks - implemented [TotalHooksID]bool - pid int - hooksClient *hooksRPCClient + lock sync.RWMutex + client *plugin.Client + hooks Hooks + implemented [TotalHooksID]bool + hooksClient *hooksRPCClient + isReattached bool } -func newSupervisor(pluginInfo *model.BundleInfo, apiImpl API, driver Driver, parentLogger *mlog.Logger, metrics metricsInterface) (retSupervisor *supervisor, retErr error) { +func WithExecutableFromManifest(pluginInfo *model.BundleInfo) func(*supervisor, *plugin.ClientConfig) error { + return func(_ *supervisor, clientConfig *plugin.ClientConfig) error { + executable := pluginInfo.Manifest.GetExecutableForRuntime(runtime.GOOS, runtime.GOARCH) + if executable == "" { + return fmt.Errorf("backend executable not found for environment: %s/%s", runtime.GOOS, runtime.GOARCH) + } + + executable = filepath.Clean(filepath.Join(".", executable)) + if strings.HasPrefix(executable, "..") { + return fmt.Errorf("invalid backend executable: %s", executable) + } + + executable = filepath.Join(pluginInfo.Path, executable) + + cmd := exec.Command(executable) + + // This doesn't add more security than before + // but removes the SecureConfig is nil warning. + // https://mattermost.atlassian.net/browse/MM-49167 + pluginChecksum, err := getPluginExecutableChecksum(executable) + if err != nil { + return errors.Wrapf(err, "unable to generate plugin checksum") + } + + clientConfig.Cmd = cmd + clientConfig.SecureConfig = &plugin.SecureConfig{ + Checksum: pluginChecksum, + Hash: sha256.New(), + } + + return nil + } +} + +func WithReattachConfig(pluginReattachConfig *model.PluginReattachConfig) func(*supervisor, *plugin.ClientConfig) error { + return func(sup *supervisor, clientConfig *plugin.ClientConfig) error { + clientConfig.Reattach = pluginReattachConfig.ToHashicorpPluginReattachmentConfig() + sup.isReattached = true + + return nil + } +} + +func newSupervisor(pluginInfo *model.BundleInfo, apiImpl API, driver Driver, parentLogger *mlog.Logger, metrics metricsInterface, opts ...func(*supervisor, *plugin.ClientConfig) error) (retSupervisor *supervisor, retErr error) { sup := supervisor{} defer func() { if retErr != nil { @@ -54,49 +97,28 @@ func newSupervisor(pluginInfo *model.BundleInfo, apiImpl API, driver Driver, par }, } - executable := pluginInfo.Manifest.GetExecutableForRuntime(runtime.GOOS, runtime.GOARCH) - if executable == "" { - return nil, fmt.Errorf("backend executable not found for environment: %s/%s", runtime.GOOS, runtime.GOARCH) - } - - executable = filepath.Clean(filepath.Join(".", executable)) - if strings.HasPrefix(executable, "..") { - return nil, fmt.Errorf("invalid backend executable: %s", executable) - } - - executable = filepath.Join(pluginInfo.Path, executable) - - cmd := exec.Command(executable) - - // This doesn't add more security than before - // but removes the SecureConfig is nil warning. - // https://mattermost.atlassian.net/browse/MM-49167 - pluginChecksum, err := getPluginExecutableChecksum(executable) - if err != nil { - return nil, errors.Wrapf(err, "unable to generate plugin checksum") - } - - sup.client = plugin.NewClient(&plugin.ClientConfig{ + clientConfig := &plugin.ClientConfig{ HandshakeConfig: handshake, Plugins: pluginMap, - Cmd: cmd, SyncStdout: wrappedLogger.With(mlog.String("source", "plugin_stdout")).StdLogWriter(), SyncStderr: wrappedLogger.With(mlog.String("source", "plugin_stderr")).StdLogWriter(), Logger: hclogAdaptedLogger, StartTimeout: time.Second * 3, - SecureConfig: &plugin.SecureConfig{ - Checksum: pluginChecksum, - Hash: sha256.New(), - }, - }) + } + for _, opt := range opts { + err := opt(&sup, clientConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to apply option") + } + } + + sup.client = plugin.NewClient(clientConfig) rpcClient, err := sup.client.Client() if err != nil { return nil, err } - sup.pid = cmd.Process.Pid - raw, err := rpcClient.Dispense("hooks") if err != nil { return nil, err @@ -126,6 +148,20 @@ func (sup *supervisor) Shutdown() { sup.lock.RLock() defer sup.lock.RUnlock() if sup.client != nil { + // For reattached plugins, Kill() is mostly a no-op, so manually clean up the + // underlying rpcClient. This might be something to upstream unless we're doing + // something else wrong. + if sup.isReattached { + rpcClient, err := sup.client.Client() + if err != nil { + mlog.Warn("Failed to obtain rpcClient on Shutdown") + } else { + if err = rpcClient.Close(); err != nil { + mlog.Warn("Failed to close rpcClient on Shutdown") + } + } + } + sup.client.Kill() }