Skip to content

Commit

Permalink
PCBC-984: fix compatiblity with pcntl_fork() (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
avsej authored Mar 27, 2024
1 parent 4807e92 commit fcae782
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 21 deletions.
28 changes: 28 additions & 0 deletions Couchbase/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,34 @@ function (string $connectionString, ClusterOptions $options) {
);
}

/**
* Notifies the SDK about usage of `fork(2)` syscall. Typically PHP exposes it using `pcntl_fork()` function, but
* the library should have chance to properly close descriptors and reach safe point to allow forking the process.
* This is not a problem in case of `proc_open()` as in this case the memory and descriptors are not inherited by
* the child process.
*
* Allowed values for `$event` are:
*
* * ForkEvent::PREPARE - must be used before `fork()` to ensure the SDK reaches safe point
* * ForkEvent::CHILD - must be used in the child process, the branch where `pcntl_fork()` returns zero
* * ForkEvent::PARENT - must be used in the parent process, the branch where `pcntl_fork()` returns pid of the child process
*
* In case `pcntl_fork()` returns negative value, and the application decides to continue, `notifyFork(ForkEvent::PARENT)`
* must be invoked to resume the SDK.
*
* @see https://www.php.net/manual/en/function.pcntl-fork.php
* @see https://www.php.net/manual/en/function.proc-open.php
*
* @param string $event type of the event to send to the library (one of the constants in ForkEvent).
* @return void
*
* @since 4.2.1
*/
public static function notifyFork(string $event)
{
return Extension\notifyFork($event);
}

/**
* Returns a new bucket object.
*
Expand Down
51 changes: 51 additions & 0 deletions Couchbase/ForkEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

/**
* Copyright 2014-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

declare(strict_types=1);

namespace Couchbase;

/**
* ForkEvent defines types of events, that can happen when forking the process.
*
* @see \Couchbase\Cluster::notifyFork()
* @since 4.2.1
*/
interface ForkEvent
{
/**
* Prepare the library for fork() call. This event should be used in the parent process before
* invoking `pcntl_fork()`. Once \Couchbase\Cluster::notifyFork() the library reaches the safe
* state when it is ready for fork() syscall (i.e. no background threads running, all operations
* completed, etc.)
*/
public const PREPARE = "prepare";

/**
* Resume progress of the child process. This usually gives the library the chance to open new
* connections, and restart IO threads.
*/
public const CHILD = "child";

/**
* Resume progress of the parent process. Typically parent process could continue using all
* descriptors that were open before fork process, and also the library will restart background
* IO threads.
*/
public const PARENT = "parent";
}
21 changes: 21 additions & 0 deletions src/php_couchbase.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,22 @@ PHP_FUNCTION(version)
couchbase::php::core_version(return_value);
}

PHP_FUNCTION(notifyFork)
{
zend_string* fork_event = nullptr;

ZEND_PARSE_PARAMETERS_START(1, 1)
Z_PARAM_STR(fork_event)
ZEND_PARSE_PARAMETERS_END();

if (auto e = couchbase::php::notify_fork(fork_event); e.ec) {
couchbase_throw_exception(e);
RETURN_THROWS();
}

RETURN_NULL();
}

PHP_FUNCTION(createConnection)
{
zend_string* connection_hash = nullptr;
Expand Down Expand Up @@ -3226,6 +3242,10 @@ static PHP_MINFO_FUNCTION(couchbase)
ZEND_BEGIN_ARG_INFO_EX(ai_CouchbaseExtension_version, 0, 0, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(ai_CouchbaseExtension_notifyFork, 0, 0, 1)
ZEND_ARG_TYPE_INFO(0, forkEvent, IS_STRING, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(ai_CouchbaseExtension_clusterVersion, 0, 0, 2)
ZEND_ARG_INFO(0, connection)
ZEND_ARG_TYPE_INFO(0, bucketName, IS_STRING, 0)
Expand Down Expand Up @@ -3998,6 +4018,7 @@ ZEND_END_ARG_INFO()

// clang-format off
static zend_function_entry couchbase_functions[] = {
ZEND_NS_FE("Couchbase\\Extension", notifyFork, ai_CouchbaseExtension_notifyFork)
ZEND_NS_FE("Couchbase\\Extension", version, ai_CouchbaseExtension_version)
ZEND_NS_FE("Couchbase\\Extension", clusterVersion, ai_CouchbaseExtension_clusterVersion)
ZEND_NS_FE("Couchbase\\Extension", replicasConfiguredForBucket, ai_CouchbaseExtension_replicasConfiguredForBucket)
Expand Down
126 changes: 106 additions & 20 deletions src/wrapper/connection_handle.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ class connection_handle::impl : public std::enable_shared_from_this<connection_h

void start()
{
worker = std::thread([self = shared_from_this()]() { self->ctx_.run(); });
worker_ = std::thread([self = shared_from_this()]() { self->ctx_.run(); });
}

void stop()
Expand All @@ -418,8 +418,8 @@ class connection_handle::impl : public std::enable_shared_from_this<connection_h
cluster_->close([barrier]() { barrier->set_value(); });
f.wait();
cluster_.reset();
if (worker.joinable()) {
worker.join();
if (worker_.joinable()) {
worker_.join();
}
}
}
Expand Down Expand Up @@ -574,10 +574,33 @@ class connection_handle::impl : public std::enable_shared_from_this<connection_h
return couchbase::cluster(*cluster_).bucket(bucket).scope(scope).collection(collection);
}

void notify_fork(fork_event event)
{
switch (event) {
case fork_event::prepare:
ctx_.stop();
worker_.join();
ctx_.notify_fork(asio::execution_context::fork_prepare);
break;

case fork_event::parent:
ctx_.notify_fork(asio::execution_context::fork_parent);
ctx_.restart();
worker_ = std::thread([self = shared_from_this()]() { self->ctx_.run(); });
break;

case fork_event::child:
ctx_.notify_fork(asio::execution_context::fork_child);
ctx_.restart();
worker_ = std::thread([self = shared_from_this()]() { self->ctx_.run(); });
break;
}
}

private:
asio::io_context ctx_{};
std::shared_ptr<couchbase::core::cluster> cluster_{ std::make_shared<couchbase::core::cluster>(ctx_) };
std::thread worker;
std::thread worker_;
core::origin origin_;
};

Expand Down Expand Up @@ -620,6 +643,12 @@ connection_handle::replicas_configured_for_bucket(const zend_string* bucket_name
return impl_->replicas_configured_for_bucket(cb_string_new(bucket_name));
}

void
connection_handle::notify_fork(fork_event event) const
{
return impl_->notify_fork(event);
}

COUCHBASE_API
core_error_info
connection_handle::bucket_open(const std::string& name)
Expand Down Expand Up @@ -2659,9 +2688,9 @@ zval_to_search_index(couchbase::core::operations::management::search_index_upser
if (auto e = cb_assign_string(idx.plan_params_json, index, "planParams"); e.ec) {
return e;
}
request.index = idx;
request.index = idx;

return {};
return {};
}

COUCHBASE_API
Expand Down Expand Up @@ -2844,7 +2873,10 @@ connection_handle::search_index_control_plan_freeze(zval* return_value, const ze

COUCHBASE_API
core_error_info
connection_handle::search_index_analyze_document(zval* return_value, const zend_string* index_name, const zend_string* document, const zval* options)
connection_handle::search_index_analyze_document(zval* return_value,
const zend_string* index_name,
const zend_string* document,
const zval* options)
{
couchbase::core::operations::management::search_index_analyze_document_request request{};
request.index_name = cb_string_new(index_name);
Expand All @@ -2867,7 +2899,11 @@ connection_handle::search_index_analyze_document(zval* return_value, const zend_

COUCHBASE_API
core_error_info
connection_handle::scope_search_index_get(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, const zval* options)
connection_handle::scope_search_index_get(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* index_name,
const zval* options)
{
couchbase::core::operations::management::search_index_get_request request{ cb_string_new(index_name) };

Expand All @@ -2892,7 +2928,10 @@ connection_handle::scope_search_index_get(zval* return_value, const zend_string*

COUCHBASE_API
core_error_info
connection_handle::scope_search_index_get_all(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zval* options)
connection_handle::scope_search_index_get_all(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zval* options)
{
couchbase::core::operations::management::search_index_get_all_request request{};

Expand Down Expand Up @@ -2922,7 +2961,11 @@ connection_handle::scope_search_index_get_all(zval* return_value, const zend_str

COUCHBASE_API
core_error_info
connection_handle::scope_search_index_upsert(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zval* index, const zval* options)
connection_handle::scope_search_index_upsert(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zval* index,
const zval* options)
{
couchbase::core::operations::management::search_index_upsert_request request{};

Expand Down Expand Up @@ -2951,7 +2994,11 @@ connection_handle::scope_search_index_upsert(zval* return_value, const zend_stri

COUCHBASE_API
core_error_info
connection_handle::scope_search_index_drop(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, const zval* options)
connection_handle::scope_search_index_drop(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* index_name,
const zval* options)
{
couchbase::core::operations::management::search_index_drop_request request{ cb_string_new(index_name) };

Expand All @@ -2973,7 +3020,11 @@ connection_handle::scope_search_index_drop(zval* return_value, const zend_string

COUCHBASE_API
core_error_info
connection_handle::scope_search_index_get_documents_count(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, const zval* options)
connection_handle::scope_search_index_get_documents_count(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* index_name,
const zval* options)
{
couchbase::core::operations::management::search_index_get_documents_count_request request{ cb_string_new(index_name) };

Expand All @@ -2997,7 +3048,12 @@ connection_handle::scope_search_index_get_documents_count(zval* return_value, co

COUCHBASE_API
core_error_info
connection_handle::scope_search_index_control_ingest(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, bool pause, const zval* options)
connection_handle::scope_search_index_control_ingest(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* index_name,
bool pause,
const zval* options)
{
couchbase::core::operations::management::search_index_control_ingest_request request{};

Expand All @@ -3022,7 +3078,12 @@ connection_handle::scope_search_index_control_ingest(zval* return_value, const z

COUCHBASE_API
core_error_info
connection_handle::scope_search_index_control_query(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, bool allow, const zval* options)
connection_handle::scope_search_index_control_query(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* index_name,
bool allow,
const zval* options)
{
couchbase::core::operations::management::search_index_control_query_request request{};

Expand All @@ -3047,7 +3108,12 @@ connection_handle::scope_search_index_control_query(zval* return_value, const ze

COUCHBASE_API
core_error_info
connection_handle::scope_search_index_control_plan_freeze(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, bool freeze, const zval* options)
connection_handle::scope_search_index_control_plan_freeze(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* index_name,
bool freeze,
const zval* options)
{
couchbase::core::operations::management::search_index_control_plan_freeze_request request{};

Expand All @@ -3072,7 +3138,12 @@ connection_handle::scope_search_index_control_plan_freeze(zval* return_value, co

COUCHBASE_API
core_error_info
connection_handle::scope_search_index_analyze_document(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* index_name, const zend_string* document, const zval* options)
connection_handle::scope_search_index_analyze_document(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* index_name,
const zend_string* document,
const zval* options)
{
couchbase::core::operations::management::search_index_analyze_document_request request{};

Expand Down Expand Up @@ -3286,7 +3357,8 @@ zval_to_bucket_settings(const zval* bucket_settings)
} else if (e.ec) {
return { e, {} };
}
if (auto e = cb_assign_boolean(bucket.history_retention_collection_default, bucket_settings, "historyRetentionCollectionDefault"); e.ec) {
if (auto e = cb_assign_boolean(bucket.history_retention_collection_default, bucket_settings, "historyRetentionCollectionDefault");
e.ec) {
return { e, {} };
}
if (auto e = cb_assign_integer(bucket.history_retention_bytes, bucket_settings, "historyRetentionBytes"); e.ec) {
Expand Down Expand Up @@ -3655,7 +3727,12 @@ connection_handle::scope_drop(zval* return_value, const zend_string* bucket_name

COUCHBASE_API
core_error_info
connection_handle::collection_create(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* collection_name, const zval* settings, const zval* options)
connection_handle::collection_create(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* collection_name,
const zval* settings,
const zval* options)
{
couchbase::core::operations::management::collection_create_request request{};

Expand Down Expand Up @@ -3686,7 +3763,11 @@ connection_handle::collection_create(zval* return_value, const zend_string* buck

COUCHBASE_API
core_error_info
connection_handle::collection_drop(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* collection_name, const zval* options)
connection_handle::collection_drop(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* collection_name,
const zval* options)
{
couchbase::core::operations::management::collection_drop_request request{};

Expand All @@ -3709,7 +3790,12 @@ connection_handle::collection_drop(zval* return_value, const zend_string* bucket

COUCHBASE_API
core_error_info
connection_handle::collection_update(zval* return_value, const zend_string* bucket_name, const zend_string* scope_name, const zend_string* collection_name, const zval* settings, const zval* options)
connection_handle::collection_update(zval* return_value,
const zend_string* bucket_name,
const zend_string* scope_name,
const zend_string* collection_name,
const zval* settings,
const zval* options)
{
couchbase::core::operations::management::collection_update_request request{};

Expand Down
Loading

0 comments on commit fcae782

Please sign in to comment.