From f103154542613e797c2221f332a0a12108019170 Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Thu, 14 Nov 2024 21:44:31 -0500 Subject: [PATCH] Support joining to a cluster from an unattached node. Fixes #2543 --- controller/agent.go | 68 ++++++++++++++++++------ controller/handler_peer_ctrl/add_peer.go | 5 ++ controller/raft/member.go | 6 ++- controller/raft/raft.go | 2 +- 4 files changed, 62 insertions(+), 19 deletions(-) diff --git a/controller/agent.go b/controller/agent.go index 836344c1d..a89e1ab37 100644 --- a/controller/agent.go +++ b/controller/agent.go @@ -83,13 +83,13 @@ func (self *Controller) agentOpSnapshotDb(m *channel.Message, ch channel.Channel func (self *Controller) agentOpRaftList(m *channel.Message, ch channel.Channel) { if self.raftController == nil { - handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false) + handler_common.SendOpResult(m, ch, "cluster.list", "controller not running in clustered mode", false) return } members, err := self.raftController.ListMembers() if err != nil { - handler_common.SendOpResult(m, ch, "raft.list", err.Error(), false) + handler_common.SendOpResult(m, ch, "cluster.list", err.Error(), false) } result := &mgmt_pb.RaftMemberListResponse{} @@ -111,22 +111,18 @@ func (self *Controller) agentOpRaftList(m *channel.Message, ch channel.Channel) func (self *Controller) agentOpRaftAddPeer(m *channel.Message, ch channel.Channel) { if self.raftController == nil { - handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false) + handler_common.SendOpResult(m, ch, "cluster.add-peer", "controller not running in clustered mode", false) return } if !self.raftController.IsBootstrapped() { - handler_common.SendOpResult(m, ch, "raft.list", - "Local instance not initialized. If all instances are uninitialized, initialize one first using"+ - " 'ziti agent controller init' then add nodes to that instance. If you have existing initialized nodes,"+ - " use the 'ziti agent cluster add' command on an initialized node, rather than here.", - false) + self.agentOpRaftJoinCluster(m, ch) return } addr, found := m.GetStringHeader(AgentAddrHeader) if !found { - handler_common.SendOpResult(m, ch, "raft.join", "address not supplied", false) + handler_common.SendOpResult(m, ch, "cluster.add-peer", "address not supplied", false) return } @@ -135,7 +131,7 @@ func (self *Controller) agentOpRaftAddPeer(m *channel.Message, ch channel.Channe peerId, peerAddr, err := self.raftController.Mesh.GetPeerInfo(addr, 15*time.Second) if err != nil { errMsg := fmt.Sprintf("id not supplied and unable to retrieve [%v]", err.Error()) - handler_common.SendOpResult(m, ch, "raft.join", errMsg, false) + handler_common.SendOpResult(m, ch, "cluster.add-peer", errMsg, false) return } id = string(peerId) @@ -160,9 +156,47 @@ func (self *Controller) agentOpRaftAddPeer(m *channel.Message, ch channel.Channe handler_common.SendOpResult(m, ch, "cluster.add-peer", fmt.Sprintf("success, added %v at %v to cluster", id, addr), true) } +func (self *Controller) agentOpRaftJoinCluster(m *channel.Message, ch channel.Channel) { + if self.raftController == nil { + handler_common.SendOpResult(m, ch, "cluster.join", "controller not running in clustered mode", false) + return + } + + if self.raftController.IsBootstrapped() { + handler_common.SendOpResult(m, ch, "cluster.join", + "Local instance is already initialized. Only uninitialized nodes may be joined to a cluster. ", + false) + return + } + + addr, found := m.GetStringHeader(AgentAddrHeader) + if !found { + handler_common.SendOpResult(m, ch, "cluster.join", "address not supplied", false) + return + } + + isVoter, found := m.GetBoolHeader(AgentIsVoterHeader) + if !found { + isVoter = true + } + + req := &cmd_pb.AddPeerRequest{ + Addr: self.raftController.Config.AdvertiseAddress.String(), + Id: self.config.Id.Token, + IsVoter: isVoter, + } + + if err := self.raftController.ForwardToAddr(addr, req); err != nil { + handler_common.SendOpResult(m, ch, "cluster.join", err.Error(), false) + return + } + + handler_common.SendOpResult(m, ch, "cluster.join", "success, added self to cluster", true) +} + func (self *Controller) agentOpRaftRemovePeer(m *channel.Message, ch channel.Channel) { if self.raftController == nil { - handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false) + handler_common.SendOpResult(m, ch, "cluster.remove-peer", "controller not running in clustered mode", false) return } @@ -185,7 +219,7 @@ func (self *Controller) agentOpRaftRemovePeer(m *channel.Message, ch channel.Cha func (self *Controller) agentOpRaftTransferLeadership(m *channel.Message, ch channel.Channel) { if self.raftController == nil { - handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false) + handler_common.SendOpResult(m, ch, "cluster.transfer-leadership", "controller not running in clustered mode", false) return } @@ -203,26 +237,26 @@ func (self *Controller) agentOpRaftTransferLeadership(m *channel.Message, ch cha func (self *Controller) agentOpInitFromDb(m *channel.Message, ch channel.Channel) { if self.raftController == nil { - handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false) + handler_common.SendOpResult(m, ch, "cluster.init-from-db", "controller not running in clustered mode", false) return } sourceDbPath := string(m.Body) if len(sourceDbPath) == 0 { - handler_common.SendOpResult(m, ch, "raft.initFromDb", "source db not supplied", false) + handler_common.SendOpResult(m, ch, "cluster.init-from-db", "source db not supplied", false) return } if err := self.InitializeRaftFromBoltDb(sourceDbPath); err != nil { - handler_common.SendOpResult(m, ch, "raft.initFromDb", err.Error(), false) + handler_common.SendOpResult(m, ch, "cluster.init-from-db", err.Error(), false) return } - handler_common.SendOpResult(m, ch, "raft.initFromDb", fmt.Sprintf("success, initialized from [%v]", sourceDbPath), true) + handler_common.SendOpResult(m, ch, "cluster.init-from-db", fmt.Sprintf("success, initialized from [%v]", sourceDbPath), true) } func (self *Controller) agentOpInit(m *channel.Message, ch channel.Channel) { if self.raftController == nil { - handler_common.SendOpResult(m, ch, "raft.list", "controller not running in clustered mode", false) + handler_common.SendOpResult(m, ch, "init.edge", "controller not running in clustered mode", false) return } diff --git a/controller/handler_peer_ctrl/add_peer.go b/controller/handler_peer_ctrl/add_peer.go index 29bfd75c1..fe8c80f84 100644 --- a/controller/handler_peer_ctrl/add_peer.go +++ b/controller/handler_peer_ctrl/add_peer.go @@ -58,6 +58,11 @@ func (self *addPeerHandler) handleAddPeer(m *channel.Message, ch channel.Channel log.Infof("received join request id: %v, addr: %v, voter: %v", req.Id, req.Addr, !req.IsVoter) + if !self.controller.IsBootstrapped() { + sendErrorResponse(m, ch, errors.New("node not member of bootstrapped cluster, unable to add peers"), peermsg.ErrorCodeGeneric) + return + } + if err := self.controller.HandleAddPeer(req); err != nil { if errors.Is(err, raft2.ErrNotLeader) { sendErrorResponse(m, ch, err, peermsg.ErrorCodeNotLeader) diff --git a/controller/raft/member.go b/controller/raft/member.go index d681009c1..cfd8d5730 100644 --- a/controller/raft/member.go +++ b/controller/raft/member.go @@ -213,7 +213,11 @@ func (self *Controller) forwardToLeader(req protobufs.TypedMessage) error { return errors.New("no leader, unable to forward request") } - peer, err := self.GetMesh().GetOrConnectPeer(self.GetLeaderAddr(), 5*time.Second) + return self.ForwardToAddr(leader, req) +} + +func (self *Controller) ForwardToAddr(addr string, req protobufs.TypedMessage) error { + peer, err := self.GetMesh().GetOrConnectPeer(addr, 5*time.Second) if err != nil { return err } diff --git a/controller/raft/raft.go b/controller/raft/raft.go index b1cc4fb68..443cce4ab 100644 --- a/controller/raft/raft.go +++ b/controller/raft/raft.go @@ -783,7 +783,7 @@ func (self *Controller) retryBootstrapMember(bootstrapMember string) { } if err = self.Join(req); err == nil { - pfxlog.Logger().WithError(err).Errorf("erroring adding bootstrap member [%s], stopping attempts to join it to the cluster", bootstrapMember) + pfxlog.Logger().WithError(err).Errorf("error adding bootstrap member [%s], stopping attempts to join it to the cluster", bootstrapMember) return } }