Skip to content

Commit

Permalink
remove lock since the plugin is sync-data-flow between c and go
Browse files Browse the repository at this point in the history
Signed-off-by: duxin40 <[email protected]>
  • Loading branch information
duxin40 committed Dec 31, 2024
1 parent 7eaef90 commit ac75587
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "source/common/buffer/buffer_impl.h"
#include "source/common/protobuf/utility.h"
#include "upstream_request.h"

namespace Envoy {
namespace Extensions {
Expand Down
2 changes: 1 addition & 1 deletion contrib/golang/upstreams/http/tcp/source/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/logger.h"
#include "source/common/http/codes.h"
#include "source/common/http/header_map_impl.h"
#include "source/common/http/utility.h"

#include "absl/status/status.h"
#include "contrib/golang/common/dso/dso.h"
#include "upstream_request.h"

namespace Envoy {
namespace Extensions {
Expand Down
105 changes: 37 additions & 68 deletions contrib/golang/upstreams/http/tcp/source/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ void TcpConnPool::onPoolReady(Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& co
HttpTcpBridge::HttpTcpBridge(Router::UpstreamToDownstream* upstream_request,
Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& upstream,
Dso::HttpTcpBridgeDsoPtr dynamic_lib, BridgeConfigSharedPtr config)
: encoding_state_(new EncodingProcessorState(*this)),
decoding_state_(new DecodingProcessorState(*this)),
: encoding_state_(*this),
decoding_state_(*this),
route_entry_(upstream_request->route().routeEntry()), upstream_request_(upstream_request),
upstream_conn_data_(std::move(upstream)), dynamic_lib_(dynamic_lib) {

Expand All @@ -133,18 +133,15 @@ HttpTcpBridge::HttpTcpBridge(Router::UpstreamToDownstream* upstream_request,
HttpTcpBridge::~HttpTcpBridge() {
ENVOY_LOG(debug, "golang http-tcp bridge on destroy");

ASSERT(!decoding_state_->isProcessingInGo() && !encoding_state_->isProcessingInGo());
ASSERT(!decoding_state_.isProcessingInGo() && !encoding_state_.isProcessingInGo());

dynamic_lib_->envoyGoHttpTcpBridgeOnDestroy(this);

delete encoding_state_;
delete decoding_state_;
}

void HttpTcpBridge::initResponse() {
if (decoding_state_->resp_headers == nullptr) {
if (decoding_state_.resp_headers == nullptr) {
auto headers{Envoy::Http::createHeaderMap<Envoy::Http::ResponseHeaderMapImpl>({})};
decoding_state_->resp_headers = std::move(headers);
decoding_state_.resp_headers = std::move(headers);
}
}

Expand All @@ -153,25 +150,24 @@ Envoy::Http::Status HttpTcpBridge::encodeHeaders(const Envoy::Http::RequestHeade
ENVOY_LOG(debug, "golang http-tcp bridge encodeHeaders, header size: {}, end_stream: {}",
headers.size(), end_stream);

encoding_state_->req_headers = &headers;
encoding_state_->setFilterState(FilterState::ProcessingHeader);
encoding_state_.req_headers = &headers;
encoding_state_.setFilterState(FilterState::ProcessingHeader);
Buffer::OwnedImpl buffer;
auto s = dynamic_cast<processState*>(encoding_state_);
auto s = dynamic_cast<processState*>(&encoding_state_);

GoUint64 go_status = dynamic_lib_->envoyGoHttpTcpBridgeOnEncodeHeader(
s, end_stream ? 1 : 0, headers.size(), headers.byteSize(),
reinterpret_cast<uint64_t>(&buffer), buffer.length());

encoding_state_->handleHeaderGolangStatus(static_cast<HttpTcpBridgeStatus>(go_status));
ENVOY_LOG(debug, "golang http-tcp bridge encodeHeaders, state: {}", encoding_state_->stateStr());
encoding_state_.handleHeaderGolangStatus(static_cast<HttpTcpBridgeStatus>(go_status));
ENVOY_LOG(debug, "golang http-tcp bridge encodeHeaders, state: {}", encoding_state_.stateStr());

// if go side set data for buffer, then we send it to upstream
bool send_data_to_upstream = (buffer.length() != 0);

trySendProxyData(send_data_to_upstream, end_stream);

if (send_data_to_upstream) {
Thread::LockGuard lock(mutex_for_c_and_go_);
upstream_conn_data_->connection().write(buffer, upstream_conn_self_half_close_);
}

Expand All @@ -182,28 +178,28 @@ void HttpTcpBridge::encodeData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(debug, "golang http-tcp bridge encodeData, data length: {}, end_stream: {}",
data.length(), end_stream);

switch (encoding_state_->filterState()) {
switch (encoding_state_.filterState()) {
case FilterState::WaitingData:
encodeDataGo(data, end_stream);
break;

case FilterState::WaitingAllData:
if (end_stream) {
if (!encoding_state_->isBufferDataEmpty()) {
if (!encoding_state_.isBufferDataEmpty()) {
// NP: new data = data_buffer_ + data
encoding_state_->addBufferData(data);
data.move(encoding_state_->getBufferData());
encoding_state_.addBufferData(data);
data.move(encoding_state_.getBufferData());
}
encodeDataGo(data, end_stream);
} else {
ENVOY_LOG(debug, "golang http-tcp bridge encodeData, appending data to buffer");
encoding_state_->addBufferData(data);
encoding_state_.addBufferData(data);
}
break;

default:
PANIC(fmt::format("golang http-tcp bridge encodeData, unexpected state: {}",
encoding_state_->stateStr()));
encoding_state_.stateStr()));
}
}

Expand Down Expand Up @@ -231,16 +227,23 @@ void HttpTcpBridge::onUpstreamData(Buffer::Instance& data, bool end_stream) {

initResponse();

decoding_state_->setFilterState(FilterState::ProcessingData);
auto s = dynamic_cast<processState*>(decoding_state_);
decoding_state_.setFilterState(FilterState::ProcessingData);
auto s = dynamic_cast<processState*>(&decoding_state_);

GoUint64 go_status = dynamic_lib_->envoyGoHttpTcpBridgeOnUpstreamData(
s, end_stream ? 1 : 0, (*decoding_state_->resp_headers).size(),
(*decoding_state_->resp_headers).byteSize(), reinterpret_cast<uint64_t>(&data),
s, end_stream ? 1 : 0, (*decoding_state_.resp_headers).size(),
(*decoding_state_.resp_headers).byteSize(), reinterpret_cast<uint64_t>(&data),
data.length());
decoding_state_->setFilterState(FilterState::Done);
decoding_state_.setFilterState(FilterState::Done);

switch (static_cast<HttpTcpBridgeStatus>(go_status)) {
case HttpTcpBridgeStatus::HttpTcpBridgeEndStream:
// endStream to downstream which means the whole resp to http has finished.
//
// from now on, any further data from upstream tcp conn will not come to onUpstreamData.

end_stream = true;

case HttpTcpBridgeStatus::HttpTcpBridgeContinue:
// go side in onUpstreamData will get each_data_piece, pass data and headers to downstream
// streaming.
Expand Down Expand Up @@ -272,21 +275,6 @@ void HttpTcpBridge::onUpstreamData(Buffer::Instance& data, bool end_stream) {
}
break;

case HttpTcpBridgeStatus::HttpTcpBridgeEndStream:
// endStream to downstream which means the whole resp to http has finished.
//
// from now on, any further data from upstream tcp conn will not come to onUpstreamData.

end_stream = true;
sendDataToDownstream(data, end_stream);

// TODO(duxin40): support use golang to set trailers and send it to
// downstream.(upstream_request_->decodeTrailers())

// http resp is end, clear data buffer
data.drain(data.length());
break;

default:
PANIC(fmt::format("golang http-tcp bridge onUpstreamData, unexpected go_tatus: {}", go_status));
}
Expand All @@ -306,7 +294,6 @@ void HttpTcpBridge::trySendProxyData(bool send_data_to_upstream, bool end_stream
}

if (data.length() != 0 || end_stream) {
Thread::LockGuard lock(mutex_for_c_and_go_);
bool self_half_close_connection =
end_stream && !send_data_to_upstream ? upstream_conn_self_half_close_ : false;
upstream_conn_data_->connection().write(data, self_half_close_connection);
Expand All @@ -318,19 +305,18 @@ void HttpTcpBridge::encodeDataGo(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(
debug,
"golang http-tcp bridge encodeDataGo, passing data to golang, state: {}, end_stream: {}",
encoding_state_->stateStr(), end_stream);
encoding_state_.stateStr(), end_stream);

encoding_state_->processData();
encoding_state_.processData();

auto s = dynamic_cast<processState*>(encoding_state_);
auto s = dynamic_cast<processState*>(&encoding_state_);
GoUint64 go_status = dynamic_lib_->envoyGoHttpTcpBridgeOnEncodeData(
s, end_stream ? 1 : 0, reinterpret_cast<uint64_t>(&data), data.length());

encoding_state_->handleDataGolangStatus(static_cast<HttpTcpBridgeStatus>(go_status), end_stream);
encoding_state_.handleDataGolangStatus(static_cast<HttpTcpBridgeStatus>(go_status), end_stream);

if (encoding_state_->filterState() == FilterState::Done ||
encoding_state_->filterState() == FilterState::WaitingData) {
Thread::LockGuard lock(mutex_for_c_and_go_);
if (encoding_state_.filterState() == FilterState::Done ||
encoding_state_.filterState() == FilterState::WaitingData) {
upstream_conn_data_->connection().write(data, upstream_conn_self_half_close_);
}
}
Expand All @@ -340,11 +326,11 @@ void HttpTcpBridge::sendDataToDownstream(Buffer::Instance& data, bool end_stream
ENVOY_LOG(debug, "golang http-tcp bridge send resp headers to downstream. end_stream: {}",
end_stream);
// we can send resp headers only one time.
if (!decoding_state_->resp_headers->Status()) {
if (!decoding_state_.resp_headers->Status()) {
// if go side not set status, c++ side set default status
decoding_state_->resp_headers->setStatus(static_cast<uint64_t>(HttpStatusCode::Success));
decoding_state_.resp_headers->setStatus(static_cast<uint64_t>(HttpStatusCode::Success));
}
upstream_request_->decodeHeaders(std::move(decoding_state_->resp_headers), false);
upstream_request_->decodeHeaders(std::move(decoding_state_.resp_headers), false);
already_send_resp_headers_ = true;
} else {
ENVOY_LOG(debug,
Expand Down Expand Up @@ -409,8 +395,6 @@ void copyHeaderMapToGo(const Envoy::Http::HeaderMap& m, GoString* go_strs, char*
}

CAPIStatus HttpTcpBridge::copyHeaders(ProcessorState& state, GoString* go_strs, char* go_buf) {
// lock until this function return since the same headers may be copied by multi Goroutines.
Thread::LockGuard lock(mutex_for_go_);
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang http-tcp bridge copyHeaders is not processing Go");
return CAPIStatus::CAPINotInGo;
Expand All @@ -436,8 +420,6 @@ CAPIStatus HttpTcpBridge::copyHeaders(ProcessorState& state, GoString* go_strs,

CAPIStatus HttpTcpBridge::setRespHeader(ProcessorState& state, absl::string_view key,
absl::string_view value, headerAction act) {
// lock until this function return since the resp header may be set by multi Goroutines.
Thread::LockGuard lock(mutex_for_go_);
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang http-tcp bridge setRespHeader is not processing Go");
return CAPIStatus::CAPINotInGo;
Expand Down Expand Up @@ -474,8 +456,6 @@ CAPIStatus HttpTcpBridge::setRespHeader(ProcessorState& state, absl::string_view
}

CAPIStatus HttpTcpBridge::removeRespHeader(ProcessorState& state, absl::string_view key) {
// lock until this function return since the resp header may be removed by multi Goroutines.
Thread::LockGuard lock(mutex_for_go_);
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang http-tcp bridge removeRespHeader is not processing Go");
return CAPIStatus::CAPINotInGo;
Expand All @@ -502,8 +482,6 @@ CAPIStatus HttpTcpBridge::removeRespHeader(ProcessorState& state, absl::string_v
}

CAPIStatus HttpTcpBridge::copyBuffer(ProcessorState& state, Buffer::Instance* buffer, char* data) {
// lock until this function return since the same buffer may be copied by multi Goroutines.
Thread::LockGuard lock(mutex_for_go_);
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang http-tcp bridge copyBuffer is not processing Go");
return CAPIStatus::CAPINotInGo;
Expand All @@ -519,8 +497,6 @@ CAPIStatus HttpTcpBridge::copyBuffer(ProcessorState& state, Buffer::Instance* bu

CAPIStatus HttpTcpBridge::drainBuffer(ProcessorState& state, Buffer::Instance* buffer,
uint64_t length) {
// lock until this function return since the same buffer may be drained by multi Goroutines.
Thread::LockGuard lock(mutex_for_go_);
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang http-tcp bridge drainBuffer is not processing Go");
return CAPIStatus::CAPINotInGo;
Expand All @@ -532,8 +508,6 @@ CAPIStatus HttpTcpBridge::drainBuffer(ProcessorState& state, Buffer::Instance* b

CAPIStatus HttpTcpBridge::setBufferHelper(ProcessorState& state, Buffer::Instance* buffer,
absl::string_view& value, bufferAction action) {
// lock until this function return since the same buffer may be set by multi Goroutines.
Thread::LockGuard lock(mutex_for_go_);
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang http-tcp bridge setBufferHelper is not processing Go");
return CAPIStatus::CAPINotInGo;
Expand All @@ -550,8 +524,6 @@ CAPIStatus HttpTcpBridge::setBufferHelper(ProcessorState& state, Buffer::Instanc
}

CAPIStatus HttpTcpBridge::getStringValue(int id, uint64_t* value_data, int* value_len) {
// lock until this function return since it be called by multi Goroutines.
Thread::LockGuard lock(mutex_for_c_and_go_);
// refer the string to str_value_, not deep clone, make sure it won't be freed while reading
// it on the Go side.
switch (static_cast<EnvoyValue>(id)) {
Expand All @@ -572,9 +544,6 @@ CAPIStatus HttpTcpBridge::getStringValue(int id, uint64_t* value_data, int* valu
}

CAPIStatus HttpTcpBridge::setSelfHalfCloseForUpstreamConn(int enabled) {
// lock until this function return since upstream_conn_self_half_close_ may be set by multi
// Goroutines.
Thread::LockGuard lock(mutex_for_c_and_go_);
if (enabled == 1) {
upstream_conn_self_half_close_ = true;
} else {
Expand Down
22 changes: 5 additions & 17 deletions contrib/golang/upstreams/http/tcp/source/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ namespace Http {
namespace Tcp {
namespace Golang {

class ProcessorState;
class DecodingProcessorState;
class EncodingProcessorState;

class HttpTcpBridge;

class BridgeConfig;
Expand Down Expand Up @@ -122,7 +118,7 @@ class TcpConnPool : public Router::GenericConnPool,
* The bridge enables an HTTP client to connect to a TCP server via a Golang plugin, facilitating
* Protocol Convert from HTTP to any RPC protocol in Envoy.
*
* Notice: the bridge is designed for sync data flow between go and c, so DO NOT use goroutine in
* Notice: the bridge is designed for sync-data-flow between go and c, so DO NOT use goroutine in
* go side.
*
*/
Expand Down Expand Up @@ -184,11 +180,8 @@ class HttpTcpBridge : public Router::GenericUpstream,
CAPIStatus getStringValue(int id, uint64_t* value_data, int* value_len);
CAPIStatus setSelfHalfCloseForUpstreamConn(int enabled);

DecodingProcessorState* decodingState() { return decoding_state_; }
EncodingProcessorState* encodingState() { return encoding_state_; }

EncodingProcessorState* encoding_state_;
DecodingProcessorState* decoding_state_;
EncodingProcessorState encoding_state_;
DecodingProcessorState decoding_state_;

const Router::RouteEntry* route_entry_;

Expand All @@ -205,14 +198,9 @@ class HttpTcpBridge : public Router::GenericUpstream,

bool already_send_resp_headers_{false};

// lock to avoid race in c thread between multi go code calls(go call c) and native c code.
Thread::MutexBasicLockable mutex_for_c_and_go_{};
// anchor a string temporarily, make sure it won't be freed before copied to Go.
std::string str_value_ ABSL_GUARDED_BY(mutex_for_c_and_go_);
bool upstream_conn_self_half_close_ ABSL_GUARDED_BY(mutex_for_c_and_go_){false};

// lock to avoid race in c thread between multi go code calls(go call c).
Thread::MutexBasicLockable mutex_for_go_{};
std::string str_value_;
bool upstream_conn_self_half_close_ {false};
};

} // namespace Golang
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ go_binary(
name = "filter.so",
srcs = [
"config.go",
"dubbo.apache.org/dubbo-go/v3/common/constant",
"dubbo.apache.org/dubbo-go/v3/filter/generic/generalizer",
"dubbo.apache.org/dubbo-go/v3/protocol",
"dubbo.apache.org/dubbo-go/v3/protocol/dubbo",
"dubbo.apache.org/dubbo-go/v3/protocol/invocation",
"dubbo.apache.org/dubbo-go/v3/remoting",
# "dubbo.apache.org/dubbo-go/v3/common/constant",
# "dubbo.apache.org/dubbo-go/v3/filter/generic/generalizer",
# "dubbo.apache.org/dubbo-go/v3/protocol",
# "dubbo.apache.org/dubbo-go/v3/protocol/dubbo",
# "dubbo.apache.org/dubbo-go/v3/protocol/invocation",
# "dubbo.apache.org/dubbo-go/v3/remoting",
"filter.go",
"github.com/apache/dubbo-go-hessian2",
"github.com/cncf/xds/go/xds/type/v3",
# "github.com/apache/dubbo-go-hessian2",
# "github.com/cncf/xds/go/xds/type/v3",
],
out = "filter.so",
cgo = True,
Expand Down

0 comments on commit ac75587

Please sign in to comment.