Skip to content

Commit

Permalink
Long running for index service.
Browse files Browse the repository at this point in the history
Main changes

add concurrent UT for index tree
change btree_test.py (add device list and run time)
introduce run_time
  • Loading branch information
shosseinimotlagh committed Nov 11, 2023
1 parent e74e78f commit 7c0dd5a
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 20 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "4.8.2"
version = "4.8.3"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
7 changes: 5 additions & 2 deletions src/tests/btree_helpers/btree_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct BtreeTestHelper : public testing::Test {

if (m_is_multi_threaded) {
std::mutex mtx;
m_run_time = SISL_OPTIONS["run_time"].as< uint32_t >();
iomanager.run_on_wait(iomgr::reactor_regex::all_io, [this, &mtx]() {
auto fv = iomanager.sync_io_capable_fibers();
std::unique_lock lg(mtx);
Expand All @@ -73,6 +74,7 @@ struct BtreeTestHelper : public testing::Test {
BtreeConfig m_cfg{g_node_size};
uint32_t m_max_range_input{1000};
bool m_is_multi_threaded{false};
uint32_t m_run_time{0};

std::map< std::string, op_func_t > m_operations;
std::vector< iomgr::io_fiber_t > m_fibers;
Expand Down Expand Up @@ -378,8 +380,9 @@ struct BtreeTestHelper : public testing::Test {

// Construct a weighted distribution based on the input frequencies
std::discrete_distribution< uint32_t > s_rand_op_generator(weights.begin(), weights.end());

for (uint32_t i = 0; i < num_iters_per_thread; i++) {
auto m_start_time = Clock::now();
auto time_to_stop = [this, m_start_time]() {return (get_elapsed_time_sec(m_start_time) > m_run_time);};
for (uint32_t i = 0; i < num_iters_per_thread && !time_to_stop(); i++) {
uint32_t op_idx = s_rand_op_generator(re);
(this->m_operations[op_list[op_idx].first])();
}
Expand Down
11 changes: 7 additions & 4 deletions src/tests/test_common/homestore_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const std::string USER_WANT_DIRECT_IO{"USER_WANT_DIRECT_IO"}; // u
SISL_OPTION_GROUP(test_common_setup,
(num_threads, "", "num_threads", "number of threads",
::cxxopts::value< uint32_t >()->default_value("2"), "number"),
(num_fibers, "", "num_fibers", "number of fibers per thread",
::cxxopts::value< uint32_t >()->default_value("2"), "number"),
(num_devs, "", "num_devs", "number of devices to create",
::cxxopts::value< uint32_t >()->default_value("2"), "number"),
(dev_size_mb, "", "dev_size_mb", "size of each device in MB",
Expand Down Expand Up @@ -111,7 +113,8 @@ class HSTestHelper {
hs_before_services_starting_cb_t cb = nullptr, bool restart = false) {
auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >();
auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024;
auto nthreads = SISL_OPTIONS["num_threads"].as< uint32_t >();
auto num_threads = SISL_OPTIONS["num_threads"].as< uint32_t >();
auto num_fibers = SISL_OPTIONS["num_fibers"].as< uint32_t >();
auto is_spdk = SISL_OPTIONS["spdk"].as< bool >();

if (restart) {
Expand Down Expand Up @@ -145,11 +148,11 @@ class HSTestHelper {

if (is_spdk) {
LOGINFO("Spdk with more than 2 threads will cause overburden test systems, changing nthreads to 2");
nthreads = 2;
num_threads = 2;
}

LOGINFO("Starting iomgr with {} threads, spdk: {}", nthreads, is_spdk);
ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = nthreads, .is_spdk = is_spdk});
LOGINFO("Starting iomgr with {} threads, spdk: {}", num_threads, is_spdk);
ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = num_threads, .is_spdk = is_spdk, .num_fibers = num_fibers});

auto const http_port = SISL_OPTIONS["http_port"].as< int >();
if (http_port != 0) {
Expand Down
97 changes: 97 additions & 0 deletions src/tests/test_index_btree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ SISL_OPTION_GROUP(test_index_btree,
::cxxopts::value< uint32_t >()->default_value("500"), "number"),
(num_entries, "", "num_entries", "number of entries to test with",
::cxxopts::value< uint32_t >()->default_value("5000"), "number"),
(run_time, "", "run_time", "run time for io", ::cxxopts::value< uint32_t >()->default_value("360000"), "seconds"),
(disable_merge, "", "disable_merge", "disable_merge", ::cxxopts::value< bool >()->default_value("0"), ""),
(operation_list, "", "operation_list", "operation list instead of default created following by percentage",
::cxxopts::value< std::vector< std::string > >(), "operations [...]"),
(preload_size, "", "preload_size", "number of entries to preload tree with",
::cxxopts::value< uint32_t >()->default_value("1000"), "number"),
(seed, "", "seed", "random engine seed, use random if not defined",
::cxxopts::value< uint64_t >()->default_value("0"), "number"))

Expand Down Expand Up @@ -449,6 +455,97 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) {
LOGINFO("ThreadedCpFlush test end");
}

template < typename TestType >
struct BtreeConcurrentTest : public BtreeTestHelper< TestType > {

using T = TestType;
using K = typename TestType::KeyType;
using V = typename TestType::ValueType;
class TestIndexServiceCallbacks : public IndexServiceCallbacks {
public:
TestIndexServiceCallbacks(BtreeConcurrentTest* test) : m_test(test) {}
std::shared_ptr< IndexTableBase > on_index_table_found(const superblk< index_table_sb >& sb) override {
LOGINFO("Index table recovered");
LOGINFO("Root bnode_id {} version {}", sb->root_node, sb->link_version);
m_test->m_bt = std::make_shared< typename T::BtreeType >(sb, m_test->m_cfg);
return m_test->m_bt;
}

private:
BtreeConcurrentTest* m_test;
};

BtreeConcurrentTest() { this->m_is_multi_threaded = true; }

void SetUp() override {
test_common::HSTestHelper::start_homestore(
"test_index_btree",
{{HS_SERVICE::META, {.size_pct = 10.0}},
{HS_SERVICE::INDEX, {.size_pct = 70.0, .index_svc_cbs = new TestIndexServiceCallbacks(this)}}});

LOGINFO("Node size {} ", hs()->index_service().node_size());
this->m_cfg = BtreeConfig(hs()->index_service().node_size());

auto uuid = boost::uuids::random_generator()();
auto parent_uuid = boost::uuids::random_generator()();

// Test cp flush of write back.
HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
s.generic.cache_max_throttle_cnt = 10000;
HS_SETTINGS_FACTORY().save();
});
homestore::hs()->resource_mgr().reset_dirty_buf_qd();

// Create index table and attach to index service.
BtreeTestHelper< TestType >::SetUp();
this->m_bt = std::make_shared< typename T::BtreeType >(uuid, parent_uuid, 0, this->m_cfg);
hs()->index_service().add_index_table(this->m_bt);
LOGINFO("Added index table to index service");
}

void TearDown() override {
BtreeTestHelper< TestType >::TearDown();
test_common::HSTestHelper::shutdown_homestore();
}
};

TYPED_TEST_SUITE(BtreeConcurrentTest, BtreeTypes);
TYPED_TEST(BtreeConcurrentTest, ConcurrentAllOps) {
// range put is not supported for non-extent keys
std::vector< std::string > input_ops = {"put:20", "remove:20", "range_put:20", "range_remove:20", "query:20"};
std::vector< std::pair< std::string, int > > ops;
if (SISL_OPTIONS.count("operation_list")) {
input_ops = SISL_OPTIONS["operation_list"].as< std::vector< std::string > >();
}
int total = std::accumulate(input_ops.begin(), input_ops.end(), 0, [](int sum, const auto& str) {
std::vector< std::string > tokens;
boost::split(tokens, str, boost::is_any_of(":"));
if (tokens.size() == 2) {
try {
return sum + std::stoi(tokens[1]);
} catch (const std::exception&) {
// Invalid frequency, ignore this element
}
}
return sum; // Ignore malformed strings
});

std::transform(input_ops.begin(), input_ops.end(), std::back_inserter(ops), [total](const auto& str) {
std::vector< std::string > tokens;
boost::split(tokens, str, boost::is_any_of(":"));
if (tokens.size() == 2) {
try {
return std::make_pair(tokens[0], (int)(100.0 * std::stoi(tokens[1]) / total));
} catch (const std::exception&) {
// Invalid frequency, ignore this element
}
}
return std::make_pair(std::string(), 0);
});

this->multi_op_execute(ops);
}

int main(int argc, char* argv[]) {
int parsed_argc{argc};
::testing::InitGoogleTest(&parsed_argc, argv);
Expand Down
4 changes: 2 additions & 2 deletions src/tests/test_mem_btree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ SISL_OPTION_GROUP(
(num_entries, "", "num_entries", "number of entries to test with",
::cxxopts::value< uint32_t >()->default_value("10000"), "number"),
(disable_merge, "", "disable_merge", "disable_merge", ::cxxopts::value< bool >()->default_value("0"), ""),
(n_threads, "", "num_threads", "number of threads", ::cxxopts::value< uint32_t >()->default_value("2"), "number"),
(n_fibers, "", "num_fibers", "number of fibers", ::cxxopts::value< uint32_t >()->default_value("10"), "number"),
(num_threads, "", "num_threads", "number of threads", ::cxxopts::value< uint32_t >()->default_value("2"), "number"),
(num_fibers, "", "num_fibers", "number of fibers", ::cxxopts::value< uint32_t >()->default_value("10"), "number"),
(operation_list, "", "operation_list", "operation list instead of default created following by percentage",
::cxxopts::value< std::vector< std::string > >(), "operations [...]"),
(preload_size, "", "preload_size", "number of entries to preload tree with",
Expand Down
35 changes: 24 additions & 11 deletions src/tests/test_scripts/btree_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@

opts, args = getopt.getopt(sys.argv[1:], 'tdlme:',
['test_suits=', 'dirpath=', 'op_list=', 'log_mods=', 'threads=', 'fibers=', 'preload_size=',
'op_list=', 'num_entries=', 'num_iters='])
'op_list=', 'num_entries=', 'num_iters=', 'dev_list=', 'run_time='])
test_suits = ""
dirpath = "./"
op_list = ""
log_mods = ""
threads = " --n_threads=10"
fibers = " --n_fibers=10"
preload_size = " --preload_size=2000"
num_entries = " --num_entries=10000"
num_iters = " --num_iters=1000000"
threads = " --num_threads=10"
fibers = " --num_fibers=10"
preload_size = " --preload_size=16384"
num_entries = " --num_entries=65536"
num_iters = " --num_iters=10000000"
run_time = " --run_time=36000"
dev_list = ""

for opt, arg in opts:
if opt in ('-t', '--test_suits'):
Expand All @@ -38,33 +40,44 @@
log_mods = arg
print("log_mods (%s)" % arg)
if opt in ('-f', '--fibers'):
fibers = " --n_fibers=" + arg
fibers = " --num_fibers=" + arg
print("number of fibers per thread (%s)" % arg)
if opt in ('-p', '--preload_size'):
preload_size = " --preload_size=" + arg
print("preload_size = (%s)" % arg)
if opt in ('-t', '--threads'):
threads = " --n_threads=" + arg
threads = " --num_threads=" + arg
print("number of threads (%s)" % arg)
if opt in ('-n', '--num_entries'):
num_entries = " --num_entries=" + arg
print("number of entries (%s)" % arg)
if opt in ('-i', '--num_iters'):
num_iters = " --num_iters=" + arg
print("number of iterations (%s)" % arg)
if opt in ('-r', '--run_time'):
run_time = " --run_time=" + arg
print("total run time (%s)" % arg)
if opt in ('-v', '--dev_list'):
dev_list = arg
print(("device list (%s)") % (arg))

operations = ""
if bool(op_list and op_list.strip()):
operations = ''.join([f' --operation_list={op}' for op in op_list.split()])

btree_options = num_entries + num_iters + preload_size + fibers + threads + operations
addln_opts = ' '
if bool(dev_list and dev_list.strip()):
addln_opts += ' --device_list '
addln_opts += dev_list

btree_options = num_entries + num_iters + preload_size + fibers + threads + operations + run_time + addln_opts


def normal():
print("normal test started with (%s)" % btree_options)
# " --operation_list=query:20 --operation_list=put:20 --operation_list=remove:20"
cmd_opts = " --gtest_filter=BtreeConcurrentTest/*.AllTree" + btree_options + " "+log_mods
subprocess.check_call(dirpath + "test_mem_btree " + cmd_opts, stderr=subprocess.STDOUT, shell=True)
cmd_opts = " --gtest_filter=BtreeConcurrentTest/*.ConcurrentAllOps" + btree_options + " "+log_mods
subprocess.check_call(dirpath + "test_index_btree " + cmd_opts, stderr=subprocess.STDOUT, shell=True)
print("normal test completed")


Expand Down

0 comments on commit 7c0dd5a

Please sign in to comment.