Fix truncation issues and add long-running log store tests.
Fix truncation issues in boundary cases: release chunks
when truncation crosses end-of-chunk boundaries.
Enable the logstore tests, except the parallel write-and-truncate
test case. Truncation can move the data start to the next chunk's
start offset, so the truncate API now returns that offset.
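
The boundary case in question: if the truncation offset lands in the unused gap between a chunk's end_of_chunk marker and its physical end, that chunk holds no readable data, so it is released and the data start moves up to the next chunk boundary. A minimal sketch of that offset adjustment, with round_up standing in for sisl::round_up (names are illustrative, not the HomeStore API):

#include <cstdint>

// Illustrative stand-in for sisl::round_up.
static uint64_t round_up(uint64_t v, uint64_t align) { return ((v + align - 1) / align) * align; }

// If the truncation point is at or past end_of_chunk, nothing readable is
// left in this chunk: release it and start from the next chunk boundary.
uint64_t new_data_start(uint64_t truncate_offset, uint64_t chunk_size,
                        uint64_t offset_in_chunk, uint64_t end_of_chunk) {
    if (offset_in_chunk >= end_of_chunk) {
        return round_up(truncate_offset, chunk_size); // skip the chunk's dead tail
    }
    return truncate_offset; // truncation point is still inside live data
}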
sanebay committed May 21, 2024
1 parent 8e9e6a6 commit 54dd65e
Showing 11 changed files with 824 additions and 97 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.8"
version = "6.4.9"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
151 changes: 110 additions & 41 deletions src/lib/device/journal_vdev.cpp
@@ -45,15 +45,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo
m_init_private_data = std::make_shared< JournalChunkPrivate >();
m_chunk_pool = std::make_unique< ChunkPool >(
dmgr,
ChunkPool::Params{
HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
[this]() {
m_init_private_data->created_at = get_time_since_epoch_ms();
m_init_private_data->end_of_chunk = m_vdev_info.chunk_size;
sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()), sizeof(JournalChunkPrivate)};
return private_blob;
},
m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});
ChunkPool::Params{HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
[this]() {
m_init_private_data->created_at = get_time_since_epoch_ms();
m_init_private_data->end_of_chunk = m_vdev_info.chunk_size;
sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()),
sizeof(JournalChunkPrivate)};
return private_blob;
},
m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});

resource_mgr().register_journal_vdev_exceed_cb([this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) {
// either it is critical or non-critical, call cp_flush;
@@ -152,6 +152,19 @@ void JournalVirtualDev::remove_journal_chunks(std::vector< shared< Chunk > >& ch
}
}

void JournalVirtualDev::release_chunk_to_pool(shared< Chunk > chunk) {
// Clear the private chunk data before adding to pool.
auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
*data = JournalChunkPrivate{};
update_chunk_private(chunk, data);

// We would ideally zero out chunks, since chunks are reused across logdevs
// after being freed. But zeroing a chunk is very expensive, so we rely on
// CRC mismatches to find the end offset of the logdev during recovery.
m_chunk_pool->enqueue(chunk);
LOGINFOMOD(journalvdev, "Released chunk to pool {}", chunk->to_string());
}

void JournalVirtualDev::update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* private_data) {
sisl::blob private_blob{r_cast< uint8_t* >(private_data), sizeof(JournalChunkPrivate)};
chunk->set_user_private(private_blob);
@@ -172,7 +185,7 @@ shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logd

LOGINFOMOD(journalvdev, "Opened journal vdev descriptor log_dev={}", logdev_id);
for (auto& chunk : it->second->m_journal_chunks) {
LOGINFOMOD(journalvdev, " log_dev={} end_of_chunk={} chunk={}", logdev_id, get_end_of_chunk(chunk),
LOGINFOMOD(journalvdev, "log_dev={} end_of_chunk={} chunk {}", logdev_id, to_hex(get_end_of_chunk(chunk)),
chunk->to_string());
}
return it->second;
@@ -242,7 +255,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {

if ((tail_offset() + static_cast< off_t >(sz)) >= m_end_offset) {
// not enough space left, add a new chunk.
LOGDEBUGMOD(journalvdev, "No space left for size {} Creating chunk desc {}", sz, to_string());
LOGINFOMOD(journalvdev, "No space left for size {} Creating chunk desc {}", sz, to_string());

#ifdef _PRERELEASE
iomgr_flip::test_and_abort("abort_before_update_eof_cur_chunk");
@@ -373,13 +386,13 @@ void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, o
}

/////////////////////////////// Read Section //////////////////////////////////
size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) {
if (m_journal_chunks.empty()) { return 0; }
int64_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) {
if (m_journal_chunks.empty()) { return -1; }

HS_REL_ASSERT_LE(m_seek_cursor, m_end_offset, "seek_cursor {} exceeded end_offset {}", m_seek_cursor, m_end_offset);
if (m_seek_cursor >= m_end_offset) {
LOGTRACEMOD(journalvdev, "sync_next_read reached end of chunks");
return 0;
return -1;
}

auto [chunk, _, offset_in_chunk] = offset_to_chunk(m_seek_cursor);
@@ -400,6 +413,10 @@ size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_r
// truncate size to what is left;
size_rd = end_of_chunk - offset_in_chunk;
across_chunk = true;
if (size_rd == 0) {
// If there is no more data in the current chunk, move the seek_cursor to the next chunk.
m_seek_cursor += (chunk->size() - end_of_chunk);
}
}

if (buf == nullptr) { return size_rd; }
@@ -542,11 +559,9 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) {
LOGINFOMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string());
}

void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
const off_t ds_off = data_start_offset();

COUNTER_INCREMENT(m_vdev.m_metrics, vdev_truncate_count, 1);

HS_PERIODIC_LOG(DEBUG, journalvdev, "truncating to logical offset: 0x{} desc {}", to_hex(truncate_offset),
to_string());

@@ -559,54 +574,108 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
}

// Find the chunk which has the truncation offset. This will be the new
// head chunk in the list. We first update the is_head is true of this chunk.
// head chunk in the list. We first update the is_head to true for that new head chunk.
// So if a crash happens after this, we could have two chunks which have is_head
// set to true in the list, and during recovery we select the head with the highest creation
// timestamp and reuse or cleanup the other.
auto [new_head_chunk, _, offset_in_chunk] = offset_to_chunk(truncate_offset);
// timestamp and clean up the other. We also check whether the truncation offset
// happens to lie between the end_of_chunk mark and the actual chunk end. In that
// case there is no data left in that chunk, so we release it and select the next chunk.
bool update_truncate_offset = false;
auto [new_head_chunk, index, offset_in_chunk] = offset_to_chunk(truncate_offset);
auto end_new_head_chunk = m_vdev.get_end_of_chunk(new_head_chunk);
if (offset_in_chunk >= static_cast< int64_t >(end_new_head_chunk)) {
// If the truncation offset is at or past end_of_chunk, we don't have any more
// data in this chunk to read. Select the next chunk.
if (++index <= (m_journal_chunks.size() - 1)) {
// Go to the next chunk to make it the new head chunk.
new_head_chunk = m_journal_chunks[index];
update_truncate_offset = true;
size_to_truncate += (new_head_chunk->size() - end_new_head_chunk);
}
}

auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(new_head_chunk->user_private()));
private_data->is_head = true;
private_data->logdev_id = m_logdev_id;
m_vdev.update_chunk_private(new_head_chunk, private_data);

// Find all chunks which needs to be removed from the start of m_journal_chunks.
// We stop till the truncation offset. Start from the old data_start_offset.
// Align the data_start_offset to the chunk_size as we deleting chunks and
// all chunks are same size in a journal vdev.
uint32_t start = sisl::round_down(ds_off, m_vdev.info().chunk_size);
// All chunks that fall entirely inside the truncate offset are released.
// cover_offset is a logical offset.
index = 0;
off_t tail_off =
static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz;
auto chunk_size = m_vdev.info().chunk_size;
LOGINFOMOD(journalvdev, "Truncate begin truncate {} desc {}", to_hex(truncate_offset), to_string());

#ifdef _PRERELEASE
for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end(); ++it) {
auto chunk = *it;
auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
LOGINFOMOD(journalvdev, "log_dev={} chunk_id={} is_head={}", m_logdev_id, chunk->chunk_id(),
private_data->is_head)
}
#endif

off_t cover_offset = sisl::round_down(data_start_offset(), chunk_size);
auto total_num_chunks = m_journal_chunks.size();
for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end();) {
auto chunk = *it;
start += chunk->size();
off_t end_of_chunk = m_vdev.get_end_of_chunk(chunk);
off_t chunk_hole = 0;
if (index == total_num_chunks - 1) {
// If it's the last chunk, only read up to the tail_offset
// There are no holes in the last chunk.
cover_offset += (tail_off % chunk_size);
} else {
// For other chunks take the whole size.
cover_offset += chunk_size;
chunk_hole = (chunk_size - end_of_chunk);
}

// Also if it's the last chunk and there is no data after the truncate offset, we release the chunk.
auto write_sz_in_total = m_write_sz_in_total.load(std::memory_order_relaxed);
if (start >= truncate_offset) { break; }
index++;

m_total_size -= chunk->size();
it = m_journal_chunks.erase(it);
// Check if the chunk falls entirely within the truncate offset. We also check if the truncate
// offset lies in the gap between end_of_chunk and the chunk end. If either condition is satisfied, we release the chunk.
if (cover_offset <= truncate_offset || ((cover_offset - chunk_hole) <= truncate_offset)) {
m_total_size -= chunk->size();
it = m_journal_chunks.erase(it);

// Clear the private chunk data before adding to pool.
auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
*data = JournalChunkPrivate{};
m_vdev.update_chunk_private(chunk, data);
// Release the chunk back to pool.
LOGINFOMOD(journalvdev,
"Released chunk_id={} log_dev={} cover={} truncate_offset={} tail={} end_of_chunk={} desc {}",
chunk->chunk_id(), m_logdev_id, to_hex(cover_offset), to_hex(truncate_offset), to_hex(tail_off),
m_vdev.get_end_of_chunk(chunk), to_string());
m_vdev.release_chunk_to_pool(chunk);
} else {
++it;
}
}

// We ideally want to zero out chunks as chunks are reused after free across
// logdev's. But zero out chunk is very expensive, We look at crc mismatches
// to know the end offset of the log dev during recovery.
// Format and add back to pool.
m_vdev.m_chunk_pool->enqueue(chunk);
LOGINFOMOD(journalvdev, "After truncate released chunk {}", chunk->to_string());
if (update_truncate_offset) {
// Update the truncate offset to align with the chunk size.
truncate_offset = sisl::round_up(truncate_offset, m_vdev.info().chunk_size);
}

// Update our start offset, to keep track of actual size
HS_REL_ASSERT_LE(truncate_offset, m_end_offset, "truncate offset less than end offset");
update_data_start_offset(truncate_offset);

// update in-memory total write size counter;
m_write_sz_in_total.fetch_sub(size_to_truncate, std::memory_order_relaxed);
m_truncate_done = true;

HS_PERIODIC_LOG(INFO, journalvdev, "After truncate desc {}", to_string());
#ifdef _PRERELEASE
for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end(); ++it) {
auto chunk = *it;
auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
LOGINFOMOD(journalvdev, "log_dev={} chunk_id={} is_head={}", m_logdev_id, chunk->chunk_id(),
private_data->is_head)
}
#endif

LOGINFOMOD(journalvdev, "Truncate end truncate {} desc {}", to_hex(truncate_offset), to_string());
return data_start_offset();
}

#if 0
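
A note on the recovery side of release_chunk_to_pool() above: since freed chunks are not zeroed, recovery finds the end of a logdev by scanning from the chunk start until a record fails validation. A hedged sketch of that idea; the header layout, magic, and checksum here are illustrative stand-ins, not the actual log_group_header:

#include <cstdint>
#include <cstring>

// Illustrative record header; the real log_group_header differs.
struct RecordHdr {
    uint32_t magic;       // identifies a record written by this logdev
    uint32_t crc;         // checksum over the payload
    uint32_t payload_len; // bytes of payload following the header
};

// Stand-in checksum (FNV-1a); real code would use CRC32/CRC32C.
static uint32_t checksum(const uint8_t* p, uint32_t len) {
    uint32_t h = 2166136261u;
    for (uint32_t i = 0; i < len; ++i) { h = (h ^ p[i]) * 16777619u; }
    return h;
}

// Walk records from the chunk start; the first magic or CRC mismatch marks
// the end of valid data -- anything beyond is stale bytes left by a previous
// owner of this reused, un-zeroed chunk.
uint64_t find_end_offset(const uint8_t* chunk, uint64_t chunk_size, uint32_t expected_magic) {
    uint64_t off = 0;
    while (off + sizeof(RecordHdr) <= chunk_size) {
        RecordHdr hdr;
        std::memcpy(&hdr, chunk + off, sizeof(hdr));
        if (hdr.magic != expected_magic) break;
        if (off + sizeof(hdr) + hdr.payload_len > chunk_size) break;
        if (checksum(chunk + off + sizeof(hdr), hdr.payload_len) != hdr.crc) break;
        off += sizeof(hdr) + hdr.payload_len;
    }
    return off;
}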
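
And the crash-recovery invariant from the truncate() comments: if a crash lands between marking the new head chunk and releasing the old one, two chunks can both claim is_head, and recovery keeps the claim with the highest created_at timestamp. A minimal sketch under those assumptions (simplified stand-ins, not the real Chunk/JournalChunkPrivate types):

#include <cstddef>
#include <cstdint>
#include <vector>

// Simplified stand-in; the real JournalChunkPrivate carries more fields.
struct ChunkMeta {
    bool is_head{false};
    uint64_t created_at{0}; // ms since epoch, stamped when the chunk was initialized
};

// Keep the newest is_head claim; demote any older claimants so they can be
// cleaned up or reused. Returns chunks.size() if no head was found.
size_t pick_head(std::vector<ChunkMeta>& chunks) {
    size_t head = chunks.size();
    for (size_t i = 0; i < chunks.size(); ++i) {
        if (!chunks[i].is_head) { continue; }
        if (head == chunks.size()) {
            head = i;
        } else if (chunks[i].created_at > chunks[head].created_at) {
            chunks[head].is_head = false; // demote the older claimant
            head = i;
        } else {
            chunks[i].is_head = false;
        }
    }
    return head;
}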
8 changes: 5 additions & 3 deletions src/lib/device/journal_vdev.hpp
@@ -158,9 +158,9 @@ class JournalVirtualDev : public VirtualDev {
*
* @return : On success, the number of bytes read is returned (zero indicates end of file), and the cursor is
* advanced by this number. It is not an error if this number is smaller than the number requested, because it
* can be end of chunk, since read won't across chunk.
* can be end of chunk, since reads won't cross chunk boundaries. Returns -1 if there are no bytes available to read.
*/
size_t sync_next_read(uint8_t* buf, size_t count_in);
int64_t sync_next_read(uint8_t* buf, size_t count_in);

/**
* @brief : reads up to count bytes at offset into the buffer starting at buf.
@@ -275,8 +275,9 @@ class JournalVirtualDev : public VirtualDev {
* 1. update in-memory counter of total write size.
* 2. update vdev superblock of the new start logical offset that is being truncate to;
*
* @return : the new data start offset after truncation.
*/
void truncate(off_t truncate_offset);
off_t truncate(off_t truncate_offset);

/**
* @brief : get the total size in journal
@@ -416,6 +417,7 @@ class JournalVirtualDev : public VirtualDev {
uint64_t num_descriptors() const { return m_journal_descriptors.size(); }

void remove_journal_chunks(std::vector< shared< Chunk > >& chunks);
void release_chunk_to_pool(shared< Chunk > chunk);
void update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* chunk_private);
uint64_t get_end_of_chunk(shared< Chunk >& chunk) const;

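
Given the revised sync_next_read() contract declared above, a hypothetical caller loop would key off a negative return rather than zero. A sketch only, with Desc standing in for JournalVirtualDev::Descriptor and the consumer callback left generic:

#include <cstddef>
#include <cstdint>

// Hedged sketch of a reader on the new contract:
//   n >= 0 : bytes read (may be short near a chunk boundary, since reads
//            never cross chunks; 0 can mean the cursor just skipped an
//            empty chunk tail)
//   n < 0  : no more bytes available -- stop.
template <typename Desc, typename Fn>
void drain_journal(Desc& desc, uint8_t* buf, size_t bufsz, Fn&& consume) {
    while (true) {
        int64_t n = desc.sync_next_read(buf, bufsz);
        if (n < 0) { break; }     // end of journal
        if (n == 0) { continue; } // retry from the next chunk
        consume(buf, static_cast<size_t>(n));
    }
}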
16 changes: 11 additions & 5 deletions src/lib/logstore/log_dev.cpp
@@ -271,7 +271,11 @@ int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_nu

log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_record_header) {
auto buf = sisl::make_byte_array(initial_read_size, m_flush_size_multiple, sisl::buftag::logread);
m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);
auto ec = m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);
if (ec) {
LOGERROR("Failed to read from ournal vdev log_dev={} {} {}", m_logdev_id, ec.value(), ec.message());
return {};
}

auto* header = r_cast< const log_group_header* >(buf->cbytes());
// THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header);
@@ -571,15 +575,15 @@ uint64_t LogDev::truncate(const logdev_key& key) {
"Truncating log device upto log_dev={} log_id={} vdev_offset={} truncated {} log records",
m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate);
m_log_records->truncate(key.idx);
m_vdev_jd->truncate(key.dev_offset);
THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate done {} ", key.idx);
off_t new_offset = m_vdev_jd->truncate(key.dev_offset);
THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate done {} offset old {} new {}", key.idx, key.dev_offset, new_offset);
m_last_truncate_idx = key.idx;

{
std::unique_lock< std::mutex > lk{m_meta_mutex};

// Update the start offset to be read upon restart
m_logdev_meta.set_start_dev_offset(key.dev_offset, key.idx + 1, false /* persist_now */);
m_logdev_meta.set_start_dev_offset(new_offset, key.idx + 1, false /* persist_now */);

// Now that store is truncated, we can reclaim the store ids which are garbaged (if any) earlier
#ifdef _PRERELEASE
@@ -658,7 +662,7 @@ std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) {
HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id);
m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode}));
}
LOGINFO("Created log store log_dev={} log_store={}", m_logdev_id, store_id);
HS_LOG(INFO, logstore, "Created log store log_dev={} log_store={}", m_logdev_id, store_id);
return lstore;
}

@@ -934,7 +938,9 @@ void LogDevMetadata::reset() {
}

void LogDevMetadata::logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) {

m_sb.load(buf, meta_cookie);
LOGINFO("Logdev superblk found log_dev={}", m_sb->logdev_id);
HS_REL_ASSERT_EQ(m_sb->get_magic(), logdev_superblk::LOGDEV_SB_MAGIC, "Invalid logdev metablk, magic mismatch");
HS_REL_ASSERT_EQ(m_sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Invalid version of logdev metablk");
}
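
One more note on the LogDev::read() change above: sync_pread now reports failures through an error code, and the caller degrades to an empty buffer instead of asserting. A hedged sketch of that pattern; Device and read_at are illustrative stand-ins, not the HomeStore API:

#include <cstddef>
#include <cstdint>
#include <system_error>
#include <vector>

// Sketch of the error-code read pattern: check the std::error_code and
// return an empty buffer on failure rather than crashing the caller.
template <typename Device>
std::vector<uint8_t> checked_read(Device& dev, uint64_t off, size_t len) {
    std::vector<uint8_t> buf(len);
    std::error_code ec = dev.read_at(buf.data(), len, off); // stand-in call
    if (ec) {
        // Caller logs ec.value()/ec.message() and treats {} as a failed read.
        return {};
    }
    return buf;
}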
2 changes: 1 addition & 1 deletion src/lib/logstore/log_dev.hpp
@@ -569,7 +569,7 @@ class log_stream_reader {
sisl::byte_view group_in_next_page();

private:
sisl::byte_view read_next_bytes(uint64_t nbytes);
sisl::byte_view read_next_bytes(uint64_t nbytes, bool& end_of_stream);

private:
JournalVirtualDev* m_vdev;
12 changes: 6 additions & 6 deletions src/lib/logstore/log_store_service.cpp
@@ -125,7 +125,7 @@ logdev_id_t LogStoreService::create_new_logdev() {
auto logdev = create_new_logdev_internal(logdev_id);
logdev->start(true /* format */);
COUNTER_INCREMENT(m_metrics, logdevs_count, 1);
LOGINFO("Created log_dev={}", logdev_id);
HS_LOG(INFO, logstore, "Created log_dev={}", logdev_id);
return logdev_id;
}

@@ -149,12 +149,12 @@ void LogStoreService::destroy_log_dev(logdev_id_t logdev_id) {

m_id_logdev_map.erase(it);
COUNTER_DECREMENT(m_metrics, logdevs_count, 1);
LOGINFO("Removed log_dev={}", logdev_id);
HS_LOG(INFO, logstore, "Removed log_dev={}", logdev_id);
}

void LogStoreService::delete_unopened_logdevs() {
for (auto logdev_id : m_unopened_logdev) {
LOGINFO("Deleting unopened log_dev={}", logdev_id);
HS_LOG(INFO, logstore, "Deleting unopened log_dev={}", logdev_id);
destroy_log_dev(logdev_id);
}
m_unopened_logdev.clear();
@@ -178,7 +178,7 @@ void LogStoreService::open_logdev(logdev_id_t logdev_id) {
LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id);
}
m_unopened_logdev.erase(logdev_id);
LOGDEBUGMOD(logstore, "Opened log_dev={}", logdev_id);
HS_LOG(INFO, logstore, "Opened log_dev={}", logdev_id);
}

std::vector< std::shared_ptr< LogDev > > LogStoreService::get_all_logdevs() {
@@ -206,7 +206,7 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
std::shared_ptr< LogDev > logdev;
auto id = sb->logdev_id;
LOGDEBUGMOD(logstore, "Log dev superblk found logdev={}", id);
HS_LOG(DEBUG, logstore, "Log dev superblk found logdev={}", id);
const auto it = m_id_logdev_map.find(id);
// We could update the logdev map either with logdev or rollback superblks found callbacks.
if (it != m_id_logdev_map.end()) {
@@ -235,7 +235,7 @@ void LogStoreService::rollback_super_blk_found(const sisl::byte_view& buf, void*
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
std::shared_ptr< LogDev > logdev;
auto id = rollback_sb->logdev_id;
LOGDEBUGMOD(logstore, "Log dev rollback superblk found logdev={}", id);
HS_LOG(DEBUG, logstore, "Log dev rollback superblk found logdev={}", id);
const auto it = m_id_logdev_map.find(id);
HS_REL_ASSERT((it != m_id_logdev_map.end()),
"found a rollback_super_blk of logdev id {}, but the logdev with id {} doesnt exist", id);