From 6ae1d0b2e535c098f701f30e81d1873f20e0c982 Mon Sep 17 00:00:00 2001 From: Lei Ni Date: Mon, 13 Jun 2022 01:22:17 +0800 Subject: [PATCH] dragonboat: minor cleanups --- README.md | 4 +- benchmark_test.go | 2 +- client/session.pb.go | 4 +- client/session_test.go | 2 +- config/config.go | 18 +- docs/devops.md | 6 +- docs/overview.CHS.md | 4 +- docs/test.md | 12 +- engine.go | 46 ++--- engine_test.go | 18 +- event.go | 2 +- internal/logdb/db_test.go | 6 +- internal/logdb/logdb.go | 4 +- internal/logdb/sharded.go | 2 +- internal/raft/peer.go | 2 +- internal/raft/raft.go | 10 +- internal/raft/raft_etcd_paper_test.go | 4 +- internal/raft/raft_etcd_test.go | 14 +- internal/raft/raft_test.go | 2 +- internal/raft/readindex.go | 2 +- internal/registry/gossip_logger.go | 2 +- internal/registry/gossip_test.go | 2 +- internal/registry/nodehost.go | 8 +- internal/registry/registry.go | 4 +- internal/registry/registry_test.go | 2 +- internal/registry/view.go | 26 +-- internal/registry/view_test.go | 26 +-- internal/rsm/managed.go | 4 +- internal/server/partition.go | 6 +- internal/settings/hard.go | 2 +- internal/tan/logdb.go | 2 +- internal/tests/concurrentkv.go | 2 +- internal/tests/kvtest.go | 2 +- internal/transport/chunk_test.go | 2 +- internal/transport/job.go | 2 +- monkey.go | 2 +- node.go | 6 +- node_test.go | 4 +- nodehost.go | 231 +++++++++++++------------- nodehost_test.go | 66 ++++---- queue_test.go | 10 +- raftio/logdb.go | 10 +- request.go | 22 +-- statemachine/extension.go | 2 +- tools/checkdisk/main.go | 14 +- tools/import.go | 50 +++--- 46 files changed, 335 insertions(+), 338 deletions(-) diff --git a/README.md b/README.md index a94e3950d..13dc0711f 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ ## About ## Dragonboat is a high performance multi-group [Raft](https://raft.github.io/) [consensus](https://en.wikipedia.org/wiki/Consensus_(computer_science)) library in pure [Go](https://golang.org/). -Consensus algorithms such as Raft provides fault-tolerance by alllowing a system continue to operate as long as the majority member servers are available. For example, a Raft cluster of 5 servers can make progress even if 2 servers fail. It also appears to clients as a single entity with strong data consistency always provided. All Raft replicas can be used to handle read requests for aggregated read throughput. +Consensus algorithms such as Raft provides fault-tolerance by alllowing a system continue to operate as long as the majority member servers are available. For example, a Raft shard of 5 servers can make progress even if 2 servers fail. It also appears to clients as a single entity with strong data consistency always provided. All Raft replicas can be used to handle read requests for aggregated read throughput. Dragonboat handles all technical difficulties associated with Raft to allow users to just focus on their application domains. It is also very easy to use, our step-by-step [examples](https://github.com/lni/dragonboat-example) can help new users to master it in half an hour. 
@@ -25,7 +25,7 @@ Dragonboat handles all technical difficulties associated with Raft to allow user * Fully pipelined and TLS mutual authentication support, ready for high latency open environment * Custom Raft log storage and transport support, easy to integrate with latest I/O techs * Prometheus based health metrics support -* Built-in tool to repair Raft clusters that permanently lost the quorum +* Built-in tool to repair Raft shards that permanently lost the quorum * [Extensively tested](/docs/test.md) including using [Jepsen](https://aphyr.com/tags/jepsen)'s [Knossos](https://github.com/jepsen-io/knossos) linearizability checker, some results are [here](https://github.com/lni/knossos-data) All major features covered in Diego Ongaro's [Raft thesis](https://github.com/ongardie/dissertation/blob/master/stanford.pdf) have been supported - diff --git a/benchmark_test.go b/benchmark_test.go index 89f2f8409..9e03b39c9 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -255,7 +255,7 @@ func BenchmarkWorkerReady(b *testing.B) { rc := newWorkReady(1) b.RunParallel(func(pbt *testing.PB) { for pbt.Next() { - rc.clusterReady(1) + rc.shardReady(1) } }) } diff --git a/client/session.pb.go b/client/session.pb.go index 4d08b93d5..5a7c0b672 100644 --- a/client/session.pb.go +++ b/client/session.pb.go @@ -385,11 +385,11 @@ func (m *Session) IsNoOPSession() bool { return m.SeriesID == NoOPSeriesID } -// ShardIDMustMatch asserts that the input cluster id matches the cluster id +// ShardIDMustMatch asserts that the input shard id matches the shard id // of the client session. func (m *Session) ShardIDMustMatch(shardID uint64) { if m.ShardID != shardID { - panic("cluster id do not match") + panic("shard id do not match") } } diff --git a/client/session_test.go b/client/session_test.go index 5080c0089..b82b730e5 100644 --- a/client/session_test.go +++ b/client/session_test.go @@ -26,7 +26,7 @@ func TestNoOPSessionHasExpectedSeriesID(t *testing.T) { t.Errorf("series id unexpected") } if cs.ShardID != 120 { - t.Errorf("cluster id unexpected") + t.Errorf("shard id unexpected") } } diff --git a/config/config.go b/config/config.go index 91e71fcdc..eace1a5ea 100644 --- a/config/config.go +++ b/config/config.go @@ -63,7 +63,7 @@ const ( // Config is used to configure Raft nodes. type Config struct { - // ReplicaID is a non-zero value used to identify a node within a Raft cluster. + // ReplicaID is a non-zero value used to identify a node within a Raft shard. ReplicaID uint64 // ShardID is the unique value used to identify a Raft group that contains // multiple replicas. @@ -172,7 +172,7 @@ type Config struct { DisableAutoCompactions bool // IsNonVoting indicates whether this is a non-voting Raft node. Described as // non-voting members in the section 4.2.1 of Diego Ongaro's thesis, they are - // used to allow a new node to join the cluster and catch up with other + // used to allow a new node to join the shard and catch up with other // existing ndoes without impacting the availability. Extra non-voting nodes // can also be introduced to serve read-only requests. IsNonVoting bool @@ -187,8 +187,8 @@ type Config struct { // // Witness support is currently experimental. IsWitness bool - // Quiesce specifies whether to let the Raft cluster enter quiesce mode when - // there is no cluster activity. Shards in quiesce mode do not exchange + // Quiesce specifies whether to let the Raft shard enter quiesce mode when + // there is no shard activity. 
Shards in quiesce mode do not exchange // heartbeat messages to minimize bandwidth consumption. // // Quiesce support is currently experimental. @@ -369,11 +369,11 @@ type NodeHostConfig struct { // is unlimited. MaxReceiveQueueSize uint64 // MaxSnapshotSendBytesPerSecond defines how much snapshot data can be sent - // every second for all Raft clusters managed by the NodeHost instance. + // every second for all Raft shards managed by the NodeHost instance. // The default value 0 means there is no limit set for snapshot streaming. MaxSnapshotSendBytesPerSecond uint64 // MaxSnapshotRecvBytesPerSecond defines how much snapshot data can be - // received each second for all Raft clusters managed by the NodeHost instance. + // received each second for all Raft shards managed by the NodeHost instance. // The default value 0 means there is no limit for receiving snapshot data. MaxSnapshotRecvBytesPerSecond uint64 // NotifyCommit specifies whether clients should be notified when their @@ -414,7 +414,7 @@ type NodeHostConfig struct { // When starting Raft nodes or requesting new nodes to be added, use the above // mentioned NodeHostID values as the target parameters (which are of the // Target type). Let's say we want to start a Raft Node as a part of a three - // replicas Raft cluster, the initialMembers parameter of the StartShard + // replicas Raft shard, the initialMembers parameter of the StartShard // method can be set to // // initialMembers := map[uint64]Target { @@ -423,9 +423,9 @@ type NodeHostConfig struct { // 3: "nhid-zzzzz", // } // - // This indicates that node 1 of the cluster will be running on the NodeHost + // This indicates that node 1 of the shard will be running on the NodeHost // instance identified by the NodeHostID value "nhid-xxxxx", node 2 of the - // same cluster will be running on the NodeHost instance identified by the + // same shard will be running on the NodeHost instance identified by the // NodeHostID value of "nhid-yyyyy" and so on. // // The internal gossip service exchanges NodeHost details, including their diff --git a/docs/devops.md b/docs/devops.md index cfee7602d..c4243073b 100644 --- a/docs/devops.md +++ b/docs/devops.md @@ -1,14 +1,14 @@ # DevOps # -This document describes the DevOps requirements for operating Dragonboat based applications in production. Please note that incorrect DevOps operations can potentially corrupt your Raft clusters permanently. +This document describes the DevOps requirements for operating Dragonboat based applications in production. Please note that incorrect DevOps operations can potentially corrupt your Raft shards permanently. * It is recommended to use the ext4 filesystem, other filesystems have never been tested. * It is recommended to use enterprise NVME SSD with high write endurance rating. Must use local hard disks and avoid any NFS, CIFS, Samba, CEPH or other similar shared storage. -* Never try to backup or restore Dragonboat data by directly operating on Dragonboat data files or directories. It can immediately corrupt your Raft clusters. +* Never try to backup or restore Dragonboat data by directly operating on Dragonboat data files or directories. It can immediately corrupt your Raft shards. * Each Raft group has multiple replicas, the best way to safeguard the availability of your services and data is to increase the number of replicas. As an example, the Raft group can tolerant 2 node failures when there are 5 replicas, while it can only tolerant 1 node failure when using 3 replicas. 
* On node failure, the Raft group will be available when it still has the quorum. To handle such failures, you can add a non-voting node to start replicating data to it, once in sync with other replicas you can promote the non-voting node to a regular node and remove the failed node by using membership change APIs. For those failed nodes caused by intermittent failures such as short term network partition or power loss, you should resolve the network or power issue and try restarting the affected nodes. * On disk failure, such as when experiencing data integrity check errors or write failures, it is important to immediately replace the failed disk and remove the failed node using the above described membership change method. To restart nodes with such disk failures, it is important to have the failed disk replaced first to ensure corrupted data is removed. As a refreshed node with no existing data, that node must be assigned a new RaftAddress value to avoid confusing other nodes. -* When the quorum nodes are gone, you will not be able to resolve it without losing data. The github.com/lni/dragonboat/tools package provides the ImportSnapshot method to import a previously exported snapshot to repair such failed Raft cluster. +* When the quorum nodes are gone, you will not be able to resolve it without losing data. The github.com/lni/dragonboat/tools package provides the ImportSnapshot method to import a previously exported snapshot to repair such failed Raft shard. * By default, the RaftAddress value can not be changed between restarts, otherwise the system will panic with an error message. * When you can't provide a static IP for your nodes, e.g. when IP is dynamically assigned on node restart, you may want to configure a static DNS name for that node and update it on each restart. * When it is not possible to do so, you can choose to set the AddressByNodeHostID field to enable the gossip feature which is designed to handle dynamic RaftAddress. Check godocs for more details on the gossip feature. 
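
The replacement procedure described in the DevOps notes above (add a non-voting replica, let it catch up, promote it, then remove the failed replica) maps onto the NodeHost membership change APIs. The sketch below is illustrative only and not part of this patch; the replica-oriented method names (SyncRequestAddNonVoting, SyncRequestAddReplica, SyncRequestDeleteReplica) follow the naming this patch moves toward and, like the IDs and target address, should be treated as assumptions.

    package devops

    import (
        "context"

        "github.com/lni/dragonboat/v4"
    )

    // replaceFailedReplica sketches the recovery flow from the DevOps notes:
    // bring in a new non-voting replica, promote it once it has caught up with
    // the shard, then remove the failed replica via membership change.
    func replaceFailedReplica(ctx context.Context, nh *dragonboat.NodeHost,
        shardID, failedReplicaID, newReplicaID uint64, target string) error {
        // the trailing 0 skips the ConfigChangeIndex ordering check
        if err := nh.SyncRequestAddNonVoting(ctx, shardID, newReplicaID, target, 0); err != nil {
            return err
        }
        // ... wait until the non-voting replica is in sync with the others ...
        // promote the non-voting replica to a regular voting member
        if err := nh.SyncRequestAddReplica(ctx, shardID, newReplicaID, target, 0); err != nil {
            return err
        }
        // finally remove the failed replica from the shard
        return nh.SyncRequestDeleteReplica(ctx, shardID, failedReplicaID, 0)
    }
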
diff --git a/docs/overview.CHS.md b/docs/overview.CHS.md index 8c8f41aeb..22f63a0dd 100644 --- a/docs/overview.CHS.md +++ b/docs/overview.CHS.md @@ -59,13 +59,13 @@ Leader:Raft协议中定义的扮演Leader角色的节点。每个Raft组应有 使用一个节点前首先需要启动该节点,使得其被NodeHost装载并管理。NodeHost的StartShard, StartConcurrentShard与StartOnDiskShard方法用于启动相应节点。 -当一个Raft cluster的各初始成员首次启动时,用户需要提供该Raft cluster的所有初始成员信息(initial members),且各副本必须以完全相同的初始成员信息启动。该初始成员信息用于确保各副本从一个一致的成员列表开始演进后续用户要求的成员变更。当一个副本并非该Raft cluster的初始成员,而是后续通过成员变更(如SyncRequestAddNode)所新增的节点,其第一次启动时无需提供初始成员信息,只需要将join参数设置为true。 +当一个Raft shard的各初始成员首次启动时,用户需要提供该Raft shard的所有初始成员信息(initial members),且各副本必须以完全相同的初始成员信息启动。该初始成员信息用于确保各副本从一个一致的成员列表开始演进后续用户要求的成员变更。当一个副本并非该Raft shard的初始成员,而是后续通过成员变更(如SyncRequestAddNode)所新增的节点,其第一次启动时无需提供初始成员信息,只需要将join参数设置为true。 当一个节点重启时,不论该节点是一个初始节点还是后续通过成员变更添加的节点,均无需再次提供初始成员信息,也不再需要设置join参数为true。 ## 节点停止 ## -用户可以通过NodeHost的StopShard方法来停止所指定的Raft cluster在该NodeHost管理下的副本。停止后的节点不再响应读写请求,但可以通过上述节点启动方式再次重新启动。 +用户可以通过NodeHost的StopShard方法来停止所指定的Raft shard在该NodeHost管理下的副本。停止后的节点不再响应读写请求,但可以通过上述节点启动方式再次重新启动。 在一个副本被StopShard要求停止后,如果它正在执行快照的创建或恢复,该节点可能不会立刻停止而需等待至快照的创建或恢复完成。为避免这种长期等待,由用户实现的快照创建与恢复方法提供了一个<-chan struct{}的参数,当节点被要求停止后,该<-chan struct{}会被关闭,用户的快照创建与恢复方法可据此选择是否放弃当前的快照创建与恢复,从而快速响应节点停止的请求。 diff --git a/docs/test.md b/docs/test.md index b90671446..853780e99 100644 --- a/docs/test.md +++ b/docs/test.md @@ -11,7 +11,7 @@ ## Monkey Testing ## ### Setup ### * 5 NodeNosts and 3 Drummer servers per process -* hundreds of Raft clusters per process +* hundreds of Raft shards per process * randomly kill and restart NodeHosts and Drummer servers, each NodeHost usually stay online for a few minutes * randomly delete all data owned by a certain NodeHost to emulate permanent disk failure * randomly drop and re-order messages exchanged between NodeHosts @@ -19,18 +19,18 @@ * for selected instances, snapshotting and log compaction happen all the time in the background * committed entries are applied with random delays * snapshots are captured and applied with random delays -* a list of background workers keep writing to/reading from random Raft clusters with stale read checks +* a list of background workers keep writing to/reading from random Raft shards with stale read checks * client activity history files are verified by linearizability checkers such as Jepsen's Knossos * run hundreds of above described processes concurrently on each test server, 30 minutes each iteration, many iterations every night * run concurrently on many servers every night ### Checks ### * no linearizability violation -* no cluster is permanently stuck +* no shard is permanently stuck * state machines must be in sync -* cluster membership must be consistent +* shard membership must be consistent * raft log saved in LogDB must be consistent -* no zombie cluster node +* no zombie shard node ### Results ### Some history files in Jepsen's [Knossos](https://github.com/jepsen-io/knossos) edn format have been made publicly [available](https://github.com/lni/knossos-data). 
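
The overview document patched above notes that user supplied snapshot save/recover methods receive a <-chan struct{} that is closed once the node is asked to stop, so an in-flight snapshot can be abandoned quickly. A minimal sketch of honoring that channel follows (illustrative, not part of this patch; ExampleStateMachine and its chunks field are hypothetical stand-ins for a user defined state machine):

    package example

    import (
        "io"

        sm "github.com/lni/dragonboat/v4/statemachine"
    )

    // ExampleStateMachine is a stand-in for a user defined state machine; only
    // its snapshot saving path is sketched here.
    type ExampleStateMachine struct {
        chunks [][]byte // hypothetical serialized representation of the state
    }

    // SaveSnapshot streams the state to w while watching the done channel,
    // which is closed once the node has been asked to stop.
    func (s *ExampleStateMachine) SaveSnapshot(w io.Writer,
        fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
        for _, chunk := range s.chunks {
            select {
            case <-done:
                // the node is stopping, abandon this snapshot
                return sm.ErrSnapshotStopped
            default:
            }
            if _, err := w.Write(chunk); err != nil {
                return err
            }
        }
        return nil
    }
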
@@ -44,7 +44,7 @@ Some history files in Jepsen's [Knossos](https://github.com/jepsen-io/knossos) e * Ubuntu 16.04 with Spectre and Meltdown patches, ext4 file-system ## Benchmark method ## -* 48 Raft clusters on three NodeHost instances across three servers +* 48 Raft shards on three NodeHost instances across three servers * Each Raft node is backed by a in-memory Key-Value data store as RSM * Mostly update operations in the Key-Value store * All I/O requests are launched from local processes diff --git a/engine.go b/engine.go index 0d0195c8b..b0dff9585 100644 --- a/engine.go +++ b/engine.go @@ -157,7 +157,7 @@ func (wr *workReady) notify(idx uint64) { } } -func (wr *workReady) clusterReadyByUpdates(updates []pb.Update) { +func (wr *workReady) shardReadyByUpdates(updates []pb.Update) { var notified bitmap for _, ud := range updates { if len(ud.CommittedEntries) > 0 { @@ -177,7 +177,7 @@ func (wr *workReady) clusterReadyByUpdates(updates []pb.Update) { } } -func (wr *workReady) clusterReadyByMessageBatch(mb pb.MessageBatch) { +func (wr *workReady) shardReadyByMessageBatch(mb pb.MessageBatch) { var notified bitmap for _, req := range mb.Requests { idx := wr.partitioner.GetPartitionID(req.ShardID) @@ -209,7 +209,7 @@ func (wr *workReady) allShardsReady(nodes []*node) { } } -func (wr *workReady) clusterReady(shardID uint64) { +func (wr *workReady) shardReady(shardID uint64) { idx := wr.partitioner.GetPartitionID(shardID) readyMap := wr.maps[idx] readyMap.setShardReady(shardID) @@ -421,9 +421,9 @@ func (p *workerPool) workerPoolMain() { p.unloadNodes() return } else if chosen == 1 { - clusters := p.saveReady.getReadyMap(1) + shards := p.saveReady.getReadyMap(1) p.loadNodes() - for cid := range clusters { + for cid := range shards { if j, ok := p.getSaveJob(cid); ok { plog.Debugf("%s saveRequested for %d", p.nh.describe(), cid) p.pending = append(p.pending, j) @@ -431,9 +431,9 @@ func (p *workerPool) workerPoolMain() { } } } else if chosen == 2 { - clusters := p.recoverReady.getReadyMap(1) + shards := p.recoverReady.getReadyMap(1) p.loadNodes() - for cid := range clusters { + for cid := range shards { if j, ok := p.getRecoverJob(cid); ok { plog.Debugf("%s recoverRequested for %d", p.nh.describe(), cid) p.pending = append(p.pending, j) @@ -441,9 +441,9 @@ func (p *workerPool) workerPoolMain() { } } } else if chosen == 3 { - clusters := p.streamReady.getReadyMap(1) + shards := p.streamReady.getReadyMap(1) p.loadNodes() - for cid := range clusters { + for cid := range shards { if j, ok := p.getStreamJob(cid); ok { plog.Debugf("%s streamRequested for %d", p.nh.describe(), cid) p.pending = append(p.pending, j) @@ -917,7 +917,7 @@ func (p *closeWorkerPool) completed(workerID uint64) { plog.Panicf("close worker %d is not in busy state", workerID) } if _, ok := p.processing[shardID]; !ok { - plog.Panicf("cluster %d is not being processed", shardID) + plog.Panicf("shard %d is not being processed", shardID) } delete(p.processing, shardID) delete(p.busy, workerID) @@ -1420,7 +1420,7 @@ func (e *engine) setCloseReady(n *node) { } func (e *engine) setStepReadyByMessageBatch(mb pb.MessageBatch) { - e.stepWorkReady.clusterReadyByMessageBatch(mb) + e.stepWorkReady.shardReadyByMessageBatch(mb) } func (e *engine) setAllStepReady(nodes []*node) { @@ -1428,42 +1428,42 @@ func (e *engine) setAllStepReady(nodes []*node) { } func (e *engine) setStepReady(shardID uint64) { - e.stepWorkReady.clusterReady(shardID) + e.stepWorkReady.shardReady(shardID) } func (e *engine) setCommitReadyByUpdates(updates []pb.Update) { - 
e.commitWorkReady.clusterReadyByUpdates(updates) + e.commitWorkReady.shardReadyByUpdates(updates) } func (e *engine) setCommitReady(shardID uint64) { - e.commitWorkReady.clusterReady(shardID) + e.commitWorkReady.shardReady(shardID) } func (e *engine) setApplyReadyByUpdates(updates []pb.Update) { - e.applyWorkReady.clusterReadyByUpdates(updates) + e.applyWorkReady.shardReadyByUpdates(updates) } func (e *engine) setApplyReady(shardID uint64) { - e.applyWorkReady.clusterReady(shardID) + e.applyWorkReady.shardReady(shardID) } func (e *engine) setStreamReady(shardID uint64) { - e.wp.streamReady.clusterReady(shardID) + e.wp.streamReady.shardReady(shardID) } func (e *engine) setSaveReady(shardID uint64) { - e.wp.saveReady.clusterReady(shardID) + e.wp.saveReady.shardReady(shardID) } func (e *engine) setRecoverReady(shardID uint64) { - e.wp.recoverReady.clusterReady(shardID) + e.wp.recoverReady.shardReady(shardID) } func (e *engine) setCCIReady(shardID uint64) { - e.stepCCIReady.clusterReady(shardID) - e.commitCCIReady.clusterReady(shardID) - e.applyCCIReady.clusterReady(shardID) - e.wp.cciReady.clusterReady(shardID) + e.stepCCIReady.shardReady(shardID) + e.commitCCIReady.shardReady(shardID) + e.applyCCIReady.shardReady(shardID) + e.wp.cciReady.shardReady(shardID) } func (e *engine) offloadNodeMap(nodes map[uint64]*node) { diff --git a/engine_test.go b/engine_test.go index 390296c8b..124102e2e 100644 --- a/engine_test.go +++ b/engine_test.go @@ -89,7 +89,7 @@ func TestAllShardsReady(t *testing.T) { t.Errorf("unexpected map size") } if _, ok := m[i]; !ok { - t.Errorf("cluster not set") + t.Errorf("shard not set") } } nodes = nodes[:0] @@ -104,7 +104,7 @@ func TestAllShardsReady(t *testing.T) { rc := wr.maps[1] m := rc.getReadyShards() if len(m) != 0 { - t.Errorf("cluster map unexpected set") + t.Errorf("shard map unexpected set") } } @@ -121,7 +121,7 @@ func TestWorkCanBeSetAsReady(t *testing.T) { t.Errorf("ready signaled") default: } - wr.clusterReady(0) + wr.shardReady(0) select { case <-wr.waitCh(1): case <-wr.waitCh(2): @@ -133,7 +133,7 @@ func TestWorkCanBeSetAsReady(t *testing.T) { default: t.Errorf("ready not signaled") } - wr.clusterReady(9) + wr.shardReady(9) select { case <-wr.waitCh(1): t.Errorf("ready signaled") @@ -149,9 +149,9 @@ func TestWorkCanBeSetAsReady(t *testing.T) { func TestReturnedReadyMapContainsReadyShardID(t *testing.T) { wr := newWorkReady(4) - wr.clusterReady(0) - wr.clusterReady(4) - wr.clusterReady(129) + wr.shardReady(0) + wr.shardReady(4) + wr.shardReady(129) ready := wr.getReadyMap(1) if len(ready) != 2 { t.Errorf("unexpected ready map size, sz: %d", len(ready)) @@ -159,7 +159,7 @@ func TestReturnedReadyMapContainsReadyShardID(t *testing.T) { _, ok := ready[0] _, ok2 := ready[4] if !ok || !ok2 { - t.Errorf("missing cluster id") + t.Errorf("missing shard id") } ready = wr.getReadyMap(2) if len(ready) != 1 { @@ -167,7 +167,7 @@ func TestReturnedReadyMapContainsReadyShardID(t *testing.T) { } _, ok = ready[129] if !ok { - t.Errorf("missing cluster id") + t.Errorf("missing shard id") } ready = wr.getReadyMap(3) if len(ready) != 0 { diff --git a/event.go b/event.go index 72db27da2..9f7edb4a4 100644 --- a/event.go +++ b/event.go @@ -60,7 +60,7 @@ func newRaftEventListener(shardID uint64, replicaID uint64, queue: queue, } if useMetrics { - label := fmt.Sprintf(`{clusterid="%d",nodeid="%d"}`, shardID, replicaID) + label := fmt.Sprintf(`{shardid="%d",replicaid="%d"}`, shardID, replicaID) name := fmt.Sprintf(`dragonboat_raftnode_campaign_launched_total%s`, label) 
el.campaignLaunched = metrics.GetOrCreateCounter(name) name = fmt.Sprintf(`dragonboat_raftnode_campaign_skipped_total%s`, label) diff --git a/internal/logdb/db_test.go b/internal/logdb/db_test.go index 6a9d402f7..c2b9b0a0d 100644 --- a/internal/logdb/db_test.go +++ b/internal/logdb/db_test.go @@ -157,7 +157,7 @@ func TestBootstrapInfoCanBeSavedAndChecked(t *testing.T) { t.Errorf("failed to get node info list") } if ni[0].ShardID != 1 || ni[0].ReplicaID != 2 { - t.Errorf("unexpected cluster id/node id, %v", ni[0]) + t.Errorf("unexpected shard id/node id, %v", ni[0]) } if err := db.SaveBootstrapInfo(2, 3, bs); err != nil { t.Errorf("failed to save bootstrap info %v", err) @@ -691,7 +691,7 @@ func TestReadAllEntriesOnlyReturnEntriesFromTheSpecifiedNode(t *testing.T) { if rs.EntryCount != 2 { t.Errorf("ents sz %d, want 2", rs.EntryCount) } - // save the same data but with different cluster id + // save the same data but with different shard id ud.ReplicaID = 4 ud.ShardID = 4 err = db.SaveRaftState([]pb.Update{ud}, 3) @@ -759,7 +759,7 @@ func TestIterateEntriesOnlyReturnCurrentNodeEntries(t *testing.T) { if len(ents) != 3 { t.Errorf("ents sz %d, want 3", len(ents)) } - // save the same data again but under a different cluster id + // save the same data again but under a different shard id ud.ReplicaID = 4 ud.ShardID = 4 err = db.SaveRaftState([]pb.Update{ud}, 3) diff --git a/internal/logdb/logdb.go b/internal/logdb/logdb.go index cd8a878a1..bb14faee3 100644 --- a/internal/logdb/logdb.go +++ b/internal/logdb/logdb.go @@ -41,10 +41,10 @@ type IReusableKey interface { // with the specified entry index. SetEntryKey(shardID uint64, replicaID uint64, index uint64) // SetStateKey sets the key to be an persistent state key suitable - // for the specified Raft cluster node. + // for the specified Raft shard node. SetStateKey(shardID uint64, replicaID uint64) // SetMaxIndexKey sets the key to be the max possible index key for the - // specified Raft cluster node. + // specified Raft shard node. SetMaxIndexKey(shardID uint64, replicaID uint64) // Key returns the underlying byte slice of the key. Key() []byte diff --git a/internal/logdb/sharded.go b/internal/logdb/sharded.go index 0b65d8923..227c544c2 100644 --- a/internal/logdb/sharded.go +++ b/internal/logdb/sharded.go @@ -215,7 +215,7 @@ func (s *ShardedDB) SaveSnapshots(updates []pb.Update) error { } // GetSnapshot returns the most recent snapshot associated with the specified -// cluster. +// shard. func (s *ShardedDB) GetSnapshot(shardID uint64, replicaID uint64) (pb.Snapshot, error) { p := s.partitioner.GetPartitionID(shardID) diff --git a/internal/raft/peer.go b/internal/raft/peer.go index f42fad59a..dd2ff34aa 100644 --- a/internal/raft/peer.go +++ b/internal/raft/peer.go @@ -47,7 +47,7 @@ import ( pb "github.com/lni/dragonboat/v4/raftpb" ) -// PeerAddress is the basic info for a peer in the Raft cluster. +// PeerAddress is the basic info for a peer in the Raft shard. 
type PeerAddress struct { Address string ReplicaID uint64 diff --git a/internal/raft/raft.go b/internal/raft/raft.go index e4ac58f48..1b8426a22 100644 --- a/internal/raft/raft.go +++ b/internal/raft/raft.go @@ -1334,7 +1334,6 @@ func (r *raft) setWitness(replicaID uint64, match uint64, next uint64) { } } -// // helper methods required for the membership change implementation // // p33-35 of the raft thesis describes a simple membership change protocol which @@ -1352,18 +1351,17 @@ func (r *raft) setWitness(replicaID uint64, match uint64, next uint64) { // avoid the situation that two pending membership change entries are committed // in one go with the same quorum while they actually require different quorums. // consider the following situation - -// for a 3 nodes cluster with existing members X, Y and Z, let's say we first +// for a 3 nodes shard with existing members X, Y and Z, let's say we first // propose a membership change to add a new node A, before A gets committed and // applied, say we propose another membership change to add a new node B. When // B gets committed, A will be committed as well, both will be using the 3 node // membership quorum meaning both entries concerning A and B will become -// committed when any two of the X, Y, Z cluster have them replicated. this thus +// committed when any two of the X, Y, Z shard have them replicated. this thus // violates the safety requirement as B will require 3 out of the 4 nodes (X, // Y, Z, A) to have it replicated before it can be committed. // we use the following pendingConfigChange flag to help tracking whether there // is already a pending membership change entry in the log waiting to be // executed. -// func (r *raft) setPendingConfigChange() { r.pendingConfigChange = true } @@ -1631,7 +1629,7 @@ func (r *raft) canGrantVote(m pb.Message) bool { func (r *raft) handleNodeElection(m pb.Message) error { if !r.isLeader() { // there can be multiple pending membership change entries committed but not - // applied on this node. say with a cluster of X, Y and Z, there are two + // applied on this node. say with a shard of X, Y and Z, there are two // such entries for adding node A and B are committed but not applied // available on X. If X is allowed to start a new election, it can become the // leader with a vote from any one of the node Y or Z. Further proposals made @@ -1850,7 +1848,7 @@ func (r *raft) handleLeaderReadIndex(m pb.Message) error { plog.Errorf("%s dropped ReadIndex, witness node %d", r.describe(), m.From) } else if !r.isSingleNodeQuorum() { if !r.hasCommittedEntryAtCurrentTerm() { - // leader doesn't know the commit value of the cluster + // leader doesn't know the commit value of the shard // see raft thesis section 6.4, this is the first step of the ReadIndex // protocol. plog.Warningf("%s dropped ReadIndex, not ready", r.describe()) diff --git a/internal/raft/raft_etcd_paper_test.go b/internal/raft/raft_etcd_paper_test.go index 9699dc223..df643b277 100644 --- a/internal/raft/raft_etcd_paper_test.go +++ b/internal/raft/raft_etcd_paper_test.go @@ -149,7 +149,7 @@ func TestCandidateStartNewElection(t *testing.T) { // over election timeout, it begins an election to choose a new leader. It // increments its current term and transitions to candidate state. It then // votes for itself and issues RequestVote RPCs in parallel to each of the -// other servers in the cluster. +// other servers in the shard. 
// Reference: section 5.2 // Also if a candidate fails to obtain a majority, it will time out and // start a new election by incrementing its term and initiating another @@ -749,7 +749,7 @@ func TestLeaderSyncFollowerLog(t *testing.T) { } follower := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage) follower.loadState(pb.State{Term: term - 1}) - // It is necessary to have a three-node cluster. + // It is necessary to have a three-node shard. // The second may have more up-to-date log than the first one, so the // first node needs the vote from the third node to become the leader. n := newNetwork(lead, follower, nopStepper) diff --git a/internal/raft/raft_etcd_test.go b/internal/raft/raft_etcd_test.go index e32ec5bca..f1fd8e5b2 100644 --- a/internal/raft/raft_etcd_test.go +++ b/internal/raft/raft_etcd_test.go @@ -506,7 +506,7 @@ func TestLeaderCycle(t *testing.T) { testLeaderCycle(t) } -// testLeaderCycle verifies that each node in a cluster can campaign +// testLeaderCycle verifies that each node in a shard can campaign // and be elected in turn. This ensures that elections (including // pre-vote) work when not starting from a clean slate (as they do in // TestLeaderElection) @@ -1249,10 +1249,10 @@ func TestStepIgnoreOldTermMsg(t *testing.T) { } // TestHandleMTReplicate ensures: -// 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm. -// 2. If an existing entry conflicts with a new one (same index but different terms), -// delete the existing entry and all that follow it; append any new entries not already in the log. -// 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry). +// 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm. +// 2. If an existing entry conflicts with a new one (same index but different terms), +// delete the existing entry and all that follow it; append any new entries not already in the log. +// 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry). func TestHandleMTReplicate(t *testing.T) { tests := []struct { m pb.Message @@ -2584,7 +2584,7 @@ func TestRemoveNode(t *testing.T) { t.Errorf("nodes = %v, want %v", g, w) } - // remove all nodes from cluster + // remove all nodes from shard ne(r.removeNode(1), t) w = []uint64{} if g := r.nodesSorted(); !reflect.DeepEqual(g, w) { @@ -2667,7 +2667,7 @@ func testCampaignWhileLeader(t *testing.T) { // TestCommitAfterRemoveNode verifies that pending commands can become // committed when a config change reduces the quorum requirements. func TestCommitAfterRemoveNode(t *testing.T) { - // Create a cluster with two nodes. + // Create a shard with two nodes. 
s := NewTestLogDB() r := newTestRaft(1, []uint64{1, 2}, 5, 1, s) r.becomeCandidate() diff --git a/internal/raft/raft_test.go b/internal/raft/raft_test.go index 212d03a0e..a86a6a448 100644 --- a/internal/raft/raft_test.go +++ b/internal/raft/raft_test.go @@ -238,7 +238,7 @@ func TestRaftHelperMethods(t *testing.T) { v := ReplicaID(100) v2 := ShardID(100) if v != "n00100" || v2 != "c00100" { - t.Errorf("unexpected node id / cluster id value") + t.Errorf("unexpected node id / shard id value") } r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewTestLogDB()) r.becomeFollower(2, 3) diff --git a/internal/raft/readindex.go b/internal/raft/readindex.go index 49bd215fb..64623023c 100644 --- a/internal/raft/readindex.go +++ b/internal/raft/readindex.go @@ -45,7 +45,7 @@ func (r *readIndex) addRequest(index uint64, if _, ok := r.pending[ctx]; ok { return } - // index is the committed value of the cluster, it should never move + // index is the committed value of the shard, it should never move // backward, check it here if len(r.queue) > 0 { p, ok := r.pending[r.peepCtx()] diff --git a/internal/registry/gossip_logger.go b/internal/registry/gossip_logger.go index c0eb0129f..1bf59359f 100644 --- a/internal/registry/gossip_logger.go +++ b/internal/registry/gossip_logger.go @@ -50,7 +50,7 @@ func (l *gossipLogWriter) Write(p []byte) (int, error) { } // newGossipLogWrapper prepare log wrapper for gossip. -// Inspirited by https://github.com/docker/docker-ce/blob/master/components/engine/libnetwork/networkdb/cluster.go#L30 +// Inspirited by https://github.com/docker/docker-ce/blob/master/components/engine/libnetwork/networkdb/shard.go#L30 func newGossipLogWrapper() *log.Logger { return log.New(&gossipLogWriter{ logger: logger.GetLogger("gossip"), diff --git a/internal/registry/gossip_test.go b/internal/registry/gossip_test.go index e88268220..d232223ed 100644 --- a/internal/registry/gossip_test.go +++ b/internal/registry/gossip_test.go @@ -98,7 +98,7 @@ func TestGossipRegistry(t *testing.T) { if addr != nhConfig.RaftAddress { t.Errorf("unexpected addr %s", addr) } - // remove cluster + // remove shard r.RemoveShard(123) if _, _, err = r.Resolve(123, 456); err != ErrUnknownTarget { t.Fatalf("failed to get addr, %v", err) diff --git a/internal/registry/nodehost.go b/internal/registry/nodehost.go index f67dea59b..3b176c314 100644 --- a/internal/registry/nodehost.go +++ b/internal/registry/nodehost.go @@ -20,10 +20,10 @@ type NodeHostRegistry struct { view *view } -// NumOfShards returns the number of clusters known to the current NodeHost +// NumOfShards returns the number of shards known to the current NodeHost // instance. func (r *NodeHostRegistry) NumOfShards() int { - return r.view.clusterCount() + return r.view.shardCount() } // GetMeta returns gossip metadata associated with the specified NodeHost @@ -36,13 +36,13 @@ func (r *NodeHostRegistry) GetMeta(nhID string) ([]byte, bool) { return m.Data, true } -// GetShardInfo returns the cluster info for the specified cluster if it is +// GetShardInfo returns the shard info for the specified shard if it is // available in the gossip view. 
func (r *NodeHostRegistry) GetShardInfo(shardID uint64) (ShardView, bool) { r.view.mu.Lock() defer r.view.mu.Unlock() - ci, ok := r.view.mu.clusters[shardID] + ci, ok := r.view.mu.shards[shardID] if !ok { return ShardView{}, false } diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 057e82f18..5863d0c8b 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -32,7 +32,7 @@ var ( ErrUnknownTarget = errors.New("target address unknown") ) -// IResolver converts the (cluster id, node id( tuple to network address. +// IResolver converts the (shard id, node id( tuple to network address. type IResolver interface { Resolve(uint64, uint64) (string, string, error) Add(uint64, uint64, string) @@ -98,7 +98,7 @@ func (n *Registry) Remove(shardID uint64, replicaID uint64) { n.addr.Delete(raftio.GetNodeInfo(shardID, replicaID)) } -// RemoveShard removes all nodes info associated with the specified cluster +// RemoveShard removes all nodes info associated with the specified shard func (n *Registry) RemoveShard(shardID uint64) { var toRemove []raftio.NodeInfo n.addr.Range(func(k, v interface{}) bool { diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go index 19bc32063..1389f90f1 100644 --- a/internal/registry/registry_test.go +++ b/internal/registry/registry_test.go @@ -74,7 +74,7 @@ func TestRemoveShard(t *testing.T) { nodes.RemoveShard(100) _, _, err := nodes.Resolve(100, 2) if err == nil { - t.Errorf("cluster not removed") + t.Errorf("shard not removed") } _, _, err = nodes.Resolve(200, 2) if err != nil { diff --git a/internal/registry/view.go b/internal/registry/view.go index bc5ba2ce2..01bd6522b 100644 --- a/internal/registry/view.go +++ b/internal/registry/view.go @@ -31,14 +31,14 @@ var ( binaryEnc = binary.BigEndian ) -// ShardInfo is a record for representing the state of a Raft cluster based +// ShardInfo is a record for representing the state of a Raft shard based // on the knowledge of the local NodeHost instance. type ShardInfo struct { // Nodes is a map of member node IDs to their Raft addresses. Nodes map[uint64]string - // ShardID is the cluster ID of the Raft cluster node. + // ShardID is the shard ID of the Raft shard node. ShardID uint64 - // ReplicaID is the node ID of the Raft cluster node. + // ReplicaID is the replica ID of the Raft node. ReplicaID uint64 // ConfigChangeIndex is the current config change index of the Raft node. // ConfigChangeIndex is Raft Log index of the last applied membership @@ -49,7 +49,7 @@ type ShardInfo struct { // IsLeader indicates whether this is a leader node. // Deprecated: Use LeaderID and Term instead. IsLeader bool - // LeaderID is the node ID of the current leader + // LeaderID is the replica ID of the current leader LeaderID uint64 // Term is the term of the current leader Term uint64 @@ -57,13 +57,13 @@ type ShardInfo struct { IsNonVoting bool // IsWitness indicates whether this is a witness node without actual log. IsWitness bool - // Pending is a boolean flag indicating whether details of the cluster node + // Pending is a boolean flag indicating whether details of the shard node // is not available. The Pending flag is set to true usually because the node // has not had anything applied yet. Pending bool } -// ShardView is the view of a cluster from gossip's point of view. +// ShardView is the view of a shard from gossip's point of view. 
type ShardView struct { ShardID uint64 Nodes map[uint64]string @@ -97,7 +97,7 @@ type view struct { // shardID -> ShardView mu struct { sync.Mutex - clusters map[uint64]ShardView + shards map[uint64]ShardView } } @@ -105,14 +105,14 @@ func newView(deploymentID uint64) *view { v := &view{ deploymentID: deploymentID, } - v.mu.clusters = make(map[uint64]ShardView) + v.mu.shards = make(map[uint64]ShardView) return v } -func (v *view) clusterCount() int { +func (v *view) shardCount() int { v.mu.Lock() defer v.mu.Unlock() - return len(v.mu.clusters) + return len(v.mu.shards) } func mergeShardInfo(current ShardView, update ShardView) ShardView { @@ -136,11 +136,11 @@ func (v *view) update(updates []ShardView) { defer v.mu.Unlock() for _, u := range updates { - current, ok := v.mu.clusters[u.ShardID] + current, ok := v.mu.shards[u.ShardID] if !ok { current = ShardView{ShardID: u.ShardID} } - v.mu.clusters[u.ShardID] = mergeShardInfo(current, u) + v.mu.shards[u.ShardID] = mergeShardInfo(current, u) } } @@ -149,7 +149,7 @@ func (v *view) toShuffledList() []ShardView { func() { v.mu.Lock() defer v.mu.Unlock() - for _, v := range v.mu.clusters { + for _, v := range v.mu.shards { ci = append(ci, v) } }() diff --git a/internal/registry/view_test.go b/internal/registry/view_test.go index 214c2d7b6..3b26184f1 100644 --- a/internal/registry/view_test.go +++ b/internal/registry/view_test.go @@ -71,7 +71,7 @@ func TestConfigChangeIndexIsChecked(t *testing.T) { }, } v.update(update) - c, ok := v.mu.clusters[1340] + c, ok := v.mu.shards[1340] assert.True(t, ok) assert.Equal(t, uint64(126200), c.ConfigChangeIndex) assert.Equal(t, 3, len(c.Nodes)) @@ -87,7 +87,7 @@ func TestConfigChangeIndexIsChecked(t *testing.T) { }, } v.update(update) - c, ok = v.mu.clusters[1340] + c, ok = v.mu.shards[1340] assert.True(t, ok) assert.Equal(t, uint64(226200), c.ConfigChangeIndex) assert.Equal(t, 2, len(c.Nodes)) @@ -101,7 +101,7 @@ func TestDeploymentIDIsChecked(t *testing.T) { v2 := newView(321) v2.updateFrom(data) - assert.Equal(t, 0, len(v2.mu.clusters)) + assert.Equal(t, 0, len(v2.mu.shards)) } func TestGetGossipData(t *testing.T) { @@ -121,7 +121,7 @@ func TestUpdateMembershipView(t *testing.T) { } cv.Nodes[1] = "t1" cv.Nodes[2] = "t2" - v.mu.clusters[123] = cv + v.mu.shards[123] = cv ncv := ShardView{ ShardID: 123, @@ -134,7 +134,7 @@ func TestUpdateMembershipView(t *testing.T) { updates := []ShardView{ncv} v.update(updates) - result, ok := v.mu.clusters[123] + result, ok := v.mu.shards[123] assert.True(t, ok) assert.Equal(t, ncv, result) } @@ -148,7 +148,7 @@ func TestOutOfDateMembershipInfoIsIgnored(t *testing.T) { } cv.Nodes[1] = "t1" cv.Nodes[2] = "t2" - v.mu.clusters[123] = cv + v.mu.shards[123] = cv ncv := ShardView{ ShardID: 123, @@ -161,7 +161,7 @@ func TestOutOfDateMembershipInfoIsIgnored(t *testing.T) { updates := []ShardView{ncv} v.update(updates) - result, ok := v.mu.clusters[123] + result, ok := v.mu.shards[123] assert.True(t, ok) assert.Equal(t, cv, result) } @@ -173,7 +173,7 @@ func TestUpdateLeadershipView(t *testing.T) { LeaderID: 10, Term: 20, } - v.mu.clusters[123] = cv + v.mu.shards[123] = cv ncv := ShardView{ ShardID: 123, @@ -183,7 +183,7 @@ func TestUpdateLeadershipView(t *testing.T) { updates := []ShardView{ncv} v.update(updates) - result, ok := v.mu.clusters[123] + result, ok := v.mu.shards[123] assert.True(t, ok) assert.Equal(t, ncv, result) } @@ -193,7 +193,7 @@ func TestInitialLeaderInfoCanBeRecorded(t *testing.T) { cv := ShardView{ ShardID: 123, } - v.mu.clusters[123] = cv + v.mu.shards[123] 
= cv ncv := ShardView{ ShardID: 123, @@ -203,7 +203,7 @@ func TestInitialLeaderInfoCanBeRecorded(t *testing.T) { updates := []ShardView{ncv} v.update(updates) - result, ok := v.mu.clusters[123] + result, ok := v.mu.shards[123] assert.True(t, ok) assert.Equal(t, ncv, result) } @@ -215,7 +215,7 @@ func TestUnknownLeaderIsIgnored(t *testing.T) { LeaderID: 10, Term: 20, } - v.mu.clusters[123] = cv + v.mu.shards[123] = cv ncv := ShardView{ ShardID: 123, @@ -225,7 +225,7 @@ func TestUnknownLeaderIsIgnored(t *testing.T) { updates := []ShardView{ncv} v.update(updates) - result, ok := v.mu.clusters[123] + result, ok := v.mu.shards[123] assert.True(t, ok) assert.Equal(t, cv, result) } diff --git a/internal/rsm/managed.go b/internal/rsm/managed.go index 924a8b014..4573bfe0f 100644 --- a/internal/rsm/managed.go +++ b/internal/rsm/managed.go @@ -26,8 +26,8 @@ import ( ) var ( - // ErrShardClosed indicates that the cluster has been closed - ErrShardClosed = errors.New("raft cluster already closed") + // ErrShardClosed indicates that the shard has been closed + ErrShardClosed = errors.New("raft shard already closed") ) // IStreamable is the interface for types that can be snapshot streamed. diff --git a/internal/server/partition.go b/internal/server/partition.go index bdbafc377..251b87c9b 100644 --- a/internal/server/partition.go +++ b/internal/server/partition.go @@ -18,7 +18,7 @@ var ( defaultShardIDMod uint64 = 512 ) -// IPartitioner is the interface for partitioning clusters. +// IPartitioner is the interface for partitioning shards. type IPartitioner interface { GetPartitionID(shardID uint64) uint64 } @@ -34,7 +34,7 @@ func NewFixedPartitioner(capacity uint64) *FixedPartitioner { return &FixedPartitioner{capacity: capacity} } -// GetPartitionID returns the partition ID for the specified raft cluster. +// GetPartitionID returns the partition ID for the specified raft shard. func (p *FixedPartitioner) GetPartitionID(shardID uint64) uint64 { return shardID % p.capacity } @@ -55,7 +55,7 @@ func NewDoubleFixedPartitioner(capacity uint64, } } -// GetPartitionID returns the partition ID for the specified raft cluster. +// GetPartitionID returns the partition ID for the specified raft shard. func (p *DoubleFixedPartitioner) GetPartitionID(shardID uint64) uint64 { return (shardID % p.workerCount) % p.capacity } diff --git a/internal/settings/hard.go b/internal/settings/hard.go index 80839733f..1c6664cde 100644 --- a/internal/settings/hard.go +++ b/internal/settings/hard.go @@ -66,7 +66,7 @@ var Hard = getHardSettings() type hard struct { // LRUMaxSessionCount is the max number of client sessions that can be - // concurrently held and managed by each raft cluster. + // concurrently held and managed by each raft shard. LRUMaxSessionCount uint64 // LogDBEntryBatchSize is the max size of each entry batch. LogDBEntryBatchSize uint64 diff --git a/internal/tan/logdb.go b/internal/tan/logdb.go index 9c0c80dff..8ccc51687 100644 --- a/internal/tan/logdb.go +++ b/internal/tan/logdb.go @@ -108,7 +108,7 @@ func CreateTan(cfg config.NodeHostConfig, cb config.LogDBCallback, } // CreateLogMultiplexedTan creates and returns a tan instance that uses -// multiplexed log files. A multiplexed log allow multiple raft clusters to +// multiplexed log files. A multiplexed log allow multiple raft shards to // share the same underlying physical log file, this is required when you // want to run thousands of raft nodes on the same server without having // thousands action log files. 
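
The FixedPartitioner changes earlier in this patch map shard IDs onto a fixed number of execution partitions by simple modulo arithmetic. A standalone illustration of that rule (not part of this patch):

    package main

    import "fmt"

    // partitionID mirrors FixedPartitioner.GetPartitionID: a shard is assigned
    // to one of `capacity` partitions by taking shardID modulo capacity.
    func partitionID(shardID, capacity uint64) uint64 {
        return shardID % capacity
    }

    func main() {
        // with 16 partitions, shards 1, 17 and 33 all land on partition 1
        for _, shardID := range []uint64{1, 17, 33, 2} {
            fmt.Printf("shard %d -> partition %d\n", shardID, partitionID(shardID, 16))
        }
    }
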
diff --git a/internal/tests/concurrentkv.go b/internal/tests/concurrentkv.go index 1d031ca6e..048f187f8 100644 --- a/internal/tests/concurrentkv.go +++ b/internal/tests/concurrentkv.go @@ -61,7 +61,7 @@ func NewConcurrentKVTest(shardID uint64, replicaID uint64) sm.IConcurrentStateMa ReplicaID: replicaID, } kvdata := &kvdata{junk: make([]byte, 3*1024)} - // write some junk data consistent across the cluster + // write some junk data consistent across the shard for i := 0; i < len(kvdata.junk); i++ { kvdata.junk[i] = 2 } diff --git a/internal/tests/kvtest.go b/internal/tests/kvtest.go index 607dde58e..83bf778e2 100644 --- a/internal/tests/kvtest.go +++ b/internal/tests/kvtest.go @@ -105,7 +105,7 @@ func NewKVTest(shardID uint64, replicaID uint64) sm.IStateMachine { } v := os.Getenv("EXTERNALFILETEST") s.externalFileTest = len(v) > 0 - // write some junk data consistent across the cluster + // write some junk data consistent across the shard for i := 0; i < len(s.Junk); i++ { s.Junk[i] = 2 } diff --git a/internal/transport/chunk_test.go b/internal/transport/chunk_test.go index 388f3f92b..faa1d808c 100644 --- a/internal/transport/chunk_test.go +++ b/internal/transport/chunk_test.go @@ -618,7 +618,7 @@ func TestSnapshotRecordWithoutExternalFilesCanBeSplitIntoChunk(t *testing.T) { t.Errorf("bin ver not set") } if c.ShardID != msg.ShardID { - t.Errorf("unexpected cluster id") + t.Errorf("unexpected shard id") } if c.ReplicaID != msg.To { t.Errorf("unexpected node id") diff --git a/internal/transport/job.go b/internal/transport/job.go index 6b3275cb1..290ca0dd2 100644 --- a/internal/transport/job.go +++ b/internal/transport/job.go @@ -55,7 +55,7 @@ func (s *Sink) Close() error { return nil } -// ShardID returns the cluster ID of the source node. +// ShardID returns the shard ID of the source node. 
func (s *Sink) ShardID() uint64 { return s.j.shardID } diff --git a/monkey.go b/monkey.go index 021680703..9c93b225a 100644 --- a/monkey.go +++ b/monkey.go @@ -49,7 +49,7 @@ func GetTestFS() config.IFS { func (nh *NodeHost) Shards() []*node { result := make([]*node, 0) nh.mu.RLock() - nh.mu.clusters.Range(func(k, v interface{}) bool { + nh.mu.shards.Range(func(k, v interface{}) bool { result = append(result, v.(*node)) return true }) diff --git a/node.go b/node.go index 376d22c06..1ce06e718 100644 --- a/node.go +++ b/node.go @@ -75,7 +75,7 @@ type leaderInfo struct { } type node struct { - clusterInfo atomic.Value + shardInfo atomic.Value leaderInfo atomic.Value nodeRegistry registry.INodeRegistry logdb raftio.ILogDB @@ -1593,7 +1593,7 @@ func (n *node) notifyConfigChange() { ConfigChangeIndex: m.ConfigChangeId, Nodes: m.Addresses, } - n.clusterInfo.Store(ci) + n.shardInfo.Store(ci) n.sysEvents.Publish(server.SystemEvent{ Type: server.MembershipChanged, ShardID: n.shardID, @@ -1602,7 +1602,7 @@ func (n *node) notifyConfigChange() { } func (n *node) getShardInfo() ShardInfo { - v := n.clusterInfo.Load() + v := n.shardInfo.Load() if v == nil { return ShardInfo{ ShardID: n.shardID, diff --git a/node_test.go b/node_test.go index 1f8977f07..32295b073 100644 --- a/node_test.go +++ b/node_test.go @@ -121,7 +121,7 @@ func (r *testRouter) shouldDrop(msg pb.Message) bool { func (r *testRouter) send(msg pb.Message) { if msg.ShardID != r.shardID { - panic("cluster id does not match") + panic("shard id does not match") } if r.shouldDrop(msg) { return @@ -134,7 +134,7 @@ func (r *testRouter) send(msg pb.Message) { func (r *testRouter) getQ(shardID uint64, replicaID uint64) *server.MessageQueue { if shardID != r.shardID { - panic("cluster id does not match") + panic("shard id does not match") } q, ok := r.qm[replicaID] if !ok { diff --git a/nodehost.go b/nodehost.go index f84e56842..f898968ef 100644 --- a/nodehost.go +++ b/nodehost.go @@ -19,18 +19,18 @@ implementation for providing consensus in distributed systems. The NodeHost struct is the facade interface for all features provided by the dragonboat package. Each NodeHost instance usually runs on a separate server managing CPU, storage and network resources used for achieving consensus. Each -NodeHost manages Raft nodes from different Raft groups known as Raft clusters. -Each Raft cluster is identified by its ShardID, it usually consists of +NodeHost manages Raft nodes from different Raft groups known as Raft shards. +Each Raft shard is identified by its ShardID, it usually consists of multiple nodes (also known as replicas) each identified by a ReplicaID value. -Nodes from the same Raft cluster suppose to be distributed on different NodeHost +Nodes from the same Raft shard suppose to be distributed on different NodeHost instances across the network, this brings fault tolerance for machine and -network failures as application data stored in the Raft cluster will be +network failures as application data stored in the Raft shard will be available as long as the majority of its managing NodeHost instances (i.e. its underlying servers) are accessible. -Arbitrary number of Raft clusters can be launched across the network to +Arbitrary number of Raft shards can be launched across the network to aggregate distributed processing and storage capacities. Users can also make -membership change requests to add or remove nodes from selected Raft cluster. +membership change requests to add or remove nodes from selected Raft shard. 
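
As a concrete illustration of the ShardID/ReplicaID/Target relationship described in the package documentation above, the sketch below starts one replica of a three-replica shard (illustrative only, not part of this patch; the shard ID, addresses and tuning values are assumptions, and createSM stands in for a user supplied state machine factory):

    package example

    import (
        "github.com/lni/dragonboat/v4"
        "github.com/lni/dragonboat/v4/config"
        sm "github.com/lni/dragonboat/v4/statemachine"
    )

    // startShardReplica starts the given replica of shard 128 on the local
    // NodeHost. The same initialMembers map must be used when starting the
    // other two initial replicas on their own NodeHost instances.
    func startShardReplica(nh *dragonboat.NodeHost, replicaID uint64,
        createSM sm.CreateStateMachineFunc) error {
        initialMembers := map[uint64]dragonboat.Target{
            1: "host1:63000",
            2: "host2:63000",
            3: "host3:63000",
        }
        cfg := config.Config{
            ShardID:            128,       // the Raft shard this replica belongs to
            ReplicaID:          replicaID, // this replica within the shard
            ElectionRTT:        10,
            HeartbeatRTT:       1,
            CheckQuorum:        true,
            SnapshotEntries:    10000,
            CompactionOverhead: 500,
        }
        // join is false here because this replica is an initial member
        return nh.StartReplica(initialMembers, false, createSM, cfg)
    }
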
User applications can leverage the power of the Raft protocol by implementing the IStateMachine or IOnDiskStateMachine component, as defined in @@ -41,7 +41,7 @@ itself. Dragonboat guarantees the linearizability of your I/O when interacting with the IStateMachine or IOnDiskStateMachine instances. In plain English, writes (via -making proposals) to your Raft cluster appears to be instantaneous, once a write +making proposals) to your Raft shard appears to be instantaneous, once a write is completed, all later reads (via linearizable read based on Raft's ReadIndex protocol) should return the value of that write or a later write. Once a value is returned by a linearizable read, all later reads should return the same value @@ -107,24 +107,24 @@ var ( ErrClosed = errors.New("dragonboat: closed") // ErrReplicaRemoved indictes that the requested node has been removed. ErrReplicaRemoved = errors.New("node removed") - // ErrShardNotFound indicates that the specified cluster is not found. - ErrShardNotFound = errors.New("cluster not found") - // ErrShardAlreadyExist indicates that the specified cluster already exist. - ErrShardAlreadyExist = errors.New("cluster already exist") - // ErrShardNotStopped indicates that the specified cluster is still running + // ErrShardNotFound indicates that the specified shard is not found. + ErrShardNotFound = errors.New("shard not found") + // ErrShardAlreadyExist indicates that the specified shard already exist. + ErrShardAlreadyExist = errors.New("shard already exist") + // ErrShardNotStopped indicates that the specified shard is still running // and thus prevented the requested operation to be completed. - ErrShardNotStopped = errors.New("cluster not stopped") - // ErrInvalidShardSettings indicates that cluster settings specified for + ErrShardNotStopped = errors.New("shard not stopped") + // ErrInvalidShardSettings indicates that shard settings specified for // the StartShard method are invalid. - ErrInvalidShardSettings = errors.New("cluster settings are invalid") - // ErrShardNotBootstrapped indicates that the specified cluster has not + ErrInvalidShardSettings = errors.New("shard settings are invalid") + // ErrShardNotBootstrapped indicates that the specified shard has not // been boostrapped yet. When starting this node, depending on whether this - // node is an initial member of the Raft cluster, you must either specify + // node is an initial member of the Raft shard, you must either specify // all of its initial members or set the join flag to true. // When used correctly, dragonboat only returns this error in the rare // situation when you try to restart a node crashed during its previous // bootstrap attempt. - ErrShardNotBootstrapped = errors.New("cluster not bootstrapped") + ErrShardNotBootstrapped = errors.New("shard not bootstrapped") // ErrDeadlineNotSet indicates that the context parameter provided does not // carry a deadline. ErrDeadlineNotSet = errors.New("deadline not set") @@ -139,11 +139,11 @@ var ( ErrInvalidRange = errors.New("invalid log range") ) -// ShardInfo is a record for representing the state of a Raft cluster based +// ShardInfo is a record for representing the state of a Raft shard based // on the knowledge of the local NodeHost instance. type ShardInfo = registry.ShardInfo -// ShardView is a record for representing the state of a Raft cluster based +// ShardView is a record for representing the state of a Raft shard based // on the knowledge of distributed NodeHost instances as shared by gossip. 
type ShardView = registry.ShardView @@ -162,7 +162,7 @@ type GossipInfo struct { } // NodeHostInfo provides info about the NodeHost, including its managed Raft -// cluster nodes and available Raft logs saved in its local persistent storage. +// shard nodes and available Raft logs saved in its local persistent storage. type NodeHostInfo struct { // NodeHostID is the unique identifier of the NodeHost instance. NodeHostID string @@ -171,7 +171,7 @@ type NodeHostInfo struct { RaftAddress string // Gossip contains gossip service related information. Gossip GossipInfo - // ShardInfo is a list of all Raft clusters managed by the NodeHost + // ShardInfo is a list of all Raft shards managed by the NodeHost ShardInfoList []ShardInfo // LogInfo is a list of raftio.NodeInfo values representing all Raft logs // stored on the NodeHost. @@ -208,7 +208,7 @@ type SnapshotOption struct { CompactionIndex uint64 // Exported is a boolean flag indicating whether to export the requested // snapshot. For an exported snapshot, users are responsible for managing the - // snapshot files. An exported snapshot is usually used to repair the cluster + // snapshot files. An exported snapshot is usually used to repair the shard // when it permanently loses its majority quorum. See the ImportSnapshot method // in the tools package for more details. Exported bool @@ -261,17 +261,17 @@ var DefaultSnapshotOption SnapshotOption // NodeHostConfig.AddressByNodeHostID is set. type Target = string -// NodeHost manages Raft clusters and enables them to share resources such as +// NodeHost manages Raft shards and enables them to share resources such as // transport and persistent storage etc. NodeHost is also the central thread // safe access point for accessing Dragonboat functionalities. type NodeHost struct { mu struct { sync.RWMutex - cci uint64 - cciCh chan struct{} - clusters sync.Map - lm sync.Map - logdb raftio.ILogDB + cci uint64 + cciCh chan struct{} + shards sync.Map + lm sync.Map + logdb raftio.ILogDB } events struct { leaderInfoQ *leaderInfoQueue @@ -401,7 +401,7 @@ func (nh *NodeHost) Close() { }) for _, node := range nodes { if err := nh.stopNode(node.ShardID, node.ReplicaID, true); err != nil { - plog.Errorf("failed to remove cluster %s", + plog.Errorf("failed to remove shard %s", logutil.ShardID(node.ShardID)) } } @@ -470,33 +470,32 @@ func (nh *NodeHost) GetNodeHostRegistry() (INodeHostRegistry, bool) { // sm.IStateMachine interface. // // The input parameter initialMembers is a map of node ID to node target for all -// Raft cluster's initial member nodes. By default, the target is the +// Raft shard's initial member nodes. By default, the target is the // RaftAddress value of the NodeHost where the node will be running. When running // in the AddressByNodeHostID mode, target should be set to the NodeHostID value // of the NodeHost where the node will be running. See the godoc of NodeHost's ID -// method for the full definition of NodeHostID. For the same Raft cluster, the +// method for the full definition of NodeHostID. For the same Raft shard, the // same initialMembers map should be specified when starting its initial member // nodes on distributed NodeHost instances. // // The join flag indicates whether the node is a new node joining an existing -// cluster. create is a factory function for creating the IStateMachine instance, +// shard. 
create is a factory function for creating the IStateMachine instance, // cfg is the configuration instance that will be passed to the underlying Raft -// node object, the cluster ID and node ID of the involved node are specified in +// node object, the shard ID and replica ID of the involved node are specified in // the ShardID and ReplicaID fields of the provided cfg parameter. // // Note that this method is not for changing the membership of the specified -// Raft cluster, it launches a node that is already a member of the Raft -// cluster. +// Raft shard, it launches a node that is already a member of the Raft shard. // // As a summary, when - -// - starting a brand new Raft cluster, set join to false and specify all initial -// member node details in the initialMembers map. -// - joining a new node to an existing Raft cluster, set join to true and leave -// the initialMembers map empty. This requires the joining node to have already -// been added as a member node of the Raft cluster. -// - restarting an crashed or stopped node, set join to false and leave the -// initialMembers map to be empty. This applies to both initial member nodes -// and those joined later. +// - starting a brand new Raft shard, set join to false and specify all initial +// member node details in the initialMembers map. +// - joining a new node to an existing Raft shard, set join to true and leave +// the initialMembers map empty. This requires the joining node to have already +// been added as a member node of the Raft shard. +// - restarting an crashed or stopped node, set join to false and leave the +// initialMembers map to be empty. This applies to both initial member nodes +// and those joined later. func (nh *NodeHost) StartReplica(initialMembers map[uint64]Target, join bool, create sm.CreateStateMachineFunc, cfg config.Config) error { cf := func(shardID uint64, replicaID uint64, @@ -537,7 +536,7 @@ func (nh *NodeHost) StartOnDiskReplica(initialMembers map[uint64]Target, // shard. // // Note that this is not the membership change operation required to remove the -// node from the Raft cluster. +// node from the Raft shard. func (nh *NodeHost) StopShard(shardID uint64) error { if atomic.LoadInt32(&nh.closed) != 0 { return ErrClosed @@ -548,7 +547,7 @@ func (nh *NodeHost) StopShard(shardID uint64) error { // StopReplica stops the specified Raft replica. // // Note that this is not the membership change operation required to remove the -// node from the Raft cluster. +// node from the Raft shard. func (nh *NodeHost) StopReplica(shardID uint64, replicaID uint64) error { if atomic.LoadInt32(&nh.closed) != 0 { return ErrClosed @@ -556,7 +555,7 @@ func (nh *NodeHost) StopReplica(shardID uint64, replicaID uint64) error { return nh.stopNode(shardID, replicaID, true) } -// SyncPropose makes a synchronous proposal on the Raft cluster specified by +// SyncPropose makes a synchronous proposal on the Raft shard specified by // the input client session object. The specified context parameter must has // the timeout value set. // @@ -593,7 +592,7 @@ func (nh *NodeHost) SyncPropose(ctx context.Context, } // SyncRead performs a synchronous linearizable read on the specified Raft -// cluster. The specified context parameter must has the timeout value set. The +// shard. The specified context parameter must has the timeout value set. 
The // query interface{} specifies what to query, it will be passed to the Lookup // method of the IStateMachine or IOnDiskStateMachine after the system // determines that it is safe to perform the local read. It returns the query @@ -628,7 +627,7 @@ func (nh *NodeHost) GetLogReader(shardID uint64) (ReadonlyLogReader, error) { return n.logReader, nil } -// Membership is the struct used to describe Raft cluster membership. +// Membership is the struct used to describe Raft shard membership. type Membership struct { // ConfigChangeID is the Raft entry index of the last applied membership // change entry. @@ -637,18 +636,18 @@ type Membership struct { // Raft nodes. Nodes map[uint64]string // NonVotings is a map of ReplicaID values to NodeHost Raft addresses for all - // nonVotings in the Raft cluster. + // nonVotings in the Raft shard. NonVotings map[uint64]string // Witnesses is a map of ReplicaID values to NodeHost Raft addrsses for all - // witnesses in the Raft cluster. + // witnesses in the Raft shard. Witnesses map[uint64]string // Removed is a set of ReplicaID values that have been removed from the Raft - // cluster. They are not allowed to be added back to the cluster. + // shard. They are not allowed to be added back to the shard. Removed map[uint64]struct{} } // SyncGetShardMembership is a rsynchronous method that queries the membership -// information from the specified Raft cluster. The specified context parameter +// information from the specified Raft shard. The specified context parameter // must has the timeout value set. func (nh *NodeHost) SyncGetShardMembership(ctx context.Context, shardID uint64) (*Membership, error) { @@ -676,7 +675,7 @@ func (nh *NodeHost) SyncGetShardMembership(ctx context.Context, return v.(*Membership), nil } -// GetLeaderID returns the leader node ID of the specified Raft cluster based +// GetLeaderID returns the leader node ID of the specified Raft shard based // on local node's knowledge. The returned boolean value indicates whether the // leader information is available. func (nh *NodeHost) GetLeaderID(shardID uint64) (uint64, bool, error) { @@ -711,7 +710,7 @@ func (nh *NodeHost) GetNoOPSession(shardID uint64) *client.Session { } // SyncGetSession starts a synchronous proposal to create, register and return -// a new client session object for the specified Raft cluster. The specified +// a new client session object for the specified Raft shard. The specified // context parameter must has the timeout value set. // // A client session object is used to ensure that a retried proposal, e.g. @@ -773,7 +772,7 @@ func (nh *NodeHost) SyncCloseSession(ctx context.Context, } // QueryRaftLog starts an asynchronous query for raft logs in the specified -// range [firstIndex, lastIndex) on the given raft cluster. The returned +// range [firstIndex, lastIndex) on the given Raft shard. The returned // raft log entries are limited to maxSize in bytes. // // This method returns a RequestState instance or an error immediately. User @@ -784,7 +783,7 @@ func (nh *NodeHost) QueryRaftLog(shardID uint64, firstIndex uint64, return nh.queryRaftLog(shardID, firstIndex, lastIndex, maxSize) } -// Propose starts an asynchronous proposal on the Raft cluster specified by the +// Propose starts an asynchronous proposal on the Raft shard specified by the // Session object. The input byte slice can be reused for other purposes // immediate after the return of this method. 
// @@ -808,7 +807,7 @@ func (nh *NodeHost) Propose(session *client.Session, cmd []byte, return nh.propose(session, cmd, timeout) } -// ProposeSession starts an asynchronous proposal on the specified cluster +// ProposeSession starts an asynchronous proposal on the specified shard // for client session related operations. Depending on the state of the specified // client session object, the supported operations are for registering or // unregistering a client session. Application can select on the ResultC() @@ -832,7 +831,7 @@ func (nh *NodeHost) ProposeSession(session *client.Session, } // ReadIndex starts the asynchronous ReadIndex protocol used for linearizable -// read on the specified cluster. This method returns a RequestState instance +// read on the specified shard. This method returns a RequestState instance // or an error immediately. Application should wait on the ResultC() channel // of the returned RequestState object to get notified on the outcome of the // ReadIndex operation. On a successful completion, the ReadLocalNode method @@ -939,7 +938,7 @@ func (nh *NodeHost) SyncRequestSnapshot(ctx context.Context, } // RequestSnapshot requests a snapshot to be created asynchronously for the -// specified cluster node. For each node, only one ongoing snapshot operation +// specified shard node. For each node, only one ongoing snapshot operation // is allowed. // // Each requested snapshot will also trigger Raft log and snapshot compactions @@ -1089,21 +1088,21 @@ func (nh *NodeHost) SyncRequestAddWitness(ctx context.Context, return err } -// RequestDeleteReplica is a Raft cluster membership change method for requesting -// the specified node to be removed from the specified Raft cluster. It starts -// an asynchronous request to remove the node from the Raft cluster membership +// RequestDeleteReplica is a Raft shard membership change method for requesting +// the specified node to be removed from the specified Raft shard. It starts +// an asynchronous request to remove the node from the Raft shard membership // list. Application can wait on the ResultC() channel of the returned // RequestState instance to get notified for the outcome. // // It is not guaranteed that deleted node will automatically close itself and // be removed from its managing NodeHost instance. It is application's // responsibility to call StopShard on the right NodeHost instance to actually -// have the cluster node removed from its managing NodeHost instance. +// have the shard node removed from its managing NodeHost instance. // -// Once a node is successfully deleted from a Raft cluster, it will not be -// allowed to be added back to the cluster with the same node identity. +// Once a node is successfully deleted from a Raft shard, it will not be +// allowed to be added back to the shard with the same node identity. // -// When the raft cluster is created with the OrderedConfigChange config flag +// When the Raft shard is created with the OrderedConfigChange config flag // set as false, the configChangeIndex parameter is ignored. Otherwise, it // should be set to the most recent Config Change Index value returned by the // SyncGetShardMembership method. The requested delete node operation will be @@ -1124,21 +1123,21 @@ func (nh *NodeHost) RequestDeleteReplica(shardID uint64, return n.requestDeleteNodeWithOrderID(replicaID, configChangeIndex, tt) } -// RequestAddReplica is a Raft cluster membership change method for requesting the -// specified node to be added to the specified Raft cluster. 
It starts an -// asynchronous request to add the node to the Raft cluster membership list. +// RequestAddReplica is a Raft shard membership change method for requesting the +// specified node to be added to the specified Raft shard. It starts an +// asynchronous request to add the node to the Raft shard membership list. // Application can wait on the ResultC() channel of the returned RequestState // instance to get notified for the outcome. // -// If there is already an nonVoting with the same replicaID in the cluster, it will +// If there is already an nonVoting with the same replicaID in the shard, it will // be promoted to a regular node with voting power. The target parameter of the // RequestAddReplica call is ignored when promoting an nonVoting to a regular node. // -// After the node is successfully added to the Raft cluster, it is application's +// After the node is successfully added to the Raft shard, it is application's // responsibility to call StartShard on the target NodeHost instance to -// actually start the Raft cluster node. +// actually start the Raft shard node. // -// Requesting a removed node back to the Raft cluster will always be rejected. +// Requesting a removed node back to the Raft shard will always be rejected. // // By default, the target parameter is the RaftAddress of the NodeHost instance // where the new Raft node will be running. Note that fixed IP or static DNS @@ -1146,7 +1145,7 @@ func (nh *NodeHost) RequestDeleteReplica(shardID uint64, // AddressByNodeHostID mode, target should be set to NodeHost's ID value which // can be obtained by calling the ID() method. // -// When the Raft cluster is created with the OrderedConfigChange config flag +// When the Raft shard is created with the OrderedConfigChange config flag // set as false, the configChangeIndex parameter is ignored. Otherwise, it // should be set to the most recent Config Change Index value returned by the // SyncGetShardMembership method. The requested add node operation will be @@ -1167,8 +1166,8 @@ func (nh *NodeHost) RequestAddReplica(shardID uint64, target, configChangeIndex, nh.getTimeoutTick(timeout)) } -// RequestAddNonVoting is a Raft cluster membership change method for requesting -// the specified node to be added to the specified Raft cluster as an non-voting +// RequestAddNonVoting is a Raft shard membership change method for requesting +// the specified node to be added to the specified Raft shard as an non-voting // member without voting power. It starts an asynchronous request to add the // specified node as an non-voting member. // @@ -1176,7 +1175,7 @@ func (nh *NodeHost) RequestAddReplica(shardID uint64, // it is neither allowed to vote for leader, nor considered as a part of the // quorum when replicating state. An nonVoting can be promoted to a regular node // with voting power by making a RequestAddReplica call using its shardID and -// replicaID values. An nonVoting can be removed from the cluster by calling +// replicaID values. An nonVoting can be removed from the shard by calling // RequestDeleteReplica with its shardID and replicaID values. // // Application should later call StartShard with config.Config.IsNonVoting @@ -1199,13 +1198,13 @@ func (nh *NodeHost) RequestAddNonVoting(shardID uint64, target, configChangeIndex, nh.getTimeoutTick(timeout)) } -// RequestAddWitness is a Raft cluster membership change method for requesting -// the specified node to be added as a witness to the given Raft cluster. 
It +// RequestAddWitness is a Raft shard membership change method for requesting +// the specified node to be added as a witness to the given Raft shard. It // starts an asynchronous request to add the specified node as an witness. // // A witness can vote in elections but it doesn't have any Raft log or // application state machine associated. The witness node can not be used -// to initiate read, write or membership change operations on its Raft cluster. +// to initiate read, write or membership change operations on its Raft shard. // Section 11.7.2 of Diego Ongaro's thesis contains more info on such witness // role. // @@ -1230,7 +1229,7 @@ func (nh *NodeHost) RequestAddWitness(shardID uint64, } // RequestLeaderTransfer makes a request to transfer the leadership of the -// specified Raft cluster to the target node identified by targetReplicaID. It +// specified Raft shard to the target node identified by targetReplicaID. It // returns an error if the request fails to be started. There is no guarantee // that such request can be fulfilled. func (nh *NodeHost) RequestLeaderTransfer(shardID uint64, @@ -1242,7 +1241,7 @@ func (nh *NodeHost) RequestLeaderTransfer(shardID uint64, if !ok { return ErrShardNotFound } - plog.Debugf("RequestLeaderTransfer called on cluster %d target nodeid %d", + plog.Debugf("RequestLeaderTransfer called on shard %d target replicaID %d", shardID, targetReplicaID) defer nh.engine.setStepReady(shardID) return n.requestLeaderTransfer(targetReplicaID) @@ -1253,7 +1252,7 @@ func (nh *NodeHost) RequestLeaderTransfer(shardID uint64, // is cancelled or timeout. // // Similar to RemoveData, calling SyncRemoveData on a node that is still a Raft -// cluster member will corrupt the Raft cluster. +// shard member will corrupt the Raft shard. func (nh *NodeHost) SyncRemoveData(ctx context.Context, shardID uint64, replicaID uint64) error { if atomic.LoadInt32(&nh.closed) != 0 { @@ -1285,8 +1284,8 @@ func (nh *NodeHost) SyncRemoveData(ctx context.Context, // RemoveData tries to remove all data associated with the specified node. This // method should only be used after the node has been deleted from its Raft -// cluster. Calling RemoveData on a node that is still a Raft cluster member -// will corrupt the Raft cluster. +// shard. Calling RemoveData on a node that is still a Raft shard member +// will corrupt the Raft shard. // // RemoveData returns ErrShardNotStopped when the specified node has not been // fully offloaded from the NodeHost instance. @@ -1352,7 +1351,7 @@ func (nh *NodeHost) HasNodeInfo(shardID uint64, replicaID uint64) bool { } // GetNodeHostInfo returns a NodeHostInfo instance that contains all details -// of the NodeHost, this includes details of all Raft clusters managed by the +// of the NodeHost, this includes details of all Raft shards managed by the // the NodeHost instance. 
func (nh *NodeHost) GetNodeHostInfo(opt NodeHostInfoOption) *NodeHostInfo { nhi := &NodeHostInfo{ @@ -1456,7 +1455,7 @@ func (nh *NodeHost) linearizableRead(ctx context.Context, } func (nh *NodeHost) getShard(shardID uint64) (*node, bool) { - n, ok := nh.mu.clusters.Load(shardID) + n, ok := nh.mu.shards.Load(shardID) if !ok { return nil, false } @@ -1466,7 +1465,7 @@ func (nh *NodeHost) getShard(shardID uint64) (*node, bool) { func (nh *NodeHost) forEachShard(f func(uint64, *node) bool) uint64 { nh.mu.RLock() defer nh.mu.RUnlock() - nh.mu.clusters.Range(func(k, v interface{}) bool { + nh.mu.shards.Range(func(k, v interface{}) bool { return f(k.(uint64), v.(*node)) }) return nh.mu.cci @@ -1478,19 +1477,19 @@ func (nh *NodeHost) getShardSetIndex() uint64 { return nh.mu.cci } -// there are three major reasons to bootstrap the cluster +// there are three major reasons to bootstrap the shard - // -// 1. when possible, we check whether user incorrectly specified parameters -// for the startShard method, e.g. call startShard with join=true first, -// then restart the NodeHost instance and call startShard again with -// join=false and len(nodes) > 0 -// 2. when restarting a node which is a part of the initial cluster members, -// for user convenience, we allow the caller not to provide the details of -// initial members. when the initial cluster member info is required, however -// we still need to get the initial member info from somewhere. bootstrap is -// the procedure that records such info. -// 3. the bootstrap record is used as a marker record in our default LogDB -// implementation to indicate that a certain node exists here +// 1. when possible, we check whether user incorrectly specified parameters +// for the startShard method, e.g. call startShard with join=true first, +// then restart the NodeHost instance and call startShard again with +// join=false and len(nodes) > 0 +// 2. when restarting a node which is a part of the initial shard members, +// for user convenience, we allow the caller not to provide the details of +// initial members. when the initial shard member info is required, however +// we still need to get the initial member info from somewhere. bootstrap is +// the procedure that records such info. +// 3. 
the bootstrap record is used as a marker record in our default LogDB +// implementation to indicate that a certain node exists here func (nh *NodeHost) bootstrapShard(initialMembers map[uint64]Target, join bool, cfg config.Config, smType pb.StateMachineType) (map[uint64]string, bool, error) { @@ -1537,7 +1536,7 @@ func (nh *NodeHost) startShard(initialMembers map[uint64]Target, if atomic.LoadInt32(&nh.closed) != 0 { return ErrClosed } - if _, ok := nh.mu.clusters.Load(shardID); ok { + if _, ok := nh.mu.shards.Load(shardID); ok { return ErrShardAlreadyExist } if nh.engine.nodeLoaded(shardID, replicaID) { @@ -1600,7 +1599,7 @@ func (nh *NodeHost) startShard(initialMembers map[uint64]Target, panicNow(err) } rn.loaded() - nh.mu.clusters.Store(shardID, rn) + nh.mu.shards.Store(shardID, rn) nh.mu.cci++ nh.cciUpdated() nh.engine.setCCIReady(shardID) @@ -1758,7 +1757,7 @@ func (nh *NodeHost) createTransport() error { func (nh *NodeHost) stopNode(shardID uint64, replicaID uint64, check bool) error { nh.mu.Lock() defer nh.mu.Unlock() - v, ok := nh.mu.clusters.Load(shardID) + v, ok := nh.mu.shards.Load(shardID) if !ok { return ErrShardNotFound } @@ -1766,7 +1765,7 @@ func (nh *NodeHost) stopNode(shardID uint64, replicaID uint64, check bool) error if check && n.replicaID != replicaID { return ErrShardNotFound } - nh.mu.clusters.Delete(shardID) + nh.mu.shards.Delete(shardID) nh.mu.cci++ nh.cciUpdated() nh.engine.setCCIReady(shardID) @@ -1780,12 +1779,12 @@ func (nh *NodeHost) stopNode(shardID uint64, replicaID uint64, check bool) error } func (nh *NodeHost) getShardInfo() []ShardInfo { - clusterInfoList := make([]ShardInfo, 0) + shardInfoList := make([]ShardInfo, 0) nh.forEachShard(func(cid uint64, node *node) bool { - clusterInfoList = append(clusterInfoList, node.getShardInfo()) + shardInfoList = append(shardInfoList, node.getShardInfo()) return true }) - return clusterInfoList + return shardInfoList } func (nh *NodeHost) tickWorkerMain() { @@ -1867,8 +1866,8 @@ func (nh *NodeHost) sendMessage(msg pb.Message) { } } -func (nh *NodeHost) sendTickMessage(clusters []*node, tick uint64) { - for _, n := range clusters { +func (nh *NodeHost) sendTickMessage(shards []*node, tick uint64) { + for _, n := range shards { m := pb.Message{ Type: pb.LocalTick, To: n.replicaID, @@ -1963,19 +1962,19 @@ func getRequestState(ctx context.Context, rs *RequestState) (sm.Result, error) { // INodeUser is the interface implemented by a Raft node user type. A Raft node // user can be used to directly initiate proposals or read index operations // without locating the Raft node in NodeHost's node list first. It is useful -// when doing bulk load operations on selected clusters. +// when doing bulk load operations on selected shards. type INodeUser interface { - // ShardID is the cluster ID of the node. + // ShardID is the shard ID of the node. ShardID() uint64 - // ReplicaID is the node ID of the node. + // ReplicaID is the replica ID of the node. ReplicaID() uint64 - // Propose starts an asynchronous proposal on the Raft cluster represented by + // Propose starts an asynchronous proposal on the Raft shard represented by // the INodeUser instance. Its semantics is the same as the Propose() method // in NodeHost. Propose(s *client.Session, cmd []byte, timeout time.Duration) (*RequestState, error) // ReadIndex starts the asynchronous ReadIndex protocol used for linearizable - // reads on the Raft cluster represented by the INodeUser instance. Its + // reads on the Raft shard represented by the INodeUser instance. 
Its // semantics is the same as the ReadIndex() method in NodeHost. ReadIndex(timeout time.Duration) (*RequestState, error) } @@ -2065,7 +2064,7 @@ func (h *messageHandler) HandleMessageBatch(msg pb.MessageBatch) (uint64, uint64 n.mq.MustAdd(req) snapshotCount++ } else if req.Type == pb.SnapshotReceived { - plog.Debugf("SnapshotReceived received, cluster id %d, node id %d", + plog.Debugf("SnapshotReceived received, shard id %d, replica id %d", req.ShardID, req.From) n.mq.AddDelayed(pb.Message{ Type: pb.SnapshotStatus, diff --git a/nodehost_test.go b/nodehost_test.go index c2195c8c5..c40dbccb0 100644 --- a/nodehost_test.go +++ b/nodehost_test.go @@ -510,17 +510,17 @@ func createSingleTestNode(t *testing.T, to *testOption, nh *NodeHost) { } if to.createSM != nil { if err := nh.StartReplica(peers, to.join, to.createSM, *cfg); err != nil { - t.Fatalf("start cluster failed: %v", err) + t.Fatalf("start shard failed: %v", err) } } else if to.createConcurrentSM != nil { if err := nh.StartConcurrentReplica(peers, to.join, to.createConcurrentSM, *cfg); err != nil { - t.Fatalf("start concurrent cluster failed: %v", err) + t.Fatalf("start concurrent shard failed: %v", err) } } else if to.createOnDiskSM != nil { if err := nh.StartOnDiskReplica(peers, to.join, to.createOnDiskSM, *cfg); err != nil { - t.Fatalf("start on disk cluster fail: %v", err) + t.Fatalf("start on disk shard fail: %v", err) } } else { t.Fatalf("?!?") @@ -1016,7 +1016,7 @@ func TestNodeHostRegistry(t *testing.T) { } } if !good { - t.Fatalf("registry failed to report the expected num of clusters") + t.Fatalf("registry failed to report the expected num of shards") } rc.ShardID = 100 if err := nh2.StartReplica(peers, false, createSM, rc); err != nil { @@ -1040,7 +1040,7 @@ func TestNodeHostRegistry(t *testing.T) { return } } - t.Fatalf("failed to report the expected num of clusters") + t.Fatalf("failed to report the expected num of shards") } func TestGossipCanHandleDynamicRaftAddress(t *testing.T) { @@ -1541,7 +1541,7 @@ func TestJoinedShardCanBeRestartedOrJoinedAgain(t *testing.T) { peers := make(map[uint64]string) newPST := func(uint64, uint64) sm.IStateMachine { return &PST{} } if err := nh.StopShard(1); err != nil { - t.Fatalf("failed to stop the cluster: %v", err) + t.Fatalf("failed to stop the shard: %v", err) } for i := 0; i < 1000; i++ { err := nh.StartReplica(peers, true, newPST, *cfg) @@ -1552,7 +1552,7 @@ func TestJoinedShardCanBeRestartedOrJoinedAgain(t *testing.T) { time.Sleep(5 * time.Millisecond) continue } else { - t.Fatalf("failed to join the cluster again, %v", err) + t.Fatalf("failed to join the shard again, %v", err) } } }, @@ -1857,7 +1857,7 @@ func TestGetShardMembership(t *testing.T) { defer cancel() _, err := nh.SyncGetShardMembership(ctx, 1) if err != nil { - t.Fatalf("failed to get cluster membership") + t.Fatalf("failed to get shard membership") } }, } @@ -2074,7 +2074,7 @@ func TestNodeHostSyncIOAPIs(t *testing.T) { t.Errorf("failed to get result") } if err := nh.StopShard(1); err != nil { - t.Errorf("failed to stop cluster 2 %v", err) + t.Errorf("failed to stop shard 2 %v", err) } listener, ok := nh.events.sys.ul.(*testSysEventListener) if !ok { @@ -2338,7 +2338,7 @@ func TestNodeHostAddNonVotingRemoveNode(t *testing.T) { defer cancel() membership, err := nh.SyncGetShardMembership(ctx, 1) if err != nil { - t.Fatalf("failed to get cluster membership %v", err) + t.Fatalf("failed to get shard membership %v", err) } if len(membership.Nodes) != 1 || len(membership.Removed) != 0 { t.Errorf("unexpected 
nodes/removed len") @@ -2363,7 +2363,7 @@ func TestNodeHostAddNonVotingRemoveNode(t *testing.T) { defer cancel() membership, err = nh.SyncGetShardMembership(ctx, 1) if err != nil { - t.Fatalf("failed to get cluster membership %v", err) + t.Fatalf("failed to get shard membership %v", err) } if len(membership.Nodes) != 1 || len(membership.Removed) != 1 { t.Errorf("unexpected nodes/removed len") @@ -2578,7 +2578,7 @@ func TestOnDiskSMCanStreamSnapshot(t *testing.T) { return sm1 } if err := nh1.StartOnDiskReplica(peers, false, newSM, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } waitForLeaderToBeElected(t, nh1, 1) logdb := nh1.mu.logdb @@ -2633,7 +2633,7 @@ func TestOnDiskSMCanStreamSnapshot(t *testing.T) { } sm1.ClearAborted() if err := nh2.StartOnDiskReplica(nil, true, newSM2, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } ssIndex := uint64(0) logdb = nh2.mu.logdb @@ -3599,7 +3599,7 @@ func TestSyncRemoveData(t *testing.T) { defaultTestNode: true, tf: func(nh *NodeHost) { if err := nh.StopShard(1); err != nil { - t.Fatalf("failed to remove cluster %v", err) + t.Fatalf("failed to remove shard %v", err) } pto := lpto(nh) ctx, cancel := context.WithTimeout(context.Background(), pto) @@ -3640,7 +3640,7 @@ func TestRestartingAnNodeWithRemovedDataWillBeRejected(t *testing.T) { defaultTestNode: true, tf: func(nh *NodeHost) { if err := nh.StopShard(1); err != nil { - t.Fatalf("failed to remove cluster %v", err) + t.Fatalf("failed to remove shard %v", err) } for { if err := nh.RemoveData(1, 1); err != nil { @@ -3660,7 +3660,7 @@ func TestRestartingAnNodeWithRemovedDataWillBeRejected(t *testing.T) { return &PST{} } if err := nh.StartReplica(peers, false, newPST, *rc); err != ErrReplicaRemoved { - t.Errorf("start cluster failed %v", err) + t.Errorf("start shard failed %v", err) } }, } @@ -3690,7 +3690,7 @@ func TestRemoveNodeDataRemovesAllNodeData(t *testing.T) { t.Errorf("failed to complete the requested snapshot") } if err := nh.StopShard(1); err != nil { - t.Fatalf("failed to stop cluster %v", err) + t.Fatalf("failed to stop shard %v", err) } logdb := nh.mu.logdb snapshot, err := logdb.GetSnapshot(1, 1) @@ -4062,7 +4062,7 @@ func testImportedSnapshotIsAlwaysRestored(t *testing.T, return tests.NewSimDiskSM(0) } if err := nh.StartOnDiskReplica(peers, false, newSM, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } waitForLeaderToBeElected(t, nh, 1) makeProposals := func(nn *NodeHost) { @@ -4155,7 +4155,7 @@ func testImportedSnapshotIsAlwaysRestored(t *testing.T, return tests.NewSimDiskSM(applied) } if err := rnh.StartOnDiskReplica(nil, false, rnewSM, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } waitForLeaderToBeElected(t, rnh, 1) ctx, cancel = context.WithTimeout(context.Background(), pto) @@ -4239,7 +4239,7 @@ func TestShardWithoutQuorumCanBeRestoreByImportingSnapshot(t *testing.T) { return tests.NewFakeDiskSM(0) } if err := nh1.StartOnDiskReplica(peers, false, newSM, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } waitForLeaderToBeElected(t, nh1, 1) defer func() { @@ -4267,7 +4267,7 @@ func TestShardWithoutQuorumCanBeRestoreByImportingSnapshot(t *testing.T) { } } if !done { - t.Fatalf("failed to make proposal on restored cluster") + t.Fatalf("failed to make proposal on restored shard") } } 
mkproposal(nh1) @@ -4326,11 +4326,11 @@ func TestShardWithoutQuorumCanBeRestoreByImportingSnapshot(t *testing.T) { rnh2.Close() }() if err := rnh1.StartOnDiskReplica(nil, false, newSM, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } rc.ReplicaID = 10 if err := rnh2.StartOnDiskReplica(nil, false, newSM2, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } waitForLeaderToBeElected(t, rnh1, 1) mkproposal(rnh1) @@ -4807,7 +4807,7 @@ func TestLeaderInfoIsReported(t *testing.T) { t.Errorf("unexpected len: %d", len(nhi.ShardInfoList)) } if nhi.ShardInfoList[0].ShardID != 1 { - t.Fatalf("unexpected cluster id") + t.Fatalf("unexpected shard id") } if nhi.ShardInfoList[0].LeaderID != 1 { time.Sleep(20 * time.Millisecond) @@ -5117,7 +5117,7 @@ func testWitnessIO(t *testing.T, return tests.NewSimDiskSM(0) } if err := nh1.StartOnDiskReplica(peers, false, newSM, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } waitForLeaderToBeElected(t, nh1, 1) for i := 0; i < 8; i++ { @@ -5152,7 +5152,7 @@ func testWitnessIO(t *testing.T, return witness } if err := nh2.StartOnDiskReplica(nil, true, newWitness, rc2); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } waitForLeaderToBeElected(t, nh2, 1) witnessTestFunc(nh1, nh2, witness) @@ -5372,7 +5372,7 @@ func TestNodeCanBeUnloadedOnceClosed(t *testing.T) { tf: func(nh *NodeHost) { countNodes := func(nh *NodeHost) uint64 { count := uint64(0) - nh.mu.clusters.Range(func(key, value interface{}) bool { + nh.mu.shards.Range(func(key, value interface{}) bool { count++ return true }) @@ -5577,7 +5577,7 @@ func TestHandleSnapshotStatus(t *testing.T) { h := messageHandler{nh: nh} mq := server.NewMessageQueue(1024, false, lazyFreeCycle, 1024) node := &node{shardID: 1, replicaID: 1, mq: mq} - h.nh.mu.clusters.Store(uint64(1), node) + h.nh.mu.shards.Store(uint64(1), node) h.HandleSnapshotStatus(1, 2, true) for i := uint64(0); i <= streamPushDelayTick; i++ { node.mq.Tick() @@ -5605,7 +5605,7 @@ func TestSnapshotReceivedMessageCanBeConverted(t *testing.T) { h := messageHandler{nh: nh} mq := server.NewMessageQueue(1024, false, lazyFreeCycle, 1024) node := &node{shardID: 1, replicaID: 1, mq: mq} - h.nh.mu.clusters.Store(uint64(1), node) + h.nh.mu.shards.Store(uint64(1), node) mb := pb.MessageBatch{ Requests: []pb.Message{{To: 1, From: 2, ShardID: 1, Type: pb.SnapshotReceived}}, } @@ -5639,7 +5639,7 @@ func TestIncorrectlyRoutedMessagesAreIgnored(t *testing.T) { h := messageHandler{nh: nh} mq := server.NewMessageQueue(1024, false, lazyFreeCycle, 1024) node := &node{shardID: 1, replicaID: 1, mq: mq} - h.nh.mu.clusters.Store(uint64(1), node) + h.nh.mu.shards.Store(uint64(1), node) mb := pb.MessageBatch{ Requests: []pb.Message{{To: 3, From: 2, ShardID: 1, Type: pb.SnapshotReceived}}, } @@ -5812,7 +5812,7 @@ func TestSlowTestStressedSnapshotWorker(t *testing.T) { for i := uint64(1); i <= uint64(96); i++ { rc.ShardID = i if err := nh1.StartReplica(peers, false, newRSM, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + t.Fatalf("failed to start shard %v", err) } cs := nh1.GetNoOPSession(i) total := 20 @@ -5867,7 +5867,7 @@ func TestSlowTestStressedSnapshotWorker(t *testing.T) { rc.ReplicaID = 2 peers := make(map[uint64]string) if err := nh2.StartReplica(peers, true, newRSM, rc); err != nil { - t.Fatalf("failed to start cluster %v", err) + 
t.Fatalf("failed to start shard %v", err) } } for i := uint64(1); i <= uint64(96); i++ { @@ -5881,7 +5881,7 @@ func TestSlowTestStressedSnapshotWorker(t *testing.T) { if err == ErrTimeout || err == ErrShardNotReady { total-- if total == 0 { - t.Fatalf("failed to make proposal on cluster %d", i) + t.Fatalf("failed to make proposal on shard %d", i) } time.Sleep(100 * time.Millisecond) continue diff --git a/queue_test.go b/queue_test.go index fd486a8d4..a92551721 100644 --- a/queue_test.go +++ b/queue_test.go @@ -231,11 +231,11 @@ func TestShardCanBeSetAsReady(t *testing.T) { } _, ok := rc.ready[1] if !ok { - t.Errorf("cluster 1 not set as ready") + t.Errorf("shard 1 not set as ready") } _, ok = rc.ready[2] if !ok { - t.Errorf("cluster 2 not set as ready") + t.Errorf("shard 2 not set as ready") } } @@ -255,16 +255,16 @@ func TestReadyShardCanBeReturnedAndCleared(t *testing.T) { t.Errorf("ready map sz %d, want 2", len(r)) } if len(rc.ready) != 0 { - t.Errorf("cluster ready map not cleared") + t.Errorf("shard ready map not cleared") } r = rc.getReadyShards() if len(r) != 0 { - t.Errorf("cluster ready map not cleared") + t.Errorf("shard ready map not cleared") } rc.setShardReady(4) r = rc.getReadyShards() if len(r) != 1 { - t.Errorf("cluster ready not set") + t.Errorf("shard ready not set") } } diff --git a/raftio/logdb.go b/raftio/logdb.go index c45de46ea..1cee09815 100644 --- a/raftio/logdb.go +++ b/raftio/logdb.go @@ -50,10 +50,10 @@ type RaftState struct { EntryCount uint64 } -// GetNodeInfo returns a NodeInfo instance with the specified cluster ID -// and node ID. -func GetNodeInfo(cid uint64, nid uint64) NodeInfo { - return NodeInfo{ShardID: cid, ReplicaID: nid} +// GetNodeInfo returns a NodeInfo instance with the specified shard ID +// and replica ID. +func GetNodeInfo(shardID uint64, replicaID uint64) NodeInfo { + return NodeInfo{ShardID: shardID, ReplicaID: replicaID} } // ILogDB is the interface implemented by the log DB for persistently store @@ -100,7 +100,7 @@ type ILogDB interface { // SaveSnapshots saves all snapshot metadata found in the pb.Update list. SaveSnapshots([]pb.Update) error // GetSnapshot returns the most recent snapshot associated with the specified - // cluster. + // shard. GetSnapshot(shardID uint64, replicaID uint64) (pb.Snapshot, error) // RemoveNodeData removes all data associated with the specified node. RemoveNodeData(shardID uint64, replicaID uint64) error diff --git a/request.go b/request.go index dcc332a33..7f73e2e65 100644 --- a/request.go +++ b/request.go @@ -66,11 +66,11 @@ var ( // Raft config change operation, ErrSystemBusy means there is already such a // request waiting to be processed. ErrSystemBusy = errors.New("system is too busy try again later") - // ErrShardClosed indicates that the requested cluster is being shut down. - ErrShardClosed = errors.New("raft cluster already closed") + // ErrShardClosed indicates that the requested shard is being shut down. + ErrShardClosed = errors.New("raft shard already closed") // ErrShardNotInitialized indicates that the requested operation can not be - // completed as the involved raft cluster has not been initialized yet. - ErrShardNotInitialized = errors.New("raft cluster not initialized yet") + // completed as the involved raft shard has not been initialized yet. + ErrShardNotInitialized = errors.New("raft shard not initialized yet") // ErrTimeout indicates that the operation timed out. ErrTimeout = errors.New("timeout") // ErrCanceled indicates that the request has been canceled. 
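The request errors above are typically split by callers into transient ones (ErrTimeout, ErrShardNotReady, ErrSystemBusy) that are safe to retry and terminal ones (for example ErrClosed or ErrShardClosed) that should be returned to the application. A minimal sketch of such a caller follows; it assumes the v4 module path, a started NodeHost and a NoOP client session, and the proposeWithRetry helper name is hypothetical rather than part of the library.

// Illustrative sketch only: retry a proposal on transient request errors.
// The import path assumes the v4 module; adjust it to match your go.mod.
package example

import (
	"context"
	"time"

	"github.com/lni/dragonboat/v4"
	sm "github.com/lni/dragonboat/v4/statemachine"
)

// proposeWithRetry keeps retrying a proposal while the returned error is
// transient, and gives up immediately on any other error.
func proposeWithRetry(nh *dragonboat.NodeHost,
	shardID uint64, cmd []byte, attempts int) (sm.Result, error) {
	cs := nh.GetNoOPSession(shardID)
	var err error
	for i := 0; i < attempts; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		var result sm.Result
		result, err = nh.SyncPropose(ctx, cs, cmd)
		cancel()
		switch err {
		case nil:
			return result, nil
		case dragonboat.ErrTimeout, dragonboat.ErrShardNotReady, dragonboat.ErrSystemBusy:
			// transient: back off briefly and try again
			time.Sleep(100 * time.Millisecond)
		default:
			// e.g. ErrClosed or ErrShardClosed, not worth retrying here
			return sm.Result{}, err
		}
	}
	return sm.Result{}, err
}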
@@ -78,10 +78,10 @@ var (
 	// ErrRejected indicates that the request has been rejected.
 	ErrRejected = errors.New("request rejected")
 	// ErrShardNotReady indicates that the request has been dropped as the
-	// specified raft cluster is not ready to handle the request. Unknown leader
-	// is the most common cause of this Error, trying to use a cluster not fully
+	// specified raft shard is not ready to handle the request. An unknown leader
+	// is the most common cause of this error; trying to use a shard not fully
 	// initialized is another major cause of ErrShardNotReady.
-	ErrShardNotReady = errors.New("request dropped as the cluster is not ready")
+	ErrShardNotReady = errors.New("request dropped as the shard is not ready")
 	// ErrInvalidTarget indicates that the specified node id is invalid.
 	ErrInvalidTarget = errors.New("invalid target node ID")
 )
@@ -141,14 +141,14 @@ func (rr *RequestResult) Committed() bool {
 
 // Completed returns a boolean value indicating whether the request completed
 // successfully. For proposals, it means the proposal has been committed by the
-// Raft cluster and applied on the local node. For ReadIndex operation, it means
-// the cluster is now ready for a local read.
+// Raft shard and applied on the local node. For a ReadIndex operation, it means
+// the shard is now ready for a local read.
 func (rr *RequestResult) Completed() bool {
 	return rr.code == requestCompleted
 }
 
 // Terminated returns a boolean value indicating the request terminated due to
-// the requested Raft cluster is being shut down.
+// the requested Raft shard being shut down.
 func (rr *RequestResult) Terminated() bool {
 	return rr.code == requestTerminated
 }
@@ -1104,7 +1104,7 @@ func (p *proposalShard) propose(session *client.Session,
 
 	added, stopped := p.proposals.add(entry)
 	if stopped {
-		plog.Warningf("%s dropped proposal, cluster stopped",
+		plog.Warningf("%s dropped proposal, shard stopped",
 			dn(p.cfg.ShardID, p.cfg.ReplicaID))
 		p.mu.Lock()
 		delete(p.pending, entry.Key)
diff --git a/statemachine/extension.go b/statemachine/extension.go
index a27eaddd9..fd1a848db 100644
--- a/statemachine/extension.go
+++ b/statemachine/extension.go
@@ -29,7 +29,7 @@ var (
 type IHash interface {
 	// GetHash returns a uint64 value used to represent the current state of the
 	// state machine. The hash should be generated in a deterministic manner
-	// which means nodes from the same Raft cluster are suppose to return the
+	// which means nodes from the same Raft shard are supposed to return the
 	// same hash result when they have the same Raft Log entries applied.
 	//
 	// GetHash is a read-only operation.
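To make the deterministic requirement on GetHash concrete, a state machine whose entire state is a single counter could satisfy IHash roughly as follows. This is a sketch rather than library code: countedSM is a hypothetical type and the GetHash() (uint64, error) signature is assumed from the statemachine package.

// Sketch of a hypothetical state machine type whose entire state is one
// counter; replicas with the same applied Raft log entries hold the same
// count and therefore report the same hash.
package example

import (
	"encoding/binary"
	"hash/fnv"
)

type countedSM struct {
	count uint64
}

// GetHash hashes the counter value deterministically and performs no writes.
func (c *countedSM) GetHash() (uint64, error) {
	h := fnv.New64a()
	b := make([]byte, 8)
	binary.LittleEndian.PutUint64(b, c.count)
	if _, err := h.Write(b); err != nil {
		return 0, err
	}
	return h.Sum64(), nil
}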
diff --git a/tools/checkdisk/main.go b/tools/checkdisk/main.go index 07daeb588..3b826968b 100644 --- a/tools/checkdisk/main.go +++ b/tools/checkdisk/main.go @@ -44,7 +44,7 @@ const ( raftAddress2 = "localhost:26001" ) -var clustercount = flag.Int("num-of-clusters", 48, "number of raft clusters") +var shardcount = flag.Int("num-of-shards", 48, "number of raft shards") var read = flag.Bool("enable-read", false, "enable read") var readonly = flag.Bool("read-only", false, "read only") var batched = flag.Bool("batched-logdb", false, "use batched logdb") @@ -197,7 +197,7 @@ func main() { nodes[2] = raftAddress2 } nhList := make([]*dragonboat.NodeHost, 0) - for i := uint64(1); i <= uint64(*clustercount); i++ { + for i := uint64(1); i <= uint64(*shardcount); i++ { rc.ShardID = i if err := nh.StartReplica(nodes, false, newDummyStateMachine, rc); err != nil { panic(err) @@ -210,7 +210,7 @@ func main() { } } } - for i := uint64(1); i <= uint64(*clustercount); i++ { + for i := uint64(1); i <= uint64(*shardcount); i++ { for j := 0; j < 10000; j++ { leaderID, ok, err := nh.GetLeaderID(i) if err != nil { @@ -237,10 +237,10 @@ func main() { } } } - if len(nhList) != *clustercount { + if len(nhList) != *shardcount { panic(fmt.Sprintf("nhList len unexpected, %d", len(nhList))) } - fmt.Printf("clusters are ready, will run for %d seconds\n", *seconds) + fmt.Printf("shards are ready, will run for %d seconds\n", *seconds) if *cpupprof { f, err := os.Create("cpu.pprof") if err != nil { @@ -272,7 +272,7 @@ func main() { for i := uint64(0); i < uint64(*clientcount); i++ { workerID := i stopper.RunWorker(func() { - shardID := (workerID % uint64(*clustercount)) + 1 + shardID := (workerID % uint64(*shardcount)) + 1 nh := nhList[shardID-1] cs := nh.GetNoOPSession(shardID) cmd := make([]byte, 16) @@ -303,7 +303,7 @@ func main() { for i := uint64(0); i < uint64(*clientcount); i++ { workerID := i stopper.RunWorker(func() { - shardID := (workerID % uint64(*clustercount)) + 1 + shardID := (workerID % uint64(*shardcount)) + 1 nh := nhList[shardID-1] for { for j := 0; j < 32; j++ { diff --git a/tools/import.go b/tools/import.go index 5555f1aaf..2554b3a05 100644 --- a/tools/import.go +++ b/tools/import.go @@ -59,74 +59,74 @@ var ( var firstError = utils.FirstError -// ImportSnapshot is used to repair the Raft cluster already has its quorum +// ImportSnapshot is used to repair the Raft shard already has its quorum // nodes permanently lost or damaged. Such repair is only required when the -// Raft cluster permanently lose its quorum. You are not suppose to use this -// function when the cluster still have its majority nodes running or when +// Raft shard permanently lose its quorum. You are not suppose to use this +// function when the shard still have its majority nodes running or when // the node failures are not permanent. In our experience, a well monitored // and managed Dragonboat system can usually avoid using the ImportSnapshot // tool by always replace permanently dead nodes with available ones in time. // // ImportSnapshot imports the exported snapshot available in the specified // srcDir directory to the system and rewrites the history of node replicaID so -// the node owns the imported snapshot and the membership of the Raft cluster +// the node owns the imported snapshot and the membership of the Raft shard // is rewritten to the details specified in memberNodes. // // ImportSnapshot is typically invoked by a DevOps tool separated from the // Dragonboat based application. 
The NodeHost instance must be stopped on that // host when invoking the function ImportSnapshot. // -// As an example, consider a Raft cluster with three nodes with the ReplicaID +// As an example, consider a Raft shard with three nodes with the ReplicaID // values being 1, 2 and 3, they run on three distributed hostss each with a // running NodeHost instance and the RaftAddress values are m1, m2 and -// m3. The ShardID value of the Raft cluster is 100. Let's say hosts +// m3. The ShardID value of the Raft shard is 100. Let's say hosts // identified by m2 and m3 suddenly become permanently gone and thus cause the -// Raft cluster to lose its quorum nodes. To repair the cluster, we can use the +// Raft shard to lose its quorum nodes. To repair the shard, we can use the // ImportSnapshot function to overwrite the state and membership of the Raft -// cluster. +// shard. // // Assuming we have two other running hosts identified as m4 and m5, we want to // have two new nodes with ReplicaID 4 and 5 to replace the permanently lost ndoes // 2 and 3. In this case, the memberNodes map should contain the following // content: // -// memberNodes: map[uint64]string{ -// {1: "m1"}, {4: "m4"}, {5: "m5"}, -// } +// memberNodes: map[uint64]string{ +// {1: "m1"}, {4: "m4"}, {5: "m5"}, +// } // // we first shutdown NodeHost instances on all involved hosts and call the // ImportSnapshot function from the DevOps tool. Assuming the directory -// /backup/cluster100 contains the exported snapshot we previously saved by using +// /backup/shard100 contains the exported snapshot we previously saved by using // NodeHost's ExportSnapshot method, then - // // on m1, we call - -// ImportSnapshot(nhConfig1, "/backup/cluster100", memberNodes, 1) +// ImportSnapshot(nhConfig1, "/backup/shard100", memberNodes, 1) // // on m4 - -// ImportSnapshot(nhConfig4, "/backup/cluster100", memberNodes, 4) +// ImportSnapshot(nhConfig4, "/backup/shard100", memberNodes, 4) // // on m5 - -// ImportSnapshot(nhConfig5, "/backup/cluster100", memberNodes, 5) +// ImportSnapshot(nhConfig5, "/backup/shard100", memberNodes, 5) // // The nhConfig* value used above should be the same as the one used to start // your NodeHost instances, they are suppose to be slightly different on m1, m4 // and m5 to reflect the differences between these hosts, e.g. the RaftAddress -// values. srcDir values are all set to "/backup/cluster100", that directory +// values. srcDir values are all set to "/backup/shard100", that directory // should contain the exact same snapshot. The memberNodes value should be the // same across all three hosts. // // Once ImportSnapshot is called on all three of those hosts, we end up having -// the history of the Raft cluster overwritten to the state in which - -// * there are 3 nodes in the Raft cluster, the ReplicaID values are 1, 4 and 5. -// they run on hosts m1, m4 and m5. -// * nodes 2 and 3 are permanently removed from the cluster. you should never -// restart any of them as both hosts m2 and m3 are suppose to be permanently -// unavailable. -// * the state captured in the snapshot became the state of the cluster. all -// proposals more recent than the state of the snapshot are lost. +// the history of the Raft shard overwritten to the state in which - +// - there are 3 nodes in the Raft shard, the ReplicaID values are 1, 4 and 5. +// they run on hosts m1, m4 and m5. +// - nodes 2 and 3 are permanently removed from the shard. 
you should never
+//     restart any of them as both hosts m2 and m3 are supposed to be permanently
+//     unavailable.
+//   - the state captured in the snapshot becomes the state of the shard. all
+//     proposals more recent than the state of the snapshot are lost.
 //
 // Once the NodeHost instances are restarted on m1, m4 and m5, nodes 1, 4 and 5
-// of the Raft cluster 100 can be restarted in the same way as after rebooting
+// of the Raft shard 100 can be restarted in the same way as after rebooting
 // the hosts m1, m4 and m5.
 //
 // It is your application's responsibility to let m4 and m5 be aware that