diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 8e68113f72..30d5df6b0e 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -25,6 +25,7 @@ */ // IWYU pragma: no_include +#include #include // IWYU pragma: no_include #include @@ -33,6 +34,7 @@ #include #include #include +#include #include #include // IWYU pragma: keep #include @@ -2986,17 +2988,17 @@ void server_state::do_update_app_info(const std::string &app_path, void server_state::set_app_envs(const app_env_rpc &env_rpc) { - const configuration_update_app_env_request &request = env_rpc.request(); + const auto &request = env_rpc.request(); if (!request.__isset.keys || !request.__isset.values || - request.keys.size() != request.values.size() || request.keys.size() <= 0) { + request.keys.size() != request.values.size() || request.keys.empty()) { env_rpc.response().err = ERR_INVALID_PARAMETERS; LOG_WARNING("set app envs failed with invalid request"); return; } - const std::vector &keys = request.keys; - const std::vector &values = request.values; - const std::string &app_name = request.app_name; + const auto &keys = request.keys; + const auto &values = request.values; + const auto &app_name = request.app_name; std::ostringstream os; for (size_t i = 0; i < keys.size(); ++i) { @@ -3078,7 +3080,7 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc) } }); - std::shared_ptr app = get_app(app_name); + auto app = get_app(app_name); // The table might be removed just before the callback function is invoked, thus we must // check if this table still exists. @@ -3095,6 +3097,8 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc) return; } + env_rpc.response().err = ERR_OK; + const auto &old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); // Update envs of local memory. @@ -3109,70 +3113,119 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc) void server_state::del_app_envs(const app_env_rpc &env_rpc) { - const configuration_update_app_env_request &request = env_rpc.request(); - if (!request.__isset.keys || request.keys.size() <= 0) { + const auto &request = env_rpc.request(); + if (!request.__isset.keys || request.keys.empty()) { env_rpc.response().err = ERR_INVALID_PARAMETERS; LOG_WARNING("del app envs failed with invalid request"); return; } - const std::vector &keys = request.keys; - const std::string &app_name = request.app_name; - std::ostringstream os; - for (int i = 0; i < keys.size(); i++) { - if (i != 0) - os << ","; - os << keys[i]; - } + const auto &keys = request.keys; + const auto &app_name = request.app_name; + LOG_INFO("del app envs for app({}) from remote({}): keys = {}", app_name, env_rpc.remote_address(), - os.str()); + boost::join(keys, ",")); app_info ainfo; std::string app_path; { + FAIL_POINT_INJECT_NOT_RETURN_F("del_app_envs_failed", [app_name, this](std::string_view s) { + zauto_write_lock l(_lock); + + if (s == "not_found") { + CHECK_EQ(_exist_apps.erase(app_name), 1); + return; + } + + if (s == "dropping") { + gutil::FindOrDie(_exist_apps, app_name)->status = app_status::AS_DROPPING; + return; + } + }); + zauto_read_lock l(_lock); - std::shared_ptr app = get_app(app_name); - if (app == nullptr) { - LOG_WARNING("del app envs failed with invalid app_name({})", app_name); - env_rpc.response().err = ERR_INVALID_PARAMETERS; - env_rpc.response().hint_message = "invalid app name"; + + const auto &app = get_app(app_name); + if (!app) { + LOG_WARNING("del app envs failed since app_name({}) cannot be found", app_name); + env_rpc.response().err = ERR_APP_NOT_EXIST; + env_rpc.response().hint_message = "app cannot be found"; return; - } else { - ainfo = *(reinterpret_cast(app.get())); - app_path = get_app_path(*app); } + + if (app->status == app_status::AS_DROPPING) { + LOG_WARNING("del app envs failed since app(name={}, id={}) is being dropped", + app_name, + app->app_id); + env_rpc.response().err = ERR_BUSY_DROPPING; + env_rpc.response().hint_message = "app is being dropped"; + return; + } + + ainfo = *app; + app_path = get_app_path(*app); } - std::ostringstream oss; - oss << "deleted keys:"; - int deleted = 0; + std::string deleted_keys_info("deleted keys:"); + size_t deleted_count = 0; for (const auto &key : keys) { - if (ainfo.envs.erase(key) > 0) { - oss << std::endl << " " << key; - deleted++; + if (ainfo.envs.erase(key) == 0) { + continue; } + + fmt::format_to(std::back_inserter(deleted_keys_info), "\n {}", key); + ++deleted_count; } - if (deleted == 0) { - LOG_INFO("no key need to delete"); - env_rpc.response().hint_message = "no key need to delete"; + if (deleted_count == 0) { + LOG_INFO("no key needs to be deleted for app({})", app_name); + env_rpc.response().err = ERR_OK; + env_rpc.response().hint_message = "no key needs to be deleted"; return; - } else { - env_rpc.response().hint_message = oss.str(); } + env_rpc.response().hint_message = std::move(deleted_keys_info); + do_update_app_info(app_path, ainfo, [this, app_name, keys, env_rpc](error_code ec) { - CHECK_EQ_MSG(ec, ERR_OK, "update app info to remote storage failed"); + CHECK_EQ_MSG(ec, ERR_OK, "update app({}) info to remote storage failed", app_name); zauto_write_lock l(_lock); - std::shared_ptr app = get_app(app_name); - std::string old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); + + FAIL_POINT_INJECT_NOT_RETURN_F("del_app_envs_failed", [app_name, this](std::string_view s) { + if (s == "dropped_after") { + CHECK_EQ(_exist_apps.erase(app_name), 1); + return; + } + }); + + auto app = get_app(app_name); + + // The table might be removed just before the callback function is invoked, thus we must + // check if this table still exists. + // + // TODO(wangdan): should make updates to remote storage sequential by supporting atomic + // set, otherwise update might be missing. For example, an update is setting the envs + // while another is dropping a table. The update setting the envs does not contain the + // dropped state. Once it is applied by remote storage after another update dropping + // the table, the state of the table would always be non-dropped on remote storage. + if (!app) { + LOG_ERROR("del app envs failed since app({}) has just been dropped", app_name); + env_rpc.response().err = ERR_APP_DROPPED; + env_rpc.response().hint_message = "app has just been dropped"; + return; + } + + env_rpc.response().err = ERR_OK; + + const auto &old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); + for (const auto &key : keys) { app->envs.erase(key); } - std::string new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); + + const auto &new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); LOG_INFO("app envs changed: old_envs = {}, new_envs = {}", old_envs, new_envs); }); } diff --git a/src/meta/test/server_state_test.cpp b/src/meta/test/server_state_test.cpp index d84d586fe4..90af811859 100644 --- a/src/meta/test/server_state_test.cpp +++ b/src/meta/test/server_state_test.cpp @@ -132,6 +132,21 @@ class server_state_test return rpc; } + void test_set_app_envs(const std::string &app_name, + const std::vector &env_keys, + const std::vector &env_vals, + const error_code expected_err) + { + configuration_update_app_env_request request; + request.__set_app_name(app_name); + request.__set_op(app_env_operation::type::APP_ENV_OP_SET); + request.__set_keys(env_keys); + request.__set_values(env_vals); + + auto rpc = set_app_envs(request); + ASSERT_EQ(expected_err, rpc.response().err); + } + app_env_rpc del_app_envs(const configuration_update_app_env_request &request) { auto rpc = create_app_env_rpc(request); @@ -141,6 +156,19 @@ class server_state_test return rpc; } + void test_del_app_envs(const std::string &app_name, + const std::vector &env_keys, + const error_code expected_err) + { + configuration_update_app_env_request request; + request.__set_app_name(app_name); + request.__set_op(app_env_operation::type::APP_ENV_OP_DEL); + request.__set_keys(env_keys); + + auto rpc = del_app_envs(request); + ASSERT_EQ(expected_err, rpc.response().err); + } + app_env_rpc clear_app_envs(const configuration_update_app_env_request &request) { auto rpc = create_app_env_rpc(request); @@ -220,22 +248,21 @@ void meta_service_test_app::app_envs_basic_test() test.load_apps({"test_app1", "test_set_app_envs_not_found", "test_set_app_envs_dropping", - "test_set_app_envs_dropped_after"}); + "test_set_app_envs_dropped_after", + "test_del_app_envs_not_found", + "test_del_app_envs_dropping", + "test_del_app_envs_dropped_after"}); #define TEST_SET_APP_ENVS_FAILED(action, err_code) \ std::cout << "test server_state::set_app_envs(" #action ")..." << std::endl; \ do { \ - configuration_update_app_env_request request; \ - request.__set_app_name("test_set_app_envs_" #action); \ - request.__set_op(app_env_operation::type::APP_ENV_OP_SET); \ - request.__set_keys({replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}); \ - request.__set_values({"67108864"}); \ - \ fail::setup(); \ fail::cfg("set_app_envs_failed", "void(" #action ")"); \ \ - auto rpc = test.set_app_envs(request); \ - ASSERT_EQ(err_code, rpc.response().err); \ + test.test_set_app_envs("test_set_app_envs_" #action, \ + {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}, \ + {"67108864"}, \ + err_code); \ \ fail::teardown(); \ } while (0) @@ -255,14 +282,7 @@ void meta_service_test_app::app_envs_basic_test() // Normal case for setting envs. std::cout << "test server_state::set_app_envs(success)..." << std::endl; { - configuration_update_app_env_request request; - request.__set_app_name("test_app1"); - request.__set_op(app_env_operation::type::APP_ENV_OP_SET); - request.__set_keys(keys); - request.__set_values(values); - - auto rpc = test.set_app_envs(request); - ASSERT_EQ(ERR_OK, rpc.response().err); + test.test_set_app_envs("test_app1", keys, values, ERR_OK); const auto &app = test.get_app("test_app1"); ASSERT_TRUE(app); @@ -276,15 +296,39 @@ void meta_service_test_app::app_envs_basic_test() } } - std::cout << "test server_state::del_app_envs()..." << std::endl; - { - configuration_update_app_env_request request; - request.__set_app_name("test_app1"); - request.__set_op(app_env_operation::type::APP_ENV_OP_DEL); - request.__set_keys(del_keys); +#define TEST_DEL_APP_ENVS_FAILED(action, err_code) \ + std::cout << "test server_state::del_app_envs(" #action ")..." << std::endl; \ + do { \ + test.test_set_app_envs("test_del_app_envs_" #action, \ + {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}, \ + {"67108864"}, \ + ERR_OK); \ + \ + fail::setup(); \ + fail::cfg("del_app_envs_failed", "void(" #action ")"); \ + \ + test.test_del_app_envs( \ + "test_del_app_envs_" #action, {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}, err_code); \ + \ + fail::teardown(); \ + } while (0) + + // Failed to deleting envs while table was not found. + TEST_DEL_APP_ENVS_FAILED(not_found, ERR_APP_NOT_EXIST); + + // Failed to deleting envs while table was being dropped as the intermediate state. + TEST_DEL_APP_ENVS_FAILED(dropping, ERR_BUSY_DROPPING); + + // The table was found dropped after the new envs had been persistent on the remote + // meta storage. + TEST_DEL_APP_ENVS_FAILED(dropped_after, ERR_APP_DROPPED); - auto rpc = test.del_app_envs(request); - ASSERT_EQ(ERR_OK, rpc.response().err); +#undef TEST_DEL_APP_ENVS_FAILED + + // Normal case for deleting envs. + std::cout << "test server_state::del_app_envs(success)..." << std::endl; + { + test.test_del_app_envs("test_app1", del_keys, ERR_OK); const auto &app = test.get_app("test_app1"); ASSERT_TRUE(app);