Skip to content

Commit

Permalink
GenericClientResponse of grpc while copyable, reports memory leak on …
Browse files Browse the repository at this point in the history
…some cases. (#232)

Avoided to copy the entire bytebuffer, but just the slice which is individually
refcounted.

Added more allocator and minor improvements in buffer
  • Loading branch information
hkadayam authored Apr 23, 2024
1 parent 370c772 commit 8808be8
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 33 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "12.1.1"
version = "12.2.1"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
46 changes: 45 additions & 1 deletion include/sisl/fds/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,45 @@ class AlignedAllocator {
AlignedAllocatorMetrics m_metrics;
};

template < typename T, std::size_t Alignment = 512 >
class AlignedTypeAllocator {
private:
static_assert(Alignment >= alignof(T),
"Beware that types like int have minimum alignment requirements "
"or access will result in crashes.");

public:
/**
* This is only necessary because AlignedAllocator has a second template
* argument for the alignment that will make the default
* std::allocator_traits implementation fail during compilation.
* @see https://stackoverflow.com/a/48062758/2191065
*
* Taken this from link:
* https://stackoverflow.com/questions/60169819/modern-approach-to-making-stdvector-allocate-aligned-memory
*/
template < class OtherT >
struct rebind {
using other = AlignedTypeAllocator< OtherT, Alignment >;
};

public:
constexpr AlignedTypeAllocator() noexcept = default;
constexpr AlignedTypeAllocator(const AlignedTypeAllocator&) noexcept = default;

template < typename U >
constexpr AlignedTypeAllocator(AlignedTypeAllocator< U, Alignment > const&) noexcept {}

T* allocate(std::size_t nelems) {
if (nelems > std::numeric_limits< std::size_t >::max() / sizeof(T)) { throw std::bad_array_new_length(); }
return r_cast< T* >(AlignedAllocator::allocator().aligned_alloc(Alignment, nelems * sizeof(T), buftag::common));
}

void deallocate(T* ptr, [[maybe_unused]] std::size_t nbytes) {
AlignedAllocator::allocator().aligned_free(uintptr_cast(ptr), buftag::common);
}
};

#define sisl_aligned_alloc sisl::AlignedAllocator::allocator().aligned_alloc
#define sisl_aligned_free sisl::AlignedAllocator::allocator().aligned_free
#define sisl_aligned_realloc sisl::AlignedAllocator::allocator().aligned_realloc
Expand Down Expand Up @@ -273,6 +312,9 @@ class aligned_shared_ptr : public std::shared_ptr< T > {
aligned_shared_ptr(T* p) : std::shared_ptr< T >(p) {}
};

template < typename T, std::size_t Alignment = 512 >
using aligned_vector = std::vector< T, AlignedTypeAllocator< T, Alignment > >;

struct io_blob;
using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >;

Expand Down Expand Up @@ -338,7 +380,7 @@ struct io_blob : public blob {
*/
struct io_blob_safe final : public io_blob {
public:
buftag m_tag;
buftag m_tag{buftag::common};

public:
io_blob_safe() = default;
Expand Down Expand Up @@ -367,6 +409,8 @@ struct io_blob_safe final : public io_blob {
other.size_ = 0;
return *this;
}

void buf_alloc(size_t sz, uint32_t align_size = 512) { io_blob::buf_alloc(sz, align_size, m_tag); }
};

using byte_array_impl = io_blob_safe;
Expand Down
5 changes: 2 additions & 3 deletions include/sisl/grpc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,18 @@ class ClientRpcDataFuture : public ClientRpcDataInternal< ReqT, RespT > {
folly::Promise< Result< RespT > > m_promise;
};

class GenericClientResponse {
class GenericClientResponse : public ObjLifeCounter< GenericClientResponse > {
public:
GenericClientResponse() = default;
GenericClientResponse(grpc::ByteBuffer const& buf) : m_response_buf{buf} {}

GenericClientResponse(GenericClientResponse&& other);
GenericClientResponse& operator=(GenericClientResponse&& other);
GenericClientResponse(GenericClientResponse const& other) = default;
GenericClientResponse& operator=(GenericClientResponse const& other) = default;
GenericClientResponse& operator=(GenericClientResponse const& other);
~GenericClientResponse() = default;

io_blob response_blob();
grpc::ByteBuffer const& response_buf(bool need_contiguous = true);

private:
grpc::ByteBuffer m_response_buf;
Expand Down
2 changes: 1 addition & 1 deletion include/sisl/utility/obj_life_counter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ struct ObjLifeCounter {
s_alive.fetch_sub(1, std::memory_order_relaxed);
}

ObjLifeCounter(const ObjLifeCounter& o) noexcept { s_alive.fetch_add(1, std::memory_order_relaxed); }
ObjLifeCounter(const ObjLifeCounter&) noexcept { s_alive.fetch_add(1, std::memory_order_relaxed); }
static std::atomic< int64_t > s_created;
static std::atomic< int64_t > s_alive;
static ObjTypeWrapper< T > s_type;
Expand Down
26 changes: 14 additions & 12 deletions src/grpc/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,21 +196,23 @@ GenericClientResponse& GenericClientResponse::operator=(GenericClientResponse&&
return *this;
}

grpc::ByteBuffer const& GenericClientResponse::response_buf(bool need_contiguous) {
if (!need_contiguous || m_single_slice.size() || !m_response_buf.Valid()) { return m_response_buf; }

auto status = m_response_buf.TrySingleSlice(&m_single_slice);
if (status.ok()) { return m_response_buf; }

status = m_response_buf.DumpToSingleSlice(&m_single_slice);
RELEASE_ASSERT(status.ok(), "Failed to deserialize response: code: {}. msg: {}",
static_cast< int >(status.error_code()), status.error_message());

return m_response_buf;
GenericClientResponse& GenericClientResponse::operator=(GenericClientResponse const& other) {
m_response_buf = other.m_response_buf;
m_single_slice = other.m_single_slice;
return *this;
}

io_blob GenericClientResponse::response_blob() {
response_buf(true /* need_contiguous */);
if ((m_single_slice.size() == 0) && m_response_buf.Valid()) {
auto status = m_response_buf.TrySingleSlice(&m_single_slice);
if (!status.ok()) {
status = m_response_buf.DumpToSingleSlice(&m_single_slice);
RELEASE_ASSERT(status.ok(), "Failed to deserialize response: code: {}. msg: {}",
static_cast< int >(status.error_code()), status.error_message());
}
m_response_buf.Clear(); // Since we dumped everything to a single slice, we don't need bytebyffer anymore
}

auto const size = uint32_cast(m_single_slice.size());
return size ? io_blob{m_single_slice.begin(), size, false /* is_aligned */} : io_blob{};
}
Expand Down
17 changes: 2 additions & 15 deletions src/grpc/tests/unit/client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ using namespace ::grpc;
buffer.Swap(&tmp);
}

static std::string DeserializeFromBuffer(ByteBuffer const& buffer) {
[[maybe_unused]] static std::string DeserializeFromBuffer(ByteBuffer const& buffer) {
std::vector< grpc::Slice > slices;
(void)buffer.Dump(&slices);
std::string buf;
Expand Down Expand Up @@ -67,18 +67,14 @@ static std::string blob_to_string(io_blob const& b) { return std::string(c_charp
static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) {
GenericClientResponse resp1(bbuf);
{
auto const& rbuf1 = resp1.response_buf();
EXPECT_EQ(msg, DeserializeFromBuffer(rbuf1));
// EXPECT_EQ(msg, DeserializeFromBuffer(rbuf1));
EXPECT_EQ(msg, blob_to_string(resp1.response_blob()));
}

// move construction
GenericClientResponse resp2(std::move(resp1));
{
EXPECT_EQ(msg, blob_to_string(resp2.response_blob()));
EXPECT_TRUE(resp2.response_buf().Valid());

EXPECT_FALSE(resp1.response_buf().Valid());
auto blb1 = resp1.response_blob();
EXPECT_EQ(blb1.size(), 0);
EXPECT_EQ(blb1.cbytes(), nullptr);
Expand All @@ -89,9 +85,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) {
resp3 = std::move(resp2);
{
EXPECT_EQ(msg, blob_to_string(resp3.response_blob()));
EXPECT_TRUE(resp3.response_buf().Valid());

EXPECT_FALSE(resp2.response_buf().Valid());
auto blb2 = resp2.response_blob();
EXPECT_EQ(blb2.size(), 0);
EXPECT_EQ(blb2.cbytes(), nullptr);
Expand All @@ -101,9 +94,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) {
{
GenericClientResponse resp4(resp3);
EXPECT_EQ(msg, blob_to_string(resp4.response_blob()));
EXPECT_TRUE(resp4.response_buf().Valid());

EXPECT_TRUE(resp3.response_buf().Valid());
EXPECT_EQ(msg, blob_to_string(resp3.response_blob()));
}

Expand All @@ -112,9 +102,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) {
GenericClientResponse resp5;
resp5 = resp3;
EXPECT_EQ(msg, blob_to_string(resp5.response_blob()));
EXPECT_TRUE(resp5.response_buf().Valid());

EXPECT_TRUE(resp3.response_buf().Valid());
EXPECT_EQ(msg, blob_to_string(resp3.response_blob()));
}
}
Expand Down

0 comments on commit 8808be8

Please sign in to comment.