Skip to content

Commit

Permalink
registry: support external node registry functions (lni#327)
Browse files Browse the repository at this point in the history
* fix: remove duplicate setRecovering call
* feat: start replica: add wait ready option
* Move INodeRegistry to raftio/registry
* Use Expert.NodeRegistryFactory if set
* Fix check and add a test
* Address review feedback pt 1
* Rename AddressByNodeHostID -> DefaultNodeRegistryEnabled
* Fix data race

---------

Co-authored-by: Noah-Jerome Lotzer <[email protected]>
  • Loading branch information
tylerwilliams and nolotz authored Sep 22, 2023
1 parent d9f4937 commit 657304a
Show file tree
Hide file tree
Showing 13 changed files with 297 additions and 85 deletions.
45 changes: 33 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ type NodeHostConfig struct {
// NodeHostID specifies what NodeHostID to use. By default, when NodeHostID
// is empty, a random UUID will be generated and recorded by the system.
// Specifying a concrete NodeHostID here will cause the specified NodeHostID
// value to be used. NodeHostID is only used when AddressByNodeHostID is
// value to be used. NodeHostID is only used when DefaultNodeRegistryEnabled is
// set to true.
NodeHostID string
// WALDir is the directory used for storing the WAL of Raft entries. It is
Expand Down Expand Up @@ -292,25 +292,25 @@ type NodeHostConfig struct {
// to all resolved IPv4 addresses.
//
// By default, the RaftAddress value is not allowed to change between NodeHost
// restarts. AddressByNodeHostID should be set to true when the RaftAddress
// restarts. DefaultNodeRegistryEnabled should be set to true when the RaftAddress
// value might change after restart.
RaftAddress string
// AddressByNodeHostID indicates that NodeHost instances should be addressed
// DefaultNodeRegistryEnabled indicates that NodeHost instances should be addressed
// by their NodeHostID values. This feature is usually used when only dynamic
// addresses are available. When enabled, NodeHostID values should be used
// as the target parameter when calling NodeHost's StartReplica,
// RequestAddReplica, RequestAddNonVoting and RequestAddWitness methods.
//
// Enabling AddressByNodeHostID also enables the internal gossip service,
// Enabling DefaultNodeRegistryEnabled also enables the internal gossip service,
// NodeHostConfig.Gossip must be configured to control the behaviors of the
// gossip service.
//
// Note that once enabled, the AddressByNodeHostID setting can not be later
// Note that once enabled, the DefaultNodeRegistryEnabled setting can not be later
// disabled after restarts.
//
// Please see the godocs of the NodeHostConfig.Gossip field for a detailed
// example on how AddressByNodeHostID and gossip works.
AddressByNodeHostID bool
// example on how DefaultNodeRegistryEnabled and gossip works.
DefaultNodeRegistryEnabled bool
// ListenAddress is an optional field in the hostname:port or IP:port address
// form used by the transport module to listen on for Raft message and
// snapshots. When the ListenAddress field is not set, The transport module
Expand Down Expand Up @@ -377,7 +377,7 @@ type NodeHostConfig struct {
// are both committed and applied.
NotifyCommit bool
// Gossip contains configurations for the gossip service. When the
// AddressByNodeHostID field is set to true, each NodeHost instance will use
// DefaultNodeRegistryEnabled field is set to true, each NodeHost instance will use
// an internal gossip service to exchange knowledges of known NodeHost
// instances including their RaftAddress and NodeHostID values. This Gossip
// field contains configurations that controls how the gossip service works.
Expand All @@ -395,7 +395,7 @@ type NodeHostConfig struct {
// 10.0.0.200:24000
// 10.0.0.300:24000
//
// To use these machines, first enable the NodeHostConfig.AddressByNodeHostID
// To use these machines, first enable the NodeHostConfig.DefaultNodeRegistryEnabled
// field and start the NodeHost instances. The NodeHostID value of each
// NodeHost instance can be obtained by calling NodeHost.ID(). Let's say they
// are
Expand Down Expand Up @@ -459,7 +459,13 @@ type NodeHostConfig struct {
// points the local gossip service will try to talk to. The Seed field doesn't
// need to include all gossip end points, a few well connected nodes in the
// gossip network is enough.
//
// Alternatively, if you wish to use a custom registry but manage it yourself,
// the Expert.NodeRegistryFactory field can be set to provide a registry that
// implements the raftio.INodeRegistry interface. A registry is simply a common
// channel shared between all nodes that allows them to identify each other.
Gossip GossipConfig

// Expert contains options for expert users who are familiar with the internals
// of Dragonboat. Users are recommended not to use this field unless
// absolutely necessary. It is important to note that any change to this field
Expand Down Expand Up @@ -487,6 +493,13 @@ type LogDBFactory interface {
Name() string
}

// NodeRegistryFactory is the interface used for providing a custom node registry.
// For a short example of how to implement a custom node registry, please see
// TestExternalNodeRegistryFunction in nodehost_test.go.
type NodeRegistryFactory interface {
Create(nhid string, streamConnections uint64, v TargetValidator) (raftio.INodeRegistry, error)
}

// TransportFactory is the interface used for creating custom transport modules.
type TransportFactory interface {
// Create creates a transport module.
Expand Down Expand Up @@ -564,7 +577,7 @@ func (c *NodeHostConfig) Validate() error {
if c.LogDBFactory != nil && c.Expert.LogDBFactory != nil {
return errors.New("both LogDBFactory and Expert.LogDBFactory specified")
}
if c.AddressByNodeHostID && c.Gossip.IsEmpty() {
if c.DefaultNodeRegistryEnabled && c.Gossip.IsEmpty() {
return errors.New("gossip service not configured")
}
validate := c.GetRaftAddressValidator()
Expand Down Expand Up @@ -674,6 +687,11 @@ func (c *NodeHostConfig) Prepare() error {
return nil
}

// NodeRegistryEnabled returns a bool indicating if any node registry is enabled.
func (c *NodeHostConfig) NodeRegistryEnabled() bool {
return c.DefaultNodeRegistryEnabled || c.Expert.NodeRegistryFactory != nil
}

// GetListenAddress returns the actual address the transport module is going to
// listen on.
func (c *NodeHostConfig) GetListenAddress() string {
Expand Down Expand Up @@ -725,7 +743,7 @@ func (c *NodeHostConfig) GetDeploymentID() uint64 {
// GetTargetValidator returns a TargetValidator based on the specified
// NodeHostConfig instance.
func (c *NodeHostConfig) GetTargetValidator() TargetValidator {
if c.AddressByNodeHostID {
if c.NodeRegistryEnabled() {
return id.IsNodeHostID
} else if c.Expert.TransportFactory != nil {
return c.Expert.TransportFactory.Validate
Expand Down Expand Up @@ -939,11 +957,14 @@ type ExpertConfig struct {
// TestGossipProbeInterval defines the probe interval used by the gossip
// service in tests.
TestGossipProbeInterval time.Duration
// NodeRegistryFactory defines a custom node registry function that can be used
// instead of a static registry or the built in memberlist gossip mechanism.
NodeRegistryFactory NodeRegistryFactory
}

// GossipConfig contains configurations for the gossip service. Gossip service
// is a fully distributed networked service for exchanging knowledge on
// NodeHost instances. When enabled by the NodeHostConfig.AddressByNodeHostID
// NodeHost instances. When enabled by the NodeHostConfig.DefaultNodeRegistryEnabled
// field, it is employed to manage NodeHostID to RaftAddress mappings of known
// NodeHost instances.
type GossipConfig struct {
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestLogDBFactoryAndExpertLogDBFactoryCanNotBeSetTogether(t *testing.T) {
}
}

func TestGossipMustBeConfiguredWhenAddressByNodeHostID(t *testing.T) {
func TestGossipMustBeConfiguredWhenDefaultNodeRegistryEnabled(t *testing.T) {
c := NodeHostConfig{
RaftAddress: "localhost:9010",
RTTMillisecond: 100,
Expand All @@ -200,7 +200,7 @@ func TestGossipMustBeConfiguredWhenAddressByNodeHostID(t *testing.T) {
if err := c.Validate(); err != nil {
t.Fatalf("invalid config")
}
c.AddressByNodeHostID = true
c.DefaultNodeRegistryEnabled = true
if err := c.Validate(); err == nil {
t.Fatalf("unexpectedly considreed as valid config")
}
Expand Down
2 changes: 1 addition & 1 deletion docs/devops.CHS.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
* 在极端情况下,当多数节点同时永久故障并无法修复时,Raft组将不可用。此时须使用github.com/lni/dragonboat/tools包提供的ImportSnapshot工具修复受损的Raft组。这需要用户日常定期使用NodeHost的ExportSnapshot方法导出并备份快照供此灾备用途。
* 在默认方式下,NodeHost每次重启后其RaftAddress不能变化,否则将报错。
* 如无法直接满足上述RaftAddress不变的要求,比如IP地址会在每次机器重启后因随机分配而变更,可考虑是否可以使用DNS Name并配合适当配置管理,来达到RaftAddress始终不变的要求。
* 如依旧无法按照上述方法满足RaftAddress不变的需求,可通过设置AddressByNodeHostID项来开启gossip功能,它被设计用来处理动态的RaftAddress的场景,具体请参阅文档。
* 如依旧无法按照上述方法满足RaftAddress不变的需求,可通过设置DefaultNodeRegistryEnabled项来开启gossip功能,它被设计用来处理动态的RaftAddress的场景,具体请参阅文档。
* 用户应该自行测试系统是否具备高可用性,并测试在不同数量规模与组合的节点失效情况下用户系统是否可以正确处理并保持Raft组的高可用。各类灾备维护应该是日常CI的一部分。
2 changes: 1 addition & 1 deletion docs/devops.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ This document describes the DevOps requirements for operating Dragonboat based a
* When the quorum nodes are gone, you will not be able to resolve it without losing data. The github.com/lni/dragonboat/tools package provides the ImportSnapshot method to import a previously exported snapshot to repair such failed Raft shard.
* By default, the RaftAddress value can not be changed between restarts, otherwise the system will panic with an error message.
* When you can't provide a static IP for your nodes, e.g. when IP is dynamically assigned on node restart, you may want to configure a static DNS name for that node and update it on each restart.
* When it is not possible to do so, you can choose to set the AddressByNodeHostID field to enable the gossip feature which is designed to handle dynamic RaftAddress. Check godocs for more details on the gossip feature.
* When it is not possible to do so, you can choose to set the DefaultNodeRegistryEnabled field to enable the gossip feature which is designed to handle dynamic RaftAddress. Check godocs for more details on the gossip feature.
* Always test your system to ensure that it has high availability by design, disaster recovery should always be a part of the CI.
2 changes: 1 addition & 1 deletion docs/overview.CHS.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ NodeHost同时提供名为StaleRead的函数,如它的方法名称所表述的

默认下,每个Raft组的每个副本在被加入系统时都由用户明确指定它所在的节点的RaftAddress位置,系统以此确保各类Raft消息可以被正确发送给该副本。该方案简单直接,但缺点是RaftAddress必须是固定不变的,这要求使用固定的IP或者由用户维护一个DNS Name。在这一要求无法满足时,可以使用gossip功能来规避这一问题。

从v3.3版本开始,每个NodeHost节点都会被随机分配一个永久固定不变的NodeHostID值,它的值如nhid-1234567890形式,该值可由NodeHost的ID方法返回。在NodeHostConfig的AddressByNodeHostID项被设置为真后,所有新创建的副本都需要被指定其对应的NodeHostID值。此后,每次启动NodeHost实例时用于NodeHost间通讯的RaftAddress值可随意变化,每个NodeHost实例的RaftAddress与NodeHostID的对应关系将自动由后台的一个gossip服务来动态的维护,当Raft消息需要在两个副本间传递时,首先发生ReplicaID到NodeHostID的转换,接着由NodeHostID通过gossip服务查询得到对应的RaftAddress地址并完成消息副本间的传输。
从v3.3版本开始,每个NodeHost节点都会被随机分配一个永久固定不变的NodeHostID值,它的值如nhid-1234567890形式,该值可由NodeHost的ID方法返回。在NodeHostConfig的DefaultNodeRegistryEnabled项被设置为真后,所有新创建的副本都需要被指定其对应的NodeHostID值。此后,每次启动NodeHost实例时用于NodeHost间通讯的RaftAddress值可随意变化,每个NodeHost实例的RaftAddress与NodeHostID的对应关系将自动由后台的一个gossip服务来动态的维护,当Raft消息需要在两个副本间传递时,首先发生ReplicaID到NodeHostID的转换,接着由NodeHostID通过gossip服务查询得到对应的RaftAddress地址并完成消息副本间的传输。

Gossip服务本身是一个全分布的网络服务,用户仅需要通过NodeHostConfig.Gossip项简单设置其相关地址参数即可。

Expand Down
12 changes: 1 addition & 11 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,7 @@ type IResolver interface {
Add(uint64, uint64, string)
}

// INodeRegistry is the local registry interface used to keep all known
// nodes in the system.
type INodeRegistry interface {
Close() error
Add(shardID uint64, replicaID uint64, url string)
Remove(shardID uint64, replicaID uint64)
RemoveShard(shardID uint64)
Resolve(shardID uint64, replicaID uint64) (string, string, error)
}

var _ INodeRegistry = (*Registry)(nil)
var _ raftio.INodeRegistry = (*Registry)(nil)
var _ IResolver = (*Registry)(nil)

// Registry is used to manage all known node addresses in the multi raft system.
Expand Down
14 changes: 7 additions & 7 deletions internal/server/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ var (
// ErrDeploymentIDChanged is the error used to indicate that the deployment
// ID changed.
ErrDeploymentIDChanged = errors.New("deployment ID changed")
// ErrAddressByNodeHostIDChanged is the error used to indicate that the
// AddressByNodeHostID setting has changed.
ErrAddressByNodeHostIDChanged = errors.New("AddressByNodeHostID changed")
// ErrDefaultNodeRegistryEnabledChanged is the error used to indicate that the
// DefaultNodeRegistryEnabled setting has changed.
ErrDefaultNodeRegistryEnabledChanged = errors.New("DefaultNodeRegistryEnabled changed")
// ErrLogDBType is the error used to indicate that the LogDB type changed.
ErrLogDBType = errors.New("logdb type changed")
// ErrNotOwner indicates that the data directory belong to another NodeHost
Expand Down Expand Up @@ -408,7 +408,7 @@ func (env *Env) check(cfg config.NodeHostConfig,
return ErrLogDBType
}
if !dbto {
if !cfg.AddressByNodeHostID && !se(s.Address, cfg.RaftAddress) {
if !cfg.NodeRegistryEnabled() && !se(s.Address, cfg.RaftAddress) {
return ErrNotOwner
}
if len(s.Hostname) > 0 && !se(s.Hostname, env.hostname) {
Expand All @@ -417,8 +417,8 @@ func (env *Env) check(cfg config.NodeHostConfig,
if s.DeploymentId != 0 && s.DeploymentId != cfg.GetDeploymentID() {
return ErrDeploymentIDChanged
}
if s.AddressByNodeHostId != cfg.AddressByNodeHostID {
return ErrAddressByNodeHostIDChanged
if s.AddressByNodeHostId != cfg.DefaultNodeRegistryEnabled {
return ErrDefaultNodeRegistryEnabledChanged
}
if s.BinVer != binVer {
if s.BinVer == raftio.LogDBBinVersion &&
Expand Down Expand Up @@ -459,7 +459,7 @@ func (env *Env) createFlagFile(cfg config.NodeHostConfig,
LogdbShardCount: cfg.Expert.LogDB.Shards,
MaxSessionCount: settings.Hard.LRUMaxSessionCount,
EntryBatchSize: settings.Hard.LogDBEntryBatchSize,
AddressByNodeHostId: cfg.AddressByNodeHostID,
AddressByNodeHostId: cfg.DefaultNodeRegistryEnabled,
}
return fileutil.CreateFlagFile(dir, flagFilename, &s, env.fs)
}
8 changes: 4 additions & 4 deletions internal/server/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestRaftAddressIsAllowedToChangeWhenRequested(t *testing.T) {
if err := env.CheckNodeHostDir(cfg, binVer, testLogDBName); err == nil {
t.Fatalf("changed raft address not detected")
}
cfg.AddressByNodeHostID = true
cfg.DefaultNodeRegistryEnabled = true
status.AddressByNodeHostId = true
err = fileutil.CreateFlagFile(dir, flagFilename, &status, fs)
if err != nil {
Expand Down Expand Up @@ -169,7 +169,7 @@ func testNodeHostDirectoryDetectsMismatches(t *testing.T,
Expert: config.GetDefaultExpertConfig(),
DeploymentID: testDeploymentID,
RaftAddress: testAddress,
AddressByNodeHostID: addressByNodeHostID,
DefaultNodeRegistryEnabled: addressByNodeHostID,
}

status := raftpb.RaftDataStatus{
Expand Down Expand Up @@ -231,11 +231,11 @@ func TestCanDetectMismatchedHardHash(t *testing.T) {
testLogDBName, true, false, ErrHardSettingsChanged, fs)
}

func TestCanDetectMismatchedAddressByNodeHostID(t *testing.T) {
func TestCanDetectMismatchedDefaultNodeRegistryEnabled(t *testing.T) {
fs := vfs.GetTestFS()
testNodeHostDirectoryDetectsMismatches(t,
testAddress, "", raftio.LogDBBinVersion,
testLogDBName, false, true, ErrAddressByNodeHostIDChanged, fs)
testLogDBName, false, true, ErrDefaultNodeRegistryEnabledChanged, fs)
}

func TestLockFileCanBeLockedAndUnlocked(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func NewTransport(nhConfig config.NodeHostConfig,
dir server.SnapshotDirFunc, sysEvents ITransportEvent,
fs vfs.IFS) (*Transport, error) {
sourceID := nhConfig.RaftAddress
if nhConfig.AddressByNodeHostID {
if nhConfig.NodeRegistryEnabled() {
sourceID = env.NodeHostID()
}
t := &Transport{
Expand Down
5 changes: 2 additions & 3 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/lni/dragonboat/v4/internal/fileutil"
"github.com/lni/dragonboat/v4/internal/logdb"
"github.com/lni/dragonboat/v4/internal/raft"
"github.com/lni/dragonboat/v4/internal/registry"
"github.com/lni/dragonboat/v4/internal/rsm"
"github.com/lni/dragonboat/v4/internal/server"
"github.com/lni/dragonboat/v4/internal/settings"
Expand Down Expand Up @@ -77,7 +76,7 @@ type leaderInfo struct {
type node struct {
shardInfo atomic.Value
leaderInfo atomic.Value
nodeRegistry registry.INodeRegistry
nodeRegistry raftio.INodeRegistry
logdb raftio.ILogDB
pipeline pipeline
getStreamSink func(uint64, uint64) *transport.Sink
Expand Down Expand Up @@ -146,7 +145,7 @@ func newNode(peers map[uint64]string,
getStreamSink func(uint64, uint64) *transport.Sink,
handleSnapshotStatus func(uint64, uint64, bool),
sendMessage func(pb.Message),
nodeRegistry registry.INodeRegistry,
nodeRegistry raftio.INodeRegistry,
pool *sync.Pool,
ldb raftio.ILogDB,
metrics *logDBMetrics,
Expand Down
Loading

0 comments on commit 657304a

Please sign in to comment.