Fix log store read across chunks and device manager load of chunks. (#321)

Chunks were getting duplicate start offsets during recovery. Add or clean up
logs for debugging. Add a test case. Add more logdevs to the logstore test.
Enable the logstore test. Move the rollback test to the new logdev test.
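
As an illustration only (not the committed code), here is a minimal standalone sketch of the offset walk that JournalVirtualDev::Descriptor::dev_offset() now performs: it steps through the ordered chunk list, counts only the bytes up to each chunk's end_of_chunk, and lets data_start_offset sit anywhere inside the first chunk. The ChunkView type and dev_offset_after helper are assumptions made for the sketch; the real code in journal_vdev.cpp operates on the vdev's chunk objects.

#include <algorithm>
#include <cstdint>
#include <vector>

struct ChunkView {
    uint64_t size;          // physical chunk size
    uint64_t end_of_chunk;  // end of valid journal data within this chunk
};

// Logical device offset reached after consuming `nbytes` of journal data,
// starting from `data_start_offset` (which may lie inside the first chunk).
uint64_t dev_offset_after(uint64_t nbytes, uint64_t data_start_offset, uint64_t chunk_size,
                          const std::vector< ChunkView >& chunks) {
    uint64_t vdev_offset = data_start_offset;
    uint64_t remaining = nbytes;
    uint64_t start_offset = data_start_offset % chunk_size; // non-zero only for the first chunk

    for (const auto& c : chunks) {
        uint64_t const end_of_chunk = std::min(c.end_of_chunk, c.size);
        uint64_t const num_data_bytes = end_of_chunk - start_offset;
        if (remaining < num_data_bytes) { return vdev_offset + remaining; }
        remaining -= num_data_bytes;
        vdev_offset += (chunk_size - start_offset); // also skip the unused tail of the chunk
        start_offset = 0;
    }
    return vdev_offset;
}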
sanebay authored Mar 11, 2024
1 parent 239ccb5 commit 2c500c5
Showing 24 changed files with 788 additions and 358 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.1.12"
version = "5.1.14"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
5 changes: 4 additions & 1 deletion src/include/homestore/logstore/log_store.hpp
@@ -110,7 +110,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
* @param cookie : Any cookie or context which will be passed back in the callback
* @param cb Callback upon completion which is called with the status, seq_num and cookie that was passed.
*/
void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb);
void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb,
bool flush_wait = false);

/**
* @brief This method appends the blob into the log and it returns the generated seq number
@@ -210,6 +211,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
return (ts == std::numeric_limits< logstore_seq_num_t >::max()) ? -1 : ts;
}

sisl::StreamTracker< logstore_record >& log_records() { return m_records; }

/**
* @brief iterator to get all the log buffers;
*
1 change: 1 addition & 0 deletions src/include/homestore/logstore/log_store_internal.hpp
@@ -125,6 +125,7 @@ struct logstore_req {
bool is_internal_req; // If the req is created internally by HomeLogStore itself
log_req_comp_cb_t cb; // Callback upon completion of write (overridden than default)
Clock::time_point start_time;
bool flush_wait{false}; // Wait for the flush to happen

logstore_req(const logstore_req&) = delete;
logstore_req& operator=(const logstore_req&) = delete;
1 change: 1 addition & 0 deletions src/lib/device/device.h
@@ -173,6 +173,7 @@ class DeviceManager {

std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const;
std::vector< shared< VirtualDev > > get_vdevs() const;
std::vector< shared< Chunk > > get_chunks() const;

uint64_t total_capacity() const;
uint64_t total_capacity(HSDevType dtype) const;
10 changes: 10 additions & 0 deletions src/lib/device/device_manager.cpp
@@ -543,6 +543,16 @@ std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const {
return ret_v;
}

std::vector< shared< Chunk > > DeviceManager::get_chunks() const {
std::unique_lock lg{m_vdev_mutex};
std::vector< shared< Chunk > > res;
res.reserve(m_chunks.size());
for (auto& chunk : m_chunks) {
if (chunk) res.push_back(chunk);
}
return res;
}

// Some of the hs_super_blk details
uint64_t hs_super_blk::vdev_super_block_size() { return (hs_super_blk::MAX_VDEVS_IN_SYSTEM * vdev_info::size); }

11 changes: 8 additions & 3 deletions src/lib/device/hs_super_blk.h
@@ -17,6 +17,7 @@
#include <map>
#include <vector>

#include <iomgr/iomgr_flip.hpp>
#include <iomgr/iomgr.hpp>
#include <sisl/fds/sparse_vector.hpp>
#include <homestore/homestore_decl.hpp>
@@ -193,9 +194,13 @@ class hs_super_blk {
return (dinfo.dev_type == HSDevType::Fast) ? EXTRA_SB_SIZE_FOR_FAST_DEVICE : EXTRA_SB_SIZE_FOR_DATA_DEVICE;
}
static uint32_t max_chunks_in_pdev(const dev_info& dinfo) {
return (dinfo.dev_size - 1) /
((dinfo.dev_type == HSDevType::Fast) ? MIN_CHUNK_SIZE_FAST_DEVICE : MIN_CHUNK_SIZE_DATA_DEVICE) +
1;
uint64_t min_chunk_size =
(dinfo.dev_type == HSDevType::Fast) ? MIN_CHUNK_SIZE_FAST_DEVICE : MIN_CHUNK_SIZE_DATA_DEVICE;
#ifdef _PRERELEASE
auto chunk_size = iomgr_flip::instance()->get_test_flip< long >("set_minimum_chunk_size");
if (chunk_size) { min_chunk_size = chunk_size.get(); }
#endif
return (dinfo.dev_size - 1) / min_chunk_size + 1;
}
};

138 changes: 93 additions & 45 deletions src/lib/device/journal_vdev.cpp
@@ -39,14 +39,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo
VirtualDev{dmgr, vinfo, std::move(event_cb), false /* is_auto_recovery */} {

// Private data stored when chunks are created.
init_private_data = std::make_shared< JournalChunkPrivate >();
m_init_private_data = std::make_shared< JournalChunkPrivate >();
m_chunk_pool = std::make_unique< ChunkPool >(
dmgr,
ChunkPool::Params{
HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
[this]() {
init_private_data->created_at = get_time_since_epoch_ms();
sisl::blob private_blob{r_cast< uint8_t* >(init_private_data.get()), sizeof(JournalChunkPrivate)};
m_init_private_data->created_at = get_time_since_epoch_ms();
m_init_private_data->end_of_chunk = m_vdev_info.chunk_size;
sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()), sizeof(JournalChunkPrivate)};
return private_blob;
},
m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});
@@ -84,17 +85,17 @@ void JournalVirtualDev::init() {
// Create descriptor for each logdev_id
auto journal_desc = std::make_shared< JournalVirtualDev::Descriptor >(*this, logdev_id);
m_journal_descriptors.emplace(logdev_id, journal_desc);
LOGDEBUGMOD(journalvdev, "Loading descriptor {}", logdev_id);
LOGINFOMOD(journalvdev, "Loading descriptor log_dev={}", logdev_id);
// Traverse the list starting from the head and add those chunks
// in order to the journal descriptor. next_chunk is stored in private_data.
// Last chunk will have next_chunk as 0.
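// e.g. (illustrative) head chunk (is_head, logdev_id) -> chunk A -> chunk B -> last chunk (next_chunk == 0).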
auto chunk_num = head.chunk_num;
while (chunk_num != 0) {
auto& c = chunk_map[chunk_num];
RELEASE_ASSERT(c, "Invalid chunk found logdev {} chunk {}", logdev_id, chunk_num);
RELEASE_ASSERT(c, "Invalid chunk found log_dev={} chunk={}", logdev_id, c->to_string());
journal_desc->m_journal_chunks.push_back(c);
visited_chunks.insert(chunk_num);
LOGDEBUGMOD(journalvdev, "Loading chunk {} descriptor {}", chunk_num, logdev_id);
LOGINFOMOD(journalvdev, "Loading log_dev={} chunk={}", logdev_id, c->to_string());

// Increase the total size.
journal_desc->m_total_size += c->size();
@@ -121,8 +122,8 @@
*data = JournalChunkPrivate{};
update_chunk_private(chunk, data);

LOGDEBUGMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id,
next_chunk);
LOGINFOMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id,
next_chunk);
m_dmgr.remove_chunk_locked(chunk);
}

@@ -149,7 +150,11 @@ shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logd
return journal_desc;
}

LOGDEBUGMOD(journalvdev, "Opened log device descriptor {}", logdev_id);
LOGINFOMOD(journalvdev, "Opened journal vdev descriptor log_dev={}", logdev_id);
for (auto& chunk : it->second->m_journal_chunks) {
LOGINFOMOD(journalvdev, " log_dev={} end_of_chunk={} chunk={}", logdev_id, get_end_of_chunk(chunk),
chunk->to_string());
}
return it->second;
}

@@ -181,18 +186,19 @@ void JournalVirtualDev::Descriptor::append_chunk() {
last_chunk_private->end_of_chunk = offset_in_chunk;
}
m_vdev.update_chunk_private(last_chunk, last_chunk_private);
LOGDEBUGMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->chunk_id(), last_chunk->chunk_id(),
to_string());
LOGINFOMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->to_string(), last_chunk->chunk_id(),
to_string());

} else {
// If the list is empty, update the new chunk as the head. Only head chunk contains the logdev_id.
auto* new_chunk_private = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(new_chunk->user_private()));
new_chunk_private->is_head = true;
new_chunk_private->logdev_id = m_logdev_id;
new_chunk_private->end_of_chunk = m_vdev.info().chunk_size;
// Append the new chunk
m_journal_chunks.push_back(new_chunk);
m_vdev.update_chunk_private(new_chunk, new_chunk_private);
LOGDEBUGMOD(journalvdev, "Added head chunk {} desc {}", new_chunk->chunk_id(), to_string());
LOGINFOMOD(journalvdev, "Added head chunk={} desc {}", new_chunk->to_string(), to_string());
}
}

@@ -231,7 +237,8 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {

// assert that the returned logical offset is in a good range
HS_DBG_ASSERT_LE(tail_off, m_end_offset);
LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string());
LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} tail_off {} size {} desc {}", to_hex(tail_off), tail_off, sz,
to_string());
return tail_off;
}

@@ -336,30 +343,45 @@ void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, o
size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) {
if (m_journal_chunks.empty()) { return 0; }

HS_REL_ASSERT_LE(m_seek_cursor, m_end_offset, "seek_cursor {} exceeded end_offset {}", m_seek_cursor, m_end_offset);
if (m_seek_cursor >= m_end_offset) {
LOGTRACEMOD(journalvdev, "sync_next_read reached end of chunks");
return 0;
}

auto [chunk, _, offset_in_chunk] = offset_to_chunk(m_seek_cursor);
auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk);
auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size());
bool across_chunk{false};

// LOGINFO("sync_next_read size_rd {} chunk {} seek_cursor {} end_of_chunk {} {}", size_rd, chunk->to_string(),
// m_seek_cursor, end_of_chunk, chunk_size);

HS_REL_ASSERT_LE((uint64_t)end_of_chunk, chunk->size(), "Invalid end of chunk: {} detected on chunk num: {}",
end_of_chunk, chunk->chunk_id());
HS_REL_ASSERT_LE((uint64_t)offset_in_chunk, chunk->size(),
"Invalid m_seek_cursor: {} which falls beyond end of chunk: {}!", m_seek_cursor, end_of_chunk);

// if read size is larger than what's left in this chunk
if (size_rd >= (chunk->size() - offset_in_chunk)) {
if (size_rd >= (end_of_chunk - offset_in_chunk)) {
// truncate size to what is left;
size_rd = chunk->size() - offset_in_chunk;
size_rd = end_of_chunk - offset_in_chunk;
across_chunk = true;
}
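// Illustration (hypothetical numbers): with end_of_chunk=900, offset_in_chunk=850 and size_rd=200,
// size_rd is truncated to 50 and the seek cursor will then jump over the unused tail of this chunk.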

if (buf == nullptr) { return size_rd; }

auto ec = sync_pread(buf, size_rd, m_seek_cursor);
// TODO: Check if we can tolerate this error and somehow start homestore without replaying or in degraded mode?
HS_REL_ASSERT(!ec, "Error in reading next stream of bytes, proceeding could cause some inconsistency, exiting");

// Update seek cursor after read;
m_seek_cursor += size_rd;
if (across_chunk) { m_seek_cursor += (chunk->size() - end_of_chunk); }
if (across_chunk) {
m_seek_cursor += (chunk->size() - end_of_chunk);
LOGTRACEMOD(journalvdev, "Across size_rd {} chunk {} seek_cursor {} end_of_chunk {}", size_rd,
chunk->to_string(), m_seek_cursor, end_of_chunk);
}
return size_rd;
}

@@ -412,37 +434,51 @@ off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) {
break;
}

LOGINFOMOD(journalvdev, "lseek desc {} offset 0x{} whence {} ", to_string(), to_hex(offset), whence);
return m_seek_cursor;
}

/**
* @brief :- it returns the vdev offset after nbytes from start offset
*/
off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const {
if (m_journal_chunks.empty()) { return data_start_offset(); }
if (nbytes == 0 || m_journal_chunks.empty()) {
// If nbytes is 0 or there are no chunks, return the start offset.
return data_start_offset();
}

off_t vdev_offset = data_start_offset();
uint32_t dev_id{0}, chunk_id{0};
off_t offset_in_chunk{0};
off_t cur_read_cur{0};

while (cur_read_cur != nbytes) {
auto [chunk, _, offset_in_chunk] = offset_to_chunk(vdev_offset);

auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk);
auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size());
auto const remaining = nbytes - cur_read_cur;
if (remaining >= (static_cast< off_t >(chunk->size()) - offset_in_chunk)) {
cur_read_cur += (chunk->size() - offset_in_chunk);
vdev_offset += (chunk->size() - offset_in_chunk);
} else {
auto chunk_size = m_vdev.info().chunk_size;
uint64_t remaining = nbytes;
auto start_offset = data_start_offset() % chunk_size;

// data_start_offset could be anywhere in the first chunk, because when we truncate
// and data_start_offset lies in the first chunk, we don't delete that first chunk.
// Other chunks will have start_offset as 0.
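// Example (hypothetical numbers): with chunk_size = 100 and data_start_offset = 250,
// start_offset = 50, so only bytes [50, end_of_chunk) of the first chunk count as data;
// every subsequent chunk contributes bytes [0, end_of_chunk).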
for (auto chunk : m_journal_chunks) {
uint64_t end_of_chunk = std::min< uint64_t >(m_vdev.get_end_of_chunk(chunk), chunk_size);

auto num_data_bytes = end_of_chunk - start_offset;
if (remaining < num_data_bytes) {
vdev_offset += remaining;
cur_read_cur = nbytes;
break;
}

remaining -= num_data_bytes;
vdev_offset += (chunk_size - start_offset);
start_offset = 0;
}
return vdev_offset;
}

void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) {
m_data_start_offset = offset;
auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size;
LOGINFOMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string());
RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}", to_string());
}

off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const {
off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed));
if (reserve_space_include) { tail += m_reserved_sz; }
@@ -456,13 +492,13 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) {

if (tail >= start) {
m_write_sz_in_total.store(tail - start, std::memory_order_relaxed);
} else {
RELEASE_ASSERT(false, "tail {} less than start offset {}", tail, start);
} else if (tail != 0) {
LOGERROR("tail {} less than start offset {} desc {}", tail, start, to_string());
RELEASE_ASSERT(false, "Invalid tail offset");
}
lseek(tail);

LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string());
HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail);
LOGINFOMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string());
}

void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
@@ -510,8 +546,11 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
*data = JournalChunkPrivate{};
m_vdev.update_chunk_private(chunk, data);

// We ideally want to zero out chunks, as chunks are reused after free across
// logdevs. But zeroing out a chunk is very expensive, so we rely on CRC mismatches
// to find the end offset of the logdev during recovery.
// Format and add back to pool.
m_vdev.m_chunk_pool->enqueue(chunk);
HS_PERIODIC_LOG(TRACE, journalvdev, "adding chunk {} back to pool desc {}", chunk->chunk_id(), to_string());
}

// Update our start offset, to keep track of actual size
@@ -561,7 +600,8 @@ uint64_t JournalVirtualDev::Descriptor::logical_to_dev_offset(off_t log_offset,
}
#endif

std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset) const {
std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset,
bool check) const {
uint64_t chunk_aligned_offset = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
uint64_t off_l{static_cast< uint64_t >(log_offset) - chunk_aligned_offset};
uint32_t index = 0;
@@ -574,10 +614,17 @@ std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::of
}
}

HS_DBG_ASSERT(false, "Input log_offset is invalid: {}", log_offset);
if (check) { HS_DBG_ASSERT(false, "Input log_offset is invalid: {} {}", log_offset, to_string()); }
return {nullptr, 0L, 0L};
}

bool JournalVirtualDev::Descriptor::is_offset_at_last_chunk(off_t bytes_offset) {
auto [chunk, chunk_index, _] = offset_to_chunk(bytes_offset, false);
if (chunk == nullptr) return true;
if (chunk_index == m_journal_chunks.size() - 1) { return true; }
return false;
}

void JournalVirtualDev::Descriptor::high_watermark_check() {
if (resource_mgr().check_journal_size(used_size(), size())) {
COUNTER_INCREMENT(m_vdev.m_metrics, vdev_high_watermark_count, 1);
@@ -598,7 +645,7 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const {

nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
nlohmann::json j;
j["logdev_id"] = m_logdev_id;
j["logdev"] = m_logdev_id;
j["seek_cursor"] = m_seek_cursor;
j["data_start_offset"] = m_data_start_offset;
j["end_offset"] = m_end_offset;
@@ -613,7 +660,7 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
nlohmann::json c;
auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
c["chunk_id"] = chunk->chunk_id();
c["logdev_id"] = private_data->logdev_id;
c["logdev"] = private_data->logdev_id;
c["is_head"] = private_data->is_head;
c["end_of_chunk"] = private_data->end_of_chunk;
c["next_chunk"] = private_data->next_chunk;
@@ -627,12 +674,13 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
}

std::string JournalVirtualDev::Descriptor::to_string() const {
std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};"
off_t tail =
static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz;
std::string str{fmt::format("log_dev={};ds=0x{};end=0x{};writesz={};tail=0x{};"
"rsvdsz={};chunks={};trunc={};total={};seek=0x{} ",
m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset),
m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()),
m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size,
to_hex(m_seek_cursor))};
m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz,
m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))};
return str;
}
