From 8808be84afd8a967f83feb437e3652a87bf13b46 Mon Sep 17 00:00:00 2001 From: Harihara Kadayam Date: Tue, 23 Apr 2024 10:21:13 -0700 Subject: [PATCH] GenericClientResponse of grpc while copyable, reports memory leak on some cases. (#232) Avoided to copy the entire bytebuffer, but just the slice which is individually refcounted. Added more allocator and minor improvements in buffer --- conanfile.py | 2 +- include/sisl/fds/buffer.hpp | 46 ++++++++++++++++++++++- include/sisl/grpc/rpc_client.hpp | 5 +-- include/sisl/utility/obj_life_counter.hpp | 2 +- src/grpc/rpc_client.cpp | 26 +++++++------ src/grpc/tests/unit/client_test.cpp | 17 +-------- 6 files changed, 65 insertions(+), 33 deletions(-) diff --git a/conanfile.py b/conanfile.py index 20936370..d3a0e2cc 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class SISLConan(ConanFile): name = "sisl" - version = "12.1.1" + version = "12.2.1" homepage = "https://github.com/eBay/sisl" description = "Library for fast data structures, utilities" diff --git a/include/sisl/fds/buffer.hpp b/include/sisl/fds/buffer.hpp index 8f514d50..56b5f8ca 100644 --- a/include/sisl/fds/buffer.hpp +++ b/include/sisl/fds/buffer.hpp @@ -229,6 +229,45 @@ class AlignedAllocator { AlignedAllocatorMetrics m_metrics; }; +template < typename T, std::size_t Alignment = 512 > +class AlignedTypeAllocator { +private: + static_assert(Alignment >= alignof(T), + "Beware that types like int have minimum alignment requirements " + "or access will result in crashes."); + +public: + /** + * This is only necessary because AlignedAllocator has a second template + * argument for the alignment that will make the default + * std::allocator_traits implementation fail during compilation. + * @see https://stackoverflow.com/a/48062758/2191065 + * + * Taken this from link: + * https://stackoverflow.com/questions/60169819/modern-approach-to-making-stdvector-allocate-aligned-memory + */ + template < class OtherT > + struct rebind { + using other = AlignedTypeAllocator< OtherT, Alignment >; + }; + +public: + constexpr AlignedTypeAllocator() noexcept = default; + constexpr AlignedTypeAllocator(const AlignedTypeAllocator&) noexcept = default; + + template < typename U > + constexpr AlignedTypeAllocator(AlignedTypeAllocator< U, Alignment > const&) noexcept {} + + T* allocate(std::size_t nelems) { + if (nelems > std::numeric_limits< std::size_t >::max() / sizeof(T)) { throw std::bad_array_new_length(); } + return r_cast< T* >(AlignedAllocator::allocator().aligned_alloc(Alignment, nelems * sizeof(T), buftag::common)); + } + + void deallocate(T* ptr, [[maybe_unused]] std::size_t nbytes) { + AlignedAllocator::allocator().aligned_free(uintptr_cast(ptr), buftag::common); + } +}; + #define sisl_aligned_alloc sisl::AlignedAllocator::allocator().aligned_alloc #define sisl_aligned_free sisl::AlignedAllocator::allocator().aligned_free #define sisl_aligned_realloc sisl::AlignedAllocator::allocator().aligned_realloc @@ -273,6 +312,9 @@ class aligned_shared_ptr : public std::shared_ptr< T > { aligned_shared_ptr(T* p) : std::shared_ptr< T >(p) {} }; +template < typename T, std::size_t Alignment = 512 > +using aligned_vector = std::vector< T, AlignedTypeAllocator< T, Alignment > >; + struct io_blob; using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >; @@ -338,7 +380,7 @@ struct io_blob : public blob { */ struct io_blob_safe final : public io_blob { public: - buftag m_tag; + buftag m_tag{buftag::common}; public: io_blob_safe() = default; @@ -367,6 +409,8 @@ struct io_blob_safe final : public io_blob { other.size_ = 0; return *this; } + + void buf_alloc(size_t sz, uint32_t align_size = 512) { io_blob::buf_alloc(sz, align_size, m_tag); } }; using byte_array_impl = io_blob_safe; diff --git a/include/sisl/grpc/rpc_client.hpp b/include/sisl/grpc/rpc_client.hpp index 6e5d57c8..0d47b35f 100644 --- a/include/sisl/grpc/rpc_client.hpp +++ b/include/sisl/grpc/rpc_client.hpp @@ -163,7 +163,7 @@ class ClientRpcDataFuture : public ClientRpcDataInternal< ReqT, RespT > { folly::Promise< Result< RespT > > m_promise; }; -class GenericClientResponse { +class GenericClientResponse : public ObjLifeCounter< GenericClientResponse > { public: GenericClientResponse() = default; GenericClientResponse(grpc::ByteBuffer const& buf) : m_response_buf{buf} {} @@ -171,11 +171,10 @@ class GenericClientResponse { GenericClientResponse(GenericClientResponse&& other); GenericClientResponse& operator=(GenericClientResponse&& other); GenericClientResponse(GenericClientResponse const& other) = default; - GenericClientResponse& operator=(GenericClientResponse const& other) = default; + GenericClientResponse& operator=(GenericClientResponse const& other); ~GenericClientResponse() = default; io_blob response_blob(); - grpc::ByteBuffer const& response_buf(bool need_contiguous = true); private: grpc::ByteBuffer m_response_buf; diff --git a/include/sisl/utility/obj_life_counter.hpp b/include/sisl/utility/obj_life_counter.hpp index a37c2891..fa9237d7 100644 --- a/include/sisl/utility/obj_life_counter.hpp +++ b/include/sisl/utility/obj_life_counter.hpp @@ -145,7 +145,7 @@ struct ObjLifeCounter { s_alive.fetch_sub(1, std::memory_order_relaxed); } - ObjLifeCounter(const ObjLifeCounter& o) noexcept { s_alive.fetch_add(1, std::memory_order_relaxed); } + ObjLifeCounter(const ObjLifeCounter&) noexcept { s_alive.fetch_add(1, std::memory_order_relaxed); } static std::atomic< int64_t > s_created; static std::atomic< int64_t > s_alive; static ObjTypeWrapper< T > s_type; diff --git a/src/grpc/rpc_client.cpp b/src/grpc/rpc_client.cpp index 20589110..40d0a2fb 100644 --- a/src/grpc/rpc_client.cpp +++ b/src/grpc/rpc_client.cpp @@ -196,21 +196,23 @@ GenericClientResponse& GenericClientResponse::operator=(GenericClientResponse&& return *this; } -grpc::ByteBuffer const& GenericClientResponse::response_buf(bool need_contiguous) { - if (!need_contiguous || m_single_slice.size() || !m_response_buf.Valid()) { return m_response_buf; } - - auto status = m_response_buf.TrySingleSlice(&m_single_slice); - if (status.ok()) { return m_response_buf; } - - status = m_response_buf.DumpToSingleSlice(&m_single_slice); - RELEASE_ASSERT(status.ok(), "Failed to deserialize response: code: {}. msg: {}", - static_cast< int >(status.error_code()), status.error_message()); - - return m_response_buf; +GenericClientResponse& GenericClientResponse::operator=(GenericClientResponse const& other) { + m_response_buf = other.m_response_buf; + m_single_slice = other.m_single_slice; + return *this; } io_blob GenericClientResponse::response_blob() { - response_buf(true /* need_contiguous */); + if ((m_single_slice.size() == 0) && m_response_buf.Valid()) { + auto status = m_response_buf.TrySingleSlice(&m_single_slice); + if (!status.ok()) { + status = m_response_buf.DumpToSingleSlice(&m_single_slice); + RELEASE_ASSERT(status.ok(), "Failed to deserialize response: code: {}. msg: {}", + static_cast< int >(status.error_code()), status.error_message()); + } + m_response_buf.Clear(); // Since we dumped everything to a single slice, we don't need bytebyffer anymore + } + auto const size = uint32_cast(m_single_slice.size()); return size ? io_blob{m_single_slice.begin(), size, false /* is_aligned */} : io_blob{}; } diff --git a/src/grpc/tests/unit/client_test.cpp b/src/grpc/tests/unit/client_test.cpp index 838f1856..d3f8b209 100644 --- a/src/grpc/tests/unit/client_test.cpp +++ b/src/grpc/tests/unit/client_test.cpp @@ -20,7 +20,7 @@ using namespace ::grpc; buffer.Swap(&tmp); } -static std::string DeserializeFromBuffer(ByteBuffer const& buffer) { +[[maybe_unused]] static std::string DeserializeFromBuffer(ByteBuffer const& buffer) { std::vector< grpc::Slice > slices; (void)buffer.Dump(&slices); std::string buf; @@ -67,8 +67,7 @@ static std::string blob_to_string(io_blob const& b) { return std::string(c_charp static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) { GenericClientResponse resp1(bbuf); { - auto const& rbuf1 = resp1.response_buf(); - EXPECT_EQ(msg, DeserializeFromBuffer(rbuf1)); + // EXPECT_EQ(msg, DeserializeFromBuffer(rbuf1)); EXPECT_EQ(msg, blob_to_string(resp1.response_blob())); } @@ -76,9 +75,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) { GenericClientResponse resp2(std::move(resp1)); { EXPECT_EQ(msg, blob_to_string(resp2.response_blob())); - EXPECT_TRUE(resp2.response_buf().Valid()); - - EXPECT_FALSE(resp1.response_buf().Valid()); auto blb1 = resp1.response_blob(); EXPECT_EQ(blb1.size(), 0); EXPECT_EQ(blb1.cbytes(), nullptr); @@ -89,9 +85,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) { resp3 = std::move(resp2); { EXPECT_EQ(msg, blob_to_string(resp3.response_blob())); - EXPECT_TRUE(resp3.response_buf().Valid()); - - EXPECT_FALSE(resp2.response_buf().Valid()); auto blb2 = resp2.response_blob(); EXPECT_EQ(blb2.size(), 0); EXPECT_EQ(blb2.cbytes(), nullptr); @@ -101,9 +94,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) { { GenericClientResponse resp4(resp3); EXPECT_EQ(msg, blob_to_string(resp4.response_blob())); - EXPECT_TRUE(resp4.response_buf().Valid()); - - EXPECT_TRUE(resp3.response_buf().Valid()); EXPECT_EQ(msg, blob_to_string(resp3.response_blob())); } @@ -112,9 +102,6 @@ static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) { GenericClientResponse resp5; resp5 = resp3; EXPECT_EQ(msg, blob_to_string(resp5.response_blob())); - EXPECT_TRUE(resp5.response_buf().Valid()); - - EXPECT_TRUE(resp3.response_buf().Valid()); EXPECT_EQ(msg, blob_to_string(resp3.response_blob())); } }