Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPL I/O API: Supporting spectator vector as return for InputRecord::get<std::vector<T>> #2697

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions Framework/Core/include/Framework/InputRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ namespace framework

struct InputSpec;

template <typename T>
using SpectatorMemoryResource = o2::pmr::SpectatorMemoryResource<T>;

/// @class InputRecord
/// @brief The input API of the Data Processing Layer
/// This class holds the inputs which are valid for processing. The user can get an
Expand Down Expand Up @@ -313,11 +316,11 @@ class InputRecord
auto header = o2::header::get<const DataHeader*>(ref.header);
auto method = header->payloadSerializationMethod;
if (method == o2::header::gSerializationMethodNone) {
// TODO: construct a vector spectator
// this is a quick solution now which makes a copy of the plain vector data
auto* start = reinterpret_cast<typename T::value_type const*>(ref.payload);
auto* end = start + header->payloadSize / sizeof(typename T::value_type);
T result(start, end);
std::unique_ptr<char, Deleter<char>> resourcebuffer((char*)ref.payload, Deleter<char>(false));
std::unique_ptr<boost::container::pmr::memory_resource> resource(new SpectatorMemoryResource<decltype(resourcebuffer)>(std::move(resourcebuffer), header->payloadSize));
const_cast<InputRecord*>(this)->mSpectatorResources.push_back(std::unique_ptr<boost::container::pmr::memory_resource>(resource.release()));
size_t size = header->payloadSize / sizeof(typename T::value_type);
std::vector<typename T::value_type const, o2::pmr::SpectatorAllocator<typename T::value_type const>> result(size, o2::pmr::SpectatorAllocator<typename T::value_type const>(mSpectatorResources.back().get()));
return result;
} else if (method == o2::header::gSerializationMethodROOT) {
/// substitution for container of non-messageable objects with ROOT dictionary
Expand All @@ -330,10 +333,10 @@ class InputRecord
// we expect the unique_ptr to hold an object, exception should have been thrown
// otherwise
auto object = DataRefUtils::as<NonConstT>(ref);
// need to swap the content of the deserialized container to a local variable to force return
// value optimization
T container;
std::swap(const_cast<NonConstT&>(container), *object);
size_t nElements = object->size();
std::unique_ptr<boost::container::pmr::memory_resource> resource(new SpectatorMemoryResource<decltype(object)>(std::move(object)));
const_cast<InputRecord*>(this)->mSpectatorResources.push_back(std::unique_ptr<boost::container::pmr::memory_resource>(resource.release()));
std::vector<typename T::value_type const, o2::pmr::SpectatorAllocator<typename T::value_type const>> container(nElements, o2::pmr::SpectatorAllocator<typename T::value_type const>(mSpectatorResources.back().get()));
return container;
} else {
throw std::runtime_error("No supported conversion function for ROOT serialized message");
Expand Down Expand Up @@ -639,6 +642,7 @@ class InputRecord
private:
std::vector<InputRoute> const& mInputsSchema;
InputSpan mSpan;
std::vector<std::unique_ptr<boost::container::pmr::memory_resource>> mSpectatorResources;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should ask that FairMQMessage becomes a "spectator resource" itself, so that we do not need to have separate accounting of those. @rbx @mkrzewic what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind the FairMQ message does not know the type...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been playing with some ideas at FairMQLevel (see e.g. FairRootGroup/FairMQ#110). btw, the resource does not need to know the type, it is meant to allocate bytes only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i mean not only, but bytes is enough :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed resourcebuffer was just "char".

};

} // namespace framework
Expand Down
5 changes: 5 additions & 0 deletions Framework/Core/test/test_DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ DataProcessorSpec getSinkSpec()
// forward the read-only span on a different route
pc.outputs().snapshot(Output{"TST", "MSGABLVECTORCPY", 0, Lifetime::Timeframe}, object12);

// extract the trivially copyable vector by std::vector object which will return vector
// with special allocator and the underlting pointer is the original input data
auto vector12 = pc.inputs().get<std::vector<o2::test::TriviallyCopyable>>("input12");
ASSERT_ERROR((object12.data() == vector12.data()) && (object12.size() == vector12.size()));

LOG(INFO) << "extracting TNamed object from input13";
auto object13 = pc.inputs().get<TNamed*>("input13");
ASSERT_ERROR(strcmp(object13->GetName(), "a_name") == 0);
Expand Down