Skip to content

Commit

Permalink
optimization arbiter poll member status and add replset subscriber st…
Browse files Browse the repository at this point in the history
…atus tool
  • Loading branch information
snower committed Nov 11, 2022
1 parent 9a57171 commit 735eb69
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 43 deletions.
100 changes: 58 additions & 42 deletions server/arbiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ func (self *ArbiterMember) clientOnline(client *ArbiterClient) error {
self.status = ARBITER_MEMBER_STATUS_ONLINE
_ = self.manager.memberStatusUpdated(self)
_ = self.manager.voter.WakeupRetryVote()
go self.manager.voter.wakeupSubscriber()
return nil
}

Expand All @@ -651,6 +652,7 @@ func (self *ArbiterMember) clientOffline(client *ArbiterClient) error {

self.status = ARBITER_MEMBER_STATUS_OFFLINE
_ = self.manager.memberStatusUpdated(self)
go self.manager.voter.wakeupSubscriber()
return nil
}

Expand Down Expand Up @@ -1132,15 +1134,7 @@ func (self *ArbiterVoter) DoAnnouncement() error {
}
}

self.glock.Lock()
subscribers := self.subscribers
self.subscribers = nil
self.glock.Unlock()
if subscribers != nil {
for _, s := range subscribers {
s.handler(s, true)
}
}
go self.wakeupSubscriber()
self.manager.slock.Log().Infof("Arbiter replication do announcement finish")
return nil
}
Expand All @@ -1155,43 +1149,60 @@ func (self *ArbiterVoter) addSubscriber(subscriber *ArbiterVoterSubscriber) {
self.subscribers = make([]*ArbiterVoterSubscriber, 1)
self.subscribers[0] = subscriber
self.glock.Unlock()
go self.checkSubscriberTimeout(subscriber.timeoutTime - time.Now().Unix())
}

go func(timeout int64) {
for timeout > 0 {
select {
case <-self.closedWaiter:
case <-time.After(time.Duration(timeout) * time.Second):
}
self.glock.Lock()
if self.subscribers == nil {
self.glock.Unlock()
return
}
func (self *ArbiterVoter) wakeupSubscriber() {
self.glock.Lock()
if self.subscribers == nil {
self.glock.Unlock()
return
}
subscribers := self.subscribers
self.subscribers = make([]*ArbiterVoterSubscriber, 0)
self.glock.Unlock()
if subscribers != nil {
for _, subscriber := range subscribers {
subscriber.handler(subscriber, true)
}
}
self.manager.slock.Log().Infof("Arbiter replication wakeup subscriber finish")
}

now, subscribers, timeoutSubscribers := time.Now().Unix(), self.subscribers, make([]*ArbiterVoterSubscriber, 0)
timeout, self.subscribers = 0, nil
for _, s := range subscribers {
if s.timeoutTime <= now {
timeoutSubscribers = append(timeoutSubscribers, s)
continue
}
func (self *ArbiterVoter) checkSubscriberTimeout(timeout int64) {
for timeout > 0 {
select {
case <-self.closedWaiter:
case <-time.After(time.Duration(timeout) * time.Second):
}
self.glock.Lock()
if self.subscribers == nil {
self.glock.Unlock()
return
}

if self.subscribers == nil {
self.subscribers = make([]*ArbiterVoterSubscriber, 1)
self.subscribers[0] = s
} else {
self.subscribers = append(self.subscribers, s)
}
if timeout == 0 || timeout > s.timeoutTime-now {
timeout = s.timeoutTime - now
}
now, subscribers, timeoutSubscribers := time.Now().Unix(), self.subscribers, make([]*ArbiterVoterSubscriber, 0)
timeout, self.subscribers = 0, make([]*ArbiterVoterSubscriber, 0)
for _, subscriber := range subscribers {
if self.closed || subscriber.timeoutTime <= now {
timeoutSubscribers = append(timeoutSubscribers, subscriber)
continue
}
self.glock.Unlock()
for _, s := range timeoutSubscribers {
s.handler(s, false)

self.subscribers = append(self.subscribers, subscriber)
if timeout == 0 || timeout > subscriber.timeoutTime-now {
timeout = subscriber.timeoutTime - now
}
}
}(subscriber.timeoutTime - time.Now().Unix())

if len(self.subscribers) == 0 {
self.subscribers = nil
}
self.glock.Unlock()
for _, subscriber := range timeoutSubscribers {
subscriber.handler(subscriber, false)
}
}
}

func (self *ArbiterVoter) sleepWhenRetryVote() error {
Expand Down Expand Up @@ -2225,6 +2236,7 @@ func (self *ArbiterManager) commandHandleAnnouncementCommand(serverProtocol *Bin
_ = self.updateStatus()
}
self.glock.Unlock()
go self.voter.wakeupSubscriber()
self.slock.Log().Infof("Arbiter handle announcement from %s leader %s member count %d version %d commitid %d",
request.FromHost, leaderHost, len(newMembers), request.Replset.Version, request.Replset.CommitId)

Expand Down Expand Up @@ -2365,11 +2377,15 @@ func (self *ArbiterManager) commandHandleMemberListCommand(serverProtocol *Binar
}

if request.PollTimeout > 0 && self.voter != nil {
if request.Version < self.version || request.Vertime < self.vertime {
if request.Version < self.version || (request.Version == self.version && request.Vertime < self.vertime) {
return getResultCommand(), nil
}

self.voter.addSubscriber(&ArbiterVoterSubscriber{serverProtocol: serverProtocol, command: command, handler: func(subscriber *ArbiterVoterSubscriber, b bool) {
self.voter.addSubscriber(&ArbiterVoterSubscriber{serverProtocol: serverProtocol, command: command, handler: func(subscriber *ArbiterVoterSubscriber, succed bool) {
if !succed {
_ = subscriber.serverProtocol.Write(protocol.NewCallResultCommand(subscriber.command, 0, "ERR_PTIMEOUT", nil))
return
}
_ = subscriber.serverProtocol.Write(getResultCommand())
}, timeoutTime: time.Now().Unix() + int64(request.PollTimeout)})
return nil, nil
Expand Down
123 changes: 123 additions & 0 deletions tools/replset/subscribe/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package main

import (
"flag"
"fmt"
"github.com/snower/slock/client"
"github.com/snower/slock/protocol"
"github.com/snower/slock/protocol/protobuf"
"github.com/snower/slock/server"
"google.golang.org/protobuf/proto"
"os"
"os/signal"
"syscall"
"time"
)

func subscribe(client *client.Client, finishWaiter chan bool) {
fmt.Printf("start wait poll arbiter changing\n")
version, vertime := uint32(0), uint64(0)
for {
request := protobuf.ArbiterMemberListRequest{PollTimeout: 5, Version: version, Vertime: vertime}
data, err := proto.Marshal(&request)
if err != nil {
fmt.Printf("poll wait build request error %v\n", err)
close(finishWaiter)
return
}
command := protocol.NewCallCommand("REPL_MLIST", data)
commandResult, rerr := client.ExecuteCommand(command, 10)
if rerr != nil {
fmt.Printf("poll wait error %v\n", rerr)
close(finishWaiter)
return
}
callResultCommand, ok := commandResult.(*protocol.CallResultCommand)
if !ok {
fmt.Printf("poll wait unkown command result error %v\n", commandResult)
close(finishWaiter)
return
}
if callResultCommand.Result != 0 || callResultCommand.ErrType != "" {
if callResultCommand.ErrType == "ERR_UNINIT" || callResultCommand.ErrType == "ERR_PTIMEOUT" {
continue
}
fmt.Printf("poll wait command result error %d %v\n", callResultCommand.Result, callResultCommand.ErrType)
close(finishWaiter)
return
}
response := protobuf.ArbiterMemberListResponse{}
err = proto.Unmarshal(callResultCommand.Data, &response)
if err != nil {
fmt.Printf("poll wait parse response error %v\n", callResultCommand.ErrType)
close(finishWaiter)
return
}
version, vertime = response.Version, response.Vertime
if response.Members == nil || len(response.Members) == 0 {
continue
}

fmt.Printf("Replset Member Status: %s\n", time.Now().Format("2006-01-02 15:04:05.999999999-0700"))
for _, member := range response.Members {
arbiter, isself, status, abstianed, closed := "no", "no", "offline", "no", "no"
if member.Arbiter != 0 {
arbiter = "yes"
}
if member.IsSelf {
isself = "yes"
}
if member.Status == server.ARBITER_MEMBER_STATUS_ONLINE {
status = "online"
}
if member.Abstianed {
abstianed = "yes"
}
if member.Closed {
closed = "yes"
}
fmt.Printf("%s Weight:%d,Arbiter:%s,Role:%s,Status:%s,LastUpdated:%d,LastDelay:%.2f,LastError:%d,AofId:%x,IsSelf:%s,Abstianed:%s,Closed:%s\n",
member.Host, member.Weight, arbiter, server.ROLE_NAMES[member.Role], status, member.LastUpdated/1e6, float64(member.LastDelay)/1e6,
member.LastError, member.AofId, isself, abstianed, closed)
}
fmt.Println()
}
}

func main() {
port := flag.Int("port", 5658, "port")
host := flag.String("host", "127.0.0.1", "host")

flag.Parse()

c := client.NewClient(*host, uint(*port))
err := c.Open()
if err != nil {
fmt.Printf("Client open error %v\n", err)
return
}

finishWaiter, stopSignal := make(chan bool, 1), make(chan os.Signal, 1)
signal.Notify(stopSignal, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
go subscribe(c, finishWaiter)
select {
case <-finishWaiter:
err := c.Close()
if err != nil {
fmt.Printf("Client close error %v\n", err)
return
}
case <-stopSignal:
err := c.Close()
if err != nil {
fmt.Printf("Client close error %v\n", err)
return
}
case <-c.Unavailable():
err := c.Close()
if err != nil {
fmt.Printf("Client close error %v\n", err)
return
}
}
}
2 changes: 1 addition & 1 deletion tools/subscribe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func main() {

finishWaiter, stopSignal := make(chan bool, 1), make(chan os.Signal, 1)
signal.Notify(stopSignal, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
go subscribe(subscriber, finishWaiter)
go subscribe(subscriber.(*client.Subscriber), finishWaiter)
select {
case <-finishWaiter:
err := c.Close()
Expand Down

0 comments on commit 735eb69

Please sign in to comment.