From d9f49378cd2d064df3a1369772efded0260064ca Mon Sep 17 00:00:00 2001 From: Lei Ni Date: Mon, 18 Sep 2023 00:02:53 +0800 Subject: [PATCH] registry: some refactoring for the registry module --- internal/registry/event.go | 52 ++++------- internal/registry/gossip.go | 91 +++++-------------- .../registry/{gossip_logger.go => logger.go} | 0 internal/registry/nodehost.go | 6 +- internal/registry/registry.go | 10 +- internal/registry/view.go | 37 ++++---- internal/registry/view_test.go | 40 ++++---- node.go | 14 +-- 8 files changed, 93 insertions(+), 157 deletions(-) rename internal/registry/{gossip_logger.go => logger.go} (100%) diff --git a/internal/registry/event.go b/internal/registry/event.go index 5cba0536d..2d8045d73 100644 --- a/internal/registry/event.go +++ b/internal/registry/event.go @@ -15,55 +15,35 @@ package registry import ( - "sync" - "github.com/hashicorp/memberlist" ) +// sliceEventDelegate is used to hook into memberlist to get notification +// about nodes joining and leaving. type sliceEventDelegate struct { - ch chan struct{} - - mu struct { - sync.Mutex - events []memberlist.NodeEvent - } + store *metaStore } var _ memberlist.EventDelegate = (*sliceEventDelegate)(nil) -func newSliceEventDelegate() *sliceEventDelegate { +func newSliceEventDelegate(store *metaStore) *sliceEventDelegate { return &sliceEventDelegate{ - ch: make(chan struct{}, 1), - } -} - -func (e *sliceEventDelegate) notify() { - select { - case e.ch <- struct{}{}: - default: + store: store, } } -func (e *sliceEventDelegate) get() []memberlist.NodeEvent { - e.mu.Lock() - defer e.mu.Unlock() - events := e.mu.events - e.mu.events = make([]memberlist.NodeEvent, 0) - return events -} - -func (e *sliceEventDelegate) put(typ memberlist.NodeEventType, n *memberlist.Node) { - node := *n - node.Meta = make([]byte, len(n.Meta)) - copy(node.Meta, n.Meta) - defer e.notify() - e.mu.Lock() - defer e.mu.Unlock() - event := memberlist.NodeEvent{ - Event: typ, - Node: &node, +func (e *sliceEventDelegate) put(eventType memberlist.NodeEventType, + n *memberlist.Node) { + if eventType == memberlist.NodeJoin || eventType == memberlist.NodeUpdate { + var m meta + if m.unmarshal(n.Meta) { + e.store.put(n.Name, m) + } + } else if eventType == memberlist.NodeLeave { + e.store.delete(n.Name) + } else { + panic("unknown event type") } - e.mu.events = append(e.mu.events, event) } func (e *sliceEventDelegate) NotifyJoin(n *memberlist.Node) { diff --git a/internal/registry/gossip.go b/internal/registry/gossip.go index 1bb218da4..285c924ed 100644 --- a/internal/registry/gossip.go +++ b/internal/registry/gossip.go @@ -35,6 +35,9 @@ var plog = logger.GetLogger("registry") type getShardInfo func() []ShardInfo +// meta is the metadata of the node. The actual payload is specified by the user +// by setting the Config.GossipConfig.Meta field. meta contains node information +// that will not change during the life of a particular NodeHost process. type meta struct { RaftAddress string Data []byte @@ -70,6 +73,7 @@ func (m *meta) unmarshal(data []byte) bool { return true } +// metaStore is a node name to node Meta concurrent map. 
type metaStore struct { nodes sync.Map } @@ -163,59 +167,15 @@ func (n *GossipRegistry) Resolve(shardID uint64, return "", "", ErrUnknownTarget } -type eventDelegate struct { - ed *sliceEventDelegate - stopper *syncutil.Stopper - store *metaStore -} - -func newEventDelegate(s *syncutil.Stopper, store *metaStore) *eventDelegate { - ed := &eventDelegate{ - stopper: s, - store: store, - ed: newSliceEventDelegate(), - } - return ed -} - -func (d *eventDelegate) handle() { - events := d.ed.get() - if len(events) == 0 { - return - } - for _, e := range events { - if e.Event == memberlist.NodeJoin || e.Event == memberlist.NodeUpdate { - var m meta - if m.unmarshal(e.Node.Meta) { - d.store.put(e.Node.Name, m) - } - } else if e.Event == memberlist.NodeLeave { - d.store.delete(e.Node.Name) - } else { - panic("unknown event type") - } - } -} - -func (d *eventDelegate) start() { - d.stopper.RunWorker(func() { - for { - select { - case <-d.stopper.ShouldStop(): - return - case <-d.ed.ch: - d.handle() - } - } - }) -} - +// delegate is used to hook into memberlist's gossip layer. type delegate struct { getShardInfo getShardInfo meta meta view *view } +var _ memberlist.Delegate = (*delegate)(nil) + func (d *delegate) NodeMeta(limit int) []byte { m := d.meta.marshal() if len(m) > limit { @@ -269,21 +229,17 @@ func parseAddress(addr string) (string, int, error) { } type gossipManager struct { - nhConfig config.NodeHostConfig - cfg *memberlist.Config - list *memberlist.Memberlist - ed *eventDelegate - view *view - store *metaStore - stopper *syncutil.Stopper - eventStopper *syncutil.Stopper + nhConfig config.NodeHostConfig + cfg *memberlist.Config + list *memberlist.Memberlist + view *view + store *metaStore + stopper *syncutil.Stopper } func newGossipManager(nhid string, f getShardInfo, nhConfig config.NodeHostConfig) (*gossipManager, error) { - eventStopper := syncutil.NewStopper() store := &metaStore{} - ed := newEventDelegate(eventStopper, store) cfg := memberlist.DefaultWANConfig() cfg.Logger = newGossipLogWrapper() cfg.Name = nhid @@ -322,7 +278,8 @@ func newGossipManager(nhid string, f getShardInfo, getShardInfo: f, view: view, } - cfg.Events = ed.ed + // set memberlist's event delegate + cfg.Events = newSliceEventDelegate(store) list, err := memberlist.Create(cfg) if err != nil { @@ -332,18 +289,15 @@ func newGossipManager(nhid string, f getShardInfo, seed := make([]string, 0, len(nhConfig.Gossip.Seed)) seed = append(seed, nhConfig.Gossip.Seed...) 
g := &gossipManager{ - nhConfig: nhConfig, - cfg: cfg, - list: list, - ed: ed, - view: view, - store: store, - stopper: syncutil.NewStopper(), - eventStopper: eventStopper, + nhConfig: nhConfig, + cfg: cfg, + list: list, + view: view, + store: store, + stopper: syncutil.NewStopper(), } // eventDelegate must be started first, otherwise join() could be blocked // on a large cluster - g.ed.start() g.join(seed) g.stopper.RunWorker(func() { ticker := time.NewTicker(500 * time.Millisecond) @@ -377,7 +331,6 @@ func (g *gossipManager) Close() error { if err := g.list.Shutdown(); err != nil { return errors.Wrapf(err, "shutdown memberlist failed") } - g.eventStopper.Stop() return nil } @@ -392,7 +345,7 @@ func (g *gossipManager) GetRaftAddress(nhid string) (string, bool) { if g.cfg.Name == nhid { return g.nhConfig.RaftAddress, true } - if v, ok := g.ed.store.get(nhid); ok { + if v, ok := g.store.get(nhid); ok { return v.RaftAddress, true } return "", false diff --git a/internal/registry/gossip_logger.go b/internal/registry/logger.go similarity index 100% rename from internal/registry/gossip_logger.go rename to internal/registry/logger.go diff --git a/internal/registry/nodehost.go b/internal/registry/nodehost.go index 3b176c314..509e57648 100644 --- a/internal/registry/nodehost.go +++ b/internal/registry/nodehost.go @@ -47,9 +47,9 @@ func (r *NodeHostRegistry) GetShardInfo(shardID uint64) (ShardView, bool) { return ShardView{}, false } result := ci - result.Nodes = make(map[uint64]string) - for shardID, target := range ci.Nodes { - result.Nodes[shardID] = target + result.Replicas = make(map[uint64]string) + for shardID, target := range ci.Replicas { + result.Replicas[shardID] = target } return result, true } diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 272433bcd..d305a6bee 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -32,7 +32,7 @@ var ( ErrUnknownTarget = errors.New("target address unknown") ) -// IResolver converts the (shard id, node id) tuple to network address. +// IResolver converts the (shard id, replica id) tuple to network address. type IResolver interface { Resolve(uint64, uint64) (string, string, error) Add(uint64, uint64, string) @@ -68,10 +68,10 @@ func NewNodeRegistry(streamConnections uint64, v config.TargetValidator) *Regist return n } -// Close closes the node registry. +// Close closes the registry. func (n *Registry) Close() error { return nil } -// Add adds the specified node and its target info to the registry. +// Add adds the specified replica and its target info to the registry. func (n *Registry) Add(shardID uint64, replicaID uint64, target string) { if n.validate != nil && !n.validate(target) { plog.Panicf("invalid target %s", target) @@ -98,7 +98,7 @@ func (n *Registry) Remove(shardID uint64, replicaID uint64) { n.addr.Delete(raftio.GetNodeInfo(shardID, replicaID)) } -// RemoveShard removes all nodes info associated with the specified shard +// RemoveShard removes info associated with the specified shard. func (n *Registry) RemoveShard(shardID uint64) { var toRemove []raftio.NodeInfo n.addr.Range(func(k, v interface{}) bool { @@ -113,7 +113,7 @@ func (n *Registry) RemoveShard(shardID uint64) { } } -// Resolve looks up the Addr of the specified node. +// Resolve looks up the address of the specified node. 
func (n *Registry) Resolve(shardID uint64, replicaID uint64) (string, string, error) { key := raftio.GetNodeInfo(shardID, replicaID) addr, ok := n.addr.Load(key) diff --git a/internal/registry/view.go b/internal/registry/view.go index 01bd6522b..1c0ff74e8 100644 --- a/internal/registry/view.go +++ b/internal/registry/view.go @@ -34,11 +34,11 @@ var ( // ShardInfo is a record for representing the state of a Raft shard based // on the knowledge of the local NodeHost instance. type ShardInfo struct { - // Nodes is a map of member node IDs to their Raft addresses. - Nodes map[uint64]string - // ShardID is the shard ID of the Raft shard node. + // Replicas is a map of member replica IDs to their Raft addresses. + Replicas map[uint64]string + // ShardID is the shard ID of the Raft shard. ShardID uint64 - // ReplicaID is the replica ID of the Raft node. + // ReplicaID is the replica ID of the Raft replica. ReplicaID uint64 // ConfigChangeIndex is the current config change index of the Raft node. // ConfigChangeIndex is Raft Log index of the last applied membership @@ -63,10 +63,11 @@ type ShardInfo struct { Pending bool } -// ShardView is the view of a shard from gossip's point of view. +// ShardView is the view of a shard from gossip's point of view at a certain +// point in time. type ShardView struct { ShardID uint64 - Nodes map[uint64]string + Replicas map[uint64]string ConfigChangeIndex uint64 LeaderID uint64 Term uint64 @@ -77,7 +78,7 @@ func toShardViewList(input []ShardInfo) []ShardView { for _, ci := range input { cv := ShardView{ ShardID: ci.ShardID, - Nodes: ci.Nodes, + Replicas: ci.Replicas, ConfigChangeIndex: ci.ConfigChangeIndex, LeaderID: ci.LeaderID, Term: ci.Term, @@ -87,11 +88,13 @@ func toShardViewList(input []ShardInfo) []ShardView { return result } -type sharedInfo struct { +type exchanged struct { DeploymentID uint64 ShardInfo []ShardView } +// view contains dynamic information on shards, it can change after an +// election or a raft configuration change. 
type view struct { deploymentID uint64 // shardID -> ShardView @@ -115,9 +118,9 @@ func (v *view) shardCount() int { return len(v.mu.shards) } -func mergeShardInfo(current ShardView, update ShardView) ShardView { +func mergeShardView(current ShardView, update ShardView) ShardView { if current.ConfigChangeIndex < update.ConfigChangeIndex { - current.Nodes = update.Nodes + current.Replicas = update.Replicas current.ConfigChangeIndex = update.ConfigChangeIndex } // we only keep which replica is the last known leader @@ -140,7 +143,7 @@ func (v *view) update(updates []ShardView) { if !ok { current = ShardView{ShardID: u.ShardID} } - v.mu.shards[u.ShardID] = mergeShardInfo(current, u) + v.mu.shards[u.ShardID] = mergeShardView(current, u) } } @@ -162,13 +165,13 @@ func getCompressedData(deploymentID uint64, l []ShardView, n int) []byte { if n == 0 { return nil } - si := sharedInfo{ + exchanged := exchanged{ DeploymentID: deploymentID, ShardInfo: l[:n], } var buf bytes.Buffer enc := gob.NewEncoder(&buf) - if err := enc.Encode(si); err != nil { + if err := enc.Encode(exchanged); err != nil { panic(err) } data := buf.Bytes() @@ -227,12 +230,12 @@ func (v *view) updateFrom(data []byte) { dst = dst[:n] buf := bytes.NewBuffer(dst) dec := gob.NewDecoder(buf) - si := sharedInfo{} - if err := dec.Decode(&si); err != nil { + exchanged := exchanged{} + if err := dec.Decode(&exchanged); err != nil { return } - if si.DeploymentID != v.deploymentID { + if exchanged.DeploymentID != v.deploymentID { return } - v.update(si.ShardInfo) + v.update(exchanged.ShardInfo) } diff --git a/internal/registry/view_test.go b/internal/registry/view_test.go index 3b26184f1..8d59103d0 100644 --- a/internal/registry/view_test.go +++ b/internal/registry/view_test.go @@ -26,7 +26,7 @@ func getTestShardView() []ShardView { cv1 := ShardView{ ShardID: 100, ConfigChangeIndex: 200, - Nodes: map[uint64]string{ + Replicas: map[uint64]string{ 200: "address1", 300: "address2", 400: "address3", @@ -35,7 +35,7 @@ func getTestShardView() []ShardView { cv2 := ShardView{ ShardID: 1340, ConfigChangeIndex: 126200, - Nodes: map[uint64]string{ + Replicas: map[uint64]string{ 1200: "myaddress1", 4300: "theiraddress2", 6400: "heraddress3", @@ -64,7 +64,7 @@ func TestConfigChangeIndexIsChecked(t *testing.T) { { ShardID: 1340, ConfigChangeIndex: 300, - Nodes: map[uint64]string{ + Replicas: map[uint64]string{ 1200: "myaddress1", 4300: "theiraddress2", }, @@ -74,13 +74,13 @@ func TestConfigChangeIndexIsChecked(t *testing.T) { c, ok := v.mu.shards[1340] assert.True(t, ok) assert.Equal(t, uint64(126200), c.ConfigChangeIndex) - assert.Equal(t, 3, len(c.Nodes)) + assert.Equal(t, 3, len(c.Replicas)) update = []ShardView{ { ShardID: 1340, ConfigChangeIndex: 226200, - Nodes: map[uint64]string{ + Replicas: map[uint64]string{ 1200: "myaddress1", 4300: "theiraddress2", }, @@ -90,7 +90,7 @@ func TestConfigChangeIndexIsChecked(t *testing.T) { c, ok = v.mu.shards[1340] assert.True(t, ok) assert.Equal(t, uint64(226200), c.ConfigChangeIndex) - assert.Equal(t, 2, len(c.Nodes)) + assert.Equal(t, 2, len(c.Replicas)) } func TestDeploymentIDIsChecked(t *testing.T) { @@ -117,20 +117,20 @@ func TestUpdateMembershipView(t *testing.T) { cv := ShardView{ ShardID: 123, ConfigChangeIndex: 100, - Nodes: make(map[uint64]string), + Replicas: make(map[uint64]string), } - cv.Nodes[1] = "t1" - cv.Nodes[2] = "t2" + cv.Replicas[1] = "t1" + cv.Replicas[2] = "t2" v.mu.shards[123] = cv ncv := ShardView{ ShardID: 123, ConfigChangeIndex: 200, - Nodes: make(map[uint64]string), + Replicas: 
make(map[uint64]string), } - ncv.Nodes[1] = "t1" - ncv.Nodes[2] = "t2" - ncv.Nodes[3] = "t3" + ncv.Replicas[1] = "t1" + ncv.Replicas[2] = "t2" + ncv.Replicas[3] = "t3" updates := []ShardView{ncv} v.update(updates) @@ -144,20 +144,20 @@ func TestOutOfDateMembershipInfoIsIgnored(t *testing.T) { cv := ShardView{ ShardID: 123, ConfigChangeIndex: 100, - Nodes: make(map[uint64]string), + Replicas: make(map[uint64]string), } - cv.Nodes[1] = "t1" - cv.Nodes[2] = "t2" + cv.Replicas[1] = "t1" + cv.Replicas[2] = "t2" v.mu.shards[123] = cv ncv := ShardView{ ShardID: 123, ConfigChangeIndex: 10, - Nodes: make(map[uint64]string), + Replicas: make(map[uint64]string), } - ncv.Nodes[1] = "t1" - ncv.Nodes[2] = "t2" - ncv.Nodes[3] = "t3" + ncv.Replicas[1] = "t1" + ncv.Replicas[2] = "t2" + ncv.Replicas[3] = "t3" updates := []ShardView{ncv} v.update(updates) diff --git a/node.go b/node.go index 36f592fca..394dd5e43 100644 --- a/node.go +++ b/node.go @@ -1600,7 +1600,7 @@ func (n *node) notifyConfigChange() { IsNonVoting: isNonVoting, IsWitness: isWitness, ConfigChangeIndex: m.ConfigChangeId, - Nodes: m.Addresses, + Replicas: m.Addresses, } n.shardInfo.Store(ci) n.sysEvents.Publish(server.SystemEvent{ @@ -1620,7 +1620,7 @@ func (n *node) getShardInfo() ShardInfo { StateMachineType: sm.Type(n.sm.Type()), } } - ci := v.(*ShardInfo) + info := v.(*ShardInfo) leaderID := uint64(0) term := uint64(0) @@ -1632,13 +1632,13 @@ func (n *node) getShardInfo() ShardInfo { } return ShardInfo{ - ShardID: ci.ShardID, - ReplicaID: ci.ReplicaID, + ShardID: info.ShardID, + ReplicaID: info.ReplicaID, LeaderID: leaderID, Term: term, - IsNonVoting: ci.IsNonVoting, - ConfigChangeIndex: ci.ConfigChangeIndex, - Nodes: ci.Nodes, + IsNonVoting: info.IsNonVoting, + ConfigChangeIndex: info.ConfigChangeIndex, + Replicas: info.Replicas, StateMachineType: sm.Type(n.sm.Type()), } }
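---

Notes with illustrative sketches (none of the code below is part of the patch):

The event path is now synchronous: the buffered event slice, its mutex, the
notification channel and the worker goroutine that drained it are gone, and
memberlist's EventDelegate callbacks write node metadata straight into the
shared metaStore (join and update events store the unmarshaled meta under the
node name, leave events delete it). This is also what lets newGossipManager
drop the separate eventStopper. A minimal package-level test of that path
could look like the sketch below; it assumes NotifyJoin/NotifyLeave still
route through put (their bodies are unchanged by this patch) and that
meta.marshal produces the encoding meta.unmarshal accepts.

package registry

import (
	"testing"

	"github.com/hashicorp/memberlist"
)

// Illustrative only: exercises the refactored delegate without starting a
// memberlist instance. A join event should land in the store, a leave event
// should remove it again.
func TestEventDelegateWritesStore(t *testing.T) {
	store := &metaStore{}
	ed := newSliceEventDelegate(store)

	m := meta{RaftAddress: "a1.example.com:26000", Data: []byte("user-meta")}
	n := &memberlist.Node{Name: "nhid-12345", Meta: m.marshal()}

	ed.NotifyJoin(n)
	if v, ok := store.get("nhid-12345"); !ok || v.RaftAddress != "a1.example.com:26000" {
		t.Fatalf("join event not reflected in the meta store")
	}

	ed.NotifyLeave(n)
	if _, ok := store.get("nhid-12345"); ok {
		t.Fatalf("leave event not removed from the meta store")
	}
}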
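On the meta type in gossip.go: the marshaled form (RaftAddress plus the user
payload taken from the gossip section of the NodeHost configuration, as the
new comment describes) is what delegate.NodeMeta hands to memberlist, and
NodeMeta panics when it does not fit within the limit memberlist passes in
(memberlist.MetaMaxSize, 512 bytes in current hashicorp/memberlist releases).
A defensive pre-check could look like the hypothetical helper below; metaFits
is not part of this patch.

package registry

import "github.com/hashicorp/memberlist"

// metaFits is a hypothetical helper, not part of the patch: it reports
// whether the marshaled node meta stays within memberlist's per-node
// metadata limit, so that delegate.NodeMeta never has to panic.
func metaFits(raftAddress string, payload []byte) bool {
	m := meta{RaftAddress: raftAddress, Data: payload}
	return len(m.marshal()) <= memberlist.MetaMaxSize
}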
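The registry.go changes are comment-level renames from node to replica; the
behaviour of the static (non-gossip) resolver is untouched. For reference, a
small usage sketch of that resolver follows; the target address is a
placeholder and the nil validator simply skips target validation.

package registry

import "testing"

// Illustrative only: the static Registry maps (shard id, replica id) to the
// target supplied via Add until Remove/RemoveShard drops it again.
func TestStaticRegistrySketch(t *testing.T) {
	r := NewNodeRegistry(1, nil)
	r.Add(100, 1, "node1.example.com:26000")
	addr, _, err := r.Resolve(100, 1)
	if err != nil || addr != "node1.example.com:26000" {
		t.Fatalf("unexpected resolve result: %q, %v", addr, err)
	}
	r.RemoveShard(100)
	if _, _, err := r.Resolve(100, 1); err == nil {
		t.Fatalf("expected an error after RemoveShard")
	}
}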
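In view.go, sharedInfo becomes exchanged and the Nodes maps become Replicas;
the state gossiped between NodeHost instances remains a gob-encoded exchanged
value that is compressed before being handed to memberlist. The sketch below
covers only the gob round trip; compression, the size probing in
getCompressedData and the deployment-ID check in updateFrom are left out.

package registry

import (
	"bytes"
	"encoding/gob"
	"testing"
)

// Illustrative only: the shape of the state exchanged between NodeHost
// instances, minus compression.
func TestExchangedGobRoundTrip(t *testing.T) {
	out := exchanged{
		DeploymentID: 100,
		ShardInfo: []ShardView{
			{
				ShardID:           123,
				ConfigChangeIndex: 200,
				Replicas:          map[uint64]string{1: "t1", 2: "t2"},
				LeaderID:          1,
				Term:              5,
			},
		},
	}
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(out); err != nil {
		t.Fatalf("encode failed: %v", err)
	}
	var in exchanged
	if err := gob.NewDecoder(&buf).Decode(&in); err != nil {
		t.Fatalf("decode failed: %v", err)
	}
	if in.DeploymentID != 100 || len(in.ShardInfo[0].Replicas) != 2 {
		t.Fatalf("unexpected result after round trip")
	}
}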