From a86f3ddf75dadc6dfae3fd272c728e620f625dcd Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Fri, 29 Nov 2024 18:06:20 +0800 Subject: [PATCH] tso/local: remove dc location and local_locator.go (#8851) ref tikv/pd#8802 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/tso/server/apis/v1/api.go | 7 +- pkg/mcs/tso/server/config.go | 5 - pkg/mcs/tso/server/grpc_service.go | 5 +- pkg/mcs/tso/server/server.go | 5 +- pkg/member/member.go | 24 -- pkg/member/participant.go | 10 - pkg/tso/allocator_manager.go | 274 ++---------------- pkg/tso/config.go | 2 - pkg/tso/filter.go | 30 -- pkg/tso/global_allocator.go | 16 +- pkg/tso/keyspace_group_manager.go | 53 +--- pkg/tso/keyspace_group_manager_test.go | 13 +- pkg/tso/local_allocator.go | 196 ------------- pkg/tso/testutil.go | 5 - pkg/tso/tso.go | 11 +- pkg/utils/keypath/key_path.go | 12 - pkg/utils/keypath/key_path_v2.go | 24 +- pkg/utils/tsoutil/tso_proto_factory.go | 12 +- pkg/utils/tsoutil/tso_request.go | 4 +- server/api/member.go | 30 +- server/cluster/cluster.go | 50 ++-- server/config/config.go | 5 - server/forward.go | 6 +- server/grpc_service.go | 160 +--------- server/handler.go | 6 +- server/server.go | 5 - tests/cluster.go | 28 -- tests/integrations/mcs/tso/api_test.go | 1 - .../mcs/tso/keyspace_group_manager_test.go | 5 +- tests/integrations/mcs/tso/server_test.go | 6 +- tests/server/member/member_test.go | 8 - 31 files changed, 104 insertions(+), 914 deletions(-) delete mode 100644 pkg/tso/filter.go delete mode 100644 pkg/tso/local_allocator.go diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index 68a654c7315..ec8782eb522 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -302,11 +302,6 @@ func transferPrimary(c *gin.Context) { return } - globalAllocator, err := allocator.GetAllocator(tso.GlobalDCLocation) - if err != nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) - return - } // only members of specific group are valid primary candidates. group := svr.GetKeyspaceGroupManager().GetKeyspaceGroups()[keyspaceGroupID] memberMap := make(map[string]bool, len(group.Members)) @@ -314,7 +309,7 @@ func transferPrimary(c *gin.Context) { memberMap[member.Address] = true } - if err := utils.TransferPrimary(svr.GetClient(), globalAllocator.(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(), + if err := utils.TransferPrimary(svr.GetClient(), allocator.GetAllocator().(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(), constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID, memberMap); err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return diff --git a/pkg/mcs/tso/server/config.go b/pkg/mcs/tso/server/config.go index 209a9deb949..0973042b912 100644 --- a/pkg/mcs/tso/server/config.go +++ b/pkg/mcs/tso/server/config.go @@ -127,11 +127,6 @@ func (c *Config) GetLeaderLease() int64 { return c.LeaderLease } -// IsLocalTSOEnabled returns if the local TSO is enabled. -func (c *Config) IsLocalTSOEnabled() bool { - return c.EnableLocalTSO -} - // GetTSOUpdatePhysicalInterval returns TSO update physical interval. func (c *Config) GetTSOUpdatePhysicalInterval() time.Duration { return c.TSOUpdatePhysicalInterval.Duration diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index 33d158fd785..3419fd16221 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -111,12 +111,11 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { } keyspaceID := header.GetKeyspaceId() keyspaceGroupID := header.GetKeyspaceGroupId() - dcLocation := request.GetDcLocation() count := request.GetCount() ts, keyspaceGroupBelongTo, err := s.keyspaceGroupManager.HandleTSORequest( ctx, keyspaceID, keyspaceGroupID, - dcLocation, count) + count) if err != nil { return status.Error(codes.Unknown, err.Error()) } @@ -198,7 +197,7 @@ func (s *Service) GetMinTS( }, nil } - minTS, kgAskedCount, kgTotalCount, err := s.keyspaceGroupManager.GetMinTS(request.GetDcLocation()) + minTS, kgAskedCount, kgTotalCount, err := s.keyspaceGroupManager.GetMinTS() if err != nil { return &tsopb.GetMinTSResponse{ Header: wrapErrorToHeader( diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 270cfbe13e9..04e81c2d48e 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -328,10 +328,7 @@ func (s *Server) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, key log.Error("failed to get allocator manager", errs.ZapError(err)) return err } - tsoAllocator, err := tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) - if err != nil { - return err - } + tsoAllocator := tsoAllocatorManager.GetAllocator() if tsoAllocator == nil { return errs.ErrServerNotStarted } diff --git a/pkg/member/member.go b/pkg/member/member.go index 89ba59d385f..b1924835988 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -383,16 +383,6 @@ func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string { return path.Join(m.rootPath, fmt.Sprintf("member/%d/leader_priority", id)) } -// GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. -func (*EmbeddedEtcdMember) GetDCLocationPathPrefix() string { - return keypath.Prefix(keypath.DCLocationPath(nil, 0)) -} - -// GetDCLocationPath returns the dc-location path of a member with the given member ID. -func (*EmbeddedEtcdMember) GetDCLocationPath(id uint64) string { - return keypath.DCLocationPath(nil, id) -} - // SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader. func (m *EmbeddedEtcdMember) SetMemberLeaderPriority(id uint64, priority int) error { key := m.getMemberLeaderPriorityPath(id) @@ -421,20 +411,6 @@ func (m *EmbeddedEtcdMember) DeleteMemberLeaderPriority(id uint64) error { return nil } -// DeleteMemberDCLocationInfo removes a member's dc-location info. -func (m *EmbeddedEtcdMember) DeleteMemberDCLocationInfo(id uint64) error { - key := m.GetDCLocationPath(id) - res, err := m.leadership.LeaderTxn().Then(clientv3.OpDelete(key)).Commit() - if err != nil { - return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause() - } - if !res.Succeeded { - log.Error("delete dc-location info failed, maybe not pd leader") - return errs.ErrEtcdTxnConflict.FastGenByArgs() - } - return nil -} - // GetMemberLeaderPriority loads a member's priority to be elected as the etcd leader. func (m *EmbeddedEtcdMember) GetMemberLeaderPriority(id uint64) (int, error) { key := m.getMemberLeaderPriorityPath(id) diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 93d852f0fb0..26888f6152d 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -276,16 +276,6 @@ func (m *Participant) IsSameLeader(leader participant) bool { return leader.GetId() == m.ID() } -// GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. -func (m *Participant) GetDCLocationPathPrefix() string { - return keypath.Prefix(keypath.DCLocationPath(&m.MsParam, 0)) -} - -// GetDCLocationPath returns the dc-location path of a member with the given member ID. -func (m *Participant) GetDCLocationPath(id uint64) string { - return keypath.DCLocationPath(&m.MsParam, id) -} - func (m *Participant) campaignCheck() bool { checker := m.campaignChecker.Load() if checker == nil { diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index ea1eeeeb18a..8d5589143aa 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -16,12 +16,10 @@ package tso import ( "context" - "fmt" "math" "path" "runtime/trace" "strconv" - "strings" "sync" "time" @@ -31,15 +29,12 @@ import ( "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/member" - "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" - "google.golang.org/grpc" ) const ( @@ -61,7 +56,6 @@ var ( type AllocatorGroupFilter func(ag *allocatorGroup) bool type allocatorGroup struct { - dcLocation string // ctx is built with cancel from a parent context when set up which can be different // in order to receive Done() signal correctly. // cancel would be call when allocatorGroup is deleted to stop background loop. @@ -75,25 +69,6 @@ type allocatorGroup struct { allocator Allocator } -// DCLocationInfo is used to record some dc-location related info, -// such as suffix sign and server IDs in this dc-location. -type DCLocationInfo struct { - // dc-location/global (string) -> Member IDs - ServerIDs []uint64 - // dc-location (string) -> Suffix sign. It is collected and maintained by the PD leader. - Suffix int32 -} - -func (info *DCLocationInfo) clone() DCLocationInfo { - copiedInfo := DCLocationInfo{ - Suffix: info.Suffix, - } - // Make a deep copy here for the slice - copiedInfo.ServerIDs = make([]uint64, len(info.ServerIDs)) - copy(copiedInfo.ServerIDs, info.ServerIDs) - return copiedInfo -} - // ElectionMember defines the interface for the election related logic. type ElectionMember interface { // ID returns the unique ID in the election group. For example, it can be unique @@ -136,10 +111,6 @@ type ElectionMember interface { GetLeadership() *election.Leadership // GetLastLeaderUpdatedTime returns the last time when the leader is updated. GetLastLeaderUpdatedTime() time.Time - // GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. - GetDCLocationPathPrefix() string - // GetDCLocationPath returns the dc-location path of a member with the given member ID. - GetDCLocationPath(id uint64) string // PreCheckLeader does some pre-check before checking whether it's the leader. PreCheckLeader() error } @@ -150,19 +121,13 @@ type ElectionMember interface { type AllocatorManager struct { mu struct { syncutil.RWMutex - // There are two kinds of TSO Allocators: - // 1. Global TSO Allocator, as a global single point to allocate - // TSO for global transactions, such as cross-region cases. - // 2. Local TSO Allocator, servers for DC-level transactions. - // dc-location/global (string) -> TSO Allocator - allocatorGroups map[string]*allocatorGroup - clusterDCLocations map[string]*DCLocationInfo + // Global TSO Allocator, as a global single point to allocate + // TSO for global transactions, such as cross-region cases. + allocatorGroup *allocatorGroup // The max suffix sign we have so far, it will be used to calculate // the number of suffix bits we need in the TSO logical part. maxSuffix int32 } - // for the synchronization purpose of the allocator update checks - wg sync.WaitGroup // for the synchronization purpose of the service loops svcLoopWG sync.WaitGroup @@ -175,7 +140,6 @@ type AllocatorManager struct { // TSO config rootPath string storage endpoint.TSOStorage - enableLocalTSO bool saveInterval time.Duration updatePhysicalInterval time.Duration // leaderLease defines the time within which a TSO primary/leader must update its TTL @@ -184,11 +148,6 @@ type AllocatorManager struct { leaderLease int64 maxResetTSGap func() time.Duration securityConfig *grpcutil.TLSConfig - // for gRPC use - localAllocatorConn struct { - syncutil.RWMutex - clientConns map[string]*grpc.ClientConn - } } // NewAllocatorManager creates a new TSO Allocator Manager. @@ -208,37 +167,33 @@ func NewAllocatorManager( member: member, rootPath: rootPath, storage: storage, - enableLocalTSO: cfg.IsLocalTSOEnabled(), saveInterval: cfg.GetTSOSaveInterval(), updatePhysicalInterval: cfg.GetTSOUpdatePhysicalInterval(), leaderLease: cfg.GetLeaderLease(), maxResetTSGap: cfg.GetMaxResetTSGap, securityConfig: cfg.GetTLSConfig(), } - am.mu.allocatorGroups = make(map[string]*allocatorGroup) - am.mu.clusterDCLocations = make(map[string]*DCLocationInfo) - am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) + am.mu.allocatorGroup = &allocatorGroup{} - // Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully. - am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership()) + // Set up the TSO Allocator here, it will be initialized once the member campaigns leader successfully. + am.SetUpAllocator(am.ctx, am.member.GetLeadership()) am.svcLoopWG.Add(1) go am.tsoAllocatorLoop() return am } -// SetUpGlobalAllocator is used to set up the global allocator, which will initialize the allocator and put it into +// SetUpAllocator is used to set up the allocator, which will initialize the allocator and put it into // an allocator daemon. An TSO Allocator should only be set once, and may be initialized and reset multiple times // depending on the election. -func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership) { +func (am *AllocatorManager) SetUpAllocator(ctx context.Context, leadership *election.Leadership) { am.mu.Lock() defer am.mu.Unlock() allocator := NewGlobalTSOAllocator(ctx, am) // Create a new allocatorGroup ctx, cancel := context.WithCancel(ctx) - am.mu.allocatorGroups[GlobalDCLocation] = &allocatorGroup{ - dcLocation: GlobalDCLocation, + am.mu.allocatorGroup = &allocatorGroup{ ctx: ctx, cancel: cancel, leadership: leadership, @@ -262,21 +217,15 @@ func (am *AllocatorManager) getGroupIDStr() string { return strconv.FormatUint(uint64(am.kgID), 10) } -// GetTimestampPath returns the timestamp path in etcd for the given DCLocation. -func (am *AllocatorManager) GetTimestampPath(dcLocation string) string { +// GetTimestampPath returns the timestamp path in etcd. +func (am *AllocatorManager) GetTimestampPath() string { if am == nil { return "" } - if len(dcLocation) == 0 { - dcLocation = GlobalDCLocation - } am.mu.RLock() defer am.mu.RUnlock() - if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist { - return path.Join(am.rootPath, allocatorGroup.allocator.GetTimestampPath()) - } - return "" + return path.Join(am.rootPath, am.mu.allocatorGroup.allocator.GetTimestampPath()) } // tsoAllocatorLoop is used to run the TSO Allocator updating daemon. @@ -293,16 +242,7 @@ func (am *AllocatorManager) tsoAllocatorLoop() { func (am *AllocatorManager) close() { log.Info("closing the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) - if allocatorGroup, exist := am.getAllocatorGroup(GlobalDCLocation); exist { - allocatorGroup.allocator.(*GlobalTSOAllocator).close() - } - - for _, cc := range am.localAllocatorConn.clientConns { - if err := cc.Close(); err != nil { - log.Error("failed to close allocator manager grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err)) - } - } - + am.GetAllocator().(*GlobalTSOAllocator).close() am.cancel() am.svcLoopWG.Wait() @@ -314,65 +254,6 @@ func (am *AllocatorManager) GetMember() ElectionMember { return am.member } -// GetClusterDCLocationsFromEtcd fetches dcLocation topology from etcd -func (am *AllocatorManager) GetClusterDCLocationsFromEtcd() (clusterDCLocations map[string][]uint64, err error) { - resp, err := etcdutil.EtcdKVGet( - am.member.Client(), - am.member.GetDCLocationPathPrefix(), - clientv3.WithPrefix()) - if err != nil { - return clusterDCLocations, err - } - clusterDCLocations = make(map[string][]uint64) - for _, kv := range resp.Kvs { - // The key will contain the member ID and the value is its dcLocation - serverPath := strings.Split(string(kv.Key), "/") - // Get serverID from serverPath, e.g, /pd/dc-location/1232143243253 -> 1232143243253 - serverID, err := strconv.ParseUint(serverPath[len(serverPath)-1], 10, 64) - dcLocation := string(kv.Value) - if err != nil { - log.Warn("get server id and dcLocation from etcd failed, invalid server id", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.Any("split-serverPath", serverPath), - zap.String("dc-location", dcLocation), - errs.ZapError(err)) - continue - } - clusterDCLocations[dcLocation] = append(clusterDCLocations[dcLocation], serverID) - } - return clusterDCLocations, nil -} - -// GetDCLocationInfo returns a copy of DCLocationInfo of the given dc-location, -func (am *AllocatorManager) GetDCLocationInfo(dcLocation string) (DCLocationInfo, bool) { - am.mu.RLock() - defer am.mu.RUnlock() - infoPtr, ok := am.mu.clusterDCLocations[dcLocation] - if !ok { - return DCLocationInfo{}, false - } - return infoPtr.clone(), true -} - -// GetClusterDCLocations returns all dc-locations of a cluster with a copy of map, -// which satisfies dcLocation -> DCLocationInfo. -func (am *AllocatorManager) GetClusterDCLocations() map[string]DCLocationInfo { - am.mu.RLock() - defer am.mu.RUnlock() - dcLocationMap := make(map[string]DCLocationInfo) - for dcLocation, info := range am.mu.clusterDCLocations { - dcLocationMap[dcLocation] = info.clone() - } - return dcLocationMap -} - -// GetClusterDCLocationsNumber returns the number of cluster dc-locations. -func (am *AllocatorManager) GetClusterDCLocationsNumber() int { - am.mu.RLock() - defer am.mu.RUnlock() - return len(am.mu.clusterDCLocations) -} - // GetSuffixBits calculates the bits of suffix sign // by the max number of suffix so far, // which will be used in the TSO logical part. @@ -402,8 +283,11 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { for { select { case <-tsTicker.C: + allocatorGroup := am.mu.allocatorGroup // Update the initialized TSO Allocator to advance TSO. - am.allocatorUpdater() + if allocatorGroup.allocator.IsInitialize() && allocatorGroup.leadership.Check() { + am.updateAllocator(allocatorGroup) + } case <-ctx.Done(): log.Info("exit allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) return @@ -411,22 +295,9 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { } } -// Update the Local TSO Allocator leaders TSO in memory concurrently. -func (am *AllocatorManager) allocatorUpdater() { - // Filter out allocators without leadership and uninitialized - allocatorGroups := am.getAllocatorGroups(FilterUninitialized(), FilterUnavailableLeadership()) - // Update each allocator concurrently - for _, ag := range allocatorGroups { - am.wg.Add(1) - go am.updateAllocator(ag) - } - am.wg.Wait() -} - // updateAllocator is used to update the allocator in the group. func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) { defer logutil.LogPanic() - defer am.wg.Done() select { case <-ag.ctx.Done(): @@ -438,130 +309,43 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) { } if !ag.leadership.Check() { log.Info("allocator doesn't campaign leadership yet", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", ag.dcLocation)) + logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) time.Sleep(200 * time.Millisecond) return } if err := ag.allocator.UpdateTSO(); err != nil { log.Warn("failed to update allocator's timestamp", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", ag.dcLocation), zap.String("name", am.member.Name()), errs.ZapError(err)) - am.ResetAllocatorGroup(ag.dcLocation, false) + am.ResetAllocatorGroup(false) return } } // HandleRequest forwards TSO allocation requests to correct TSO Allocators. -func (am *AllocatorManager) HandleRequest(ctx context.Context, dcLocation string, count uint32) (pdpb.Timestamp, error) { +func (am *AllocatorManager) HandleRequest(ctx context.Context, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "AllocatorManager.HandleRequest").End() - if len(dcLocation) == 0 { - dcLocation = GlobalDCLocation - } - allocatorGroup, exist := am.getAllocatorGroup(dcLocation) - if !exist { - err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation)) - return pdpb.Timestamp{}, err - } - - return allocatorGroup.allocator.GenerateTSO(ctx, count) + return am.GetAllocator().GenerateTSO(ctx, count) } // ResetAllocatorGroup will reset the allocator's leadership and TSO initialized in memory. // It usually should be called before re-triggering an Allocator leader campaign. -func (am *AllocatorManager) ResetAllocatorGroup(dcLocation string, skipResetLeader bool) { +func (am *AllocatorManager) ResetAllocatorGroup(skipResetLeader bool) { am.mu.Lock() defer am.mu.Unlock() - if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist { - allocatorGroup.allocator.Reset() - // Reset if it still has the leadership. Otherwise the data race may occur because of the re-campaigning. - if !skipResetLeader && allocatorGroup.leadership.Check() { - allocatorGroup.leadership.Reset() - } + am.mu.allocatorGroup.allocator.Reset() + // Reset if it still has the leadership. Otherwise the data race may occur because of the re-campaigning. + if !skipResetLeader && am.mu.allocatorGroup.leadership.Check() { + am.mu.allocatorGroup.leadership.Reset() } } -func (am *AllocatorManager) getAllocatorGroups(filters ...AllocatorGroupFilter) []*allocatorGroup { - am.mu.RLock() - defer am.mu.RUnlock() - var allocatorGroups []*allocatorGroup - for _, ag := range am.mu.allocatorGroups { - if ag == nil { - continue - } - if slice.NoneOf(filters, func(i int) bool { return filters[i](ag) }) { - allocatorGroups = append(allocatorGroups, ag) - } - } - return allocatorGroups -} - -func (am *AllocatorManager) getAllocatorGroup(dcLocation string) (*allocatorGroup, bool) { - am.mu.RLock() - defer am.mu.RUnlock() - allocatorGroup, exist := am.mu.allocatorGroups[dcLocation] - return allocatorGroup, exist -} - // GetAllocator get the allocator by dc-location. -func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) { +func (am *AllocatorManager) GetAllocator() Allocator { am.mu.RLock() defer am.mu.RUnlock() - if len(dcLocation) == 0 { - dcLocation = GlobalDCLocation - } - allocatorGroup, exist := am.mu.allocatorGroups[dcLocation] - if !exist { - return nil, errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found", dcLocation)) - } - return allocatorGroup.allocator, nil -} - -// GetAllocators get all allocators with some filters. -func (am *AllocatorManager) GetAllocators(filters ...AllocatorGroupFilter) []Allocator { - allocatorGroups := am.getAllocatorGroups(filters...) - allocators := make([]Allocator, 0, len(allocatorGroups)) - for _, ag := range allocatorGroups { - allocators = append(allocators, ag.allocator) - } - return allocators -} - -// GetHoldingLocalAllocatorLeaders returns all Local TSO Allocator leaders this server holds. -func (am *AllocatorManager) GetHoldingLocalAllocatorLeaders() ([]*LocalTSOAllocator, error) { - localAllocators := am.GetAllocators( - FilterDCLocation(GlobalDCLocation), - FilterUnavailableLeadership()) - localAllocatorLeaders := make([]*LocalTSOAllocator, 0, len(localAllocators)) - for _, localAllocator := range localAllocators { - localAllocatorLeader, ok := localAllocator.(*LocalTSOAllocator) - if !ok { - return nil, errs.ErrGetLocalAllocator.FastGenByArgs("invalid local tso allocator found") - } - localAllocatorLeaders = append(localAllocatorLeaders, localAllocatorLeader) - } - return localAllocatorLeaders, nil -} - -// GetLocalAllocatorLeaders returns all Local TSO Allocator leaders' member info. -func (am *AllocatorManager) GetLocalAllocatorLeaders() (map[string]*pdpb.Member, error) { - localAllocators := am.GetAllocators(FilterDCLocation(GlobalDCLocation)) - localAllocatorLeaderMember := make(map[string]*pdpb.Member) - for _, allocator := range localAllocators { - localAllocator, ok := allocator.(*LocalTSOAllocator) - if !ok { - return nil, errs.ErrGetLocalAllocator.FastGenByArgs("invalid local tso allocator found") - } - localAllocatorLeaderMember[localAllocator.GetDCLocation()] = localAllocator.GetAllocatorLeader() - } - return localAllocatorLeaderMember, nil -} - -// EnableLocalTSO returns the value of AllocatorManager.enableLocalTSO. -func (am *AllocatorManager) EnableLocalTSO() bool { - return am.enableLocalTSO + return am.mu.allocatorGroup.allocator } // IsLeader returns whether the current member is the leader in the election group. @@ -585,7 +369,7 @@ func (am *AllocatorManager) GetLeaderAddr() string { } func (am *AllocatorManager) startGlobalAllocatorLoop() { - globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator) + globalTSOAllocator, ok := am.mu.allocatorGroup.allocator.(*GlobalTSOAllocator) if !ok { // it should never happen log.Error("failed to start global allocator loop, global allocator not found") diff --git a/pkg/tso/config.go b/pkg/tso/config.go index 598f76004b1..54056e96d09 100644 --- a/pkg/tso/config.go +++ b/pkg/tso/config.go @@ -38,8 +38,6 @@ type ServiceConfig interface { type Config interface { // GetLeaderLease returns the leader lease. GetLeaderLease() int64 - // IsLocalTSOEnabled returns if the local TSO is enabled. - IsLocalTSOEnabled() bool // GetTSOUpdatePhysicalInterval returns TSO update physical interval. GetTSOUpdatePhysicalInterval() time.Duration // GetTSOSaveInterval returns TSO save interval. diff --git a/pkg/tso/filter.go b/pkg/tso/filter.go deleted file mode 100644 index c2e32a62106..00000000000 --- a/pkg/tso/filter.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2020 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tso - -// FilterDCLocation will filter out the allocatorGroup with a given dcLocation. -func FilterDCLocation(dcLocation string) func(ag *allocatorGroup) bool { - return func(ag *allocatorGroup) bool { return ag.dcLocation == dcLocation } -} - -// FilterUninitialized will filter out the allocatorGroup uninitialized. -func FilterUninitialized() func(ag *allocatorGroup) bool { - return func(ag *allocatorGroup) bool { return !ag.allocator.IsInitialize() } -} - -// FilterUnavailableLeadership will filter out the allocatorGroup whose leadership is unavailable. -func FilterUnavailableLeadership() func(ag *allocatorGroup) bool { - return func(ag *allocatorGroup) bool { return !ag.leadership.Check() } -} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index cb599875dbd..52c30c38f1e 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -111,7 +111,6 @@ func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle { saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, maxResetTSGap: am.maxResetTSGap, - dcLocation: GlobalDCLocation, tsoMux: &tsoObject{}, metrics: newTSOMetrics(am.getGroupIDStr(), GlobalDCLocation), } @@ -161,7 +160,7 @@ func (gta *GlobalTSOAllocator) UpdateTSO() error { // SetTSO sets the physical part with given TSO. func (gta *GlobalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error { - return gta.timestampOracle.resetUserTimestampInner(gta.member.GetLeadership(), tso, ignoreSmaller, skipUpperBoundCheck) + return gta.timestampOracle.resetUserTimestamp(gta.member.GetLeadership(), tso, ignoreSmaller, skipUpperBoundCheck) } // GenerateTSO is used to generate the given number of TSOs. @@ -271,22 +270,15 @@ func (gta *GlobalTSOAllocator) campaignLeader() { logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("campaign-tso-primary-name", gta.member.Name())) - allocator, err := gta.am.GetAllocator(GlobalDCLocation) - if err != nil { - log.Error("failed to get the global tso allocator", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - errs.ZapError(err)) - return - } log.Info("initializing the global tso allocator") - if err := allocator.Initialize(0); err != nil { + if err := gta.am.GetAllocator().Initialize(0); err != nil { log.Error("failed to initialize the global tso allocator", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), errs.ZapError(err)) return } defer func() { - gta.am.ResetAllocatorGroup(GlobalDCLocation, false) + gta.am.ResetAllocatorGroup(false) }() // check expected primary and watch the primary. @@ -308,8 +300,6 @@ func (gta *GlobalTSOAllocator) campaignLeader() { member.ServiceMemberGauge.WithLabelValues(tsoLabel).Set(0) }) - // TODO: if enable-local-tso is true, check the cluster dc-location after the primary is elected - // go gta.tsoAllocatorManager.ClusterDCLocationChecker() log.Info("tso primary is ready to serve", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("tso-primary-name", gta.member.Name())) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 07731204e3d..09f20609920 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -653,11 +653,6 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { log.Error("failed to get allocator manager", zap.Error(err)) continue } - globalAllocator, err := allocator.GetAllocator(GlobalDCLocation) - if err != nil { - log.Error("failed to get global allocator", zap.Error(err)) - continue - } // only members of specific group are valid primary candidates. group := kgm.GetKeyspaceGroups()[kg.ID] memberMap := make(map[string]bool, len(group.Members)) @@ -668,7 +663,7 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { zap.String("local-address", kgm.tsoServiceID.ServiceAddr), zap.Uint32("keyspace-group-id", kg.ID), zap.Int("local-priority", localPriority)) - if err := utils.TransferPrimary(kgm.etcdClient, globalAllocator.(*GlobalTSOAllocator).GetExpectedPrimaryLease(), + if err := utils.TransferPrimary(kgm.etcdClient, allocator.GetAllocator().(*GlobalTSOAllocator).GetExpectedPrimaryLease(), constant.TSOServiceName, kgm.GetServiceConfig().GetName(), "", kg.ID, memberMap); err != nil { log.Error("failed to transfer primary", zap.Error(err)) continue @@ -796,7 +791,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro am.startGlobalAllocatorLoop() log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), - zap.String("timestamp-path", am.GetTimestampPath(""))) + zap.String("timestamp-path", am.GetTimestampPath())) kgm.Lock() group.KeyspaceLookupTable = make(map[uint32]struct{}) for _, kid := range group.Keyspaces { @@ -1075,7 +1070,7 @@ func (kgm *KeyspaceGroupManager) GetKeyspaceGroups() map[uint32]*endpoint.Keyspa func (kgm *KeyspaceGroupManager) HandleTSORequest( ctx context.Context, keyspaceID, keyspaceGroupID uint32, - dcLocation string, count uint32, + count uint32, ) (ts pdpb.Timestamp, curKeyspaceGroupID uint32, err error) { if err := checkKeySpaceGroupID(keyspaceGroupID); err != nil { return pdpb.Timestamp{}, keyspaceGroupID, err @@ -1084,7 +1079,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } - err = kgm.checkTSOSplit(curKeyspaceGroupID, dcLocation) + err = kgm.checkTSOSplit(curKeyspaceGroupID) if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } @@ -1097,17 +1092,12 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( // TSO is the latest one from the storage, which could prevent the potential // fallback caused by the rolling update of the mixed old PD and TSO service deployment. err = kgm.markGroupRequested(curKeyspaceGroupID, func() error { - allocator, err := am.GetAllocator(dcLocation) - if err != nil { - return err - } - // TODO: support the Local TSO Allocator. - return allocator.Initialize(0) + return am.GetAllocator().Initialize(0) }) if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } - ts, err = am.HandleRequest(ctx, dcLocation, count) + ts, err = am.HandleRequest(ctx, count) return ts, curKeyspaceGroupID, err } @@ -1120,9 +1110,7 @@ func checkKeySpaceGroupID(id uint32) error { } // GetMinTS returns the minimum timestamp across all keyspace groups served by this TSO server/pod. -func (kgm *KeyspaceGroupManager) GetMinTS( - dcLocation string, -) (_ pdpb.Timestamp, kgAskedCount, kgTotalCount uint32, err error) { +func (kgm *KeyspaceGroupManager) GetMinTS() (_ pdpb.Timestamp, kgAskedCount, kgTotalCount uint32, err error) { kgm.RLock() defer kgm.RUnlock() @@ -1146,7 +1134,7 @@ func (kgm *KeyspaceGroupManager) GetMinTS( if kgm.kgs[i] != nil && kgm.kgs[i].IsSplitTarget() { continue } - ts, err := am.HandleRequest(context.Background(), dcLocation, 1) + ts, err := am.HandleRequest(context.Background(), 1) if err != nil { return pdpb.Timestamp{}, kgAskedCount, kgTotalCount, err } @@ -1176,20 +1164,13 @@ func genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error { // newly split TSO keep consistent with the original one. func (kgm *KeyspaceGroupManager) checkTSOSplit( keyspaceGroupID uint32, - dcLocation string, ) error { splitTargetAM, splitSourceAM, err := kgm.state.checkGroupSplit(keyspaceGroupID) if err != nil || splitTargetAM == nil { return err } - splitTargetAllocator, err := splitTargetAM.GetAllocator(dcLocation) - if err != nil { - return err - } - splitSourceAllocator, err := splitSourceAM.GetAllocator(dcLocation) - if err != nil { - return err - } + splitTargetAllocator := splitTargetAM.GetAllocator() + splitSourceAllocator := splitSourceAM.GetAllocator() splitTargetTSO, err := splitTargetAllocator.GenerateTSO(context.Background(), 1) if err != nil { return err @@ -1413,17 +1394,7 @@ mergeLoop: zap.Uint32("merge-target-id", mergeTargetID), zap.Any("merge-list", mergeList), zap.Time("merged-ts", mergedTS)) - // TODO: support the Local TSO Allocator. - allocator, err := am.GetAllocator(GlobalDCLocation) - if err != nil { - log.Error("failed to get the allocator", - zap.String("member", kgm.tsoServiceID.ServiceAddr), - zap.Uint32("merge-target-id", mergeTargetID), - zap.Any("merge-list", mergeList), - zap.Error(err)) - continue - } - err = allocator.SetTSO( + err = am.GetAllocator().SetTSO( tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)), true, true) if err != nil { @@ -1490,7 +1461,7 @@ func (kgm *KeyspaceGroupManager) groupSplitPatroller() { zap.Uint32("keyspace-group-id", groupID), zap.Uint32("keyspace-id", group.Keyspaces[0])) // Request the TSO manually to speed up the split process. - _, _, err := kgm.HandleTSORequest(kgm.ctx, group.Keyspaces[0], groupID, GlobalDCLocation, 1) + _, _, err := kgm.HandleTSORequest(kgm.ctx, group.Keyspaces[0], groupID, 1) if err != nil { log.Warn("failed to request tso for the splitting keyspace group", zap.Uint32("keyspace-group-id", groupID), diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 71d65a80785..dea0b00f4f0 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -177,7 +177,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { am, err := kgm.GetAllocatorManager(constant.DefaultKeyspaceGroupID) re.NoError(err) - re.False(am.enableLocalTSO) re.Equal(constant.DefaultKeyspaceGroupID, am.kgID) re.Equal(constant.DefaultLeaderLease, am.leaderLease) re.Equal(time.Hour*24, am.maxResetTSGap()) @@ -647,19 +646,19 @@ func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembers // Should succeed because keyspace 0 is actually in keyspace group 0, which is served // by the current keyspace group manager, instead of keyspace group 1 in ask, and // keyspace group 0 is returned in the response. - _, keyspaceGroupBelongTo, err := mgr.HandleTSORequest(suite.ctx, 0, 1, GlobalDCLocation, 1) + _, keyspaceGroupBelongTo, err := mgr.HandleTSORequest(suite.ctx, 0, 1, 1) re.NoError(err) re.Equal(uint32(0), keyspaceGroupBelongTo) // Should succeed because keyspace 100 doesn't belong to any keyspace group, so it will // be served by the default keyspace group 0, and keyspace group 0 is returned in the response. - _, keyspaceGroupBelongTo, err = mgr.HandleTSORequest(suite.ctx, 100, 0, GlobalDCLocation, 1) + _, keyspaceGroupBelongTo, err = mgr.HandleTSORequest(suite.ctx, 100, 0, 1) re.NoError(err) re.Equal(uint32(0), keyspaceGroupBelongTo) // Should fail because keyspace 100 doesn't belong to any keyspace group, and the keyspace group // 1 in ask doesn't exist. - _, keyspaceGroupBelongTo, err = mgr.HandleTSORequest(suite.ctx, 100, 1, GlobalDCLocation, 1) + _, keyspaceGroupBelongTo, err = mgr.HandleTSORequest(suite.ctx, 100, 1, 1) re.Error(err) re.Equal(uint32(1), keyspaceGroupBelongTo) } @@ -1092,7 +1091,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { // And the primaries on TSO Server 1 should continue to serve TSO requests without any failures. for range 100 { for _, id := range ids { - _, keyspaceGroupBelongTo, err := mgr1.HandleTSORequest(suite.ctx, id, id, GlobalDCLocation, 1) + _, keyspaceGroupBelongTo, err := mgr1.HandleTSORequest(suite.ctx, id, id, 1) re.NoError(err) re.Equal(id, keyspaceGroupBelongTo) } @@ -1188,7 +1187,7 @@ func checkTSO( return default: } - respTS, respGroupID, err := mgr.HandleTSORequest(ctx, id, id, GlobalDCLocation, 1) + respTS, respGroupID, err := mgr.HandleTSORequest(ctx, id, id, 1) // omit the error check since there are many kinds of errors during primaries movement if err != nil { continue @@ -1210,7 +1209,7 @@ func waitForPrimariesServing( if member, err := mgrs[j].GetElectionMember(id, id); err != nil || member == nil || !member.IsLeader() { return false } - if _, _, err := mgrs[j].HandleTSORequest(mgrs[j].ctx, id, id, GlobalDCLocation, 1); err != nil { + if _, _, err := mgrs[j].HandleTSORequest(mgrs[j].ctx, id, id, 1); err != nil { return false } } diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go deleted file mode 100644 index e3456261a3a..00000000000 --- a/pkg/tso/local_allocator.go +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright 2020 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tso - -import ( - "context" - "fmt" - "runtime/trace" - "sync/atomic" - "time" - - "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/tikv/pd/pkg/election" - "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/utils/tsoutil" - "github.com/tikv/pd/pkg/utils/typeutil" - "go.uber.org/zap" -) - -// LocalTSOAllocator is the DC-level local TSO allocator, -// which is only used to allocate TSO in one DC each. -// One PD server may hold multiple Local TSO Allocators. -type LocalTSOAllocator struct { - allocatorManager *AllocatorManager - // leadership is used to campaign the corresponding DC's Local TSO Allocator. - leadership *election.Leadership - timestampOracle *timestampOracle - // for election use, notice that the leadership that member holds is - // the leadership for PD leader. Local TSO Allocator's leadership is for the - // election of Local TSO Allocator leader among several PD servers and - // Local TSO Allocator only use member's some etcd and pdpb.Member info. - // So it's not conflicted. - rootPath string - allocatorLeader atomic.Value // stored as *pdpb.Member - // pre-initialized metrics - tsoAllocatorRoleGauge prometheus.Gauge -} - -// GetTimestampPath returns the timestamp path in etcd. -func (lta *LocalTSOAllocator) GetTimestampPath() string { - if lta == nil || lta.timestampOracle == nil { - return "" - } - return lta.timestampOracle.GetTimestampPath() -} - -// GetDCLocation returns the local allocator's dc-location. -func (lta *LocalTSOAllocator) GetDCLocation() string { - return lta.timestampOracle.dcLocation -} - -// Initialize will initialize the created local TSO allocator. -func (lta *LocalTSOAllocator) Initialize(suffix int) error { - lta.tsoAllocatorRoleGauge.Set(1) - lta.timestampOracle.suffix = suffix - return lta.timestampOracle.SyncTimestamp() -} - -// IsInitialize is used to indicates whether this allocator is initialized. -func (lta *LocalTSOAllocator) IsInitialize() bool { - return lta.timestampOracle.isInitialized() -} - -// UpdateTSO is used to update the TSO in memory and the time window in etcd -// for all local TSO allocators this PD server hold. -func (lta *LocalTSOAllocator) UpdateTSO() error { - return lta.timestampOracle.UpdateTimestamp() -} - -// SetTSO sets the physical part with given TSO. -func (lta *LocalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error { - return lta.timestampOracle.resetUserTimestampInner(lta.leadership, tso, ignoreSmaller, skipUpperBoundCheck) -} - -// GenerateTSO is used to generate a given number of TSOs. -// Make sure you have initialized the TSO allocator before calling. -func (lta *LocalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) { - defer trace.StartRegion(ctx, "LocalTSOAllocator.GenerateTSO").End() - if !lta.leadership.Check() { - lta.getMetrics().notLeaderEvent.Inc() - return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs( - fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation)) - } - return lta.timestampOracle.getTS(ctx, lta.leadership, count, lta.allocatorManager.GetSuffixBits()) -} - -// Reset is used to reset the TSO allocator. -func (lta *LocalTSOAllocator) Reset() { - lta.tsoAllocatorRoleGauge.Set(0) - lta.timestampOracle.ResetTimestamp() -} - -// GetAllocatorLeader returns the Local TSO Allocator leader. -func (lta *LocalTSOAllocator) GetAllocatorLeader() *pdpb.Member { - allocatorLeader := lta.allocatorLeader.Load() - if allocatorLeader == nil { - return nil - } - return allocatorLeader.(*pdpb.Member) -} - -// GetMember returns the Local TSO Allocator's member value. -func (lta *LocalTSOAllocator) GetMember() ElectionMember { - return lta.allocatorManager.member -} - -// GetCurrentTSO returns current TSO in memory. -func (lta *LocalTSOAllocator) GetCurrentTSO() (*pdpb.Timestamp, error) { - currentPhysical, currentLogical := lta.timestampOracle.getTSO() - if currentPhysical == typeutil.ZeroTime { - return &pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized") - } - return tsoutil.GenerateTimestamp(currentPhysical, uint64(currentLogical)), nil -} - -// WriteTSO is used to set the maxTS as current TSO in memory. -func (lta *LocalTSOAllocator) WriteTSO(maxTS *pdpb.Timestamp) error { - currentTSO, err := lta.GetCurrentTSO() - if err != nil { - return err - } - // If current local TSO has already been greater or equal to maxTS, then do not update it. - if tsoutil.CompareTimestamp(currentTSO, maxTS) >= 0 { - return nil - } - return lta.timestampOracle.resetUserTimestamp(context.Background(), lta.leadership, tsoutil.GenerateTS(maxTS), true) -} - -// IsAllocatorLeader returns whether the allocator is still a -// Local TSO Allocator leader by checking its leadership's lease and leader info. -func (lta *LocalTSOAllocator) IsAllocatorLeader() bool { - return lta.leadership.Check() && lta.GetAllocatorLeader().GetMemberId() == lta.GetMember().ID() -} - -// isSameAllocatorLeader checks whether a server is the leader itself. -func (lta *LocalTSOAllocator) isSameAllocatorLeader(leader *pdpb.Member) bool { - return leader.GetMemberId() == lta.allocatorManager.member.ID() -} - -// CheckAllocatorLeader checks who is the current Local TSO Allocator leader, and returns true if it is needed to check later. -func (lta *LocalTSOAllocator) CheckAllocatorLeader() (*pdpb.Member, int64, bool) { - if err := lta.allocatorManager.member.PreCheckLeader(); err != nil { - log.Error("no etcd leader, check local tso allocator leader later", - zap.String("dc-location", lta.timestampOracle.dcLocation), errs.ZapError(err)) - time.Sleep(200 * time.Millisecond) - return nil, 0, true - } - - allocatorLeader, rev, err := election.GetLeader(lta.leadership.GetClient(), lta.rootPath) - if err != nil { - log.Error("getting local tso allocator leader meets error", - zap.String("dc-location", lta.timestampOracle.dcLocation), errs.ZapError(err)) - time.Sleep(200 * time.Millisecond) - return nil, 0, true - } - if allocatorLeader != nil { - if lta.isSameAllocatorLeader(allocatorLeader) { - // oh, we are already a Local TSO Allocator leader, which indicates we may meet something wrong - // in previous CampaignAllocatorLeader. We should delete the leadership and campaign again. - // In normal case, if a Local TSO Allocator become an allocator leader, it will keep looping - // in the campaignAllocatorLeader to maintain its leadership. However, the potential failure - // may occur after an allocator get the leadership and it will return from the campaignAllocatorLeader, - // which means the election and initialization are not completed fully. By this mean, we should - // re-campaign by deleting the current allocator leader. - log.Warn("the local tso allocator leader has not changed, delete and campaign again", - zap.String("dc-location", lta.timestampOracle.dcLocation), zap.Stringer("old-pd-leader", allocatorLeader)) - // Delete the leader itself and let others start a new election again. - if err = lta.leadership.DeleteLeaderKey(); err != nil { - log.Error("deleting local tso allocator leader key meets error", errs.ZapError(err)) - time.Sleep(200 * time.Millisecond) - return nil, 0, true - } - // Return nil and false to make sure the campaign will start immediately. - return nil, 0, false - } - } - return allocatorLeader, rev, false -} - -func (lta *LocalTSOAllocator) getMetrics() *tsoMetrics { - return lta.timestampOracle.metrics -} diff --git a/pkg/tso/testutil.go b/pkg/tso/testutil.go index 6fc1ccc98a7..e3d04f55813 100644 --- a/pkg/tso/testutil.go +++ b/pkg/tso/testutil.go @@ -61,11 +61,6 @@ func (c *TestServiceConfig) GetLeaderLease() int64 { return c.LeaderLease } -// IsLocalTSOEnabled returns the LocalTSOEnabled field of TestServiceConfig. -func (c *TestServiceConfig) IsLocalTSOEnabled() bool { - return c.LocalTSOEnabled -} - // GetTSOUpdatePhysicalInterval returns the TSOUpdatePhysicalInterval field of TestServiceConfig. func (c *TestServiceConfig) GetTSOUpdatePhysicalInterval() time.Duration { return c.TSOUpdatePhysicalInterval diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 87d5f9a14ae..38a4c989093 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -76,7 +76,6 @@ type timestampOracle struct { // last timestamp window stored in etcd lastSavedTime atomic.Value // stored as time.Time suffix int - dcLocation string // pre-initialized metrics metrics *tsoMetrics @@ -241,12 +240,7 @@ func (t *timestampOracle) isInitialized() bool { // When ignoreSmaller is true, resetUserTimestamp will ignore the smaller tso resetting error and do nothing. // It's used to write MaxTS during the Global TSO synchronization without failing the writing as much as possible. // cannot set timestamp to one which >= current + maxResetTSGap -func (t *timestampOracle) resetUserTimestamp(ctx context.Context, leadership *election.Leadership, tso uint64, ignoreSmaller bool) error { - defer trace.StartRegion(ctx, "timestampOracle.resetUserTimestamp").End() - return t.resetUserTimestampInner(leadership, tso, ignoreSmaller, false) -} - -func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadership, tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error { +func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error { t.tsoMux.Lock() defer t.tsoMux.Unlock() if !leadership.Check() { @@ -370,7 +364,6 @@ func (t *timestampOracle) UpdateTimestamp() error { if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { log.Warn("save timestamp failed", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), - zap.String("dc-location", t.dcLocation), zap.String("timestamp-path", t.GetTimestampPath()), zap.Error(err)) t.metrics.errSaveUpdateTSEvent.Inc() @@ -427,7 +420,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader return resp, nil } t.metrics.exceededMaxRetryEvent.Inc() - return resp, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("generate %s tso maximum number of retries exceeded", t.dcLocation)) + return resp, errs.ErrGenerateTimestamp.FastGenByArgs("generate global tso maximum number of retries exceeded") } // ResetTimestamp is used to reset the timestamp in memory. diff --git a/pkg/utils/keypath/key_path.go b/pkg/utils/keypath/key_path.go index ba0e7111cf2..03d497dc408 100644 --- a/pkg/utils/keypath/key_path.go +++ b/pkg/utils/keypath/key_path.go @@ -395,18 +395,6 @@ func KeyspaceGroupGlobalTSPath(groupID uint32) string { return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix) } -// KeyspaceGroupLocalTSPath constructs the timestampOracle path prefix for Local TSO, which is: -// 1. for the default keyspace group: -// lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp -// 2. for the non-default keyspace groups: -// {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp -func KeyspaceGroupLocalTSPath(keyPrefix string, groupID uint32, dcLocation string) string { - if groupID == constant.DefaultKeyspaceGroupID { - return path.Join(keyPrefix, dcLocation) - } - return path.Join(fmt.Sprintf("%05d", groupID), keyPrefix, dcLocation) -} - // TimestampPath returns the timestamp path for the given timestamp oracle path prefix. func TimestampPath(tsPath string) string { return path.Join(tsPath, TimestampKey) diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/key_path_v2.go index 395e311e64c..208dd832bcd 100644 --- a/pkg/utils/keypath/key_path_v2.go +++ b/pkg/utils/keypath/key_path_v2.go @@ -21,19 +21,15 @@ import ( const ( leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader" - dcLocationPathFormat = "/pd/%d/dc-location/%d" // "/pd/{cluster_id}/dc-location/{member_id}" memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path" memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash" memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version" allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id" keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id" - msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" - msTsoDefaultLeaderPathFormat = "/ms/%d/tso/00000/primary" // "/ms/{cluster_id}/tso/00000/primary" - msTsoKespaceLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary" - msDCLocationPathFormat = "/ms/%d/%s/dc-location/%d" // "/ms/{cluster_id}/{service_name}/dc-location/{member_id}" - msTsoDefaultDCLocationPath = "/ms/%d/tso/00000/dc-location/%d" // "/ms/{cluster_id}/tso/00000/dc-location/{member_id}" - msTsoKespaceDCLocationPath = "/ms/%d/tso/keyspace_groups/election/%05d/dc-location/%d" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/dc-location/{member_id}" + msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" + msTsoDefaultLeaderPathFormat = "/ms/%d/tso/00000/primary" // "/ms/{cluster_id}/tso/00000/primary" + msTsoKespaceLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary" ) // MsParam is the parameter of micro service. @@ -61,20 +57,6 @@ func LeaderPath(p *MsParam) string { return fmt.Sprintf(msLeaderPathFormat, ClusterID(), p.ServiceName) } -// DCLocationPath returns the dc-location path. -func DCLocationPath(p *MsParam, memberID uint64) string { - if p == nil || p.ServiceName == "" { - return fmt.Sprintf(dcLocationPathFormat, ClusterID(), memberID) - } - if p.ServiceName == "tso" { - if p.GroupID == 0 { - return fmt.Sprintf(msTsoDefaultDCLocationPath, ClusterID(), memberID) - } - return fmt.Sprintf(msTsoKespaceDCLocationPath, ClusterID(), p.GroupID, memberID) - } - return fmt.Sprintf(msDCLocationPathFormat, ClusterID(), p.ServiceName, memberID) -} - // MemberBinaryDeployPath returns the member binary deploy path. func MemberBinaryDeployPath(id uint64) string { return fmt.Sprintf(memberBinaryDeployPathFormat, ClusterID(), id) diff --git a/pkg/utils/tsoutil/tso_proto_factory.go b/pkg/utils/tsoutil/tso_proto_factory.go index 672cd4062be..9d8c0cfdb76 100644 --- a/pkg/utils/tsoutil/tso_proto_factory.go +++ b/pkg/utils/tsoutil/tso_proto_factory.go @@ -56,7 +56,7 @@ func (*PDProtoFactory) createForwardStream(ctx context.Context, clientConn *grpc type stream interface { // process sends a request and receives the response through the stream - process(clusterID uint64, count, keyspaceID, keyspaceGroupID uint32, dcLocation string) (response, error) + process(clusterID uint64, count, keyspaceID, keyspaceGroupID uint32) (response, error) } type tsoStream struct { @@ -64,15 +64,14 @@ type tsoStream struct { } // process sends a request and receives the response through the stream -func (s *tsoStream) process(clusterID uint64, count, keyspaceID, keyspaceGroupID uint32, dcLocation string) (response, error) { +func (s *tsoStream) process(clusterID uint64, count, keyspaceID, keyspaceGroupID uint32) (response, error) { req := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: clusterID, KeyspaceId: keyspaceID, KeyspaceGroupId: keyspaceGroupID, }, - Count: count, - DcLocation: dcLocation, + Count: count, } if err := s.stream.Send(req); err != nil { return nil, err @@ -89,13 +88,12 @@ type pdStream struct { } // process sends a request and receives the response through the stream -func (s *pdStream) process(clusterID uint64, count, _, _ uint32, dcLocation string) (response, error) { +func (s *pdStream) process(clusterID uint64, count, _, _ uint32) (response, error) { req := &pdpb.TsoRequest{ Header: &pdpb.RequestHeader{ ClusterId: clusterID, }, - Count: count, - DcLocation: dcLocation, + Count: count, } if err := s.stream.Send(req); err != nil { return nil, err diff --git a/pkg/utils/tsoutil/tso_request.go b/pkg/utils/tsoutil/tso_request.go index 3405debad9d..1ba1e233fc6 100644 --- a/pkg/utils/tsoutil/tso_request.go +++ b/pkg/utils/tsoutil/tso_request.go @@ -80,7 +80,7 @@ func (r *TSOProtoRequest) getCount() uint32 { // count defines the count of timestamps to retrieve. func (r *TSOProtoRequest) process(forwardStream stream, count uint32) (tsoResp, error) { return forwardStream.process(r.request.GetHeader().GetClusterId(), count, - r.request.GetHeader().GetKeyspaceId(), r.request.GetHeader().GetKeyspaceGroupId(), r.request.GetDcLocation()) + r.request.GetHeader().GetKeyspaceId(), r.request.GetHeader().GetKeyspaceGroupId()) } // postProcess sends the response back to the sender of the request @@ -141,7 +141,7 @@ func (r *PDProtoRequest) getCount() uint32 { // count defines the count of timestamps to retrieve. func (r *PDProtoRequest) process(forwardStream stream, count uint32) (tsoResp, error) { return forwardStream.process(r.request.GetHeader().GetClusterId(), count, - constant.DefaultKeyspaceID, constant.DefaultKeyspaceGroupID, r.request.GetDcLocation()) + constant.DefaultKeyspaceID, constant.DefaultKeyspaceGroupID) } // postProcess sends the response back to the sender of the request diff --git a/server/api/member.go b/server/api/member.go index 02cae4bca1e..5161322463a 100644 --- a/server/api/member.go +++ b/server/api/member.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/keypath" @@ -71,13 +70,7 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) { if members.GetHeader().GetError() != nil { return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String())) } - dclocationDistribution := make(map[string][]uint64) - if !svr.IsAPIServiceMode() { - dclocationDistribution, err = svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd() - if err != nil { - return nil, errors.WithStack(err) - } - } + for _, m := range members.GetMembers() { var e error m.BinaryVersion, e = svr.GetMember().GetMemberBinaryVersion(m.GetMemberId()) @@ -103,13 +96,6 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) { log.Error("failed to load git hash", zap.Uint64("member", m.GetMemberId()), errs.ZapError(e)) continue } - for dcLocation, serverIDs := range dclocationDistribution { - found := slice.Contains(serverIDs, m.MemberId) - if found { - m.DcLocation = dcLocation - break - } - } } return members, nil } @@ -152,13 +138,6 @@ func (h *memberHandler) DeleteMemberByName(w http.ResponseWriter, r *http.Reques return } - // Delete dc-location info. - err = h.svr.GetMember().DeleteMemberDCLocationInfo(id) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - // Remove member by id _, err = etcdutil.RemoveEtcdMember(client, id) if err != nil { @@ -191,13 +170,6 @@ func (h *memberHandler) DeleteMemberByID(w http.ResponseWriter, r *http.Request) return } - // Delete dc-location info. - err = h.svr.GetMember().DeleteMemberDCLocationInfo(id) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - client := h.svr.GetClient() _, err = etcdutil.RemoveEtcdMember(client, id) if err != nil { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 28b0f218371..9b4630964b9 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -418,10 +418,7 @@ func (c *RaftCluster) checkTSOService() { c.UnsetServiceIndependent(constant.TSOServiceName) } } else { - if err := c.stopTSOJobsIfNeeded(); err != nil { - log.Error("failed to stop TSO jobs", errs.ZapError(err)) - return - } + c.stopTSOJobsIfNeeded() if !c.IsServiceIndependent(constant.TSOServiceName) { log.Info("TSO is provided by TSO server") c.SetServiceIndependent(constant.TSOServiceName) @@ -476,11 +473,7 @@ func (c *RaftCluster) runServiceCheckJob() { } func (c *RaftCluster) startTSOJobsIfNeeded() error { - allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) - if err != nil { - log.Error("failed to get global TSO allocator", errs.ZapError(err)) - return err - } + allocator := c.tsoAllocator.GetAllocator() if !allocator.IsInitialize() { log.Info("initializing the global TSO allocator") if err := allocator.Initialize(0); err != nil { @@ -495,27 +488,22 @@ func (c *RaftCluster) startTSOJobsIfNeeded() error { return nil } -func (c *RaftCluster) stopTSOJobsIfNeeded() error { - allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) - if err != nil { - log.Error("failed to get global TSO allocator", errs.ZapError(err)) - return err - } - if allocator.IsInitialize() { - log.Info("closing the global TSO allocator") - c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) - failpoint.Inject("updateAfterResetTSO", func() { - allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) - if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { - log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) - } - if allocator.IsInitialize() { - log.Panic("the allocator should be uninitialized after reset") - } - }) +func (c *RaftCluster) stopTSOJobsIfNeeded() { + allocator := c.tsoAllocator.GetAllocator() + if !allocator.IsInitialize() { + return } - - return nil + log.Info("closing the global TSO allocator") + c.tsoAllocator.ResetAllocatorGroup(true) + failpoint.Inject("updateAfterResetTSO", func() { + allocator := c.tsoAllocator.GetAllocator() + if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { + log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) + } + if allocator.IsInitialize() { + log.Panic("the allocator should be uninitialized after reset") + } + }) } // startGCTuner @@ -861,9 +849,7 @@ func (c *RaftCluster) Stop() { // For example, the cluster meets an error when starting, such as cluster is not bootstrapped. // In this case, the `running` in `RaftCluster` is false, but the tso job has been started. // Ref: https://github.com/tikv/pd/issues/8836 - if err := c.stopTSOJobsIfNeeded(); err != nil { - log.Error("failed to stop tso jobs", errs.ZapError(err)) - } + c.stopTSOJobsIfNeeded() if !c.running { c.Unlock() return diff --git a/server/config/config.go b/server/config/config.go index ed37d899390..7ad03baab77 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -671,11 +671,6 @@ func (c *Config) GetLeaderLease() int64 { return c.LeaderLease } -// IsLocalTSOEnabled returns if the local TSO is enabled. -func (c *Config) IsLocalTSOEnabled() bool { - return c.EnableLocalTSO -} - // GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings. // If the value is negative, there is no limit. func (c *Config) GetMaxConcurrentTSOProxyStreamings() int { diff --git a/server/forward.go b/server/forward.go index 66ac41347c9..674b3e008b6 100644 --- a/server/forward.go +++ b/server/forward.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils/constant" - "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" @@ -51,8 +50,7 @@ func forwardTSORequest( KeyspaceId: constant.DefaultKeyspaceID, KeyspaceGroupId: constant.DefaultKeyspaceGroupID, }, - Count: request.GetCount(), - DcLocation: request.GetDcLocation(), + Count: request.GetCount(), } failpoint.Inject("tsoProxySendToTSOTimeout", func() { @@ -420,7 +418,7 @@ func (s *GrpcServer) isLocalRequest(host string) bool { func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) { if !s.IsServiceIndependent(constant.TSOServiceName) { - return s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1) + return s.tsoAllocatorManager.HandleRequest(ctx, 1) } request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ diff --git a/server/grpc_service.go b/server/grpc_service.go index 59d009daff4..35becc69f51 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -40,7 +40,6 @@ import ( "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" - "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" @@ -318,10 +317,10 @@ func (s *GrpcServer) GetMinTS( err error ) if s.IsServiceIndependent(constant.TSOServiceName) { - minTS, err = s.GetMinTSFromTSOService(tso.GlobalDCLocation) + minTS, err = s.GetMinTSFromTSOService() } else { start := time.Now() - ts, internalErr := s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1) + ts, internalErr := s.tsoAllocatorManager.HandleRequest(ctx, 1) if internalErr == nil { tsoHandleDuration.Observe(time.Since(start).Seconds()) } @@ -342,7 +341,7 @@ func (s *GrpcServer) GetMinTS( // GetMinTSFromTSOService queries all tso servers and gets the minimum timestamp across // all keyspace groups. -func (s *GrpcServer) GetMinTSFromTSOService(dcLocation string) (*pdpb.Timestamp, error) { +func (s *GrpcServer) GetMinTSFromTSOService() (*pdpb.Timestamp, error) { addrs := s.keyspaceGroupManager.GetTSOServiceAddrs() if len(addrs) == 0 { return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("no tso servers/pods discovered") @@ -356,7 +355,7 @@ func (s *GrpcServer) GetMinTSFromTSOService(dcLocation string) (*pdpb.Timestamp, for _, addr := range addrs { go func(addr string) { defer wg.Done() - resp, err := s.getMinTSFromSingleServer(s.ctx, dcLocation, addr) + resp, err := s.getMinTSFromSingleServer(s.ctx, addr) if err != nil || resp == nil { log.Warn("failed to get min ts from tso server", zap.String("address", addr), zap.Error(err)) @@ -411,7 +410,7 @@ func (s *GrpcServer) GetMinTSFromTSOService(dcLocation string) (*pdpb.Timestamp, } func (s *GrpcServer) getMinTSFromSingleServer( - ctx context.Context, dcLocation, tsoSrvAddr string, + ctx context.Context, tsoSrvAddr string, ) (*tsopb.GetMinTSResponse, error) { cc, err := s.getDelegateClient(s.ctx, tsoSrvAddr) if err != nil { @@ -427,7 +426,6 @@ func (s *GrpcServer) getMinTSFromSingleServer( Header: &tsopb.RequestHeader{ ClusterId: keypath.ClusterID(), }, - DcLocation: dcLocation, }) if err != nil { attachErr := errors.Errorf("error:%s target:%s status:%s", @@ -482,17 +480,6 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb } } - tsoAllocatorLeaders := make(map[string]*pdpb.Member) - if !s.IsServiceIndependent(constant.TSOServiceName) { - tsoAllocatorManager := s.GetTSOAllocatorManager() - tsoAllocatorLeaders, err = tsoAllocatorManager.GetLocalAllocatorLeaders() - } - if err != nil { - return &pdpb.GetMembersResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil - } - leader := s.member.GetLeader() for _, m := range members { if m.MemberId == leader.GetMemberId() { @@ -502,11 +489,10 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb } return &pdpb.GetMembersResponse{ - Header: wrapHeader(), - Members: members, - Leader: pdLeader, - EtcdLeader: etcdLeader, - TsoAllocatorLeaders: tsoAllocatorLeaders, + Header: wrapHeader(), + Members: members, + Leader: pdLeader, + EtcdLeader: etcdLeader, }, nil } @@ -611,7 +597,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { } count := request.GetCount() ctx, task := trace.NewTask(ctx, "tso") - ts, err := s.tsoAllocatorManager.HandleRequest(ctx, request.GetDcLocation(), count) + ts, err := s.tsoAllocatorManager.HandleRequest(ctx, count) task.End() tsoHandleDuration.Observe(time.Since(start).Seconds()) if err != nil { @@ -2467,114 +2453,10 @@ func convertAskSplitResponse(resp *schedulingpb.AskBatchSplitResponse) *pdpb.Ask } } -// Only used for the TestLocalAllocatorLeaderChange. -var mockLocalAllocatorLeaderChangeFlag = false - -// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this PD is holding when skipCheck is set, -// and write it into all Local TSO Allocators then if it's indeed the biggest one. -func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) { - // TODO: support local tso forward in api service mode in the future. - if err := s.validateInternalRequest(request.GetHeader(), true); err != nil { - return nil, err - } - if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { - fName := currentFunction() - limiter := s.GetGRPCRateLimiter() - if done, err := limiter.Allow(fName); err == nil { - defer done() - } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) - } - } - tsoAllocatorManager := s.GetTSOAllocatorManager() - // There is no dc-location found in this server, return err. - if tsoAllocatorManager.GetClusterDCLocationsNumber() == 0 { - return &pdpb.SyncMaxTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, - "empty cluster dc-location found, checker may not work properly"), - }, nil - } - // Get all Local TSO Allocator leaders - allocatorLeaders, err := tsoAllocatorManager.GetHoldingLocalAllocatorLeaders() - if err != nil { - return &pdpb.SyncMaxTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil - } - if !request.GetSkipCheck() { - var maxLocalTS *pdpb.Timestamp - syncedDCs := make([]string, 0, len(allocatorLeaders)) - for _, allocator := range allocatorLeaders { - // No longer leader, just skip here because - // the global allocator will check if all DCs are handled. - if !allocator.IsAllocatorLeader() { - continue - } - currentLocalTSO, err := allocator.GetCurrentTSO() - if err != nil { - return &pdpb.SyncMaxTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil - } - if tsoutil.CompareTimestamp(currentLocalTSO, maxLocalTS) > 0 { - maxLocalTS = currentLocalTSO - } - syncedDCs = append(syncedDCs, allocator.GetDCLocation()) - } - - failpoint.Inject("mockLocalAllocatorLeaderChange", func() { - if !mockLocalAllocatorLeaderChangeFlag { - maxLocalTS = nil - request.MaxTs = nil - mockLocalAllocatorLeaderChangeFlag = true - } - }) - - if maxLocalTS == nil { - return &pdpb.SyncMaxTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, - "local tso allocator leaders have changed during the sync, should retry"), - }, nil - } - if request.GetMaxTs() == nil { - return &pdpb.SyncMaxTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, - "empty maxTS in the request, should retry"), - }, nil - } - // Found a bigger or equal maxLocalTS, return it directly. - cmpResult := tsoutil.CompareTimestamp(maxLocalTS, request.GetMaxTs()) - if cmpResult >= 0 { - // Found an equal maxLocalTS, plus 1 to logical part before returning it. - // For example, we have a Global TSO t1 and a Local TSO t2, they have the - // same physical and logical parts. After being differentiating with suffix, - // there will be (t1.logical << suffixNum + 0) < (t2.logical << suffixNum + N), - // where N is bigger than 0, which will cause a Global TSO fallback than the previous Local TSO. - if cmpResult == 0 { - maxLocalTS.Logical += 1 - } - return &pdpb.SyncMaxTSResponse{ - Header: wrapHeader(), - MaxLocalTs: maxLocalTS, - SyncedDcs: syncedDCs, - }, nil - } - } - syncedDCs := make([]string, 0, len(allocatorLeaders)) - for _, allocator := range allocatorLeaders { - if !allocator.IsAllocatorLeader() { - continue - } - if err := allocator.WriteTSO(request.GetMaxTs()); err != nil { - return &pdpb.SyncMaxTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil - } - syncedDCs = append(syncedDCs, allocator.GetDCLocation()) - } +// Deprecated. +func (*GrpcServer) SyncMaxTS(_ context.Context, _ *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) { return &pdpb.SyncMaxTSResponse{ - Header: wrapHeader(), - SyncedDcs: syncedDCs, + Header: wrapHeader(), }, nil } @@ -2705,22 +2587,6 @@ func (*GrpcServer) GetDCLocationInfo(_ context.Context, _ *pdpb.GetDCLocationInf }, nil } -// validateInternalRequest checks if server is closed, which is used to validate -// the gRPC communication between PD servers internally. -func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error { - if s.IsClosed() { - return ErrNotStarted - } - // If onlyAllowLeader is true, check whether the sender is PD leader. - if onlyAllowLeader { - leaderID := s.GetLeader().GetMemberId() - if leaderID != header.GetSenderId() { - return status.Errorf(codes.FailedPrecondition, "%s, need %d but got %d", errs.MismatchLeaderErr, leaderID, header.GetSenderId()) - } - } - return nil -} - // for CDC compatibility, we need to initialize config path to `globalConfigPath` const globalConfigPath = "/global/config/" diff --git a/server/handler.go b/server/handler.go index 0789db9d35d..2ecf7763ce2 100644 --- a/server/handler.go +++ b/server/handler.go @@ -38,7 +38,6 @@ import ( "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" - "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/server/cluster" @@ -323,10 +322,7 @@ func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), zap.Bool("skip-upper-bound-check", skipUpperBoundCheck)) - tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) - if err != nil { - return err - } + tsoAllocator := h.s.tsoAllocatorManager.GetAllocator() if tsoAllocator == nil { return errs.ErrServerNotStarted } diff --git a/server/server.go b/server/server.go index e2c9b31580d..e49496ff8d8 100644 --- a/server/server.go +++ b/server/server.go @@ -2057,11 +2057,6 @@ func (s *Server) SetExternalTS(externalTS, globalTS uint64) error { return c.SetExternalTS(externalTS) } -// IsLocalTSOEnabled returns if the local TSO is enabled. -func (s *Server) IsLocalTSOEnabled() bool { - return s.cfg.IsLocalTSOEnabled() -} - // GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings. // If the value is negative, there is no limit. func (s *Server) GetMaxConcurrentTSOProxyStreamings() int { diff --git a/tests/cluster.go b/tests/cluster.go index 7996e289cf5..bf17f79a87c 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -230,21 +230,6 @@ func (s *TestServer) GetLeader() *pdpb.Member { return s.server.GetLeader() } -// GetAllocatorLeader returns current allocator leader -// of PD cluster for given dc-location. -func (s *TestServer) GetAllocatorLeader(dcLocation string) *pdpb.Member { - // For the leader of Global TSO Allocator, it's the PD leader - if dcLocation == tso.GlobalDCLocation { - return s.GetLeader() - } - tsoAllocatorManager := s.GetTSOAllocatorManager() - allocator, err := tsoAllocatorManager.GetAllocator(dcLocation) - if err != nil { - return nil - } - return allocator.(*tso.LocalTSOAllocator).GetAllocatorLeader() -} - // GetKeyspaceManager returns the current TestServer's Keyspace Manager. func (s *TestServer) GetKeyspaceManager() *keyspace.Manager { s.RLock() @@ -287,19 +272,6 @@ func (s *TestServer) IsLeader() bool { return !s.server.IsClosed() && s.server.GetMember().IsLeader() } -// IsAllocatorLeader returns whether the server is a TSO Allocator leader or not. -func (s *TestServer) IsAllocatorLeader(dcLocation string) bool { - if dcLocation == tso.GlobalDCLocation { - return s.IsLeader() - } - tsoAllocatorManager := s.GetTSOAllocatorManager() - allocator, err := tsoAllocatorManager.GetAllocator(dcLocation) - if err != nil { - return false - } - return !s.server.IsClosed() && allocator.(*tso.LocalTSOAllocator).IsAllocatorLeader() -} - // GetEtcdLeader returns the builtin etcd leader. func (s *TestServer) GetEtcdLeader() (string, error) { s.RLock() diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index 758185b5e5b..7abf1503bec 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -276,7 +276,6 @@ func (suite *tsoAPITestSuite) TestConfig() { re.NoError(json.Unmarshal(respBytes, &cfg)) re.Equal(cfg.GetListenAddr(), primary.GetConfig().GetListenAddr()) re.Equal(cfg.GetTSOSaveInterval(), primary.GetConfig().GetTSOSaveInterval()) - re.Equal(cfg.IsLocalTSOEnabled(), primary.GetConfig().IsLocalTSOEnabled()) re.Equal(cfg.GetTSOUpdatePhysicalInterval(), primary.GetConfig().GetTSOUpdatePhysicalInterval()) re.Equal(cfg.GetMaxResetTSGap(), primary.GetConfig().GetMaxResetTSGap()) } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 3156f57b744..51f3fd37295 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -38,7 +38,6 @@ import ( "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/storage/endpoint" - tsopkg "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/testutil" @@ -223,7 +222,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe rootPath := keypath.TSOSvcRootPath() primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, param.keyspaceGroupID) timestampPath := keypath.FullTimestampPath(param.keyspaceGroupID) - re.Equal(timestampPath, am.GetTimestampPath(tsopkg.GlobalDCLocation)) + re.Equal(timestampPath, am.GetTimestampPath()) re.Equal(primaryPath, am.GetMember().GetLeaderPath()) served = true @@ -315,7 +314,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID) kgm := primary.GetKeyspaceGroupManager() re.NotNil(kgm) - ts, _, err := kgm.HandleTSORequest(suite.ctx, keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, 1) + ts, _, err := kgm.HandleTSORequest(suite.ctx, keyspaceID, keyspaceGroupID, 1) return ts, err } diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index f7cba524b3c..b0660ed6577 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -37,7 +37,6 @@ import ( tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/storage/endpoint" - tsopkg "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/tempurl" @@ -629,10 +628,7 @@ func TestTSOServiceSwitch(t *testing.T) { // Verify PD is not providing TSO service testutil.Eventually(re, func() bool { - allocator, err := pdLeader.GetServer().GetTSOAllocatorManager().GetAllocator(tsopkg.GlobalDCLocation) - if err != nil { - return false - } + allocator := pdLeader.GetServer().GetTSOAllocatorManager().GetAllocator() return !allocator.IsInitialize() }) diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index 8c6ca6f76bf..fd08e6557e8 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/assertutil" - "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" @@ -102,13 +101,6 @@ func TestMemberDelete(t *testing.T) { return true }) } - // Check whether the dc-location info of the corresponding member is deleted. - for _, member := range members { - key := member.GetServer().GetMember().GetDCLocationPath(member.GetServerID()) - resp, err := etcdutil.EtcdKVGet(leader.GetEtcdClient(), key) - re.NoError(err) - re.Empty(resp.Kvs) - } } func checkMemberList(re *require.Assertions, clientURL string, configs []*config.Config) error {