Skip to content

Commit

Permalink
set primary with force
Browse files Browse the repository at this point in the history
* add 'bump-sync-all'
* critical section around setting new primary (force or no-force)
* msync-Rx: check that cluster configs have the same UUID
  (cluster map and BMD were already being checked)
* fix 'logmsync' to show UUIDs (and not to compare incomparable versions)
* with refactoring
* part six, prev. commit: aeed9cb

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 5, 2024
1 parent aeed9cb commit b72a4ec
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 90 deletions.
1 change: 1 addition & 0 deletions ais/bucketmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (m *bucketMD) validateUUID(nbmd *bucketMD, si, nsi *meta.Snode, caller stri
// as revs

// tag returns the revs tag constant for the BMD (revsBMDTag).
func (*bucketMD) tag() string {
	return revsBMDTag
}

// version returns the Version field of this BMD.
func (m *bucketMD) version() int64 {
	return m.Version
}

// uuid returns the UUID field of this BMD.
func (m *bucketMD) uuid() string {
	return m.UUID
}

// jit returns the current (just-in-time) BMD as held by the proxy's BMD owner.
func (*bucketMD) jit(p *proxy) revs {
	return p.owner.bmd.get()
}

func (m *bucketMD) sgl() *memsys.SGL {
Expand Down
1 change: 1 addition & 0 deletions ais/clustermap.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ var (
// as revs

// tag returns the revs tag constant for the cluster map (revsSmapTag).
func (*smapX) tag() string {
	return revsSmapTag
}

// version returns the Version field of this cluster map.
func (m *smapX) version() int64 {
	return m.Version
}

// uuid returns the UUID field of this cluster map.
func (m *smapX) uuid() string {
	return m.UUID
}

// jit returns the current (just-in-time) cluster map as held by the proxy's smap owner.
func (*smapX) jit(p *proxy) revs {
	return p.owner.smap.get()
}

func (m *smapX) sgl() *memsys.SGL {
Expand Down
1 change: 1 addition & 0 deletions ais/etlmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func newEtlMD() (e *etlMD) {
// as revs

// tag returns the revs tag constant for the ETL metadata (revsEtlMDTag).
func (*etlMD) tag() string {
	return revsEtlMDTag
}

// version returns the Version field of this ETL metadata.
func (e *etlMD) version() int64 {
	return e.Version
}

// uuid always returns an empty string.
// TODO: add
func (*etlMD) uuid() string {
	return ""
}

// jit returns the current (just-in-time) ETL metadata as held by the proxy's etl owner.
func (*etlMD) jit(p *proxy) revs {
	return p.owner.etl.get()
}

// sgl always returns nil for ETL metadata.
func (*etlMD) sgl() *memsys.SGL {
	return nil
}

Expand Down
1 change: 1 addition & 0 deletions ais/gconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var _ revs = (*globalConfig)(nil)
// as revs

// tag returns the revs tag constant for the global config (revsConfTag).
func (*globalConfig) tag() string {
	return revsConfTag
}

// version returns the Version field of this global config.
func (config *globalConfig) version() int64 {
	return config.Version
}

// uuid returns the UUID field of this global config.
func (config *globalConfig) uuid() string {
	return config.UUID
}

// jit returns the current (just-in-time) global config as held by the proxy's config owner.
func (*globalConfig) jit(p *proxy) revs {
	// The second value returned by get() is discarded here.
	// NOTE(review): confirm that ignoring it is intentional.
	curr, _ := p.owner.config.get()
	return curr
}

func (config *globalConfig) sgl() *memsys.SGL {
Expand Down
104 changes: 70 additions & 34 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,25 +1519,63 @@ func (h *htrun) _bch(c *getMaxCii, smap *smapX, nodeTy string) {
// metasync Rx
//

func logmsync(ver int64, revs revs, msg *aisMsg, opts ...string) {
// warnMsync inspects the caller ID header of an incoming metasync request and
// logs a warning when the sender is either absent from the given cluster map
// or is present but is not that map's primary. It only logs: nothing is
// returned and the request is not rejected here. An invalid cluster map
// short-circuits the check.
//
// TODO: reinforce - make it another `ensure*` method
func (h *htrun) warnMsync(r *http.Request, smap *smapX) {
	const tag = "metasync-recv"

	if !smap.isValid() {
		return
	}

	var (
		pid = r.Header.Get(apc.HdrCallerID)
		psi = smap.GetNode(pid)
	)
	switch {
	case psi == nil:
		// sender is not in our cluster map
		nlog.Warningln(&errNodeNotFound{msg: tag + " warning:", id: pid, si: h.si, smap: smap})
	case !smap.isPrimary(psi):
		// sender is known but is not the primary according to our cluster map
		nlog.Warningln(h.String(), tag, "expecting primary, got", psi.StringEx(), smap.StringEx())
	}
}

// logmsync logs a received metasync revision (`revs`) relative to the locally
// held version `lver`, using nlog.InfoDepth(1, ...).
//
// opts is positional - caller [, what, luuid]:
//   - opts[0] (caller) is read unconditionally;
//   - opts[1] (what), when given, replaces revs.String() as the description;
//   - opts[2] (luuid), when given, is appended to the logged local version; callers
//     in this file pass the UUID (or CluID) of their currently held copy.
//
// With three opts, a non-empty luuid that differs from revs.uuid() produces a
// "different cluster" warning and the versions are not compared at all.
//
// NOTE(review): an empty opts would panic on opts[0] - confirm every caller passes caller.
// NOTE(review): in this diff view the pre-change and post-change lines of the
// function appear interleaved (both `lv` initializers, both sets of `case` arms).
func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [, what, luuid]
const tag = "msync Rx:"
var (
what string
caller = opts[0]
lv = strconv.FormatInt(ver, 10)
lv = "v" + strconv.FormatInt(lver, 10)
)
if len(opts) == 1 {
switch len(opts) {
case 1:
// caller only: describe the revs via String() and append its UUID, if any
what = revs.String()
} else {
if uuid := revs.uuid(); uuid != "" {
what += "[" + uuid + "]"
}
case 2:
// caller + what: append the UUID only when `what` contains no '['
// (presumably because it already carries a bracketed UUID - verify against callers)
what = opts[1]
if strings.IndexByte(what, '[') < 0 {
if uuid := revs.uuid(); uuid != "" {
what += "[" + uuid + "]"
}
}
case 3:
// caller + what + luuid: the local version is shown together with the local UUID
what = opts[1]
luuid := opts[2]
lv += "[" + luuid + "]"

if uuid := revs.uuid(); luuid != "" && uuid != luuid {
// versions incomparable (see apc.ActPrimaryForce)
nlog.InfoDepth(1, "Warning", tag, what, "( different cluster", lv, msg.String(), "<--", caller, ")")
return
}
}

// compare the local version against the received one: same, downgrade (warning), or new
switch {
case ver == revs.version():
nlog.InfoDepth(1, tag, what, "(same v"+lv+",", msg.String(), "<--", caller+")")
case ver > revs.version():
nlog.InfoDepth(1, "Warning", tag, what, "(down from v"+lv+",", msg.String(), "<--", caller+")")
case lver == revs.version():
nlog.InfoDepth(1, tag, what, "( same", lv, msg.String(), "<--", caller, ")")
case lver > revs.version():
nlog.InfoDepth(1, "Warning", tag, what, "( down from", lv, msg.String(), "<--", caller, ")")
default:
nlog.InfoDepth(1, tag, "new", what, "(have v"+lv+",", msg.String(), "<--", caller+")")
nlog.InfoDepth(1, tag, "new", what, "( have", lv, msg.String(), "<--", caller, ")")
}
}

Expand All @@ -1560,7 +1598,7 @@ func (h *htrun) extractConfig(payload msPayload, caller string) (newConfig *glob
}
config := cmn.GCO.Get()
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
logmsync(config.Version, newConfig, msg, caller)
logmsync(config.Version, newConfig, msg, caller, newConfig.String(), config.UUID)
}
if newConfig.version() <= config.Version && msg.Action != apc.ActPrimaryForce {
if newConfig.version() < config.Version {
Expand Down Expand Up @@ -1637,28 +1675,22 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo
return
}

if msg.Action == apc.ActPrimaryForce {
origSmap := &smapX{}
if err := cos.MorphMarshal(msg.Value, origSmap); err != nil {
debug.AssertNoErr(err) // unlikely
}
if !origSmap.isPresent(h.si) {
err = &errSelfNotFound{act: act + " (with force)", si: h.si, tag: "orig", smap: origSmap}
if !newSmap.isPresent(h.si) {
err = &errSelfNotFound{act: act, si: h.si, tag: "new", smap: newSmap}

if msg.Action != apc.ActPrimaryForce {
return
}
logmsync(smap.Version, newSmap, msg, caller)
nlog.Warningln(err, "- proceeding with force")
err = nil
return
}

if !newSmap.isPresent(h.si) {
err = &errSelfNotFound{act: act, si: h.si, tag: "new", smap: newSmap}
return
}
if err = smap.validateUUID(h.si, newSmap, caller, 50 /* ciError */); err != nil {
return // FATAL: cluster integrity error
}
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
logmsync(smap.Version, newSmap, msg, caller)
logmsync(smap.Version, newSmap, msg, caller, newSmap.String(), smap.UUID)
}
_, sameOrigin, _, eq := smap.Compare(&newSmap.Smap)
debug.Assert(sameOrigin)
Expand Down Expand Up @@ -1692,13 +1724,13 @@ func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg

rmd := h.owner.rmd.get()
if newRMD.CluID != "" && newRMD.CluID != rmd.CluID && rmd.CluID != "" {
logmsync(rmd.Version, newRMD, msg, caller)
logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID)
err = h.owner.rmd.newClusterIntegrityErr(h.String(), newRMD.CluID, rmd.CluID, rmd.Version)
cos.ExitLog(err) // FATAL
}

if cmn.Rom.FastV(4, cos.SmoduleAIS) {
logmsync(rmd.Version, newRMD, msg, caller)
logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID)
}
if newRMD.version() <= rmd.version() && msg.Action != apc.ActPrimaryForce {
if newRMD.version() < rmd.version() {
Expand Down Expand Up @@ -1728,7 +1760,7 @@ func (h *htrun) extractBMD(payload msPayload, caller string) (newBMD *bucketMD,
}
bmd := h.owner.bmd.get()
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
logmsync(bmd.Version, newBMD, msg, caller)
logmsync(bmd.Version, newBMD, msg, caller, newBMD.String(), bmd.UUID)
}
// skip older iff not transactional - see t.receiveBMD()
if h.si.IsTarget() && msg.UUID != "" {
Expand All @@ -1748,7 +1780,7 @@ func (h *htrun) receiveSmap(newSmap *smapX, msg *aisMsg, payload msPayload, call
return nil
}
smap := h.owner.smap.get()
logmsync(smap.Version, newSmap, msg, caller, newSmap.StringEx())
logmsync(smap.Version, newSmap, msg, caller, newSmap.StringEx(), smap.UUID)

if !newSmap.isPresent(h.si) && msg.Action != apc.ActPrimaryForce {
return &errSelfNotFound{act: "receive-smap", si: h.si, tag: "new", smap: newSmap}
Expand Down Expand Up @@ -1791,13 +1823,17 @@ func (h *htrun) _recvCfg(newConfig *globalConfig, msg *aisMsg, payload msPayload
}
return newErrDowngrade(h.si, config.String(), newConfig.String())
}
if err = h.owner.config.persist(newConfig, payload); err != nil {
return
if config.UUID != "" && config.UUID != newConfig.UUID {
err = fmt.Errorf("%s: cluster configs have different UUIDs: (curr %q vs new %q)", ciError(110), config.UUID, newConfig.UUID)
if msg.Action != apc.ActPrimaryForce {
return err
}
nlog.Warningln(err, "- proceeding with force")
}
if err = cmn.GCO.Update(&newConfig.ClusterConfig); err != nil {
return
if err = h.owner.config.persist(newConfig, payload); err != nil {
return err
}
return
return cmn.GCO.Update(&newConfig.ClusterConfig)
}

func (h *htrun) extractRevokedTokenList(payload msPayload, caller string) (*tokenList, error) {
Expand Down Expand Up @@ -2368,8 +2404,8 @@ func (h *htrun) newAmsgActVal(act string, val any) *aisMsg {
return h.newAmsg(&apc.ActMsg{Action: act, Value: val}, nil)
}

func (h *htrun) newAmsg(actionMsg *apc.ActMsg, bmd *bucketMD, uuid ...string) *aisMsg {
msg := &aisMsg{ActMsg: *actionMsg}
func (h *htrun) newAmsg(amsg *apc.ActMsg, bmd *bucketMD, uuid ...string) *aisMsg {
msg := &aisMsg{ActMsg: *amsg}
if bmd != nil {
msg.BMDVersion = bmd.Version
} else {
Expand Down
1 change: 1 addition & 0 deletions ais/metasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type (
revs interface {
tag() string // enum { revsSmapTag, ... }
version() int64 // the version
uuid() string // UUID
marshal() (b []byte) // marshals the revs
jit(p *proxy) revs // current (just-in-time) instance
sgl() *memsys.SGL // jsp-encoded SGL
Expand Down
8 changes: 5 additions & 3 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,8 @@ func (p *proxy) metasyncHandler(w http.ResponseWriter, r *http.Request) {
return
}

p.warnMsync(r, smap)

payload := make(msPayload)
if errP := payload.unmarshal(r.Body, "metasync put"); errP != nil {
cmn.WriteErr(w, r, errP)
Expand Down Expand Up @@ -3206,7 +3208,7 @@ func (p *proxy) htHandler(w http.ResponseWriter, r *http.Request) {
// compare w/ t.receiveConfig
func (p *proxy) receiveConfig(newConfig *globalConfig, msg *aisMsg, payload msPayload, caller string) (err error) {
oldConfig := cmn.GCO.Get()
logmsync(oldConfig.Version, newConfig, msg, caller)
logmsync(oldConfig.Version, newConfig, msg, caller, newConfig.String(), oldConfig.UUID)

p.owner.config.Lock()
err = p._recvCfg(newConfig, msg, payload)
Expand Down Expand Up @@ -3299,7 +3301,7 @@ func (p *proxy) _remais(newConfig *cmn.ClusterConfig, blocking bool) {

func (p *proxy) receiveRMD(newRMD *rebMD, msg *aisMsg, caller string) (err error) {
rmd := p.owner.rmd.get()
logmsync(rmd.Version, newRMD, msg, caller)
logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID)

p.owner.rmd.Lock()
rmd = p.owner.rmd.get()
Expand Down Expand Up @@ -3349,7 +3351,7 @@ func (p *proxy) smapOnUpdate(newSmap, oldSmap *smapX, nfl, ofl cos.BitFlags) {

func (p *proxy) receiveBMD(newBMD *bucketMD, msg *aisMsg, payload msPayload, caller string) (err error) {
bmd := p.owner.bmd.get()
logmsync(bmd.Version, newBMD, msg, caller)
logmsync(bmd.Version, newBMD, msg, caller, newBMD.String(), bmd.UUID)

p.owner.bmd.Lock()
bmd = p.owner.bmd.get()
Expand Down
1 change: 1 addition & 0 deletions ais/prxauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ var _ revs = (*tokenList)(nil)

// tag returns the revs tag constant for the token list (revsTokenTag).
func (*tokenList) tag() string {
	return revsTokenTag
}

// version returns the Version field of this token list.
// no versioning: receivers keep adding tokens to their lists
func (t *tokenList) version() int64 {
	return t.Version
}

// uuid always returns an empty string.
// TODO: add
func (*tokenList) uuid() string {
	return ""
}

// marshal returns the token list encoded via cos.MustMarshal.
func (t *tokenList) marshal() []byte {
	return cos.MustMarshal(t)
}

// jit returns the receiver itself; the proxy argument is unused.
func (t *tokenList) jit(_ *proxy) revs {
	return t
}

// sgl always returns nil for the token list.
func (*tokenList) sgl() *memsys.SGL {
	return nil
}
Expand Down
Loading

0 comments on commit b72a4ec

Please sign in to comment.