diff --git a/plugins/resources/replication/src/irods_create_write_replicator.cpp b/plugins/resources/replication/src/irods_create_write_replicator.cpp index 209c6c9f59..5b1ddda2f6 100644 --- a/plugins/resources/replication/src/irods_create_write_replicator.cpp +++ b/plugins/resources/replication/src/irods_create_write_replicator.cpp @@ -57,8 +57,6 @@ namespace irods { copyKeyVal( ( keyValPair_t* )&object.cond_input(), &dataObjInp.condInput ); addKeyVal( &dataObjInp.condInput, RESC_HIER_STR_KW, child_.c_str() ); addKeyVal( &dataObjInp.condInput, DEST_RESC_HIER_STR_KW, hierarchy_string.c_str() ); - addKeyVal( &dataObjInp.condInput, RESC_NAME_KW, root_resource_.c_str() ); - addKeyVal( &dataObjInp.condInput, DEST_RESC_NAME_KW, root_resource_.c_str() ); addKeyVal( &dataObjInp.condInput, IN_PDMO_KW, sub_hier.c_str() ); try { diff --git a/server/api/src/rsDataObjRepl.cpp b/server/api/src/rsDataObjRepl.cpp index 554a152cf5..88a41344ab 100644 --- a/server/api/src/rsDataObjRepl.cpp +++ b/server/api/src/rsDataObjRepl.cpp @@ -8,6 +8,7 @@ #include "irods/dataObjPut.h" #include "irods/dataObjRepl.h" #include "irods/dataObjTrim.h" +#include "irods/fileDriver.hpp" #include "irods/fileStageToCache.h" #include "irods/fileSyncToArch.h" #include "irods/getRemoteZoneResc.h" @@ -77,6 +78,7 @@ namespace namespace ir = irods::experimental::replica; namespace irv = irods::experimental::resource::voting; namespace rst = irods::replica_state_table; + using log_api = irods::experimental::log::api; auto apply_static_peps(RsComm& _comm, l1desc& _l1desc, const int _operation_status) -> void { @@ -690,9 +692,11 @@ namespace auto source_cond_input = irods::experimental::make_key_value_proxy(source_inp.condInput); auto source_obj = resolve_hierarchy_and_get_data_object_info(_comm, source_inp, irods::OPEN_OPERATION); + const auto* source_hierarchy = source_cond_input.at(RESC_HIER_STR_KW).value().data(); + // Copy the resolved hierarchy for the source back into the input struct to maintain legacy behavior. if (!cond_input.contains(RESC_HIER_STR_KW)) { - cond_input[RESC_HIER_STR_KW] = source_cond_input.at(RESC_HIER_STR_KW).value(); + cond_input[RESC_HIER_STR_KW] = source_hierarchy; } auto& source_replica = get_replica_with_hierarchy( @@ -702,23 +706,25 @@ namespace // If the resolved resource hierarchy does not contain the specified // resource name or replica number, that means the vote for that resource // returned as 0.0. Either the replica is inaccessible or it does not exist. - if (irods::experimental::keyword_has_a_value(_inp.condInput, RESC_NAME_KW)) { - const auto resolved_hierarchy = source_cond_input.at(RESC_HIER_STR_KW).value(); - const auto resource_name = source_cond_input.at(RESC_NAME_KW).value(); - if (!irods::hierarchy_parser{resolved_hierarchy.data()}.resc_in_hier(resource_name.data())) { - THROW(SYS_REPLICA_INACCESSIBLE, fmt::format( - "hierarchy descending from specified source resource name [{}] " - "does not have a replica or the replica is inaccessible at this time", - resource_name)); + const auto* source_resource_input = getValByKey(&source_inp.condInput, RESC_NAME_KW); + const auto* replica_number_input = getValByKey(&source_inp.condInput, REPL_NUM_KW); + if (source_resource_input && std::strlen(source_resource_input) > 0) { + if (!irods::hierarchy_parser{source_hierarchy}.resc_in_hier(source_resource_input)) { + THROW(SYS_REPLICA_INACCESSIBLE, + fmt::format("hierarchy descending from specified source resource name [{}] " + "does not have a replica or the replica is inaccessible at this time; ", + "resolved hierarchy:[{}]", + source_resource_input, + source_hierarchy)); } } - else if (irods::experimental::keyword_has_a_value(_inp.condInput, REPL_NUM_KW)) { - if (const auto replica_number = std::stoi(source_cond_input.at(REPL_NUM_KW).value().data()); + else if (replica_number_input && std::strlen(replica_number_input) > 0) { + if (const auto replica_number = std::stoi(replica_number_input); replica_number != source_replica.repl_num()) { - THROW(SYS_REPLICA_INACCESSIBLE, fmt::format( - "specified source replica number [{}] does not exist " - "or the replica is inaccessible at this time", - replica_number)); + THROW(SYS_REPLICA_INACCESSIBLE, + fmt::format("specified source replica number [{}] does not exist " + "or the replica is inaccessible at this time", + replica_number_input)); } } @@ -728,31 +734,100 @@ namespace auto destination_cond_input = irods::experimental::make_key_value_proxy(destination_inp.condInput); auto destination_obj = resolve_hierarchy_and_get_data_object_info(_comm, destination_inp, irods::CREATE_OPERATION); + const auto* destination_hierarchy = destination_cond_input.at(DEST_RESC_HIER_STR_KW).value().data(); + // Copy the resolved hierarchy for the destination back into the input struct to maintain legacy behavior. if (!cond_input.contains(DEST_RESC_HIER_STR_KW)) { - cond_input[DEST_RESC_HIER_STR_KW] = destination_cond_input.at(DEST_RESC_HIER_STR_KW).value(); + cond_input[DEST_RESC_HIER_STR_KW] = destination_hierarchy; } try { - if (irods::experimental::keyword_has_a_value(destination_inp.condInput, DEST_RESC_NAME_KW)) { - const auto resolved_hierarchy = destination_cond_input.at(DEST_RESC_HIER_STR_KW).value(); - const auto resource_name = destination_cond_input.at(DEST_RESC_NAME_KW).value(); - if (!irods::hierarchy_parser{resolved_hierarchy.data()}.resc_in_hier(resource_name.data())) { - THROW(SYS_REPLICA_INACCESSIBLE, fmt::format( - "hierarchy descending from specified destination resource name [{}] " - "does not have a replica or the replica is inaccessible at this time; " - "resolved hierarchy:[{}]", - resource_name, resolved_hierarchy.data())); - } + // Get the keyword from the original user input to ensure that the client explicitly requested this + // destination resource. If the resolved resource hierarchy does not have the requested destination resource + // in it, this is considered an error because the user's directive could not be carried out as requested. + const auto* destination_resource_input = getValByKey(&_inp.condInput, DEST_RESC_NAME_KW); + if (destination_resource_input && std::strlen(destination_resource_input) > 0 && + !irods::hierarchy_parser{destination_hierarchy}.resc_in_hier(destination_resource_input)) + { + THROW(SYS_REPLICA_INACCESSIBLE, + fmt::format("hierarchy descending from specified destination resource name [{}] " + "does not have a replica or the replica is inaccessible at this time; " + "resolved hierarchy:[{}]", + destination_resource_input, + destination_hierarchy)); } const auto& destination_replica = get_replica_with_hierarchy( _comm, destination_obj, destination_cond_input.at(DEST_RESC_HIER_STR_KW).value(), irods::replication::log_errors::no); - const auto log_errors = cond_input.contains(RECURSIVE_OPR__KW) ? irods::replication::log_errors::no : irods::replication::log_errors::yes; - if (!irods::replication::is_allowed(_comm, source_replica, destination_replica, log_errors)) { - return SYS_NOT_ALLOWED; + const auto log_errors = cond_input.contains(RECURSIVE_OPR__KW); + + // If the source replica is not good, it cannot be used to update an existing replica. + if (GOOD_REPLICA != source_replica.replica_status()) { + const std::string msg = + fmt::format("Selected source replica in hierarchy [{}] is not good and will overwrite an " + "existing replica. Replication is not allowed.", + source_replica.resc_hier()); + + log_api::debug("[{}:{}] - [{}]", __func__, __LINE__, msg); + + if (log_errors) { + addRErrorMsg(&_comm.rError, SYS_NOT_ALLOWED, msg.c_str()); + } + + THROW(SYS_NOT_ALLOWED, msg); + } + + // If the destination replica is not at rest, then it is not allowed to be updated. + if (GOOD_REPLICA != destination_replica.replica_status() && + STALE_REPLICA != destination_replica.replica_status()) { + const std::string msg = fmt::format( + "Selected destination replica in hierarchy [{}] is not at rest. Replication is not allowed.", + destination_replica.resc_hier()); + + log_api::debug("[{}:{}] - [{}]", __func__, __LINE__, msg); + + if (log_errors) { + addRErrorMsg(&_comm.rError, SYS_NOT_ALLOWED, msg.data()); + } + } + + // At this point, the replication being requested by the client has been fully vetted and is "allowed". + // However, there are two situations in which data movement would be inefficient or dangerous. + // + // If the destination replica is good, it does not need to be updated because it should be identical to the + // good replica that is going to update it. + // + // If the source and destination resource IDs match, that means that the source and destination replicas are + // the same good replica and it would be destructive to try to open it for read and also overwrite it. The + // reason this situation is allowed is because overwriting a good replica with itself can be considered as a + // no-op and in many cases is the result of hierarchy resolution and cannot be avoided in a properly working + // system. All that being said, this action is allowed even if the user explicitly requested it that way. + // + // These situations should NOT result in any data movement. However, the expected behavior in a resource + // hierarchy would be for policy to fire and give the appearance that the replica was updated. + if (source_replica.resc_id() == destination_replica.resc_id() || + GOOD_REPLICA == destination_replica.replica_status()) { + // PDMOs should not trigger fileModified because fileModified is the PDMO. Bail out here. + if (const auto in_pdmo = cond_input.find(IN_PDMO_KW); in_pdmo != cond_input.cend()) { + log_api::debug("Not triggering fileModified for replicating [{}] - in PDMO [{}]", + destination_obj->logical_path(), + (*in_pdmo).value()); + return 0; + } + + log_api::debug("Only triggering fileModified for replicating [{}]", destination_obj->logical_path()); + + // Adding this keyword is required for existing resource plugins to understand what is happening. + addKeyVal(static_cast(&destination_obj->cond_input()), + OPEN_TYPE_KW, + std::to_string(OPEN_FOR_WRITE_TYPE).c_str()); + if (const auto err = fileModified(&_comm, destination_obj); !err.ok()) { + THROW(err.code(), err.result()); + } + + return 0; } } catch (const irods::exception& e) { @@ -839,17 +914,68 @@ namespace destination_replica.resc_hier(), destination_replica.vote())); - if (irods::replication::is_allowed(_comm, source_replica, destination_replica, irods::replication::log_errors::no) && - destination_replica.vote() > irv::vote::zero) - { - destination_cond_input[RESC_HIER_STR_KW] = destination_replica.resc_hier(); - destination_cond_input[DEST_RESC_HIER_STR_KW] = destination_replica.resc_hier(); + // If the source and destination hierarchies match, just skip this one. + if (source_replica.resc_id() == destination_replica.resc_id()) { + log_api::debug( + "[{}:{}] - Destination hierarchy [{}] matches source hierarchy [{}]. Skipping replication.", + __func__, + __LINE__, + destination_replica.resc_hier(), + source_replica.resc_hier()); - if (const int ec = replicate_data(_comm, source_inp, destination_inp, _stat); - ec < 0 && status >= 0) - { - status = ec; - } + continue; + } + + log_api::info("src hier: [{}], src id: [{}], dest hier: [{}], dest id: [{}]", + source_replica.resc_hier(), + source_replica.resc_id(), + destination_replica.resc_hier(), + destination_replica.resc_id()); + + // If the destination replica did not vote positively, it should not be updated. + if (destination_replica.vote() <= irv::vote::zero) { + log_api::debug( + "[{}:{}] - Selected destination replica in hierarchy [{}] is not available for update. Replication " + "is not allowed.", + __func__, + __LINE__, + destination_replica.resc_hier()); + + continue; + } + + if (GOOD_REPLICA == destination_replica.replica_status()) { + // If the destination replica is good, it does not need to be updated. Skip it. + log_api::debug("[{}:{}] - Selected destination replica in hierarchy [{}] is already good. Skipping.", + __func__, + __LINE__, + destination_replica.resc_hier()); + + // fileModified is NOT triggered in this case because all of the replicas are being updated. If it were + // to be triggered, new replicas could be created or existing replicas updated which are not accounted + // for in the irods::file_object over whose replicas we are currently iterating here. This could result + // in a replica getting wastefully or even erroneously overwritten (including the source replica!). + + continue; + } + + if (STALE_REPLICA != destination_replica.replica_status()) { + // If the destination replica is not good or stale, then it is not at rest and it is not allowed to be + // updated. + log_api::debug("[{}:{}] - Selected destination replica in hierarchy [{}] is not at rest. Replication " + "is not allowed.", + __func__, + __LINE__, + destination_replica.resc_hier()); + + continue; + } + + destination_cond_input[RESC_HIER_STR_KW] = destination_replica.resc_hier(); + destination_cond_input[DEST_RESC_HIER_STR_KW] = destination_replica.resc_hier(); + + if (const int ec = replicate_data(_comm, source_inp, destination_inp, _stat); ec < 0 && status >= 0) { + status = ec; } } diff --git a/server/core/src/voting.cpp b/server/core/src/voting.cpp index 5ab9ace9da..63f21bedb4 100644 --- a/server/core/src/voting.cpp +++ b/server/core/src/voting.cpp @@ -213,7 +213,7 @@ namespace detail { float calculate_for_write(context& ctx) { - return calculate_with_repl_status(ctx, STALE_REPLICA); + return calculate_with_repl_status(ctx, GOOD_REPLICA); } // calculate_for_write } // namespace detail