From 735eb69715558e890729b4b44a8b3e2e29f90362 Mon Sep 17 00:00:00 2001 From: snower Date: Fri, 11 Nov 2022 16:41:14 +0800 Subject: [PATCH] optimization arbiter poll member status and add replset subscriber status tool --- server/arbiter.go | 100 +++++++++++++++----------- tools/replset/subscribe/main.go | 123 ++++++++++++++++++++++++++++++++ tools/subscribe/main.go | 2 +- 3 files changed, 182 insertions(+), 43 deletions(-) create mode 100644 tools/replset/subscribe/main.go diff --git a/server/arbiter.go b/server/arbiter.go index f7484ae..bbb03ce 100644 --- a/server/arbiter.go +++ b/server/arbiter.go @@ -641,6 +641,7 @@ func (self *ArbiterMember) clientOnline(client *ArbiterClient) error { self.status = ARBITER_MEMBER_STATUS_ONLINE _ = self.manager.memberStatusUpdated(self) _ = self.manager.voter.WakeupRetryVote() + go self.manager.voter.wakeupSubscriber() return nil } @@ -651,6 +652,7 @@ func (self *ArbiterMember) clientOffline(client *ArbiterClient) error { self.status = ARBITER_MEMBER_STATUS_OFFLINE _ = self.manager.memberStatusUpdated(self) + go self.manager.voter.wakeupSubscriber() return nil } @@ -1132,15 +1134,7 @@ func (self *ArbiterVoter) DoAnnouncement() error { } } - self.glock.Lock() - subscribers := self.subscribers - self.subscribers = nil - self.glock.Unlock() - if subscribers != nil { - for _, s := range subscribers { - s.handler(s, true) - } - } + go self.wakeupSubscriber() self.manager.slock.Log().Infof("Arbiter replication do announcement finish") return nil } @@ -1155,43 +1149,60 @@ func (self *ArbiterVoter) addSubscriber(subscriber *ArbiterVoterSubscriber) { self.subscribers = make([]*ArbiterVoterSubscriber, 1) self.subscribers[0] = subscriber self.glock.Unlock() + go self.checkSubscriberTimeout(subscriber.timeoutTime - time.Now().Unix()) +} - go func(timeout int64) { - for timeout > 0 { - select { - case <-self.closedWaiter: - case <-time.After(time.Duration(timeout) * time.Second): - } - self.glock.Lock() - if self.subscribers == nil { - self.glock.Unlock() - return - } +func (self *ArbiterVoter) wakeupSubscriber() { + self.glock.Lock() + if self.subscribers == nil { + self.glock.Unlock() + return + } + subscribers := self.subscribers + self.subscribers = make([]*ArbiterVoterSubscriber, 0) + self.glock.Unlock() + if subscribers != nil { + for _, subscriber := range subscribers { + subscriber.handler(subscriber, true) + } + } + self.manager.slock.Log().Infof("Arbiter replication wakeup subscriber finish") +} - now, subscribers, timeoutSubscribers := time.Now().Unix(), self.subscribers, make([]*ArbiterVoterSubscriber, 0) - timeout, self.subscribers = 0, nil - for _, s := range subscribers { - if s.timeoutTime <= now { - timeoutSubscribers = append(timeoutSubscribers, s) - continue - } +func (self *ArbiterVoter) checkSubscriberTimeout(timeout int64) { + for timeout > 0 { + select { + case <-self.closedWaiter: + case <-time.After(time.Duration(timeout) * time.Second): + } + self.glock.Lock() + if self.subscribers == nil { + self.glock.Unlock() + return + } - if self.subscribers == nil { - self.subscribers = make([]*ArbiterVoterSubscriber, 1) - self.subscribers[0] = s - } else { - self.subscribers = append(self.subscribers, s) - } - if timeout == 0 || timeout > s.timeoutTime-now { - timeout = s.timeoutTime - now - } + now, subscribers, timeoutSubscribers := time.Now().Unix(), self.subscribers, make([]*ArbiterVoterSubscriber, 0) + timeout, self.subscribers = 0, make([]*ArbiterVoterSubscriber, 0) + for _, subscriber := range subscribers { + if self.closed || subscriber.timeoutTime <= now { + timeoutSubscribers = append(timeoutSubscribers, subscriber) + continue } - self.glock.Unlock() - for _, s := range timeoutSubscribers { - s.handler(s, false) + + self.subscribers = append(self.subscribers, subscriber) + if timeout == 0 || timeout > subscriber.timeoutTime-now { + timeout = subscriber.timeoutTime - now } } - }(subscriber.timeoutTime - time.Now().Unix()) + + if len(self.subscribers) == 0 { + self.subscribers = nil + } + self.glock.Unlock() + for _, subscriber := range timeoutSubscribers { + subscriber.handler(subscriber, false) + } + } } func (self *ArbiterVoter) sleepWhenRetryVote() error { @@ -2225,6 +2236,7 @@ func (self *ArbiterManager) commandHandleAnnouncementCommand(serverProtocol *Bin _ = self.updateStatus() } self.glock.Unlock() + go self.voter.wakeupSubscriber() self.slock.Log().Infof("Arbiter handle announcement from %s leader %s member count %d version %d commitid %d", request.FromHost, leaderHost, len(newMembers), request.Replset.Version, request.Replset.CommitId) @@ -2365,11 +2377,15 @@ func (self *ArbiterManager) commandHandleMemberListCommand(serverProtocol *Binar } if request.PollTimeout > 0 && self.voter != nil { - if request.Version < self.version || request.Vertime < self.vertime { + if request.Version < self.version || (request.Version == self.version && request.Vertime < self.vertime) { return getResultCommand(), nil } - self.voter.addSubscriber(&ArbiterVoterSubscriber{serverProtocol: serverProtocol, command: command, handler: func(subscriber *ArbiterVoterSubscriber, b bool) { + self.voter.addSubscriber(&ArbiterVoterSubscriber{serverProtocol: serverProtocol, command: command, handler: func(subscriber *ArbiterVoterSubscriber, succed bool) { + if !succed { + _ = subscriber.serverProtocol.Write(protocol.NewCallResultCommand(subscriber.command, 0, "ERR_PTIMEOUT", nil)) + return + } _ = subscriber.serverProtocol.Write(getResultCommand()) }, timeoutTime: time.Now().Unix() + int64(request.PollTimeout)}) return nil, nil diff --git a/tools/replset/subscribe/main.go b/tools/replset/subscribe/main.go new file mode 100644 index 0000000..5f34976 --- /dev/null +++ b/tools/replset/subscribe/main.go @@ -0,0 +1,123 @@ +package main + +import ( + "flag" + "fmt" + "github.com/snower/slock/client" + "github.com/snower/slock/protocol" + "github.com/snower/slock/protocol/protobuf" + "github.com/snower/slock/server" + "google.golang.org/protobuf/proto" + "os" + "os/signal" + "syscall" + "time" +) + +func subscribe(client *client.Client, finishWaiter chan bool) { + fmt.Printf("start wait poll arbiter changing\n") + version, vertime := uint32(0), uint64(0) + for { + request := protobuf.ArbiterMemberListRequest{PollTimeout: 5, Version: version, Vertime: vertime} + data, err := proto.Marshal(&request) + if err != nil { + fmt.Printf("poll wait build request error %v\n", err) + close(finishWaiter) + return + } + command := protocol.NewCallCommand("REPL_MLIST", data) + commandResult, rerr := client.ExecuteCommand(command, 10) + if rerr != nil { + fmt.Printf("poll wait error %v\n", rerr) + close(finishWaiter) + return + } + callResultCommand, ok := commandResult.(*protocol.CallResultCommand) + if !ok { + fmt.Printf("poll wait unkown command result error %v\n", commandResult) + close(finishWaiter) + return + } + if callResultCommand.Result != 0 || callResultCommand.ErrType != "" { + if callResultCommand.ErrType == "ERR_UNINIT" || callResultCommand.ErrType == "ERR_PTIMEOUT" { + continue + } + fmt.Printf("poll wait command result error %d %v\n", callResultCommand.Result, callResultCommand.ErrType) + close(finishWaiter) + return + } + response := protobuf.ArbiterMemberListResponse{} + err = proto.Unmarshal(callResultCommand.Data, &response) + if err != nil { + fmt.Printf("poll wait parse response error %v\n", callResultCommand.ErrType) + close(finishWaiter) + return + } + version, vertime = response.Version, response.Vertime + if response.Members == nil || len(response.Members) == 0 { + continue + } + + fmt.Printf("Replset Member Status: %s\n", time.Now().Format("2006-01-02 15:04:05.999999999-0700")) + for _, member := range response.Members { + arbiter, isself, status, abstianed, closed := "no", "no", "offline", "no", "no" + if member.Arbiter != 0 { + arbiter = "yes" + } + if member.IsSelf { + isself = "yes" + } + if member.Status == server.ARBITER_MEMBER_STATUS_ONLINE { + status = "online" + } + if member.Abstianed { + abstianed = "yes" + } + if member.Closed { + closed = "yes" + } + fmt.Printf("%s Weight:%d,Arbiter:%s,Role:%s,Status:%s,LastUpdated:%d,LastDelay:%.2f,LastError:%d,AofId:%x,IsSelf:%s,Abstianed:%s,Closed:%s\n", + member.Host, member.Weight, arbiter, server.ROLE_NAMES[member.Role], status, member.LastUpdated/1e6, float64(member.LastDelay)/1e6, + member.LastError, member.AofId, isself, abstianed, closed) + } + fmt.Println() + } +} + +func main() { + port := flag.Int("port", 5658, "port") + host := flag.String("host", "127.0.0.1", "host") + + flag.Parse() + + c := client.NewClient(*host, uint(*port)) + err := c.Open() + if err != nil { + fmt.Printf("Client open error %v\n", err) + return + } + + finishWaiter, stopSignal := make(chan bool, 1), make(chan os.Signal, 1) + signal.Notify(stopSignal, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + go subscribe(c, finishWaiter) + select { + case <-finishWaiter: + err := c.Close() + if err != nil { + fmt.Printf("Client close error %v\n", err) + return + } + case <-stopSignal: + err := c.Close() + if err != nil { + fmt.Printf("Client close error %v\n", err) + return + } + case <-c.Unavailable(): + err := c.Close() + if err != nil { + fmt.Printf("Client close error %v\n", err) + return + } + } +} diff --git a/tools/subscribe/main.go b/tools/subscribe/main.go index f6a1cd3..eb1102d 100644 --- a/tools/subscribe/main.go +++ b/tools/subscribe/main.go @@ -110,7 +110,7 @@ func main() { finishWaiter, stopSignal := make(chan bool, 1), make(chan os.Signal, 1) signal.Notify(stopSignal, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) - go subscribe(subscriber, finishWaiter) + go subscribe(subscriber.(*client.Subscriber), finishWaiter) select { case <-finishWaiter: err := c.Close()