Skip to content

Commit

Permalink
Make getVector return a fully functional vector
Browse files Browse the repository at this point in the history
  • Loading branch information
mkrzewic committed Dec 13, 2018
1 parent 5e24fdb commit 8281802
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 2 deletions.
125 changes: 123 additions & 2 deletions fairmq/MemoryResourceTools.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
namespace fair {
namespace mq {

using BytePmrAllocator = pmr::polymorphic_allocator<fair::mq::byte>;

//_________________________________________________________________________________________________
// return the message associated with the container or throw if it is not possible
template<typename ContainerT>
Expand Down Expand Up @@ -60,6 +58,129 @@ FairMQMessagePtr getMessage(ContainerT &&container_, FairMQMemoryResource *targe
container.data(),
containerSizeBytes);
return message;
}

//__________________________________________________________________________________________________
/// This memory resource only watches, does not allocate/deallocate anything.
/// Ownership of hte message is taken. Meant to be used for transparent data adoption in containers.
/// In combination with the SpectatorAllocator this is an alternative to using span, as raw memory
/// (e.g. an existing buffer message) will be accessible with appropriate container.
class MessageResource : public FairMQMemoryResource
{

public:
MessageResource() noexcept = delete;
MessageResource(const MessageResource&) noexcept = default;
MessageResource(MessageResource&&) noexcept = default;
MessageResource& operator=(const MessageResource&) = default;
MessageResource& operator=(MessageResource&&) = default;
MessageResource(FairMQMessagePtr message)
: mUpstream{ message->GetTransport()->GetMemoryResource() },
mMessageSize{ message->GetSize() },
mMessageData{ mUpstream ? mUpstream->setMessage(std::move(message))
: throw std::runtime_error("MessageResource::MessageResource upstream is nullptr") }
{
}
FairMQMessagePtr getMessage(void* p) override { return mUpstream->getMessage(p); }
void* setMessage(FairMQMessagePtr message) override { return mUpstream->setMessage(std::move(message)); }
FairMQTransportFactory* getTransportFactory() noexcept override { return nullptr; }
size_t getNumberOfMessages() const noexcept override { return mMessageData ? 1 : 0; }

protected:
FairMQMemoryResource* mUpstream{ nullptr };
size_t mMessageSize{ 0 };
void* mMessageData{ nullptr };
bool initialImport{ true };

void* do_allocate(std::size_t bytes, std::size_t alignment) override
{
if (initialImport) {
if (bytes > mMessageSize) {
throw std::bad_alloc();
}
initialImport = false;
return mMessageData;
} else {
return mUpstream->allocate(bytes, alignment);
}
}
void do_deallocate(void* p, std::size_t bytes, std::size_t alignment) override
{
mUpstream->deallocate(p, bytes, alignment);
return;
}
bool do_is_equal(const memory_resource& other) const noexcept override
{
return *mUpstream == *dynamic_cast<const FairMQMemoryResource*>(&other);
}
};

//__________________________________________________________________________________________________
/// This allocator has a pmr-like interface, but keeps the unique MessageResource as internal state,
/// allowing full resource (associated message) management internally without any global state.
template <typename T>
class OwningMessageSpectatorAllocator
{
public:
using value_type = T;

MessageResource mResource;

OwningMessageSpectatorAllocator() noexcept = default;
OwningMessageSpectatorAllocator(const OwningMessageSpectatorAllocator&) noexcept = default;
OwningMessageSpectatorAllocator(OwningMessageSpectatorAllocator&&) noexcept = default;
OwningMessageSpectatorAllocator(MessageResource&& resource) noexcept : mResource{ resource } {}

template <class U>
OwningMessageSpectatorAllocator(const OwningMessageSpectatorAllocator<U>& other) noexcept : mResource(other.mResource)
{
}

OwningMessageSpectatorAllocator& operator=(const OwningMessageSpectatorAllocator& other)
{
mResource = other.mResource;
return *this;
}

OwningMessageSpectatorAllocator select_on_container_copy_construction() const
{
return OwningMessageSpectatorAllocator();
}

boost::container::pmr::memory_resource* resource() { return &mResource; }

// skip default construction of empty elements
// this is important for two reasons: one: it allows us to adopt an existing buffer (e.g. incoming message) and
// quickly construct large vectors while skipping the element initialization.
template <class U>
void construct(U*)
{
}

// dont try to call destructors, makes no sense since resource is managed externally AND allowed
// types cannot have side effects
template <typename U>
void destroy(U*)
{
}

T* allocate(size_t size) { return reinterpret_cast<T*>(mResource.allocate(size * sizeof(T), 0)); }
void deallocate(T* ptr, size_t size)
{
mResource.deallocate(const_cast<typename std::remove_cv<T>::type*>(ptr), size);
}
};

//__________________________________________________________________________________________________
//__________________________________________________________________________________________________
/// Return a std::vector spanned over the contents of the message, takes ownership of the message
template <typename ElemT>
std::vector<ElemT,OwningMessageSpectatorAllocator<ElemT>>
getVector(size_t nelem, FairMQMessagePtr message)
{
static_assert(std::is_trivially_destructible<ElemT>::value);
return std::vector<ElemT, OwningMessageSpectatorAllocator<ElemT>>(
nelem, OwningMessageSpectatorAllocator<ElemT>(MessageResource{ std::move(message) }));
};

} /* namespace mq */
Expand Down
19 changes: 19 additions & 0 deletions test/memory_resources/_memory_resources.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,23 @@ TEST(MemoryResources, getMessage_test)
EXPECT_TRUE(messageArray[0] == 4 && messageArray[1] == 5 && messageArray[2] == 6);
}

TEST(MemoryResources, getVector)
{
std::vector<int,polymorphic_allocator<int>> v(polymorphic_allocator<int>{allocZMQ});
v.emplace_back(1);
v.emplace_back(2);
auto message = getMessage(std::move(v));
void* olddata = message->GetData();

auto t = getVector<int>(2,std::move(message));
EXPECT_TRUE(t.size()==2);
t.emplace_back(3);
EXPECT_TRUE(t.size()==3);
EXPECT_TRUE(t[2]==3);

auto m = getMessage(std::move(t));
void* newdata = m->GetData();
EXPECT_TRUE(newdata!=olddata);
}

} // namespace

0 comments on commit 8281802

Please sign in to comment.