Skip to content

Commit

Permalink
[irods#7476] Prefer good replicas for write votes
Browse files Browse the repository at this point in the history
This commit changes the "preferred" replica status for votes on write
operations from stale to good.

One consequence of this decision is allowing replication operations to
to allow targeting good replicas for update. Instead of actually overwriting
the data in the good replica, fileModified is triggered directly to invoke
any policy defined by coordinating resources.

Clients can now request good replicas to be overwritten provided that the
source replica is good. Clients can now request that a replica overwrite
itself provided the source and destination replicas are good. In both cases,
no data movement occurs.

When the source and destination replicas are the same replica, the
replication operation is a no-op, although fileModified will be
triggered so that any configured policy will be in effect. If the
source replica is stale, it cannot be used to update any other
replicas regardless.
  • Loading branch information
alanking committed Feb 13, 2024
1 parent 3d7f487 commit 89d65b2
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ namespace irods {
copyKeyVal( ( keyValPair_t* )&object.cond_input(), &dataObjInp.condInput );
addKeyVal( &dataObjInp.condInput, RESC_HIER_STR_KW, child_.c_str() );
addKeyVal( &dataObjInp.condInput, DEST_RESC_HIER_STR_KW, hierarchy_string.c_str() );
addKeyVal( &dataObjInp.condInput, RESC_NAME_KW, root_resource_.c_str() );
addKeyVal( &dataObjInp.condInput, DEST_RESC_NAME_KW, root_resource_.c_str() );
addKeyVal( &dataObjInp.condInput, IN_PDMO_KW, sub_hier.c_str() );

try {
Expand Down
204 changes: 165 additions & 39 deletions server/api/src/rsDataObjRepl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "irods/dataObjPut.h"
#include "irods/dataObjRepl.h"
#include "irods/dataObjTrim.h"
#include "irods/fileDriver.hpp"
#include "irods/fileStageToCache.h"
#include "irods/fileSyncToArch.h"
#include "irods/getRemoteZoneResc.h"
Expand Down Expand Up @@ -77,6 +78,7 @@ namespace
namespace ir = irods::experimental::replica;
namespace irv = irods::experimental::resource::voting;
namespace rst = irods::replica_state_table;
using log_api = irods::experimental::log::api;

auto apply_static_peps(RsComm& _comm, l1desc& _l1desc, const int _operation_status) -> void
{
Expand Down Expand Up @@ -690,9 +692,11 @@ namespace
auto source_cond_input = irods::experimental::make_key_value_proxy(source_inp.condInput);
auto source_obj = resolve_hierarchy_and_get_data_object_info(_comm, source_inp, irods::OPEN_OPERATION);

const auto* source_hierarchy = source_cond_input.at(RESC_HIER_STR_KW).value().data();

// Copy the resolved hierarchy for the source back into the input struct to maintain legacy behavior.
if (!cond_input.contains(RESC_HIER_STR_KW)) {
cond_input[RESC_HIER_STR_KW] = source_cond_input.at(RESC_HIER_STR_KW).value();
cond_input[RESC_HIER_STR_KW] = source_hierarchy;
}

auto& source_replica = get_replica_with_hierarchy(
Expand All @@ -702,23 +706,25 @@ namespace
// If the resolved resource hierarchy does not contain the specified
// resource name or replica number, that means the vote for that resource
// returned as 0.0. Either the replica is inaccessible or it does not exist.
if (irods::experimental::keyword_has_a_value(_inp.condInput, RESC_NAME_KW)) {
const auto resolved_hierarchy = source_cond_input.at(RESC_HIER_STR_KW).value();
const auto resource_name = source_cond_input.at(RESC_NAME_KW).value();
if (!irods::hierarchy_parser{resolved_hierarchy.data()}.resc_in_hier(resource_name.data())) {
THROW(SYS_REPLICA_INACCESSIBLE, fmt::format(
"hierarchy descending from specified source resource name [{}] "
"does not have a replica or the replica is inaccessible at this time",
resource_name));
const auto* source_resource_input = getValByKey(&source_inp.condInput, RESC_NAME_KW);
const auto* replica_number_input = getValByKey(&source_inp.condInput, REPL_NUM_KW);
if (source_resource_input && std::strlen(source_resource_input) > 0) {
if (!irods::hierarchy_parser{source_hierarchy}.resc_in_hier(source_resource_input)) {
THROW(SYS_REPLICA_INACCESSIBLE,
fmt::format("hierarchy descending from specified source resource name [{}] "
"does not have a replica or the replica is inaccessible at this time; ",
"resolved hierarchy:[{}]",
source_resource_input,
source_hierarchy));
}
}
else if (irods::experimental::keyword_has_a_value(_inp.condInput, REPL_NUM_KW)) {
if (const auto replica_number = std::stoi(source_cond_input.at(REPL_NUM_KW).value().data());
else if (replica_number_input && std::strlen(replica_number_input) > 0) {
if (const auto replica_number = std::stoi(replica_number_input);
replica_number != source_replica.repl_num()) {
THROW(SYS_REPLICA_INACCESSIBLE, fmt::format(
"specified source replica number [{}] does not exist "
"or the replica is inaccessible at this time",
replica_number));
THROW(SYS_REPLICA_INACCESSIBLE,
fmt::format("specified source replica number [{}] does not exist "
"or the replica is inaccessible at this time",
replica_number_input));
}
}

Expand All @@ -728,31 +734,100 @@ namespace
auto destination_cond_input = irods::experimental::make_key_value_proxy(destination_inp.condInput);
auto destination_obj = resolve_hierarchy_and_get_data_object_info(_comm, destination_inp, irods::CREATE_OPERATION);

const auto* destination_hierarchy = destination_cond_input.at(DEST_RESC_HIER_STR_KW).value().data();

// Copy the resolved hierarchy for the destination back into the input struct to maintain legacy behavior.
if (!cond_input.contains(DEST_RESC_HIER_STR_KW)) {
cond_input[DEST_RESC_HIER_STR_KW] = destination_cond_input.at(DEST_RESC_HIER_STR_KW).value();
cond_input[DEST_RESC_HIER_STR_KW] = destination_hierarchy;
}

try {
if (irods::experimental::keyword_has_a_value(destination_inp.condInput, DEST_RESC_NAME_KW)) {
const auto resolved_hierarchy = destination_cond_input.at(DEST_RESC_HIER_STR_KW).value();
const auto resource_name = destination_cond_input.at(DEST_RESC_NAME_KW).value();
if (!irods::hierarchy_parser{resolved_hierarchy.data()}.resc_in_hier(resource_name.data())) {
THROW(SYS_REPLICA_INACCESSIBLE, fmt::format(
"hierarchy descending from specified destination resource name [{}] "
"does not have a replica or the replica is inaccessible at this time; "
"resolved hierarchy:[{}]",
resource_name, resolved_hierarchy.data()));
}
// Get the keyword from the original user input to ensure that the client explicitly requested this
// destination resource. If the resolved resource hierarchy does not have the requested destination resource
// in it, this is considered an error because the user's directive could not be carried out as requested.
const auto* destination_resource_input = getValByKey(&_inp.condInput, DEST_RESC_NAME_KW);
if (destination_resource_input && std::strlen(destination_resource_input) > 0 &&
!irods::hierarchy_parser{destination_hierarchy}.resc_in_hier(destination_resource_input))
{
THROW(SYS_REPLICA_INACCESSIBLE,
fmt::format("hierarchy descending from specified destination resource name [{}] "
"does not have a replica or the replica is inaccessible at this time; "
"resolved hierarchy:[{}]",
destination_resource_input,
destination_hierarchy));
}

const auto& destination_replica = get_replica_with_hierarchy(
_comm, destination_obj, destination_cond_input.at(DEST_RESC_HIER_STR_KW).value(),
irods::replication::log_errors::no);

const auto log_errors = cond_input.contains(RECURSIVE_OPR__KW) ? irods::replication::log_errors::no : irods::replication::log_errors::yes;
if (!irods::replication::is_allowed(_comm, source_replica, destination_replica, log_errors)) {
return SYS_NOT_ALLOWED;
const auto log_errors = cond_input.contains(RECURSIVE_OPR__KW);

// If the source replica is not good, it cannot be used to update an existing replica.
if (GOOD_REPLICA != source_replica.replica_status()) {
const std::string msg =
fmt::format("Selected source replica in hierarchy [{}] is not good and will overwrite an "
"existing replica. Replication is not allowed.",
source_replica.resc_hier());

log_api::debug("[{}:{}] - [{}]", __func__, __LINE__, msg);

if (log_errors) {
addRErrorMsg(&_comm.rError, SYS_NOT_ALLOWED, msg.c_str());
}

THROW(SYS_NOT_ALLOWED, msg);
}

// If the destination replica is not at rest, then it is not allowed to be updated.
if (GOOD_REPLICA != destination_replica.replica_status() &&
STALE_REPLICA != destination_replica.replica_status()) {
const std::string msg = fmt::format(
"Selected destination replica in hierarchy [{}] is not at rest. Replication is not allowed.",
destination_replica.resc_hier());

log_api::debug("[{}:{}] - [{}]", __func__, __LINE__, msg);

if (log_errors) {
addRErrorMsg(&_comm.rError, SYS_NOT_ALLOWED, msg.data());
}
}

// At this point, the replication being requested by the client has been fully vetted and is "allowed".
// However, there are two situations in which data movement would be inefficient or dangerous.
//
// If the destination replica is good, it does not need to be updated because it should be identical to the
// good replica that is going to update it.
//
// If the source and destination resource IDs match, that means that the source and destination replicas are
// the same good replica and it would be destructive to try to open it for read and also overwrite it. The
// reason this situation is allowed is because overwriting a good replica with itself can be considered as a
// no-op and in many cases is the result of hierarchy resolution and cannot be avoided in a properly working
// system. All that being said, this action is allowed even if the user explicitly requested it that way.
//
// These situations should NOT result in any data movement. However, the expected behavior in a resource
// hierarchy would be for policy to fire and give the appearance that the replica was updated.
if (source_replica.resc_id() == destination_replica.resc_id() ||
GOOD_REPLICA == destination_replica.replica_status()) {
// PDMOs should not trigger fileModified because fileModified is the PDMO. Bail out here.
if (const auto in_pdmo = cond_input.find(IN_PDMO_KW); in_pdmo != cond_input.cend()) {
log_api::debug("Not triggering fileModified for replicating [{}] - in PDMO [{}]",
destination_obj->logical_path(),
(*in_pdmo).value());
return 0;
}

log_api::debug("Only triggering fileModified for replicating [{}]", destination_obj->logical_path());

// Adding this keyword is required for existing resource plugins to understand what is happening.
addKeyVal(static_cast<KeyValPair*>(&destination_obj->cond_input()),
OPEN_TYPE_KW,
std::to_string(OPEN_FOR_WRITE_TYPE).c_str());
if (const auto err = fileModified(&_comm, destination_obj); !err.ok()) {
THROW(err.code(), err.result());
}

return 0;
}
}
catch (const irods::exception& e) {
Expand Down Expand Up @@ -839,17 +914,68 @@ namespace
destination_replica.resc_hier(),
destination_replica.vote()));

if (irods::replication::is_allowed(_comm, source_replica, destination_replica, irods::replication::log_errors::no) &&
destination_replica.vote() > irv::vote::zero)
{
destination_cond_input[RESC_HIER_STR_KW] = destination_replica.resc_hier();
destination_cond_input[DEST_RESC_HIER_STR_KW] = destination_replica.resc_hier();
// If the source and destination hierarchies match, just skip this one.
if (source_replica.resc_id() == destination_replica.resc_id()) {
log_api::debug(
"[{}:{}] - Destination hierarchy [{}] matches source hierarchy [{}]. Skipping replication.",
__func__,
__LINE__,
destination_replica.resc_hier(),
source_replica.resc_hier());

if (const int ec = replicate_data(_comm, source_inp, destination_inp, _stat);
ec < 0 && status >= 0)
{
status = ec;
}
continue;
}

log_api::info("src hier: [{}], src id: [{}], dest hier: [{}], dest id: [{}]",
source_replica.resc_hier(),
source_replica.resc_id(),
destination_replica.resc_hier(),
destination_replica.resc_id());

// If the destination replica did not vote positively, it should not be updated.
if (destination_replica.vote() <= irv::vote::zero) {
log_api::debug(
"[{}:{}] - Selected destination replica in hierarchy [{}] is not available for update. Replication "
"is not allowed.",
__func__,
__LINE__,
destination_replica.resc_hier());

continue;
}

if (GOOD_REPLICA == destination_replica.replica_status()) {
// If the destination replica is good, it does not need to be updated. Skip it.
log_api::debug("[{}:{}] - Selected destination replica in hierarchy [{}] is already good. Skipping.",
__func__,
__LINE__,
destination_replica.resc_hier());

// fileModified is NOT triggered in this case because all of the replicas are being updated. If it were
// to be triggered, new replicas could be created or existing replicas updated which are not accounted
// for in the irods::file_object over whose replicas we are currently iterating here. This could result
// in a replica getting wastefully or even erroneously overwritten (including the source replica!).

continue;
}

if (STALE_REPLICA != destination_replica.replica_status()) {
// If the destination replica is not good or stale, then it is not at rest and it is not allowed to be
// updated.
log_api::debug("[{}:{}] - Selected destination replica in hierarchy [{}] is not at rest. Replication "
"is not allowed.",
__func__,
__LINE__,
destination_replica.resc_hier());

continue;
}

destination_cond_input[RESC_HIER_STR_KW] = destination_replica.resc_hier();
destination_cond_input[DEST_RESC_HIER_STR_KW] = destination_replica.resc_hier();

if (const int ec = replicate_data(_comm, source_inp, destination_inp, _stat); ec < 0 && status >= 0) {
status = ec;
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/core/src/voting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ namespace detail {

float calculate_for_write(context& ctx)
{
return calculate_with_repl_status(ctx, STALE_REPLICA);
return calculate_with_repl_status(ctx, GOOD_REPLICA);
} // calculate_for_write
} // namespace detail

Expand Down

0 comments on commit 89d65b2

Please sign in to comment.