Skip to content

Commit

Permalink
GenericClientResponse of grpc while copyable, reports memory leak on …
Browse files Browse the repository at this point in the history
…some cases.

Avoided to copy the entire bytebuffer, but just the slice which is individually
refcounted.

Added more allocator and minor improvements in buffer
  • Loading branch information
hkadayam committed Apr 23, 2024
1 parent 370c772 commit 9627788
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 33 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "12.1.1"
version = "12.2.1"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
46 changes: 45 additions & 1 deletion include/sisl/fds/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,45 @@ class AlignedAllocator {
AlignedAllocatorMetrics m_metrics;
};

template < typename T, std::size_t Alignment = 512 >
class AlignedTypeAllocator {
private:
static_assert(Alignment >= alignof(T),
"Beware that types like int have minimum alignment requirements "
"or access will result in crashes.");

public:
/**
* This is only necessary because AlignedAllocator has a second template
* argument for the alignment that will make the default
* std::allocator_traits implementation fail during compilation.
* @see https://stackoverflow.com/a/48062758/2191065
*
* Taken this from link:
* https://stackoverflow.com/questions/60169819/modern-approach-to-making-stdvector-allocate-aligned-memory
*/
template < class OtherT >
struct rebind {
using other = AlignedTypeAllocator< OtherT, Alignment >;
};

public:
constexpr AlignedTypeAllocator() noexcept = default;
constexpr AlignedTypeAllocator(const AlignedTypeAllocator&) noexcept = default;

template < typename U >
constexpr AlignedTypeAllocator(AlignedTypeAllocator< U, Alignment > const&) noexcept {}

T* allocate(std::size_t nelems) {
if (nelems > std::numeric_limits< std::size_t >::max() / sizeof(T)) { throw std::bad_array_new_length(); }
return r_cast< T* >(AlignedAllocator::allocator().aligned_alloc(Alignment, nelems * sizeof(T), buftag::common));
}

void deallocate(T* ptr, [[maybe_unused]] std::size_t nbytes) {
AlignedAllocator::allocator().aligned_free(uintptr_cast(ptr), buftag::common);
}
};

#define sisl_aligned_alloc sisl::AlignedAllocator::allocator().aligned_alloc
#define sisl_aligned_free sisl::AlignedAllocator::allocator().aligned_free
#define sisl_aligned_realloc sisl::AlignedAllocator::allocator().aligned_realloc
Expand Down Expand Up @@ -273,6 +312,9 @@ class aligned_shared_ptr : public std::shared_ptr< T > {
aligned_shared_ptr(T* p) : std::shared_ptr< T >(p) {}
};

template < typename T, std::size_t Alignment = 512 >
using aligned_vector = std::vector< T, AlignedTypeAllocator< T, Alignment > >;

struct io_blob;
using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >;

Expand Down Expand Up @@ -338,7 +380,7 @@ struct io_blob : public blob {
*/
struct io_blob_safe final : public io_blob {
public:
buftag m_tag;
buftag m_tag{buftag::common};

public:
io_blob_safe() = default;
Expand Down Expand Up @@ -367,6 +409,8 @@ struct io_blob_safe final : public io_blob {
other.size_ = 0;
return *this;
}

void buf_alloc(size_t sz, uint32_t align_size = 512) { io_blob::buf_alloc(sz, align_size, m_tag); }
};

using byte_array_impl = io_blob_safe;
Expand Down
5 changes: 2 additions & 3 deletions include/sisl/grpc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,18 @@ class ClientRpcDataFuture : public ClientRpcDataInternal< ReqT, RespT > {
folly::Promise< Result< RespT > > m_promise;
};

class GenericClientResponse {
class GenericClientResponse : public ObjLifeCounter< GenericClientResponse > {
public:
GenericClientResponse() = default;
GenericClientResponse(grpc::ByteBuffer const& buf) : m_response_buf{buf} {}

GenericClientResponse(GenericClientResponse&& other);
GenericClientResponse& operator=(GenericClientResponse&& other);
GenericClientResponse(GenericClientResponse const& other) = default;
GenericClientResponse& operator=(GenericClientResponse const& other) = default;
GenericClientResponse& operator=(GenericClientResponse const& other);
~GenericClientResponse() = default;

io_blob response_blob();
grpc::ByteBuffer const& response_buf(bool need_contiguous = true);

private:
grpc::ByteBuffer m_response_buf;
Expand Down
2 changes: 1 addition & 1 deletion include/sisl/utility/obj_life_counter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ struct ObjLifeCounter {
s_alive.fetch_sub(1, std::memory_order_relaxed);
}

ObjLifeCounter(const ObjLifeCounter& o) noexcept { s_alive.fetch_add(1, std::memory_order_relaxed); }
ObjLifeCounter(const ObjLifeCounter&) noexcept { s_alive.fetch_add(1, std::memory_order_relaxed); }
static std::atomic< int64_t > s_created;
static std::atomic< int64_t > s_alive;
static ObjTypeWrapper< T > s_type;
Expand Down
26 changes: 14 additions & 12 deletions src/grpc/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,21 +196,23 @@ GenericClientResponse& GenericClientResponse::operator=(GenericClientResponse&&
return *this;
}

grpc::ByteBuffer const& GenericClientResponse::response_buf(bool need_contiguous) {
if (!need_contiguous || m_single_slice.size() || !m_response_buf.Valid()) { return m_response_buf; }

auto status = m_response_buf.TrySingleSlice(&m_single_slice);
if (status.ok()) { return m_response_buf; }

status = m_response_buf.DumpToSingleSlice(&m_single_slice);
RELEASE_ASSERT(status.ok(), "Failed to deserialize response: code: {}. msg: {}",
static_cast< int >(status.error_code()), status.error_message());

return m_response_buf;
GenericClientResponse& GenericClientResponse::operator=(GenericClientResponse const& other) {
m_response_buf = other.m_response_buf;
m_single_slice = other.m_single_slice;
return *this;
}

io_blob GenericClientResponse::response_blob() {
response_buf(true /* need_contiguous */);
if ((m_single_slice.size() == 0) && m_response_buf.Valid()) {
auto status = m_response_buf.TrySingleSlice(&m_single_slice);
if (!status.ok()) {
status = m_response_buf.DumpToSingleSlice(&m_single_slice);
RELEASE_ASSERT(status.ok(), "Failed to deserialize response: code: {}. msg: {}",
static_cast< int >(status.error_code()), status.error_message());
}
m_response_buf.Clear(); // Since we dumped everything to a single slice, we don't need bytebyffer anymore
}

auto const size = uint32_cast(m_single_slice.size());
return size ? io_blob{m_single_slice.begin(), size, false /* is_aligned */} : io_blob{};
}
Expand Down
17 changes: 2 additions & 15 deletions src/grpc/tests/unit/client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ using namespace ::grpc;
buffer.Swap(&tmp);
}

static std::string DeserializeFromBuffer(ByteBuffer const& buffer) {
[[maybe_unused]] static std::string DeserializeFromBuffer(ByteBuffer const& buffer) {
std::vector< grpc::Slice > slices;
(void)buffer.Dump(&slices);
std::string buf;
Expand Down Expand Up @@ -67,18 +67,14 @@ static std::string blob_to_string(io_blob const& b) { return std::string(c_charp
static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) {
GenericClientResponse resp1(bbuf);
{
auto const& rbuf1 = resp1.response_buf();
EXPECT_EQ(msg, DeserializeFromBuffer(rbuf1));
// EXPECT_EQ(msg, DeserializeFromBuffer(rbuf1));
EXPECT_EQ(msg, blob_to_string(resp1.response_blob()));
}

// move construction
GenericClientResponse resp2(std::move(resp1));
{
EXPECT_EQ(msg, blob_to_string(resp2.response_blob()));
EXPECT_TRUE(resp2.response_buf().Valid());

EXPECT_FALSE(resp1.response_buf().Valid());
auto blb1 = resp1.response_blob();
EXPECT_EQ(blb1.size(), 0);
EXPECT_EQ(blb1.cbytes(), nullptr);
Expand All @@ -89,9 +85,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) {
resp3 = std::move(resp2);
{
EXPECT_EQ(msg, blob_to_string(resp3.response_blob()));
EXPECT_TRUE(resp3.response_buf().Valid());

EXPECT_FALSE(resp2.response_buf().Valid());
auto blb2 = resp2.response_blob();
EXPECT_EQ(blb2.size(), 0);
EXPECT_EQ(blb2.cbytes(), nullptr);
Expand All @@ -101,9 +94,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) {
{
GenericClientResponse resp4(resp3);
EXPECT_EQ(msg, blob_to_string(resp4.response_blob()));
EXPECT_TRUE(resp4.response_buf().Valid());

EXPECT_TRUE(resp3.response_buf().Valid());
EXPECT_EQ(msg, blob_to_string(resp3.response_blob()));
}

Expand All @@ -112,9 +102,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) {
GenericClientResponse resp5;
resp5 = resp3;
EXPECT_EQ(msg, blob_to_string(resp5.response_blob()));
EXPECT_TRUE(resp5.response_buf().Valid());

EXPECT_TRUE(resp3.response_buf().Valid());
EXPECT_EQ(msg, blob_to_string(resp3.response_blob()));
}
}
Expand Down

0 comments on commit 9627788

Please sign in to comment.