Skip to content

Commit

Permalink
set primary with force (major)
Browse files Browse the repository at this point in the history
* revise as 2PC (prepare=true/false) sequence
  - TODO: make it begin-(abort|commit)
* consolidate all related logic in ais/psetforce
* add put(BMD) interface
* tests: join (3+3) cluster => (6+6) cluster
* with substantial refactoring
* part seven, prev. commit: b72a4ec

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 7, 2024
1 parent f61b440 commit 199af07
Show file tree
Hide file tree
Showing 13 changed files with 793 additions and 580 deletions.
3 changes: 2 additions & 1 deletion ais/bucketmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
// - bucketMD versioning is monotonic and incremental
//
// - bucketMD typical update transaction:
// lock -- clone() -- modify the clone -- bmdOwner.put(clone) -- unlock
// lock -- clone() -- modify the clone -- bmdOwner.put(clone) -- unlock
//
// (*) for merges and conflict resolution, check the current version prior to put()
// (note that version check must be protected by the same critical section)
Expand All @@ -59,6 +59,7 @@ type (

init() bool // true when loaded previous version
get() (bmd *bucketMD)
put(bmd *bucketMD)
putPersist(bmd *bucketMD, payload msPayload) error
persist(clone *bucketMD, payload msPayload) error
modify(*bmdModifier) (*bucketMD, error)
Expand Down
1 change: 1 addition & 0 deletions ais/htcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ var (
errForwarded = errors.New("forwarded")
errSendingResp = errors.New("err-sending-resp")
errFastKalive = errors.New("cannot fast-keepalive")
errIntraControl = errors.New("expected intra-control request")
)

// BMD uuid errs
Expand Down
94 changes: 38 additions & 56 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,9 @@ func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [,
var (
what string
caller = opts[0]
uuid = revs.uuid()
lv = "v" + strconv.FormatInt(lver, 10)
luuid string
)
switch len(opts) {
case 1:
Expand All @@ -1553,27 +1555,31 @@ func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [,
case 2:
what = opts[1]
if strings.IndexByte(what, '[') < 0 {
if uuid := revs.uuid(); uuid != "" {
if uuid != "" {
what += "[" + uuid + "]"
}
}
case 3:
what = opts[1]
luuid := opts[2]
lv += "[" + luuid + "]"

if uuid := revs.uuid(); luuid != "" && uuid != luuid {
// versions incomparable (see apc.ActPrimaryForce)
nlog.InfoDepth(1, "Warning", tag, what, "( different cluster", lv, msg.String(), "<--", caller, ")")
return
}
}
// different uuids (clusters) - versions cannot be compared
if luuid != "" && uuid != "" && uuid != luuid {
nlog.InfoDepth(1, "Warning", tag, what, "( different cluster", lv, msg.String(), "<--", caller, msg.String(), ")")
return
}

// compare l(ocal) and newly received
switch {
case lver == revs.version():
nlog.InfoDepth(1, tag, what, "( same", lv, msg.String(), "<--", caller, ")")
s := "( same"
if lver == 0 {
s = "( initial"
}
nlog.InfoDepth(1, tag, what, s, lv, msg.String(), "<--", caller, ")")
case lver > revs.version():
nlog.InfoDepth(1, "Warning", tag, what, "( down from", lv, msg.String(), "<--", caller, ")")
nlog.InfoDepth(1, "Warning", tag, what, "( down from", lv, msg.String(), "<--", caller, msg.String(), ")")
default:
nlog.InfoDepth(1, tag, "new", what, "( have", lv, msg.String(), "<--", caller, ")")
}
Expand Down Expand Up @@ -1657,7 +1663,7 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo
return
}
}
if skipValidation {
if skipValidation || (msg.Action == apc.ActPrimaryForce && newSmap.isValid()) {
return
}

Expand Down Expand Up @@ -1723,15 +1729,17 @@ func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg
}

rmd := h.owner.rmd.get()
logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID)

if msg.Action == apc.ActPrimaryForce {
return
}

if newRMD.CluID != "" && newRMD.CluID != rmd.CluID && rmd.CluID != "" {
logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID)
err = h.owner.rmd.newClusterIntegrityErr(h.String(), newRMD.CluID, rmd.CluID, rmd.Version)
cos.ExitLog(err) // FATAL
}

if cmn.Rom.FastV(4, cos.SmoduleAIS) {
logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID)
}
if newRMD.version() <= rmd.version() && msg.Action != apc.ActPrimaryForce {
if newRMD.version() < rmd.version() {
err = newErrDowngrade(h.si, rmd.String(), newRMD.String())
Expand Down Expand Up @@ -1782,7 +1790,7 @@ func (h *htrun) receiveSmap(newSmap *smapX, msg *aisMsg, payload msPayload, call
smap := h.owner.smap.get()
logmsync(smap.Version, newSmap, msg, caller, newSmap.StringEx(), smap.UUID)

if !newSmap.isPresent(h.si) && msg.Action != apc.ActPrimaryForce {
if !newSmap.isPresent(h.si) {
return &errSelfNotFound{act: "receive-smap", si: h.si, tag: "new", smap: newSmap}
}
return h.owner.smap.synchronize(h.si, newSmap, payload, cb)
Expand Down Expand Up @@ -1989,13 +1997,14 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, htext hte
path string
skipPrxKalive = h.si.IsProxy() || keepalive
opts = cmetaFillOpt{
htext: htext,
skipSmap: skipPrxKalive, // when targets self- or admin-join
skipBMD: skipPrxKalive, // ditto
skipRMD: true, // NOTE: not used yet
skipConfig: true, // ditto
skipEtlMD: true, // ditto
fillRebMarker: !keepalive,
htext: htext,
skipSmap: skipPrxKalive, // when targets self- or admin-join
skipBMD: skipPrxKalive, // ditto
skipRMD: true, // NOTE: not used yet
skipConfig: true, // ditto
skipEtlMD: true, // ditto

fillRebMarker: !keepalive && htext != nil, // TODO -- FIXME
skipPrimeTime: true,
}
)
Expand Down Expand Up @@ -2228,48 +2237,19 @@ func (h *htrun) externalWD(w http.ResponseWriter, r *http.Request) (responded bo
// intra-cluster health ping
// - pub addr permitted (see reqHealth)
// - compare w/ h.ensureIntraControl
err := h.isIntraCall(r.Header, false /* from primary */)
err := h.checkIntraCall(r.Header, false /* from primary */)
if err != nil {
h.writeErr(w, r, err)
responded = true
}
return
}

// prepForceJoin handles the "prepare" step of a forced join
// (primary forceJoin() calling).
//
// In order, it:
//   - requires the `prepare=true` URL query;
//   - reads the control message and decodes its value as the new cluster map;
//   - health-checks the primary designated by that map.
//
// Every failure is reported through the HTTP response; nothing is written
// on success.
func (h *htrun) prepForceJoin(w http.ResponseWriter, r *http.Request) {
	const tag = "prep-force-join"
	q := r.URL.Query()
	if !cos.IsParseBool(q.Get(apc.QparamPrepare)) {
		err := errors.New(tag + ": expecting '" + apc.QparamPrepare + "=true' query")
		h.writeErr(w, r, err)
		return
	}
	msg, err := h.readAisMsg(w, r)
	if err != nil {
		// NOTE(review): presumably readAisMsg has already written the error - confirm
		return
	}
	newSmap := &smapX{}
	if err := cos.MorphMarshal(msg.Value, &newSmap); err != nil {
		h.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, h.si, msg.Action, msg.Value, err)
		return
	}
	// The map is decoded through a pointer-to-pointer, and the payload may omit
	// the primary; without this guard a nil node would be handed to reqHealth()
	// and to StringEx() below.
	if newSmap == nil || newSmap.Primary == nil {
		h.writeErr(w, r, errors.New(tag+": received cluster map does not designate a primary"))
		return
	}
	var (
		npsi = newSmap.Primary
		smap = h.owner.smap.get()
		tout = cmn.Rom.CplaneOperation()
	)
	if _, code, err := h.reqHealth(npsi, tout, nil, smap, true /*retry pub-addr*/); err != nil {
		err = fmt.Errorf("%s: failed to reach %s [%v(%d)]", tag, npsi.StringEx(), err, code)
		h.writeErr(w, r, err)
	}
}

//
// intra-cluster request validations and helpers
//

func (h *htrun) isIntraCall(hdr http.Header, fromPrimary bool) (err error) {
func (h *htrun) checkIntraCall(hdr http.Header, fromPrimary bool) (err error) {
debug.Assert(hdr != nil)
var (
smap = h.owner.smap.get()
Expand All @@ -2280,7 +2260,7 @@ func (h *htrun) isIntraCall(hdr http.Header, fromPrimary bool) (err error) {
erP error
)
if ok := callerID != "" && callerName != ""; !ok {
return fmt.Errorf("%s: expected %s request", h, cmn.NetIntraControl)
return errIntraControl
}
if !smap.isValid() {
return
Expand All @@ -2299,7 +2279,9 @@ func (h *htrun) isIntraCall(hdr http.Header, fromPrimary bool) (err error) {
// we still trust the request when the sender's Smap is more current
if callerVer > smap.version() {
if h.ClusterStarted() {
nlog.Errorf("%s: %s < Smap(v%s) from %s - proceeding anyway...", h, smap, callerSver, callerName)
// (exception: setting primary w/ force)
warn := h.String() + ": local " + smap.String() + " is older than (caller's) " + callerName + " Smap v" + callerSver
nlog.ErrorDepth(1, warn, "- proceeding anyway...")
}
runtime.Gosched()
return
Expand All @@ -2316,7 +2298,7 @@ func (h *htrun) isIntraCall(hdr http.Header, fromPrimary bool) (err error) {
}

func (h *htrun) ensureIntraControl(w http.ResponseWriter, r *http.Request, onlyPrimary bool) (isIntra bool) {
err := h.isIntraCall(r.Header, onlyPrimary)
err := h.checkIntraCall(r.Header, onlyPrimary)
if err != nil {
h.writeErr(w, r, err)
return
Expand Down
3 changes: 2 additions & 1 deletion ais/metasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,8 @@ func (y *metasyncer) _pending() (pending meta.NodeMap, smap *smapX) {
if !ok || v < revs.version() {
inSync = false
break
} else if v > revs.version() {
}
if v > revs.version() {
// skip older versions (TODO: don't skip sending associated aisMsg)
nlog.Errorf("v: %d; revs.version: %d", v, revs.version())
}
Expand Down
134 changes: 5 additions & 129 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2901,135 +2901,6 @@ func (p *proxy) smapFromURL(baseURL string) (smap *smapX, err error) {
return
}

// daeSetPrimary handles a daemon-level request to designate a new primary proxy.
//
// Flow:
//   - `force=true` on the proxy path: only the current primary may proceed;
//     the request is handed over to forceJoin();
//   - otherwise the `prepare` query must parse as a boolean, and this proxy
//     must not be the current primary;
//   - when preparing, cluster-level metadata is read from the request body
//     and received via recvCluMeta();
//   - if the designated ID is this proxy: become the new primary (commit
//     phase only), provided this node is active in the cluster map;
//   - if the designated ID is another proxy: record it as primary in the
//     local cluster map (commit phase only).
func (p *proxy) daeSetPrimary(w http.ResponseWriter, r *http.Request) {
	items, err := p.parseURL(w, r, apc.URLPathDae.L, 2, false)
	if err != nil {
		return
	}
	q := r.URL.Query()
	npid := items[1]

	// force primary change
	if cos.IsParseBool(q.Get(apc.QparamForce)) && items[0] == apc.Proxy {
		smap := p.owner.smap.get()
		if !smap.isPrimary(p.si) {
			p.writeErr(w, r, newErrNotPrimary(p.si, smap))
			return
		}
		p.forceJoin(w, r, npid, q) // TODO -- FIXME: test
		return
	}

	prepare, err := cos.ParseBool(q.Get(apc.QparamPrepare))
	if err != nil {
		p.writeErrf(w, r, "failed to parse URL query %q: %v", apc.QparamPrepare, err)
		return
	}
	if p.owner.smap.get().isPrimary(p.si) {
		p.writeErrf(w, r, "%s (self) is primary, expecting '/v1/cluster/...' when designating a new one", p)
		return
	}
	if prepare {
		var cm cluMeta
		if err := cmn.ReadJSON(w, r, &cm); err != nil {
			return
		}
		if err := p.recvCluMeta(&cm, "set-primary", cm.SI.String()); err != nil {
			p.writeErrf(w, r, "%s: failed to receive clu-meta: %v", p, err)
			return
		}
	}

	smap := p.owner.smap.get()

	// self
	if p.SID() == npid {
		switch {
		case smap.GetActiveNode(npid) == nil:
			p.writeErrf(w, r, "%s: in maintenance or decommissioned", p)
		case !prepare:
			p.becomeNewPrimary("")
		}
		return
	}

	// other
	npsi := smap.GetProxy(npid)
	if npsi == nil {
		p.writeErr(w, r, &errNodeNotFound{"cannot set new primary", npid, p.si, smap})
		return
	}
	if prepare {
		if cmn.Rom.FastV(4, cos.SmoduleAIS) {
			nlog.Infoln("Preparation step: do nothing")
		}
		return
	}
	setPrimary := func(_ *smapModifier, clone *smapX) error {
		clone.Primary = npsi
		return nil
	}
	err = p.owner.smap.modify(&smapModifier{pre: setPrimary})
	debug.AssertNoErr(err)
}

// becomeNewPrimary modifies the cluster map with _becomePre as the `pre`
// callback and _becomeFinal as the `final` one. proxyIDToRemove is passed
// along as the modifier's `sid`; _becomePre ignores it when empty.
// An error from the modification trips the assertion.
func (p *proxy) becomeNewPrimary(proxyIDToRemove string) {
	mod := &smapModifier{sid: proxyIDToRemove}
	mod.pre, mod.final = p._becomePre, p._becomeFinal
	cos.AssertNoErr(p.owner.smap.modify(mod))
}

// _becomePre is the `pre` callback of becomeNewPrimary; it edits the cloned
// cluster map as follows:
//   - asserts that this proxy is present in the clone;
//   - when ctx.sid is non-empty and names a node in the clone, deletes that
//     proxy from the clone and drops its reverse-proxy entry;
//   - makes this proxy the primary, bumps the version by 100, and calls
//     staffIC().
//
// Always returns nil.
func (p *proxy) _becomePre(ctx *smapModifier, clone *smapX) error {
	if !clone.isPresent(p.si) {
		cos.Assertf(false, "%s must always be present in the %s", p.si, clone.pp())
	}
	if sid := ctx.sid; sid != "" {
		if clone.GetNode(sid) != nil {
			// decision is made: going ahead to remove
			nlog.Infof("%s: removing failed primary %s", p, sid)
			clone.delProxy(sid)

			// Remove reverse proxy entry for the node.
			p.rproxy.nodes.Delete(sid)
		}
	}

	self := clone.GetProxy(p.SID())
	clone.Primary = self
	clone.Version += 100
	clone.staffIC()
	return nil
}

// _becomeFinal is the `final` callback of becomeNewPrimary. It metasyncs the
// updated cluster map together with the current BMD and RMD - plus the global
// config (when available) and the ETL metadata (when its version is positive) -
// all under the same apc.ActNewPrimary message, and then synchronizes IC tables.
func (p *proxy) _becomeFinal(ctx *smapModifier, clone *smapX) {
	bmd := p.owner.bmd.get()
	rmd := p.owner.rmd.get()
	msg := p.newAmsgStr(apc.ActNewPrimary, bmd)

	revsPairs := make([]revsPair, 0, 5)
	revsPairs = append(revsPairs, revsPair{clone, msg}, revsPair{bmd, msg}, revsPair{rmd, msg})
	nlog.Infof("%s: distributing (%s, %s, %s) with newly elected primary (self)", p, clone, bmd, rmd)

	// a config error is logged, not fatal
	config, err := p.ensureConfigURLs()
	if err != nil {
		nlog.Errorln(err)
	}
	if config != nil {
		revsPairs = append(revsPairs, revsPair{config, msg})
		nlog.Infof("%s: plus %s", p, config)
	}
	if etl := p.owner.etl.get(); etl != nil && etl.version() > 0 {
		revsPairs = append(revsPairs, revsPair{etl, msg})
		nlog.Infof("%s: plus %s", p, etl)
	}

	// metasync
	debug.Assert(clone._sgl != nil)
	_ = p.metasyncer.sync(revsPairs...)

	// synchronize IC tables
	p.syncNewICOwners(ctx.smap, clone)
}

func (p *proxy) ensureConfigURLs() (config *globalConfig, err error) {
config, err = p.owner.config.modify(&configModifier{pre: p._configURLs})
if err != nil {
Expand Down Expand Up @@ -3303,6 +3174,11 @@ func (p *proxy) receiveRMD(newRMD *rebMD, msg *aisMsg, caller string) (err error
rmd := p.owner.rmd.get()
logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID)

if msg.Action == apc.ActPrimaryForce {
err = p.owner.rmd.synch(newRMD, false)
return err
}

p.owner.rmd.Lock()
rmd = p.owner.rmd.get()
if newRMD.version() <= rmd.version() {
Expand Down
2 changes: 1 addition & 1 deletion ais/prxauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (p *proxy) access(hdr http.Header, bck *meta.Bck, ace apc.AccessAttrs) (err
tk *tok.Token
bucket *cmn.Bck
)
if p.isIntraCall(hdr, false /*from primary*/) == nil {
if p.checkIntraCall(hdr, false /*from primary*/) == nil {
return nil
}
if cmn.Rom.AuthEnabled() { // config.Auth.Enabled
Expand Down
Loading

0 comments on commit 199af07

Please sign in to comment.