Skip to content

Commit

Permalink
DPL: Gather TF status if complete/incomplete/empty, to be sent as mes…
Browse files Browse the repository at this point in the history
…sage
  • Loading branch information
davidrohr committed Sep 11, 2024
1 parent 158e6f8 commit 29072b4
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions Framework/Core/src/ExternalFairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,13 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
{
// Check for missing data.
static std::vector<bool> present;
static std::vector<bool> ignored;
static std::vector<size_t> dataSizes;
static std::vector<bool> showSize;
present.clear();
present.resize(routes.size(), false);
ignored.clear();
ignored.resize(routes.size(), false);
dataSizes.clear();
dataSizes.resize(routes.size(), 0);
showSize.clear();
Expand All @@ -260,7 +263,7 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
for (size_t pi = 0; pi < present.size(); ++pi) {
auto& spec = routes[pi].matcher;
if (DataSpecUtils::asConcreteDataTypeMatcher(spec).description == header::DataDescription("DISTSUBTIMEFRAME")) {
present[pi] = true;
ignored[pi] = true;
continue;
}
if (routes[pi].timeslice == 0) {
Expand All @@ -269,6 +272,7 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
}

size_t foundDataSpecs = 0;
bool skipAsAllFound = false;
for (int msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
bool allFound = true;
int addToSize = -1;
Expand Down Expand Up @@ -300,24 +304,24 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
dph = o2::header::get<DataProcessingHeader*>(parts.At(msgidx)->GetData());
for (size_t pi = 0; pi < present.size(); ++pi) {
if (routes[pi].timeslice != (dph->startTime % routes[pi].maxTimeslices)) {
present[pi] = true;
ignored[pi] = true;
}
}
}
for (size_t pi = 0; pi < present.size(); ++pi) {
if (present[pi] && !doPrintSizes) {
if ((present[pi] || ignored[pi]) && !doPrintSizes) {
continue;
}
// Consider uninvolved pipelines as present.
if (routes[pi].timeslice != (dph->startTime % routes[pi].maxTimeslices)) {
present[pi] = true;
ignored[pi] = true;
continue;
}
allFound = false;
auto& spec = routes[pi].matcher;
OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
if (DataSpecUtils::match(spec, query)) {
if (!present[pi]) {
if (!present[pi] && !ignored[pi]) {
++foundDataSpecs;
present[pi] = true;
showSize[pi] = true;
Expand All @@ -336,16 +340,27 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
// Skip the rest of the block of messages. We subtract 2 because above we increment by 2.
msgidx = msgidxLast - 2;
if (allFound && !doPrintSizes) {
return;
skipAsAllFound = true;
break;
}
}

bool emptyTf = true;
for (size_t pi = 0; pi < present.size(); ++pi) {
if (!present[pi]) {
if (present[pi] && !ignored[pi]) {
emptyTf = false;
}
if (!present[pi] && !ignored[pi]) {
showSize[pi] = true;
unmatchedDescriptions.push_back(pi);
}
}
int timeframeCompleteness = emptyTf ? 0 : (unmatchedDescriptions.size() ? -1 : 1);
(void)timeframeCompleteness; // To be sent as message

if (skipAsAllFound && !doPrintSizes) {
return;
}

if (firstDH && doPrintSizes) {
std::string sizes = "";
Expand Down

0 comments on commit 29072b4

Please sign in to comment.