Skip to content

Commit

Permalink
EC streams: synchronize opening cluster-wide; throttle more aggressiv…
Browse files Browse the repository at this point in the history
…ely when OOM

* all ec xactions: synchronize via `GoRunW`
* throttle vs OOM
* prev. commits: 5eb4677, aaa0bfa

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 1, 2024
1 parent 485dc87 commit 441d2dd
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 18 deletions.
5 changes: 2 additions & 3 deletions ec/bencodex.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,9 @@ func newXactBckEncode(bck *meta.Bck, uuid string, checkAndRecover bool) (r *Xact
return r, nil
}

func (r *XactBckEncode) Run(wg *sync.WaitGroup) {
wg.Done()

func (r *XactBckEncode) Run(gowg *sync.WaitGroup) {
ECM.incActive(r)
gowg.Done()

opts := &mpather.JgroupOpts{
CTs: []string{fs.ObjectType},
Expand Down
6 changes: 4 additions & 2 deletions ec/getx.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func (p *getFactory) Start() error {
xec := ECM.NewGetXact(p.Bck.Bucket())
xec.DemandBase.Init(cos.GenUUID(), p.Kind(), p.Bck, 0 /*use default*/)
p.xctn = xec
go xec.Run(nil)

xact.GoRunW(xec)
return nil
}
func (*getFactory) Kind() string { return apc.ActECGet }
Expand Down Expand Up @@ -184,7 +185,7 @@ func (r *XactGet) dispatchReq(req *request, lom *core.LOM) error {
return nil
}

func (r *XactGet) Run(*sync.WaitGroup) {
func (r *XactGet) Run(gowg *sync.WaitGroup) {
nlog.Infoln(r.Name())
for _, jog := range r.getJoggers {
go jog.run()
Expand All @@ -194,6 +195,7 @@ func (r *XactGet) Run(*sync.WaitGroup) {
defer ticker.Stop()

ECM.incActive(r)
gowg.Done()

// as of now all requests are equal. Some may get throttling later
for {
Expand Down
10 changes: 6 additions & 4 deletions ec/putjogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type (
stopCh cos.StopCh // jogger management channel: to stop it

ntotal int64 // (throttle to prevent OOM)
micro bool // (throttle tuneup)
toDisk bool // use files or SGL (NOTE: toDisk == false may cause OOM)
}
)
Expand Down Expand Up @@ -147,11 +148,12 @@ func (c *putJogger) _do(req *request, lom *core.LOM) {
c.parent.AddErr(err, 0)
} else if !c.toDisk { // throttle
c.ntotal++
if fs.IsMiniThrottle(c.ntotal) {
if pressure := g.pmm.Pressure(); pressure >= memsys.PressureExtreme {
if (c.micro && fs.IsMicroThrottle(c.ntotal)) || fs.IsMiniThrottle(c.ntotal) {
if pressure := g.pmm.Pressure(); pressure >= memsys.PressureHigh {
time.Sleep(fs.Throttle100ms)
} else if pressure == memsys.PressureHigh {
time.Sleep(fs.Throttle10ms)
if !c.micro && pressure >= memsys.PressureExtreme {
c.micro = true
}
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions ec/putx.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func (p *putFactory) Start() error {
xec := ECM.NewPutXact(p.Bck.Bucket())
xec.DemandBase.Init(cos.GenUUID(), p.Kind(), p.Bck, 0 /*use default*/)
p.xctn = xec
go xec.Run(nil)

xact.GoRunW(xec)
return nil
}

Expand Down Expand Up @@ -153,7 +154,7 @@ func (r *XactPut) dispatchRequest(req *request, lom *core.LOM) error {
return nil
}

func (r *XactPut) Run(*sync.WaitGroup) {
func (r *XactPut) Run(gowg *sync.WaitGroup) {
nlog.Infoln(r.Name())

var wg sync.WaitGroup
Expand All @@ -163,6 +164,7 @@ func (r *XactPut) Run(*sync.WaitGroup) {
}

ECM.incActive(r)
gowg.Done()

ticker := time.NewTicker(r.config.Periodic.StatsTime.D())
r.mainLoop(ticker)
Expand All @@ -172,7 +174,6 @@ func (r *XactPut) Run(*sync.WaitGroup) {
r.Finish()
}

// all requests are equal, throttle TODO
func (r *XactPut) mainLoop(ticker *time.Ticker) {
for {
select {
Expand Down
6 changes: 4 additions & 2 deletions ec/respondx.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func (p *rspFactory) Start() error {
xec := ECM.NewRespondXact(p.Bck.Bucket())
xec.DemandBase.Init(cos.GenUUID(), p.Kind(), p.Bck, 0 /*use default*/)
p.xctn = xec
go xec.Run(nil)

xact.GoRunW(xec)
return nil
}

Expand All @@ -77,10 +78,11 @@ func newRespondXact(bck *cmn.Bck, mgr *Manager) *XactRespond {
return xctn
}

func (r *XactRespond) Run(*sync.WaitGroup) {
func (r *XactRespond) Run(gowg *sync.WaitGroup) {
nlog.Infoln(r.Name())

ECM.incActive(r)
gowg.Done()

ticker := time.NewTicker(r.config.Periodic.StatsTime.D())
defer ticker.Stop()
Expand Down
10 changes: 6 additions & 4 deletions fs/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ const (
Throttle10ms = 10 * time.Millisecond
Throttle100ms = 100 * time.Millisecond

throttleBatch = 0x1f // a.k.a. unit or period
throMiniBatch = 0x1f >> 1 // ditto
throttleBatch = 0x1f // a.k.a. unit or period
throMiniBatch = 0x1f >> 1
throMicroBatch = 0x1f >> 2
)

func IsThrottle(n int64) bool { return n&throttleBatch == throttleBatch }
func IsMiniThrottle(n int64) bool { return n&throMiniBatch == throMiniBatch }
func IsThrottle(n int64) bool { return n&throttleBatch == throttleBatch }
func IsMiniThrottle(n int64) bool { return n&throMiniBatch == throMiniBatch }
func IsMicroThrottle(n int64) bool { return n&throMicroBatch == throMicroBatch }

0 comments on commit 441d2dd

Please sign in to comment.