feature: dynamic pool #12

Draft · wants to merge 4 commits into master
2 changes: 1 addition & 1 deletion .github/workflows/linters.yml
@@ -18,6 +18,6 @@ jobs:
      - name: Run linter
        uses: golangci/golangci-lint-action@… # Action page: <https://github.com/golangci/golangci-lint-action>
        with:
-          version: v1.60 # without patch version
+          version: v1.62 # without patch version
          only-new-issues: false # show only new issues if it's a pull request
          args: --timeout=10m --build-tags=race
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -10,7 +10,7 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
-        php: ["8.3"]
+        php: ["8.4"]
        go: [stable]
        os: ["ubuntu-latest", "macos-latest", "windows-latest"]
    steps:
96 changes: 92 additions & 4 deletions pool/config.go
@@ -2,6 +2,8 @@ package pool

import (
    "runtime"
+   "sync"
+   "sync/atomic"
    "time"
)

@@ -32,12 +34,14 @@ type Config struct {
    StreamTimeout time.Duration `mapstructure:"stream_timeout"`
    // Supervision config to limit worker and pool memory usage.
    Supervisor *SupervisorConfig `mapstructure:"supervisor"`
+   // Dynamic allocation config
+   DynamicAllocatorOpts *DynamicAllocationOpts `mapstructure:"dynamic_allocator"`
}

// InitDefaults enables default config values.
func (cfg *Config) InitDefaults() {
    if cfg.NumWorkers == 0 {
-       cfg.NumWorkers = uint64(runtime.NumCPU())
+       cfg.NumWorkers = uint64(runtime.NumCPU()) //nolint:gosec
    }

    if cfg.AllocateTimeout == 0 {
@@ -56,10 +60,14 @@ func (cfg *Config) InitDefaults() {
        cfg.ResetTimeout = time.Minute
    }

-   if cfg.Supervisor == nil {
-       return
+   if cfg.Supervisor != nil {
+       cfg.Supervisor.InitDefaults()
    }

-   cfg.Supervisor.InitDefaults()
+   // initialize the dynamic allocator
+   if cfg.DynamicAllocatorOpts != nil {
+       cfg.DynamicAllocatorOpts.InitDefaults()
+   }
}

type SupervisorConfig struct {
@@ -81,3 +89,83 @@ func (cfg *SupervisorConfig) InitDefaults() {
        cfg.WatchTick = time.Second * 5
    }
}

type DynamicAllocationOpts struct {
    MaxWorkers  uint64        `mapstructure:"max_workers"`
    SpawnRate   uint64        `mapstructure:"spawn_rate"`
    IdleTimeout time.Duration `mapstructure:"idle_timeout"`

    // internal, should be private and moved to the static_pool folder
    currAllocated  uint64
    lock           *sync.Mutex
    ttlTriggerChan chan struct{}
    started        atomic.Pointer[bool]
}

func (d *DynamicAllocationOpts) TriggerTTL() {
    // non-blocking send: if a trigger is already pending, the listener will
    // reset its timer anyway, and blocking here would hold the options lock
    select {
    case d.ttlTriggerChan <- struct{}{}:
    default:
    }
}

func (d *DynamicAllocationOpts) GetTriggerTTLChan() chan struct{} {
    return d.ttlTriggerChan
}

func (d *DynamicAllocationOpts) Lock() {
    d.lock.Lock()
}

func (d *DynamicAllocationOpts) Unlock() {
    d.lock.Unlock()
}

func (d *DynamicAllocationOpts) CurrAllocated() uint64 {
    return d.currAllocated
}

func (d *DynamicAllocationOpts) IncAllocated() {
    d.currAllocated++
}

func (d *DynamicAllocationOpts) DecAllocated() {
    d.currAllocated--
}

func (d *DynamicAllocationOpts) ResetAllocated() {
    d.currAllocated = 0
}

func (d *DynamicAllocationOpts) IsStarted() bool {
    // guard against a nil pointer when InitDefaults was not called
    started := d.started.Load()
    return started != nil && *started
}

func (d *DynamicAllocationOpts) Start() {
    d.started.Store(p(true))
}

func (d *DynamicAllocationOpts) Stop() {
    d.started.Store(p(false))
}

func (d *DynamicAllocationOpts) InitDefaults() {
    d.lock = &sync.Mutex{}

    if d.MaxWorkers == 0 {
        d.MaxWorkers = 10
    }

    if d.SpawnRate == 0 {
        d.SpawnRate = 1
    }

    if d.IdleTimeout == 0 || d.IdleTimeout < time.Second {
        d.IdleTimeout = time.Minute
    }

    d.ttlTriggerChan = make(chan struct{}, 1)
    d.currAllocated = 0
    d.started.Store(p(false))
}

func p[T any](val T) *T {
    return &val
}
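
For reviewers: a minimal sketch of how the new options are wired together, assuming a caller builds the pool config by hand. The field and method names come from this diff; the import path and the main scaffolding are assumptions.

// sketch.go: illustrative only; the import path is assumed from the module layout
package main

import (
    "fmt"
    "time"

    "github.com/roadrunner-server/pool/pool"
)

func main() {
    cfg := &pool.Config{
        NumWorkers: 4,
        DynamicAllocatorOpts: &pool.DynamicAllocationOpts{
            MaxWorkers:  20,
            SpawnRate:   2,
            IdleTimeout: 30 * time.Second,
        },
    }

    // InitDefaults cascades into DynamicAllocationOpts.InitDefaults, which creates
    // the internal lock and trigger channel; note that an IdleTimeout under one
    // second falls back to one minute
    cfg.InitDefaults()

    fmt.Println(cfg.DynamicAllocatorOpts.MaxWorkers)      // 20
    fmt.Println(cfg.DynamicAllocatorOpts.CurrAllocated()) // 0
}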
146 changes: 146 additions & 0 deletions pool/static_pool/dyn_allocator.go
@@ -0,0 +1,146 @@
package static_pool //nolint:stylecheck

import (
    "context"
    stderr "errors"
    "time"

    "github.com/roadrunner-server/errors"
    "github.com/roadrunner-server/pool/fsm"
    "github.com/roadrunner-server/pool/worker"
    "go.uber.org/zap"
)

func (sp *Pool) allocateDynamically() (*worker.Process, error) {
    const op = errors.Op("allocate_dynamically")

    // obtain an operation lock
    // we could use a lock-free approach here, but it's not necessary
    sp.cfg.DynamicAllocatorOpts.Lock()
    defer sp.cfg.DynamicAllocatorOpts.Unlock()

    if !sp.cfg.DynamicAllocatorOpts.IsStarted() {
        // start the dynamic allocator listener
        sp.dynamicTTLListener()
    }

    // reset the idle TTL timer
    sp.cfg.DynamicAllocatorOpts.TriggerTTL()

    // if we have already allocated the maximum number of workers, we can't allocate more
    if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers {
        return nil, errors.E(op, stderr.New("can't allocate more workers, increase the max_workers option"))
    }

    // since we have a dynamic allocator, we can try to allocate a new worker;
    // this worker is not released here, it is released in the Exec function
    nw, err := sp.allocator()
    if err != nil {
        sp.log.Error("failed to allocate dynamic worker", zap.Error(err))
        return nil, errors.E(op, err)
    }

    watchW := make([]*worker.Process, 0, sp.cfg.DynamicAllocatorOpts.SpawnRate)
    watchW = append(watchW, nw)

    // increase the number of additionally allocated workers
    sp.cfg.DynamicAllocatorOpts.IncAllocated()

    sp.log.Debug("allocated additional worker",
        zap.Int64("pid", nw.Pid()),
        zap.Uint64("max_execs", nw.MaxExecs()),
        zap.Uint64("currently additionally allocated", sp.cfg.DynamicAllocatorOpts.CurrAllocated()),
    )

    // start from 1 because the worker above is the first of the burst
    // (it is returned to the caller and released in the Exec function)
    for i := uint64(1); i < sp.cfg.DynamicAllocatorOpts.SpawnRate; i++ {
        // spawn as many workers as the user specified in the spawn_rate option, but no more than max_workers
        if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers {
            break
        }

        bw, err := sp.allocator()
        if err != nil {
            return nil, errors.E(op, err)
        }

        sp.cfg.DynamicAllocatorOpts.IncAllocated()
        // add the worker to the watcher
        watchW = append(watchW, bw)

        sp.log.Debug("allocated additional worker",
            zap.Int64("pid", bw.Pid()),
            zap.Uint64("max_execs", bw.MaxExecs()),
            zap.Uint64("currently additionally allocated", sp.cfg.DynamicAllocatorOpts.CurrAllocated()),
        )
    }

    err = sp.ww.Watch(watchW)
    if err != nil {
        return nil, errors.E(op, err)
    }

    return nw, nil
}
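
Since the burst arithmetic above is easy to misread, here is the same counting rule in isolation. burstSize is a hypothetical helper, not part of the PR: the first worker of a burst goes straight to the caller, the loop tops the burst up to spawn_rate, and max_workers caps the total.

package main

import "fmt"

// burstSize mirrors the counting in allocateDynamically (hypothetical helper):
// how many workers a single burst allocates in total, including the worker
// that is returned to the caller.
func burstSize(curr, maxWorkers, spawnRate uint64) uint64 {
    if curr >= maxWorkers {
        return 0 // already at the cap; allocateDynamically returns an error
    }
    n := spawnRate
    if curr+n > maxWorkers {
        n = maxWorkers - curr // the in-loop cap check stops the burst early
    }
    return n
}

func main() {
    fmt.Println(burstSize(0, 10, 4))  // 4
    fmt.Println(burstSize(9, 10, 4))  // 1: the cap trims the burst
    fmt.Println(burstSize(10, 10, 4)) // 0: no capacity left
}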

func (sp *Pool) dynamicTTLListener() {
    if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug {
        return
    }

    sp.log.Debug("starting dynamic allocator listener", zap.Duration("idle_timeout", sp.cfg.DynamicAllocatorOpts.IdleTimeout))
    // mark the listener as started so it is not spawned twice
    sp.cfg.DynamicAllocatorOpts.Start()
    go func() {
        // DynamicAllocatorOpts are read-only, so we can use them without a lock
        triggerTTL := time.NewTicker(sp.cfg.DynamicAllocatorOpts.IdleTimeout)
        for {
            select {
            case <-sp.stopCh:
                sp.log.Debug("dynamic allocator listener stopped")
                goto exit
            // when this channel is triggered, we should deallocate all dynamically allocated workers
            case <-triggerTTL.C:
                sp.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached"))
                // get the Exec (the whole operation) lock
                sp.mu.Lock()
                // get the DynamicAllocatorOpts lock to prevent operations on CurrAllocated
                sp.cfg.DynamicAllocatorOpts.Lock()

                // if we don't have any dynamically allocated workers, we can skip the deallocation
                if sp.cfg.DynamicAllocatorOpts.CurrAllocated() == 0 {
                    sp.cfg.DynamicAllocatorOpts.Unlock()
                    sp.mu.Unlock()
                    goto exit
                }

                for i := sp.cfg.DynamicAllocatorOpts.CurrAllocated(); i > 0; i-- {
                    // take a worker from the stack, infinite timeout
                    w, err := sp.ww.Take(context.Background())
                    if err != nil {
                        sp.log.Error("failed to take worker from the stack", zap.Error(err))
                        continue
                    }

                    // set the worker state to be destroyed
                    w.State().Transition(fsm.StateDestroyed)
                    // release the worker
                    sp.ww.Release(w)
                }

                sp.cfg.DynamicAllocatorOpts.ResetAllocated()

                sp.cfg.DynamicAllocatorOpts.Unlock()
                sp.mu.Unlock()
                sp.log.Debug("dynamic workers deallocated", zap.String("reason", "idle timeout reached"))
                triggerTTL.Stop()
                goto exit

            // when this channel is triggered, we should extend the TTL of all dynamically allocated workers
            case <-sp.cfg.DynamicAllocatorOpts.GetTriggerTTLChan():
                triggerTTL.Reset(sp.cfg.DynamicAllocatorOpts.IdleTimeout)
            }
        }
    exit:
        sp.cfg.DynamicAllocatorOpts.Stop()
    }()
}
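
The listener above is an instance of a common idle-timer pattern: every trigger pushes the deadline out, and the timer firing means no dynamic allocation happened for a full idle_timeout. A standalone sketch of just that pattern; every name here is illustrative, none of it is the pool's API.

package main

import (
    "fmt"
    "time"
)

// idleWatcher scales down once no activity arrives for a full idle period.
func idleWatcher(activity, stop <-chan struct{}, idle time.Duration) {
    t := time.NewTicker(idle)
    defer t.Stop()
    for {
        select {
        case <-stop:
            return
        case <-activity:
            t.Reset(idle) // work arrived: push the deadline out again
        case <-t.C:
            fmt.Println("idle timeout reached: deallocate dynamic workers")
            return
        }
    }
}

func main() {
    activity := make(chan struct{}, 1)
    stop := make(chan struct{})
    activity <- struct{}{} // simulate one allocation burst
    idleWatcher(activity, stop, 100*time.Millisecond)
}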
23 changes: 21 additions & 2 deletions pool/static_pool/pool.go
@@ -71,6 +71,7 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p
        factory: factory,
        log:     log,
        queue:   0,
+       stopCh:  make(chan struct{}),
    }

    // apply options
@@ -110,7 +111,7 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p
            p.supervisedExec = true
        }
        // start the supervisor
-       p.Start()
+       p.start()
    }

    return p, nil
@@ -364,6 +365,17 @@ func (sp *Pool) QueueSize() uint64 {
    return atomic.LoadUint64(&sp.queue)
}

+func (sp *Pool) NumDynamic() uint64 {
+   if sp.cfg.DynamicAllocatorOpts == nil {
+       return 0
+   }
+
+   sp.cfg.DynamicAllocatorOpts.Lock()
+   defer sp.cfg.DynamicAllocatorOpts.Unlock()
+
+   return sp.cfg.DynamicAllocatorOpts.CurrAllocated()
+}
+
// Destroy all underlying stack (but let them complete the task).
func (sp *Pool) Destroy(ctx context.Context) {
    sp.log.Info("destroy signal received", zap.Duration("timeout", sp.cfg.DestroyTimeout))
@@ -375,6 +387,7 @@ func (sp *Pool) Destroy(ctx context.Context) {
    }
    sp.ww.Destroy(ctx)
    atomic.StoreUint64(&sp.queue, 0)
+   close(sp.stopCh)
}

func (sp *Pool) Reset(ctx context.Context) error {
@@ -409,7 +422,13 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr
            zap.String("internal_event_name", events.EventNoFreeWorkers.String()),
            zap.Error(err),
        )
-       return nil, errors.E(op, err)
+
+       // if we don't have a dynamic allocator, or we are in debug mode, we can't allocate a new worker
+       if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug {
+           return nil, errors.E(op, errors.NoFreeWorkers)
+       }
+
+       return sp.allocateDynamically()
    }
    // else if err is not nil, return the error
    return nil, errors.E(op, err)
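
Finally, the shape of the new fallback in takeWorker, reduced to a standalone sketch. The real code matches errors.NoFreeWorkers after the allocate timeout; everything below is an illustrative stand-in, not the pool's API.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

var errNoFreeWorkers = errors.New("no free workers")

// takeOrGrow tries to take a free worker until the deadline; on exhaustion it
// grows dynamically, unless growth is disabled (nil here stands in for a
// missing dynamic_allocator section or debug mode).
func takeOrGrow(ctx context.Context, free <-chan int, grow func() (int, error)) (int, error) {
    select {
    case w := <-free:
        return w, nil
    case <-ctx.Done():
        if grow == nil {
            return 0, errNoFreeWorkers
        }
        return grow()
    }
}

func main() {
    free := make(chan int) // empty channel: simulates an exhausted pool
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    w, err := takeOrGrow(ctx, free, func() (int, error) { return 42, nil })
    fmt.Println(w, err) // 42 <nil>: the dynamically allocated worker
}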