Skip to content

Commit

Permalink
Remove logdev which are unopened during recovery.
Browse files Browse the repository at this point in the history
Removing repl dev will remove the logdev. If logdev or log
store are not opened during, they are marked as ununsed and
deleted. Remove chunks from the journal descriptor.
Remove the logdev metablk.
  • Loading branch information
sanebay committed May 3, 2024
1 parent abeddaa commit 988fea7
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 41 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.1"
version = "6.4.2"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
9 changes: 9 additions & 0 deletions src/include/homestore/logstore_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ class LogStoreService {
*/
void open_logdev(logdev_id_t logdev_id);

/**
* @brief Destroy a log dev.
*
* @param logdev_id: Logdev ID
*/
void destroy_log_dev(logdev_id_t logdev_id);

/**
* @brief Create a brand new log store (both in-memory and on device) and returns its instance. It also book
* keeps the created log store and user can get this instance of log store by using logstore_id
Expand Down Expand Up @@ -168,6 +175,7 @@ class LogStoreService {

private:
std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id);
void delete_unopened_logdevs();
void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
void start_threads();
Expand All @@ -182,6 +190,7 @@ class LogStoreService {
iomgr::io_fiber_t m_truncate_fiber;
iomgr::io_fiber_t m_flush_fiber;
LogStoreServiceMetrics m_metrics;
std::unordered_set< logdev_id_t > m_unopened_logdev;
};

extern LogStoreService& logstore_service();
Expand Down
13 changes: 7 additions & 6 deletions src/lib/device/device_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ shared< VirtualDev > DeviceManager::create_vdev(vdev_parameters&& vparam) {
vparam.vdev_name, in_bytes(input_chunk_size), in_bytes(vparam.chunk_size));
}

vparam.vdev_size = sisl::round_down(vparam.vdev_size, vparam.chunk_size);
// For dynamic size vdev, size starts with zero.
vparam.vdev_size = 0;
if (input_vdev_size != vparam.vdev_size) {
LOGINFO("{} Virtual device is attempted to be created with size={}, it needs to be rounded to new_size={}",
vparam.vdev_name, in_bytes(input_vdev_size), in_bytes(vparam.vdev_size));
Expand All @@ -285,7 +286,6 @@ shared< VirtualDev > DeviceManager::create_vdev(vdev_parameters&& vparam) {
RELEASE_ASSERT(vparam.chunk_size >= min_chunk_size, "chunk_size should be greater than or equal to min_chunk_size");

RELEASE_ASSERT(vparam.num_chunks <= max_num_chunks, "num_chunks should be less than or equal to max_num_chunks");
RELEASE_ASSERT(input_vdev_size >= vparam.vdev_size, "vdev_size should be less than or equal to input_vdev_size");

LOGINFO(
"New Virtal Dev={} of size={} with id={} is attempted to be created with multi_pdev_opts={}. The params are "
Expand Down Expand Up @@ -419,7 +419,7 @@ shared< Chunk > DeviceManager::create_chunk(HSDevType dev_type, uint32_t vdev_id
pdev->write_super_block(buf, vdev_info::size, offset);
hs_utils::iobuf_free(buf, sisl::buftag::superblk);

HS_LOG(TRACE, device, "Created chunk id={} dev_type={} vdev_id={} size={}", chunk_id, (uint8_t)dev_type, vdev_id,
HS_LOG(DEBUG, device, "Created chunk_id={} dev_type={} vdev_id={} size={}", chunk_id, (uint8_t)dev_type, vdev_id,
chunk_size);
return chunk;
}
Expand Down Expand Up @@ -459,7 +459,7 @@ void DeviceManager::remove_chunk_locked(shared< Chunk > chunk) {
pdev->write_super_block(buf, vdev_info::size, offset);
hs_utils::iobuf_free(buf, sisl::buftag::superblk);

HS_LOG(TRACE, device, "Removed chunk id={} vdev_id={}", chunk_id, vdev_id);
HS_LOG(DEBUG, device, "Removed chunk_id={} vdev_id={}", chunk_id, vdev_id);
}

uint32_t DeviceManager::populate_pdev_info(const dev_info& dinfo, const iomgr::drive_attributes& attr,
Expand Down Expand Up @@ -512,6 +512,7 @@ static void populate_vdev_info(const vdev_parameters& vparam, uint32_t vdev_id,
out_info->set_user_private(vparam.context_data);
out_info->alloc_type = s_cast< uint8_t >(vparam.alloc_type);
out_info->chunk_sel_type = s_cast< uint8_t >(vparam.chunk_sel_type);
out_info->size_type = vparam.size_type;
out_info->compute_checksum();
}

Expand Down Expand Up @@ -609,7 +610,7 @@ void ChunkPool::start() {
m_run_pool = true;
}
m_producer_thread = std::thread(&ChunkPool::producer, this);
HS_LOG(INFO, device, "Starting chunk pool for vdev {}", m_params.vdev_id);
HS_LOG(INFO, device, "Starting chunk pool for vdev_id={}", m_params.vdev_id);
}

void ChunkPool::producer() {
Expand All @@ -634,7 +635,7 @@ void ChunkPool::producer() {
m_params.chunk_size, std::move(private_data));
RELEASE_ASSERT(chunk, "Cannot create chunk");
m_pool.push_back(chunk);
HS_LOG(TRACE, device, "Produced chunk to pool id {} type {} vdev {} size {}", chunk->chunk_id(),
HS_LOG(TRACE, device, "Produced chunk to pool chunk_id={} type={} vdev_id={} size {}", chunk->chunk_id(),
m_params.hs_dev_type, m_params.vdev_id, m_params.chunk_size);
m_pool_cv.notify_one();
}
Expand Down
34 changes: 27 additions & 7 deletions src/lib/device/journal_vdev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,19 @@ void JournalVirtualDev::init() {
if (!visited_chunks.count(chunk->chunk_id())) { orphan_chunks.push_back(chunk); }
}

for (auto& chunk : orphan_chunks) {
// Remove the orphan chunks.
if (!orphan_chunks.empty()) {
LOGINFOMOD(journalvdev, "Removing orphan chunks");
remove_journal_chunks(orphan_chunks);
}

// Start the chunk pool.
m_chunk_pool->start();
LOGINFO("Journal vdev init done");
}

void JournalVirtualDev::remove_journal_chunks(std::vector< shared< Chunk > >& chunks) {
for (auto& chunk : chunks) {
auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
auto chunk_id = chunk->chunk_id();
auto logdev_id = data->logdev_id;
Expand All @@ -135,14 +147,9 @@ void JournalVirtualDev::init() {
*data = JournalChunkPrivate{};
update_chunk_private(chunk, data);

LOGINFOMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id,
next_chunk);
LOGINFOMOD(journalvdev, "Removing chunk {} found for logdev {} next {}.", chunk_id, logdev_id, next_chunk);
m_dmgr.remove_chunk_locked(chunk);
}

// Start the chunk pool.
m_chunk_pool->start();
LOGINFO("Journal vdev init done");
}

void JournalVirtualDev::update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* private_data) {
Expand Down Expand Up @@ -171,6 +178,19 @@ shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logd
return it->second;
}

void JournalVirtualDev::destroy(logdev_id_t logdev_id) {
auto it = m_journal_descriptors.find(logdev_id);
if (it == m_journal_descriptors.end()) {
LOGERROR("logdev not found log_dev={}", logdev_id);
return;
}

// Remove all the chunks.
remove_journal_chunks(it->second->m_journal_chunks);
m_journal_descriptors.erase(it);
LOGINFOMOD(journalvdev, "Journal vdev destroyed log_dev={}", logdev_id);
}

void JournalVirtualDev::Descriptor::append_chunk() {
// Get a new chunk from the pool.
auto new_chunk = m_vdev.m_chunk_pool->dequeue();
Expand Down
5 changes: 5 additions & 0 deletions src/lib/device/journal_vdev.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ class JournalVirtualDev : public VirtualDev {
// where log entries are stored. It also mantains offsets, size etc.
shared< Descriptor > open(logdev_id_t id);

// Destroy a logdev and release all the chunks related to the logdev.
void destroy(logdev_id_t id);

/**
* @brief Get the status of the journal vdev and its internal structures
* @param log_level: Log level to do verbosity.
Expand All @@ -410,7 +413,9 @@ class JournalVirtualDev : public VirtualDev {

uint64_t used_size() const override;
uint64_t available_blks() const override;
uint64_t num_descriptors() const { return m_journal_descriptors.size(); }

void remove_journal_chunks(std::vector< shared< Chunk > >& chunks);
void update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* chunk_private);
uint64_t get_end_of_chunk(shared< Chunk >& chunk) const;

Expand Down
2 changes: 1 addition & 1 deletion src/lib/device/physical_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin

auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot);
ret_chunks.push_back(chunk);
get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], std::move(chunk)});
get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], chunk});
HS_LOG(INFO, device, "Creating chunk {}", chunk->to_string());
cinfo->~chunk_info();
}
Expand Down
24 changes: 16 additions & 8 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,16 @@ SISL_LOGGING_DECL(logstore)
static bool has_data_service() { return HomeStore::instance()->has_data_service(); }
// static BlkDataService& data_service() { return HomeStore::instance()->data_service(); }

LogDev::LogDev(const logdev_id_t id) : m_logdev_id{id} {
LogDev::LogDev(const logdev_id_t id, JournalVirtualDev* vdev) : m_logdev_id{id}, m_vdev(vdev) {
m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_logdev);
}

LogDev::~LogDev() = default;

void LogDev::start(bool format, JournalVirtualDev* vdev) {
// Each logdev has one journal descriptor.
m_vdev = vdev;
m_vdev_jd = m_vdev->open(m_logdev_id);
RELEASE_ASSERT(m_vdev_jd, "Journal descriptor is null");
}

LogDev::~LogDev() = default;

void LogDev::start(bool format) {
if (m_flush_size_multiple == 0) { m_flush_size_multiple = m_vdev->optimal_page_size(); }
THIS_LOGDEV_LOG(INFO, "Initializing logdev with flush size multiple={}", m_flush_size_multiple);

Expand Down Expand Up @@ -106,7 +104,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) {
}

void LogDev::stop() {
THIS_LOGDEV_LOG(INFO, "Logdev stopping id {}", m_logdev_id);
THIS_LOGDEV_LOG(INFO, "Logdev stopping log_dev={}", m_logdev_id);
HS_LOG_ASSERT((m_pending_flush_size == 0), "LogDev stop attempted while writes to logdev are pending completion");
const bool locked_now = run_under_flush_lock([this]() {
{
Expand Down Expand Up @@ -151,6 +149,11 @@ void LogDev::stop() {
m_hs.reset();
}

void LogDev::destroy() {
THIS_LOGDEV_LOG(INFO, "Logdev destroy metablks log_dev={}", m_logdev_id);
m_logdev_meta.destroy();
}

void LogDev::start_timer() {
// Currently only tests set it to 0.
if (HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) == 0) { return; }
Expand Down Expand Up @@ -920,6 +923,11 @@ logdev_superblk* LogDevMetadata::create(logdev_id_t id) {
return sb;
}

void LogDevMetadata::destroy() {
m_rollback_sb.destroy();
m_sb.destroy();
}

void LogDevMetadata::reset() {
m_id_reserver.reset();
m_store_info.clear();
Expand Down
15 changes: 11 additions & 4 deletions src/lib/logstore/log_dev.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ class LogDevMetadata {
~LogDevMetadata() = default;

logdev_superblk* create(logdev_id_t id);
void destroy();
void reset();
std::vector< std::pair< logstore_id_t, logstore_superblk > > load();
void persist();
Expand Down Expand Up @@ -614,7 +615,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > {
return HS_DYNAMIC_CONFIG(logstore.flush_threshold_size) - sizeof(log_group_header);
}

LogDev(logdev_id_t logdev_id);
LogDev(logdev_id_t logdev_id, JournalVirtualDev* vdev);
LogDev(const LogDev&) = delete;
LogDev& operator=(const LogDev&) = delete;
LogDev(LogDev&&) noexcept = delete;
Expand All @@ -626,16 +627,21 @@ class LogDev : public std::enable_shared_from_this< LogDev > {
* to the recovery. It is expected that all callbacks are registered before calling the start.
*
* @param format: Do we need to format the logdev or not.
* @param blk_store: The blk_store associated to this logdev
*/
void start(bool format, JournalVirtualDev* vdev);
void start(bool format);

/**
* @brief Stop the logdev. It resets all the parameters it is using and thus can be started later
*
*/
void stop();

/**
* @brief Destroy the logdev metablks.
*
*/
void destroy();

/**
* @brief Start the flush timer.
*
Expand Down Expand Up @@ -798,6 +804,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > {
void handle_unopened_log_stores(bool format);
logdev_id_t get_id() { return m_logdev_id; }
shared< JournalVirtualDev::Descriptor > get_journal_descriptor() const { return m_vdev_jd; }
bool is_stopped() { return m_stopped; }

// bool ready_for_truncate() const { return m_vdev_jd->ready_for_truncate(); }

Expand Down Expand Up @@ -854,7 +861,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > {
std::atomic< logid_t > m_log_idx{0}; // Generator of log idx
std::atomic< int64_t > m_pending_flush_size{0}; // How much flushable logs are pending
std::atomic< bool > m_is_flushing{false}; // Is LogDev currently flushing (so far supports one flusher at a time)
bool m_stopped{false}; // Is Logdev stopped. We don't need lock here, because it is updated under flush lock
bool m_stopped{true}; // Is Logdev stopped. We don't need lock here, because it is updated under flush lock
logdev_id_t m_logdev_id;
JournalVirtualDev* m_vdev{nullptr};
shared< JournalVirtualDev::Descriptor > m_vdev_jd; // Journal descriptor.
Expand Down
Loading

0 comments on commit 988fea7

Please sign in to comment.