diff --git a/config/config.go b/config/config.go index 91cf547d9..dd0b1b42e 100644 --- a/config/config.go +++ b/config/config.go @@ -487,6 +487,11 @@ type LogDBFactory interface { Name() string } +// NodeRegistryFactory is the interface used for providing a custom node registry. +type NodeRegistryFactory interface { + Create(nhid string, streamConnections uint64, v TargetValidator) (raftio.INodeRegistry, error) +} + // TransportFactory is the interface used for creating custom transport modules. type TransportFactory interface { // Create creates a transport module. @@ -725,7 +730,7 @@ func (c *NodeHostConfig) GetDeploymentID() uint64 { // GetTargetValidator returns a TargetValidator based on the specified // NodeHostConfig instance. func (c *NodeHostConfig) GetTargetValidator() TargetValidator { - if c.AddressByNodeHostID { + if c.AddressByNodeHostID || c.Expert.NodeRegistryFactory != nil { return id.IsNodeHostID } else if c.Expert.TransportFactory != nil { return c.Expert.TransportFactory.Validate @@ -939,6 +944,9 @@ type ExpertConfig struct { // TestGossipProbeInterval defines the probe interval used by the gossip // service in tests. TestGossipProbeInterval time.Duration + // NodeRegistryFactory defines a custom node registry function that can be used + // instead of a static registry or the built in memberlist gossip mechanism. + NodeRegistryFactory NodeRegistryFactory } // GossipConfig contains configurations for the gossip service. Gossip service diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 272433bcd..661558201 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -38,17 +38,7 @@ type IResolver interface { Add(uint64, uint64, string) } -// INodeRegistry is the local registry interface used to keep all known -// nodes in the system. -type INodeRegistry interface { - Close() error - Add(shardID uint64, replicaID uint64, url string) - Remove(shardID uint64, replicaID uint64) - RemoveShard(shardID uint64) - Resolve(shardID uint64, replicaID uint64) (string, string, error) -} - -var _ INodeRegistry = (*Registry)(nil) +var _ raftio.INodeRegistry = (*Registry)(nil) var _ IResolver = (*Registry)(nil) // Registry is used to manage all known node addresses in the multi raft system. diff --git a/internal/server/environment.go b/internal/server/environment.go index 274156aa1..aaa5dc801 100644 --- a/internal/server/environment.go +++ b/internal/server/environment.go @@ -408,7 +408,7 @@ func (env *Env) check(cfg config.NodeHostConfig, return ErrLogDBType } if !dbto { - if !cfg.AddressByNodeHostID && !se(s.Address, cfg.RaftAddress) { + if !(cfg.AddressByNodeHostID || cfg.Expert.NodeRegistryFactory != nil) && !se(s.Address, cfg.RaftAddress) { return ErrNotOwner } if len(s.Hostname) > 0 && !se(s.Hostname, env.hostname) { diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 2e46a0a27..b98200276 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -204,7 +204,7 @@ func NewTransport(nhConfig config.NodeHostConfig, dir server.SnapshotDirFunc, sysEvents ITransportEvent, fs vfs.IFS) (*Transport, error) { sourceID := nhConfig.RaftAddress - if nhConfig.AddressByNodeHostID { + if nhConfig.AddressByNodeHostID || nhConfig.Expert.NodeRegistryFactory != nil { sourceID = env.NodeHostID() } t := &Transport{ diff --git a/node.go b/node.go index 36f592fca..2464d4207 100644 --- a/node.go +++ b/node.go @@ -26,7 +26,6 @@ import ( "github.com/lni/dragonboat/v4/internal/fileutil" "github.com/lni/dragonboat/v4/internal/logdb" "github.com/lni/dragonboat/v4/internal/raft" - "github.com/lni/dragonboat/v4/internal/registry" "github.com/lni/dragonboat/v4/internal/rsm" "github.com/lni/dragonboat/v4/internal/server" "github.com/lni/dragonboat/v4/internal/settings" @@ -77,7 +76,7 @@ type leaderInfo struct { type node struct { shardInfo atomic.Value leaderInfo atomic.Value - nodeRegistry registry.INodeRegistry + nodeRegistry raftio.INodeRegistry logdb raftio.ILogDB pipeline pipeline getStreamSink func(uint64, uint64) *transport.Sink @@ -146,7 +145,7 @@ func newNode(peers map[uint64]string, getStreamSink func(uint64, uint64) *transport.Sink, handleSnapshotStatus func(uint64, uint64, bool), sendMessage func(pb.Message), - nodeRegistry registry.INodeRegistry, + nodeRegistry raftio.INodeRegistry, pool *sync.Pool, ldb raftio.ILogDB, metrics *logDBMetrics, diff --git a/nodehost.go b/nodehost.go index 9140b34ef..7dd9604af 100644 --- a/nodehost.go +++ b/nodehost.go @@ -279,7 +279,7 @@ type NodeHost struct { sys *sysEventListener } registry INodeHostRegistry - nodes registry.INodeRegistry + nodes raftio.INodeRegistry fs vfs.IFS transport transport.ITransport id *id.UUID @@ -1747,6 +1747,11 @@ func (nh *NodeHost) createNodeRegistry() error { // TODO: // more tests here required if nh.nhConfig.AddressByNodeHostID { + // AddressByNodeHostID should not be set if a Expert.NodeRegistryFactory + // is also set. + if nh.nhConfig.Expert.NodeRegistryFactory != nil { + return errors.New("AddressByNodeHostID and Expert.NodeRegistryFactory should not both be set") + } plog.Infof("AddressByNodeHostID: true, use gossip based node registry") r, err := registry.NewGossipRegistry(nh.ID(), nh.getShardInfo, nh.nhConfig, streamConnections, validator) @@ -1755,6 +1760,13 @@ func (nh *NodeHost) createNodeRegistry() error { } nh.registry = r.GetNodeHostRegistry() nh.nodes = r + } else if nh.nhConfig.Expert.NodeRegistryFactory != nil { + plog.Infof("Expert.NodeRegistryFactory was set: using custom registry") + r, err := nh.nhConfig.Expert.NodeRegistryFactory.Create(nh.ID(), streamConnections, validator) + if err != nil { + return err + } + nh.nodes = r } else { plog.Infof("using regular node registry") nh.nodes = registry.NewNodeRegistry(streamConnections, validator) diff --git a/nodehost_test.go b/nodehost_test.go index 02a55ba50..6345120f0 100644 --- a/nodehost_test.go +++ b/nodehost_test.go @@ -45,6 +45,7 @@ import ( "github.com/lni/dragonboat/v4/internal/id" "github.com/lni/dragonboat/v4/internal/invariants" "github.com/lni/dragonboat/v4/internal/logdb" + "github.com/lni/dragonboat/v4/internal/registry" "github.com/lni/dragonboat/v4/internal/rsm" "github.com/lni/dragonboat/v4/internal/server" "github.com/lni/dragonboat/v4/internal/settings" @@ -1193,6 +1194,145 @@ func TestGossipCanHandleDynamicRaftAddress(t *testing.T) { testProposal() } +type testRegistry struct { + *registry.Registry + // map of nhid -> host:port + nodeAddrs map[string]string +} + +func (tr *testRegistry) Resolve(shardID uint64, replicaID uint64) (string, string, error) { + nhid, ck, err := tr.Registry.Resolve(shardID, replicaID) + if err != nil { + return "", "", err + } + return tr.nodeAddrs[nhid], ck, nil +} + +type testRegistryFactory struct { + // map of nhid -> host:port + nodeAddrs map[string]string +} + +func (trf *testRegistryFactory) Create(nhid string, streamConnections uint64, v config.TargetValidator) (raftio.INodeRegistry, error) { + return &testRegistry{ + registry.NewNodeRegistry(streamConnections, v), + trf.nodeAddrs, + }, nil +} + +func TestExternalNodeRegistryFunction(t *testing.T) { + fs := vfs.GetTestFS() + datadir1 := fs.PathJoin(singleNodeHostTestDir, "nh1") + datadir2 := fs.PathJoin(singleNodeHostTestDir, "nh2") + os.RemoveAll(singleNodeHostTestDir) + defer os.RemoveAll(singleNodeHostTestDir) + addr1 := nodeHostTestAddr1 + addr2 := nodeHostTestAddr2 + nhc1 := config.NodeHostConfig{ + NodeHostDir: datadir1, + RTTMillisecond: getRTTMillisecond(fs, datadir1), + RaftAddress: addr1, + Expert: config.ExpertConfig{ + FS: fs, + }, + } + nhc2 := config.NodeHostConfig{ + NodeHostDir: datadir2, + RTTMillisecond: getRTTMillisecond(fs, datadir2), + RaftAddress: addr2, + Expert: config.ExpertConfig{ + FS: fs, + }, + } + nhid1, err := id.NewUUID(testNodeHostID1) + if err != nil { + t.Fatalf("failed to parse nhid") + } + nhc1.NodeHostID = nhid1.String() + nhid2, err := id.NewUUID(testNodeHostID2) + if err != nil { + t.Fatalf("failed to parse nhid") + } + nhc2.NodeHostID = nhid2.String() + testRegistryFactory := &testRegistryFactory{ + nodeAddrs: map[string]string{ + nhc1.NodeHostID: nodeHostTestAddr1, + nhc2.NodeHostID: nodeHostTestAddr2, + }, + } + nhc1.Expert.NodeRegistryFactory = testRegistryFactory + nhc2.Expert.NodeRegistryFactory = testRegistryFactory + + nh1, err := NewNodeHost(nhc1) + if err != nil { + t.Fatalf("failed to create nh, %v", err) + } + defer nh1.Close() + nh2, err := NewNodeHost(nhc2) + if err != nil { + t.Fatalf("failed to create nh2, %v", err) + } + nh2NodeHostID := nh2.ID() + peers := make(map[uint64]string) + peers[1] = testNodeHostID1 + peers[2] = testNodeHostID2 + createSM := func(uint64, uint64) sm.IStateMachine { + return &PST{} + } + rc := config.Config{ + ShardID: 1, + ReplicaID: 1, + ElectionRTT: 3, + HeartbeatRTT: 1, + SnapshotEntries: 0, + } + if err := nh1.StartReplica(peers, false, createSM, rc); err != nil { + t.Fatalf("failed to start node %v", err) + } + rc.ReplicaID = 2 + if err := nh2.StartReplica(peers, false, createSM, rc); err != nil { + t.Fatalf("failed to start node %v", err) + } + waitForLeaderToBeElected(t, nh1, 1) + waitForLeaderToBeElected(t, nh2, 1) + pto := lpto(nh1) + session := nh1.GetNoOPSession(1) + testProposal := func() { + done := false + for i := 0; i < 100; i++ { + ctx, cancel := context.WithTimeout(context.Background(), pto) + _, err := nh1.SyncPropose(ctx, session, make([]byte, 0)) + cancel() + if err != nil { + time.Sleep(100 * time.Millisecond) + continue + } + done = true + break + } + if !done { + t.Fatalf("failed to make proposal") + } + } + testProposal() + nh2.Close() + nhc2.RaftAddress = nodeHostTestAddr3 + testRegistryFactory.nodeAddrs[nh2NodeHostID] = nodeHostTestAddr3 + nh2, err = NewNodeHost(nhc2) + if err != nil { + t.Fatalf("failed to restart nh2, %v", err) + } + defer nh2.Close() + if nh2.ID() != nh2NodeHostID { + t.Fatalf("NodeHostID changed, got %s, want %s", nh2.ID(), nh2NodeHostID) + } + if err := nh2.StartReplica(peers, false, createSM, rc); err != nil { + t.Fatalf("failed to start node %v", err) + } + waitForLeaderToBeElected(t, nh2, 1) + testProposal() +} + func TestNewNodeHostReturnErrorOnInvalidConfig(t *testing.T) { fs := vfs.GetTestFS() to := &testOption{ diff --git a/raftio/registry.go b/raftio/registry.go new file mode 100644 index 000000000..b42e4bd6c --- /dev/null +++ b/raftio/registry.go @@ -0,0 +1,38 @@ +// Copyright 2017-2021 Lei Ni (nilei81@gmail.com) and other contributors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package raftio contains structs, interfaces and function definitions required +to build custom persistent Raft log storage and transport modules. + +Structs, interfaces and functions defined in the raftio package are only +required when building your custom persistent Raft log storage or transport +modules. Skip this package if you plan to use the default built-in LogDB and +transport modules provided by Dragonboat. + +Structs, interfaces and functions defined in the raftio package are not +considered as a part of Dragonboat's public APIs. Breaking changes might +happen in the coming minor releases. +*/ +package raftio + +// INodeRegistry is the registry interface used to resolve all known +// NodeHosts and their shards and replicas in the system. +type INodeRegistry interface { + Close() error + Add(shardID uint64, replicaID uint64, url string) + Remove(shardID uint64, replicaID uint64) + RemoveShard(shardID uint64) + Resolve(shardID uint64, replicaID uint64) (string, string, error) +}