registry: some refactoring for the registry module
lni committed Sep 17, 2023
1 parent 3c4430d commit d9f4937
Showing 8 changed files with 93 additions and 157 deletions.
52 changes: 16 additions & 36 deletions internal/registry/event.go
@@ -15,55 +15,35 @@
package registry

import (
"sync"

"github.com/hashicorp/memberlist"
)

// sliceEventDelegate is used to hook into memberlist to get notification
// about nodes joining and leaving.
type sliceEventDelegate struct {
ch chan struct{}

mu struct {
sync.Mutex
events []memberlist.NodeEvent
}
store *metaStore
}

var _ memberlist.EventDelegate = (*sliceEventDelegate)(nil)

func newSliceEventDelegate() *sliceEventDelegate {
func newSliceEventDelegate(store *metaStore) *sliceEventDelegate {
return &sliceEventDelegate{
ch: make(chan struct{}, 1),
}
}

func (e *sliceEventDelegate) notify() {
select {
case e.ch <- struct{}{}:
default:
store: store,
}
}

func (e *sliceEventDelegate) get() []memberlist.NodeEvent {
e.mu.Lock()
defer e.mu.Unlock()
events := e.mu.events
e.mu.events = make([]memberlist.NodeEvent, 0)
return events
}

func (e *sliceEventDelegate) put(typ memberlist.NodeEventType, n *memberlist.Node) {
node := *n
node.Meta = make([]byte, len(n.Meta))
copy(node.Meta, n.Meta)
defer e.notify()
e.mu.Lock()
defer e.mu.Unlock()
event := memberlist.NodeEvent{
Event: typ,
Node: &node,
func (e *sliceEventDelegate) put(eventType memberlist.NodeEventType,
n *memberlist.Node) {
if eventType == memberlist.NodeJoin || eventType == memberlist.NodeUpdate {
var m meta
if m.unmarshal(n.Meta) {
e.store.put(n.Name, m)
}
} else if eventType == memberlist.NodeLeave {
e.store.delete(n.Name)
} else {
panic("unknown event type")
}
e.mu.events = append(e.mu.events, event)
}

func (e *sliceEventDelegate) NotifyJoin(n *memberlist.Node) {
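After this change the delegate no longer buffers events for a background worker; it applies them to the metaStore as memberlist delivers them. The Notify* bodies are folded out of the hunk above, so the following is only a plausible sketch of how they forward into put, not the literal commit content:

func (e *sliceEventDelegate) NotifyJoin(n *memberlist.Node) {
    e.put(memberlist.NodeJoin, n)
}

func (e *sliceEventDelegate) NotifyUpdate(n *memberlist.Node) {
    e.put(memberlist.NodeUpdate, n)
}

func (e *sliceEventDelegate) NotifyLeave(n *memberlist.Node) {
    e.put(memberlist.NodeLeave, n)
}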
91 changes: 22 additions & 69 deletions internal/registry/gossip.go
@@ -35,6 +35,9 @@ var plog = logger.GetLogger("registry")

type getShardInfo func() []ShardInfo

// meta is the metadata of the node. The actual payload is specified by the user
// by setting the Config.GossipConfig.Meta field. meta contains node information
// that will not change during the life of a particular NodeHost process.
type meta struct {
RaftAddress string
Data []byte
@@ -70,6 +73,7 @@ func (m *meta) unmarshal(data []byte) bool {
return true
}

// metaStore is a node name to node Meta concurrent map.
type metaStore struct {
nodes sync.Map
}
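The accessors of metaStore are not part of this diff; given the sync.Map field they are presumably thin wrappers along these lines (sketch only, not taken from the repository):

func (s *metaStore) put(name string, m meta) {
    s.nodes.Store(name, m)
}

func (s *metaStore) get(name string) (meta, bool) {
    if v, ok := s.nodes.Load(name); ok {
        return v.(meta), true
    }
    return meta{}, false
}

func (s *metaStore) delete(name string) {
    s.nodes.Delete(name)
}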
@@ -163,59 +167,15 @@ func (n *GossipRegistry) Resolve(shardID uint64,
return "", "", ErrUnknownTarget
}

type eventDelegate struct {
ed *sliceEventDelegate
stopper *syncutil.Stopper
store *metaStore
}

func newEventDelegate(s *syncutil.Stopper, store *metaStore) *eventDelegate {
ed := &eventDelegate{
stopper: s,
store: store,
ed: newSliceEventDelegate(),
}
return ed
}

func (d *eventDelegate) handle() {
events := d.ed.get()
if len(events) == 0 {
return
}
for _, e := range events {
if e.Event == memberlist.NodeJoin || e.Event == memberlist.NodeUpdate {
var m meta
if m.unmarshal(e.Node.Meta) {
d.store.put(e.Node.Name, m)
}
} else if e.Event == memberlist.NodeLeave {
d.store.delete(e.Node.Name)
} else {
panic("unknown event type")
}
}
}

func (d *eventDelegate) start() {
d.stopper.RunWorker(func() {
for {
select {
case <-d.stopper.ShouldStop():
return
case <-d.ed.ch:
d.handle()
}
}
})
}

// delegate is used to hook into memberlist's gossip layer.
type delegate struct {
getShardInfo getShardInfo
meta meta
view *view
}

var _ memberlist.Delegate = (*delegate)(nil)

func (d *delegate) NodeMeta(limit int) []byte {
m := d.meta.marshal()
if len(m) > limit {
@@ -269,21 +229,17 @@ func parseAddress(addr string) (string, int, error) {
}

type gossipManager struct {
nhConfig config.NodeHostConfig
cfg *memberlist.Config
list *memberlist.Memberlist
ed *eventDelegate
view *view
store *metaStore
stopper *syncutil.Stopper
eventStopper *syncutil.Stopper
nhConfig config.NodeHostConfig
cfg *memberlist.Config
list *memberlist.Memberlist
view *view
store *metaStore
stopper *syncutil.Stopper
}

func newGossipManager(nhid string, f getShardInfo,
nhConfig config.NodeHostConfig) (*gossipManager, error) {
eventStopper := syncutil.NewStopper()
store := &metaStore{}
ed := newEventDelegate(eventStopper, store)
cfg := memberlist.DefaultWANConfig()
cfg.Logger = newGossipLogWrapper()
cfg.Name = nhid
@@ -322,7 +278,8 @@ func newGossipManager(nhid string, f getShardInfo,
getShardInfo: f,
view: view,
}
cfg.Events = ed.ed
// set memberlist's event delegate
cfg.Events = newSliceEventDelegate(store)

list, err := memberlist.Create(cfg)
if err != nil {
@@ -332,18 +289,15 @@
seed := make([]string, 0, len(nhConfig.Gossip.Seed))
seed = append(seed, nhConfig.Gossip.Seed...)
g := &gossipManager{
nhConfig: nhConfig,
cfg: cfg,
list: list,
ed: ed,
view: view,
store: store,
stopper: syncutil.NewStopper(),
eventStopper: eventStopper,
nhConfig: nhConfig,
cfg: cfg,
list: list,
view: view,
store: store,
stopper: syncutil.NewStopper(),
}
// eventDelegate must be started first, otherwise join() could be blocked
// on a large cluster
g.ed.start()
g.join(seed)
g.stopper.RunWorker(func() {
ticker := time.NewTicker(500 * time.Millisecond)
@@ -377,7 +331,6 @@ func (g *gossipManager) Close() error {
if err := g.list.Shutdown(); err != nil {
return errors.Wrapf(err, "shutdown memberlist failed")
}
g.eventStopper.Stop()
return nil
}

@@ -392,7 +345,7 @@ func (g *gossipManager) GetRaftAddress(nhid string) (string, bool) {
if g.cfg.Name == nhid {
return g.nhConfig.RaftAddress, true
}
if v, ok := g.ed.store.get(nhid); ok {
if v, ok := g.store.get(nhid); ok {
return v.RaftAddress, true
}
return "", false
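With the eventDelegate type, its dedicated stopper and the "must be started first" ordering constraint all removed, the gossip wiring reduces to handing the store-backed delegate to memberlist. A rough sketch of the resulting setup, using only names that appear in this diff (bind addresses and other configuration details are omitted, and the helper name is invented for illustration):

func exampleGossipWiring(nhid string, store *metaStore) *memberlist.Config {
    cfg := memberlist.DefaultWANConfig()
    cfg.Name = nhid
    // memberlist calls the delegate synchronously on join, update and leave,
    // so the metaStore stays current without a background event worker.
    cfg.Events = newSliceEventDelegate(store)
    return cfg
}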
File renamed without changes.
6 changes: 3 additions & 3 deletions internal/registry/nodehost.go
@@ -47,9 +47,9 @@ func (r *NodeHostRegistry) GetShardInfo(shardID uint64) (ShardView, bool) {
return ShardView{}, false
}
result := ci
result.Nodes = make(map[uint64]string)
for shardID, target := range ci.Nodes {
result.Nodes[shardID] = target
result.Replicas = make(map[uint64]string)
for shardID, target := range ci.Replicas {
result.Replicas[shardID] = target
}
return result, true
}
10 changes: 5 additions & 5 deletions internal/registry/registry.go
@@ -32,7 +32,7 @@ var (
ErrUnknownTarget = errors.New("target address unknown")
)

// IResolver converts the (shard id, node id) tuple to network address.
// IResolver converts the (shard id, replica id) tuple to network address.
type IResolver interface {
Resolve(uint64, uint64) (string, string, error)
Add(uint64, uint64, string)
@@ -68,10 +68,10 @@ func NewNodeRegistry(streamConnections uint64, v config.TargetValidator) *Registry {
return n
}

// Close closes the node registry.
// Close closes the registry.
func (n *Registry) Close() error { return nil }

// Add adds the specified node and its target info to the registry.
// Add adds the specified replica and its target info to the registry.
func (n *Registry) Add(shardID uint64, replicaID uint64, target string) {
if n.validate != nil && !n.validate(target) {
plog.Panicf("invalid target %s", target)
@@ -98,7 +98,7 @@ func (n *Registry) Remove(shardID uint64, replicaID uint64) {
n.addr.Delete(raftio.GetNodeInfo(shardID, replicaID))
}

// RemoveShard removes all nodes info associated with the specified shard
// RemoveShard removes info associated with the specified shard.
func (n *Registry) RemoveShard(shardID uint64) {
var toRemove []raftio.NodeInfo
n.addr.Range(func(k, v interface{}) bool {
@@ -113,7 +113,7 @@ func (n *Registry) RemoveShard(shardID uint64) {
}
}

// Resolve looks up the Addr of the specified node.
// Resolve looks up the address of the specified node.
func (n *Registry) Resolve(shardID uint64, replicaID uint64) (string, string, error) {
key := raftio.GetNodeInfo(shardID, replicaID)
addr, ok := n.addr.Load(key)
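For reference, a small usage sketch of the plain Registry resolver touched above; the target address is made up and the comment on the second return value of Resolve is an assumption, not something stated in this diff:

func exampleRegistryUsage() error {
    r := NewNodeRegistry(1, nil)
    defer func() { _ = r.Close() }()
    // register replica 1 of shard 100 at an illustrative target address
    r.Add(100, 1, "nh-1:63001")
    addr, key, err := r.Resolve(100, 1)
    if err != nil {
        return err
    }
    _ = addr // "nh-1:63001"
    _ = key  // presumably used to pick one of the streamConnections
    r.RemoveShard(100) // drops every replica record kept for shard 100
    return nil
}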
37 changes: 20 additions & 17 deletions internal/registry/view.go
@@ -34,11 +34,11 @@ var (
// ShardInfo is a record for representing the state of a Raft shard based
// on the knowledge of the local NodeHost instance.
type ShardInfo struct {
// Nodes is a map of member node IDs to their Raft addresses.
Nodes map[uint64]string
// ShardID is the shard ID of the Raft shard node.
// Replicas is a map of member replica IDs to their Raft addresses.
Replicas map[uint64]string
// ShardID is the shard ID of the Raft shard.
ShardID uint64
// ReplicaID is the replica ID of the Raft node.
// ReplicaID is the replica ID of the Raft replica.
ReplicaID uint64
// ConfigChangeIndex is the current config change index of the Raft node.
// ConfigChangeIndex is Raft Log index of the last applied membership
@@ -63,10 +63,11 @@ type ShardInfo struct {
Pending bool
}

// ShardView is the view of a shard from gossip's point of view.
// ShardView is the view of a shard from gossip's point of view at a certain
// point in time.
type ShardView struct {
ShardID uint64
Nodes map[uint64]string
Replicas map[uint64]string
ConfigChangeIndex uint64
LeaderID uint64
Term uint64
@@ -77,7 +78,7 @@ func toShardViewList(input []ShardInfo) []ShardView {
for _, ci := range input {
cv := ShardView{
ShardID: ci.ShardID,
Nodes: ci.Nodes,
Replicas: ci.Replicas,
ConfigChangeIndex: ci.ConfigChangeIndex,
LeaderID: ci.LeaderID,
Term: ci.Term,
@@ -87,11 +88,13 @@
return result
}

type sharedInfo struct {
type exchanged struct {
DeploymentID uint64
ShardInfo []ShardView
}

// view contains dynamic information on shards, it can change after an
// election or a raft configuration change.
type view struct {
deploymentID uint64
// shardID -> ShardView
@@ -115,9 +118,9 @@
return len(v.mu.shards)
}

func mergeShardInfo(current ShardView, update ShardView) ShardView {
func mergeShardView(current ShardView, update ShardView) ShardView {
if current.ConfigChangeIndex < update.ConfigChangeIndex {
current.Nodes = update.Nodes
current.Replicas = update.Replicas
current.ConfigChangeIndex = update.ConfigChangeIndex
}
// we only keep which replica is the last known leader
@@ -140,7 +143,7 @@ func (v *view) update(updates []ShardView) {
if !ok {
current = ShardView{ShardID: u.ShardID}
}
v.mu.shards[u.ShardID] = mergeShardInfo(current, u)
v.mu.shards[u.ShardID] = mergeShardView(current, u)
}
}
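To make the renamed mergeShardView concrete: an update carrying a higher ConfigChangeIndex replaces the membership map, while a stale one leaves the current view untouched. A small illustrative example with made-up values:

func exampleMerge() ShardView {
    current := ShardView{
        ShardID:           2,
        ConfigChangeIndex: 5,
        Replicas:          map[uint64]string{1: "nhid-1"},
    }
    update := ShardView{
        ShardID:           2,
        ConfigChangeIndex: 8,
        Replicas:          map[uint64]string{1: "nhid-1", 2: "nhid-2"},
    }
    // the returned view now carries both replicas and ConfigChangeIndex 8
    return mergeShardView(current, update)
}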

@@ -162,13 +165,13 @@ func getCompressedData(deploymentID uint64, l []ShardView, n int) []byte {
if n == 0 {
return nil
}
si := sharedInfo{
exchanged := exchanged{
DeploymentID: deploymentID,
ShardInfo: l[:n],
}
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(si); err != nil {
if err := enc.Encode(exchanged); err != nil {
panic(err)
}
data := buf.Bytes()
@@ -227,12 +230,12 @@ func (v *view) updateFrom(data []byte) {
dst = dst[:n]
buf := bytes.NewBuffer(dst)
dec := gob.NewDecoder(buf)
si := sharedInfo{}
if err := dec.Decode(&si); err != nil {
exchanged := exchanged{}
if err := dec.Decode(&exchanged); err != nil {
return
}
if si.DeploymentID != v.deploymentID {
if exchanged.DeploymentID != v.deploymentID {
return
}
v.update(si.ShardInfo)
v.update(exchanged.ShardInfo)
}
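The sharedInfo struct is renamed to exchanged, presumably to make it clearer that this is the payload exchanged between gossip peers. A round-trip sketch of the gob encoding used above (the compression step performed by getCompressedData and reversed in updateFrom is deliberately left out, and the helper name is invented for illustration):

func exampleExchangeRoundTrip(deploymentID uint64, l []ShardView) ([]ShardView, error) {
    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(exchanged{
        DeploymentID: deploymentID,
        ShardInfo:    l,
    }); err != nil {
        return nil, err
    }
    var out exchanged
    if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
        return nil, err
    }
    return out.ShardInfo, nil
}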