From 199af0726661341c95936a60d81c1ead7f0de5cb Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Thu, 7 Nov 2024 13:36:44 -0500 Subject: [PATCH] set primary with force (major) * revise as 2PC (prepare=true/false) sequence - TODO: make it begin-(abort|commit) * consolidate all related logic in ais/psetforce * add put(BMD) interface * tests: join (3+3) cluster => (6+6) cluster * with substantial refactoring * part seven, prev. commit: b72a4ec019353 Signed-off-by: Alex Aizman --- ais/bucketmeta.go | 3 +- ais/htcommon.go | 1 + ais/htrun.go | 94 ++--- ais/metasync.go | 3 +- ais/proxy.go | 134 +------ ais/prxauth.go | 2 +- ais/prxclu.go | 405 ++----------------- ais/psetforce.go | 689 +++++++++++++++++++++++++++++++++ ais/rebmeta.go | 13 + ais/target.go | 6 +- ais/test/scripts/force-join.sh | 11 +- ais/tgtbck.go | 4 +- ais/tgtcp.go | 8 +- 13 files changed, 793 insertions(+), 580 deletions(-) create mode 100644 ais/psetforce.go diff --git a/ais/bucketmeta.go b/ais/bucketmeta.go index 8937ab8611..bd426d7fec 100644 --- a/ais/bucketmeta.go +++ b/ais/bucketmeta.go @@ -38,7 +38,7 @@ import ( // - bucketMD versioning is monotonic and incremental // // - bucketMD typical update transaction: -// lock -- clone() -- modify the clone -- bmdOwner.put(clone) -- unlock +// lock -- clone() -- modify the clone -- bmdOwner.put(clone) -- unlock // // (*) for merges and conflict resolution, check the current version prior to put() // (note that version check must be protected by the same critical section) @@ -59,6 +59,7 @@ type ( init() bool // true when loaded previous version get() (bmd *bucketMD) + put(bmd *bucketMD) putPersist(bmd *bucketMD, payload msPayload) error persist(clone *bucketMD, payload msPayload) error modify(*bmdModifier) (*bucketMD, error) diff --git a/ais/htcommon.go b/ais/htcommon.go index 49f19a412c..255ada5815 100644 --- a/ais/htcommon.go +++ b/ais/htcommon.go @@ -231,6 +231,7 @@ var ( errForwarded = errors.New("forwarded") errSendingResp = 
errors.New("err-sending-resp") errFastKalive = errors.New("cannot fast-keepalive") + errIntraControl = errors.New("expected intra-control request") ) // BMD uuid errs diff --git a/ais/htrun.go b/ais/htrun.go index 751f4b3a5d..7102e481c0 100644 --- a/ais/htrun.go +++ b/ais/htrun.go @@ -1542,7 +1542,9 @@ func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [, var ( what string caller = opts[0] + uuid = revs.uuid() lv = "v" + strconv.FormatInt(lver, 10) + luuid string ) switch len(opts) { case 1: @@ -1553,7 +1555,7 @@ func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [, case 2: what = opts[1] if strings.IndexByte(what, '[') < 0 { - if uuid := revs.uuid(); uuid != "" { + if uuid != "" { what += "[" + uuid + "]" } } @@ -1561,19 +1563,23 @@ func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [, what = opts[1] luuid := opts[2] lv += "[" + luuid + "]" - - if uuid := revs.uuid(); luuid != "" && uuid != luuid { - // versions incomparable (see apc.ActPrimaryForce) - nlog.InfoDepth(1, "Warning", tag, what, "( different cluster", lv, msg.String(), "<--", caller, ")") - return - } + } + // different uuids (clusters) - versions cannot be compared + if luuid != "" && uuid != "" && uuid != luuid { + nlog.InfoDepth(1, "Warning", tag, what, "( different cluster", lv, msg.String(), "<--", caller, msg.String(), ")") + return } + // compare l(ocal) and newly received switch { case lver == revs.version(): - nlog.InfoDepth(1, tag, what, "( same", lv, msg.String(), "<--", caller, ")") + s := "( same" + if lver == 0 { + s = "( initial" + } + nlog.InfoDepth(1, tag, what, s, lv, msg.String(), "<--", caller, ")") case lver > revs.version(): - nlog.InfoDepth(1, "Warning", tag, what, "( down from", lv, msg.String(), "<--", caller, ")") + nlog.InfoDepth(1, "Warning", tag, what, "( down from", lv, msg.String(), "<--", caller, msg.String(), ")") default: nlog.InfoDepth(1, tag, "new", what, "( have", lv, msg.String(), 
"<--", caller, ")") } @@ -1657,7 +1663,7 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo return } } - if skipValidation { + if skipValidation || (msg.Action == apc.ActPrimaryForce && newSmap.isValid()) { return } @@ -1723,15 +1729,17 @@ func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg } rmd := h.owner.rmd.get() + logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID) + + if msg.Action == apc.ActPrimaryForce { + return + } + if newRMD.CluID != "" && newRMD.CluID != rmd.CluID && rmd.CluID != "" { - logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID) err = h.owner.rmd.newClusterIntegrityErr(h.String(), newRMD.CluID, rmd.CluID, rmd.Version) cos.ExitLog(err) // FATAL } - if cmn.Rom.FastV(4, cos.SmoduleAIS) { - logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID) - } if newRMD.version() <= rmd.version() && msg.Action != apc.ActPrimaryForce { if newRMD.version() < rmd.version() { err = newErrDowngrade(h.si, rmd.String(), newRMD.String()) @@ -1782,7 +1790,7 @@ func (h *htrun) receiveSmap(newSmap *smapX, msg *aisMsg, payload msPayload, call smap := h.owner.smap.get() logmsync(smap.Version, newSmap, msg, caller, newSmap.StringEx(), smap.UUID) - if !newSmap.isPresent(h.si) && msg.Action != apc.ActPrimaryForce { + if !newSmap.isPresent(h.si) { return &errSelfNotFound{act: "receive-smap", si: h.si, tag: "new", smap: newSmap} } return h.owner.smap.synchronize(h.si, newSmap, payload, cb) @@ -1989,13 +1997,14 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, htext hte path string skipPrxKalive = h.si.IsProxy() || keepalive opts = cmetaFillOpt{ - htext: htext, - skipSmap: skipPrxKalive, // when targets self- or admin-join - skipBMD: skipPrxKalive, // ditto - skipRMD: true, // NOTE: not used yet - skipConfig: true, // ditto - skipEtlMD: true, // ditto - fillRebMarker: !keepalive, + htext: htext, + skipSmap: skipPrxKalive, // when targets 
self- or admin-join + skipBMD: skipPrxKalive, // ditto + skipRMD: true, // NOTE: not used yet + skipConfig: true, // ditto + skipEtlMD: true, // ditto + + fillRebMarker: !keepalive && htext != nil, // TODO -- FIXME skipPrimeTime: true, } ) @@ -2228,7 +2237,7 @@ func (h *htrun) externalWD(w http.ResponseWriter, r *http.Request) (responded bo // intra-cluster health ping // - pub addr permitted (see reqHealth) // - compare w/ h.ensureIntraControl - err := h.isIntraCall(r.Header, false /* from primary */) + err := h.checkIntraCall(r.Header, false /* from primary */) if err != nil { h.writeErr(w, r, err) responded = true @@ -2236,40 +2245,11 @@ func (h *htrun) externalWD(w http.ResponseWriter, r *http.Request) (responded bo return } -// (primary forceJoin() calling) -func (h *htrun) prepForceJoin(w http.ResponseWriter, r *http.Request) { - const tag = "prep-force-join" - q := r.URL.Query() - if !cos.IsParseBool(q.Get(apc.QparamPrepare)) { - err := errors.New(tag + ": expecting '" + apc.QparamPrepare + "=true' query") - h.writeErr(w, r, err) - return - } - msg, err := h.readAisMsg(w, r) - if err != nil { - return - } - newSmap := &smapX{} - if err := cos.MorphMarshal(msg.Value, &newSmap); err != nil { - h.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, h.si, msg.Action, msg.Value, err) - return - } - var ( - npsi = newSmap.Primary - smap = h.owner.smap.get() - tout = cmn.Rom.CplaneOperation() - ) - if _, code, err := h.reqHealth(npsi, tout, nil, smap, true /*retry pub-addr*/); err != nil { - err = fmt.Errorf("%s: failed to reach %s [%v(%d)]", tag, npsi.StringEx(), err, code) - h.writeErr(w, r, err) - } -} - // // intra-cluster request validations and helpers // -func (h *htrun) isIntraCall(hdr http.Header, fromPrimary bool) (err error) { +func (h *htrun) checkIntraCall(hdr http.Header, fromPrimary bool) (err error) { debug.Assert(hdr != nil) var ( smap = h.owner.smap.get() @@ -2280,7 +2260,7 @@ func (h *htrun) isIntraCall(hdr http.Header, fromPrimary bool) (err error) { erP 
error ) if ok := callerID != "" && callerName != ""; !ok { - return fmt.Errorf("%s: expected %s request", h, cmn.NetIntraControl) + return errIntraControl } if !smap.isValid() { return @@ -2299,7 +2279,9 @@ func (h *htrun) isIntraCall(hdr http.Header, fromPrimary bool) (err error) { // we still trust the request when the sender's Smap is more current if callerVer > smap.version() { if h.ClusterStarted() { - nlog.Errorf("%s: %s < Smap(v%s) from %s - proceeding anyway...", h, smap, callerSver, callerName) + // (exception: setting primary w/ force) + warn := h.String() + ": local " + smap.String() + " is older than (caller's) " + callerName + " Smap v" + callerSver + nlog.ErrorDepth(1, warn, "- proceeding anyway...") } runtime.Gosched() return @@ -2316,7 +2298,7 @@ func (h *htrun) isIntraCall(hdr http.Header, fromPrimary bool) (err error) { } func (h *htrun) ensureIntraControl(w http.ResponseWriter, r *http.Request, onlyPrimary bool) (isIntra bool) { - err := h.isIntraCall(r.Header, onlyPrimary) + err := h.checkIntraCall(r.Header, onlyPrimary) if err != nil { h.writeErr(w, r, err) return diff --git a/ais/metasync.go b/ais/metasync.go index 2e58b66aaf..f2fe8e5af1 100644 --- a/ais/metasync.go +++ b/ais/metasync.go @@ -524,7 +524,8 @@ func (y *metasyncer) _pending() (pending meta.NodeMap, smap *smapX) { if !ok || v < revs.version() { inSync = false break - } else if v > revs.version() { + } + if v > revs.version() { // skip older versions (TODO: don't skip sending associated aisMsg) nlog.Errorf("v: %d; revs.version: %d", v, revs.version()) } diff --git a/ais/proxy.go b/ais/proxy.go index 032a652e29..7f7b2dd95d 100644 --- a/ais/proxy.go +++ b/ais/proxy.go @@ -2901,135 +2901,6 @@ func (p *proxy) smapFromURL(baseURL string) (smap *smapX, err error) { return } -func (p *proxy) daeSetPrimary(w http.ResponseWriter, r *http.Request) { - apiItems, err := p.parseURL(w, r, apc.URLPathDae.L, 2, false) - if err != nil { - return - } - var ( - proxyID = apiItems[1] - query = 
r.URL.Query() - force = cos.IsParseBool(query.Get(apc.QparamForce)) - ) - // force primary change - if force && apiItems[0] == apc.Proxy { - if smap := p.owner.smap.get(); !smap.isPrimary(p.si) { - p.writeErr(w, r, newErrNotPrimary(p.si, smap)) - return - } - p.forceJoin(w, r, proxyID, query) // TODO -- FIXME: test - return - } - prepare, err := cos.ParseBool(query.Get(apc.QparamPrepare)) - if err != nil { - p.writeErrf(w, r, "failed to parse URL query %q: %v", apc.QparamPrepare, err) - return - } - if p.owner.smap.get().isPrimary(p.si) { - p.writeErrf(w, r, "%s (self) is primary, expecting '/v1/cluster/...' when designating a new one", p) - return - } - if prepare { - var cluMeta cluMeta - if err := cmn.ReadJSON(w, r, &cluMeta); err != nil { - return - } - if err := p.recvCluMeta(&cluMeta, "set-primary", cluMeta.SI.String()); err != nil { - p.writeErrf(w, r, "%s: failed to receive clu-meta: %v", p, err) - return - } - } - - // self - if p.SID() == proxyID { - smap := p.owner.smap.get() - if smap.GetActiveNode(proxyID) == nil { - p.writeErrf(w, r, "%s: in maintenance or decommissioned", p) - return - } - if !prepare { - p.becomeNewPrimary("") - } - return - } - - // other - smap := p.owner.smap.get() - psi := smap.GetProxy(proxyID) - if psi == nil { - err := &errNodeNotFound{"cannot set new primary", proxyID, p.si, smap} - p.writeErr(w, r, err) - return - } - if prepare { - if cmn.Rom.FastV(4, cos.SmoduleAIS) { - nlog.Infoln("Preparation step: do nothing") - } - return - } - ctx := &smapModifier{pre: func(_ *smapModifier, clone *smapX) error { clone.Primary = psi; return nil }} - err = p.owner.smap.modify(ctx) - debug.AssertNoErr(err) -} - -func (p *proxy) becomeNewPrimary(proxyIDToRemove string) { - ctx := &smapModifier{ - pre: p._becomePre, - final: p._becomeFinal, - sid: proxyIDToRemove, - } - err := p.owner.smap.modify(ctx) - cos.AssertNoErr(err) -} - -func (p *proxy) _becomePre(ctx *smapModifier, clone *smapX) error { - if !clone.isPresent(p.si) { - 
cos.Assertf(false, "%s must always be present in the %s", p.si, clone.pp()) - } - if ctx.sid != "" && clone.GetNode(ctx.sid) != nil { - // decision is made: going ahead to remove - nlog.Infof("%s: removing failed primary %s", p, ctx.sid) - clone.delProxy(ctx.sid) - - // Remove reverse proxy entry for the node. - p.rproxy.nodes.Delete(ctx.sid) - } - - clone.Primary = clone.GetProxy(p.SID()) - clone.Version += 100 - clone.staffIC() - return nil -} - -func (p *proxy) _becomeFinal(ctx *smapModifier, clone *smapX) { - var ( - bmd = p.owner.bmd.get() - rmd = p.owner.rmd.get() - msg = p.newAmsgStr(apc.ActNewPrimary, bmd) - pairs = []revsPair{{clone, msg}, {bmd, msg}, {rmd, msg}} - ) - nlog.Infof("%s: distributing (%s, %s, %s) with newly elected primary (self)", p, clone, bmd, rmd) - config, err := p.ensureConfigURLs() - if err != nil { - nlog.Errorln(err) - } - if config != nil { - pairs = append(pairs, revsPair{config, msg}) - nlog.Infof("%s: plus %s", p, config) - } - etl := p.owner.etl.get() - if etl != nil && etl.version() > 0 { - pairs = append(pairs, revsPair{etl, msg}) - nlog.Infof("%s: plus %s", p, etl) - } - // metasync - debug.Assert(clone._sgl != nil) - _ = p.metasyncer.sync(pairs...) 
- - // synchronize IC tables - p.syncNewICOwners(ctx.smap, clone) -} - func (p *proxy) ensureConfigURLs() (config *globalConfig, err error) { config, err = p.owner.config.modify(&configModifier{pre: p._configURLs}) if err != nil { @@ -3303,6 +3174,11 @@ func (p *proxy) receiveRMD(newRMD *rebMD, msg *aisMsg, caller string) (err error rmd := p.owner.rmd.get() logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID) + if msg.Action == apc.ActPrimaryForce { + err = p.owner.rmd.synch(newRMD, false) + return err + } + p.owner.rmd.Lock() rmd = p.owner.rmd.get() if newRMD.version() <= rmd.version() { diff --git a/ais/prxauth.go b/ais/prxauth.go index 2ec2713168..f6f7304515 100644 --- a/ais/prxauth.go +++ b/ais/prxauth.go @@ -260,7 +260,7 @@ func (p *proxy) access(hdr http.Header, bck *meta.Bck, ace apc.AccessAttrs) (err tk *tok.Token bucket *cmn.Bck ) - if p.isIntraCall(hdr, false /*from primary*/) == nil { + if p.checkIntraCall(hdr, false /*from primary*/) == nil { return nil } if cmn.Rom.AuthEnabled() { // config.Auth.Enabled diff --git a/ais/prxclu.go b/ais/prxclu.go index 9a10fe8862..283240ae6b 100644 --- a/ais/prxclu.go +++ b/ais/prxclu.go @@ -1086,7 +1086,7 @@ func (p *proxy) cluputMsg(w http.ResponseWriter, r *http.Request) { // internal case apc.ActBumpMetasync: - p.bumpMsyncAll(w, r, msg) + p.msyncForceAll(w, r, msg) // fail default: @@ -1094,16 +1094,6 @@ func (p *proxy) cluputMsg(w http.ResponseWriter, r *http.Request) { } } -func (p *proxy) bumpMsyncAll(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) { - cm, err := p.cluMeta(cmetaFillOpt{}) - if err != nil { - p.writeErr(w, r, err) // (unlikely) - return - } - aimsg := p.newAmsgActVal(apc.ActPrimaryForce, msg) - cm.metasync(p, aimsg, false) -} - func (p *proxy) setCluCfgPersistent(w http.ResponseWriter, r *http.Request, toUpdate *cmn.ConfigToSet, msg *apc.ActMsg) { ctx := &configModifier{ pre: _setConfPre, @@ -1920,373 +1910,6 @@ func (p *proxy) _stopMaintRMD(ctx *smapModifier, clone 
*smapX) { ctx.rmdCtx = rmdCtx } -func (p *proxy) cluSetPrimary(w http.ResponseWriter, r *http.Request) { - apiItems, err := p.parseURL(w, r, apc.URLPathCluProxy.L, 1, false) - if err != nil { - return - } - - smap := p.owner.smap.get() - if err := smap.validate(); err != nil { - p.writeErr(w, r, err) - return - } - debug.Assert(smap.IsPrimary(p.si)) - npid := apiItems[0] - npsi := smap.GetProxy(npid) - - if npsi == nil { - // a) go with force - p._withForce(w, r, npid, smap) - return - } - - if npid == p.SID() { - debug.Assert(p.SID() == smap.Primary.ID()) // must be forwardCP-ed - nlog.Warningln(p.String(), "(self) is already primary, nothing to do") - return - } - - if err := _checkFlags(npsi); err != nil { - p.writeErr(w, r, err) - return - } - - // b) regular set-primary - if p.settingNewPrimary.CAS(false, true) { - p._setPrimary(w, r, npsi) - p.settingNewPrimary.Store(false) - } else { - nlog.Warningln("setting new primary in progress now...") - } -} - -func (p *proxy) _withForce(w http.ResponseWriter, r *http.Request, npid string, smap *smapX) { - query := r.URL.Query() - force := cos.IsParseBool(query.Get(apc.QparamForce)) - if !force { - err := &errNodeNotFound{msg: "set-primary failure:", id: npid, si: p.si, smap: smap} - p.writeErr(w, r, err, http.StatusNotFound) - return - } - if p.settingNewPrimary.CAS(false, true) { - p.forceJoin(w, r, npid, query) - p.settingNewPrimary.Store(false) - } else { - err := errors.New("setting new primary is in progress, cannot use force") - p.writeErr(w, r, err) - } -} - -func _checkFlags(npsi *meta.Snode) error { - if npsi.InMaintOrDecomm() { - s := "under maintenance" - if !npsi.InMaint() { - s = "being decommissioned" - } - return fmt.Errorf("%s cannot become a new primary as it is currently %s", npsi, s) - } - if npsi.Flags.IsSet(meta.SnodeNonElectable) { - return fmt.Errorf("%s is non-electable and cannot become a new primary", npsi) - } - return nil -} - -func (p *proxy) _setPrimary(w http.ResponseWriter, r 
*http.Request, npsi *meta.Snode) { - // - // (I.1) Prepare phase - inform other nodes. - // - urlPath := apc.URLPathDaeProxy.Join(npsi.ID()) - q := make(url.Values, 1) - q.Set(apc.QparamPrepare, "true") - args := allocBcArgs() - args.req = cmn.HreqArgs{Method: http.MethodPut, Path: urlPath, Query: q} - - cluMeta, errM := p.cluMeta(cmetaFillOpt{skipSmap: true, skipPrimeTime: true}) - if errM != nil { - p.writeErr(w, r, errM) - return - } - args.req.Body = cos.MustMarshal(cluMeta) - - npname := npsi.StringEx() - - args.to = core.AllNodes - results := p.bcastGroup(args) - freeBcArgs(args) - for _, res := range results { - if res.err == nil { - continue - } - err := res.errorf("node %s failed to set primary %s in the prepare phase (err: %v)", res.si, npname, res.err) - p.writeErr(w, r, err) - freeBcastRes(results) - return - } - freeBcastRes(results) - - // - // (I.2) Prepare phase - local changes. - // - err := p.owner.smap.modify(&smapModifier{pre: func(_ *smapModifier, clone *smapX) error { - clone.Primary = npsi - p.metasyncer.becomeNonPrimary() - return nil - }}) - debug.AssertNoErr(err) - - // - // (II) Commit phase. 
- // - q.Set(apc.QparamPrepare, "false") - args = allocBcArgs() - args.req = cmn.HreqArgs{Method: http.MethodPut, Path: urlPath, Query: q} - args.to = core.AllNodes - results = p.bcastGroup(args) - freeBcArgs(args) - for _, res := range results { - if res.err == nil { - continue - } - if res.si.ID() == npsi.ID() { - // FATAL - cos.ExitLogf("commit phase failure: new primary %s returned %v", npname, res.err) - } else { - nlog.Errorf("node %s failed to set new primary %s in the commit phase (err: %v)", res.si, npname, res.err) - } - } - freeBcastRes(results) -} - -// Force primary change (*****) -// 10-steps sequence that now supports merging two different clusters -// Background: -// - when for whatever reason some of the nodes that include at least one proxy stop seeing the _current_ primary -// they may, after keep-aliving for a while and talking to each other, go ahead and elect a new primary - -// from themselves and for themselves; -// - when the network is back up again we then discover split-brain in progress, and we may not like it. -// Beware!.. well, just beware. -func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q url.Values) { - const ( - tag = "designated primary" - act = "force-join" - ) - // 1. validate args - if p.SID() == npid { - nlog.Warningln(p.String(), "(self) is the", tag, "- nothing to do") - return - } - var ( - smap = p.owner.smap.get() - psi = smap.GetProxy(npid) - newPrimaryURL = q.Get(apc.QparamPrimaryCandidate) - nurl = newPrimaryURL - ) - if psi == nil && newPrimaryURL == "" { - msg := act + " failure (w/ empty destination URL):" - err := &errNodeNotFound{msg: msg, id: npid, si: p.si, smap: smap} - p.writeErr(w, r, err, http.StatusNotFound) - return - } - if nurl == "" { - nurl = cos.Left(psi.ControlNet.URL, psi.PubNet.URL) - } - if nurl == "" { - err := fmt.Errorf("cannot %s %s (and cluster %s) to %s[%s]: missing destination URL", act, p, smap, tag, npid) - p.writeErr(w, r, err) - return - } - - // 2. 
get destination Smap (henceforth, newSmap) - newSmap, err := p.smapFromURL(nurl) - if err != nil && psi != nil && psi.PubNet.URL != psi.ControlNet.URL { - newSmap, err = p.smapFromURL(psi.PubNet.URL) - } - if err != nil { - err := fmt.Errorf("%s %s to %s[%q, %q]: %s", act, p, tag, npid, newPrimaryURL, err) - p.writeErr(w, r, err) - return - } - npsi := newSmap.Primary - if nurl != npsi.PubNet.URL && nurl != npsi.ControlNet.URL { - // must be reachable via its own (new)Smap - err := fmt.Errorf("%s: %s URLs %q vs (pub %q, ctrl %q)", p, tag, nurl, npsi.PubNet.URL, npsi.ControlNet.URL) - nlog.Warningln(err) - if _, e := p.smapFromURL(npsi.ControlNet.URL); e != nil { - if _, e = p.smapFromURL(npsi.PubNet.URL); e != nil { - p.writeErr(w, r, err) - return - } - } - } - if npid != npsi.ID() { - err := fmt.Errorf("%s: according to the destination %s %s[%s] is _not_ primary", p, newSmap.StringEx(), tag, npid) - p.writeErr(w, r, err) - return - } - if err := _checkFlags(npsi); err != nil { - p.writeErr(w, r, err) - return - } - npname := npsi.StringEx() - - // - // 3. begin - // - what := "(split-brain):" - if smap.UUID != newSmap.UUID { - what = "a different cluster:" - } - nlog.Warningln(act, "entire cluster [", p.String(), smap.StringEx(), "] to:\n", "\t", what, "[", npname, newSmap.StringEx(), "]") - - // 4. get destination cluMeta from npsi - cargs := allocCargs() - { - cargs.si = npsi - cargs.timeout = cmn.Rom.MaxKeepalive() - cargs.req = cmn.HreqArgs{Path: apc.URLPathDae.S, Query: url.Values{apc.QparamWhat: []string{apc.WhatSmapVote}}} - cargs.cresv = cresCM{} // -> cluMeta - } - res := p.call(cargs, smap) - err = res.unwrap() - freeCargs(cargs) - if err != nil { - freeCR(res) - p.writeErr(w, r, err) - return - } - ncm, ok := res.v.(*cluMeta) - debug.Assert(ok) - freeCR(res) - - // 5. 
backup (see rollback below) - cm, err := p.cluMeta(cmetaFillOpt{skipPrimeTime: true}) - if err != nil { - p.writeErrf(w, r, "cannot %s %s to %s: %v", act, p, npname, err) // (unlikely) - return - } - - nlog.Infoln(act, "(6) prepare") - - // 6. prepare phase whereby all members health-ping => npsi (see `prepForceJoin`) - bargs := allocBcArgs() - { - aimsg := p.newAmsgActVal(apc.ActPrimaryForce, newSmap) - q := make(url.Values, 1) - q.Set(apc.QparamPrepare, "true") - bargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathDaeForceJoin.S, Query: q, Body: cos.MustMarshal(aimsg)} - bargs.to = core.AllNodes - } - results := p.bcastGroup(bargs) - freeBcArgs(bargs) - for _, res := range results { - if res.err != nil { - p.writeErrf(w, r, "node %s failed to contact new primary %s in the prepare phase (err: %v)", res.si, npname, res.err) - freeBcastRes(results) - return - } - } - freeBcastRes(results) - - nlog.Infoln(act, "(7) metasync") - - // 7. metasync destination cluMeta to _this_ cluster members - aimsg := p.newAmsgActVal(apc.ActPrimaryForce, nil) - ncm.metasync(p, aimsg, true) - - nlog.Infoln(act, "(8) update clu-meta") - - // 8. update cluMeta in memory (= destination) - if err = cmn.GCO.Update(&ncm.Config.ClusterConfig); err != nil { - // rollback #1 - nlog.Errorln(act, "rollback #1", err) - cm.metasync(p, aimsg, true) - p.writeErr(w, r, err) - return - } - bmdOwnerPrx, ok := p.owner.bmd.(*bmdOwnerPrx) - debug.Assert(ok) - bmdOwnerPrx.put(ncm.BMD) - - nlog.Infoln(act, "(9) join self") - - // 9. 
join self (up to 3 attempts) - joinURL, secondURL := npsi.ControlNet.URL, npsi.PubNet.URL - if nurl == npsi.PubNet.URL { - joinURL, secondURL = npsi.PubNet.URL, npsi.ControlNet.URL - } - - p.owner.smap.put(newSmap) - res = p.regTo(joinURL, npsi, apc.DefaultTimeout, nil, false /*keepalive*/) - e, eh := res.err, res.toErr() - freeCR(res) - if e != nil { - if joinURL != secondURL { - nlog.Warningln(res.toErr(), "- 2nd attempt via", secondURL) - res = p.regTo(secondURL, npsi, apc.DefaultTimeout, nil, false) - e, eh = res.err, res.toErr() - freeCR(res) - if e != nil { - time.Sleep(time.Second) - nlog.Warningln(res.toErr(), "- 3d (final) attempt via", joinURL) - res = p.regTo(joinURL, npsi, apc.DefaultTimeout, nil, false) - eh = res.toErr() - freeCR(res) - } - } - } - if eh != nil { - // rollback #2 - nlog.Errorln(act, "rollback #2", eh) - if nested := cmn.GCO.Update(&cm.Config.ClusterConfig); nested != nil { - nlog.Errorf("FATAL: nested config-update error when rolling back [%s %s to %s]: %v", - act, p, npname, nested) // (unlikely) - } - p.owner.smap.put(cm.Smap) - bmdOwnerPrx.put(cm.BMD) - - cm.metasync(p, aimsg, true) - - p.writeErr(w, r, eh) - return - } - - p.metasyncer.becomeNonPrimary() // point of no return - - time.Sleep(time.Second) - - nlog.Infoln(act, "(10) ask npsi to bump metasync") - - // 10. 
finally, ask npsi to bump versions and metasync all (see `bumpMsyncAll`) - cargs = allocCargs() - msg := &apc.ActMsg{Action: apc.ActBumpMetasync} - { - cargs.si = npsi - cargs.timeout = cmn.Rom.MaxKeepalive() - cargs.req = cmn.HreqArgs{Method: http.MethodPut, Path: apc.URLPathClu.S, Body: cos.MustMarshal(msg)} - } - res = p.call(cargs, newSmap) - err = res.unwrap() - freeCR(res) - if err != nil { - freeCargs(cargs) - // retry once - cargs = allocCargs() - cargs.si = npsi - cargs.req = cmn.HreqArgs{Method: http.MethodPut, Base: npsi.PubNet.URL, Path: apc.URLPathClu.S, Body: cos.MustMarshal(msg)} - cargs.timeout = cmn.Rom.MaxKeepalive() + time.Second - - nlog.Errorln(err, "- failed to bump metasync, retrying...") - res := p.call(cargs, newSmap) - err = res.unwrap() - freeCR(res) - } - if err != nil { - p.writeErrf(w, r, "%s: failed to bump metasync => %s: %v", p, npname, err) - } - freeCargs(cargs) -} - ////////////////////////////////////////// // DELETE /v1/cluster - self-unregister // ////////////////////////////////////////// @@ -2327,8 +1950,8 @@ func (p *proxy) httpcludel(w http.ResponseWriter, r *http.Request) { if err := p.checkAccess(w, r, nil, apc.AceAdmin); err != nil { return } - if err := p.isIntraCall(r.Header, false /*from primary*/); err != nil { - err = fmt.Errorf("expecting intra-cluster call for %q, got %w", apc.ActSelfRemove, err) + if err := p.checkIntraCall(r.Header, false /*from primary*/); err != nil { + err = fmt.Errorf("%v (action %q)", err, apc.ActSelfRemove) p.writeErr(w, r, err) return } @@ -2522,9 +2145,27 @@ func mustRebalance(ctx *smapModifier, cur *smapX) bool { // cluMeta // ///////////// +// check essentials +func (cm *cluMeta) validate() error { + if cm.Smap == nil || !cm.Smap.isValid() { + return errors.New("invalid Smap") + } + if cm.BMD == nil || cm.BMD.version() == 0 || !cos.IsValidUUID(cm.BMD.UUID) { + return errors.New("invalid BMD") + } + if cm.Config == nil || cm.Config.version() == 0 || !cos.IsValidUUID(cm.Config.UUID) { + 
return errors.New("invalid Config") + } + return nil +} + func (cm *cluMeta) metasync(p *proxy, msg *aisMsg, wait bool) { - revs := make([]revsPair, 0, 5) + var ( + detail string + revs = make([]revsPair, 0, 5) + ) if cm.Smap != nil && cm.Smap.isValid() { + detail = cm.Smap.StringEx() revs = append(revs, revsPair{cm.Smap, msg}) } if cm.BMD != nil && cm.BMD.version() > 0 && cos.IsValidUUID(cm.BMD.UUID) { @@ -2539,7 +2180,7 @@ func (cm *cluMeta) metasync(p *proxy, msg *aisMsg, wait bool) { if cm.EtlMD != nil { revs = append(revs, revsPair{cm.EtlMD, msg}) } - nlog.Infoln(p.String(), "metasync-all", msg.Action, len(revs)) + nlog.InfoDepth(1, p.String(), "metasync-all", msg.Action, detail, len(revs)) wg := p.metasyncer.sync(revs...) if wait { wg.Wait() diff --git a/ais/psetforce.go b/ais/psetforce.go new file mode 100644 index 0000000000..e52d514b68 --- /dev/null +++ b/ais/psetforce.go @@ -0,0 +1,689 @@ +// Package ais provides core functionality for the AIStore object storage. +/* + * Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved. 
+ */ +package ais + +import ( + "errors" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/NVIDIA/aistore/api/apc" + "github.com/NVIDIA/aistore/cmn" + "github.com/NVIDIA/aistore/cmn/cos" + "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/cmn/nlog" + "github.com/NVIDIA/aistore/core" + "github.com/NVIDIA/aistore/core/meta" +) + +// set new primary, scenarios including joining entire (split-brain) cluster + +// +// cluster +// + +func (p *proxy) cluSetPrimary(w http.ResponseWriter, r *http.Request) { + apiItems, err := p.parseURL(w, r, apc.URLPathCluProxy.L, 1, false) + if err != nil { + return + } + + smap := p.owner.smap.get() + if err := smap.validate(); err != nil { + p.writeErr(w, r, err) + return + } + debug.Assert(smap.IsPrimary(p.si)) + npid := apiItems[0] + npsi := smap.GetProxy(npid) + + if npsi == nil { + // a) go with force + p._withForce(w, r, npid, smap) + return + } + + if npid == p.SID() { + debug.Assert(p.SID() == smap.Primary.ID()) // must be forwardCP-ed + nlog.Warningln(p.String(), "(self) is already primary, nothing to do") + return + } + + if err := _checkFlags(npsi); err != nil { + p.writeErr(w, r, err) + return + } + + // b) regular set-primary + if p.settingNewPrimary.CAS(false, true) { + p._setPrimary(w, r, npsi) + p.settingNewPrimary.Store(false) + } else { + nlog.Warningln("setting new primary in progress now...") + } +} + +func (p *proxy) _withForce(w http.ResponseWriter, r *http.Request, npid string, smap *smapX) { + query := r.URL.Query() + force := cos.IsParseBool(query.Get(apc.QparamForce)) + if !force { + err := &errNodeNotFound{msg: "set-primary failure:", id: npid, si: p.si, smap: smap} + p.writeErr(w, r, err, http.StatusNotFound) + return + } + if p.settingNewPrimary.CAS(false, true) { + p.forceJoin(w, r, npid, query) + p.settingNewPrimary.Store(false) + } else { + err := errors.New("setting new primary is in progress, cannot use force") + p.writeErr(w, r, err) + } +} + +func _checkFlags(npsi *meta.Snode) 
error { + if npsi.InMaintOrDecomm() { + s := "under maintenance" + if !npsi.InMaint() { + s = "being decommissioned" + } + return fmt.Errorf("%s cannot become a new primary as it is currently %s", npsi, s) + } + if npsi.Flags.IsSet(meta.SnodeNonElectable) { + return fmt.Errorf("%s is non-electable and cannot become a new primary", npsi) + } + return nil +} + +func (p *proxy) _setPrimary(w http.ResponseWriter, r *http.Request, npsi *meta.Snode) { + // + // (I.1) Prepare phase - inform other nodes. + // + urlPath := apc.URLPathDaeProxy.Join(npsi.ID()) + q := make(url.Values, 1) + q.Set(apc.QparamPrepare, "true") + args := allocBcArgs() + args.req = cmn.HreqArgs{Method: http.MethodPut, Path: urlPath, Query: q} + + cluMeta, errM := p.cluMeta(cmetaFillOpt{skipSmap: true, skipPrimeTime: true}) + if errM != nil { + p.writeErr(w, r, errM) + return + } + args.req.Body = cos.MustMarshal(cluMeta) + + npname := npsi.StringEx() + + args.to = core.AllNodes + results := p.bcastGroup(args) + freeBcArgs(args) + for _, res := range results { + if res.err == nil { + continue + } + err := res.errorf("node %s failed to set primary %s in the prepare phase (err: %v)", res.si, npname, res.err) + p.writeErr(w, r, err) + freeBcastRes(results) + return + } + freeBcastRes(results) + + // + // (I.2) Prepare phase - local changes. + // + err := p.owner.smap.modify(&smapModifier{pre: func(_ *smapModifier, clone *smapX) error { + clone.Primary = npsi + p.metasyncer.becomeNonPrimary() + return nil + }}) + debug.AssertNoErr(err) + + // + // (II) Commit phase. 
+	//
+	q.Set(apc.QparamPrepare, "false")
+	args = allocBcArgs()
+	args.req = cmn.HreqArgs{Method: http.MethodPut, Path: urlPath, Query: q}
+	args.to = core.AllNodes
+	results = p.bcastGroup(args)
+	freeBcArgs(args)
+	for _, res := range results {
+		if res.err == nil {
+			continue
+		}
+		if res.si.ID() == npsi.ID() {
+			// FATAL
+			cos.ExitLogf("commit phase failure: new primary %s returned %v", npname, res.err)
+		} else {
+			nlog.Errorf("node %s failed to set new primary %s in the commit phase (err: %v)", res.si, npname, res.err)
+		}
+	}
+	freeBcastRes(results)
+}
+
+// Force primary change (*****)
+// 10-steps sequence that now supports merging two different clusters
+// Background:
+//   - when for whatever reason some of the nodes that include at least one proxy stop seeing the _current_ primary
+//     they may, after keep-aliving for a while and talking to each other, go ahead and elect a new primary -
+//     from themselves and for themselves;
+//   - when the network is back up again we then discover split-brain in progress, and we may not like it.
+//
+// Beware!.. well, just beware.
+//
+// Parameters:
+//   - npid: ID of the designated (new) primary;
+//   - q:    request query; apc.QparamPrimaryCandidate, when present, is the URL used to reach the
+//     designated primary (otherwise its control/public URL is taken from the local Smap).
+//
+// Steps (numbered in the body):
+// (1) validate args; (2) fetch destination Smap; (3) log intent; (4) fetch destination cluMeta;
+// (5) back up own cluMeta; (6) prepare: broadcast to all nodes; (7) install destination config/BMD/RMD
+// in memory; (8) join self; (9) commit: broadcast to the remaining nodes; (10) ask the new primary to
+// bump versions and metasync.
+//
+// Failures are reported via p.writeErr/p.writeErrf; steps 7 and 8 roll back in-memory state on error.
+func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q url.Values) {
+	const (
+		tag = "designated primary"
+		act = "force-join"
+	)
+	// 1. validate args
+	if p.SID() == npid {
+		nlog.Warningln(p.String(), "(self) is the", tag, "- nothing to do")
+		return
+	}
+	var (
+		smap          = p.owner.smap.get()
+		psi           = smap.GetProxy(npid)
+		newPrimaryURL = q.Get(apc.QparamPrimaryCandidate)
+		nurl          = newPrimaryURL
+	)
+	if psi == nil && newPrimaryURL == "" {
+		msg := act + " failure (w/ empty destination URL):"
+		err := &errNodeNotFound{msg: msg, id: npid, si: p.si, smap: smap}
+		p.writeErr(w, r, err, http.StatusNotFound)
+		return
+	}
+	if nurl == "" {
+		// psi != nil here: the (psi == nil && newPrimaryURL == "") case returned above
+		nurl = cos.Left(psi.ControlNet.URL, psi.PubNet.URL)
+	}
+	if nurl == "" {
+		err := fmt.Errorf("cannot %s %s (and cluster %s) to %s[%s]: missing destination URL", act, p, smap, tag, npid)
+		p.writeErr(w, r, err)
+		return
+	}
+
+	// 2. get destination Smap (henceforth, newSmap)
+	newSmap, err := p.smapFromURL(nurl)
+	if err != nil && psi != nil && psi.PubNet.URL != psi.ControlNet.URL {
+		newSmap, err = p.smapFromURL(psi.PubNet.URL)
+	}
+	if err != nil {
+		err := fmt.Errorf("%s %s to %s[%q, %q]: %s", act, p, tag, npid, newPrimaryURL, err)
+		p.writeErr(w, r, err)
+		return
+	}
+	npsi := newSmap.Primary
+	if nurl != npsi.PubNet.URL && nurl != npsi.ControlNet.URL {
+		// must be reachable via its own (new)Smap
+		err := fmt.Errorf("%s: %s URLs %q vs (pub %q, ctrl %q)", p, tag, nurl, npsi.PubNet.URL, npsi.ControlNet.URL)
+		nlog.Warningln(err)
+		if _, e := p.smapFromURL(npsi.ControlNet.URL); e != nil {
+			if _, e = p.smapFromURL(npsi.PubNet.URL); e != nil {
+				p.writeErr(w, r, err)
+				return
+			}
+		}
+	}
+	if npid != npsi.ID() {
+		err := fmt.Errorf("%s: according to the destination %s %s[%s] is not _the_ primary", p, newSmap.StringEx(), tag, npid)
+		p.writeErr(w, r, err)
+		return
+	}
+	if err := _checkFlags(npsi); err != nil {
+		p.writeErr(w, r, err)
+		return
+	}
+	npname := npsi.StringEx()
+
+	//
+	// 3. begin
+	//
+	what := "(split-brain):"
+	if smap.UUID != newSmap.UUID {
+		what = "a different cluster:"
+	}
+	nlog.Warningln(act, "entire cluster [", p.String(), smap.StringEx(), "] to:\n", "\t", what, "[", npname, newSmap.StringEx(), "]")
+
+	// 4. get destination cluMeta from npsi
+	cargs := allocCargs()
+	{
+		cargs.si = npsi
+		cargs.timeout = cmn.Rom.MaxKeepalive()
+		cargs.req = cmn.HreqArgs{Path: apc.URLPathDae.S, Query: url.Values{apc.QparamWhat: []string{apc.WhatSmapVote}}}
+		cargs.cresv = cresCM{} // -> cluMeta
+	}
+	res := p.call(cargs, newSmap /* -> header */)
+	err = res.unwrap()
+	freeCargs(cargs)
+	if err != nil {
+		freeCR(res)
+		p.writeErr(w, r, err)
+		return
+	}
+	ncm, ok := res.v.(*cluMeta)
+	debug.Assert(ok)
+	freeCR(res)
+
+	if err := ncm.validate(); err != nil {
+		p.writeErrf(w, r, "cannot %s %s to %s: %v", act, p, npname, err)
+		return
+	}
+
+	// 5. backup (see rollback below)
+	cm, err := p.cluMeta(cmetaFillOpt{skipPrimeTime: true})
+	if err != nil {
+		p.writeErrf(w, r, "cannot %s %s to %s: %v", act, p, npname, err) // (unlikely)
+		return
+	}
+
+	nlog.Infoln(act, "(6) prepare")
+
+	// 6. prepare phase whereby all members health-ping => npsi (see `prepForceJoin`)
+	bargs := allocBcArgs()
+	{
+		aimsg := p.newAmsgActVal(apc.ActPrimaryForce, newSmap)
+		q := make(url.Values, 1)
+		q.Set(apc.QparamPrepare, "true")
+		bargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathDaeForceJoin.S, Query: q, Body: cos.MustMarshal(aimsg)}
+		bargs.to = core.AllNodes
+	}
+	results := p.bcastGroup(bargs)
+	freeBcArgs(bargs)
+	for _, res := range results {
+		if res.err != nil {
+			p.writeErrf(w, r, "node %s failed to contact new primary %s in the prepare phase (err: %v)",
+				res.si, npname, res.err)
+			freeBcastRes(results)
+			return
+		}
+	}
+	freeBcastRes(results)
+
+	aimsg := p.newAmsgActVal(apc.ActPrimaryForce, nil)
+	nlog.Infoln(act, "(7) update clu-meta in mem")
+
+	// 7. update cluMeta in memory (= destination)
+	if err = cmn.GCO.Update(&ncm.Config.ClusterConfig); err != nil {
+		// rollback #1
+		nlog.Errorln(act, "rollback #1", err)
+		cm.metasync(p, aimsg, true)
+		p.writeErr(w, r, err)
+		return
+	}
+	p.owner.bmd.put(ncm.BMD)
+	p.owner.rmd.put(ncm.RMD)
+
+	nlog.Infoln(act, "(8) join self ->", npname)
+
+	// 8. join self (up to 3 attempts)
+	// NOTE: `e` and `eh` are captured _before_ freeCR(res); once released, `res` must not
+	// be dereferenced - hence the retries below log `eh` rather than res.toErr().
+	joinURL, secondURL := npsi.ControlNet.URL, npsi.PubNet.URL
+	if nurl == npsi.PubNet.URL {
+		joinURL, secondURL = npsi.PubNet.URL, npsi.ControlNet.URL
+	}
+	p.owner.smap.put(newSmap)
+	res = p.regTo(joinURL, npsi, apc.DefaultTimeout, nil, false /*keepalive*/)
+	e, eh := res.err, res.toErr()
+	freeCR(res)
+	if e != nil {
+		if joinURL != secondURL {
+			nlog.Errorln(eh, "- 2nd attempt via", secondURL)
+			res = p.regTo(secondURL, npsi, apc.DefaultTimeout, nil, false)
+			e, eh = res.err, res.toErr()
+			freeCR(res)
+			if e != nil {
+				time.Sleep(time.Second)
+				nlog.Warningln(eh, "- 3d (final) attempt via", joinURL)
+				res = p.regTo(joinURL, npsi, apc.DefaultTimeout, nil, false)
+				eh = res.toErr()
+				freeCR(res)
+			}
+		}
+	}
+	if eh != nil {
+		// rollback #2
+		nlog.Errorln(act, "rollback #2", eh)
+		if nested := cmn.GCO.Update(&cm.Config.ClusterConfig); nested != nil {
+			nlog.Errorf("FATAL: nested config-update error when rolling back [%s %s to %s]: %v",
+				act, p, npname, nested) // (unlikely)
+		}
+		p.owner.smap.put(cm.Smap)
+		p.owner.bmd.put(cm.BMD)
+		p.owner.rmd.put(cm.RMD)
+
+		p.writeErr(w, r, eh)
+		return
+	}
+
+	nlog.Infoln(act, "(9) commit")
+
+	// 9. commit phase (see `prepForceJoin`)
+	if len(smap.Tmap) == 0 && len(smap.Pmap) == 1 { // TODO -- FIXME: count
+		goto st10
+	}
+	bargs = allocBcArgs()
+	{
+		aimsg = p.newAmsgActVal(apc.ActPrimaryForce, ncm)
+		q := make(url.Values, 1)
+		q.Set(apc.QparamPrepare, "false") // TODO -- FIXME: begin/commit/abort
+		bargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathDaeForceJoin.S, Query: q, Body: cos.MustMarshal(aimsg)}
+		bargs.to = core.SelectedNodes
+		bargs.nodes = make([]meta.NodeMap, 0, 2)
+		if len(smap.Tmap) > 0 {
+			bargs.nodes = append(bargs.nodes, smap.Tmap)
+		}
+		if len(smap.Pmap) > 1 {
+			bargs.nodes = append(bargs.nodes, smap.Pmap)
+		}
+	}
+	if len(bargs.nodes) == 0 {
+		// nothing to broadcast to: release the args allocated above before skipping ahead
+		freeBcArgs(bargs)
+		goto st10
+	}
+	results = p.bcastGroup(bargs)
+	freeBcArgs(bargs)
+	for _, res := range results {
+		if res.err != nil {
+			p.writeErrf(w, r, "node %s failed commit phase (err: %v)", res.si, res.err)
+			freeBcastRes(results)
+
+			// TODO -- FIXME: rollback #3
+			return
+		}
+	}
+	freeBcastRes(results)
+st10:
+	p.metasyncer.becomeNonPrimary() // point of no return
+
+	time.Sleep(time.Second)
+
+	nlog.Infoln(act, "(10) ask npsi to bump metasync")
+
+	// 10. finally, ask npsi to bump versions and metasync all (see `msyncForceAll`)
+	cargs = allocCargs()
+	msg := &apc.ActMsg{Action: apc.ActBumpMetasync}
+	{
+		cargs.si = npsi
+		cargs.timeout = cmn.Rom.MaxKeepalive()
+		cargs.req = cmn.HreqArgs{Method: http.MethodPut, Path: apc.URLPathClu.S, Body: cos.MustMarshal(msg)}
+	}
+	res = p.call(cargs, newSmap)
+	err = res.unwrap()
+	freeCR(res)
+	if err != nil {
+		freeCargs(cargs)
+		// retry once
+		cargs = allocCargs()
+		cargs.si = npsi
+		cargs.req = cmn.HreqArgs{Method: http.MethodPut, Base: npsi.PubNet.URL, Path: apc.URLPathClu.S, Body: cos.MustMarshal(msg)}
+		cargs.timeout = cmn.Rom.MaxKeepalive() + time.Second
+
+		nlog.Errorln(err, "- failed to bump metasync, retrying...")
+		res := p.call(cargs, newSmap)
+		err = res.unwrap()
+		freeCR(res)
+	}
+	if err != nil {
+		p.writeErrf(w, r, "%s: failed to bump metasync => %s: %v", p, npname, err)
+	}
+	freeCargs(cargs)
+}
+
+// node (stray proxy, that is)
+func (p *proxy) daeSetPrimary(w http.ResponseWriter, r *http.Request) {
+	apiItems, err := p.parseURL(w, r, apc.URLPathDae.L, 2, false)
+	if err != nil {
+		return
+	}
+	var (
+		proxyID = apiItems[1]
+		query   = r.URL.Query()
+		force   = cos.IsParseBool(query.Get(apc.QparamForce))
+	)
+	// force primary change
+	if force && apiItems[0] == apc.Proxy {
+		if smap := p.owner.smap.get(); !smap.isPrimary(p.si) {
+			p.writeErr(w, r, newErrNotPrimary(p.si, smap))
+			return
+		}
+		p.forceJoin(w, r, proxyID, query) // TODO -- FIXME: test
+		return
+	}
+	prepare, err := cos.ParseBool(query.Get(apc.QparamPrepare))
+	if err != nil {
+		p.writeErrf(w, r, "failed to parse URL query %q: %v", apc.QparamPrepare, err)
+		return
+	}
+	if p.owner.smap.get().isPrimary(p.si) {
+		p.writeErrf(w, r, "%s (self) is primary, expecting '/v1/cluster/...'
when designating a new one", p)
+		return
+	}
+	// prepare phase: read clu-meta from the request body and hand it to recvCluMeta
+	if prepare {
+		var cluMeta cluMeta
+		if err := cmn.ReadJSON(w, r, &cluMeta); err != nil {
+			return
+		}
+		if err := p.recvCluMeta(&cluMeta, "set-primary", cluMeta.SI.String()); err != nil {
+			p.writeErrf(w, r, "%s: failed to receive clu-meta: %v", p, err)
+			return
+		}
+	}
+
+	// self
+	if p.SID() == proxyID {
+		smap := p.owner.smap.get()
+		if smap.GetActiveNode(proxyID) == nil {
+			p.writeErrf(w, r, "%s: in maintenance or decommissioned", p)
+			return
+		}
+		// only the commit (prepare == false) call actually promotes self
+		if !prepare {
+			p.becomeNewPrimary("")
+		}
+		return
+	}
+
+	// other
+	smap := p.owner.smap.get()
+	psi := smap.GetProxy(proxyID)
+	if psi == nil {
+		err := &errNodeNotFound{"cannot set new primary", proxyID, p.si, smap}
+		p.writeErr(w, r, err)
+		return
+	}
+	if prepare {
+		if cmn.Rom.FastV(4, cos.SmoduleAIS) {
+			nlog.Infoln("Preparation step: do nothing")
+		}
+		return
+	}
+	// commit: point the local Smap's Primary at the designated proxy
+	ctx := &smapModifier{pre: func(_ *smapModifier, clone *smapX) error { clone.Primary = psi; return nil }}
+	err = p.owner.smap.modify(ctx)
+	debug.AssertNoErr(err)
+}
+
+// destination (designated) primary
+//
+// msyncForceAll collects this node's cluMeta and metasyncs it with the
+// apc.ActPrimaryForce action; a cluMeta failure is written back to the caller.
+//
+// NOTE(review): the `if true { // DEBUG` branch below always returns, which makes the
+// version-bumping smapModifier code after it (and the `msg` argument) currently unused.
+func (p *proxy) msyncForceAll(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) {
+	cm, err := p.cluMeta(cmetaFillOpt{})
+	if err != nil {
+		p.writeErr(w, r, err) // (unlikely)
+		return
+	}
+
+	if true { // DEBUG
+		aimsg := p.newAmsgActVal(apc.ActPrimaryForce, nil)
+		cm.metasync(p, aimsg, false)
+		return
+	}
+	// unreachable while the DEBUG branch above is in place
+	ctx := &smapModifier{
+		pre: func(_ *smapModifier, clone *smapX) error { clone.Version += 100; return nil }, // TODO -- FIXME: max(smap.Version, newSmap.Version) + 100
+		final: func(_ *smapModifier, clone *smapX) {
+			aimsg := p.newAmsgActVal(apc.ActPrimaryForce, msg)
+			cm.Smap = clone
+			cm.metasync(p, aimsg, false)
+		},
+	}
+	err = p.owner.smap.modify(ctx)
+	debug.AssertNoErr(err) // TODO -- FIXME: handle
+}
+
+//
+// becoming
+//
+
+// becomeNewPrimary modifies the Smap via _becomePre (pre) and _becomeFinal (final).
+// proxyIDToRemove is passed along as ctx.sid and may be empty (nothing to remove).
+// A modify() error is treated as fatal (cos.AssertNoErr).
+func (p *proxy) becomeNewPrimary(proxyIDToRemove string) {
+	ctx := &smapModifier{
+		pre:   p._becomePre,
+		final: p._becomeFinal,
+		sid:   proxyIDToRemove,
+	}
+	err := p.owner.smap.modify(ctx)
+	cos.AssertNoErr(err)
+}
+
+// _becomePre is the `pre` callback of becomeNewPrimary; it updates the Smap clone:
+//   - asserts that self is present in the clone;
+//   - when ctx.sid is non-empty and still in the clone, deletes that proxy from the clone
+//     and its entry from p.rproxy.nodes;
+//   - sets self as Primary, bumps the version by 100, and calls staffIC().
+//
+// Always returns nil.
+func (p *proxy) _becomePre(ctx *smapModifier, clone *smapX) error {
+	if !clone.isPresent(p.si) {
+		cos.Assertf(false, "%s must always be present in the %s", p.si, clone.pp())
+	}
+	if ctx.sid != "" && clone.GetNode(ctx.sid) != nil {
+		// decision is made: going ahead to remove
+		nlog.Infof("%s: removing failed primary %s", p, ctx.sid)
+		clone.delProxy(ctx.sid)
+
+		// Remove reverse proxy entry for the node.
+		p.rproxy.nodes.Delete(ctx.sid)
+	}
+
+	clone.Primary = clone.GetProxy(p.SID())
+	clone.Version += 100
+	clone.staffIC()
+	return nil
+}
+
+// _becomeFinal is the `final` callback of becomeNewPrimary: it gathers the new Smap
+// together with the current BMD and RMD (plus config and ETL metadata when available)
+// into revs pairs and hands them to the metasyncer.
+func (p *proxy) _becomeFinal(ctx *smapModifier, clone *smapX) {
+	var (
+		bmd   = p.owner.bmd.get()
+		rmd   = p.owner.rmd.get()
+		msg   = p.newAmsgStr(apc.ActNewPrimary, bmd)
+		pairs = []revsPair{{clone, msg}, {bmd, msg}, {rmd, msg}}
+	)
+	nlog.Infof("%s: distributing (%s, %s, %s) with newly elected primary (self)", p, clone, bmd, rmd)
+	// an ensureConfigURLs error is logged; config is appended only when non-nil
+	config, err := p.ensureConfigURLs()
+	if err != nil {
+		nlog.Errorln(err)
+	}
+	if config != nil {
+		pairs = append(pairs, revsPair{config, msg})
+		nlog.Infof("%s: plus %s", p, config)
+	}
+	etl := p.owner.etl.get()
+	if etl != nil && etl.version() > 0 {
+		pairs = append(pairs, revsPair{etl, msg})
+		nlog.Infof("%s: plus %s", p, etl)
+	}
+	// metasync
+	debug.Assert(clone._sgl != nil)
+	_ = p.metasyncer.sync(pairs...)
+
+	// synchronize IC tables
+	p.syncNewICOwners(ctx.smap, clone)
+}
+
+//
+// all nodes except primary
+//
+
+// (primary forceJoin() calling)
+//
+// prepForceJoin is the receiving side of the force-join sequence; the phase is selected
+// by the apc.QparamPrepare query parameter:
+//   - prepare=true:  require that the caller is the current primary, unmarshal the destination
+//     cluster's Smap from the message value, and health-ping the destination primary;
+//   - prepare=false: (commit) unmarshal the destination cluMeta, install its config, BMD, RMD,
+//     and Smap in memory, and join the destination primary (up to 2 attempts).
+//
+// All failures are reported to the caller via writeErr/writeErrf.
+//
+// TODO: refactor as 2PC begin--abort|commit
+func (h *htrun) prepForceJoin(w http.ResponseWriter, r *http.Request) {
+	const (
+		tag = "prep-force-join"
+	)
+	// caller
+	var (
+		callerID = r.Header.Get(apc.HdrCallerID)
+		smap     = h.owner.smap.get()
+		psi      = smap.GetNode(callerID)
+	)
+	// msg and query params
+	q := r.URL.Query()
+	prepare, err := cos.ParseBool(q.Get(apc.QparamPrepare))
+	if err != nil {
+		err := fmt.Errorf("failed to parse %q query: %v", apc.QparamPrepare, err)
+		h.writeErr(w, r, err)
+		return
+	}
+	msg, err := h.readAisMsg(w, r)
+	if err != nil {
+		return
+	}
+
+	// TODO -- FIXME: refactor two methods
+	if prepare {
+		// the caller ID comes from a request header: a missing or unknown ID yields psi == nil,
+		// which is rejected here rather than passed on to IsPrimary
+		if psi == nil || !smap.IsPrimary(psi) {
+			h.writeErrf(w, r, "%s: expecting %s call from primary, got %q", h, tag, callerID)
+			return
+		}
+
+		newSmap := &smapX{} // destination cluster's Smap
+		if err := cos.MorphMarshal(msg.Value, newSmap); err != nil {
+			h.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, h.si, msg.Action, msg.Value, err)
+			return
+		}
+
+		// health handshake
+		var (
+			npsi = newSmap.Primary
+			tout = cmn.Rom.CplaneOperation()
+		)
+		if _, code, err := h.reqHealth(npsi, tout, nil, newSmap /* -> header */, true /*retry pub-addr*/); err != nil {
+			err = fmt.Errorf("%s: failed to reach %s, err: %v(%d)", tag, npsi.StringEx(), err, code)
+			h.writeErr(w, r, err)
+			// failed: do not fall through to the "prepare done" log below
+			return
+		}
+
+		nlog.Infoln(h.String(), "prepare done", msg.String())
+		return
+	}
+
+	// commit
+	ncm := &cluMeta{}
+	if err := cos.MorphMarshal(msg.Value, ncm); err != nil {
+		h.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, h.si, msg.Action, msg.Value, err)
+		return
+	}
+
+	// 7. update cluMeta in memory (= destination)
+	if err = cmn.GCO.Update(&ncm.Config.ClusterConfig); err != nil {
+		// rollback #1
+		nlog.Errorln(msg.String(), "rollback #1", err)
+		h.writeErr(w, r, err)
+		return
+	}
+
+	h.owner.bmd.put(ncm.BMD)
+	h.owner.rmd.put(ncm.RMD)
+
+	npname := ncm.Smap.Primary.StringEx()
+	nlog.Infoln(msg.String(), "(8) join self ->", npname)
+
+	// 8. join self (up to 2 attempts: control-net URL first, then public)
+	// NOTE: `e` and `eh` are captured _before_ freeCR(res); once released, `res` must not
+	// be dereferenced - hence the retry below logs `eh` rather than res.toErr().
+	npsi := ncm.Smap.Primary
+	joinURL, secondURL := npsi.ControlNet.URL, npsi.PubNet.URL
+	h.owner.smap.put(ncm.Smap)
+	res := h.regTo(joinURL, npsi, apc.DefaultTimeout, nil, false /*keepalive*/)
+	e, eh := res.err, res.toErr()
+	freeCR(res)
+	if e != nil {
+		if joinURL != secondURL {
+			time.Sleep(time.Second)
+			nlog.Errorln(eh, "- 2nd attempt via", secondURL)
+			res = h.regTo(secondURL, npsi, apc.DefaultTimeout, nil, false)
+			eh = res.toErr()
+			freeCR(res)
+		}
+	}
+	if eh != nil {
+		// report the join error itself (`err` is always nil at this point)
+		h.writeErrf(w, r, "failed to join %s: %v", npname, eh) // FATAL
+		return
+	}
+	nlog.Infoln(h.String(), "joined", npname, msg.String())
+}
diff --git a/ais/rebmeta.go b/ais/rebmeta.go
index 585b6f8b2c..8bccc02a16 100644
--- a/ais/rebmeta.go
+++ b/ais/rebmeta.go
@@ -134,6 +134,19 @@ func (r *rmdOwner) load() {
 func (r *rmdOwner) put(rmd *rebMD) { r.rmd.Store(rmd) }
 func (r *rmdOwner) get() *rebMD    { return r.rmd.Load() }
 
+func (r *rmdOwner) synch(rmd *rebMD, locked bool) (err error) {
+	if !locked {
+		r.Lock()
+	}
+	r.put(rmd)
+	err = r.persist(rmd)
+	debug.AssertNoErr(err)
+	if !locked {
+		r.Unlock()
+	}
+	return err
+}
+
 func (r *rmdOwner) modify(ctx *rmdModifier) (clone *rebMD, err error) {
 	r.Lock()
 	clone, err = r.do(ctx)
diff --git a/ais/target.go b/ais/target.go
index 972e9d1b8b..8ad20cd30a 100644
--- a/ais/target.go
+++ b/ais/target.go
@@ -670,7 +670,7 @@ func (t *target) httpobjget(w http.ResponseWriter, r *http.Request, apireq *apiR
 		return
 	}
 	if cmn.Rom.Features().IsSet(feat.EnforceIntraClusterAccess) {
-		if apireq.dpq.ptime == "" /*isRedirect*/ && t.isIntraCall(r.Header, false
/*from primary*/) != nil { + if apireq.dpq.ptime == "" /*isRedirect*/ && t.checkIntraCall(r.Header, false /*from primary*/) != nil { t.writeErrf(w, r, "%s: %s(obj) is expected to be redirected (remaddr=%s)", t.si, r.Method, r.RemoteAddr) return @@ -1011,7 +1011,7 @@ func (t *target) httpobjhead(w http.ResponseWriter, r *http.Request, apireq *api query, bck, objName := apireq.query, apireq.bck, apireq.items[1] if cmn.Rom.Features().IsSet(feat.EnforceIntraClusterAccess) { // validates that the request is internal (by a node in the same cluster) - if isRedirect(query) == "" && t.isIntraCall(r.Header, false) != nil { + if isRedirect(query) == "" && t.checkIntraCall(r.Header, false) != nil { t.writeErrf(w, r, "%s: %s(obj) is expected to be redirected (remaddr=%s)", t.si, r.Method, r.RemoteAddr) return @@ -1164,7 +1164,7 @@ func (t *target) httpobjpatch(w http.ResponseWriter, r *http.Request, apireq *ap return } if cmn.Rom.Features().IsSet(feat.EnforceIntraClusterAccess) { - if isRedirect(apireq.query) == "" && t.isIntraCall(r.Header, false) != nil { + if isRedirect(apireq.query) == "" && t.checkIntraCall(r.Header, false) != nil { t.writeErrf(w, r, "%s: %s(obj) is expected to be redirected (remaddr=%s)", t.si, r.Method, r.RemoteAddr) return diff --git a/ais/test/scripts/force-join.sh b/ais/test/scripts/force-join.sh index 366a4eb1cf..bbe70a775c 100755 --- a/ais/test/scripts/force-join.sh +++ b/ais/test/scripts/force-join.sh @@ -17,7 +17,8 @@ while (( "$#" )); do done make kill clean -./scripts/clean_deploy.sh --target-cnt 6 --proxy-cnt 6 --mountpath-cnt 4 --deployment all --debug --aws --gcp --azure >/dev/null +./scripts/clean_deploy.sh --target-cnt 6 --proxy-cnt 6 --mountpath-cnt 4 --deployment all --debug --aws --gcp --azure \ + --remote-target-cnt 3 --remote-proxy-cnt 3 >/dev/null if ! 
[ -x "$(command -v ais)" ]; then echo "Error: ais (CLI) not installed" >&2 @@ -59,12 +60,15 @@ fi if [[ ${smap} == "true" ]]; then + ais config cluster rebalance.enabled false tsi=$(ais show cluster -H target | awk '{print $1}') for i in {1..10}; do ais cluster add-remove-nodes start-maintenance $tsi --yes >/dev/null ais cluster add-remove-nodes stop-maintenance $tsi --yes >/dev/null done echo "victim smap:" + ais config cluster rebalance.enabled true + sleep 1 ais show cluster smap --json | tail -4 fi @@ -77,7 +81,10 @@ unset -v AIS_ENDPOINT echo "AIS_ENDPOINT=$AIS_ENDPOINT" aisloader -bucket=ais://nnn -cleanup=false -numworkers=8 -quiet -pctput=100 -minsize=4K -maxsize=4K --duration 20s -find /tmp/ais_next -type f | grep "mp[1-4].*/1/" | wc -l +echo "one remote-target that joined from a different cluster now has that many objects:" +find /tmp/ais_next -type f | grep "mp[1-4].*/3/" | wc -l +echo "another remote-target:" +find /tmp/ais_next -type f | grep "mp[1-4].*/4/" | wc -l if [[ ${config} == "true" ]]; then echo "resulting config:" diff --git a/ais/tgtbck.go b/ais/tgtbck.go index 99b3af0765..95e28187fd 100644 --- a/ais/tgtbck.go +++ b/ais/tgtbck.go @@ -39,7 +39,7 @@ func (t *target) httpbckget(w http.ResponseWriter, r *http.Request, dpq *dpq) { if err != nil { return } - if err = t.isIntraCall(r.Header, false); err != nil { + if err = t.checkIntraCall(r.Header, false); err != nil { t.writeErr(w, r, err) return } @@ -301,7 +301,7 @@ func (t *target) bsumm(w http.ResponseWriter, r *http.Request, phase string, bck if phase == apc.ActBegin { rns := xreg.RenewBckSummary(bck, msg) if rns.Err != nil { - t.writeErr(w, r, rns.Err, http.StatusInternalServerError) + t.writeErr(w, r, rns.Err, http.StatusInternalServerError, Silent) return } w.WriteHeader(http.StatusAccepted) diff --git a/ais/tgtcp.go b/ais/tgtcp.go index 87cdd9ca28..928475d8e6 100644 --- a/ais/tgtcp.go +++ b/ais/tgtcp.go @@ -880,6 +880,11 @@ func (t *target) _postBMD(newBMD *bucketMD, tag string, rmbcks 
[]*meta.Bck) { // is called under lock func (t *target) receiveRMD(newRMD *rebMD, msg *aisMsg) (err error) { + if msg.Action == apc.ActPrimaryForce { + err = t.owner.rmd.synch(newRMD, true) + return err + } + rmd := t.owner.rmd.get() if newRMD.Version <= rmd.Version { if newRMD.Version < rmd.Version { @@ -1134,9 +1139,6 @@ func (t *target) metasyncPut(w http.ResponseWriter, r *http.Request) { errBMD = t.receiveBMD(newBMD, msgBMD, payload, bmdRecv, caller, false /*silent*/) } if errRMD == nil && newRMD != nil { - rmd := t.owner.rmd.get() - logmsync(rmd.Version, newRMD, msgRMD, caller, newRMD.String(), rmd.CluID) - t.owner.rmd.Lock() errRMD = t.receiveRMD(newRMD, msgRMD) t.owner.rmd.Unlock()