From 8679c0156298fc4981d9c14fb4560b21dd2a7bc4 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 27 Nov 2024 21:04:08 +0100 Subject: [PATCH 1/7] feature[wip]: dynamic pool Signed-off-by: Valery Piashchynski --- pool/config.go | 78 +++++++++++++++- pool/static_pool/pool.go | 149 ++++++++++++++++++++++++++++++- pool/static_pool/pool_test.go | 64 ++++++++++++- pool/static_pool/supervisor.go | 6 +- tests/composer.json | 37 ++++---- tests/psr-worker-bench.php | 24 ++--- tests/worker-slow-dyn.php | 31 +++++++ worker/worker.go | 2 +- worker_watcher/worker_watcher.go | 57 ++++++++++-- 9 files changed, 395 insertions(+), 53 deletions(-) create mode 100644 tests/worker-slow-dyn.php diff --git a/pool/config.go b/pool/config.go index 8cd2cea..ab0dee1 100644 --- a/pool/config.go +++ b/pool/config.go @@ -2,6 +2,7 @@ package pool import ( "runtime" + "sync" "time" ) @@ -32,12 +33,14 @@ type Config struct { StreamTimeout time.Duration `mapstructure:"stream_timeout"` // Supervision config to limit worker and pool memory usage. Supervisor *SupervisorConfig `mapstructure:"supervisor"` + // Dynamic allocation config + DynamicAllocatorOpts *DynamicAllocationOpts `mapstructure:"dynamic_allocator"` } // InitDefaults enables default config values. func (cfg *Config) InitDefaults() { if cfg.NumWorkers == 0 { - cfg.NumWorkers = uint64(runtime.NumCPU()) + cfg.NumWorkers = uint64(runtime.NumCPU()) //nolint:gosec } if cfg.AllocateTimeout == 0 { @@ -56,10 +59,14 @@ func (cfg *Config) InitDefaults() { cfg.ResetTimeout = time.Minute } - if cfg.Supervisor == nil { - return + if cfg.Supervisor != nil { + cfg.Supervisor.InitDefaults() + } + + // initialize the dynamic allocator + if cfg.DynamicAllocatorOpts != nil { + cfg.DynamicAllocatorOpts.InitDefaults() } - cfg.Supervisor.InitDefaults() } type SupervisorConfig struct { @@ -81,3 +88,66 @@ func (cfg *SupervisorConfig) InitDefaults() { cfg.WatchTick = time.Second * 5 } } + +type DynamicAllocationOpts struct { + MaxWorkers uint64 `mapstructure:"max_workers"` + SpawnRate uint64 `mapstructure:"spawn_rate"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + + // internal, should be private and moved to the static_pool folder + currAllocated uint64 + lock *sync.Mutex + ttlTriggerChan chan struct{} + ticketer *time.Ticker +} + +func (d *DynamicAllocationOpts) TriggerTTL() { + d.ttlTriggerChan <- struct{}{} +} + +func (d *DynamicAllocationOpts) GetTriggerTTLChan() chan struct{} { + return d.ttlTriggerChan +} + +func (d *DynamicAllocationOpts) Lock() { + d.lock.Lock() +} + +func (d *DynamicAllocationOpts) Unlock() { + d.lock.Unlock() +} + +func (d *DynamicAllocationOpts) CurrAllocated() uint64 { + return d.currAllocated +} + +func (d *DynamicAllocationOpts) IncAllocated() { + d.currAllocated++ +} + +func (d *DynamicAllocationOpts) DecAllocated() { + d.currAllocated-- +} + +func (d *DynamicAllocationOpts) ResetAllocated() { + d.currAllocated = 0 +} + +func (d *DynamicAllocationOpts) InitDefaults() { + d.lock = &sync.Mutex{} + + if d.MaxWorkers == 0 { + d.MaxWorkers = 10 + } + + if d.SpawnRate == 0 { + d.SpawnRate = 1 + } + + if d.IdleTimeout == 0 || d.IdleTimeout < time.Second { + d.IdleTimeout = time.Minute + } + + d.ttlTriggerChan = make(chan struct{}, 1) + d.currAllocated = 0 +} diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 8166272..74d96e7 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -5,6 +5,7 @@ import ( "runtime" "sync" "sync/atomic" + "time" "unsafe" "github.com/roadrunner-server/errors" @@ -71,6 +72,7 
@@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p factory: factory, log: log, queue: 0, + stopCh: make(chan struct{}), } // apply options @@ -110,7 +112,11 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p p.supervisedExec = true } // start the supervisor - p.Start() + p.start() + } + + if p.cfg.DynamicAllocatorOpts != nil { + p.dynamicTTLListener() } return p, nil @@ -364,6 +370,17 @@ func (sp *Pool) QueueSize() uint64 { return atomic.LoadUint64(&sp.queue) } +func (sp *Pool) NumDynamic() uint64 { + if sp.cfg.DynamicAllocatorOpts == nil { + return 0 + } + + sp.cfg.DynamicAllocatorOpts.Lock() + defer sp.cfg.DynamicAllocatorOpts.Unlock() + + return sp.cfg.DynamicAllocatorOpts.CurrAllocated() +} + // Destroy all underlying stack (but let them complete the task). func (sp *Pool) Destroy(ctx context.Context) { sp.log.Info("destroy signal received", zap.Duration("timeout", sp.cfg.DestroyTimeout)) @@ -375,6 +392,7 @@ func (sp *Pool) Destroy(ctx context.Context) { } sp.ww.Destroy(ctx) atomic.StoreUint64(&sp.queue, 0) + close(sp.stopCh) } func (sp *Pool) Reset(ctx context.Context) error { @@ -409,10 +427,137 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr zap.String("internal_event_name", events.EventNoFreeWorkers.String()), zap.Error(err), ) - return nil, errors.E(op, err) + + // if we don't have dynamic allocator or in debug mode, we can't allocate a new worker + if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug { + return nil, errors.E(op, err) + } + + // obtain an operation lock + // we can use a lock-free approach here, but it's not necessary + sp.cfg.DynamicAllocatorOpts.Lock() + defer sp.cfg.DynamicAllocatorOpts.Unlock() + + // reset the TTL listener + sp.cfg.DynamicAllocatorOpts.TriggerTTL() + + // if we already allocated max workers, we can't allocate more + if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers { + // can't allocate more + return nil, errors.E(op, err) + } + + // if we have dynamic allocator, we can try to allocate new worker + // this worker should not be released here, but instead will be released in the Exec function + nw, err := sp.allocator() + if err != nil { + sp.log.Error("failed to allocate dynamic worker", zap.Error(err)) + return nil, errors.E(op, err) + } + + watchW := make([]*worker.Process, 0, 10) + watchW = append(watchW, nw) + + // increase number of additionally allocated options + sp.cfg.DynamicAllocatorOpts.IncAllocated() + + sp.log.Debug("allocated additional worker", + zap.Int64("pid", nw.Pid()), + zap.Uint64("max_execs", nw.MaxExecs()), + zap.Uint64("currently additionally allocated", sp.cfg.DynamicAllocatorOpts.CurrAllocated()), + ) + + // we starting from the 1 because we already allocated one worker which would be released in the Exec function + for i := uint64(1); i <= sp.cfg.DynamicAllocatorOpts.SpawnRate; i++ { + // spawn as much workers as user specified in the spawn rate configuration, but not more than max workers + if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers { + break + } + + bw, err := sp.allocator() + if err != nil { + return nil, errors.E(op, err) + } + + sp.cfg.DynamicAllocatorOpts.IncAllocated() + // add worker to the watcher + watchW = append(watchW, bw) + + sp.log.Debug("allocated additional worker", + zap.Int64("pid", bw.Pid()), + zap.Uint64("max_execs", bw.MaxExecs()), + zap.Uint64("currently additionally allocated", 
sp.cfg.DynamicAllocatorOpts.CurrAllocated()), + ) + } + + err = sp.ww.Watch(watchW) + if err != nil { + return nil, errors.E(op, err) + } + + return nw, nil } // else if err not nil - return error return nil, errors.E(op, err) } return w, nil } + +func (sp *Pool) dynamicTTLListener() { + if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug { + return + } + + sp.log.Debug("starting dynamic allocator listener", zap.Duration("idle_timeout", sp.cfg.DynamicAllocatorOpts.IdleTimeout)) + go func() { + // DynamicAllocatorOpts are read-only, so we can use them without a lock + triggerTTL := time.NewTicker(sp.cfg.DynamicAllocatorOpts.IdleTimeout) + for { + select { + case <-sp.stopCh: + sp.log.Debug("dynamic allocator listener stopped") + return + // when this channel is triggered, we should deallocate all dynamically allocated workers + case <-triggerTTL.C: + sp.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached")) + // get the Exec (the whole operation) lock + sp.mu.Lock() + // get the DynamicAllocatorOpts lock to prevent operations on the CurrAllocated + sp.cfg.DynamicAllocatorOpts.Lock() + + // if we don't have any dynamically allocated workers, we can skip the deallocation + if sp.cfg.DynamicAllocatorOpts.CurrAllocated() == 0 { + sp.cfg.DynamicAllocatorOpts.Unlock() + sp.mu.Unlock() + return + } + + for i := sp.cfg.DynamicAllocatorOpts.CurrAllocated(); i > 0; i-- { + // take the worker from the stack, inifinite timeout + w, err := sp.ww.Take(context.Background()) + if err != nil { + sp.log.Error("failed to take worker from the stack", zap.Error(err)) + continue + } + + // set the worker state to be destroyed + w.State().Transition(fsm.StateDestroyed) + // release the worker + sp.ww.Release(w) + } + + sp.cfg.DynamicAllocatorOpts.ResetAllocated() + + sp.cfg.DynamicAllocatorOpts.Unlock() + sp.mu.Unlock() + sp.log.Debug("dynamic workers deallocated", zap.String("reason", "idle timeout reached")) + triggerTTL.Stop() + return + + // when this channel is triggered, we should extend the TTL of all dynamically allocated workers + case <-sp.cfg.DynamicAllocatorOpts.GetTriggerTTLChan(): + triggerTTL.Reset(sp.cfg.DynamicAllocatorOpts.IdleTimeout) + } + } + }() +} diff --git a/pool/static_pool/pool_test.go b/pool/static_pool/pool_test.go index 7ef2a6c..1f360eb 100644 --- a/pool/static_pool/pool_test.go +++ b/pool/static_pool/pool_test.go @@ -55,6 +55,68 @@ func Test_NewPool(t *testing.T) { p.Destroy(ctx) } +func Test_DynamicPool(t *testing.T) { + dynAllCfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 5, + IdleTimeout: time.Second * 15, + SpawnRate: 2, + }, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(log()), + dynAllCfg, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + select { + case resp := <-r: + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + go func() { + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + select { + case resp := 
<-r: + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + + wg.Wait() + + time.Sleep(time.Second * 20) + + p.Destroy(ctx) +} + func Test_NewPoolAddRemoveWorkers(t *testing.T) { testCfg2 := &pool.Config{ NumWorkers: 1, @@ -1074,7 +1136,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ - NumWorkers: uint64(runtime.NumCPU()), + NumWorkers: uint64(runtime.NumCPU()), //nolint:gosec AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, }, diff --git a/pool/static_pool/supervisor.go b/pool/static_pool/supervisor.go index b65c8f9..3512702 100644 --- a/pool/static_pool/supervisor.go +++ b/pool/static_pool/supervisor.go @@ -16,7 +16,7 @@ const ( NsecInSec int64 = 1000000000 ) -func (sp *Pool) Start() { +func (sp *Pool) start() { go func() { watchTout := time.NewTicker(sp.cfg.Supervisor.WatchTick) defer watchTout.Stop() @@ -35,10 +35,6 @@ func (sp *Pool) Start() { }() } -func (sp *Pool) Stop() { - sp.stopCh <- struct{}{} -} - func (sp *Pool) control() { now := time.Now() diff --git a/tests/composer.json b/tests/composer.json index a6a2f47..5aecd89 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -1,21 +1,20 @@ { - "minimum-stability": "dev", - "prefer-stable": true, - "require": { - "nyholm/psr7": "^1.4", - "spiral/roadrunner": "^2.0", - "spiral/roadrunner-http": "^2.1", - "spiral/roadrunner-worker": "^2.2", - "temporal/sdk": ">=1.0", - "spiral/tokenizer": ">=2.7", - "spiral/goridge": "^3.2", - "spiral/roadrunner-metrics": "^2.0" - }, - "autoload": { - "psr-4": { - "Temporal\\Tests\\": "src" - } - }, - "name": "test/test", - "description": "test" + "minimum-stability": "dev", + "prefer-stable": true, + "require": { + "nyholm/psr7": "^1.4", + "spiral/roadrunner-http": "^3.5", + "spiral/roadrunner-worker": "^3.5", + "temporal/sdk": ">=1.0", + "spiral/tokenizer": ">=2.7", + "spiral/goridge": "^4.0", + "spiral/roadrunner-metrics": "^2.0" + }, + "autoload": { + "psr-4": { + "Temporal\\Tests\\": "src" + } + }, + "name": "test/test", + "description": "test" } diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php index e809f38..1513211 100644 --- a/tests/psr-worker-bench.php +++ b/tests/psr-worker-bench.php @@ -10,21 +10,21 @@ ini_set('display_errors', 'stderr'); require __DIR__ . 
"/vendor/autoload.php"; -$worker = RoadRunner\Worker::create(); +$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); $psr7 = new RoadRunner\Http\PSR7Worker( - $worker, - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory() + $worker, + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() ); while ($req = $psr7->waitRequest()) { - try { - $resp = new \Nyholm\Psr7\Response(); - $resp->getBody()->write("hello world"); + try { + $resp = new \Nyholm\Psr7\Response(); + $resp->getBody()->write("hello world"); - $psr7->respond($resp); - } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); - } + $psr7->respond($resp); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } } diff --git a/tests/worker-slow-dyn.php b/tests/worker-slow-dyn.php new file mode 100644 index 0000000..3f4aa93 --- /dev/null +++ b/tests/worker-slow-dyn.php @@ -0,0 +1,31 @@ +waitRequest()) { + try { + $resp = new \Nyholm\Psr7\Response(); + sleep(10); + $resp->getBody()->write("hello world"); + + $psr7->respond($resp); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +} diff --git a/worker/worker.go b/worker/worker.go index 4decf09..ea1f079 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -381,7 +381,7 @@ func (w *Process) Exec(ctx context.Context, p *payload.Payload) (*payload.Payloa c := w.getCh() // set last used time - w.State().SetLastUsed(uint64(time.Now().UnixNano())) + w.State().SetLastUsed(uint64(time.Now().UnixNano())) //nolint:gosec w.State().Transition(fsm.StateWorking) go func() { diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 8491598..7dc1e42 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -15,6 +15,8 @@ import ( "go.uber.org/zap" ) +const maxWorkers = 500 + // Allocator is responsible for worker allocation in the pool type Allocator func() (*worker.Process, error) @@ -40,9 +42,8 @@ func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint6 eb, _ := events.NewEventBus() return &WorkerWatcher{ container: channel.NewVector(), - - log: log, - eventBus: eb, + log: log, + eventBus: eb, // pass a ptr to the number of workers to avoid blocking in the TTL loop numWorkers: numWorkers, allocateTimeout: allocateTimeout, @@ -54,17 +55,55 @@ func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint6 func (ww *WorkerWatcher) Watch(workers []*worker.Process) error { ww.Lock() defer ww.Unlock() + + // if the container is full, return an error + if atomic.LoadUint64(&ww.numWorkers) >= maxWorkers { + return errors.E(errors.WorkerAllocate, errors.Str("container is full")) + } + + // if the number of workers to add is greater than the maximum number of workers + // we can add only the number of workers that can be added w/o exceeding the limit + if atomic.LoadUint64(&ww.numWorkers)+uint64(len(workers)) > maxWorkers { + // calculate the number of workers that can be added + // for example, if we have 100 [0..99] workers to add, but the container has 450 workers, + // we can add only 50 workers: 500 - 450 = 50 [0..49] + maxAllocated := maxWorkers - atomic.LoadUint64(&ww.numWorkers) + + // workers[:maxAllocated] - up to 50 including 50 (in our example) + toWatch := workers[:maxAllocated] + // toAllocate - will contain [0, 1, 2, ..., 50] + [50..99] (100 in total) + for i := 0; i < 
len(toWatch); i++ { + ww.container.Push(toWatch[i]) + // add worker to watch slice + ww.workers[toWatch[i].Pid()] = toWatch[i] + ww.addToWatch(toWatch[i]) + } + + // the rest of the workers from the workers slice should be stopped + toStop := workers[maxAllocated:] + for i := 0; i < len(toStop); i++ { + _ = toStop[i].Stop() + } + + return nil + } + + // else we can add all workers for i := 0; i < len(workers); i++ { - ii := i - ww.container.Push(workers[ii]) + ww.container.Push(workers[i]) // add worker to watch slice - ww.workers[workers[ii].Pid()] = workers[ii] - ww.addToWatch(workers[ii]) + ww.workers[workers[i].Pid()] = workers[i] + ww.addToWatch(workers[i]) } + return nil } func (ww *WorkerWatcher) AddWorker() error { + if atomic.LoadUint64(&ww.numWorkers) >= maxWorkers { + return errors.E(errors.WorkerAllocate, errors.Str("container is full")) + } + err := ww.Allocate() if err != nil { return err @@ -240,7 +279,7 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 { ww.RLock() // that might be one of the workers is working. To proceed, all workers should be inside a channel - if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { + if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { //nolint:gosec ww.RUnlock() continue } @@ -322,7 +361,7 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { case <-tt.C: ww.RLock() // that might be one of the workers is working - if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { + if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { //nolint:gosec ww.RUnlock() continue } From 98275e594649b625aa7d38de6818399c4ef7c596 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 3 Dec 2024 00:38:04 +0100 Subject: [PATCH 2/7] fix: tests, dyn allocator Signed-off-by: Valery Piashchynski --- pool/config.go | 20 +++- pool/static_pool/dyn_allocator.go | 146 ++++++++++++++++++++++++++++++ pool/static_pool/pool.go | 130 +------------------------- pool/static_pool/pool_test.go | 3 - tests/client.php | 41 +++++---- tests/http/client.php | 56 ++++++------ tests/http/slow-client.php | 56 ++++++------ tests/slow-client.php | 32 +++---- tests/slow-destroy.php | 38 ++++---- 9 files changed, 279 insertions(+), 243 deletions(-) create mode 100644 pool/static_pool/dyn_allocator.go diff --git a/pool/config.go b/pool/config.go index ab0dee1..2652bd2 100644 --- a/pool/config.go +++ b/pool/config.go @@ -3,6 +3,7 @@ package pool import ( "runtime" "sync" + "sync/atomic" "time" ) @@ -98,7 +99,7 @@ type DynamicAllocationOpts struct { currAllocated uint64 lock *sync.Mutex ttlTriggerChan chan struct{} - ticketer *time.Ticker + started atomic.Pointer[bool] } func (d *DynamicAllocationOpts) TriggerTTL() { @@ -133,6 +134,18 @@ func (d *DynamicAllocationOpts) ResetAllocated() { d.currAllocated = 0 } +func (d *DynamicAllocationOpts) IsStarted() bool { + return *d.started.Load() +} + +func (d *DynamicAllocationOpts) Start() { + d.started.Store(p(true)) +} + +func (d *DynamicAllocationOpts) Stop() { + d.started.Store(p(false)) +} + func (d *DynamicAllocationOpts) InitDefaults() { d.lock = &sync.Mutex{} @@ -150,4 +163,9 @@ func (d *DynamicAllocationOpts) InitDefaults() { d.ttlTriggerChan = make(chan struct{}, 1) d.currAllocated = 0 + d.started.Store(p(false)) +} + +func p[T any](val T) *T { + return &val } diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go new file mode 100644 index 0000000..6541e97 --- /dev/null +++ b/pool/static_pool/dyn_allocator.go @@ -0,0 +1,146 @@ 
+package static_pool //nolint:stylecheck
+
+import (
+	"context"
+	stderr "errors"
+	"time"
+
+	"github.com/roadrunner-server/errors"
+	"github.com/roadrunner-server/pool/fsm"
+	"github.com/roadrunner-server/pool/worker"
+	"go.uber.org/zap"
+)
+
+func (sp *Pool) allocateDynamically() (*worker.Process, error) {
+	const op = errors.Op("allocate_dynamically")
+
+	// obtain an operation lock
+	// we can use a lock-free approach here, but it's not necessary
+	sp.cfg.DynamicAllocatorOpts.Lock()
+	defer sp.cfg.DynamicAllocatorOpts.Unlock()
+
+	if !sp.cfg.DynamicAllocatorOpts.IsStarted() {
+		// start the dynamic allocator listener
+		sp.dynamicTTLListener()
+	}
+
+	// reset the TTL listener
+	sp.cfg.DynamicAllocatorOpts.TriggerTTL()
+
+	// if we already allocated max workers, we can't allocate more
+	if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers {
+		// can't allocate more
+		return nil, errors.E(op, stderr.New("can't allocate more workers, increase max_workers option"))
+	}
+
+	// if we have a dynamic allocator, we can try to allocate a new worker
+	// this worker should not be released here, but instead will be released in the Exec function
+	nw, err := sp.allocator()
+	if err != nil {
+		sp.log.Error("failed to allocate dynamic worker", zap.Error(err))
+		return nil, errors.E(op, err)
+	}
+
+	watchW := make([]*worker.Process, 0, 10)
+	watchW = append(watchW, nw)
+
+	// increase the number of additionally allocated workers
+	sp.cfg.DynamicAllocatorOpts.IncAllocated()
+
+	sp.log.Debug("allocated additional worker",
+		zap.Int64("pid", nw.Pid()),
+		zap.Uint64("max_execs", nw.MaxExecs()),
+		zap.Uint64("currently additionally allocated", sp.cfg.DynamicAllocatorOpts.CurrAllocated()),
+	)
+
+	// we start from 1 because we already allocated one worker, which will be released in the Exec function
+	for i := uint64(1); i <= sp.cfg.DynamicAllocatorOpts.SpawnRate; i++ {
+		// spawn as many workers as the user specified in the spawn rate configuration, but not more than max workers
+		if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers {
+			break
+		}
+
+		bw, err := sp.allocator()
+		if err != nil {
+			return nil, errors.E(op, err)
+		}
+
+		sp.cfg.DynamicAllocatorOpts.IncAllocated()
+		// add worker to the watcher
+		watchW = append(watchW, bw)
+
+		sp.log.Debug("allocated additional worker",
+			zap.Int64("pid", bw.Pid()),
+			zap.Uint64("max_execs", bw.MaxExecs()),
+			zap.Uint64("currently additionally allocated", sp.cfg.DynamicAllocatorOpts.CurrAllocated()),
+		)
+	}
+
+	err = sp.ww.Watch(watchW)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	return nw, nil
+}
+
+func (sp *Pool) dynamicTTLListener() {
+	if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug {
+		return
+	}
+
+	sp.log.Debug("starting dynamic allocator listener", zap.Duration("idle_timeout", sp.cfg.DynamicAllocatorOpts.IdleTimeout))
+	go func() {
+		// DynamicAllocatorOpts are read-only, so we can use them without a lock
+		triggerTTL := time.NewTicker(sp.cfg.DynamicAllocatorOpts.IdleTimeout)
+		for {
+			select {
+			case <-sp.stopCh:
+				sp.log.Debug("dynamic allocator listener stopped")
+				goto exit
+			// when this channel is triggered, we should deallocate all dynamically allocated workers
+			case <-triggerTTL.C:
+				sp.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached"))
+				// get the Exec (the whole operation) lock
+				sp.mu.Lock()
+				// get the DynamicAllocatorOpts lock to prevent operations on the CurrAllocated
+				sp.cfg.DynamicAllocatorOpts.Lock()
+
+				// if we
don't have any dynamically allocated workers, we can skip the deallocation + if sp.cfg.DynamicAllocatorOpts.CurrAllocated() == 0 { + sp.cfg.DynamicAllocatorOpts.Unlock() + sp.mu.Unlock() + goto exit + } + + for i := sp.cfg.DynamicAllocatorOpts.CurrAllocated(); i > 0; i-- { + // take the worker from the stack, inifinite timeout + w, err := sp.ww.Take(context.Background()) + if err != nil { + sp.log.Error("failed to take worker from the stack", zap.Error(err)) + continue + } + + // set the worker state to be destroyed + w.State().Transition(fsm.StateDestroyed) + // release the worker + sp.ww.Release(w) + } + + sp.cfg.DynamicAllocatorOpts.ResetAllocated() + + sp.cfg.DynamicAllocatorOpts.Unlock() + sp.mu.Unlock() + sp.log.Debug("dynamic workers deallocated", zap.String("reason", "idle timeout reached")) + triggerTTL.Stop() + goto exit + + // when this channel is triggered, we should extend the TTL of all dynamically allocated workers + case <-sp.cfg.DynamicAllocatorOpts.GetTriggerTTLChan(): + triggerTTL.Reset(sp.cfg.DynamicAllocatorOpts.IdleTimeout) + } + } + exit: + sp.cfg.DynamicAllocatorOpts.Stop() + }() +} diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 74d96e7..9b988ea 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -5,7 +5,6 @@ import ( "runtime" "sync" "sync/atomic" - "time" "unsafe" "github.com/roadrunner-server/errors" @@ -115,10 +114,6 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p p.start() } - if p.cfg.DynamicAllocatorOpts != nil { - p.dynamicTTLListener() - } - return p, nil } @@ -430,134 +425,13 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr // if we don't have dynamic allocator or in debug mode, we can't allocate a new worker if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug { - return nil, errors.E(op, err) - } - - // obtain an operation lock - // we can use a lock-free approach here, but it's not necessary - sp.cfg.DynamicAllocatorOpts.Lock() - defer sp.cfg.DynamicAllocatorOpts.Unlock() - - // reset the TTL listener - sp.cfg.DynamicAllocatorOpts.TriggerTTL() - - // if we already allocated max workers, we can't allocate more - if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers { - // can't allocate more - return nil, errors.E(op, err) - } - - // if we have dynamic allocator, we can try to allocate new worker - // this worker should not be released here, but instead will be released in the Exec function - nw, err := sp.allocator() - if err != nil { - sp.log.Error("failed to allocate dynamic worker", zap.Error(err)) - return nil, errors.E(op, err) - } - - watchW := make([]*worker.Process, 0, 10) - watchW = append(watchW, nw) - - // increase number of additionally allocated options - sp.cfg.DynamicAllocatorOpts.IncAllocated() - - sp.log.Debug("allocated additional worker", - zap.Int64("pid", nw.Pid()), - zap.Uint64("max_execs", nw.MaxExecs()), - zap.Uint64("currently additionally allocated", sp.cfg.DynamicAllocatorOpts.CurrAllocated()), - ) - - // we starting from the 1 because we already allocated one worker which would be released in the Exec function - for i := uint64(1); i <= sp.cfg.DynamicAllocatorOpts.SpawnRate; i++ { - // spawn as much workers as user specified in the spawn rate configuration, but not more than max workers - if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers { - break - } - - bw, err := sp.allocator() - if err != nil { - return nil, errors.E(op, err) - } - - 
sp.cfg.DynamicAllocatorOpts.IncAllocated() - // add worker to the watcher - watchW = append(watchW, bw) - - sp.log.Debug("allocated additional worker", - zap.Int64("pid", bw.Pid()), - zap.Uint64("max_execs", bw.MaxExecs()), - zap.Uint64("currently additionally allocated", sp.cfg.DynamicAllocatorOpts.CurrAllocated()), - ) - } - - err = sp.ww.Watch(watchW) - if err != nil { - return nil, errors.E(op, err) + return nil, errors.E(op, errors.NoFreeWorkers) } - return nw, nil + return sp.allocateDynamically() } // else if err not nil - return error return nil, errors.E(op, err) } return w, nil } - -func (sp *Pool) dynamicTTLListener() { - if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug { - return - } - - sp.log.Debug("starting dynamic allocator listener", zap.Duration("idle_timeout", sp.cfg.DynamicAllocatorOpts.IdleTimeout)) - go func() { - // DynamicAllocatorOpts are read-only, so we can use them without a lock - triggerTTL := time.NewTicker(sp.cfg.DynamicAllocatorOpts.IdleTimeout) - for { - select { - case <-sp.stopCh: - sp.log.Debug("dynamic allocator listener stopped") - return - // when this channel is triggered, we should deallocate all dynamically allocated workers - case <-triggerTTL.C: - sp.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached")) - // get the Exec (the whole operation) lock - sp.mu.Lock() - // get the DynamicAllocatorOpts lock to prevent operations on the CurrAllocated - sp.cfg.DynamicAllocatorOpts.Lock() - - // if we don't have any dynamically allocated workers, we can skip the deallocation - if sp.cfg.DynamicAllocatorOpts.CurrAllocated() == 0 { - sp.cfg.DynamicAllocatorOpts.Unlock() - sp.mu.Unlock() - return - } - - for i := sp.cfg.DynamicAllocatorOpts.CurrAllocated(); i > 0; i-- { - // take the worker from the stack, inifinite timeout - w, err := sp.ww.Take(context.Background()) - if err != nil { - sp.log.Error("failed to take worker from the stack", zap.Error(err)) - continue - } - - // set the worker state to be destroyed - w.State().Transition(fsm.StateDestroyed) - // release the worker - sp.ww.Release(w) - } - - sp.cfg.DynamicAllocatorOpts.ResetAllocated() - - sp.cfg.DynamicAllocatorOpts.Unlock() - sp.mu.Unlock() - sp.log.Debug("dynamic workers deallocated", zap.String("reason", "idle timeout reached")) - triggerTTL.Stop() - return - - // when this channel is triggered, we should extend the TTL of all dynamically allocated workers - case <-sp.cfg.DynamicAllocatorOpts.GetTriggerTTLChan(): - triggerTTL.Reset(sp.cfg.DynamicAllocatorOpts.IdleTimeout) - } - } - }() -} diff --git a/pool/static_pool/pool_test.go b/pool/static_pool/pool_test.go index 1f360eb..ab21064 100644 --- a/pool/static_pool/pool_test.go +++ b/pool/static_pool/pool_test.go @@ -402,7 +402,6 @@ func Test_StaticPool_QueueSizeLimit(t *testing.T) { WithQueueSize(1), ) require.NoError(t, err) - defer p.Destroy(ctx) assert.NotNil(t, p) @@ -432,8 +431,6 @@ func Test_StaticPool_QueueSizeLimit(t *testing.T) { assert.Equal(t, "hello world", res.Payload().String()) wg.Wait() - - p.Destroy(ctx) } func Test_StaticPool_Echo(t *testing.T) { diff --git a/tests/client.php b/tests/client.php index f81d33e..a5461ce 100644 --- a/tests/client.php +++ b/tests/client.php @@ -6,30 +6,31 @@ require __DIR__ . 
"/vendor/autoload.php"; if (count($argv) < 3) { - die("need 2 arguments"); + die("need 2 arguments"); } list($test, $goridge) = [$argv[1], $argv[2]]; switch ($goridge) { - case "pipes": - $relay = new Goridge\StreamRelay(STDIN, STDOUT); - break; - - case "tcp": - $relay = new Goridge\SocketRelay("127.0.0.1", 9007); - break; - - case "unix": - $relay = new Goridge\SocketRelay( - "sock.unix", - null, - Goridge\SocketRelay::SOCK_UNIX - ); - break; - - default: - die("invalid protocol selection"); + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("127.0.0.1", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketType::UNIX, + ); + break; + + default: + die("invalid protocol selection"); } -require_once sprintf("%s/%s.php", __DIR__, $test); \ No newline at end of file +require_once sprintf("%s/%s.php", __DIR__, $test); + diff --git a/tests/http/client.php b/tests/http/client.php index 90b5c2b..fa9865b 100644 --- a/tests/http/client.php +++ b/tests/http/client.php @@ -7,45 +7,45 @@ require dirname(__DIR__) . "/vendor/autoload.php"; if (count($argv) < 3) { - die("need 2 arguments"); + die("need 2 arguments"); } list($test, $goridge) = [$argv[1], $argv[2]]; switch ($goridge) { - case "pipes": - $relay = new Goridge\StreamRelay(STDIN, STDOUT); - break; - - case "tcp": - $relay = new Goridge\SocketRelay("127.0.0.1", 9007); - break; - - case "unix": - $relay = new Goridge\SocketRelay( - "sock.unix", - null, - Goridge\SocketRelay::SOCK_UNIX - ); - break; - - default: - die("invalid protocol selection"); + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("127.0.0.1", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketType::UNIX, + ); + break; + + default: + die("invalid protocol selection"); } $psr7 = new RoadRunner\Http\PSR7Worker( - new RoadRunner\Worker($relay), - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory() + new RoadRunner\Worker($relay), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() ); require_once sprintf("%s/%s.php", __DIR__, $test); while ($req = $psr7->waitRequest()) { - try { - $psr7->respond(handleRequest($req, new \Nyholm\Psr7\Response())); - } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); - } + try { + $psr7->respond(handleRequest($req, new \Nyholm\Psr7\Response())); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } } diff --git a/tests/http/slow-client.php b/tests/http/slow-client.php index 1eaa7bc..be554a7 100644 --- a/tests/http/slow-client.php +++ b/tests/http/slow-client.php @@ -7,46 +7,46 @@ require dirname(__DIR__) . 
"/vendor/autoload.php"; if (count($argv) < 3) { - die("need 2 arguments"); + die("need 2 arguments"); } [$test, $goridge, $bootDelay] = [$argv[1], $argv[2], $argv[3]]; usleep($bootDelay * 1000); switch ($goridge) { - case "pipes": - $relay = new Goridge\StreamRelay(STDIN, STDOUT); - break; - - case "tcp": - $relay = new Goridge\SocketRelay("127.0.0.1", 9007); - break; - - case "unix": - $relay = new Goridge\SocketRelay( - "sock.unix", - null, - Goridge\SocketRelay::SOCK_UNIX - ); - break; - - default: - die("invalid protocol selection"); + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("127.0.0.1", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketType::UNIX, + ); + break; + + default: + die("invalid protocol selection"); } $psr7 = new RoadRunner\Http\PSR7Worker( - new RoadRunner\Worker($relay), - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory() + new RoadRunner\Worker($relay), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() ); require_once sprintf("%s/%s.php", __DIR__, $test); while ($req = $psr7->waitRequest()) { - try { - $psr7->respond(handleRequest($req, new \Nyholm\Psr7\Response())); - } catch (\Throwable $e) { - $psr7->getWorker()->error((string) $e); - } + try { + $psr7->respond(handleRequest($req, new \Nyholm\Psr7\Response())); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string) $e); + } } diff --git a/tests/slow-client.php b/tests/slow-client.php index c21b45d..565fe50 100644 --- a/tests/slow-client.php +++ b/tests/slow-client.php @@ -6,31 +6,31 @@ require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { - die("need 2 arguments"); + die("need 2 arguments"); } list($test, $goridge, $bootDelay, $shutdownDelay) = [$argv[1], $argv[2], $argv[3], $argv[4]]; switch ($goridge) { - case "pipes": - $relay = new Goridge\StreamRelay(STDIN, STDOUT); - break; + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; - case "tcp": - $relay = new Goridge\SocketRelay("127.0.0.1", 9007); - break; + case "tcp": + $relay = new Goridge\SocketRelay("127.0.0.1", 9007); + break; - case "unix": - $relay = new Goridge\SocketRelay( - "sock.unix", - null, - Goridge\SocketRelay::SOCK_UNIX - ); + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketType::UNIX, + ); - break; + break; - default: - die("invalid protocol selection"); + default: + die("invalid protocol selection"); } usleep($bootDelay * 1000); diff --git a/tests/slow-destroy.php b/tests/slow-destroy.php index 2edbc0d..a8a7e64 100644 --- a/tests/slow-destroy.php +++ b/tests/slow-destroy.php @@ -6,30 +6,30 @@ require __DIR__ . 
"/vendor/autoload.php"; if (count($argv) < 3) { - die("need 2 arguments"); + die("need 2 arguments"); } list($test, $goridge) = [$argv[1], $argv[2]]; switch ($goridge) { - case "pipes": - $relay = new Goridge\StreamRelay(STDIN, STDOUT); - break; - - case "tcp": - $relay = new Goridge\SocketRelay("127.0.0.1", 9007); - break; - - case "unix": - $relay = new Goridge\SocketRelay( - "sock.unix", - null, - Goridge\SocketRelay::SOCK_UNIX - ); - break; - - default: - die("invalid protocol selection"); + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("127.0.0.1", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketType::UNIX, + ); + break; + + default: + die("invalid protocol selection"); } require_once sprintf("%s/%s.php", __DIR__, $test); From 18d03899e92a2dd2ec29bdfca8f530551384bb15 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 3 Dec 2024 00:40:14 +0100 Subject: [PATCH 3/7] chore: update golangci Signed-off-by: Valery Piashchynski --- .github/workflows/linters.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index eff4bdd..fae5b65 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -18,6 +18,6 @@ jobs: - name: Run linter uses: golangci/golangci-lint-action@v6.1.1 # Action page: with: - version: v1.60 # without patch version + version: v1.62 # without patch version only-new-issues: false # show only new issues if it's a pull request args: --timeout=10m --build-tags=race From b0b829342705fb2ec0bf34bc7a7c92b5270a28b2 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 3 Dec 2024 00:42:15 +0100 Subject: [PATCH 4/7] chore: php 8.4 Signed-off-by: Valery Piashchynski --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4722596..02e44d8 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -10,7 +10,7 @@ jobs: strategy: fail-fast: false matrix: - php: ["8.3"] + php: ["8.4"] go: [stable] os: ["ubuntu-latest", "macos-latest", "windows-latest"] steps: From 1187e331562cad09f5c21d332d5f24fd36ec040e Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 3 Dec 2024 15:31:26 +0100 Subject: [PATCH 5/7] chore: refactoring for the dynamic-pool Signed-off-by: Valery Piashchynski --- pool/config.go | 73 ++------- pool/static_pool/dyn_allocator.go | 190 ++++++++++++++++-------- pool/static_pool/pool.go | 22 ++- pool/static_pool/pool_test.go | 86 +++++++++++ worker_watcher/container/channel/vec.go | 2 +- worker_watcher/worker_watcher.go | 34 +---- 6 files changed, 240 insertions(+), 167 deletions(-) diff --git a/pool/config.go b/pool/config.go index 2652bd2..46f4d85 100644 --- a/pool/config.go +++ b/pool/config.go @@ -2,8 +2,6 @@ package pool import ( "runtime" - "sync" - "sync/atomic" "time" ) @@ -94,78 +92,27 @@ type DynamicAllocationOpts struct { MaxWorkers uint64 `mapstructure:"max_workers"` SpawnRate uint64 `mapstructure:"spawn_rate"` IdleTimeout time.Duration `mapstructure:"idle_timeout"` - - // internal, should be private and moved to the static_pool folder - currAllocated uint64 - lock *sync.Mutex - ttlTriggerChan chan struct{} - started atomic.Pointer[bool] -} - -func (d *DynamicAllocationOpts) TriggerTTL() { - d.ttlTriggerChan <- struct{}{} -} - -func (d *DynamicAllocationOpts) GetTriggerTTLChan() chan 
struct{} { - return d.ttlTriggerChan -} - -func (d *DynamicAllocationOpts) Lock() { - d.lock.Lock() -} - -func (d *DynamicAllocationOpts) Unlock() { - d.lock.Unlock() -} - -func (d *DynamicAllocationOpts) CurrAllocated() uint64 { - return d.currAllocated -} - -func (d *DynamicAllocationOpts) IncAllocated() { - d.currAllocated++ -} - -func (d *DynamicAllocationOpts) DecAllocated() { - d.currAllocated-- -} - -func (d *DynamicAllocationOpts) ResetAllocated() { - d.currAllocated = 0 -} - -func (d *DynamicAllocationOpts) IsStarted() bool { - return *d.started.Load() -} - -func (d *DynamicAllocationOpts) Start() { - d.started.Store(p(true)) -} - -func (d *DynamicAllocationOpts) Stop() { - d.started.Store(p(false)) } func (d *DynamicAllocationOpts) InitDefaults() { - d.lock = &sync.Mutex{} - if d.MaxWorkers == 0 { d.MaxWorkers = 10 } + // limit max workers to 100 + if d.MaxWorkers > 100 { + d.MaxWorkers = 100 + } + if d.SpawnRate == 0 { - d.SpawnRate = 1 + d.SpawnRate = 5 + } + + if d.SpawnRate > 100 { + d.SpawnRate = 100 } if d.IdleTimeout == 0 || d.IdleTimeout < time.Second { d.IdleTimeout = time.Minute } - - d.ttlTriggerChan = make(chan struct{}, 1) - d.currAllocated = 0 - d.started.Store(p(false)) -} - -func p[T any](val T) *T { - return &val } diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 6541e97..dfc8f88 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -3,144 +3,204 @@ package static_pool //nolint:stylecheck import ( "context" stderr "errors" + "sync" + "sync/atomic" "time" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/pool/fsm" + "github.com/roadrunner-server/pool/pool" "github.com/roadrunner-server/pool/worker" + "github.com/roadrunner-server/pool/worker_watcher" "go.uber.org/zap" ) -func (sp *Pool) allocateDynamically() (*worker.Process, error) { +type dynAllocator struct { + // derived from the config + maxWorkers uint64 + spawnRate uint64 + idleTimeout time.Duration + + // internal + currAllocated atomic.Pointer[uint64] + mu *sync.Mutex + execLock *sync.RWMutex + ttlTriggerChan chan struct{} + started atomic.Pointer[bool] + log *zap.Logger + // pool + ww *worker_watcher.WorkerWatcher + allocator func() (*worker.Process, error) + stopCh chan struct{} +} + +func newDynAllocator( + log *zap.Logger, + ww *worker_watcher.WorkerWatcher, + alloc func() (*worker.Process, error), + stopCh chan struct{}, + execLock *sync.RWMutex, + cfg *pool.Config) *dynAllocator { + da := &dynAllocator{ + maxWorkers: cfg.DynamicAllocatorOpts.MaxWorkers, + spawnRate: cfg.DynamicAllocatorOpts.SpawnRate, + idleTimeout: cfg.DynamicAllocatorOpts.IdleTimeout, + mu: &sync.Mutex{}, + ttlTriggerChan: make(chan struct{}, 1), + ww: ww, + execLock: execLock, + allocator: alloc, + log: log, + stopCh: stopCh, + } + + da.currAllocated.Store(p(uint64(0))) + da.started.Store(p(false)) + + return da +} + +func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { const op = errors.Op("allocate_dynamically") // obtain an operation lock // we can use a lock-free approach here, but it's not necessary - sp.cfg.DynamicAllocatorOpts.Lock() - defer sp.cfg.DynamicAllocatorOpts.Unlock() + da.mu.Lock() + defer da.mu.Unlock() - if !sp.cfg.DynamicAllocatorOpts.IsStarted() { + if !*da.started.Load() { // start the dynamic allocator listener - sp.dynamicTTLListener() + da.dynamicTTLListener() + da.started.Store(p(true)) + } else { + // if the listener was started we can try to get the worker with a very short timeout, 
which was probably allocated by the previous NoFreeWorkers error + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + w, err := da.ww.Take(ctx) + cancel() + if err != nil { + if errors.Is(errors.NoFreeWorkers, err) { + goto allocate + } + + return nil, errors.E(op, err) + } + + return w, nil } +allocate: + // otherwise, we can try to allocate a new batch of workers + // reset the TTL listener - sp.cfg.DynamicAllocatorOpts.TriggerTTL() + select { + case da.ttlTriggerChan <- struct{}{}: + case <-time.After(time.Minute): + return nil, errors.E(op, stderr.New("failed to reset the TTL listener")) + } // if we already allocated max workers, we can't allocate more - if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers { + if *da.currAllocated.Load() >= da.maxWorkers { // can't allocate more - return nil, errors.E(op, stderr.New("can't allocate more workers, increase max_workers option")) + return nil, errors.E(op, stderr.New("can't allocate more workers, increase max_workers option (max_workers limit is 100)")) } // if we have dynamic allocator, we can try to allocate new worker // this worker should not be released here, but instead will be released in the Exec function - nw, err := sp.allocator() + err := da.ww.AddWorker() if err != nil { - sp.log.Error("failed to allocate dynamic worker", zap.Error(err)) + da.log.Error("failed to allocate dynamic worker", zap.Error(err)) return nil, errors.E(op, err) } - watchW := make([]*worker.Process, 0, 10) - watchW = append(watchW, nw) - // increase number of additionally allocated options - sp.cfg.DynamicAllocatorOpts.IncAllocated() + _ = da.currAllocated.Swap(p(*da.currAllocated.Load() + 1)) - sp.log.Debug("allocated additional worker", - zap.Int64("pid", nw.Pid()), - zap.Uint64("max_execs", nw.MaxExecs()), - zap.Uint64("currently additionally allocated", sp.cfg.DynamicAllocatorOpts.CurrAllocated()), - ) + da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", *da.currAllocated.Load())) // we starting from the 1 because we already allocated one worker which would be released in the Exec function - for i := uint64(1); i <= sp.cfg.DynamicAllocatorOpts.SpawnRate; i++ { + for i := uint64(1); i <= da.spawnRate; i++ { // spawn as much workers as user specified in the spawn rate configuration, but not more than max workers - if sp.cfg.DynamicAllocatorOpts.CurrAllocated() >= sp.cfg.DynamicAllocatorOpts.MaxWorkers { + if *da.currAllocated.Load() >= da.maxWorkers { break } - bw, err := sp.allocator() + err = da.ww.AddWorker() if err != nil { return nil, errors.E(op, err) } - sp.cfg.DynamicAllocatorOpts.IncAllocated() - // add worker to the watcher - watchW = append(watchW, bw) + // increase number of additionally allocated options + _ = da.currAllocated.Swap(p(*da.currAllocated.Load() + 1)) - sp.log.Debug("allocated additional worker", - zap.Int64("pid", bw.Pid()), - zap.Uint64("max_execs", bw.MaxExecs()), - zap.Uint64("currently additionally allocated", sp.cfg.DynamicAllocatorOpts.CurrAllocated()), - ) + da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", *da.currAllocated.Load())) } - err = sp.ww.Watch(watchW) - if err != nil { - return nil, errors.E(op, err) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + w, err := da.ww.Take(ctx) + cancel() - return nw, nil + return w, err } -func (sp *Pool) dynamicTTLListener() { - if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug { - return - } - - 
sp.log.Debug("starting dynamic allocator listener", zap.Duration("idle_timeout", sp.cfg.DynamicAllocatorOpts.IdleTimeout)) +func (da *dynAllocator) dynamicTTLListener() { + da.log.Debug("starting dynamic allocator listener", zap.Duration("idle_timeout", da.idleTimeout)) go func() { // DynamicAllocatorOpts are read-only, so we can use them without a lock - triggerTTL := time.NewTicker(sp.cfg.DynamicAllocatorOpts.IdleTimeout) + triggerTTL := time.NewTicker(da.idleTimeout) for { select { - case <-sp.stopCh: - sp.log.Debug("dynamic allocator listener stopped") + case <-da.stopCh: + da.log.Debug("dynamic allocator listener stopped") goto exit // when this channel is triggered, we should deallocate all dynamically allocated workers case <-triggerTTL.C: - sp.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached")) + da.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached")) // get the Exec (the whole operation) lock - sp.mu.Lock() + da.execLock.Lock() // get the DynamicAllocatorOpts lock to prevent operations on the CurrAllocated - sp.cfg.DynamicAllocatorOpts.Lock() + da.mu.Lock() // if we don't have any dynamically allocated workers, we can skip the deallocation - if sp.cfg.DynamicAllocatorOpts.CurrAllocated() == 0 { - sp.cfg.DynamicAllocatorOpts.Unlock() - sp.mu.Unlock() + if *da.currAllocated.Load() == 0 { + da.mu.Unlock() + da.execLock.Unlock() goto exit } - for i := sp.cfg.DynamicAllocatorOpts.CurrAllocated(); i > 0; i-- { + for i := *da.currAllocated.Load(); i > 0; i-- { // take the worker from the stack, inifinite timeout - w, err := sp.ww.Take(context.Background()) + // we should not block here forever + err := da.ww.RemoveWorker(context.Background()) if err != nil { - sp.log.Error("failed to take worker from the stack", zap.Error(err)) + da.log.Error("failed to take worker from the stack", zap.Error(err)) continue } - // set the worker state to be destroyed - w.State().Transition(fsm.StateDestroyed) - // release the worker - sp.ww.Release(w) + // reset the number of allocated workers + // potential problem: if we'd have an error in the da.ww.Take code block, we'd still have the currAllocated > 0 + + // decrease number of additionally allocated options + _ = da.currAllocated.Swap(p(*da.currAllocated.Load() - 1)) } - sp.cfg.DynamicAllocatorOpts.ResetAllocated() + if *da.currAllocated.Load() != 0 { + da.log.Error("failed to deallocate all dynamically allocated workers", zap.Uint64("remaining", *da.currAllocated.Load())) + } - sp.cfg.DynamicAllocatorOpts.Unlock() - sp.mu.Unlock() - sp.log.Debug("dynamic workers deallocated", zap.String("reason", "idle timeout reached")) + da.mu.Unlock() + da.execLock.Unlock() triggerTTL.Stop() goto exit // when this channel is triggered, we should extend the TTL of all dynamically allocated workers - case <-sp.cfg.DynamicAllocatorOpts.GetTriggerTTLChan(): - triggerTTL.Reset(sp.cfg.DynamicAllocatorOpts.IdleTimeout) + case <-da.ttlTriggerChan: + triggerTTL.Reset(da.idleTimeout) } } exit: - sp.cfg.DynamicAllocatorOpts.Stop() + da.started.Store(p(false)) }() } + +func p[T any](val T) *T { + return &val +} diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 9b988ea..3420b07 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -35,6 +35,8 @@ type Pool struct { factory pool.Factory // manages worker states and TTLs ww *workerWatcher.WorkerWatcher + // dynamic allocator + dynamicAllocator *dynAllocator // allocate new worker allocator func() (*worker.Process, error) // exec queue size 
@@ -58,6 +60,11 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p cfg.InitDefaults() + // limit the number of workers to 500 + if cfg.NumWorkers > 500 { + return nil, errors.Str("number of workers can't be more than 500") + } + // for debug mode we need to set the number of workers to 0 (no pre-allocated workers) and max jobs to 1 if cfg.Debug { cfg.NumWorkers = 0 @@ -114,6 +121,10 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p p.start() } + if p.cfg.DynamicAllocatorOpts != nil { + p.dynamicAllocator = newDynAllocator(p.log, p.ww, p.allocator, p.stopCh, &p.mu, p.cfg) + } + return p, nil } @@ -370,10 +381,7 @@ func (sp *Pool) NumDynamic() uint64 { return 0 } - sp.cfg.DynamicAllocatorOpts.Lock() - defer sp.cfg.DynamicAllocatorOpts.Unlock() - - return sp.cfg.DynamicAllocatorOpts.CurrAllocated() + return *sp.dynamicAllocator.currAllocated.Load() } // Destroy all underlying stack (but let them complete the task). @@ -428,7 +436,11 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr return nil, errors.E(op, errors.NoFreeWorkers) } - return sp.allocateDynamically() + // for the dynamic allocator, we can would have many requests waiting at the same time on the lock in the dyn allocator + // this will lead to the following case - all previous requests would be able to get the worker, since we're allocating them in the allocateDynamically + // however, requests waiting for the lock, won't allocate a new worker and would be failed + + return sp.dynamicAllocator.allocateDynamically() } // else if err not nil - return error return nil, errors.E(op, err) diff --git a/pool/static_pool/pool_test.go b/pool/static_pool/pool_test.go index ab21064..32caf6c 100644 --- a/pool/static_pool/pool_test.go +++ b/pool/static_pool/pool_test.go @@ -117,6 +117,92 @@ func Test_DynamicPool(t *testing.T) { p.Destroy(ctx) } +func Test_MaxWorkers(t *testing.T) { + dynAllCfg := &pool.Config{ + NumWorkers: 501, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(log()), + dynAllCfg, + log(), + ) + assert.Error(t, err) + assert.Nil(t, p) +} + +func Test_DynamicPool_500W(t *testing.T) { + dynAllCfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 100, + IdleTimeout: time.Second * 15, + // should be corrected to 100 by RR + SpawnRate: 101, + }, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(log()), + dynAllCfg, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + require.Len(t, p.Workers(), 1) + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.NoError(t, err) + select { + case resp := <-r: + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + + go func() { + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.NoError(t, err) + select { + case resp := <-r: + assert.Equal(t, 
[]byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + + wg.Wait() + + require.Len(t, p.Workers(), 101) + time.Sleep(time.Second * 20) + p.Destroy(ctx) +} + func Test_NewPoolAddRemoveWorkers(t *testing.T) { testCfg2 := &pool.Config{ NumWorkers: 1, diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 5215504..40c93f2 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -25,7 +25,7 @@ func NewVector() *Vec { vec := &Vec{ destroy: 0, reset: 0, - workers: make(chan *worker.Process, 500), + workers: make(chan *worker.Process, 600), } return vec diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 7dc1e42..280e957 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -56,38 +56,6 @@ func (ww *WorkerWatcher) Watch(workers []*worker.Process) error { ww.Lock() defer ww.Unlock() - // if the container is full, return an error - if atomic.LoadUint64(&ww.numWorkers) >= maxWorkers { - return errors.E(errors.WorkerAllocate, errors.Str("container is full")) - } - - // if the number of workers to add is greater than the maximum number of workers - // we can add only the number of workers that can be added w/o exceeding the limit - if atomic.LoadUint64(&ww.numWorkers)+uint64(len(workers)) > maxWorkers { - // calculate the number of workers that can be added - // for example, if we have 100 [0..99] workers to add, but the container has 450 workers, - // we can add only 50 workers: 500 - 450 = 50 [0..49] - maxAllocated := maxWorkers - atomic.LoadUint64(&ww.numWorkers) - - // workers[:maxAllocated] - up to 50 including 50 (in our example) - toWatch := workers[:maxAllocated] - // toAllocate - will contain [0, 1, 2, ..., 50] + [50..99] (100 in total) - for i := 0; i < len(toWatch); i++ { - ww.container.Push(toWatch[i]) - // add worker to watch slice - ww.workers[toWatch[i].Pid()] = toWatch[i] - ww.addToWatch(toWatch[i]) - } - - // the rest of the workers from the workers slice should be stopped - toStop := workers[maxAllocated:] - for i := 0; i < len(toStop); i++ { - _ = toStop[i].Stop() - } - - return nil - } - // else we can add all workers for i := 0; i < len(workers); i++ { ww.container.Push(workers[i]) @@ -101,7 +69,7 @@ func (ww *WorkerWatcher) Watch(workers []*worker.Process) error { func (ww *WorkerWatcher) AddWorker() error { if atomic.LoadUint64(&ww.numWorkers) >= maxWorkers { - return errors.E(errors.WorkerAllocate, errors.Str("container is full")) + return errors.E(errors.WorkerAllocate, errors.Str("container is full, maximum number of workers reached")) } err := ww.Allocate() From 7b979c5b919bc4dc111d07a77c5a62a468d39e0a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 3 Dec 2024 16:32:51 +0100 Subject: [PATCH 6/7] chore: reset ttl after every allocated worker Signed-off-by: Valery Piashchynski --- pool/static_pool/dyn_allocator.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index dfc8f88..0253f79 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -128,6 +128,13 @@ allocate: return nil, errors.E(op, err) } + // reset ttl after every alloated worker + select { + case da.ttlTriggerChan <- struct{}{}: + case <-time.After(time.Minute): + return nil, errors.E(op, stderr.New("failed to reset the TTL listener")) + 
} + // increase number of additionally allocated options _ = da.currAllocated.Swap(p(*da.currAllocated.Load() + 1)) From 1cd40f4c3b5f7e9e16753ae79a9540839bf6e559 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 3 Dec 2024 16:58:10 +0100 Subject: [PATCH 7/7] chore: update deps Signed-off-by: Valery Piashchynski --- go.mod | 6 +++--- go.sum | 6 ++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index b58b1ec..f29b4ea 100644 --- a/go.mod +++ b/go.mod @@ -10,9 +10,9 @@ require ( github.com/roadrunner-server/events v1.0.1 github.com/roadrunner-server/goridge/v3 v3.8.3 github.com/shirou/gopsutil v3.21.11+incompatible - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.8.0 + golang.org/x/sync v0.9.0 ) require ( @@ -24,6 +24,6 @@ require ( github.com/tklauser/numcpus v0.9.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/sys v0.26.0 // indirect + golang.org/x/sys v0.27.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 187fcd3..82fcad2 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKl github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU= github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY= github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPDo= @@ -33,10 +35,14 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
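Usage note: below is a minimal sketch of how a consumer of this library could enable the dynamic allocator added by this series. The import path, the type and field names (pool.Config, pool.DynamicAllocationOpts, MaxWorkers, SpawnRate, IdleTimeout) and the clamping behaviour of InitDefaults are taken from the patches themselves; the wrapper function, the main function and the concrete values are illustrative assumptions only.

package main

import (
	"fmt"
	"time"

	"github.com/roadrunner-server/pool/pool"
)

// buildDynConfig assembles a pool.Config that enables the dynamic allocator.
// Type and field names are taken from pool/config.go in this series; the
// concrete values are illustrative.
func buildDynConfig() *pool.Config {
	cfg := &pool.Config{
		NumWorkers:      2,
		AllocateTimeout: time.Second * 5,
		DestroyTimeout:  time.Second,
		DynamicAllocatorOpts: &pool.DynamicAllocationOpts{
			MaxWorkers:  25,               // hard cap on additionally spawned workers (InitDefaults clamps values above 100)
			SpawnRate:   10,               // workers spawned per burst when no free worker is available
			IdleTimeout: 30 * time.Second, // dynamically spawned workers are destroyed after this idle period
		},
	}
	// apply the library defaults to any fields left at their zero value
	cfg.InitDefaults()

	return cfg
}

func main() {
	cfg := buildDynConfig()
	fmt.Println("dynamic allocator max workers:", cfg.DynamicAllocatorOpts.MaxWorkers)
}

With such a config, takeWorker falls back to the dynamic allocator when it hits a NoFreeWorkers error, spawning up to SpawnRate extra workers per burst but never more than MaxWorkers in total; the TTL listener then destroys the dynamically spawned workers once IdleTimeout elapses without a new allocation.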