Skip to content

Commit

Permalink
Merge pull request #2 from tylerwilliams/external_registry_pr
Browse files Browse the repository at this point in the history
External registry pr
  • Loading branch information
tylerwilliams authored Sep 12, 2023
2 parents 345de75 + 5aaf7b4 commit 5f815b6
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 18 deletions.
10 changes: 9 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ type LogDBFactory interface {
Name() string
}

// NodeRegistryFactory is the interface used for providing a custom node registry.
type NodeRegistryFactory interface {
Create(nhid string, streamConnections uint64, v TargetValidator) (raftio.INodeRegistry, error)
}

// TransportFactory is the interface used for creating custom transport modules.
type TransportFactory interface {
// Create creates a transport module.
Expand Down Expand Up @@ -725,7 +730,7 @@ func (c *NodeHostConfig) GetDeploymentID() uint64 {
// GetTargetValidator returns a TargetValidator based on the specified
// NodeHostConfig instance.
func (c *NodeHostConfig) GetTargetValidator() TargetValidator {
if c.AddressByNodeHostID {
if c.AddressByNodeHostID || c.Expert.NodeRegistryFactory != nil {
return id.IsNodeHostID
} else if c.Expert.TransportFactory != nil {
return c.Expert.TransportFactory.Validate
Expand Down Expand Up @@ -939,6 +944,9 @@ type ExpertConfig struct {
// TestGossipProbeInterval defines the probe interval used by the gossip
// service in tests.
TestGossipProbeInterval time.Duration
// NodeRegistryFactory defines a custom node registry function that can be used
// instead of a static registry or the built in memberlist gossip mechanism.
NodeRegistryFactory NodeRegistryFactory
}

// GossipConfig contains configurations for the gossip service. Gossip service
Expand Down
12 changes: 1 addition & 11 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,7 @@ type IResolver interface {
Add(uint64, uint64, string)
}

// INodeRegistry is the local registry interface used to keep all known
// nodes in the system.
type INodeRegistry interface {
Close() error
Add(shardID uint64, replicaID uint64, url string)
Remove(shardID uint64, replicaID uint64)
RemoveShard(shardID uint64)
Resolve(shardID uint64, replicaID uint64) (string, string, error)
}

var _ INodeRegistry = (*Registry)(nil)
var _ raftio.INodeRegistry = (*Registry)(nil)
var _ IResolver = (*Registry)(nil)

// Registry is used to manage all known node addresses in the multi raft system.
Expand Down
2 changes: 1 addition & 1 deletion internal/server/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (env *Env) check(cfg config.NodeHostConfig,
return ErrLogDBType
}
if !dbto {
if !cfg.AddressByNodeHostID && !se(s.Address, cfg.RaftAddress) {
if !(cfg.AddressByNodeHostID || cfg.Expert.NodeRegistryFactory != nil) && !se(s.Address, cfg.RaftAddress) {
return ErrNotOwner
}
if len(s.Hostname) > 0 && !se(s.Hostname, env.hostname) {
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func NewTransport(nhConfig config.NodeHostConfig,
dir server.SnapshotDirFunc, sysEvents ITransportEvent,
fs vfs.IFS) (*Transport, error) {
sourceID := nhConfig.RaftAddress
if nhConfig.AddressByNodeHostID {
if nhConfig.AddressByNodeHostID || nhConfig.Expert.NodeRegistryFactory != nil {
sourceID = env.NodeHostID()
}
t := &Transport{
Expand Down
5 changes: 2 additions & 3 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/lni/dragonboat/v4/internal/fileutil"
"github.com/lni/dragonboat/v4/internal/logdb"
"github.com/lni/dragonboat/v4/internal/raft"
"github.com/lni/dragonboat/v4/internal/registry"
"github.com/lni/dragonboat/v4/internal/rsm"
"github.com/lni/dragonboat/v4/internal/server"
"github.com/lni/dragonboat/v4/internal/settings"
Expand Down Expand Up @@ -77,7 +76,7 @@ type leaderInfo struct {
type node struct {
shardInfo atomic.Value
leaderInfo atomic.Value
nodeRegistry registry.INodeRegistry
nodeRegistry raftio.INodeRegistry
logdb raftio.ILogDB
pipeline pipeline
getStreamSink func(uint64, uint64) *transport.Sink
Expand Down Expand Up @@ -146,7 +145,7 @@ func newNode(peers map[uint64]string,
getStreamSink func(uint64, uint64) *transport.Sink,
handleSnapshotStatus func(uint64, uint64, bool),
sendMessage func(pb.Message),
nodeRegistry registry.INodeRegistry,
nodeRegistry raftio.INodeRegistry,
pool *sync.Pool,
ldb raftio.ILogDB,
metrics *logDBMetrics,
Expand Down
14 changes: 13 additions & 1 deletion nodehost.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ type NodeHost struct {
sys *sysEventListener
}
registry INodeHostRegistry
nodes registry.INodeRegistry
nodes raftio.INodeRegistry
fs vfs.IFS
transport transport.ITransport
id *id.UUID
Expand Down Expand Up @@ -1747,6 +1747,11 @@ func (nh *NodeHost) createNodeRegistry() error {
// TODO:
// more tests here required
if nh.nhConfig.AddressByNodeHostID {
// AddressByNodeHostID should not be set if a Expert.NodeRegistryFactory
// is also set.
if nh.nhConfig.Expert.NodeRegistryFactory != nil {
return errors.New("AddressByNodeHostID and Expert.NodeRegistryFactory should not both be set")
}
plog.Infof("AddressByNodeHostID: true, use gossip based node registry")
r, err := registry.NewGossipRegistry(nh.ID(), nh.getShardInfo,
nh.nhConfig, streamConnections, validator)
Expand All @@ -1755,6 +1760,13 @@ func (nh *NodeHost) createNodeRegistry() error {
}
nh.registry = r.GetNodeHostRegistry()
nh.nodes = r
} else if nh.nhConfig.Expert.NodeRegistryFactory != nil {
plog.Infof("Expert.NodeRegistryFactory was set: using custom registry")
r, err := nh.nhConfig.Expert.NodeRegistryFactory.Create(nh.ID(), streamConnections, validator)
if err != nil {
return err
}
nh.nodes = r
} else {
plog.Infof("using regular node registry")
nh.nodes = registry.NewNodeRegistry(streamConnections, validator)
Expand Down
140 changes: 140 additions & 0 deletions nodehost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/lni/dragonboat/v4/internal/id"
"github.com/lni/dragonboat/v4/internal/invariants"
"github.com/lni/dragonboat/v4/internal/logdb"
"github.com/lni/dragonboat/v4/internal/registry"
"github.com/lni/dragonboat/v4/internal/rsm"
"github.com/lni/dragonboat/v4/internal/server"
"github.com/lni/dragonboat/v4/internal/settings"
Expand Down Expand Up @@ -1193,6 +1194,145 @@ func TestGossipCanHandleDynamicRaftAddress(t *testing.T) {
testProposal()
}

type testRegistry struct {
*registry.Registry
// map of nhid -> host:port
nodeAddrs map[string]string
}

func (tr *testRegistry) Resolve(shardID uint64, replicaID uint64) (string, string, error) {
nhid, ck, err := tr.Registry.Resolve(shardID, replicaID)
if err != nil {
return "", "", err
}
return tr.nodeAddrs[nhid], ck, nil
}

type testRegistryFactory struct {
// map of nhid -> host:port
nodeAddrs map[string]string
}

func (trf *testRegistryFactory) Create(nhid string, streamConnections uint64, v config.TargetValidator) (raftio.INodeRegistry, error) {
return &testRegistry{
registry.NewNodeRegistry(streamConnections, v),
trf.nodeAddrs,
}, nil
}

func TestExternalNodeRegistryFunction(t *testing.T) {
fs := vfs.GetTestFS()
datadir1 := fs.PathJoin(singleNodeHostTestDir, "nh1")
datadir2 := fs.PathJoin(singleNodeHostTestDir, "nh2")
os.RemoveAll(singleNodeHostTestDir)
defer os.RemoveAll(singleNodeHostTestDir)
addr1 := nodeHostTestAddr1
addr2 := nodeHostTestAddr2
nhc1 := config.NodeHostConfig{
NodeHostDir: datadir1,
RTTMillisecond: getRTTMillisecond(fs, datadir1),
RaftAddress: addr1,
Expert: config.ExpertConfig{
FS: fs,
},
}
nhc2 := config.NodeHostConfig{
NodeHostDir: datadir2,
RTTMillisecond: getRTTMillisecond(fs, datadir2),
RaftAddress: addr2,
Expert: config.ExpertConfig{
FS: fs,
},
}
nhid1, err := id.NewUUID(testNodeHostID1)
if err != nil {
t.Fatalf("failed to parse nhid")
}
nhc1.NodeHostID = nhid1.String()
nhid2, err := id.NewUUID(testNodeHostID2)
if err != nil {
t.Fatalf("failed to parse nhid")
}
nhc2.NodeHostID = nhid2.String()
testRegistryFactory := &testRegistryFactory{
nodeAddrs: map[string]string{
nhc1.NodeHostID: nodeHostTestAddr1,
nhc2.NodeHostID: nodeHostTestAddr2,
},
}
nhc1.Expert.NodeRegistryFactory = testRegistryFactory
nhc2.Expert.NodeRegistryFactory = testRegistryFactory

nh1, err := NewNodeHost(nhc1)
if err != nil {
t.Fatalf("failed to create nh, %v", err)
}
defer nh1.Close()
nh2, err := NewNodeHost(nhc2)
if err != nil {
t.Fatalf("failed to create nh2, %v", err)
}
nh2NodeHostID := nh2.ID()
peers := make(map[uint64]string)
peers[1] = testNodeHostID1
peers[2] = testNodeHostID2
createSM := func(uint64, uint64) sm.IStateMachine {
return &PST{}
}
rc := config.Config{
ShardID: 1,
ReplicaID: 1,
ElectionRTT: 3,
HeartbeatRTT: 1,
SnapshotEntries: 0,
}
if err := nh1.StartReplica(peers, false, createSM, rc); err != nil {
t.Fatalf("failed to start node %v", err)
}
rc.ReplicaID = 2
if err := nh2.StartReplica(peers, false, createSM, rc); err != nil {
t.Fatalf("failed to start node %v", err)
}
waitForLeaderToBeElected(t, nh1, 1)
waitForLeaderToBeElected(t, nh2, 1)
pto := lpto(nh1)
session := nh1.GetNoOPSession(1)
testProposal := func() {
done := false
for i := 0; i < 100; i++ {
ctx, cancel := context.WithTimeout(context.Background(), pto)
_, err := nh1.SyncPropose(ctx, session, make([]byte, 0))
cancel()
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
done = true
break
}
if !done {
t.Fatalf("failed to make proposal")
}
}
testProposal()
nh2.Close()
nhc2.RaftAddress = nodeHostTestAddr3
testRegistryFactory.nodeAddrs[nh2NodeHostID] = nodeHostTestAddr3
nh2, err = NewNodeHost(nhc2)
if err != nil {
t.Fatalf("failed to restart nh2, %v", err)
}
defer nh2.Close()
if nh2.ID() != nh2NodeHostID {
t.Fatalf("NodeHostID changed, got %s, want %s", nh2.ID(), nh2NodeHostID)
}
if err := nh2.StartReplica(peers, false, createSM, rc); err != nil {
t.Fatalf("failed to start node %v", err)
}
waitForLeaderToBeElected(t, nh2, 1)
testProposal()
}

func TestNewNodeHostReturnErrorOnInvalidConfig(t *testing.T) {
fs := vfs.GetTestFS()
to := &testOption{
Expand Down
38 changes: 38 additions & 0 deletions raftio/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2017-2021 Lei Ni ([email protected]) and other contributors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package raftio contains structs, interfaces and function definitions required
to build custom persistent Raft log storage and transport modules.
Structs, interfaces and functions defined in the raftio package are only
required when building your custom persistent Raft log storage or transport
modules. Skip this package if you plan to use the default built-in LogDB and
transport modules provided by Dragonboat.
Structs, interfaces and functions defined in the raftio package are not
considered as a part of Dragonboat's public APIs. Breaking changes might
happen in the coming minor releases.
*/
package raftio

// INodeRegistry is the registry interface used to resolve all known
// NodeHosts and their shards and replicas in the system.
type INodeRegistry interface {
Close() error
Add(shardID uint64, replicaID uint64, url string)
Remove(shardID uint64, replicaID uint64)
RemoveShard(shardID uint64)
Resolve(shardID uint64, replicaID uint64) (string, string, error)
}

0 comments on commit 5f815b6

Please sign in to comment.