From 0bba004b1dbe917e1ba6975519a09ba682d28162 Mon Sep 17 00:00:00 2001
From: Zexi Liu
Date: Tue, 8 Oct 2024 12:47:50 -0700
Subject: [PATCH] Support flexible number of L0 partitions (blocking approach)

---
 src/table_compaction.cc       |  9 ++---
 src/table_mgr.cc              | 64 ++++++++++++++++++++++++++++++----
 src/table_mgr.h               |  6 ++--
 src/table_set_batch.cc        |  6 ++--
 src/table_writer.cc           |  4 ++-
 src/table_writer.h            |  1 +
 tests/jungle/basic_op_test.cc | 41 +++++++++++++++++-----
 7 files changed, 107 insertions(+), 24 deletions(-)

diff --git a/src/table_compaction.cc b/src/table_compaction.cc
index 083b9db..615e3b7 100644
--- a/src/table_compaction.cc
+++ b/src/table_compaction.cc
@@ -28,8 +28,8 @@ namespace jungle {
 //
 Status TableMgr::compactLevelItr(const CompactOptions& options,
                                  TableInfo* victim_table,
-                                 size_t level)
-{
+                                 size_t level,
+                                 bool adjusting_num_l0) {
     if (level >= mani->getNumLevels()) return Status::INVALID_LEVEL;

     Status s;
@@ -376,7 +376,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
     // 1) number of files after split, and
     // 2) min keys for each new file.
     do {
-        if (!isCompactionAllowed()) {
+        if (!isCompactionAllowed() && !adjusting_num_l0) {
             throw Status(Status::COMPACTION_CANCELLED);
         }

@@ -445,6 +445,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
                 ? &twh.leasedWriters[worker_idx]->writerArgs
                 : &local_args;
             w_args->callerAwaiter.reset();
+            w_args->adjustingNumL0 = adjusting_num_l0;

             uint64_t count = (jj + 1 == num_new_tables)
                              ? offsets.size() - new_tables[jj]->index
@@ -482,7 +483,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
         }
     }

-    if (!isCompactionAllowed()) {
+    if (!isCompactionAllowed() && !adjusting_num_l0) {
         // NOTE: keys will be freed below.
         for (LsmFlushResult& rr: results) delete rr.tFile;
         throw Status(Status::COMPACTION_CANCELLED);
diff --git a/src/table_mgr.cc b/src/table_mgr.cc
index 0bd5a0f..9766116 100644
--- a/src/table_mgr.cc
+++ b/src/table_mgr.cc
@@ -182,12 +182,6 @@ Status TableMgr::init(const TableMgrOptions& _options) {
         opt.compressionEnabled = true;
     }

-    compactStatus.resize(db_config->numL0Partitions);
-    for (size_t ii=0; ii<compactStatus.size(); ++ii) {
-        std::atomic<bool>*& entry = compactStatus[ii];
-        entry = new std::atomic<bool>(false);
-    }
-
     Status s;
     mani = new TableManifest(this, opt.fOps);
     mani->setLogger(myLog);
@@ -252,6 +246,57 @@ Status TableMgr::init(const TableMgrOptions& _options) {
             }
         }

+        // Adjust the number of L0 partitions (in a blocking manner) only when this is
+        // neither logSectionOnly mode nor read-only mode, and the number of L0 partitions
+        // read from the existing manifest file differs from the number in the DB config.
+        if (!db_config->logSectionOnly && !db_config->readOnly
+            && numL0Partitions != db_config->numL0Partitions) {
+            _log_info(myLog,
+                      "adjust numL0 partitions: %zu -> %zu",
+                      numL0Partitions,
+                      db_config->numL0Partitions);
+
+            if (!db_config->nextLevelExtension) {
+                _log_err(myLog, "[Adjust numL0] not allowed in L0 only mode");
+                throw Status(Status::INVALID_CONFIG);
+            }
+
+            // Need to compact all existing L0 tables into L1 and re-create empty
+            // L0 tables, otherwise the key-to-partition hashing will be broken.
+            for (size_t ii = 0; ii < numL0Partitions; ++ii) {
+                // Force-compact this L0 table into L1 in a blocking manner,
+                // to reduce the number of L0 partitions.
+                std::list<TableInfo*> tables;
+                s = mani->getL0Tables(ii, tables);
+                if (tables.size() != 1 || !s) {
+                    _log_err(myLog, "[Adjust numL0] tables of hash %zu not found", ii);
+                    throw s;
+                }
+                s = compactLevelItr(CompactOptions(), tables.back(), 0, true);
+                if (!s) {
+                    _log_err(myLog, "[Adjust numL0] compaction error: %d", s);
+                    throw s;
+                }
+                // The compacted table has already been removed from the manifest
+                // by compactLevelItr(), so just release it here.
+                for (TableInfo*& table: tables) {
+                    table->done();
+                }
+            }
+            for (size_t ii = 0; ii < db_config->numL0Partitions; ++ii) {
+                TableFile* t_file = nullptr;
+                TableFileOptions t_opt;
+                // Set 1M bits as the initial size.
+                // It will converge to a proper value as compaction happens.
+                t_opt.bloomFilterSize = 1024 * 1024;
+                EP(createNewTableFile(0, t_file, t_opt));
+                EP(mani->addTableFile(0, ii, SizedBuf(), t_file));
+            }
+            // Store manifest file.
+            mani->store(true);
+            numL0Partitions = db_config->numL0Partitions;
+        }
+
     } else {
         // Not exist, initial setup phase.

@@ -287,6 +332,13 @@ Status TableMgr::init(const TableMgrOptions& _options) {
         // Store manifest file.
         mani->store(true);
     }
+
+    compactStatus.resize(numL0Partitions);
+    for (size_t ii = 0; ii < compactStatus.size(); ++ii) {
+        std::atomic<bool>*& entry = compactStatus[ii];
+        entry = new std::atomic<bool>(false);
+    }
+
     logTableSettings(db_config);

     removeStaleFiles();
diff --git a/src/table_mgr.h b/src/table_mgr.h
index c395f13..077ce02 100644
--- a/src/table_mgr.h
+++ b/src/table_mgr.h
@@ -236,7 +236,8 @@ class TableMgr {

     Status compactLevelItr(const CompactOptions& options,
                            TableInfo* victim_table,
-                           size_t level);
+                           size_t level,
+                           bool adjust_num_l0 = false);

     Status migrateLevel(const CompactOptions& options,
                         size_t level);
@@ -300,7 +301,8 @@ class TableMgr {
                             TableFile* dst_file,
                             std::vector<uint64_t>& offsets,
                             uint64_t start_index,
-                            uint64_t count);
+                            uint64_t count,
+                            bool adjusting_num_l0 = false);

     void setTableFileItrFlush(TableFile* dst_file,
                               std::list<Record*>& recs_batch,
diff --git a/src/table_set_batch.cc b/src/table_set_batch.cc
index 254db0b..2e723e3 100644
--- a/src/table_set_batch.cc
+++ b/src/table_set_batch.cc
@@ -55,8 +55,8 @@ void TableMgr::setTableFileOffset( std::list<uint64_t>& checkpoints,
                                    TableFile* dst_file,
                                    std::vector<uint64_t>& offsets,
                                    uint64_t start_index,
-                                   uint64_t count )
-{
+                                   uint64_t count,
+                                   bool adjusting_num_l0 ) {
     const DBConfig* db_config = getDbConfig();
     (void)db_config;

@@ -86,7 +86,7 @@ void TableMgr::setTableFileOffset( std::list<uint64_t>& checkpoints,

     try {
         for (uint64_t ii = start_index; ii < start_index + count; ++ii) {
-            if (!isCompactionAllowed()) {
+            if (!isCompactionAllowed() && !adjusting_num_l0 ) {
                 // To avoid file corruption, we should flush all cached pages
                 // even for cancel.
                 Timer cancel_timer;
diff --git a/src/table_writer.cc b/src/table_writer.cc
index e3d3187..badb0fb 100644
--- a/src/table_writer.cc
+++ b/src/table_writer.cc
@@ -28,6 +28,7 @@ TableWriterArgs::TableWriterArgs()
     : writerId(0)
     , stopSignal(false)
     , myLog(nullptr)
+    , adjustingNumL0(false)
     {}

 void TableWriterArgs::invoke() {
@@ -170,7 +171,8 @@ void TableWriterMgr::doTableWrite(TableWriterArgs* args) {
                             args->payload.targetTableFile,
                             *args->payload.offsets,
                             args->payload.startIdx,
-                            args->payload.count );
+                            args->payload.count,
+                            args->adjustingNumL0.load() );
     }
 }
diff --git a/src/table_writer.h b/src/table_writer.h
index 34a0699..35a01f0 100644
--- a/src/table_writer.h
+++ b/src/table_writer.h
@@ -162,6 +162,7 @@ struct TableWriterArgs {
     std::atomic<bool> stopSignal;
     TableWritePayload payload;
     SimpleLogger* myLog;
+    std::atomic<bool> adjustingNumL0;
 };

 struct TableWriterPkg {
diff --git a/tests/jungle/basic_op_test.cc b/tests/jungle/basic_op_test.cc
index c55e970..3f6c1cc 100644
--- a/tests/jungle/basic_op_test.cc
+++ b/tests/jungle/basic_op_test.cc
@@ -1778,42 +1778,67 @@ int different_l0_partitions() {
     config.numL0Partitions = 1;
     CHK_Z(jungle::DB::open(&db, filename, config));
     int n = 10;
+
+    // Insert, flush, and check.
     std::vector<jungle::KV> kv(n);
-    CHK_Z(_init_kv_pairs(n, kv, "key", "value"));
+    CHK_Z(_init_kv_pairs(n, kv, "key1", "value1"));
     CHK_Z(_set_bykey_kv_pairs(0, n, db, kv));
     CHK_Z(db->sync());
     CHK_Z(db->flushLogs(jungle::FlushOptions()));
+    CHK_Z(_get_bykey_check(0, n, db, kv));
     CHK_Z(jungle::DB::close(db));

-    // Change the number of partitions,
-    // but it should be ignored internally.
+    // Increase the number of partitions; it should be handled correctly internally.
     config.numL0Partitions = 4;
-    // Reopen & check.
     CHK_Z(jungle::DB::open(&db, filename, config));
     CHK_Z(_get_bykey_check(0, n, db, kv));

-    // Insert more.
+    // Insert more, flush, and check.
     std::vector<jungle::KV> kv2(n);
-    CHK_Z(_init_kv_pairs(n, kv2, "key_new", "value_new"));
+    CHK_Z(_init_kv_pairs(n, kv2, "key2", "value2"));
     CHK_Z(_set_bykey_kv_pairs(0, n, db, kv2));
+    std::vector<jungle::KV> kv3(n);
+    CHK_Z(_init_kv_pairs(n, kv3, "key3", "value3"));
+    CHK_Z(_set_bykey_kv_pairs(0, n, db, kv3));
+    std::vector<jungle::KV> kv4(n);
+    CHK_Z(_init_kv_pairs(n, kv4, "key4", "value4"));
+    CHK_Z(_set_bykey_kv_pairs(0, n, db, kv4));
     CHK_Z(db->sync());
     CHK_Z(db->flushLogs(jungle::FlushOptions()));
-
-    // Check both.
     CHK_Z(_get_bykey_check(0, n, db, kv));
     CHK_Z(_get_bykey_check(0, n, db, kv2));
+    CHK_Z(_get_bykey_check(0, n, db, kv3));
+    CHK_Z(_get_bykey_check(0, n, db, kv4));
     CHK_Z(jungle::DB::close(db));

+    // Decrease the number of partitions; it should be handled correctly internally.
+    config.numL0Partitions = 2;
     // Reopen & check.
     CHK_Z(jungle::DB::open(&db, filename, config));
     CHK_Z(_get_bykey_check(0, n, db, kv));
     CHK_Z(_get_bykey_check(0, n, db, kv2));
+    CHK_Z(_get_bykey_check(0, n, db, kv3));
+    CHK_Z(_get_bykey_check(0, n, db, kv4));
+
+    // Insert more, flush, and check.
+    std::vector<jungle::KV> kv5(n);
+    CHK_Z(_init_kv_pairs(n, kv5, "key5", "value5"));
+    CHK_Z(_set_bykey_kv_pairs(0, n, db, kv5));
+    CHK_Z(db->sync());
+    CHK_Z(db->flushLogs(jungle::FlushOptions()));
+    CHK_Z(_get_bykey_check(0, n, db, kv5));
+
+    // Wait 7 seconds.
+    // TestSuite::sleep_sec(7);
     CHK_Z(jungle::DB::close(db));
     CHK_Z(jungle::shutdown());
     _free_kv_pairs(n, kv);
     _free_kv_pairs(n, kv2);
+    _free_kv_pairs(n, kv3);
+    _free_kv_pairs(n, kv4);
+    _free_kv_pairs(n, kv5);

     TEST_SUITE_CLEANUP_PATH();
     return 0;
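
Usage sketch (not part of the patch): the snippet below illustrates the behavior this change enables. Reopening an existing DB with a different numL0Partitions is no longer ignored; instead, TableMgr::init() compacts every existing L0 table into L1, creates the requested number of empty L0 tables, and stores the updated manifest before the open call returns. The sketch assumes the public jungle::DB API as exercised by different_l0_partitions() above; the path, keys, and values are placeholders, and Status checks are omitted for brevity.

    #include <libjungle/jungle.h>
    #include <string>

    int main() {
        const std::string path = "./l0_adjust_example";  // placeholder path

        jungle::DB* db = nullptr;
        jungle::DBConfig config;
        config.numL0Partitions = 4;                 // initial L0 partition count
        jungle::DB::open(&db, path, config);

        db->set(jungle::KV("k1", "v1"));            // write something
        db->sync();
        db->flushLogs(jungle::FlushOptions());      // push log data into the L0 tables
        jungle::DB::close(db);

        // Reopen with a smaller partition count. Before this patch the new value
        // was ignored; now the existing L0 tables are compacted into L1 and the
        // requested number of fresh L0 tables are created, blockingly, during open().
        config.numL0Partitions = 2;
        jungle::DB::open(&db, path, config);

        jungle::SizedBuf value_out;
        db->get(jungle::SizedBuf("k1"), value_out); // data written earlier is still readable
        value_out.free();

        jungle::DB::close(db);
        jungle::shutdown();
        return 0;
    }

The same round trip, increasing numL0Partitions from 1 to 4 and then decreasing it to 2 with data verification after each reopen, is what different_l0_partitions() in basic_op_test.cc exercises.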