Skip to content

Commit

Permalink
tso/local: remove dc location and local_locator.go (#8851)
Browse files Browse the repository at this point in the history
ref #8802

Signed-off-by: okJiang <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] authored Nov 29, 2024
1 parent da0000a commit a86f3dd
Show file tree
Hide file tree
Showing 31 changed files with 104 additions and 914 deletions.
7 changes: 1 addition & 6 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,19 +302,14 @@ func transferPrimary(c *gin.Context) {
return
}

globalAllocator, err := allocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
// only members of specific group are valid primary candidates.
group := svr.GetKeyspaceGroupManager().GetKeyspaceGroups()[keyspaceGroupID]
memberMap := make(map[string]bool, len(group.Members))
for _, member := range group.Members {
memberMap[member.Address] = true
}

if err := utils.TransferPrimary(svr.GetClient(), globalAllocator.(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(),
if err := utils.TransferPrimary(svr.GetClient(), allocator.GetAllocator().(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(),
constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID, memberMap); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
Expand Down
5 changes: 0 additions & 5 deletions pkg/mcs/tso/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,6 @@ func (c *Config) GetLeaderLease() int64 {
return c.LeaderLease
}

// IsLocalTSOEnabled returns if the local TSO is enabled.
func (c *Config) IsLocalTSOEnabled() bool {
return c.EnableLocalTSO
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
func (c *Config) GetTSOUpdatePhysicalInterval() time.Duration {
return c.TSOUpdatePhysicalInterval.Duration
Expand Down
5 changes: 2 additions & 3 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,11 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
}
keyspaceID := header.GetKeyspaceId()
keyspaceGroupID := header.GetKeyspaceGroupId()
dcLocation := request.GetDcLocation()
count := request.GetCount()
ts, keyspaceGroupBelongTo, err := s.keyspaceGroupManager.HandleTSORequest(
ctx,
keyspaceID, keyspaceGroupID,
dcLocation, count)
count)
if err != nil {
return status.Error(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -198,7 +197,7 @@ func (s *Service) GetMinTS(
}, nil
}

minTS, kgAskedCount, kgTotalCount, err := s.keyspaceGroupManager.GetMinTS(request.GetDcLocation())
minTS, kgAskedCount, kgTotalCount, err := s.keyspaceGroupManager.GetMinTS()
if err != nil {
return &tsopb.GetMinTSResponse{
Header: wrapErrorToHeader(
Expand Down
5 changes: 1 addition & 4 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,7 @@ func (s *Server) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, key
log.Error("failed to get allocator manager", errs.ZapError(err))
return err
}
tsoAllocator, err := tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
return err
}
tsoAllocator := tsoAllocatorManager.GetAllocator()
if tsoAllocator == nil {
return errs.ErrServerNotStarted
}
Expand Down
24 changes: 0 additions & 24 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,16 +383,6 @@ func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string {
return path.Join(m.rootPath, fmt.Sprintf("member/%d/leader_priority", id))
}

// GetDCLocationPathPrefix returns the dc-location path prefix of the cluster.
func (*EmbeddedEtcdMember) GetDCLocationPathPrefix() string {
return keypath.Prefix(keypath.DCLocationPath(nil, 0))
}

// GetDCLocationPath returns the dc-location path of a member with the given member ID.
func (*EmbeddedEtcdMember) GetDCLocationPath(id uint64) string {
return keypath.DCLocationPath(nil, id)
}

// SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader.
func (m *EmbeddedEtcdMember) SetMemberLeaderPriority(id uint64, priority int) error {
key := m.getMemberLeaderPriorityPath(id)
Expand Down Expand Up @@ -421,20 +411,6 @@ func (m *EmbeddedEtcdMember) DeleteMemberLeaderPriority(id uint64) error {
return nil
}

// DeleteMemberDCLocationInfo removes a member's dc-location info.
func (m *EmbeddedEtcdMember) DeleteMemberDCLocationInfo(id uint64) error {
key := m.GetDCLocationPath(id)
res, err := m.leadership.LeaderTxn().Then(clientv3.OpDelete(key)).Commit()
if err != nil {
return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
}
if !res.Succeeded {
log.Error("delete dc-location info failed, maybe not pd leader")
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
return nil
}

// GetMemberLeaderPriority loads a member's priority to be elected as the etcd leader.
func (m *EmbeddedEtcdMember) GetMemberLeaderPriority(id uint64) (int, error) {
key := m.getMemberLeaderPriorityPath(id)
Expand Down
10 changes: 0 additions & 10 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,6 @@ func (m *Participant) IsSameLeader(leader participant) bool {
return leader.GetId() == m.ID()
}

// GetDCLocationPathPrefix returns the dc-location path prefix of the cluster.
func (m *Participant) GetDCLocationPathPrefix() string {
return keypath.Prefix(keypath.DCLocationPath(&m.MsParam, 0))
}

// GetDCLocationPath returns the dc-location path of a member with the given member ID.
func (m *Participant) GetDCLocationPath(id uint64) string {
return keypath.DCLocationPath(&m.MsParam, id)
}

func (m *Participant) campaignCheck() bool {
checker := m.campaignChecker.Load()
if checker == nil {
Expand Down
Loading

0 comments on commit a86f3dd

Please sign in to comment.