From 9415fdc9150e8087d377d453e6f4bfefe2803465 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 12 Nov 2024 17:36:16 +0800 Subject: [PATCH] mcs: remove resource manager service (#8804) close tikv/pd#8805 Signed-off-by: Ryan Leung --- cmd/pd-server/main.go | 26 +- pkg/mcs/discovery/discover.go | 2 +- pkg/mcs/resourcemanager/server/apis/v1/api.go | 25 -- .../resourcemanager/server/grpc_service.go | 3 +- .../resourcemanager/server/install/install.go | 2 +- pkg/mcs/resourcemanager/server/server.go | 422 ------------------ pkg/mcs/resourcemanager/server/testutil.go | 79 ---- pkg/mcs/utils/constant/constant.go | 2 - pkg/member/participant.go | 3 - pkg/utils/keypath/key_path.go | 6 - .../mcs/discovery/register_test.go | 2 - .../mcs/resourcemanager/server_test.go | 124 ----- tests/testutil.go | 19 - 13 files changed, 4 insertions(+), 711 deletions(-) delete mode 100644 pkg/mcs/resourcemanager/server/server.go delete mode 100644 pkg/mcs/resourcemanager/server/testutil.go delete mode 100644 tests/integrations/mcs/resourcemanager/server_test.go diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index 443a50c6811..0181d4c47aa 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -27,7 +27,6 @@ import ( "github.com/tikv/pd/pkg/autoscaling" "github.com/tikv/pd/pkg/dashboard" "github.com/tikv/pd/pkg/errs" - resource_manager "github.com/tikv/pd/pkg/mcs/resourcemanager/server" scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server" tso "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/memory" @@ -53,7 +52,6 @@ import ( const ( apiMode = "api" tsoMode = "tso" - rmMode = "resource-manager" serviceModeEnv = "PD_SERVICE_MODE" ) @@ -78,10 +76,9 @@ func main() { func NewServiceCommand() *cobra.Command { cmd := &cobra.Command{ Use: "services ", - Short: "Run services, for example, tso, resource_manager", + Short: "Run services, for example, tso, scheduling", } cmd.AddCommand(NewTSOServiceCommand()) - cmd.AddCommand(NewResourceManagerServiceCommand()) cmd.AddCommand(NewSchedulingServiceCommand()) cmd.AddCommand(NewAPIServiceCommand()) return cmd @@ -129,27 +126,6 @@ func NewSchedulingServiceCommand() *cobra.Command { return cmd } -// NewResourceManagerServiceCommand returns the resource manager service command. -func NewResourceManagerServiceCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: rmMode, - Short: "Run the resource manager service", - Run: resource_manager.CreateServerWrapper, - } - cmd.Flags().StringP("name", "", "", "human-readable name for this resource manager member") - cmd.Flags().BoolP("version", "V", false, "print version information and exit") - cmd.Flags().StringP("config", "", "", "config file") - cmd.Flags().StringP("backend-endpoints", "", "", "url for etcd client") - cmd.Flags().StringP("listen-addr", "", "", "listen address for resource management service") - cmd.Flags().StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')") - cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs") - cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format") - cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format") - cmd.Flags().StringP("log-level", "L", "", "log level: debug, info, warn, error, fatal (default 'info')") - cmd.Flags().StringP("log-file", "", "", "log file path") - return cmd -} - // NewAPIServiceCommand returns the API service command. func NewAPIServiceCommand() *cobra.Command { cmd := &cobra.Command{ diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 083059780b4..d4234df893d 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -46,7 +46,7 @@ func Discover(cli *clientv3.Client, serviceName string) ([]string, error) { // GetMSMembers returns all the members of the specified service name. func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) { switch serviceName { - case constant.TSOServiceName, constant.SchedulingServiceName, constant.ResourceManagerServiceName: + case constant.TSOServiceName, constant.SchedulingServiceName: servicePath := keypath.ServicePath(serviceName) resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit() if err != nil { diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go index 3fd94e637d0..891072e898f 100644 --- a/pkg/mcs/resourcemanager/server/apis/v1/api.go +++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go @@ -26,12 +26,10 @@ import ( "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/log" rmserver "github.com/tikv/pd/pkg/mcs/resourcemanager/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" - "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/reflectutil" ) @@ -84,17 +82,10 @@ func NewService(srv *rmserver.Service) *Service { apiHandlerEngine: apiHandlerEngine, root: endpoint, } - s.RegisterAdminRouter() s.RegisterRouter() return s } -// RegisterAdminRouter registers the router of the TSO admin handler. -func (s *Service) RegisterAdminRouter() { - router := s.root.Group("admin") - router.PUT("/log", changeLogLevel) -} - // RegisterRouter registers the router of the service. func (s *Service) RegisterRouter() { configEndpoint := s.root.Group("/config") @@ -113,22 +104,6 @@ func (s *Service) handler() http.Handler { }) } -func changeLogLevel(c *gin.Context) { - svr := c.MustGet(multiservicesapi.ServiceContextKey).(*rmserver.Service) - var level string - if err := c.Bind(&level); err != nil { - c.String(http.StatusBadRequest, err.Error()) - return - } - - if err := svr.SetLogLevel(level); err != nil { - c.String(http.StatusBadRequest, err.Error()) - return - } - log.SetLevel(logutil.StringToZapLogLevel(level)) - c.String(http.StatusOK, "The log level is updated.") -} - // postResourceGroup // // @Tags ResourceManager diff --git a/pkg/mcs/resourcemanager/server/grpc_service.go b/pkg/mcs/resourcemanager/server/grpc_service.go index 93243eb05be..21681bc0759 100644 --- a/pkg/mcs/resourcemanager/server/grpc_service.go +++ b/pkg/mcs/resourcemanager/server/grpc_service.go @@ -54,8 +54,7 @@ func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) { // Service is the gRPC service for resource manager. type Service struct { - ctx context.Context - *Server + ctx context.Context manager *Manager // settings } diff --git a/pkg/mcs/resourcemanager/server/install/install.go b/pkg/mcs/resourcemanager/server/install/install.go index 8573d5e52eb..8cd7e0eac85 100644 --- a/pkg/mcs/resourcemanager/server/install/install.go +++ b/pkg/mcs/resourcemanager/server/install/install.go @@ -28,5 +28,5 @@ func init() { // Install registers the API group and grpc service. func Install(register *registry.ServiceRegistry) { - register.RegisterService("ResourceManager", server.NewService[*server.Server]) + register.RegisterService("ResourceManager", server.NewService[*server.Manager]) } diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go deleted file mode 100644 index 05b01094801..00000000000 --- a/pkg/mcs/resourcemanager/server/server.go +++ /dev/null @@ -1,422 +0,0 @@ -// Copyright 2023 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "context" - "net/http" - "os" - "os/signal" - "runtime" - "sync" - "sync/atomic" - "syscall" - "time" - - grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/diagnosticspb" - "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/log" - "github.com/pingcap/sysutil" - "github.com/spf13/cobra" - bs "github.com/tikv/pd/pkg/basicserver" - "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/mcs/discovery" - "github.com/tikv/pd/pkg/mcs/server" - "github.com/tikv/pd/pkg/mcs/utils" - "github.com/tikv/pd/pkg/mcs/utils/constant" - "github.com/tikv/pd/pkg/member" - "github.com/tikv/pd/pkg/utils/apiutil" - "github.com/tikv/pd/pkg/utils/grpcutil" - "github.com/tikv/pd/pkg/utils/keypath" - "github.com/tikv/pd/pkg/utils/logutil" - "github.com/tikv/pd/pkg/utils/memberutil" - "github.com/tikv/pd/pkg/utils/metricutil" - "github.com/tikv/pd/pkg/versioninfo" - "go.uber.org/zap" - "google.golang.org/grpc" -) - -var _ bs.Server = (*Server)(nil) - -const serviceName = "Resource Manager" - -// Server is the resource manager server, and it implements bs.Server. -type Server struct { - *server.BaseServer - diagnosticspb.DiagnosticsServer - // Server state. 0 is not running, 1 is running. - isRunning int64 - - serverLoopCtx context.Context - serverLoopCancel func() - serverLoopWg sync.WaitGroup - - cfg *Config - - // for the primary election of resource manager - participant *member.Participant - - service *Service - - // primaryCallbacks will be called after the server becomes leader. - primaryCallbacks []func(context.Context) error - - // for service registry - serviceID *discovery.ServiceRegistryEntry - serviceRegister *discovery.ServiceRegister -} - -// Name returns the unique name for this server in the resource manager cluster. -func (s *Server) Name() string { - return s.cfg.Name -} - -// GetAddr returns the server address. -func (s *Server) GetAddr() string { - return s.cfg.ListenAddr -} - -// GetAdvertiseListenAddr returns the advertise address of the server. -func (s *Server) GetAdvertiseListenAddr() string { - return s.cfg.AdvertiseListenAddr -} - -// SetLogLevel sets log level. -func (s *Server) SetLogLevel(level string) error { - if !logutil.IsLevelLegal(level) { - return errors.Errorf("log level %s is illegal", level) - } - s.cfg.Log.Level = level - log.SetLevel(logutil.StringToZapLogLevel(level)) - log.Warn("log level changed", zap.String("level", log.GetLevel().String())) - return nil -} - -// Run runs the Resource Manager server. -func (s *Server) Run() (err error) { - if err = utils.InitClient(s); err != nil { - return err - } - - if s.serviceID, s.serviceRegister, err = utils.Register(s, constant.ResourceManagerServiceName); err != nil { - return err - } - - return s.startServer() -} - -func (s *Server) startServerLoop() { - s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) - s.serverLoopWg.Add(1) - go s.primaryElectionLoop() -} - -func (s *Server) primaryElectionLoop() { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() - - for { - select { - case <-s.serverLoopCtx.Done(): - log.Info("server is closed, exit resource manager primary election loop") - return - default: - } - - primary, checkAgain := s.participant.CheckLeader() - if checkAgain { - continue - } - if primary != nil { - log.Info("start to watch the primary", zap.Stringer("resource-manager-primary", primary)) - // Watch will keep looping and never return unless the primary/leader has changed. - primary.Watch(s.serverLoopCtx) - log.Info("the resource manager primary has changed, try to re-campaign a primary") - } - - s.campaignLeader() - } -} - -func (s *Server) campaignLeader() { - log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Name())) - if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil { - if err.Error() == errs.ErrEtcdTxnConflict.Error() { - log.Info("campaign resource manager primary meets error due to txn conflict, another server may campaign successfully", - zap.String("campaign-resource-manager-primary-name", s.participant.Name())) - } else { - log.Error("campaign resource manager primary meets error due to etcd error", - zap.String("campaign-resource-manager-primary-name", s.participant.Name()), - errs.ZapError(err)) - } - return - } - - // Start keepalive the leadership and enable Resource Manager service. - ctx, cancel := context.WithCancel(s.serverLoopCtx) - var resetLeaderOnce sync.Once - defer resetLeaderOnce.Do(func() { - cancel() - s.participant.ResetLeader() - member.ServiceMemberGauge.WithLabelValues(serviceName).Set(0) - }) - - // maintain the leadership, after this, Resource Manager could be ready to provide service. - s.participant.KeepLeader(ctx) - log.Info("campaign resource manager primary ok", zap.String("campaign-resource-manager-primary-name", s.participant.Name())) - - log.Info("triggering the primary callback functions") - for _, cb := range s.primaryCallbacks { - if err := cb(ctx); err != nil { - log.Error("failed to trigger the primary callback function", errs.ZapError(err)) - } - } - - s.participant.EnableLeader() - member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1) - log.Info("resource manager primary is ready to serve", zap.String("resource-manager-primary-name", s.participant.Name())) - - leaderTicker := time.NewTicker(constant.LeaderTickInterval) - defer leaderTicker.Stop() - - for { - select { - case <-leaderTicker.C: - if !s.participant.IsLeader() { - log.Info("no longer a primary/leader because lease has expired, the resource manager primary/leader will step down") - return - } - case <-ctx.Done(): - // Server is closed and it should return nil. - log.Info("server is closed") - return - } - } -} - -// Close closes the server. -func (s *Server) Close() { - if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) { - // server is already closed - return - } - - log.Info("closing resource manager server ...") - if err := s.serviceRegister.Deregister(); err != nil { - log.Error("failed to deregister the service", errs.ZapError(err)) - } - utils.StopHTTPServer(s) - utils.StopGRPCServer(s) - s.GetListener().Close() - s.CloseClientConns() - s.serverLoopCancel() - s.serverLoopWg.Wait() - - if s.GetClient() != nil { - if err := s.GetClient().Close(); err != nil { - log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) - } - } - - if s.GetHTTPClient() != nil { - s.GetHTTPClient().CloseIdleConnections() - } - - log.Info("resource manager server is closed") -} - -// GetControllerConfig returns the controller config. -func (s *Server) GetControllerConfig() *ControllerConfig { - return &s.cfg.Controller -} - -// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. -func (s *Server) IsServing() bool { - return !s.IsClosed() && s.participant.IsLeader() -} - -// IsClosed checks if the server loop is closed -func (s *Server) IsClosed() bool { - return s != nil && atomic.LoadInt64(&s.isRunning) == 0 -} - -// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { - s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) -} - -// GetBackendEndpoints returns the backend endpoints. -func (s *Server) GetBackendEndpoints() string { - return s.cfg.BackendEndpoints -} - -// ServerLoopWgDone decreases the server loop wait group. -func (s *Server) ServerLoopWgDone() { - s.serverLoopWg.Done() -} - -// ServerLoopWgAdd increases the server loop wait group. -func (s *Server) ServerLoopWgAdd(n int) { - s.serverLoopWg.Add(n) -} - -// SetUpRestHandler sets up the REST handler. -func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) { - return SetUpRestHandler(s.service) -} - -// RegisterGRPCService registers the grpc service. -func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { - s.service.RegisterGRPCService(grpcServer) -} - -// GetTLSConfig gets the security config. -func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { - return &s.cfg.Security.TLSConfig -} - -// GetLeaderListenUrls gets service endpoints from the leader in election group. -func (s *Server) GetLeaderListenUrls() []string { - return s.participant.GetLeaderListenUrls() -} - -func (s *Server) startServer() (err error) { - // The independent Resource Manager service still reuses PD version info since PD and Resource Manager are just - // different service modes provided by the same pd-server binary - bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) - bs.ServerMaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0))) - - uniqueName := s.cfg.GetAdvertiseListenAddr() - uniqueID := memberutil.GenerateUniqueID(uniqueName) - log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) - s.participant = member.NewParticipant(s.GetClient(), constant.ResourceManagerServiceName) - p := &resource_manager.Participant{ - Name: uniqueName, - Id: uniqueID, // id is unique among all participants - ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, - } - s.participant.InitInfo(p, keypath.ResourceManagerSvcRootPath(), constant.PrimaryKey, "primary election") - - s.service = &Service{ - ctx: s.Context(), - manager: NewManager[*Server](s), - } - - if err := s.InitListener(s.GetTLSConfig(), s.cfg.GetListenAddr()); err != nil { - return err - } - - serverReadyChan := make(chan struct{}) - defer close(serverReadyChan) - s.serverLoopWg.Add(1) - go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener()) - <-serverReadyChan - - // Run callbacks - log.Info("triggering the start callback functions") - for _, cb := range s.GetStartCallbacks() { - cb() - } - // The start callback function will initialize storage, which will be used in service ready callback. - // We should make sure the calling sequence is right. - s.startServerLoop() - - atomic.StoreInt64(&s.isRunning, 1) - return nil -} - -// CreateServer creates the Server -func CreateServer(ctx context.Context, cfg *Config) *Server { - svr := &Server{ - BaseServer: server.NewBaseServer(ctx), - DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), - cfg: cfg, - } - return svr -} - -// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server -func CreateServerWrapper(cmd *cobra.Command, args []string) { - err := cmd.Flags().Parse(args) - if err != nil { - cmd.Println(err) - return - } - cfg := NewConfig() - flagSet := cmd.Flags() - err = cfg.Parse(flagSet) - defer logutil.LogPanic() - - if err != nil { - cmd.Println(err) - return - } - - if printVersion, err := flagSet.GetBool("version"); err != nil { - cmd.Println(err) - return - } else if printVersion { - versioninfo.Print() - utils.Exit(0) - } - - // New zap logger - err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) - if err == nil { - log.ReplaceGlobals(cfg.Logger, cfg.LogProps) - } else { - log.Fatal("initialize logger error", errs.ZapError(err)) - } - // Flushing any buffered log entries - log.Sync() - versioninfo.Log(serviceName) - log.Info("resource manager config", zap.Reflect("config", cfg)) - - grpcprometheus.EnableHandlingTimeHistogram() - metricutil.Push(&cfg.Metric) - - ctx, cancel := context.WithCancel(context.Background()) - svr := CreateServer(ctx, cfg) - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - var sig os.Signal - go func() { - sig = <-sc - cancel() - }() - - if err := svr.Run(); err != nil { - log.Fatal("run server failed", errs.ZapError(err)) - } - - <-ctx.Done() - log.Info("got signal to exit", zap.String("signal", sig.String())) - - svr.Close() - switch sig { - case syscall.SIGTERM: - utils.Exit(0) - default: - utils.Exit(1) - } -} diff --git a/pkg/mcs/resourcemanager/server/testutil.go b/pkg/mcs/resourcemanager/server/testutil.go deleted file mode 100644 index 3577301258c..00000000000 --- a/pkg/mcs/resourcemanager/server/testutil.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2023 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "context" - "os" - - "github.com/pingcap/log" - "github.com/spf13/pflag" - "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/utils/logutil" - "github.com/tikv/pd/pkg/utils/testutil" -) - -// NewTestServer creates a resource manager server for testing. -func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*Server, testutil.CleanupFunc, error) { - // New zap logger - err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) - re.NoError(err) - log.ReplaceGlobals(cfg.Logger, cfg.LogProps) - // Flushing any buffered log entries - log.Sync() - - s := CreateServer(ctx, cfg) - if err = s.Run(); err != nil { - return nil, nil, err - } - - cleanup := func() { - s.Close() - os.RemoveAll(cfg.DataDir) - } - return s, cleanup, nil -} - -// GenerateConfig generates a new config with the given options. -func GenerateConfig(c *Config) (*Config, error) { - arguments := []string{ - "--name=" + c.Name, - "--listen-addr=" + c.ListenAddr, - "--advertise-listen-addr=" + c.AdvertiseListenAddr, - "--backend-endpoints=" + c.BackendEndpoints, - } - - flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError) - flagSet.StringP("name", "", "", "human-readable name for this resource manager member") - flagSet.BoolP("version", "V", false, "print version information and exit") - flagSet.StringP("config", "", "", "config file") - flagSet.StringP("backend-endpoints", "", "", "url for etcd client") - flagSet.StringP("listen-addr", "", "", "listen address for resource manager service") - flagSet.StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')") - flagSet.StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs") - flagSet.StringP("cert", "", "", "path of file that contains X509 certificate in PEM format") - flagSet.StringP("key", "", "", "path of file that contains X509 key in PEM format") - err := flagSet.Parse(arguments) - if err != nil { - return nil, err - } - cfg := NewConfig() - err = cfg.Parse(flagSet) - if err != nil { - return nil, err - } - - return cfg, nil -} diff --git a/pkg/mcs/utils/constant/constant.go b/pkg/mcs/utils/constant/constant.go index e8700ffbbc6..87fcf29f678 100644 --- a/pkg/mcs/utils/constant/constant.go +++ b/pkg/mcs/utils/constant/constant.go @@ -61,8 +61,6 @@ const ( APIServiceName = "api" // TSOServiceName is the name of tso server. TSOServiceName = "tso" - // ResourceManagerServiceName is the name of resource manager server. - ResourceManagerServiceName = "resource_manager" // SchedulingServiceName is the name of scheduling server. SchedulingServiceName = "scheduling" // KeyspaceGroupsKey is the path component of keyspace groups. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 5ef997341de..a513152d9b2 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -21,7 +21,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" @@ -328,8 +327,6 @@ func NewParticipantByService(serviceName string) (p participant) { p = &tsopb.Participant{} case constant.SchedulingServiceName: p = &schedulingpb.Participant{} - case constant.ResourceManagerServiceName: - p = &resource_manager.Participant{} } return p } diff --git a/pkg/utils/keypath/key_path.go b/pkg/utils/keypath/key_path.go index 5f3aafeca36..10934d1be9e 100644 --- a/pkg/utils/keypath/key_path.go +++ b/pkg/utils/keypath/key_path.go @@ -317,12 +317,6 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp { return regexp.MustCompile(pattern) } -// ResourceManagerSvcRootPath returns the root path of resource manager service. -// Path: /ms/{cluster_id}/resource_manager -func ResourceManagerSvcRootPath() string { - return svcRootPath(constant.ResourceManagerServiceName) -} - // SchedulingSvcRootPath returns the root path of scheduling service. // Path: /ms/{cluster_id}/scheduling func SchedulingSvcRootPath() string { diff --git a/tests/integrations/mcs/discovery/register_test.go b/tests/integrations/mcs/discovery/register_test.go index 63a40a1fb5b..147e16530b7 100644 --- a/tests/integrations/mcs/discovery/register_test.go +++ b/tests/integrations/mcs/discovery/register_test.go @@ -156,8 +156,6 @@ func (suite *serverRegisterTestSuite) addServer(serviceName string) (bs.Server, switch serviceName { case constant.TSOServiceName: return tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) - case constant.ResourceManagerServiceName: - return tests.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) default: return nil, nil } diff --git a/tests/integrations/mcs/resourcemanager/server_test.go b/tests/integrations/mcs/resourcemanager/server_test.go deleted file mode 100644 index 7ca83ad6bac..00000000000 --- a/tests/integrations/mcs/resourcemanager/server_test.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2023 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package resourcemanager_test - -import ( - "context" - "encoding/json" - "io" - "net/http" - "strings" - "testing" - - rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/stretchr/testify/require" - "github.com/tikv/pd/client/utils/grpcutil" - bs "github.com/tikv/pd/pkg/basicserver" - "github.com/tikv/pd/pkg/utils/tempurl" - "github.com/tikv/pd/pkg/versioninfo" - "github.com/tikv/pd/tests" -) - -func TestResourceManagerServer(t *testing.T) { - re := require.New(t) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestAPICluster(ctx, 1) - defer cluster.Destroy() - re.NoError(err) - - err = cluster.RunInitialServers() - re.NoError(err) - - leaderName := cluster.WaitLeader() - re.NotEmpty(leaderName) - leader := cluster.GetServer(leaderName) - - s, cleanup := tests.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr(), tempurl.Alloc()) - addr := s.GetAddr() - defer cleanup() - tests.WaitForPrimaryServing(re, map[string]bs.Server{addr: s}) - - // Test registered GRPC Service - cc, err := grpcutil.GetClientConn(ctx, addr, nil) - re.NoError(err) - defer cc.Close() - - c := rmpb.NewResourceManagerClient(cc) - _, err = c.GetResourceGroup(context.Background(), &rmpb.GetResourceGroupRequest{ - ResourceGroupName: "pingcap", - }) - re.ErrorContains(err, "resource group not found") - - // Test registered REST HTTP Handler - url := addr + "/resource-manager/api/v1/config" - { - resp, err := tests.TestDialClient.Get(url + "/groups") - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - respString, err := io.ReadAll(resp.Body) - re.NoError(err) - re.JSONEq(`[{"name":"default","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":2147483647,"burst_limit":-1},"state":{"initialized":false}}},"priority":8}]`, string(respString)) - } - { - group := &rmpb.ResourceGroup{ - Name: "pingcap", - Mode: 1, - } - createJSON, err := json.Marshal(group) - re.NoError(err) - resp, err := tests.TestDialClient.Post(url+"/group", "application/json", strings.NewReader(string(createJSON))) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - } - { - resp, err := tests.TestDialClient.Get(url + "/group/pingcap") - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - respString, err := io.ReadAll(resp.Body) - re.NoError(err) - re.JSONEq("{\"name\":\"pingcap\",\"mode\":1,\"r_u_settings\":{\"r_u\":{\"state\":{\"initialized\":false}}},\"priority\":0}", string(respString)) - } - - // Test metrics handler - { - resp, err := tests.TestDialClient.Get(addr + "/metrics") - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - respBytes, err := io.ReadAll(resp.Body) - re.NoError(err) - re.Contains(string(respBytes), "pd_server_info") - } - - // Test status handler - { - resp, err := tests.TestDialClient.Get(addr + "/status") - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - respBytes, err := io.ReadAll(resp.Body) - re.NoError(err) - var s versioninfo.Status - re.NoError(json.Unmarshal(respBytes, &s)) - re.Equal(versioninfo.PDBuildTS, s.BuildTS) - re.Equal(versioninfo.PDGitHash, s.GitHash) - re.Equal(versioninfo.PDReleaseVersion, s.Version) - } -} diff --git a/tests/testutil.go b/tests/testutil.go index b5ea0a9f53a..b56fd245bd3 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -36,7 +36,6 @@ import ( "github.com/stretchr/testify/require" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/core" - rm "github.com/tikv/pd/pkg/mcs/resourcemanager/server" scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server" sc "github.com/tikv/pd/pkg/mcs/scheduling/server/config" tso "github.com/tikv/pd/pkg/mcs/tso/server" @@ -107,24 +106,6 @@ func InitLogger(logConfig log.Config, logger *zap.Logger, logProps *log.ZapPrope return err } -// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing. -func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) { - cfg := rm.NewConfig() - cfg.BackendEndpoints = backendEndpoints - cfg.ListenAddr = listenAddrs - cfg.Name = cfg.ListenAddr - cfg, err := rm.GenerateConfig(cfg) - re.NoError(err) - - s, cleanup, err := rm.NewTestServer(ctx, re, cfg) - re.NoError(err) - testutil.Eventually(re, func() bool { - return !s.IsClosed() - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - - return s, cleanup -} - // StartSingleTSOTestServerWithoutCheck creates and starts a tso server with default config for testing. func StartSingleTSOTestServerWithoutCheck(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func(), error) { cfg := tso.NewConfig()