Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support joining a cluster from an unattached node. Fixes #2543 #2544

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 51 additions & 17 deletions controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ func (self *Controller) agentOpSnapshotDb(m *channel.Message, ch channel.Channel

func (self *Controller) agentOpRaftList(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.list", "controller not running in clustered mode", false)
return
}

members, err := self.raftController.ListMembers()
if err != nil {
handler_common.SendOpResult(m, ch, "raft.list", err.Error(), false)
handler_common.SendOpResult(m, ch, "cluster.list", err.Error(), false)
}

result := &mgmt_pb.RaftMemberListResponse{}
Expand All @@ -111,22 +111,18 @@ func (self *Controller) agentOpRaftList(m *channel.Message, ch channel.Channel)

func (self *Controller) agentOpRaftAddPeer(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.add-peer", "controller not running in clustered mode", false)
return
}

if !self.raftController.IsBootstrapped() {
handler_common.SendOpResult(m, ch, "raft.list",
"Local instance not initialized. If all instances are uninitialized, initialize one first using"+
" 'ziti agent controller init' then add nodes to that instance. If you have existing initialized nodes,"+
" use the 'ziti agent cluster add' command on an initialized node, rather than here.",
false)
self.agentOpRaftJoinCluster(m, ch)
return
}

addr, found := m.GetStringHeader(AgentAddrHeader)
if !found {
handler_common.SendOpResult(m, ch, "raft.join", "address not supplied", false)
handler_common.SendOpResult(m, ch, "cluster.add-peer", "address not supplied", false)
return
}

Expand All @@ -135,7 +131,7 @@ func (self *Controller) agentOpRaftAddPeer(m *channel.Message, ch channel.Channe
peerId, peerAddr, err := self.raftController.Mesh.GetPeerInfo(addr, 15*time.Second)
if err != nil {
errMsg := fmt.Sprintf("id not supplied and unable to retrieve [%v]", err.Error())
handler_common.SendOpResult(m, ch, "raft.join", errMsg, false)
handler_common.SendOpResult(m, ch, "cluster.add-peer", errMsg, false)
return
}
id = string(peerId)
Expand All @@ -160,9 +156,47 @@ func (self *Controller) agentOpRaftAddPeer(m *channel.Message, ch channel.Channe
handler_common.SendOpResult(m, ch, "cluster.add-peer", fmt.Sprintf("success, added %v at %v to cluster", id, addr), true)
}

// agentOpRaftJoinCluster handles the agent "cluster.join" operation: it asks
// the controller at the address supplied in the AgentAddrHeader header to add
// this node as a peer.
//
// The request is rejected when the controller has no raft controller, when the
// local raft controller already reports itself as bootstrapped, or when no
// address header is present. The AgentIsVoterHeader header is optional; when it
// is absent the node asks to join as a voter. Every outcome is reported back on
// ch through handler_common.SendOpResult under the op name "cluster.join".
func (self *Controller) agentOpRaftJoinCluster(m *channel.Message, ch channel.Channel) {
	const op = "cluster.join"

	// Preconditions on the local node; cases are checked top to bottom, so the
	// bootstrapped check only runs once the raft controller is known to exist.
	switch {
	case self.raftController == nil:
		handler_common.SendOpResult(m, ch, op, "controller not running in clustered mode", false)
		return
	case self.raftController.IsBootstrapped():
		handler_common.SendOpResult(m, ch, op,
			"Local instance is already initialized. Only uninitialized nodes may be joined to a cluster. ",
			false)
		return
	}

	targetAddr, hasAddr := m.GetStringHeader(AgentAddrHeader)
	if !hasAddr {
		handler_common.SendOpResult(m, ch, op, "address not supplied", false)
		return
	}

	// Join as a voter unless the caller explicitly supplied the header.
	joinAsVoter := true
	if requested, ok := m.GetBoolHeader(AgentIsVoterHeader); ok {
		joinAsVoter = requested
	}

	// The add-peer request describes this node, not the node being contacted.
	joinReq := &cmd_pb.AddPeerRequest{
		Addr:    self.raftController.Config.AdvertiseAddress.String(),
		Id:      self.config.Id.Token,
		IsVoter: joinAsVoter,
	}

	err := self.raftController.ForwardToAddr(targetAddr, joinReq)
	if err != nil {
		handler_common.SendOpResult(m, ch, op, err.Error(), false)
		return
	}

	handler_common.SendOpResult(m, ch, op, "success, added self to cluster", true)
}

func (self *Controller) agentOpRaftRemovePeer(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.remove-peer", "controller not running in clustered mode", false)
return
}

Expand All @@ -185,7 +219,7 @@ func (self *Controller) agentOpRaftRemovePeer(m *channel.Message, ch channel.Cha

func (self *Controller) agentOpRaftTransferLeadership(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.transfer-leadership", "controller not running in clustered mode", false)
return
}

Expand All @@ -203,26 +237,26 @@ func (self *Controller) agentOpRaftTransferLeadership(m *channel.Message, ch cha

func (self *Controller) agentOpInitFromDb(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "cluster.init-from-db", "controller not running in clustered mode", false)
return
}

sourceDbPath := string(m.Body)
if len(sourceDbPath) == 0 {
handler_common.SendOpResult(m, ch, "raft.initFromDb", "source db not supplied", false)
handler_common.SendOpResult(m, ch, "cluster.init-from-db", "source db not supplied", false)
return
}

if err := self.InitializeRaftFromBoltDb(sourceDbPath); err != nil {
handler_common.SendOpResult(m, ch, "raft.initFromDb", err.Error(), false)
handler_common.SendOpResult(m, ch, "cluster.init-from-db", err.Error(), false)
return
}
handler_common.SendOpResult(m, ch, "raft.initFromDb", fmt.Sprintf("success, initialized from [%v]", sourceDbPath), true)
handler_common.SendOpResult(m, ch, "cluster.init-from-db", fmt.Sprintf("success, initialized from [%v]", sourceDbPath), true)
}

func (self *Controller) agentOpInit(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false)
handler_common.SendOpResult(m, ch, "init.edge", "controller not running in clustered mode", false)
return
}

Expand Down
5 changes: 5 additions & 0 deletions controller/handler_peer_ctrl/add_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func (self *addPeerHandler) handleAddPeer(m *channel.Message, ch channel.Channel

log.Infof("received join request id: %v, addr: %v, voter: %v", req.Id, req.Addr, !req.IsVoter)

if !self.controller.IsBootstrapped() {
sendErrorResponse(m, ch, errors.New("node not member of bootstrapped cluster, unable to add peers"), peermsg.ErrorCodeGeneric)
return
}

if err := self.controller.HandleAddPeer(req); err != nil {
if errors.Is(err, raft2.ErrNotLeader) {
sendErrorResponse(m, ch, err, peermsg.ErrorCodeNotLeader)
Expand Down
6 changes: 5 additions & 1 deletion controller/raft/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,11 @@ func (self *Controller) forwardToLeader(req protobufs.TypedMessage) error {
return errors.New("no leader, unable to forward request")
}

peer, err := self.GetMesh().GetOrConnectPeer(self.GetLeaderAddr(), 5*time.Second)
return self.ForwardToAddr(leader, req)
}

func (self *Controller) ForwardToAddr(addr string, req protobufs.TypedMessage) error {
peer, err := self.GetMesh().GetOrConnectPeer(addr, 5*time.Second)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion controller/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ func (self *Controller) retryBootstrapMember(bootstrapMember string) {
}

if err = self.Join(req); err == nil {
pfxlog.Logger().WithError(err).Errorf("erroring adding bootstrap member [%s], stopping attempts to join it to the cluster", bootstrapMember)
pfxlog.Logger().WithError(err).Errorf("error adding bootstrap member [%s], stopping attempts to join it to the cluster", bootstrapMember)
return
}
}
Expand Down
Loading