Skip to content

Commit

Permalink
set primary with force (major)
Browse files Browse the repository at this point in the history
* part two, prev. commit: acae3f0

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 1, 2024
1 parent 8534c0a commit f3bf414
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 110 deletions.
9 changes: 9 additions & 0 deletions ais/htcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,13 @@ type (
errNodeNotFound struct {
msg string
id string
si *meta.Snode // self
smap *smapX
}
errSelfNotFound struct {
act string
si *meta.Snode
tag string
smap *smapX
}
errNotEnoughTargets struct {
Expand Down Expand Up @@ -237,6 +243,9 @@ func (e *errSmapUUIDDiffer) Error() string { return e.detail }
func (e *errNodeNotFound) Error() string {
return fmt.Sprintf("%s: %s node %s not present in the %s", e.si, e.msg, e.id, e.smap)
}
func (e *errSelfNotFound) Error() string {
return fmt.Sprintf("%s: %s failure: not finding self in the %s %s", e.si, e.act, e.tag, e.smap.StringEx())
}

/////////////////////
// errNoUnregister //
Expand Down
37 changes: 26 additions & 11 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,7 +1562,7 @@ func (h *htrun) extractConfig(payload msPayload, caller string) (newConfig *glob
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
logmsync(config.Version, newConfig, msg, caller)
}
if newConfig.version() <= config.Version {
if newConfig.version() <= config.Version && msg.Action != apc.ActPrimaryForce {
if newConfig.version() < config.Version {
err = newErrDowngrade(h.si, config.String(), newConfig.String())
}
Expand Down Expand Up @@ -1592,7 +1592,7 @@ func (h *htrun) extractEtlMD(payload msPayload, caller string) (newMD *etlMD, ms
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
logmsync(etlMD.Version, newMD, msg, caller)
}
if newMD.version() <= etlMD.version() {
if newMD.version() <= etlMD.version() && msg.Action != apc.ActPrimaryForce {
if newMD.version() < etlMD.version() {
err = newErrDowngrade(h.si, etlMD.String(), newMD.String())
}
Expand All @@ -1602,13 +1602,14 @@ func (h *htrun) extractEtlMD(payload msPayload, caller string) (newMD *etlMD, ms
}

func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation bool) (newSmap *smapX, msg *aisMsg, err error) {
const act = "extract-smap"
if _, ok := payload[revsSmapTag]; !ok {
return
}
newSmap, msg = &smapX{}, &aisMsg{}
smapValue := payload[revsSmapTag]
reader := bytes.NewBuffer(smapValue)
if _, err1 := jsp.Decode(io.NopCloser(reader), newSmap, newSmap.JspOpts(), "extractSmap"); err1 != nil {
if _, err1 := jsp.Decode(io.NopCloser(reader), newSmap, newSmap.JspOpts(), act); err1 != nil {
err = fmt.Errorf(cmn.FmtErrUnmarshal, h, "new Smap", cos.BHead(smapValue), err1)
return
}
Expand All @@ -1635,8 +1636,22 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo
err = cmn.NewErrFailedTo(h, "extract", newSmap, newSmap.validate())
return
}

if msg.Action == apc.ActPrimaryForce {
var origSmap smapX
if err := cos.MorphMarshal(msg.Value, &origSmap); err != nil {
debug.AssertNoErr(err) // unlikely
}
if !origSmap.isPresent(h.si) {
err = &errSelfNotFound{act: act + " (with force)", si: h.si, tag: "orig", smap: &origSmap}
return
}
logmsync(smap.Version, newSmap, msg, caller)
return
}

if !newSmap.isPresent(h.si) {
err = fmt.Errorf("%s: not finding ourselves in %s", h, newSmap)
err = &errSelfNotFound{act: act, si: h.si, tag: "new", smap: newSmap}
return
}
if err = smap.validateUUID(h.si, newSmap, caller, 50 /* ciError */); err != nil {
Expand Down Expand Up @@ -1685,7 +1700,7 @@ func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
logmsync(rmd.Version, newRMD, msg, caller)
}
if newRMD.version() <= rmd.version() {
if newRMD.version() <= rmd.version() && msg.Action != apc.ActPrimaryForce {
if newRMD.version() < rmd.version() {
err = newErrDowngrade(h.si, rmd.String(), newRMD.String())
}
Expand Down Expand Up @@ -1719,7 +1734,7 @@ func (h *htrun) extractBMD(payload msPayload, caller string) (newBMD *bucketMD,
if h.si.IsTarget() && msg.UUID != "" {
return
}
if newBMD.version() <= bmd.version() {
if newBMD.version() <= bmd.version() && msg.Action != apc.ActPrimaryForce {
if newBMD.version() < bmd.version() {
err = newErrDowngrade(h.si, bmd.StringEx(), newBMD.StringEx())
}
Expand All @@ -1735,8 +1750,8 @@ func (h *htrun) receiveSmap(newSmap *smapX, msg *aisMsg, payload msPayload, call
smap := h.owner.smap.get()
logmsync(smap.Version, newSmap, msg, caller, newSmap.StringEx())

if !newSmap.isPresent(h.si) {
return fmt.Errorf("%s: not finding self in the new %s", h, newSmap)
if !newSmap.isPresent(h.si) && msg.Action != apc.ActPrimaryForce {
return &errSelfNotFound{act: "receive-smap", si: h.si, tag: "new", smap: newSmap}
}
return h.owner.smap.synchronize(h.si, newSmap, payload, cb)
}
Expand All @@ -1750,7 +1765,7 @@ func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *aisMsg, payload msPayload, ca

h.owner.etl.Lock()
etlMD = h.owner.etl.get()
if newEtlMD.version() <= etlMD.version() {
if newEtlMD.version() <= etlMD.version() && msg.Action != apc.ActPrimaryForce {
h.owner.etl.Unlock()
if newEtlMD.version() < etlMD.version() {
err = newErrDowngrade(h.si, etlMD.String(), newEtlMD.String())
Expand All @@ -1768,9 +1783,9 @@ func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *aisMsg, payload msPayload, ca
}

// under lock
func (h *htrun) _recvCfg(newConfig *globalConfig, payload msPayload) (err error) {
func (h *htrun) _recvCfg(newConfig *globalConfig, msg *aisMsg, payload msPayload) (err error) {
config := cmn.GCO.Get()
if newConfig.version() <= config.Version {
if newConfig.version() <= config.Version && msg.Action != apc.ActPrimaryForce {
if newConfig.version() == config.Version {
return
}
Expand Down
2 changes: 1 addition & 1 deletion ais/metasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) {
msg, tag = pair.msg, pair.revs.tag()
revs = pair.revs
)
if reqT == reqNotify {
if reqT == reqNotify || msg.Action == apc.ActPrimaryForce {
revsBody = revs.marshal()
} else {
revs = y.jit(pair)
Expand Down
69 changes: 15 additions & 54 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2899,69 +2899,23 @@ func (p *proxy) smapFromURL(baseURL string) (smap *smapX, err error) {
return
}

// forceful primary change - is used when the original primary network is down
// for a while and the remained nodes selected a new primary. After the
// original primary is back it does not attach automatically to the new primary
// and the cluster gets into split-brain mode. This request makes original
// primary connect to the new primary
func (p *proxy) forcefulJoin(w http.ResponseWriter, r *http.Request, proxyID string) {
newPrimaryURL := r.URL.Query().Get(apc.QparamPrimaryCandidate)
nlog.Infof("%s: force new primary %s (URL: %s)", p, proxyID, newPrimaryURL)

if p.SID() == proxyID {
nlog.Warningf("%s is already primary", p)
return
}
smap := p.owner.smap.get()
psi := smap.GetProxy(proxyID)
if psi == nil && newPrimaryURL == "" {
err := &errNodeNotFound{"failed to find new primary", proxyID, p.si, smap}
p.writeErr(w, r, err, http.StatusNotFound)
return
}
if newPrimaryURL == "" {
newPrimaryURL = psi.ControlNet.URL
}
if newPrimaryURL == "" {
err := &errNodeNotFound{"failed to get new primary's direct URL", proxyID, p.si, smap}
p.writeErr(w, r, err)
return
}
newSmap, err := p.smapFromURL(newPrimaryURL)
if err != nil {
p.writeErr(w, r, err)
return
}
primary := newSmap.Primary
if proxyID != primary.ID() {
p.writeErrf(w, r, "%s: proxy %s is not the primary, current %s", p.si, proxyID, newSmap.pp())
return
}

p.metasyncer.becomeNonPrimary() // metasync to stop syncing and cancel all pending requests
p.owner.smap.put(newSmap)
res := p.regTo(primary.ControlNet.URL, primary, apc.DefaultTimeout, nil, nil, false /*keepalive*/)
if res.err != nil {
p.writeErr(w, r, res.toErr())
}
}

func (p *proxy) daeSetPrimary(w http.ResponseWriter, r *http.Request) {
apiItems, err := p.parseURL(w, r, apc.URLPathDae.L, 2, false)
if err != nil {
return
}
proxyID := apiItems[1]
query := r.URL.Query()
force := cos.IsParseBool(query.Get(apc.QparamForce))

var (
proxyID = apiItems[1]
query = r.URL.Query()
force = cos.IsParseBool(query.Get(apc.QparamForce))
)
// force primary change
if force && apiItems[0] == apc.Proxy {
if smap := p.owner.smap.get(); !smap.isPrimary(p.si) {
p.writeErr(w, r, newErrNotPrimary(p.si, smap))
return
}
p.forcefulJoin(w, r, proxyID)
p.forceJoin(w, r, proxyID, query) // TODO -- FIXME: test
return
}
prepare, err := cos.ParseBool(query.Get(apc.QparamPrepare))
Expand All @@ -2970,7 +2924,7 @@ func (p *proxy) daeSetPrimary(w http.ResponseWriter, r *http.Request) {
return
}
if p.owner.smap.get().isPrimary(p.si) {
p.writeErrf(w, r, "%s: am PRIMARY, expecting '/v1/cluster/...' when designating a new one", p)
p.writeErrf(w, r, "%s (self) is primary, expecting '/v1/cluster/...' when designating a new one", p)
return
}
if prepare {
Expand Down Expand Up @@ -3255,7 +3209,7 @@ func (p *proxy) receiveConfig(newConfig *globalConfig, msg *aisMsg, payload msPa
logmsync(oldConfig.Version, newConfig, msg, caller)

p.owner.config.Lock()
err = p._recvCfg(newConfig, payload)
err = p._recvCfg(newConfig, msg, payload)
p.owner.config.Unlock()
if err != nil {
return
Expand Down Expand Up @@ -3399,6 +3353,11 @@ func (p *proxy) receiveBMD(newBMD *bucketMD, msg *aisMsg, payload msPayload, cal

p.owner.bmd.Lock()
bmd = p.owner.bmd.get()

if msg.Action == apc.ActPrimaryForce {
goto skip
}

if err = bmd.validateUUID(newBMD, p.si, nil, caller); err != nil {
cos.Assert(!p.owner.smap.get().isPrimary(p.si))
// cluster integrity error: making exception for non-primary proxies
Expand All @@ -3407,6 +3366,8 @@ func (p *proxy) receiveBMD(newBMD *bucketMD, msg *aisMsg, payload msPayload, cal
p.owner.bmd.Unlock()
return newErrDowngrade(p.si, bmd.String(), newBMD.String())
}

skip:
err = p.owner.bmd.putPersist(newBMD, payload)
debug.AssertNoErr(err)
p.owner.bmd.Unlock()
Expand Down
Loading

0 comments on commit f3bf414

Please sign in to comment.