Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: decouple worker threads from non-worker threads #1137

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
fe1158f
Decouple workers.
Alliballibaba2 Nov 1, 2024
ad34140
Moves code to separate file.
Alliballibaba2 Nov 1, 2024
89b211d
Cleans up the exponential backoff.
Alliballibaba2 Nov 2, 2024
7d2ab8c
Initial working implementation.
Alliballibaba2 Nov 2, 2024
f7e7d41
Refactors php threads to take callbacks.
Alliballibaba2 Nov 2, 2024
c03c59b
Cleanup.
Alliballibaba2 Nov 2, 2024
a9857dc
Cleanup.
Alliballibaba2 Nov 2, 2024
bac9555
Cleanup.
Alliballibaba2 Nov 2, 2024
a2f8d59
Cleanup.
Alliballibaba2 Nov 2, 2024
279924c
Merge branch 'main' into refactor/start-worker-threads-directly
Alliballibaba2 Nov 3, 2024
0825453
Adjusts watcher logic.
Alliballibaba2 Nov 3, 2024
17d5cbe
Adjusts the watcher logic.
Alliballibaba2 Nov 3, 2024
09e0ca6
Fix opcache_reset race condition.
Alliballibaba2 Nov 4, 2024
a726a2c
Merge branch 'main' into refactor/start-worker-threads-directly
Alliballibaba2 Nov 4, 2024
7f13ada
Fixing merge conflicts and formatting.
Alliballibaba2 Nov 4, 2024
13fb4bb
Prevents overlapping of TSRM reservation and script execution.
Alliballibaba2 Nov 5, 2024
a8a00c8
Adjustments as suggested by @dunglas.
Alliballibaba2 Nov 5, 2024
b4dd138
Adds error assertions.
Alliballibaba2 Nov 5, 2024
03f98fa
Adds comments.
Alliballibaba2 Nov 5, 2024
e52dd0f
Removes logs and explicitly compares to C.false.
Alliballibaba2 Nov 5, 2024
cd98e33
Resets check.
Alliballibaba2 Nov 5, 2024
4e2a2c6
Adds cast for safety.
Alliballibaba2 Nov 5, 2024
c51eb93
Fixes waitgroup overflow.
Alliballibaba2 Nov 5, 2024
89d8e26
Resolves waitgroup race condition on startup.
Alliballibaba2 Nov 6, 2024
3587243
Moves worker request logic to worker.go.
Alliballibaba2 Nov 7, 2024
ec32f0c
Removes defer.
Alliballibaba2 Nov 7, 2024
4e35698
Removes call from go to c.
Alliballibaba2 Nov 11, 2024
740fac7
Merge branch 'main' into refactor/start-worker-threads-directly
Alliballibaba2 Nov 15, 2024
8a272cb
Fixes merge conflict.
Alliballibaba2 Nov 15, 2024
ecce5d5
Adds fibers test back in.
Alliballibaba2 Nov 15, 2024
06ebd67
Refactors new thread loop approach.
Alliballibaba2 Nov 15, 2024
c811f4a
Removes redundant check.
Alliballibaba2 Nov 16, 2024
6bd047a
Adds compareAndSwap.
Alliballibaba2 Nov 16, 2024
55ad8ba
Refactor: removes global waitgroups and uses a 'thread state' abstrac…
Alliballibaba2 Nov 17, 2024
3ffbe06
Merge branch 'main' into refactor/start-worker-threads-directly
Alliballibaba2 Nov 17, 2024
01ed92b
Removes unnecessary method.
Alliballibaba2 Nov 17, 2024
790cccc
Updates comment.
Alliballibaba2 Nov 17, 2024
0dd2605
Removes unnecessary booleans.
Alliballibaba2 Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package frankenphp

// #include "frankenphp.h"
import "C"
import (
"os"
"strings"
"unsafe"
)

//export go_putenv
func go_putenv(str *C.char, length C.int) C.bool {
// Create a byte slice from C string with a specified length
s := C.GoBytes(unsafe.Pointer(str), length)

// Convert byte slice to string
envString := string(s)

// Check if '=' is present in the string
if key, val, found := strings.Cut(envString, "="); found {
if os.Setenv(key, val) != nil {
return false // Failure
}
} else {
// No '=', unset the environment variable
if os.Unsetenv(envString) != nil {
return false // Failure
}
}

return true // Success
}

//export go_getfullenv
func go_getfullenv(threadIndex C.uintptr_t) (*C.go_string, C.size_t) {
thread := phpThreads[threadIndex]

env := os.Environ()
goStrings := make([]C.go_string, len(env)*2)

for i, envVar := range env {
key, val, _ := strings.Cut(envVar, "=")
goStrings[i*2] = C.go_string{C.size_t(len(key)), thread.pinString(key)}
goStrings[i*2+1] = C.go_string{C.size_t(len(val)), thread.pinString(val)}
}

value := unsafe.SliceData(goStrings)
thread.Pin(value)

return value, C.size_t(len(env))
}

//export go_getenv
func go_getenv(threadIndex C.uintptr_t, name *C.go_string) (C.bool, *C.go_string) {
thread := phpThreads[threadIndex]

// Create a byte slice from C string with a specified length
envName := C.GoStringN(name.data, C.int(name.len))

// Get the environment variable value
envValue, exists := os.LookupEnv(envName)
if !exists {
// Environment variable does not exist
return false, nil // Return 0 to indicate failure
}

// Convert Go string to C string
value := &C.go_string{C.size_t(len(envValue)), thread.pinString(envValue)}
thread.Pin(value)

return true, value // Return 1 to indicate success
}

//export go_sapi_getenv
func go_sapi_getenv(threadIndex C.uintptr_t, name *C.go_string) *C.char {
envName := C.GoStringN(name.data, C.int(name.len))

envValue, exists := os.LookupEnv(envName)
if !exists {
return nil
}

return phpThreads[threadIndex].pinCString(envValue)
}
60 changes: 60 additions & 0 deletions exponential_backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package frankenphp

import (
"sync"
"time"
)

const maxBackoff = 1 * time.Second
const minBackoff = 100 * time.Millisecond
const maxConsecutiveFailures = 6

type exponentialBackoff struct {
backoff time.Duration
failureCount int
mu sync.RWMutex
upFunc sync.Once
}

func newExponentialBackoff() *exponentialBackoff {
return &exponentialBackoff{backoff: minBackoff}
}

func (e *exponentialBackoff) reset() {
e.mu.Lock()
e.upFunc = sync.Once{}
wait := e.backoff * 2
e.mu.Unlock()
go func() {
time.Sleep(wait)
e.mu.Lock()
defer e.mu.Unlock()
e.upFunc.Do(func() {
// if we come back to a stable state, reset the failure count
if e.backoff == minBackoff {
e.failureCount = 0
}

// earn back the backoff over time
if e.failureCount > 0 {
e.backoff = max(e.backoff/2, minBackoff)
}
})
}()
}

func (e *exponentialBackoff) trigger(onMaxFailures func(failureCount int)) {
e.mu.RLock()
e.upFunc.Do(func() {
if e.failureCount >= maxConsecutiveFailures {
onMaxFailures(e.failureCount)
}
e.failureCount += 1
})
wait := e.backoff
e.mu.RUnlock()
time.Sleep(wait)
e.mu.Lock()
e.backoff = min(e.backoff*2, maxBackoff)
e.mu.Unlock()
}
75 changes: 33 additions & 42 deletions frankenphp.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ PHP_FUNCTION(frankenphp_finish_request) { /* {{{ */
php_header();

if (ctx->has_active_request) {
go_frankenphp_finish_request(thread_index, false);
go_frankenphp_finish_php_request(thread_index);
}

ctx->finished = true;
Expand Down Expand Up @@ -443,7 +443,7 @@ PHP_FUNCTION(frankenphp_handle_request) {

frankenphp_worker_request_shutdown();
ctx->has_active_request = false;
go_frankenphp_finish_request(thread_index, true);
go_frankenphp_finish_worker_request(thread_index);

RETURN_TRUE;
}
Expand Down Expand Up @@ -808,9 +808,9 @@ static void set_thread_name(char *thread_name) {
}

static void *php_thread(void *arg) {
char thread_name[16] = {0};
snprintf(thread_name, 16, "php-%" PRIxPTR, (uintptr_t)arg);
thread_index = (uintptr_t)arg;
char thread_name[16] = {0};
snprintf(thread_name, 16, "php-%" PRIxPTR, thread_index);
set_thread_name(thread_name);

#ifdef ZTS
Expand All @@ -820,7 +820,6 @@ static void *php_thread(void *arg) {
ZEND_TSRMLS_CACHE_UPDATE();
#endif
#endif

local_ctx = malloc(sizeof(frankenphp_server_context));

/* check if a default filter is set in php.ini and only filter if
Expand All @@ -829,7 +828,20 @@ static void *php_thread(void *arg) {
cfg_get_string("filter.default", &default_filter);
should_filter_var = default_filter != NULL;

while (go_handle_request(thread_index)) {
// perform work until go signals to stop
while (true) {
char *scriptName = go_frankenphp_before_script_execution(thread_index);

// if the script name is NULL, the thread should exit
if (scriptName == NULL) {
break;
}

// if the script name is not empty, execute the PHP script
if (strlen(scriptName) != 0) {
int exit_status = frankenphp_execute_script(scriptName);
go_frankenphp_after_script_execution(thread_index, exit_status);
}
}

go_frankenphp_release_known_variable_keys(thread_index);
Expand All @@ -838,6 +850,8 @@ static void *php_thread(void *arg) {
ts_free_thread();
#endif

go_frankenphp_on_thread_shutdown(thread_index);

return NULL;
}

Expand All @@ -855,13 +869,11 @@ static void *php_main(void *arg) {
exit(EXIT_FAILURE);
}

intptr_t num_threads = (intptr_t)arg;

set_thread_name("php-main");

#ifdef ZTS
#if (PHP_VERSION_ID >= 80300)
php_tsrm_startup_ex(num_threads);
php_tsrm_startup_ex((intptr_t)arg);
#else
php_tsrm_startup();
#endif
Expand Down Expand Up @@ -889,28 +901,7 @@ static void *php_main(void *arg) {

frankenphp_sapi_module.startup(&frankenphp_sapi_module);

pthread_t *threads = malloc(num_threads * sizeof(pthread_t));
if (threads == NULL) {
perror("malloc failed");
exit(EXIT_FAILURE);
}

for (uintptr_t i = 0; i < num_threads; i++) {
if (pthread_create(&(*(threads + i)), NULL, &php_thread, (void *)i) != 0) {
perror("failed to create PHP thread");
free(threads);
exit(EXIT_FAILURE);
}
}

for (int i = 0; i < num_threads; i++) {
if (pthread_join((*(threads + i)), NULL) != 0) {
perror("failed to join PHP thread");
free(threads);
exit(EXIT_FAILURE);
}
}
free(threads);
go_frankenphp_main_thread_is_ready();

/* channel closed, shutdown gracefully */
frankenphp_sapi_module.shutdown(&frankenphp_sapi_module);
Expand All @@ -926,25 +917,29 @@ static void *php_main(void *arg) {
frankenphp_sapi_module.ini_entries = NULL;
}
#endif

go_shutdown();

go_frankenphp_shutdown_main_thread();
return NULL;
}

int frankenphp_init(int num_threads) {
int frankenphp_new_main_thread(int num_threads) {
pthread_t thread;

if (pthread_create(&thread, NULL, &php_main, (void *)(intptr_t)num_threads) !=
0) {
go_shutdown();

return -1;
}

return pthread_detach(thread);
}

bool frankenphp_new_php_thread(uintptr_t thread_index) {
pthread_t thread;
if (pthread_create(&thread, NULL, &php_thread, (void *)thread_index) != 0) {
return false;
}
pthread_detach(thread);
return true;
}

int frankenphp_request_startup() {
if (php_request_startup() == SUCCESS) {
return SUCCESS;
Expand All @@ -957,8 +952,6 @@ int frankenphp_request_startup() {

int frankenphp_execute_script(char *file_name) {
if (frankenphp_request_startup() == FAILURE) {
free(file_name);
file_name = NULL;

return FAILURE;
}
Expand All @@ -967,8 +960,6 @@ int frankenphp_execute_script(char *file_name) {

zend_file_handle file_handle;
zend_stream_init_filename(&file_handle, file_name);
free(file_name);
file_name = NULL;

file_handle.primary_script = 1;

Expand Down
Loading
Loading