diff --git a/embetcd/client.go b/embetcd/client.go
index fcaa099..98bacfb 100644
--- a/embetcd/client.go
+++ b/embetcd/client.go
@@ -84,30 +84,57 @@ func (c *Client) PutWithKeepAlive(ctx context.Context, key string, value string,
 	return lease, keepAlive, cancel, err
 }
 
+func (c *Client) arePeerURLSInCluster(ctx context.Context, apURLS []url.URL) (areIn bool, err error) {
+	var members *cli.MemberListResponse
+	members, err = c.MemberList(ctx)
+
+	if err == nil && members != nil && members.Members != nil {
+		apStrings := URLSToStringSlice(apURLS)
+
+		for _, member := range members.Members {
+			memberPURLs := member.GetPeerURLs()
+			for _, u := range apStrings {
+				if StringIsInStringSlice(u, memberPURLs) {
+					areIn = true
+					break
+				}
+			}
+		}
+	}
+	return areIn, err
+}
+
 // getServerPeers returns the peer urls for the cluster formatted for the initialCluster server configuration.
 // The context that is passed in should have a configured timeout.
-func (c *Client) getServerPeers(ctx context.Context, initialCluster string) (peers string, err error) {
+func (c *Client) getServerPeers(ctx context.Context, initialCluster string, serverName *string, apURLS []url.URL, dialTimeout *time.Duration) (peers string, err error) {
 	var members *cli.MemberListResponse
+	var timeout context.Context
+	var cancel context.CancelFunc
+	defer CancelContext(cancel)
+
+	apURLStrings := URLSToStringSlice(apURLS)
 
 	for ctx.Err() == nil && (err == nil || err.Error() != etcdserver.ErrStopped.Error()) {
 
 		// initialize peers with the supplied initial cluster string
 		peers = initialCluster
 
 		// get the list of members
-		members, err = c.MemberList(ctx)
+		timeout, cancel = context.WithTimeout(ctx, DurationOrDefault(dialTimeout, DefaultDialTimeout))
+		members, err = c.MemberList(timeout)
+		cancel()
 
 		if err == nil {
 			// add members to the initial cluster
 			for _, member := range members.Members {
 
-				// if there's at least one peer url add it to the initial cluster
 				if pURLS := member.GetPeerURLs(); len(pURLS) > 0 {
 					// peers should already have this server's address so we can safely append ",%s=%s"
 					for _, url := range member.GetPeerURLs() {
-						peers = fmt.Sprintf("%s,%s=%s", peers, member.Name, url)
+						if !StringIsInStringSlice(url, apURLStrings) {
+							peers = fmt.Sprintf("%s,%s=%s", peers, member.Name, url)
+						}
 					}
 				}
-
 			}
 			break
 		}
@@ -126,6 +153,7 @@ func (c *Client) serverNameConflicts(ctx context.Context, name string) (conflict
 			if member.Name == name {
 				conflicts = true
 				err = ErrNameConflict
+				break
 			}
 		}
 	}
@@ -148,36 +176,6 @@ func (c *Client) clusterName(ctx context.Context) (name string, err error) {
 	return name, err
 }
 
-// addMemberToExistingCluster informs an etcd cluster that a server is about to be added to the cluster. The cluster
-// can premptively reject this addition if it violates quorum
-func (c *Client) addMemberToExistingCluster(ctx context.Context, serverName string, apURLs []url.URL) (err error) {
-	// loop while the context hasn't closed
-	var conflict bool
-	for ctx.Err() == nil && (err == nil || err.Error() != etcdserver.ErrStopped.Error()) {
-
-		// Ensure that the server name does not already exist in the cluster.
-		// We want to ensure uniquely named cluster members.
-		// If this member died and is trying to rejoin, we want to retry until
-		// the cluster removes it or our parent context expires.
-		conflict, err = c.serverNameConflicts(ctx, serverName)
-
-		if !conflict && err == nil {
-
-			// add the member
-			_, err = c.MemberAdd(ctx, URLSToStringSlice(apURLs))
-
-			// break out of loop if we added ourselves cleanly
-			if err == nil {
-				break
-			}
-		}
-
-		time.Sleep(time.Second * 1)
-	}
-
-	return err
-}
-
 // NewClient returns a new etcd v3client wrapped with some helper functions
 func NewClient(cfg cli.Config) (client *Client, err error) {
 	var etcdClient *cli.Client
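The self-exclusion added to getServerPeers is easiest to see in isolation. Below is a minimal, self-contained sketch, not part of the patch, of how the initial-cluster string gets built while skipping this server's own advertised peer URLs; the member names and URLs are invented, and stringInSlice simply mirrors the patch's StringIsInStringSlice helper:

package main

import "fmt"

// stringInSlice mirrors the patch's StringIsInStringSlice helper.
func stringInSlice(s string, strs []string) bool {
	for _, v := range strs {
		if s == v {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical member list (name -> peer URLs) as returned by MemberList.
	members := map[string][]string{
		"etcd-0": {"http://10.0.0.1:2380"},
		"etcd-1": {"http://10.0.0.2:2380"},
	}

	// This server's own advertised peer URLs; they must be skipped so the
	// server is not listed twice in the initial cluster string.
	own := []string{"http://10.0.0.2:2380"}

	// peers starts out already naming this server, as getServerPeers assumes.
	peers := "etcd-1=http://10.0.0.2:2380"
	for name, urls := range members {
		for _, u := range urls {
			if !stringInSlice(u, own) {
				peers = fmt.Sprintf("%s,%s=%s", peers, name, u)
			}
		}
	}
	fmt.Println(peers) // etcd-1=http://10.0.0.2:2380,etcd-0=http://10.0.0.1:2380
}

The real method additionally retries on transient MemberList errors and bounds each call with the dial timeout.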
diff --git a/embetcd/common.go b/embetcd/common.go
index b4da76e..2b99ec8 100644
--- a/embetcd/common.go
+++ b/embetcd/common.go
@@ -78,3 +78,14 @@ func RevokeLease(ctx context.Context, client *Client, lease *cli.LeaseGrantRespo
 		client.Revoke(ctx, lease.ID)
 	}
 }
+
+// StringIsInStringSlice returns true if the given string is in the slice of strings
+func StringIsInStringSlice(s string, strs []string) (resp bool) {
+	for _, i := range strs {
+		if s == i {
+			resp = true
+			break
+		}
+	}
+	return
+}
diff --git a/embetcd/config.go b/embetcd/config.go
index 6249c56..2516ac9 100644
--- a/embetcd/config.go
+++ b/embetcd/config.go
@@ -2,7 +2,6 @@ package embetcd
 
 import (
 	"context"
-	"crypto/tls"
 	"time"
 
 	cli "github.com/coreos/etcd/clientv3"
@@ -31,7 +30,6 @@ func (c *Config) GetClientFromConfig(ctx context.Context) (*Client, error) {
 	return NewClient(cli.Config{
 		Endpoints:        c.InitialCluster,
 		DialTimeout:      DurationOrDefault(c.DialTimeout, DefaultDialTimeout),
-		TLS:              &tls.Config{InsecureSkipVerify: true}, // insecure for now
 		AutoSyncInterval: DurationOrDefault(c.AutoSyncInterval, DefaultAutoSyncInterval),
 		Context:          ctx, // pass in the context so the temp client closes with a cancelled context
 	})
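Dropping the hardcoded InsecureSkipVerify override means the temporary cluster client now uses clientv3's defaults, so transport security follows the endpoint scheme and whatever TLS config the caller supplies, if any. For illustration, a standalone sketch of the resulting construction; the endpoint and timeout values below are assumptions, not values from this repo:

package main

import (
	"context"
	"fmt"
	"time"

	cli "github.com/coreos/etcd/clientv3"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// No TLS field: the client is built exactly as GetClientFromConfig now does.
	c, err := cli.New(cli.Config{
		Endpoints:        []string{"http://127.0.0.1:2379"}, // assumed endpoint
		DialTimeout:      5 * time.Second,                   // stands in for DefaultDialTimeout
		AutoSyncInterval: time.Minute,                       // stands in for DefaultAutoSyncInterval
		Context:          ctx,                               // client closes when ctx is cancelled
	})
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer c.Close()
	fmt.Println("client ready")
}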
diff --git a/embetcd/server.go b/embetcd/server.go
index 8cee5ac..d1ae97a 100644
--- a/embetcd/server.go
+++ b/embetcd/server.go
@@ -24,7 +24,7 @@ const (
 	// DefaultUnhealthyTTL is the grace period to wait before removing an unhealthy member
 	DefaultUnhealthyTTL = time.Second * 15
 	// DefaultCleanUpInterval is the interval at which to poll for the health of the cluster
-	DefaultCleanUpInterval = time.Second * 15
+	DefaultCleanUpInterval = time.Second * 10
 	// DefaultStartUpGracePeriod is the graceperiod to wait for new cluster members to startup
 	// before they're subject to health checks
 	DefaultStartUpGracePeriod = time.Second * 60
@@ -159,16 +159,9 @@ func (s *Server) IsRunning() bool {
 	return s.isRunning()
 }
 
-// prepare a new cluster
-func (s *Server) prepareForNewCluster(ctx context.Context) (err error) {
-	s.config.Config.InitialCluster = s.config.InitialClusterFromName(s.config.Name)
-	return err
-}
-
-// prepare for an existing cluster
-func (s *Server) prepareForExistingCluster(ctx context.Context) (err error) {
-	// create a temporary client
+func getClusterClientWithServerNamespace(ctx context.Context, cfg *Config) (*Client, error) {
 	var tempcli *Client
+	var err error
 	defer CloseClient(tempcli)
 
 	// get an etcdclient to the cluster using the config file
@@ -177,42 +170,127 @@ func (s *Server) prepareForExistingCluster(ctx context.Context) (err error) {
 		CloseClient(tempcli)
 
 		// create the client
-		tempcli, err = s.config.GetClientFromConfig(ctx)
+		tempcli, err = cfg.GetClientFromConfig(ctx)
 		if err == nil {
 			// set up the temp cli for the cluster namespace
 			setupClusterNamespace(tempcli)
 			break
 		}
 	}
+	return tempcli, err
+}
 
-	// check for conflicting server names
-	if err == nil {
-		var clusterName string
-		if clusterName, err = tempcli.clusterName(ctx); err != nil || (clusterName != "" && clusterName != s.config.ClusterName) {
-			err = ErrClusterNameConflict
-		}
-	}
+func (s *Server) startAsJoiner(ctx context.Context, cfg *Config, tempcli *Client) (err error) {
+	var timeout context.Context
+	var cancel context.CancelFunc
+	defer CancelContext(cancel)
+
+	// continually try to prepare and start the server
+	for ctx.Err() == nil {
+		CloseServer(s)
+
+		// remove old data directory
+		os.RemoveAll(cfg.Dir)
+
+		// Ensure that the server name does not already exist in the cluster.
+		// We want to ensure uniquely named cluster members.
+		// If this member died and is trying to rejoin, we want to retry until
+		// the cluster removes it or our parent context expires.
+		timeout, cancel = context.WithTimeout(ctx, DurationOrDefault(cfg.DialTimeout, DefaultDialTimeout))
+		_, err = tempcli.serverNameConflicts(timeout, cfg.Name)
+		cancel()
+
+		// check for conflicts, get peer urls, and announce that we're joining the cluster
+		if err == nil {
+			serverName := cfg.Name
+			s.config.Config.InitialCluster, err = tempcli.getServerPeers(ctx, cfg.InitialClusterFromName(cfg.Name), &serverName, cfg.APUrls, cfg.DialTimeout)
+		}
+
+		// Announce to the cluster only once that we're going to add this server.
+		// This offers some protection against errors while joining an existing one-node cluster:
+		// if we announce the new node and then need to denounce it, we won't be able to, because
+		// the cluster would be configured for 2 members with only 1 started, and removing 1 member
+		// from a 2-node cluster breaks quorum.
+		if err == nil {
+			timeout, cancel = context.WithTimeout(ctx, DurationOrDefault(cfg.DialTimeout, DefaultDialTimeout))
+			var isIn bool
+			isIn, err = tempcli.arePeerURLSInCluster(timeout, cfg.APUrls)
+			cancel()
+			if err == nil && !isIn {
+				timeout, cancel = context.WithTimeout(ctx, DurationOrDefault(cfg.DialTimeout, DefaultDialTimeout))
+				_, err = tempcli.MemberAdd(timeout, URLSToStringSlice(cfg.APUrls))
+				cancel()
+			}
+		}
+
+		// start the server
+		if err == nil {
+			s.Etcd, err = embed.StartEtcd(cfg.Config)
+		}
+
+		// wait for the server to be ready or error out
+		if err == nil && s.Etcd != nil {
+			err = WaitForStructChOrErrCh(ctx, s.Etcd.Server.ReadyNotify(), s.Etcd.Err())
+		}
+
+		// break the loop if successful
+		if err == nil {
+			break
+		}
+	}
+	return err
+}
+
+// join starts the etcd server and joins an existing cluster
+func (s *Server) join(ctx context.Context, cfg *Config) (err error) {
+	// create a temporary client
+	var tempcli *Client
+	defer CloseClient(tempcli)
+
+	tempcli, err = getClusterClientWithServerNamespace(ctx, cfg)
 
-	// get the peer address string for joining the cluster
 	if err == nil {
-		s.config.Config.InitialCluster, err = tempcli.getServerPeers(ctx, s.config.InitialClusterFromName(s.config.Name))
+		// check for conflicting cluster names
+		var clusterName string
+		if clusterName, err = tempcli.clusterName(ctx); err == nil && (clusterName != "" && clusterName != s.config.ClusterName) {
+			err = ErrClusterNameConflict
+			return
+		}
 	}
 
-	// announce to the cluster that we're going to add this server
+	// start as a joiner
 	if err == nil {
-		err = tempcli.addMemberToExistingCluster(ctx, s.config.Name, s.config.APUrls)
+		err = s.startAsJoiner(ctx, cfg, tempcli)
 	}
 
 	return err
 }
 
-// prepare will either prepare the server to start a new cluster or join an existing cluster
-func (s *Server) prepare(ctx context.Context) (err error) {
-	// prepare the server to start
-	if s.config.ClusterState == embed.ClusterStateFlagNew {
-		err = s.prepareForNewCluster(ctx)
-	} else {
-		err = s.prepareForExistingCluster(ctx)
+// seed starts the etcd server as a seed node
+func (s *Server) seed(ctx context.Context, cfg *Config) (err error) {
+	for ctx.Err() == nil {
+		CloseServer(s)
+
+		// remove old data directory
+		os.RemoveAll(cfg.Dir)
+
+		// set the initial cluster string
+		s.config.Config.InitialCluster = s.config.InitialClusterFromName(s.config.Name)
+
+		// start the server
+		if err == nil {
+			s.Etcd, err = embed.StartEtcd(cfg.Config)
+		}
+
+		// wait for the server to be ready or error out
+		if err == nil && s.Etcd != nil {
+			err = WaitForStructChOrErrCh(ctx, s.Etcd.Server.ReadyNotify(), s.Etcd.Err())
+		}
+
+		// break the loop if successful
+		if err == nil {
+			break
+		}
 	}
 	return err
 }
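The announce-once guard in startAsJoiner comes down to quorum arithmetic. A small standalone illustration of etcd's majority rule, not code from this repo:

package main

import "fmt"

// quorum is the majority etcd requires for any write, including membership
// changes: floor(n/2) + 1 of the configured voting members.
func quorum(configured int) int { return configured/2 + 1 }

func main() {
	// After MemberAdd the cluster is configured for 2 members even though only
	// 1 has actually started. Any further membership change (including removing
	// the half-joined member) needs quorum(2) = 2 live members, so a failed join
	// that announced itself more than once could wedge a one-node cluster.
	fmt.Println(quorum(1)) // 1: a single seed node can change membership alone
	fmt.Println(quorum(2)) // 2: both members must be up to change membership
}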
@@ -228,27 +306,6 @@ func (s *Server) startupValidation(cfg *Config) error {
 	return cfg.Validate()
 }
 
-// start starts the etcd server after it has been prepared and config has been validated
-// it will retry starting the etcd server until the context is cancelled
-func (s *Server) start(ctx context.Context, cfg *Config) (err error) {
-	// retry starting the etcd server until it succeeds
-	for ctx.Err() == nil {
-		CloseServer(s)
-
-		// remove the data dir because we require each server to be completely removed
-		// from the cluster before we can rejoin
-		// TODO: if we ever use snapshotting or want to restore a cluster this will need to be revised
-		os.RemoveAll(cfg.Dir)
-
-		// create a context for this server
-		s.Etcd, err = embed.StartEtcd(cfg.Config)
-		if err == nil {
-			break
-		}
-	}
-	return err
-}
-
 // Start starts the server with the given config
 func (s *Server) Start(ctx context.Context, cfg *Config) (err error) {
 	s.mutex.Lock()
@@ -262,17 +319,11 @@ func (s *Server) Start(ctx context.Context, cfg *Config) (err error) {
 	// save the config to the server for reference
 	s.config = cfg
 
-	// prepare the server to either start as a new cluster or join an existing cluster
-	err = s.prepare(ctx)
-
-	// start the server
-	if err == nil {
-		err = s.start(ctx, cfg)
-	}
-
-	// wait for the server to be ready or error out
-	if err == nil && s.Etcd != nil {
-		err = WaitForStructChOrErrCh(ctx, s.Etcd.Server.ReadyNotify(), s.Etcd.Err())
+	// start the etcd server as either a seeder or a joiner
+	if s.config.ClusterState == embed.ClusterStateFlagNew {
+		err = s.seed(ctx, cfg)
+	} else {
+		err = s.join(ctx, cfg)
 	}
 
 	// set the cluster name now that the cluster has started without error
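With prepare and start gone, waiting for readiness now happens inside seed and join via WaitForStructChOrErrCh. A sketch of the select that helper presumably wraps, assuming only the channel shapes embed.Etcd exposes (Server.ReadyNotify() returning <-chan struct{} and Err() returning <-chan error); the helper name and simulation below are illustrative:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitReadyOrErr approximates what WaitForStructChOrErrCh is used for here:
// block until the server signals readiness, fails, or the context ends.
func waitReadyOrErr(ctx context.Context, ready <-chan struct{}, errCh <-chan error) error {
	select {
	case <-ready:
		return nil
	case err := <-errCh:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ready := make(chan struct{})
	errCh := make(chan error, 1)

	// Simulate an etcd server that becomes ready shortly after starting.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(ready)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := waitReadyOrErr(ctx, ready, errCh); err != nil {
		fmt.Println("startup failed:", err)
		return
	}
	fmt.Println("server ready")
}

Folding the readiness wait into the seed/join retry loops means a server that starts but never becomes ready is torn down and retried, rather than leaving Start blocked on a half-started instance.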