Skip to content

Commit

Permalink
implement fix pg size
Browse files Browse the repository at this point in the history
  • Loading branch information
Hooper9973 committed Oct 29, 2024
1 parent ff926bb commit 77e4908
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 60 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.4]@oss/master")
self.requires("homestore/[^6.5]@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/3.3.1", override=True)
Expand Down
239 changes: 216 additions & 23 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,19 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const
return nullptr;
}

// shardid -> chunkid map is maintained by ShardManager
// pg_id->pdev_id map is maintained by PgManager
// chunselector will not take care of the two maps for now.
uint32_t pdevID = 0;
auto& pdevIdHint = hint.pdev_id_hint;
if (!pdevIdHint.has_value()) {
// this is the first shard of this pg, select a pdev with the most available blocks for it
auto&& it =
std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(),
[](const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& lhs,
const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& rhs) {
return lhs.second->available_blk_count.load() < rhs.second->available_blk_count.load();
});
if (it == m_per_dev_heap.end()) {
LOGWARNMOD(homeobject, "No pdev found for new pg");
return nullptr;
}
pdevID = it->first;
// Temporary bypass using pdev_id_hint to represent pg_id_hint
pg_id_t pg_id = 0;
auto& pg_id_hint = hint.pdev_id_hint;
if (!pg_id_hint.has_value()) {
LOGWARNMOD(homeobject, "should not allocated a chunk without exiting pg_id {} in hint!", chunkIdHint.value());
return nullptr;
} else {
pdevID = pdevIdHint.value();
pg_id = pg_id_hint.value();
}

auto it = m_per_dev_heap.find(pdevID);
if (it == m_per_dev_heap.end()) {
LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
auto it = m_per_pg_heap.find(pg_id);
if (it == m_per_pg_heap.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return nullptr;
}

Expand All @@ -99,7 +87,7 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const
avalableBlkCounter.fetch_sub(vchunk.available_blks());
remove_chunk_from_defrag_heap(vchunk.get_chunk_id());
} else {
LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
}

return vchunk.get_internal_chunk();
Expand Down Expand Up @@ -148,6 +136,48 @@ csharedChunk HeapChunkSelector::select_specific_chunk(const chunk_num_t chunkID)
return vchunk.get_internal_chunk();
}

// Pick one specific chunk (by real chunk id) out of the given pg's free-chunk heap.
// Caller becomes responsible for returning it via release_chunk() when done.
// Returns nullptr if the chunk id is unknown, the pg does not exist, or the chunk
// is not currently free in that pg's heap.
csharedChunk HeapChunkSelector::select_specific_chunk(const pg_id_t pg_id, const chunk_num_t chunkID) {
    if (m_chunks.find(chunkID) == m_chunks.end()) {
        // sanity check
        LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID);
        return nullptr;
    }
    auto pg_it = m_per_pg_heap.find(pg_id);
    if (pg_it == m_per_pg_heap.end()) {
        LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
        return nullptr;
    }

    VChunk vchunk(nullptr);
    auto& heap = pg_it->second->m_heap;
    if (auto lock_guard = std::lock_guard< std::mutex >(pg_it->second->mtx); !heap.empty()) {
        // Drain the heap until the target chunk surfaces, parking the others aside;
        // std::move avoids copying each VChunk twice (pop + restore).
        std::vector< VChunk > parked;
        parked.reserve(heap.size());
        while (!heap.empty()) {
            auto c = heap.top();
            heap.pop();
            if (c.get_chunk_id() == chunkID) {
                vchunk = std::move(c);
                break;
            }
            parked.push_back(std::move(c));
        }

        // Restore the non-matching chunks so the heap is unchanged apart from the removal.
        for (auto& c : parked) {
            heap.emplace(std::move(c));
        }
    }

    if (vchunk.get_internal_chunk()) {
        auto& avalableBlkCounter = pg_it->second->available_blk_count;
        avalableBlkCounter.fetch_sub(vchunk.available_blks());
        remove_chunk_from_defrag_heap(vchunk.get_chunk_id());
    } else {
        // Previously this path returned silently; log for parity with the other overload.
        LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID);
    }

    return vchunk.get_internal_chunk();
}

// most_defrag_chunk will only be called when GC is triggered, and will return the chunk with the most
// defrag blocks
csharedChunk HeapChunkSelector::most_defrag_chunk() {
Expand Down Expand Up @@ -196,6 +226,30 @@ void HeapChunkSelector::release_chunk(const chunk_num_t chunkID) {
}
}

// Return a chunk to its pg's free-chunk heap (e.g. after sealing a shard) and
// credit its available blocks back to the pg's counter. No-op (with a warning)
// if the chunk id or the pg is unknown.
void HeapChunkSelector::release_chunk(const pg_id_t pg_id, const chunk_num_t chunkID) {
    // Single lookup: reuse the iterator instead of a find() followed by operator[].
    auto chunk_it = m_chunks.find(chunkID);
    if (chunk_it == m_chunks.end()) {
        // sanity check
        LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID);
        return;
    }
    auto pg_it = m_per_pg_heap.find(pg_id);
    if (pg_it == m_per_pg_heap.end()) {
        LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
        return;
    }

    VChunk vchunk(chunk_it->second);
    {
        // Only the heap mutation needs the pg mutex; the counter is atomic.
        std::lock_guard< std::mutex > l(pg_it->second->mtx);
        pg_it->second->m_heap.emplace(chunk_it->second);
    }
    pg_it->second->available_blk_count.fetch_add(vchunk.available_blks());
}

void HeapChunkSelector::build_per_dev_chunk_heap(const std::unordered_set< chunk_num_t >& excludingChunks) {
for (const auto& p : m_chunks) {
bool add_to_heap = true;
Expand All @@ -204,6 +258,145 @@ void HeapChunkSelector::build_per_dev_chunk_heap(const std::unordered_set< chunk
};
}

uint32_t HeapChunkSelector::get_chunk_size() const {
const auto& chunk = m_chunks.begin()->second;
auto vchunk = VChunk(chunk);
return vchunk.size();
}

// Carve out num_chunk chunks for a newly created pg. All chunks come from a
// single pdev (the one with the most available blocks, same policy as
// select_chunk) and are moved from that pdev's heap into a fresh per-pg heap.
// Also builds the virtual<->real chunk-id maps: virtual ids are dense [0, num_chunk).
// No-op (with a warning) if the pg already exists, no pdev is registered, or the
// chosen pdev lacks enough free chunks.
void HeapChunkSelector::select_chunks_for_pg(pg_id_t pg_id, chunk_num_t num_chunk) {
    std::unique_lock lock_guard(m_chunk_selector_mtx);
    if (m_per_pg_heap.find(pg_id) != m_per_pg_heap.end()) {
        LOGWARNMOD(homeobject, "Pg {} had been created", pg_id);
        return;
    }

    // Select a pdev with the most available blocks for it, referenced from HeapChunkSelector::select_chunk.
    auto&& most_avail_dev_it =
        std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(),
                         [](const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& lhs,
                            const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& rhs) {
                             return lhs.second->available_blk_count.load() < rhs.second->available_blk_count.load();
                         });
    // Guard against an empty pdev map: max_element returns end() and must not be dereferenced.
    if (most_avail_dev_it == m_per_dev_heap.end()) {
        LOGWARNMOD(homeobject, "No pdev found for new pg");
        return;
    }
    auto& pdev_heap = most_avail_dev_it->second;
    if (num_chunk > pdev_heap->size()) {
        LOGWARNMOD(homeobject, "Pdev has no enough space to create pg {} with num_chunk {}", pg_id, num_chunk);
        return;
    }
    auto vchunk = VChunk(nullptr);
    auto it = m_per_pg_heap.emplace(pg_id, std::make_shared< PerDevHeap >()).first;
    auto v2r_vector = m_v2r_chunk_map.emplace(pg_id, std::make_shared< std::vector < chunk_num_t > >()).first->second;
    auto r2v_map = m_r2v_chunk_map.emplace(pg_id, std::make_shared< ChunkIdMap >()).first->second;

    auto& pg_heap = it->second;
    std::scoped_lock lock(pdev_heap->mtx, pg_heap->mtx);
    v2r_vector->reserve(num_chunk);
    for (chunk_num_t i = 0; i < num_chunk; ++i) {
        // Move the best chunk from the pdev heap into the pg heap, transferring its
        // block accounting along with it.
        vchunk = pdev_heap->m_heap.top();
        pdev_heap->m_heap.pop();
        pdev_heap->available_blk_count.fetch_sub(vchunk.available_blks());

        pg_heap->m_heap.emplace(vchunk);
        pg_heap->m_total_blks += vchunk.get_total_blks();
        pg_heap->available_blk_count.fetch_add(vchunk.available_blks());
        // v_chunk_id start from 0.
        chunk_num_t v_chunk_id = i;
        chunk_num_t r_chunk_id = vchunk.get_chunk_id();
        v2r_vector->emplace_back(r_chunk_id);
        r2v_map->emplace(r_chunk_id, v_chunk_id);
    }
    LOGINFOMOD(homeobject, "Successfully created pg {}, num_chunk {} ", pg_id, num_chunk);
}

// Copy the pg's virtual->real chunk-id mapping into the caller-provided
// superblock buffer (which must hold at least num_chunk entries).
// No-op (with a warning) if the pg was never created.
void HeapChunkSelector::persist_chunk_ids_into_pg_sb(pg_id_t pg_id, chunk_num_t num_chunk, homestore::chunk_num_t* pb_sb_chunk_ids) const {
    std::shared_lock lock_guard(m_chunk_selector_mtx);
    const auto map_it = m_v2r_chunk_map.find(pg_id);
    if (map_it == m_v2r_chunk_map.end()) {
        LOGWARNMOD(homeobject, "Pg {} had never been created", pg_id);
        return;
    }
    const auto& v2r = *map_it->second;
    for (chunk_num_t idx = 0; idx < num_chunk; ++idx) {
        // at() keeps bounds checking in case num_chunk disagrees with the stored vector
        pb_sb_chunk_ids[idx] = v2r.at(idx);
    }
}

// Rebuild both directions of the pg's chunk-id mapping from the array persisted
// in the pg superblock. No-op (with a warning) if the pg was already recovered.
void HeapChunkSelector::recover_pg_chunk_map_from_pg_sb(pg_id_t pg_id, chunk_num_t num_chunk, homestore::chunk_num_t* chunk_ids) {
    std::unique_lock lock_guard(m_chunk_selector_mtx);
    if (m_v2r_chunk_map.find(pg_id) != m_v2r_chunk_map.end()) {
        LOGWARNMOD(homeobject, "Pg {} had been recovered", pg_id);
        return;
    }

    auto v2r = m_v2r_chunk_map.emplace(pg_id, std::make_shared< std::vector < chunk_num_t > >()).first->second;
    auto r2v = m_r2v_chunk_map.emplace(pg_id, std::make_shared< ChunkIdMap >()).first->second;

    v2r->reserve(num_chunk);
    // Virtual ids are dense and start from 0; real ids come from the superblock array.
    for (chunk_num_t v_chunk_id = 0; v_chunk_id < num_chunk; ++v_chunk_id) {
        const chunk_num_t r_chunk_id = chunk_ids[v_chunk_id];
        v2r->emplace_back(r_chunk_id);
        r2v->emplace(r_chunk_id, v_chunk_id);
    }
}

void HeapChunkSelector::recover_per_dev_chunk_heap() {
std::unique_lock lock_guard(m_chunk_selector_mtx);
for (const auto& [chunk_id, _] : m_chunks) {
bool add_to_heap = true;
for (const auto& [_, chunk_map] : m_r2v_chunk_map) {
if (chunk_map->find(chunk_id) != chunk_map->end()) {
add_to_heap = false;
break;
}
}
add_chunk_internal(chunk_id, add_to_heap);

}
LOGINFOMOD(homeobject, "[Hooper] Successfully recover pdev_heap");
}

// Rebuild one pg's free-chunk heap after restart from its recovered chunk-id
// vector. Chunks in excludingChunks (e.g. belonging to open shards) are left
// out of the heap. Requires recover_pg_chunk_map_from_pg_sb() to have run first.
void HeapChunkSelector::recover_pg_chunk_heap(pg_id_t pg_id, const std::unordered_set< chunk_num_t >& excludingChunks)
{
    std::unique_lock lock_guard(m_chunk_selector_mtx);
    if (m_per_pg_heap.find(pg_id) != m_per_pg_heap.end()) {
        LOGWARNMOD(homeobject, "Pg_heap {} had been recovered", pg_id);
        return;
    }
    auto it = m_v2r_chunk_map.find(pg_id);
    if (it == m_v2r_chunk_map.end()) {
        LOGWARNMOD(homeobject, "Pg_chunk_map {} had never been recovered", pg_id);
        return;
    }
    const auto& chunk_ids = it->second;
    auto& pg_heap = m_per_pg_heap.emplace(pg_id, std::make_shared< PerDevHeap >()).first->second;
    for (const auto& chunk_id : *chunk_ids) {
        if (excludingChunks.find(chunk_id) != excludingChunks.end()) { continue; }
        // Look up instead of operator[]: a persisted id unknown to the selector must
        // not default-insert a null chunk and push VChunk(nullptr) into the heap.
        auto chunk_it = m_chunks.find(chunk_id);
        if (chunk_it == m_chunks.end()) {
            LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunk_id);
            continue;
        }
        auto vchunk = VChunk(chunk_it->second);
        pg_heap->m_heap.emplace(vchunk);
        pg_heap->m_total_blks += vchunk.get_total_blks();
        pg_heap->available_blk_count.fetch_add(vchunk.available_blks());
    }
    LOGINFOMOD(homeobject, "Successfully recover pg_heap {} ", pg_id);
}

// Hand out a read-only view of the pg's virtual->real chunk-id vector,
// or nullptr (with a warning) if the pg was never created.
std::shared_ptr< const std::vector <chunk_num_t> > HeapChunkSelector::get_v2r_chunk_id_vector(pg_id_t pg_id) const {
    std::shared_lock lock_guard(m_chunk_selector_mtx);
    const auto it = m_v2r_chunk_map.find(pg_id);
    if (it == m_v2r_chunk_map.end()) {
        LOGWARNMOD(homeobject, "get_v2r_chunk_id_vector had never been created, pg {}", pg_id);
        return nullptr;
    }
    return it->second;
}

homestore::blk_alloc_hints HeapChunkSelector::chunk_to_hints(chunk_num_t chunk_id) const {
auto iter = m_chunks.find(chunk_id);
if (iter == m_chunks.end()) {
Expand Down
25 changes: 24 additions & 1 deletion src/lib/homestore_backend/heap_chunk_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
namespace homeobject {

using csharedChunk = homestore::cshared< homestore::Chunk >;
using ChunkIdMap = std::unordered_map < homestore::chunk_num_t, homestore::chunk_num_t >;
using chunk_num_t = homestore::chunk_num_t;

class HeapChunkSelector : public homestore::ChunkSelector {
public:
Expand All @@ -35,7 +37,6 @@ class HeapChunkSelector : public homestore::ChunkSelector {

using VChunkHeap = std::priority_queue< VChunk, std::vector< VChunk >, VChunkComparator >;
using VChunkDefragHeap = std::priority_queue< VChunk, std::vector< VChunk >, VChunkDefragComparator >;
using chunk_num_t = homestore::chunk_num_t;

struct PerDevHeap {
std::mutex mtx;
Expand All @@ -52,17 +53,33 @@ class HeapChunkSelector : public homestore::ChunkSelector {
// this function will be used by GC flow or recovery flow to mark one specific chunk to be busy, caller should be
// responsible to use release_chunk() interface to release it when no longer to use the chunk anymore.
csharedChunk select_specific_chunk(const chunk_num_t);
csharedChunk select_specific_chunk(const pg_id_t pg_id, const chunk_num_t);

// this function will be used by GC flow to select a chunk for GC
csharedChunk most_defrag_chunk();

// this function is used to return a chunk back to ChunkSelector when sealing a shard, and will only be used by
// Homeobject.
void release_chunk(const chunk_num_t);
void release_chunk(const pg_id_t pg_id, const chunk_num_t);

// this should be called after ShardManager is initialized and get all the open shards
void build_per_dev_chunk_heap(const std::unordered_set< chunk_num_t >& excludingChunks);
void recover_per_dev_chunk_heap();
void recover_pg_chunk_heap(pg_id_t pg_id, const std::unordered_set< chunk_num_t >& excludingChunks);

/**
* select chunks for pg, chunks need to be in same pdev.
*
* @param pg_id The ID of the pg.
* @param num_chunk The num of chunk which be decided by fix pg size.
*/
void select_chunks_for_pg(pg_id_t pg_id, chunk_num_t num_chunk);

void persist_chunk_ids_into_pg_sb(pg_id_t pg_id, chunk_num_t num_chunk, homestore::chunk_num_t* chunk_ids) const;
void recover_pg_chunk_map_from_pg_sb(pg_id_t pg_id, chunk_num_t num_chunk, homestore::chunk_num_t* chunk_ids);

std::shared_ptr< const std::vector <chunk_num_t> > get_v2r_chunk_id_vector(pg_id_t pg_id) const;
/**
* Retrieves the block allocation hints for a given chunk.
*
Expand Down Expand Up @@ -112,12 +129,18 @@ class HeapChunkSelector : public homestore::ChunkSelector {
*/
uint32_t total_chunks() const;

uint32_t get_chunk_size() const;

private:
std::unordered_map< uint32_t, std::shared_ptr< PerDevHeap > > m_per_dev_heap;
std::unordered_map< pg_id_t, std::shared_ptr< PerDevHeap > > m_per_pg_heap;
std::unordered_map< pg_id_t, std::shared_ptr< std::vector <chunk_num_t> > > m_v2r_chunk_map;
std::unordered_map< pg_id_t, std::shared_ptr< ChunkIdMap > > m_r2v_chunk_map;

// hold all the chunks , selected or not
std::unordered_map< chunk_num_t, csharedChunk > m_chunks;

mutable std::shared_mutex m_chunk_selector_mtx;
void add_chunk_internal(const chunk_num_t, bool add_to_heap = true);

VChunkDefragHeap m_defrag_heap;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ void HSHomeObject::on_replica_restart() {
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_pg_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr, true);
[this](bool success) { on_pg_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_pg_meta_name);

// recover shard
Expand Down
Loading

0 comments on commit 77e4908

Please sign in to comment.