Yk repl truc #356

Closed
wants to merge 7 commits
2 changes: 2 additions & 0 deletions src/include/homestore/logstore_service.hpp
@@ -158,6 +158,8 @@ class LogStoreService {
uint32_t used_size() const;
uint32_t total_size() const;
iomgr::io_fiber_t flush_thread() { return m_flush_fiber; }

// called by LogDev truncate;
iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; }

private:
2 changes: 2 additions & 0 deletions src/include/homestore/replication/repl_dev.h
@@ -255,6 +255,8 @@ class ReplDev {
/// @return Block size
virtual uint32_t get_blk_size() const = 0;

virtual void truncate_if_needed() = 0;

virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); }

virtual void detach_listener() {
3 changes: 2 additions & 1 deletion src/include/homestore/replication_service.hpp
@@ -20,7 +20,6 @@ VENUM(repl_impl_type, uint8_t,
solo // For single node - no replication
);


class ReplApplication;

class ReplicationService {
@@ -53,6 +52,8 @@ class ReplicationService {
virtual hs_stats get_cap_stats() const = 0;

virtual meta_sub_type get_meta_blk_name() const = 0;

virtual void resource_audit() = 0;
};
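The diff only declares resource_audit(); below is a minimal sketch of how a concrete service could wire it to the new ReplDev::truncate_if_needed() hook. The free function, map name, key type and locking are assumptions for illustration and are not code from this PR.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical wiring only -- not part of this diff. Assumes the service keeps its
// ReplDev instances in a group-id -> device map guarded by a mutex.
void resource_audit_sketch(std::mutex& rd_map_mtx,
                           std::map< std::string, std::shared_ptr< ReplDev > >& rd_map) {
    std::lock_guard< std::mutex > lg{rd_map_mtx};
    for (auto& [group_id, rdev] : rd_map) {
        // Each device consults its own journal watermarks and truncates only if needed.
        rdev->truncate_if_needed();
    }
}
```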

//////////////// Application which uses Replication needs to be provide the following callbacks ////////////////
2 changes: 1 addition & 1 deletion src/lib/checkpoint/cp_mgr.cpp
@@ -37,7 +37,7 @@ CPManager::CPManager() :
nullptr);

resource_mgr().register_dirty_buf_exceed_cb(
[this]([[maybe_unused]] int64_t dirty_buf_count) { this->trigger_cp_flush(false /* false */); });
[this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) { this->trigger_cp_flush(false /* force */); });

start_cp_thread();
}
16 changes: 12 additions & 4 deletions src/lib/common/homestore_config.fbs
@@ -158,9 +158,15 @@ table ResourceLimits {
/* percentage of memory used during recovery */
memory_in_recovery_precent: uint32 = 40;

/* journal size used percentage */
journal_size_percent: uint32 = 50;
/* journal size used percentage high watermark -- trigger cp */
journal_vdev_size_percent: uint32 = 50;

/* journal size used percentage critical watermark -- trigger truncation */
journal_vdev_size_percent_critical: uint32 = 90;

/* number of logdev log entries that will mark this logdev ready for truncation */
logdev_num_log_entries_threadhold: uint32 = 2000000 (hotswap);

/* We crash if volume is 95 percent filled and no disk space left */
vol_threshhold_used_size_p: uint32 = 95;
}
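To make the relationship between the two new vdev watermarks concrete: crossing journal_vdev_size_percent only triggers a CP flush, while crossing journal_vdev_size_percent_critical additionally pushes the replication service to truncate immediately. A self-contained toy sketch of that decision follows, using the default values from the table above; the enum and function names are illustrative only and are not HomeStore code.

```cpp
#include <cstdint>
#include <iostream>

// Illustrative only: mirrors the two-level vdev watermark described above.
enum class JournalAction { None, TriggerCpFlush, TriggerCpFlushAndTruncate };

JournalAction journal_vdev_action(uint64_t used_size, uint64_t total_size,
                                  uint32_t high_pct = 50, uint32_t critical_pct = 90) {
    uint32_t const used_pct = static_cast< uint32_t >(100 * used_size / total_size);
    if (used_pct >= critical_pct) { return JournalAction::TriggerCpFlushAndTruncate; }
    if (used_pct >= high_pct) { return JournalAction::TriggerCpFlush; }
    return JournalAction::None;
}

int main() {
    std::cout << static_cast< int >(journal_vdev_action(60, 100)) << "\n"; // 1: CP flush only
    std::cout << static_cast< int >(journal_vdev_action(95, 100)) << "\n"; // 2: CP flush + truncation
}
```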
@@ -199,8 +205,8 @@ table Consensus {
heartbeat_period_ms: uint32 = 250;

// Re-election timeout low and high mark
elect_to_low_ms: uint32 = 900;
elect_to_high_ms: uint32 = 1400;
elect_to_low_ms: uint32 = 800;
elect_to_high_ms: uint32 = 1700;

// When a new member is being synced, the batch size of number of logs to be shipped
log_sync_batch_size: int32 = 100;
@@ -228,6 +234,8 @@ table Consensus {

// data fetch max size limit in MB
data_fetch_max_size_mb: uint32 = 2;


}

table HomeStoreSettings {
52 changes: 43 additions & 9 deletions src/lib/common/resource_mgr.cpp
@@ -20,7 +20,26 @@
namespace homestore {
ResourceMgr& resource_mgr() { return hs()->resource_mgr(); }

void ResourceMgr::set_total_cap(uint64_t total_cap) { m_total_cap = total_cap; }
void ResourceMgr::start(uint64_t total_cap) {
m_total_cap = total_cap;
start_timer();
}

void ResourceMgr::start_timer() {
auto const res_mgr_timer_us = HS_DYNAMIC_CONFIG(generic.res_mgr_timer_us);
LOGINFO("resource audit timer is set to {} usec", res_mgr_timer_us);

m_res_audit_timer_hdl = iomanager.schedule_global_timer(
res_mgr_timer_us * 1000, true /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_worker,
[this](void*) {
// all timely resource audit routines should arrive here;
hs()->logstore_service().device_truncate();

// TODO: add a device_truncate callback to audit how much space was freed for each LogDev and add related
// metrics;
},
true /* wait_to_schedule */);
}
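The TODO above can lean on the callback plumbing that already exists in the truncation path: a completion callback receives the per-LogDev truncation result (the same map filled in as treq->m_trunc_upto_result in log_dev.cpp below). A hedged sketch of that audit hook follows; it assumes device_truncate() accepts such an optional callback, which is not shown in this diff.

```cpp
// Sketch only: record how far each LogDev was truncated on every audit tick.
// Assumes device_truncate() accepts an optional completion callback whose argument
// is the logdev_id -> logdev_key map built by LogDev::device_truncate_under_lock().
hs()->logstore_service().device_truncate(
    [](auto const& trunc_upto_result) {
        for (auto const& [logdev_id, trunc_key] : trunc_upto_result) {
            LOGINFO("LogDev {} truncated upto key {}", logdev_id, trunc_key.idx);
            // Per-LogDev "space freed" metrics could be emitted here.
        }
    });
```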

/* monitor dirty buffer count */
void ResourceMgr::inc_dirty_buf_size(const uint32_t size) {
@@ -106,22 +125,37 @@ uint64_t ResourceMgr::get_cache_size() const {
return ((HS_STATIC_CONFIG(input.io_mem_size()) * HS_DYNAMIC_CONFIG(resource_limits.cache_size_percent)) / 100);
}

/* monitor journal size */
bool ResourceMgr::check_journal_size(const uint64_t used_size, const uint64_t total_size) {
if (m_journal_exceed_cb) {
bool ResourceMgr::check_journal_descriptor_size(const uint64_t used_size) {
return (used_size >= get_journal_descriptor_size_limit());
}

/* monitor journal vdev size */
bool ResourceMgr::check_journal_vdev_size(const uint64_t used_size, const uint64_t total_size) {
if (m_journal_vdev_exceed_cb) {
const uint32_t used_pct = (100 * used_size / total_size);
if (used_pct >= HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent)) {
m_journal_exceed_cb(used_size);
if (used_pct >= get_journal_vdev_size_limit()) {
m_journal_vdev_exceed_cb(used_size, used_pct >= get_journal_vdev_size_critical_limit() /* is_critical */);
HS_LOG_EVERY_N(WARN, base, 50, "high watermark hit, used percentage: {}, high watermark percentage: {}",
used_pct, HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent));
used_pct, get_journal_vdev_size_limit());
return true;
}
}
return false;
}
void ResourceMgr::register_journal_exceed_cb(exceed_limit_cb_t cb) { m_journal_exceed_cb = std::move(cb); }

uint32_t ResourceMgr::get_journal_size_limit() const { return HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent); }
void ResourceMgr::register_journal_vdev_exceed_cb(exceed_limit_cb_t cb) { m_journal_vdev_exceed_cb = std::move(cb); }

uint32_t ResourceMgr::get_journal_descriptor_size_limit() const {
return HS_DYNAMIC_CONFIG(resource_limits.journal_descriptor_size_threshold_mb) * 1024 * 1024;
}

uint32_t ResourceMgr::get_journal_vdev_size_critical_limit() const {
return HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent_critical);
}

uint32_t ResourceMgr::get_journal_vdev_size_limit() const {
return HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent);
}

/* monitor chunk size */
void ResourceMgr::check_chunk_free_size_and_trigger_cp(uint64_t free_size, uint64_t alloc_size) {}
19 changes: 13 additions & 6 deletions src/lib/common/resource_mgr.hpp
@@ -39,12 +39,12 @@ class RsrcMgrMetrics : public sisl::MetricsGroup {
~RsrcMgrMetrics() { deregister_me_from_farm(); }
};

typedef std::function< void(int64_t /* dirty_buf_cnt */) > exceed_limit_cb_t;
typedef std::function< void(int64_t /* dirty_buf_cnt */, bool /* critical */) > exceed_limit_cb_t;
const uint32_t max_qd_multiplier = 32;

class ResourceMgr {
public:
void set_total_cap(uint64_t total_cap);
void start(uint64_t total_cap);

/* monitor dirty buffer count */
void inc_dirty_buf_size(const uint32_t size);
@@ -76,10 +76,11 @@ class ResourceMgr {
uint64_t get_cache_size() const;

/* monitor journal size */
bool check_journal_size(const uint64_t used_size, const uint64_t total_size);
void register_journal_exceed_cb(exceed_limit_cb_t cb);
bool check_journal_vdev_size(const uint64_t used_size, const uint64_t total_size);
void register_journal_vdev_exceed_cb(exceed_limit_cb_t cb);

uint32_t get_journal_size_limit() const;
uint32_t get_journal_vdev_size_limit() const;
uint32_t get_journal_vdev_size_critical_limit() const;

/* monitor chunk size */
void check_chunk_free_size_and_trigger_cp(uint64_t free_size, uint64_t alloc_size);
@@ -92,18 +93,24 @@

private:
int64_t get_dirty_buf_limit() const;
void start_timer();

private:
std::atomic< int64_t > m_hs_dirty_buf_cnt;
std::atomic< int64_t > m_hs_fb_cnt; // free count
std::atomic< int64_t > m_hs_fb_size; // free size
std::atomic< int64_t > m_hs_ab_cnt; // alloc count
std::atomic< int64_t > m_memory_used_in_recovery;
std::atomic< uint32_t > m_flush_dirty_buf_q_depth{64};
uint64_t m_total_cap;

// TODO: make it event_cb
exceed_limit_cb_t m_dirty_buf_exceed_cb;
exceed_limit_cb_t m_free_blks_exceed_cb;
exceed_limit_cb_t m_journal_exceed_cb;
exceed_limit_cb_t m_journal_vdev_exceed_cb;
RsrcMgrMetrics m_metrics;

iomgr::timer_handle_t m_res_audit_timer_hdl;
};

extern ResourceMgr& resource_mgr();
36 changes: 35 additions & 1 deletion src/lib/device/journal_vdev.cpp
@@ -51,6 +51,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo
return private_blob;
},
m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});

resource_mgr().register_journal_vdev_exceed_cb([this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) {
this->trigger_cp_flush(false /* force */);

if (critical) {
// call resource audit on the replication service to free up some space immediately
hs()->repl_service().resource_audit();
}
});
}

JournalVirtualDev::~JournalVirtualDev() {}
@@ -561,6 +570,21 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
m_write_sz_in_total.fetch_sub(size_to_truncate, std::memory_order_relaxed);
m_truncate_done = true;

//
// Conceptually, in a rare case (not possible for NuObject, possibly true for NuBlox2.0) truncate itself can't
// guarantee that enough space is freed up to satisfy the resource manager, e.g. when multiple log stores share this
// same descriptor and one logstore is lagging far behind and can't truncate much space. Doing multiple truncations
// won't help in this case.
//
// In this rare case, the next write on this descriptor will set the ready flag again.
//
// And any write on any other descriptor will trigger a high_watermark_check, and if it were to trigger a critical
// alert on this vdev, truncation will be made immediately on all descriptors;
//
// If still no space can be freed, there is nothing left to do here except apply back pressure to the layer above
// by rejecting log writes on this descriptor;
//
unset_ready_for_truncate();
HS_PERIODIC_LOG(DEBUG, journalvdev, "After truncate desc {}", to_string());
}
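Putting the pieces of this PR together, the per-descriptor ready flag follows a simple append/audit/truncate handshake. The toy below models just that handshake so the lifecycle is easy to see; the thresholds, sizes and main() driver are made up for illustration and are not HomeStore code.

```cpp
#include <atomic>
#include <cstdint>

// Toy model of the m_ready_for_truncate handshake introduced in this PR.
struct ToyDescriptor {
    std::atomic< bool > ready{false};
    uint64_t used{0};
    uint64_t per_desc_threshold{100};

    // Append path: mirrors high_watermark_check() marking the descriptor ready.
    void append(uint64_t sz) {
        used += sz;
        if (used >= per_desc_threshold) { ready.store(true, std::memory_order_relaxed); }
    }

    // Audit path: mirrors the resource-manager timer driving device_truncate(),
    // which skips descriptors that are not ready.
    void audit() {
        if (!ready.load(std::memory_order_relaxed)) { return; }
        used = 0;                                       // stand-in for do_device_truncate()
        ready.store(false, std::memory_order_relaxed);  // matches unset_ready_for_truncate()
    }
};

int main() {
    ToyDescriptor d;
    d.audit();      // no-op: flag not set yet
    d.append(150);  // crosses the per-descriptor threshold and sets the flag
    d.audit();      // truncates and clears the flag until the next qualifying append
}
```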

@@ -625,8 +649,18 @@ bool JournalVirtualDev::Descriptor::is_offset_at_last_chunk(off_t bytes_offset)
return false;
}

//
// This API is always called in a single thread
//
void JournalVirtualDev::Descriptor::high_watermark_check() {
if (resource_mgr().check_journal_size(used_size(), size())) {
// high watermark check for the individual journal descriptor;
if (resource_mgr().check_journal_descriptor_size(used_size())) {
// the next resource manager audit will call truncation for this descriptor;
set_ready_for_truncation();
}

// high watermark check for the entire journal vdev;
if (resource_mgr().check_journal_vdev_size(m_vdev.used_size(), m_vdev.size())) {
COUNTER_INCREMENT(m_vdev.m_metrics, vdev_high_watermark_count, 1);

if (m_vdev.m_event_cb && m_truncate_done) {
10 changes: 8 additions & 2 deletions src/lib/device/journal_vdev.hpp
@@ -69,6 +69,7 @@ class JournalVirtualDev : public VirtualDev {
uint64_t m_total_size{0}; // Total size of all chunks.
off_t m_end_offset{0}; // Offset right to window. Never reduced. Increased in multiple of chunk size.
bool m_end_offset_set{false}; // Adjust the m_end_offset only once during init.
std::atomic< bool > m_ready_for_truncate{false}; // reset by truncation thread and set by append thread;
friend class JournalVirtualDev;

public:
@@ -78,16 +79,21 @@
// Create and append the chunk to m_journal_chunks.
void append_chunk();

void set_ready_for_truncate() { m_ready_for_truncate.store(true, std::memory_order_relaxed); }

void unset_ready_for_truncate() { m_ready_for_truncate.store(false, std::memory_order_relaxed); }

/**
* @brief : allocate space specified by input size.
* this API will always be called in a single thread;
*
* @param size : size to be allocated
*
* @return : the start unique offset of the allocated space
*
* Possible calling sequence:
* offset_1 = reserve(size1);
* offset_2 = reserve(size2);
* offset_1 = alloc_next_append_blk(size1);
* offset_2 = alloc_next_append_blk(size2);
* write_at_offset(offset_2);
* write_at_offset(offset_1);
*/
3 changes: 2 additions & 1 deletion src/lib/homestore.cpp
@@ -205,7 +205,6 @@ void HomeStore::do_start() {

m_meta_service->start(m_dev_mgr->is_first_time_boot());
m_cp_mgr->start(is_first_time_boot());
m_resource_mgr->set_total_cap(m_dev_mgr->total_capacity());

if (has_index_service()) { m_index_service->start(); }

@@ -221,6 +220,8 @@
}

m_cp_mgr->start_timer();

m_resource_mgr->start(m_dev_mgr->total_capacity());
m_init_done = true;
}

44 changes: 27 additions & 17 deletions src/lib/logstore/log_dev.cpp
@@ -683,25 +683,27 @@ void LogDev::remove_log_store(logstore_id_t store_id) {
}

void LogDev::device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq) {
run_under_flush_lock([this, treq]() {
iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() {
const logdev_key trunc_upto = do_device_truncate(treq->dry_run);
bool done{false};
if (treq->cb || treq->wait_till_done) {
{
std::lock_guard< std::mutex > lk{treq->mtx};
done = (--treq->trunc_outstanding == 0);
treq->m_trunc_upto_result[m_logdev_id] = trunc_upto;
if (m_vdev_jd->ready_for_truncate()) {
run_under_flush_lock([this, treq]() {
iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() {
const logdev_key trunc_upto = do_device_truncate(treq->dry_run);
bool done{false};
if (treq->cb || treq->wait_till_done) {
{
std::lock_guard< std::mutex > lk{treq->mtx};
done = (--treq->trunc_outstanding == 0);
treq->m_trunc_upto_result[m_logdev_id] = trunc_upto;
}
}
}
if (done) {
if (treq->cb) { treq->cb(treq->m_trunc_upto_result); }
if (treq->wait_till_done) { treq->cv.notify_one(); }
}
unlock_flush();
if (done) {
if (treq->cb) { treq->cb(treq->m_trunc_upto_result); }
if (treq->wait_till_done) { treq->cv.notify_one(); }
}
unlock_flush();
});
return false; // Do not release the flush lock yet, the scheduler will unlock it.
});
return false; // Do not release the flush lock yet, the scheduler will unlock it.
});
}
}
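For context, here is a caller-side sketch of how the periodic audit could drive this path. The truncate_req fields are used exactly as they appear above, but the struct definition and the caller itself are not part of this diff, so treat the construction below as an assumption rather than the actual LogStoreService code.

```cpp
// Hypothetical caller sketch (not from this PR): fan a truncate_req out to every LogDev
// and wait for all of them to report their truncation point.
auto treq = std::make_shared< truncate_req >();
treq->dry_run = false;
treq->wait_till_done = true;
treq->trunc_outstanding = num_logdevs;   // decremented once per LogDev completion
treq->cb = [](auto const& res) {
    for (auto const& [logdev_id, key] : res) {
        LOGINFO("logdev {} truncated upto idx {}", logdev_id, key.idx);
    }
};
for (auto& logdev : logdevs) {
    // Note: with the ready_for_truncate() gate above, a LogDev that is not ready never
    // decrements trunc_outstanding, so a real caller must only count the ready ones
    // (or use a timeout) to avoid waiting forever.
    logdev->device_truncate_under_lock(treq);
}
{
    std::unique_lock< std::mutex > lk{treq->mtx};
    treq->cv.wait(lk, [&treq] { return treq->trunc_outstanding == 0; });
}
```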

void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb) {
@@ -782,6 +784,12 @@ void LogDev::on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in
}
}

uint32_t LogDev::get_reserved_log_truncation_idx() const {
// TODO: check whether there are any holes between m_log_idx and m_last_truncate_idx;
auto const total_in_use_ids = m_log_idx.load() - m_last_truncate_idx;
return std::min(static_cast< uint32_t >(total_in_use_ids),
                HS_DYNAMIC_CONFIG(resource_limits.logdev_num_log_entries_threadhold));
}
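For example, with the default logdev_num_log_entries_threadhold of 2,000,000: if m_log_idx is 5,000,000 and m_last_truncate_idx is 4,500,000, total_in_use_ids is 500,000 and that value is returned as-is; if m_last_truncate_idx were only 1,000,000, total_in_use_ids would be 4,000,000 and the threshold caps the result at 2,000,000.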

logdev_key LogDev::do_device_truncate(bool dry_run) {
static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_min_trunc_stores;
static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_non_participating_stores;
@@ -828,6 +836,8 @@ logdev_key LogDev::do_device_truncate(bool dry_run) {
return min_safe_ld_key;
}

min_safe_ld_key.idx = std::min(min_safe_ld_key.idx, static_cast< logid_t >(get_reserved_log_truncation_idx()));

// Got the safest log id to truncate and actually truncate upto the safe log idx to the log device
if (!dry_run) { truncate(min_safe_ld_key); }
HS_PERIODIC_LOG(INFO, logstore,