Skip to content

Commit

Permalink
Merge pull request #2544 from openziti/join-cluster
Browse files Browse the repository at this point in the history
Support joining to a cluster from an unattached node. Fixes #2543
  • Loading branch information
plorenz authored Nov 15, 2024
2 parents 4c9c647 + f103154 commit 4687125
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 19 deletions.
68 changes: 51 additions & 17 deletions controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ func (self *Controller) agentOpSnapshotDb(m *channel.Message, ch channel.Channel

func (self *Controller) agentOpRaftList(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.list", "controller not running in clustered mode", false)
return
}

members, err := self.raftController.ListMembers()
if err != nil {
handler_common.SendOpResult(m, ch, "raft.list", err.Error(), false)
handler_common.SendOpResult(m, ch, "cluster.list", err.Error(), false)
}

result := &mgmt_pb.RaftMemberListResponse{}
Expand All @@ -111,22 +111,18 @@ func (self *Controller) agentOpRaftList(m *channel.Message, ch channel.Channel)

func (self *Controller) agentOpRaftAddPeer(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.add-peer", "controller not running in clustered mode", false)
return
}

if !self.raftController.IsBootstrapped() {
handler_common.SendOpResult(m, ch, "raft.list",
"Local instance not initialized. If all instances are uninitialized, initialize one first using"+
" 'ziti agent controller init' then add nodes to that instance. If you have existing initialized nodes,"+
" use the 'ziti agent cluster add' command on an initialized node, rather than here.",
false)
self.agentOpRaftJoinCluster(m, ch)
return
}

addr, found := m.GetStringHeader(AgentAddrHeader)
if !found {
handler_common.SendOpResult(m, ch, "raft.join", "address not supplied", false)
handler_common.SendOpResult(m, ch, "cluster.add-peer", "address not supplied", false)
return
}

Expand All @@ -135,7 +131,7 @@ func (self *Controller) agentOpRaftAddPeer(m *channel.Message, ch channel.Channe
peerId, peerAddr, err := self.raftController.Mesh.GetPeerInfo(addr, 15*time.Second)
if err != nil {
errMsg := fmt.Sprintf("id not supplied and unable to retrieve [%v]", err.Error())
handler_common.SendOpResult(m, ch, "raft.join", errMsg, false)
handler_common.SendOpResult(m, ch, "cluster.add-peer", errMsg, false)
return
}
id = string(peerId)
Expand All @@ -160,9 +156,47 @@ func (self *Controller) agentOpRaftAddPeer(m *channel.Message, ch channel.Channe
handler_common.SendOpResult(m, ch, "cluster.add-peer", fmt.Sprintf("success, added %v at %v to cluster", id, addr), true)
}

func (self *Controller) agentOpRaftJoinCluster(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "cluster.join", "controller not running in clustered mode", false)
return
}

if self.raftController.IsBootstrapped() {
handler_common.SendOpResult(m, ch, "cluster.join",
"Local instance is already initialized. Only uninitialized nodes may be joined to a cluster. ",
false)
return
}

addr, found := m.GetStringHeader(AgentAddrHeader)
if !found {
handler_common.SendOpResult(m, ch, "cluster.join", "address not supplied", false)
return
}

isVoter, found := m.GetBoolHeader(AgentIsVoterHeader)
if !found {
isVoter = true
}

req := &cmd_pb.AddPeerRequest{
Addr: self.raftController.Config.AdvertiseAddress.String(),
Id: self.config.Id.Token,
IsVoter: isVoter,
}

if err := self.raftController.ForwardToAddr(addr, req); err != nil {
handler_common.SendOpResult(m, ch, "cluster.join", err.Error(), false)
return
}

handler_common.SendOpResult(m, ch, "cluster.join", "success, added self to cluster", true)
}

func (self *Controller) agentOpRaftRemovePeer(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.remove-peer", "controller not running in clustered mode", false)
return
}

Expand All @@ -185,7 +219,7 @@ func (self *Controller) agentOpRaftRemovePeer(m *channel.Message, ch channel.Cha

func (self *Controller) agentOpRaftTransferLeadership(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.transfer-leadership", "controller not running in clustered mode", false)
return
}

Expand All @@ -203,26 +237,26 @@ func (self *Controller) agentOpRaftTransferLeadership(m *channel.Message, ch cha

func (self *Controller) agentOpInitFromDb(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.init-from-db", "controller not running in clustered mode", false)
return
}

sourceDbPath := string(m.Body)
if len(sourceDbPath) == 0 {
handler_common.SendOpResult(m, ch, "raft.initFromDb", "source db not supplied", false)
handler_common.SendOpResult(m, ch, "cluster.init-from-db", "source db not supplied", false)
return
}

if err := self.InitializeRaftFromBoltDb(sourceDbPath); err != nil {
handler_common.SendOpResult(m, ch, "raft.initFromDb", err.Error(), false)
handler_common.SendOpResult(m, ch, "cluster.init-from-db", err.Error(), false)
return
}
handler_common.SendOpResult(m, ch, "raft.initFromDb", fmt.Sprintf("success, initialized from [%v]", sourceDbPath), true)
handler_common.SendOpResult(m, ch, "cluster.init-from-db", fmt.Sprintf("success, initialized from [%v]", sourceDbPath), true)
}

func (self *Controller) agentOpInit(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "init.edge", "controller not running in clustered mode", false)
return
}

Expand Down
5 changes: 5 additions & 0 deletions controller/handler_peer_ctrl/add_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func (self *addPeerHandler) handleAddPeer(m *channel.Message, ch channel.Channel

log.Infof("received join request id: %v, addr: %v, voter: %v", req.Id, req.Addr, !req.IsVoter)

if !self.controller.IsBootstrapped() {
sendErrorResponse(m, ch, errors.New("node not member of bootstrapped cluster, unable to add peers"), peermsg.ErrorCodeGeneric)
return
}

if err := self.controller.HandleAddPeer(req); err != nil {
if errors.Is(err, raft2.ErrNotLeader) {
sendErrorResponse(m, ch, err, peermsg.ErrorCodeNotLeader)
Expand Down
6 changes: 5 additions & 1 deletion controller/raft/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,11 @@ func (self *Controller) forwardToLeader(req protobufs.TypedMessage) error {
return errors.New("no leader, unable to forward request")
}

peer, err := self.GetMesh().GetOrConnectPeer(self.GetLeaderAddr(), 5*time.Second)
return self.ForwardToAddr(leader, req)
}

func (self *Controller) ForwardToAddr(addr string, req protobufs.TypedMessage) error {
peer, err := self.GetMesh().GetOrConnectPeer(addr, 5*time.Second)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion controller/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ func (self *Controller) retryBootstrapMember(bootstrapMember string) {
}

if err = self.Join(req); err == nil {
pfxlog.Logger().WithError(err).Errorf("erroring adding bootstrap member [%s], stopping attempts to join it to the cluster", bootstrapMember)
pfxlog.Logger().WithError(err).Errorf("error adding bootstrap member [%s], stopping attempts to join it to the cluster", bootstrapMember)
return
}
}
Expand Down

0 comments on commit 4687125

Please sign in to comment.