Skip to content

Commit

Permalink
Support flexible num of L0 (blocking approach)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zexi Liu authored and ZexiLiu committed Oct 9, 2024
1 parent 5473c1b commit 0bba004
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 24 deletions.
9 changes: 5 additions & 4 deletions src/table_compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace jungle {
//
Status TableMgr::compactLevelItr(const CompactOptions& options,
TableInfo* victim_table,
size_t level)
{
size_t level,
bool adjusting_num_l0) {
if (level >= mani->getNumLevels()) return Status::INVALID_LEVEL;

Status s;
Expand Down Expand Up @@ -376,7 +376,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
// 1) number of files after split, and
// 2) min keys for each new file.
do {
if (!isCompactionAllowed()) {
if (!isCompactionAllowed() && !adjusting_num_l0) {
throw Status(Status::COMPACTION_CANCELLED);
}

Expand Down Expand Up @@ -445,6 +445,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
? &twh.leasedWriters[worker_idx]->writerArgs
: &local_args;
w_args->callerAwaiter.reset();
w_args->adjustingNumL0 = adjusting_num_l0;

uint64_t count = (jj + 1 == num_new_tables)
? offsets.size() - new_tables[jj]->index
Expand Down Expand Up @@ -482,7 +483,7 @@ Status TableMgr::compactLevelItr(const CompactOptions& options,
}
}

if (!isCompactionAllowed()) {
if (!isCompactionAllowed() && !adjusting_num_l0) {
// NOTE: keys will be freed below.
for (LsmFlushResult& rr: results) delete rr.tFile;
throw Status(Status::COMPACTION_CANCELLED);
Expand Down
64 changes: 58 additions & 6 deletions src/table_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,6 @@ Status TableMgr::init(const TableMgrOptions& _options) {
opt.compressionEnabled = true;
}

compactStatus.resize(db_config->numL0Partitions);
for (size_t ii=0; ii<compactStatus.size(); ++ii) {
std::atomic<bool>*& entry = compactStatus[ii];
entry = new std::atomic<bool>(false);
}

Status s;
mani = new TableManifest(this, opt.fOps);
mani->setLogger(myLog);
Expand Down Expand Up @@ -252,6 +246,57 @@ Status TableMgr::init(const TableMgrOptions& _options) {
}
}

// Adjust the number of L0 partitions in a blocking manner, only when it is not
// logSectionOnly mode, not read-only mode, and the number of L0 partitions read
// from the existing manifest file differs from the number specified in the db config.
if (!db_config->logSectionOnly && !db_config->readOnly
&& numL0Partitions != db_config->numL0Partitions) {
_log_info(myLog,
"adjust numL0 partitions: %zu -> %zu",
numL0Partitions,
db_config->numL0Partitions);

if (!db_config->nextLevelExtension) {
_log_err(myLog, "[Adjust numL0] not allowed in L0 only mode");
throw Status(Status::INVALID_CONFIG);
}

// Need to compact all existing L0 tables to L1 and recreate empty L0 tables,
// otherwise hash will be messed up.
for (size_t ii = 0; ii < numL0Partitions; ++ii) {
// Force compact L0 table to L1 in blocking manner to reduce L0
// partitions.
std::list<TableInfo*> tables;
s = mani->getL0Tables(ii, tables);
if (tables.size() != 1 || !s) {
_log_err(myLog, "[Adjust numL0] tables of hash %zu not found", ii);
throw s;
}
s = compactLevelItr(CompactOptions(), tables.back(), 0, true);
if (!s) {
_log_err(myLog, "[Adjust numL0] compaction error: %d", s);
throw s;
}
// The compacted table is removed from the manifest in compactLevelItr,
// so just release it here.
for (TableInfo*& table: tables) {
table->done();
}
}
for (size_t ii = 0; ii < db_config->numL0Partitions; ++ii) {
TableFile* t_file = nullptr;
TableFileOptions t_opt;
// Set 1M bits as an initial size.
// It will be converging to some value as compaction happens.
t_opt.bloomFilterSize = 1024 * 1024;
EP(createNewTableFile(0, t_file, t_opt));
EP(mani->addTableFile(0, ii, SizedBuf(), t_file));
}
// Store manifest file.
mani->store(true);
numL0Partitions = db_config->numL0Partitions;
}

} else {
// Not exist, initial setup phase.

Expand Down Expand Up @@ -287,6 +332,13 @@ Status TableMgr::init(const TableMgrOptions& _options) {
// Store manifest file.
mani->store(true);
}

compactStatus.resize(numL0Partitions);
for (size_t ii = 0; ii < compactStatus.size(); ++ii) {
std::atomic<bool>*& entry = compactStatus[ii];
entry = new std::atomic<bool>(false);
}

logTableSettings(db_config);

removeStaleFiles();
Expand Down
6 changes: 4 additions & 2 deletions src/table_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ class TableMgr {

Status compactLevelItr(const CompactOptions& options,
TableInfo* victim_table,
size_t level);
size_t level,
bool adjust_num_l0 = false);

Status migrateLevel(const CompactOptions& options,
size_t level);
Expand Down Expand Up @@ -300,7 +301,8 @@ class TableMgr {
TableFile* dst_file,
std::vector<uint64_t>& offsets,
uint64_t start_index,
uint64_t count);
uint64_t count,
bool adjusting_num_l0 = false);

void setTableFileItrFlush(TableFile* dst_file,
std::list<Record*>& recs_batch,
Expand Down
6 changes: 3 additions & 3 deletions src/table_set_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ void TableMgr::setTableFileOffset( std::list<uint64_t>& checkpoints,
TableFile* dst_file,
std::vector<uint64_t>& offsets,
uint64_t start_index,
uint64_t count )
{
uint64_t count,
bool adjusting_num_l0 ) {
const DBConfig* db_config = getDbConfig();
(void)db_config;

Expand Down Expand Up @@ -86,7 +86,7 @@ void TableMgr::setTableFileOffset( std::list<uint64_t>& checkpoints,

try {
for (uint64_t ii = start_index; ii < start_index + count; ++ii) {
if (!isCompactionAllowed()) {
if (!isCompactionAllowed() && !adjusting_num_l0 ) {
// To avoid file corruption, we should flush all cached pages
// even for cancel.
Timer cancel_timer;
Expand Down
4 changes: 3 additions & 1 deletion src/table_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ TableWriterArgs::TableWriterArgs()
: writerId(0)
, stopSignal(false)
, myLog(nullptr)
, adjustingNumL0(false)
{}

void TableWriterArgs::invoke() {
Expand Down Expand Up @@ -170,7 +171,8 @@ void TableWriterMgr::doTableWrite(TableWriterArgs* args) {
args->payload.targetTableFile,
*args->payload.offsets,
args->payload.startIdx,
args->payload.count );
args->payload.count,
args->adjustingNumL0.load() );
}
}

Expand Down
1 change: 1 addition & 0 deletions src/table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ struct TableWriterArgs {
std::atomic<bool> stopSignal;
TableWritePayload payload;
SimpleLogger* myLog;
std::atomic<bool> adjustingNumL0;
};

struct TableWriterPkg {
Expand Down
41 changes: 33 additions & 8 deletions tests/jungle/basic_op_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1778,42 +1778,67 @@ int different_l0_partitions() {
config.numL0Partitions = 1;
CHK_Z(jungle::DB::open(&db, filename, config));
int n = 10;

// Insert, flush, and check
std::vector<jungle::KV> kv(n);
CHK_Z(_init_kv_pairs(n, kv, "key", "value"));
CHK_Z(_init_kv_pairs(n, kv, "key1", "value1"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv));
CHK_Z(db->sync());
CHK_Z(db->flushLogs(jungle::FlushOptions()));
CHK_Z(_get_bykey_check(0, n, db, kv));
CHK_Z(jungle::DB::close(db));

// Change the number of partitions,
// but it should be ignored internally.
// Increase the number of partitions; it should be handled correctly internally.
config.numL0Partitions = 4;

// Reopen & check.
CHK_Z(jungle::DB::open(&db, filename, config));
CHK_Z(_get_bykey_check(0, n, db, kv));

// Insert more.
// Insert more, flush and check.
std::vector<jungle::KV> kv2(n);
CHK_Z(_init_kv_pairs(n, kv2, "key_new", "value_new"));
CHK_Z(_init_kv_pairs(n, kv2, "key2", "value2"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv2));
std::vector<jungle::KV> kv3(n);
CHK_Z(_init_kv_pairs(n, kv3, "key3", "value3"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv3));
std::vector<jungle::KV> kv4(n);
CHK_Z(_init_kv_pairs(n, kv4, "key4", "value4"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv4));
CHK_Z(db->sync());
CHK_Z(db->flushLogs(jungle::FlushOptions()));

// Check both.
CHK_Z(_get_bykey_check(0, n, db, kv));
CHK_Z(_get_bykey_check(0, n, db, kv2));
CHK_Z(_get_bykey_check(0, n, db, kv3));
CHK_Z(_get_bykey_check(0, n, db, kv4));
CHK_Z(jungle::DB::close(db));

// Decrease the number of partitions; it should be handled correctly internally.
config.numL0Partitions = 2;
// Reopen & check.
CHK_Z(jungle::DB::open(&db, filename, config));
CHK_Z(_get_bykey_check(0, n, db, kv));
CHK_Z(_get_bykey_check(0, n, db, kv2));
CHK_Z(_get_bykey_check(0, n, db, kv3));
CHK_Z(_get_bykey_check(0, n, db, kv4));

// Insert more, flush and check
std::vector<jungle::KV> kv5(n);
CHK_Z(_init_kv_pairs(n, kv5, "key5", "value5"));
CHK_Z(_set_bykey_kv_pairs(0, n, db, kv5));
CHK_Z(db->sync());
CHK_Z(db->flushLogs(jungle::FlushOptions()));
CHK_Z(_get_bykey_check(0, n, db, kv5));

// Wait 7 seconds.
// TestSuite::sleep_sec(7);
CHK_Z(jungle::DB::close(db));

CHK_Z(jungle::shutdown());
_free_kv_pairs(n, kv);
_free_kv_pairs(n, kv2);
_free_kv_pairs(n, kv3);
_free_kv_pairs(n, kv4);
_free_kv_pairs(n, kv5);

TEST_SUITE_CLEANUP_PATH();
return 0;
Expand Down

0 comments on commit 0bba004

Please sign in to comment.