Skip to content

Commit

Permalink
Significant refactoring of dependencies in the Replication System's r…
Browse files Browse the repository at this point in the history
…equests

The dependency has been reversed to make the request classes depend on
the Controller. In the previous version of the code, the Controller was
the dependency "hub" by knowing about all possible request classes.
The change fixes the original design mistake in the Replication Framework.

The Controller class is no longer required to instantiate the requests.
The instantiation is now done via each request's "create::" factory method
that has been renamed into "::createAndStart" to reflect what the method actually does.
The very same factory method is also used to start request immediatelly
upon the instantiation. Note that in the new code each request object
self-registeres (and unregisteres) itself with the Controller, which
allows the Controller to keep a track of all active (started and not yet
finished) requests.

The management request classes that are meant for stopping o probing the worker-side
requests have been redesigned. The complex template definitions have been replaced
with simple classes.

The older version of the request status class was replaced with the simpler
status probe (explained in the previous paragraph) and a new request "tracking"
message that is used internally by the corresponding requests to track progress
of the requests at workers.

Overall, all these changes have reduced the amnount of the boilerplate code
that is needed for adding new request types to the Framework.
  • Loading branch information
iagaponenko committed Dec 5, 2024
1 parent 6e44e0d commit 4dadd4a
Show file tree
Hide file tree
Showing 123 changed files with 1,959 additions and 4,711 deletions.
44 changes: 22 additions & 22 deletions src/replica/apps/AdminApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <vector>

// Qserv headers
#include "replica/config/Configuration.h"
#include "replica/contr/Controller.h"
#include "replica/requests/RequestTracker.h"
#include "replica/util/Performance.h"
Expand Down Expand Up @@ -57,7 +58,8 @@ AdminApp::AdminApp(int argc, char* argv[])
::enableServiceProvider) {
// Configure the command line parser

parser().commands("operation", {"STATUS", "SUSPEND", "RESUME", "REQUESTS", "DRAIN"}, _operation)
parser().commands("operation", {"STATUS", "SUSPEND", "RESUME", "REQUESTS", "DRAIN", "RECONFIG"},
_operation)
.flag("all-workers",
"The flag for selecting all workers regardless of their status (DISABLED or READ-ONLY).",
_allWorkers)
Expand Down Expand Up @@ -85,6 +87,8 @@ AdminApp::AdminApp(int argc, char* argv[])

parser().command("DRAIN").description(
"Cancel the in-progress (if any) requests on all workers, then empty all queues.");

parser().command("RECONFIG").description("Re-configure the worker services.");
}

int AdminApp::runImpl() {
Expand All @@ -93,51 +97,48 @@ int AdminApp::runImpl() {
// Launch requests against a collection of workers

CommonRequestTracker<ServiceManagementRequestBase> tracker(cout, _progressReport, _errorReport);

auto const workerNames =
_allWorkers ? serviceProvider()->config()->allWorkers() : serviceProvider()->config()->workers();

for (auto&& workerName : workerNames) {
if (_operation == "STATUS") {
tracker.add(controller->statusOfWorkerService(
workerName, [&tracker](ServiceStatusRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

tracker.add(ServiceStatusRequest::createAndStart(
controller, workerName,
[&tracker](ServiceStatusRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));
} else if (_operation == "SUSPEND") {
tracker.add(controller->suspendWorkerService(
workerName,
tracker.add(ServiceSuspendRequest::createAndStart(
controller, workerName,
[&tracker](ServiceSuspendRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

} else if (_operation == "RESUME") {
tracker.add(controller->resumeWorkerService(
workerName, [&tracker](ServiceResumeRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

tracker.add(ServiceResumeRequest::createAndStart(
controller, workerName,
[&tracker](ServiceResumeRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));
} else if (_operation == "REQUESTS") {
tracker.add(controller->requestsOfWorkerService(
workerName,
tracker.add(ServiceRequestsRequest::createAndStart(
controller, workerName,
[&tracker](ServiceRequestsRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

} else if (_operation == "DRAIN") {
tracker.add(controller->drainWorkerService(
workerName, [&tracker](ServiceDrainRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));

tracker.add(ServiceDrainRequest::createAndStart(
controller, workerName,
[&tracker](ServiceDrainRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));
} else if (_operation == "RECONFIG") {
tracker.add(ServiceReconfigRequest::createAndStart(
controller, workerName,
[&tracker](ServiceReconfigRequest::Ptr const& ptr) { tracker.onFinish(ptr); }));
} else {
throw logic_error("AdminApp::" + string(__func__) + " unsupported operation: " + _operation);
}
}

// Wait before all request are finished

tracker.track();

// Analyze and display results

vector<string> workerName;
vector<string> startedSecondsAgo;
vector<string> state;
vector<string> numNewRequests;
vector<string> numInProgressRequests;
vector<string> numFinishedRequests;

for (auto const& ptr : tracker.requests) {
workerName.push_back(ptr->workerName());
if ((ptr->state() == Request::State::FINISHED) &&
Expand Down Expand Up @@ -174,7 +175,6 @@ int AdminApp::runImpl() {
vector<string> requestType;
vector<string> queue;
vector<uint32_t> priority;

auto analyzeRemoteRequestInfo = [&](string const& workerName_, string const& queueName,
ProtocolServiceResponseInfo const& info) {
workerName.push_back(workerName_);
Expand Down
Loading

0 comments on commit 4dadd4a

Please sign in to comment.