diff --git a/backoff.go b/backoff.go new file mode 100644 index 000000000..f5132458e --- /dev/null +++ b/backoff.go @@ -0,0 +1,51 @@ +package frankenphp + +import ( + "sync" + "time" +) + +type exponentialBackoff struct { + backoff time.Duration + failureCount int + mu sync.RWMutex + maxBackoff time.Duration + minBackoff time.Duration + maxConsecutiveFailures int +} + +// recordSuccess resets the backoff and failureCount +func (e *exponentialBackoff) recordSuccess() { + e.mu.Lock() + e.failureCount = 0 + e.backoff = e.minBackoff + e.mu.Unlock() +} + +// recordFailure increments the failure count and increases the backoff, it returns true if maxConsecutiveFailures has been reached +func (e *exponentialBackoff) recordFailure() bool { + e.mu.Lock() + e.failureCount += 1 + if e.backoff < e.minBackoff { + e.backoff = e.minBackoff + } + + e.backoff = min(e.backoff*2, e.maxBackoff) + + e.mu.Unlock() + return e.failureCount >= e.maxConsecutiveFailures +} + +// wait sleeps for the backoff duration if failureCount is non-zero. +// NOTE: this is not tested and should be kept 'obviously correct' (i.e., simple) +func (e *exponentialBackoff) wait() { + e.mu.RLock() + if e.failureCount == 0 { + e.mu.RUnlock() + + return + } + e.mu.RUnlock() + + time.Sleep(e.backoff) +} diff --git a/backoff_test.go b/backoff_test.go new file mode 100644 index 000000000..5ced2e4cd --- /dev/null +++ b/backoff_test.go @@ -0,0 +1,41 @@ +package frankenphp + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestExponentialBackoff_Reset(t *testing.T) { + e := &exponentialBackoff{ + maxBackoff: 5 * time.Second, + minBackoff: 500 * time.Millisecond, + maxConsecutiveFailures: 3, + } + + assert.False(t, e.recordFailure()) + assert.False(t, e.recordFailure()) + e.recordSuccess() + + e.mu.RLock() + defer e.mu.RUnlock() + assert.Equal(t, 0, e.failureCount, "expected failureCount to be reset to 0") + assert.Equal(t, e.backoff, e.minBackoff, "expected backoff to be reset to minBackoff") +} + +func TestExponentialBackoff_Trigger(t *testing.T) { + e := &exponentialBackoff{ + maxBackoff: 500 * 3 * time.Millisecond, + minBackoff: 500 * time.Millisecond, + maxConsecutiveFailures: 3, + } + + assert.False(t, e.recordFailure()) + assert.False(t, e.recordFailure()) + assert.True(t, e.recordFailure()) + + e.mu.RLock() + defer e.mu.RUnlock() + assert.Equal(t, e.failureCount, e.maxConsecutiveFailures, "expected failureCount to be maxConsecutiveFailures") + assert.Equal(t, e.backoff, e.maxBackoff, "expected backoff to be maxBackoff") +} diff --git a/env.go b/env.go new file mode 100644 index 000000000..d1e464dfb --- /dev/null +++ b/env.go @@ -0,0 +1,74 @@ +package frankenphp + +// #include "frankenphp.h" +import "C" +import ( + "os" + "strings" + "unsafe" +) + +//export go_putenv +func go_putenv(str *C.char, length C.int) C.bool { + envString := C.GoStringN(str, length) + + // Check if '=' is present in the string + if key, val, found := strings.Cut(envString, "="); found { + return os.Setenv(key, val) == nil + } + + // No '=', unset the environment variable + return os.Unsetenv(envString) == nil +} + +//export go_getfullenv +func go_getfullenv(threadIndex C.uintptr_t) (*C.go_string, C.size_t) { + thread := phpThreads[threadIndex] + + env := os.Environ() + goStrings := make([]C.go_string, len(env)*2) + + for i, envVar := range env { + key, val, _ := strings.Cut(envVar, "=") + goStrings[i*2] = C.go_string{C.size_t(len(key)), thread.pinString(key)} + goStrings[i*2+1] = C.go_string{C.size_t(len(val)), thread.pinString(val)} + } + + value := unsafe.SliceData(goStrings) + thread.Pin(value) + + return value, C.size_t(len(env)) +} + +//export go_getenv +func go_getenv(threadIndex C.uintptr_t, name *C.go_string) (C.bool, *C.go_string) { + thread := phpThreads[threadIndex] + + // Create a byte slice from C string with a specified length + envName := C.GoStringN(name.data, C.int(name.len)) + + // Get the environment variable value + envValue, exists := os.LookupEnv(envName) + if !exists { + // Environment variable does not exist + return false, nil // Return 0 to indicate failure + } + + // Convert Go string to C string + value := &C.go_string{C.size_t(len(envValue)), thread.pinString(envValue)} + thread.Pin(value) + + return true, value // Return 1 to indicate success +} + +//export go_sapi_getenv +func go_sapi_getenv(threadIndex C.uintptr_t, name *C.go_string) *C.char { + envName := C.GoStringN(name.data, C.int(name.len)) + + envValue, exists := os.LookupEnv(envName) + if !exists { + return nil + } + + return phpThreads[threadIndex].pinCString(envValue) +} diff --git a/frankenphp.c b/frankenphp.c index a19a56dab..31b87a6ad 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -243,7 +243,11 @@ PHP_FUNCTION(frankenphp_finish_request) { /* {{{ */ php_header(); if (ctx->has_active_request) { +#ifdef NEW_WORKER + go_frankenphp_finish_php_request(thread_index); +#else go_frankenphp_finish_request(thread_index, false); +#endif } ctx->finished = true; @@ -443,7 +447,11 @@ PHP_FUNCTION(frankenphp_handle_request) { frankenphp_worker_request_shutdown(); ctx->has_active_request = false; +#ifdef NEW_WORKER + go_frankenphp_finish_worker_request(thread_index); +#else go_frankenphp_finish_request(thread_index, true); +#endif RETURN_TRUE; } @@ -832,8 +840,24 @@ static void *php_thread(void *arg) { cfg_get_string("filter.default", &default_filter); should_filter_var = default_filter != NULL; +#ifdef NEW_WORKER + go_frankenphp_on_thread_startup(thread_index); + + while(true) { + char *scriptName = go_frankenphp_before_script_execution(thread_index); + + // if the script name is NULL, the thread should exit + if (scriptName == NULL || scriptName[0] == '\0') { + break; + } + + int exit_status = frankenphp_execute_script(scriptName); + go_frankenphp_after_script_execution(thread_index, exit_status); + } +#else while (go_handle_request(thread_index)) { } +#endif go_frankenphp_release_known_variable_keys(thread_index); @@ -841,6 +865,10 @@ static void *php_thread(void *arg) { ts_free_thread(); #endif +#ifdef NEW_WORKER + go_frankenphp_on_thread_shutdown(thread_index); +#endif + return NULL; } diff --git a/frankenphp.go b/frankenphp.go index d7e35604c..37a8ac930 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -505,88 +505,6 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error return nil } -//export go_putenv -func go_putenv(str *C.char, length C.int) C.bool { - // Create a byte slice from C string with a specified length - s := C.GoBytes(unsafe.Pointer(str), length) - - // Convert byte slice to string - envString := string(s) - - // Check if '=' is present in the string - if key, val, found := strings.Cut(envString, "="); found { - if os.Setenv(key, val) != nil { - return false // Failure - } - } else { - // No '=', unset the environment variable - if os.Unsetenv(envString) != nil { - return false // Failure - } - } - - return true // Success -} - -//export go_getfullenv -func go_getfullenv(threadIndex C.uintptr_t) (*C.go_string, C.size_t) { - thread := phpThreads[threadIndex] - - env := os.Environ() - goStrings := make([]C.go_string, len(env)*2) - - for i, envVar := range env { - key, val, _ := strings.Cut(envVar, "=") - k := unsafe.StringData(key) - v := unsafe.StringData(val) - thread.Pin(k) - thread.Pin(v) - - goStrings[i*2] = C.go_string{C.size_t(len(key)), (*C.char)(unsafe.Pointer(k))} - goStrings[i*2+1] = C.go_string{C.size_t(len(val)), (*C.char)(unsafe.Pointer(v))} - } - - value := unsafe.SliceData(goStrings) - thread.Pin(value) - - return value, C.size_t(len(env)) -} - -//export go_getenv -func go_getenv(threadIndex C.uintptr_t, name *C.go_string) (C.bool, *C.go_string) { - thread := phpThreads[threadIndex] - - // Create a byte slice from C string with a specified length - envName := C.GoStringN(name.data, C.int(name.len)) - - // Get the environment variable value - envValue, exists := os.LookupEnv(envName) - if !exists { - // Environment variable does not exist - return false, nil // Return 0 to indicate failure - } - - // Convert Go string to C string - val := unsafe.StringData(envValue) - thread.Pin(val) - value := &C.go_string{C.size_t(len(envValue)), (*C.char)(unsafe.Pointer(val))} - thread.Pin(value) - - return true, value // Return 1 to indicate success -} - -//export go_sapi_getenv -func go_sapi_getenv(threadIndex C.uintptr_t, name *C.go_string) *C.char { - envName := C.GoStringN(name.data, C.int(name.len)) - - envValue, exists := os.LookupEnv(envName) - if !exists { - return nil - } - - return phpThreads[threadIndex].pinCString(envValue) -} - //export go_handle_request func go_handle_request(threadIndex C.uintptr_t) bool { select { diff --git a/new-worker.go b/new-worker.go new file mode 100644 index 000000000..89d94c280 --- /dev/null +++ b/new-worker.go @@ -0,0 +1,89 @@ +//go:build newworker + +package frankenphp + +// #cgo CFLAGS: -DNEW_WORKER +// #include +// #include "frankenphp.h" +import "C" +import "net/http" + +// worker represents a PHP worker script. +type worker struct{} + +// handleRequest handles an incoming HTTP request and passes it to the worker thread. +func (worker *worker) handleRequest(r *http.Request) {} + +// A map of workers by identity. +var workers = make(map[string]*worker) + +// initWorkers initializes the workers. +func initWorkers(opt []workerOpt) error { + panic("not implemented") +} + +// stopWorkers stops the workers. +func stopWorkers() {} + +// drainWorkers drains the workers. +func drainWorkers() {} + +// restartWorkers restarts the workers. +func restartWorkers(workerOpts []workerOpt) {} + +// go_frankenphp_worker_handle_request_start handles the start of a worker request. +// +//export go_frankenphp_worker_handle_request_start +func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool { + panic("not implemented") +} + +// go_frankenphp_finish_php_request should flush the buffers and return the response. +// this does not mean the php code has finished executing, +// but that the request has been fully processed and can be returned to the client. +// +//export go_frankenphp_finish_php_request +func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) { + thread := phpThreads[threadIndex] + r := thread.getActiveRequest() + fc := r.Context().Value(contextKey).(*FrankenPHPContext) + maybeCloseContext(fc) +} + +// go_frankenphp_on_thread_startup is called when a thread is started. +// +//export go_frankenphp_on_thread_startup +func go_frankenphp_on_thread_startup(threadIndex C.uintptr_t) { +} + +// go_frankenphp_before_script_execution is called before a request handling script is executed. +// it should return the script name to execute. +// +//export go_frankenphp_before_script_execution +func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char { + panic("not implemented") +} + +// go_frankenphp_after_script_execution is called after a request handling script is executed +// +//export go_frankenphp_after_script_execution +func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C.int) { +} + +// go_frankenphp_on_thread_shutdown is called when a thread is shutting down. +// +//export go_frankenphp_on_thread_shutdown +func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { +} + +// go_frankenphp_finish_worker_request is called when a worker has finished processing a request. +// +//export go_frankenphp_finish_worker_request +func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) { + +} + +// restartWorkersOnFileChanges restarts the workers on file changes. +func restartWorkersOnFileChanges(workerOpts []workerOpt) error { + panic("not implemented") +} diff --git a/worker.go b/worker.go index 909953644..ffea813ed 100644 --- a/worker.go +++ b/worker.go @@ -1,3 +1,5 @@ +//go:build !newworker + package frankenphp // #include @@ -26,10 +28,6 @@ type worker struct { ready chan struct{} } -const maxWorkerErrorBackoff = 1 * time.Second -const minWorkerErrorBackoff = 100 * time.Millisecond -const maxWorkerConsecutiveFailures = 6 - var ( watcherIsEnabled bool workerShutdownWG sync.WaitGroup @@ -97,33 +95,15 @@ func newWorker(o workerOpt) (*worker, error) { func (worker *worker) startNewWorkerThread() { workerShutdownWG.Add(1) defer workerShutdownWG.Done() - - backoff := minWorkerErrorBackoff - failureCount := 0 - backingOffLock := sync.RWMutex{} + backoff := &exponentialBackoff{ + maxBackoff: 1 * time.Second, + minBackoff: 100 * time.Millisecond, + maxConsecutiveFailures: 6, + } for { // if the worker can stay up longer than backoff*2, it is probably an application error - upFunc := sync.Once{} - go func() { - backingOffLock.RLock() - wait := backoff * 2 - backingOffLock.RUnlock() - time.Sleep(wait) - upFunc.Do(func() { - backingOffLock.Lock() - defer backingOffLock.Unlock() - // if we come back to a stable state, reset the failure count - if backoff == minWorkerErrorBackoff { - failureCount = 0 - } - - // earn back the backoff over time - if failureCount > 0 { - backoff = max(backoff/2, 100*time.Millisecond) - } - }) - }() + backoff.wait() metrics.StartWorker(worker.fileName) @@ -176,31 +156,17 @@ func (worker *worker) startNewWorkerThread() { c.Write(zap.String("worker", worker.fileName)) } metrics.StopWorker(worker.fileName, StopReasonRestart) + backoff.recordSuccess() continue } // on exit status 1 we log the error and apply an exponential backoff when restarting - upFunc.Do(func() { - backingOffLock.Lock() - defer backingOffLock.Unlock() - // if we end up here, the worker has not been up for backoff*2 - // this is probably due to a syntax error or another fatal error - if failureCount >= maxWorkerConsecutiveFailures { - if !watcherIsEnabled { - panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName)) - } - logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount)) + if backoff.recordFailure() { + if !watcherIsEnabled { + panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName)) } - failureCount += 1 - }) - backingOffLock.RLock() - wait := backoff - backingOffLock.RUnlock() - time.Sleep(wait) - backingOffLock.Lock() - backoff *= 2 - backoff = min(backoff, maxWorkerErrorBackoff) - backingOffLock.Unlock() + logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", backoff.failureCount)) + } metrics.StopWorker(worker.fileName, StopReasonCrash) } diff --git a/worker_test.go b/worker_test.go index b6df580f7..a840454f0 100644 --- a/worker_test.go +++ b/worker_test.go @@ -1,3 +1,5 @@ +//go:build !newworker + package frankenphp_test import (