Skip to content

Commit

Permalink
[API change] extend join-cluster; set primary with force
Browse files Browse the repository at this point in the history
* extend join-cluster API: add node flags
  - "renewing" a node in the cluster map now also takes its flags into
    account
* deprecate 'config.proxy.non-electable'
* remove query 'non-electable'; simplify internal join* methods
* docs: update cli/cluster.md
* part four, prev. commit: 6e12209

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 4, 2024
1 parent 6e12209 commit 5cec026
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 65 deletions.
30 changes: 19 additions & 11 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -1854,21 +1854,29 @@ func (h *htrun) extractRevokedTokenList(payload msPayload, caller string) (*toke
// - if these fail, we try the candidates provided by the caller.
//
// ================================== Background =========================================
func (h *htrun) join(query url.Values, htext htext, contactURLs ...string) (res *callResult, err error) {
func (h *htrun) join(htext htext, contactURLs ...string) (res *callResult, err error) {
var (
config = cmn.GCO.Get()
_, primaryURL, psi = h._primus(nil, config)
)
if psi != nil && psi.ID() == h.SID() {
debug.Assert(h.si.IsProxy())
return nil, fmt.Errorf("%s (self) - not joining, am primary [%q]", h, primaryURL) // (unlikely)
}

var (
config = cmn.GCO.Get()
candidates = make([]string, 0, 4+len(contactURLs))
selfPublicURL, pubValid = cos.ParseURL(h.si.URL(cmn.NetPublic))
selfIntraURL, intraValid = cos.ParseURL(h.si.URL(cmn.NetIntraControl))
resPrev *callResult
)
debug.Assert(pubValid && intraValid)
debug.Assertf(pubValid && intraValid, "%q (%t), %q (%t)", selfPublicURL, pubValid, selfIntraURL, intraValid)

// env goes first
// env first
if daemon.EP != "" {
candidates = _addCan(daemon.EP, selfPublicURL.Host, selfIntraURL.Host, candidates)
}
_, primaryURL, psi := h._primus(nil, config)

candidates = _addCan(primaryURL, selfPublicURL.Host, selfIntraURL.Host, candidates)
if psi != nil {
candidates = _addCan(psi.URL(cmn.NetPublic), selfPublicURL.Host, selfIntraURL.Host, candidates)
Expand All @@ -1890,7 +1898,7 @@ func (h *htrun) join(query url.Values, htext htext, contactURLs ...string) (res
freeCR(resPrev)
resPrev = nil //nolint:ineffassign // readability
}
res = h.regTo(candidateURL, nil, apc.DefaultTimeout, query, htext, false /*keepalive*/)
res = h.regTo(candidateURL, nil, apc.DefaultTimeout, htext, false /*keepalive*/)
if res.err == nil {
nlog.Infoln(h.String()+": primary responded Ok via", candidateURL)
return // ok
Expand Down Expand Up @@ -1923,7 +1931,7 @@ func (h *htrun) join(query url.Values, htext htext, contactURLs ...string) (res
if nlog.Stopping() {
return res, h.errStopping()
}
res = h.regTo(primaryURL, nil, apc.DefaultTimeout, query, htext, false /*keepalive*/)
res = h.regTo(primaryURL, nil, apc.DefaultTimeout, htext, false /*keepalive*/)
if res.err == nil {
nlog.Infoln(h.String()+": joined cluster via", primaryURL)
}
Expand All @@ -1940,7 +1948,7 @@ func _addCan(url, selfPub, selfCtrl string, candidates []string) []string {
return append(candidates, url)
}

func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, q url.Values, htext htext, keepalive bool) *callResult {
func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, htext htext, keepalive bool) *callResult {
var (
path string
skipPrxKalive = h.si.IsProxy() || keepalive
Expand Down Expand Up @@ -1970,7 +1978,7 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, q url.Val
cargs := allocCargs()
{
cargs.si = psi
cargs.req = cmn.HreqArgs{Method: http.MethodPost, Base: url, Path: path, Query: q, Body: cos.MustMarshal(cm)}
cargs.req = cmn.HreqArgs{Method: http.MethodPost, Base: url, Path: path, Body: cos.MustMarshal(cm)}
cargs.timeout = tout
}
smap := cm.Smap
Expand Down Expand Up @@ -2019,7 +2027,7 @@ func (h *htrun) slowKalive(smap *smapX, htext htext, timeout time.Duration) (str
}
pid, primaryURL, psi := h._primus(smap, nil)

res := h.regTo(primaryURL, psi, timeout, nil, htext, true /*keepalive*/)
res := h.regTo(primaryURL, psi, timeout, htext, true /*keepalive*/)
if res.err == nil {
freeCR(res)
return pid, 0, nil
Expand All @@ -2042,7 +2050,7 @@ func (h *htrun) slowKalive(smap *smapX, htext htext, timeout time.Duration) (str
nlog.Warningln("retrying via pub addr", primaryURL, "[", s, pid, "]")

freeCR(res)
res = h.regTo(primaryURL, psi, timeout, nil, htext, true /*keepalive*/)
res = h.regTo(primaryURL, psi, timeout, htext, true /*keepalive*/)
}

status, err := res.status, res.err
Expand Down
9 changes: 1 addition & 8 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,7 @@ func (p *proxy) Run() error {
}

func (p *proxy) joinCluster(action string, primaryURLs ...string) (status int, err error) {
var query url.Values
if smap := p.owner.smap.get(); smap.isPrimary(p.si) {
return 0, fmt.Errorf("%s should not be joining: is primary, %s", p, smap.StringEx())
}
if cmn.GCO.Get().Proxy.NonElectable {
query = url.Values{apc.QparamNonElectable: []string{"true"}}
}
res, err := p.join(query, nil /*htext*/, primaryURLs...)
res, err := p.join(nil /*htext*/, primaryURLs...)
if err != nil {
return status, err
}
Expand Down
49 changes: 29 additions & 20 deletions ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {
return
}
nsi = regReq.SI
case apc.AdminJoin: // administrative join
case apc.AdminJoin: // (administrative join)
if err := p.checkAccess(w, r, nil, apc.AceAdmin); err != nil {
return
}
Expand All @@ -410,7 +410,7 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {
}
// NOTE: node ID and 3-networks configuration are obtained from the node itself
*nsi = *si
case apc.SelfJoin: // auto-join at node startup
case apc.SelfJoin: // (auto-join at node startup)
if cmn.ReadJSON(w, r, &regReq) != nil {
return
}
Expand Down Expand Up @@ -456,15 +456,6 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {
return
}
}
var (
nonElectable bool
)
if nsi.IsProxy() {
s := r.URL.Query().Get(apc.QparamNonElectable)
if nonElectable, err = cos.ParseBool(s); err != nil {
nlog.Errorf("%s: failed to parse %s for non-electability: %v", p, s, err)
}
}
if _, err := cmn.ParseHost2IP(nsi.PubNet.Hostname); err != nil {
p.writeErrf(w, r, "%s: failed to %s %s: invalid hostname: %v", p.si, apiOp, nsi.StringEx(), err)
return
Expand All @@ -474,8 +465,24 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {
if osi := smap.GetNode(nsi.ID()); osi != nil {
nsi.Flags = osi.Flags
}
if nonElectable {
nsi.Flags = nsi.Flags.Set(meta.SnodeNonElectable)
if s := r.Header.Get(apc.HdrNodeFlags); s != "" {
fl, err := strconv.ParseUint(s, 10, 64)
if err != nil {
p.writeErrf(w, r, "%s joining %s: failed to parse %s: %v", p, nsi, apc.HdrNodeFlags, err)
return
}
flags := cos.BitFlags(fl)
if flags != 0 {
nsi.Flags = nsi.Flags.Set(meta.SnodeNonElectable)
// [NOTE]
// - limiting support to 'non-electability'
// - rest upon demand, including resetting non-electable -> electable
if !nsi.IsProxy() || flags != meta.SnodeNonElectable {
p.writeErrf(w, r, "%s joining %s: expecting only 'non-electable' bit (and only proxies), got %s=%s",
p, nsi, apc.HdrNodeFlags, nsi.Fl2S())
return
}
}
}

// handshake | check dup
Expand Down Expand Up @@ -542,6 +549,7 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) {
return
}

// go ahead to join
nlog.Infof("%s: %s(%q) %s (%s)", p, apiOp, action, nsi.StringEx(), regReq.Smap)

if apiOp == apc.AdminJoin {
Expand Down Expand Up @@ -723,13 +731,13 @@ func (p *proxy) rereg(nsi, osi *meta.Snode) bool {
if !p.NodeStarted() {
return true
}
if osi.Eq(nsi) {
nlog.Infoln(p.String()+":", nsi.StringEx(), "is already _in_")
if osi.Eq(nsi) && osi.Flags == nsi.Flags {
nlog.Infoln(p.String(), "node", nsi.StringEx(), "is already _in_ - nothing to do")
return false
}

// NOTE: see also ref0417 (ais/earlystart)
nlog.Warningln(p.String()+":", "renewing", nsi.StringEx(), "=>", nsi.StrURLs())
// NOTE: also ref0417 (ais/earlystart)
nlog.Warningf("%s: renewing %s(flags %s) => %s(flags %s)", p, osi.StringEx(), osi.Fl2S(), nsi.StringEx(), nsi.Fl2S())
return true
}

Expand Down Expand Up @@ -2183,7 +2191,7 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
return
}

// 6. prepare phase whereby all members health-ping(npsi)
// 6. prepare phase whereby all members health-ping => npsi
bargs := allocBcArgs()
{
aimsg := p.newAmsgActVal(apc.ActPrimaryForce, newSmap)
Expand Down Expand Up @@ -2228,13 +2236,13 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
}

p.owner.smap.put(newSmap)
res = p.regTo(joinURL, npsi, apc.DefaultTimeout, nil, nil, false /*keepalive*/)
res = p.regTo(joinURL, npsi, apc.DefaultTimeout, nil, false /*keepalive*/)
e, eh := res.err, res.toErr()
freeCR(res)
if e != nil {
if joinURL != secondURL {
nlog.Warningln(res.toErr(), "- 2nd attempt via", secondURL)
res = p.regTo(secondURL, npsi, apc.DefaultTimeout, nil, nil, false)
res = p.regTo(secondURL, npsi, apc.DefaultTimeout, nil, false)
eh = res.toErr()
freeCR(res)
}
Expand All @@ -2245,6 +2253,7 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
nlog.Errorf("FATAL: nested config-update error when rolling back [%s %s to %s]: %v",
act, p, npname, nested) // (unlikely)
}
p.owner.smap.put(cm.Smap)
bmdOwnerPrx.put(cm.BMD)
wg := p.metasyncer.sync(revsPair{cm.Smap, aimsg}, revsPair{cm.BMD, aimsg}, revsPair{cm.Config, aimsg}, revsPair{cm.RMD, aimsg})
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion ais/tgtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type delb struct {
}

func (t *target) joinCluster(action string, primaryURLs ...string) (status int, err error) {
res, err := t.join(nil, t, primaryURLs...)
res, err := t.join(t /*htext*/, primaryURLs...)
if err != nil {
return status, err
}
Expand Down
2 changes: 2 additions & 0 deletions api/apc/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ const (
// uptimes, respectively
HdrNodeUptime = aisPrefix + "Node-Uptime"
HdrClusterUptime = aisPrefix + "Cluster-Uptime"

// HdrNodeFlags is the join-cluster request header that carries the joining
// node's flags (bit flags formatted as a base-10 unsigned integer).
HdrNodeFlags = aisPrefix + "Node-Flags"
)

// Custom S3 headers
Expand Down
1 change: 0 additions & 1 deletion api/apc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ const (
QparamProxyID = "pid" // ID of the redirecting proxy.
QparamPrimaryCandidate = "can" // candidate for the primary proxy (voting ID, force URL)
QparamPrepare = "prp" // 2-phase commit where 'true' corresponds to 'begin'; usage: (primary election; set-primary)
QparamNonElectable = "nel" // true: proxy is non-electable for the primary role
QparamUnixTime = "utm" // Unix time since 01/01/70 UTC (nanoseconds)
QparamIsGFNRequest = "gfn" // true if the request is a Get-From-Neighbor
QparamRebStatus = "rbs" // true: get detailed rebalancing status
Expand Down
6 changes: 5 additions & 1 deletion api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
Expand Down Expand Up @@ -184,14 +185,17 @@ func GetConfiguredBackends(bp BaseParams) (out []string, err error) {
}

// JoinCluster adds a node to a cluster.
func JoinCluster(bp BaseParams, nodeInfo *meta.Snode) (rebID, sid string, err error) {
func JoinCluster(bp BaseParams, nodeInfo *meta.Snode, flags cos.BitFlags) (rebID, sid string, err error) {
bp.Method = http.MethodPost
reqParams := AllocRp()
{
reqParams.BaseParams = bp
reqParams.Path = apc.URLPathCluUserReg.S
reqParams.Body = cos.MustMarshal(nodeInfo)
reqParams.Header = http.Header{cos.HdrContentType: []string{cos.ContentJSON}}
if flags != 0 {
reqParams.Header.Set(apc.HdrNodeFlags, strconv.FormatUint(uint64(flags), 10))
}
}

var info apc.JoinNodeResult
Expand Down
12 changes: 11 additions & 1 deletion cmd/cli/cli/cluster_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/NVIDIA/aistore/api"
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/xact"
"github.com/urfave/cli"
Expand Down Expand Up @@ -50,6 +51,7 @@ var (
},
cmdJoin: {
roleFlag,
nonElectableFlag,
},
cmdStartMaint: {
noRebalanceFlag,
Expand Down Expand Up @@ -357,7 +359,15 @@ func joinNodeHandler(c *cli.Context) (err error) {
// for the primary to perform initial handshake, validation, and the rest of it (NOTE: control-net)
ControlNet: netInfo,
}
if rebID, nodeInfo.DaeID, err = api.JoinCluster(apiBP, nodeInfo); err != nil {

var flags cos.BitFlags
if flagIsSet(c, nonElectableFlag) {
if daemonType == apc.Target {
return fmt.Errorf("option %s does not apply - targets are non-electable", qflprn(nonElectableFlag))
}
flags = meta.SnodeNonElectable
}
if rebID, nodeInfo.DaeID, err = api.JoinCluster(apiBP, nodeInfo, flags); err != nil {
return
}

Expand Down
4 changes: 4 additions & 0 deletions cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,10 @@ var (
Name: "role", Required: true,
Usage: "role of this AIS daemon: proxy or target",
}
// nonElectableFlag is the join command's boolean option requesting that the
// joining proxy never be elected primary; it does not apply to targets.
nonElectableFlag = cli.BoolFlag{
Name: "non-electable",
Usage: "this proxy must not be elected as primary (advanced use)",
}
noRebalanceFlag = cli.BoolFlag{
Name: "no-rebalance",
Usage: "do _not_ run global rebalance after putting node in maintenance (caution: advanced usage only!)",
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.23.2

require (
github.com/NVIDIA/aistore v1.3.26-0.20241101003456-acae3f044257
github.com/NVIDIA/aistore v1.3.26-0.20241104174155-c8fb2e54f015
github.com/fatih/color v1.17.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo/v2 v2.20.2
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/NVIDIA/aistore v1.3.26-0.20241101003456-acae3f044257 h1:eoj+3YSO3wUasLALcPng6EwHHRDnF7QmdLBcNjotuYk=
github.com/NVIDIA/aistore v1.3.26-0.20241101003456-acae3f044257/go.mod h1:IGdDyXEbwtj194tZukn6ptpn8ldL2pApqfOSIyTQyw4=
github.com/NVIDIA/aistore v1.3.26-0.20241104174155-c8fb2e54f015 h1:xh9EglFymBCbVtfcu99BENTKynxQe5nTMeWHblJ9TQM=
github.com/NVIDIA/aistore v1.3.26-0.20241104174155-c8fb2e54f015/go.mod h1:IGdDyXEbwtj194tZukn6ptpn8ldL2pApqfOSIyTQyw4=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
7 changes: 3 additions & 4 deletions cmn/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ type (
SamplerProbablity float64 `json:"-"`
}

// NOTE: Updating TracingConfig requires daemon restart.
// NOTE: Updating TracingConfig requires restart.
TracingConfToSet struct {
ExporterEndpoint *string `json:"exporter_endpoint,omitempty"` // gRPC exporter endpoint
ExporterAuth *TraceExporterAuthConfToSet `json:"exporter_auth,omitempty"` // exporter auth config
Expand All @@ -283,7 +283,7 @@ type (
TokenFile *string `json:"token_file,omitempty"` // filepath from where auth token can be obtained
}

// NOTE: StatsTime is a one important timer
// NOTE: StatsTime is one important timer - a pulse
PeriodConf struct {
StatsTime cos.Duration `json:"stats_time"` // collect and publish stats; other house-keeping
RetrySyncTime cos.Duration `json:"retry_sync_time"` // metasync retry
Expand Down Expand Up @@ -334,13 +334,12 @@ type (
PrimaryURL string `json:"primary_url"`
OriginalURL string `json:"original_url"`
DiscoveryURL string `json:"discovery_url"`
NonElectable bool `json:"non_electable"`
NonElectable bool `json:"non_electable"` // NOTE: deprecated, not used
}
ProxyConfToSet struct {
PrimaryURL *string `json:"primary_url,omitempty"`
OriginalURL *string `json:"original_url,omitempty"`
DiscoveryURL *string `json:"discovery_url,omitempty"`
NonElectable *bool `json:"non_electable,omitempty"`
}

SpaceConf struct {
Expand Down
Loading

0 comments on commit 5cec026

Please sign in to comment.