Skip to content

Commit

Permalink
Merge pull request #43338 from fwyzard/improve_GenericConsumer
Browse files Browse the repository at this point in the history
Extend the `GenericConsumer` to consume individual products
  • Loading branch information
cmsbuild authored Nov 28, 2023
2 parents e548244 + eec344d commit 53708a6
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 54 deletions.
247 changes: 193 additions & 54 deletions FWCore/Modules/src/GenericConsumer.cc
Original file line number Diff line number Diff line change
@@ -1,23 +1,120 @@
/*
* This plugin depends on all the event, lumi and run products produced by the modules listed in its configuration:
* - eventProducts: depend on the event products from these modules
* - lumiProducts: depend on the lumi products from these modules
* - runProducts: depend on the run products from these modules
* This EDAnalyzer will depend on all the event, lumi, run or process products declared by its configuration, both
* transient and persistent.
*
* Use "*" to depend on all the products in a given branch.
* The dependencies can be specified either as module labels (e.g. "<module label>") or as branch names (e.g.
* "<product type>_<module label>_<instance name>_<process name>").
* If a module label is used, no underscore ("_") must be present; this module will depend all the products produced
* by that module, including those produced by the Transformer functionality (such as the implicitly copied-to-host
* products in case of Alpaka-based modules).
* If a branch name is used, all four fields must be present, separated by underscores; this module will depend only
* on the matching product(s).
*
* Glob expressions ("?" and "*") are supported in module labels and within the individual fields of branch names,
* similar to an OutputModule's "keep" statements.
* Use "*" to depend on all products of a given category.
*
* For example, in the case of Alpaka-based modules running on a device, using
*
* eventProducts = cms.untracked.vstring( "module" )
*
* will cause "module" to run, along with automatic copy of its device products to the host.
* To avoid the copy, the DeviceProduct branch can be specified explicitly with
*
* eventProducts = cms.untracked.vstring( "*DeviceProduct_module_*_*" )
*
* .
*/

#include <algorithm>
#include <string>
#include <regex>
#include <vector>

#include <boost/algorithm/string/replace.hpp>

#include "DataFormats/Provenance/interface/BranchDescription.h"
#include "FWCore/Framework/interface/global/EDAnalyzer.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterDescriptionNode.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"

namespace {
struct ProductBranch {
public:
ProductBranch(std::string const& label) {
static const char kSeparator = '_';
static const char kWildcard = '*';
static const std::regex kAny{".*"};

// wildcard
if (label == kWildcard) {
type_ = kAny;
moduleLabel_ = kAny;
productInstanceName_ = kAny;
processName_ = kAny;
return;
}

int fields = std::count(label.begin(), label.end(), kSeparator) + 1;
if (fields == 1) {
// convert the module label into a regular expression
type_ = kAny;
moduleLabel_ = glob_to_regex(label);
productInstanceName_ = kAny;
processName_ = kAny;
} else if (fields == 4) {
// split the branch name into <product type>_<module label>_<instance name>_<process name>
// and convert the glob expressions into regular expressions
size_t first = 0, last = 0;
last = label.find(kSeparator, first);
type_ = glob_to_regex(label.substr(first, last - first));
first = last + 1;
last = label.find(kSeparator, first);
moduleLabel_ = glob_to_regex(label.substr(first, last - first));
first = last + 1;
last = label.find(kSeparator, first);
productInstanceName_ = glob_to_regex(label.substr(first, last - first));
first = last + 1;
last = label.find(kSeparator, first);
processName_ = glob_to_regex(label.substr(first, last - first));
} else {
// invalid input
throw edm::Exception(edm::errors::Configuration) << "Invalid module label or branch name: \"" << label << "\"";
}
}

bool match(edm::BranchDescription const& branch) const {
return (std::regex_match(branch.friendlyClassName(), type_) and
std::regex_match(branch.moduleLabel(), moduleLabel_) and
std::regex_match(branch.productInstanceName(), productInstanceName_) and
std::regex_match(branch.processName(), processName_));
}

private:
static std::regex glob_to_regex(std::string pattern) {
boost::replace_all(pattern, "*", ".*");
boost::replace_all(pattern, "?", ".");
return std::regex(pattern);
}

std::regex type_;
std::regex moduleLabel_;
std::regex productInstanceName_;
std::regex processName_;
};

std::vector<ProductBranch> make_patterns(std::vector<std::string> const& labels) {
std::vector<ProductBranch> patterns;
patterns.reserve(labels.size());
for (auto const& label : labels)
patterns.emplace_back(label);
return patterns;
}
} // namespace

namespace edm {
class GenericConsumer : public edm::global::EDAnalyzer<> {
public:
Expand All @@ -29,59 +126,89 @@ namespace edm {
static void fillDescriptions(ConfigurationDescriptions& descriptions);

private:
std::vector<std::string> eventLabels_;
std::vector<std::string> lumiLabels_;
std::vector<std::string> runLabels_;
std::vector<std::string> processLabels_;
std::vector<ProductBranch> eventProducts_;
std::vector<ProductBranch> lumiProducts_;
std::vector<ProductBranch> runProducts_;
std::vector<ProductBranch> processProducts_;
std::string label_;
bool verbose_;
};

GenericConsumer::GenericConsumer(ParameterSet const& config)
: eventLabels_(config.getUntrackedParameter<std::vector<std::string>>("eventProducts")),
lumiLabels_(config.getUntrackedParameter<std::vector<std::string>>("lumiProducts")),
runLabels_(config.getUntrackedParameter<std::vector<std::string>>("runProducts")),
processLabels_(config.getUntrackedParameter<std::vector<std::string>>("processProducts")) {
std::sort(eventLabels_.begin(), eventLabels_.end());
std::sort(lumiLabels_.begin(), lumiLabels_.end());
std::sort(runLabels_.begin(), runLabels_.end());
std::sort(processLabels_.begin(), processLabels_.end());

: eventProducts_(make_patterns(config.getUntrackedParameter<std::vector<std::string>>("eventProducts"))),
lumiProducts_(make_patterns(config.getUntrackedParameter<std::vector<std::string>>("lumiProducts"))),
runProducts_(make_patterns(config.getUntrackedParameter<std::vector<std::string>>("runProducts"))),
processProducts_(make_patterns(config.getUntrackedParameter<std::vector<std::string>>("processProducts"))),
label_(config.getParameter<std::string>("@module_label")),
verbose_(config.getUntrackedParameter<bool>("verbose")) {
callWhenNewProductsRegistered([this](edm::BranchDescription const& branch) {
static const std::string kWildcard("*");
static const std::string kPathStatus("edm::PathStatus");
static const std::string kEndPathStatus("edm::EndPathStatus");

switch (branch.branchType()) {
case InEvent:
if (std::binary_search(eventLabels_.begin(), eventLabels_.end(), branch.moduleLabel()) or
(std::binary_search(eventLabels_.begin(), eventLabels_.end(), kWildcard) and
branch.className() != kPathStatus and branch.className() != kEndPathStatus))
this->consumes(edm::TypeToGet{branch.unwrappedTypeID(), PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()});
if (branch.className() == kPathStatus or branch.className() == kEndPathStatus)
return;
for (auto const& label : eventProducts_)
if (label.match(branch)) {
this->consumes(edm::TypeToGet{branch.unwrappedTypeID(), PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()});
if (verbose_) {
edm::LogVerbatim("GenericConsumer")
<< label_ << " consumes Event product " << branch.friendlyClassName() << '_' << branch.moduleLabel()
<< '_' << branch.productInstanceName() << '_' << branch.processName() << '\n';
}
break;
}
break;

case InLumi:
if (std::binary_search(lumiLabels_.begin(), lumiLabels_.end(), branch.moduleLabel()) or
std::binary_search(lumiLabels_.begin(), lumiLabels_.end(), kWildcard))
this->consumes<edm::InLumi>(
edm::TypeToGet{branch.unwrappedTypeID(), PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()});
for (auto const& label : lumiProducts_)
if (label.match(branch)) {
this->consumes<edm::InLumi>(
edm::TypeToGet{branch.unwrappedTypeID(), PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()});
if (verbose_) {
edm::LogVerbatim("GenericConsumer")
<< label_ << " consumes LuminosityBlock product " << branch.friendlyClassName() << '_'
<< branch.moduleLabel() << '_' << branch.productInstanceName() << '_' << branch.processName()
<< '\n';
}
break;
}
break;

case InRun:
if (std::binary_search(runLabels_.begin(), runLabels_.end(), branch.moduleLabel()) or
std::binary_search(runLabels_.begin(), runLabels_.end(), kWildcard))
this->consumes<edm::InRun>(
edm::TypeToGet{branch.unwrappedTypeID(), PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()});
for (auto const& label : runProducts_)
if (label.match(branch)) {
this->consumes<edm::InRun>(
edm::TypeToGet{branch.unwrappedTypeID(), PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()});
if (verbose_) {
edm::LogVerbatim("GenericConsumer")
<< label_ << " consumes Run product " << branch.friendlyClassName() << '_' << branch.moduleLabel()
<< '_' << branch.productInstanceName() << '_' << branch.processName() << '\n';
}
break;
}
break;

case InProcess:
if (std::binary_search(processLabels_.begin(), processLabels_.end(), branch.moduleLabel()) or
std::binary_search(processLabels_.begin(), processLabels_.end(), kWildcard))
this->consumes<edm::InProcess>(
edm::TypeToGet{branch.unwrappedTypeID(), PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()});
for (auto const& label : processProducts_)
if (label.match(branch)) {
this->consumes<edm::InProcess>(
edm::TypeToGet{branch.unwrappedTypeID(), PRODUCT_TYPE},
edm::InputTag{branch.moduleLabel(), branch.productInstanceName(), branch.processName()});
if (verbose_) {
edm::LogVerbatim("GenericConsumer")
<< label_ << " consumes Process product " << branch.friendlyClassName() << '_'
<< branch.moduleLabel() << '_' << branch.productInstanceName() << '_' << branch.processName()
<< '\n';
}
break;
}
break;

default:
throw Exception(errors::LogicError)
<< "Unexpected branch type " << branch.branchType() << "\nPlease contact a Framework developer\n";
Expand All @@ -91,30 +218,42 @@ namespace edm {

void GenericConsumer::fillDescriptions(ConfigurationDescriptions& descriptions) {
descriptions.setComment(
"This plugin depends on all the event, lumi and run products "
"produced by the modules listed in its configuration.");
R"(This EDAnalyzer will depend on all the event, lumi, run or process products declared by its configuration, both transient and persistent.
The dependencies can be specified either as module labels (e.g. "<module label>") or as branch names (e.g. "<product type>_<module label>_<instance name>_<process name>").
If a module label is used, no underscore ("_") must be present; this module will depend all the products produced by that module, including those produced by the Transformer functionality (such as the implicitly copied-to-host products in case of Alpaka-based modules).
If a branch name is used, all four fields must be present, separated by underscores; this module will depend only on the matching product(s).
Glob expressions ("?" and "*") are supported in module labels and within the individual fields of branch names, similar to an OutputModule's "keep" statements.
Use "*" to depend on all products of a given category.
For example, in the case of Alpaka-based modules running on a device, using
eventProducts = cms.untracked.vstring( "module" )
will cause "module" to run, along with automatic copy of its device products to the host.
To avoid the copy, the DeviceProduct branch can be specified explicitly with
eventProducts = cms.untracked.vstring( "*DeviceProduct_module_*_*" )
.)");

ParameterSetDescription desc;
desc.addUntracked<std::vector<std::string>>("eventProducts", {})
->setComment(
"List of modules whose event products this module will depend on. "
"Use \"*\" to depend on all event products.");
->setComment("List of modules or branches whose event products this module will depend on.");
desc.addUntracked<std::vector<std::string>>("lumiProducts", {})
->setComment(
"List of modules whose lumi products this module will depend on. "
"Use \"*\" to depend on all lumi products.");
->setComment("List of modules or branches whose lumi products this module will depend on.");
desc.addUntracked<std::vector<std::string>>("runProducts", {})
->setComment(
"List of modules whose run products this module will depend on. "
"Use \"*\" to depend on all run products.");
->setComment("List of modules or branches whose run products this module will depend on.");
desc.addUntracked<std::vector<std::string>>("processProducts", {})
->setComment(
"List of modules whose process products this module will depend on. "
"Use \"*\" to depend on all process products.");
->setComment("List of modules or branches whose process products this module will depend on.");
desc.addUntracked<bool>("verbose", false)
->setComment("Print the actual branch names for which the dependency are declared.");
descriptions.addWithDefaultLabel(desc);
}

} // namespace edm

#include "FWCore/Framework/interface/MakerMacros.h"
using edm::GenericConsumer;
DEFINE_FWK_MODULE(GenericConsumer);
2 changes: 2 additions & 0 deletions FWCore/Modules/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

<test name="TestFWCoreModulesEmptySourceRunsAndLumis" command="cmsRun ${LOCALTOP}/src/FWCore/Modules/test/emptysource_RunsAndLumis_cfg.py"/>

<test name="testGenericConsumer" command="${LOCALTOP}/src/FWCore/Modules/test/testGenericConsumer.sh"/>

<bin file="test_catch2_*.cc" name="TestFWCoreModulesTP">
<use name="FWCore/TestProcessor"/>
<use name="catch2"/>
Expand Down
49 changes: 49 additions & 0 deletions FWCore/Modules/test/testGenericConsumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import FWCore.ParameterSet.Config as cms

process = cms.Process("TEST")

process.load("FWCore.Framework.test.cmsExceptionsFatal_cff")
process.load("FWCore.MessageService.MessageLogger_cfi")
process.MessageLogger.cerr.INFO.limit = 10000000

process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(5)
)

process.source = cms.Source("EmptySource")

# This should run because it's consumed directly by process.consumer
process.thing = cms.EDProducer("ThingProducer")

# This should not run, because it's mot consumed by any other module
process.notRunningThing = cms.EDProducer("ThingProducer")

# This should run because it's consumed indirectly by process.consumer, via process.otherThing
process.anotherThing = cms.EDProducer("ThingProducer")

# This should run because it's consumed directly by process.consumer
process.otherThing = cms.EDProducer("OtherThingProducer",
thingTag = cms.InputTag('anotherThing'),
transient = cms.untracked.bool(True)
)

# Make the various modules available for unscheduled execution
process.task = cms.Task(
process.thing,
process.anotherThing,
process.otherThing,
process.notRunningThing
)

# Consumes the products of process.thing and process.otherThing, causing them to run
process.consumer = cms.EDAnalyzer("GenericConsumer",
eventProducts = cms.untracked.vstring("*_thing_*_*", "otherThing"),
verbose = cms.untracked.bool(True)
)

# Explicilty schedule process.consumer, causing it to run along with its dependencies, provided by process.task
process.path = cms.Path(process.consumer, process.task)

# Print the summary of all modules that were run
# The content of the summary is tested by testGenericConsumer.sh
process.options.wantSummary = True
6 changes: 6 additions & 0 deletions FWCore/Modules/test/testGenericConsumer.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#! /bin/bash

[ "${LOCALTOP}" ] || LOCALTOP=$CMSSW_BASE

cmsRun ${LOCALTOP}/src/FWCore/Modules/test/testGenericConsumer.py 2>&1 | grep '^TrigReport' | \
awk 'BEGIN { KEEP = 0; } /Module Summary/ { KEEP = 1; } { if (! KEEP) next; print; } /\<thing\>|\<otherThing\>|\<anotherThing\>/ { if ($3 == 0) exit 1; } /\<notRunningThing\>/ { if ($3 != 0) exit 1; }'

0 comments on commit 53708a6

Please sign in to comment.