implement fix pg size
Hooper9973 committed Nov 4, 2024
1 parent a90a20f commit bf0f88b
Showing 10 changed files with 450 additions and 138 deletions.
4 changes: 2 additions & 2 deletions conanfile.py
@@ -9,7 +9,7 @@

 class HomeObjectConan(ConanFile):
     name = "homeobject"
-    version = "2.1.6"
+    version = "2.1.7"

     homepage = "https://github.com/eBay/HomeObject"
     description = "Blob Store built on HomeReplication"
@@ -49,7 +49,7 @@ def build_requirements(self):

     def requirements(self):
         self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
-        self.requires("homestore/[^6.4]@oss/master")
+        self.requires("homestore/[^6.5]@oss/master")
         self.requires("iomgr/[^11.3]@oss/master")
         self.requires("lz4/1.9.4", override=True)
         self.requires("openssl/3.3.1", override=True)
225 changes: 171 additions & 54 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
@@ -31,7 +31,7 @@ void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID, bool add_t
     auto pdevID = vchunk.get_pdev_id();
     // add this find here, since we don't want to call make_shared in try_emplace every time.
     auto it = m_per_dev_heap.find(pdevID);
-    if (it == m_per_dev_heap.end()) { it = m_per_dev_heap.emplace(pdevID, std::make_shared< PerDevHeap >()).first; }
+    if (it == m_per_dev_heap.end()) { it = m_per_dev_heap.emplace(pdevID, std::make_shared< ChunkHeap >()).first; }

     // build total blks for every chunk on this device
     it->second->m_total_blks += vchunk.get_total_blks();
@@ -59,31 +59,19 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const
         return nullptr;
     }

-    // shardid -> chunkid map is maintained by ShardManager
-    // pg_id -> pdev_id map is maintained by PgManager
-    // chunkselector will not take care of the two maps for now.
-    uint32_t pdevID = 0;
-    auto& pdevIdHint = hint.pdev_id_hint;
-    if (!pdevIdHint.has_value()) {
-        // this is the first shard of this pg, select a pdev with the most available blocks for it
-        auto&& it =
-            std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(),
-                             [](const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& lhs,
-                                const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& rhs) {
-                                 return lhs.second->available_blk_count.load() < rhs.second->available_blk_count.load();
-                             });
-        if (it == m_per_dev_heap.end()) {
-            LOGWARNMOD(homeobject, "No pdev found for new pg");
-            return nullptr;
-        }
-        pdevID = it->first;
+    // Temporary bypass: use pdev_id_hint to represent pg_id_hint; "identical layout" will change it
+    pg_id_t pg_id = 0;
+    auto& pg_id_hint = hint.pdev_id_hint;
+    if (!pg_id_hint.has_value()) {
+        LOGWARNMOD(homeobject, "should not allocate a chunk without an existing pg_id in hint!");
+        return nullptr;
     } else {
-        pdevID = pdevIdHint.value();
+        pg_id = pg_id_hint.value();
     }

-    auto it = m_per_dev_heap.find(pdevID);
-    if (it == m_per_dev_heap.end()) {
-        LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
+    auto it = m_per_pg_heap.find(pg_id);
+    if (it == m_per_pg_heap.end()) {
+        LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
         return nullptr;
     }

@@ -99,29 +87,28 @@
         avalableBlkCounter.fetch_sub(vchunk.available_blks());
         remove_chunk_from_defrag_heap(vchunk.get_chunk_id());
     } else {
-        LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
+        LOGWARNMOD(homeobject, "no available chunks left for pg {}", pg_id);
     }

     return vchunk.get_internal_chunk();
 }
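Note on the selection path above: until the hint structure grows a dedicated field, the caller smuggles the pg_id through `blk_alloc_hints::pdev_id_hint` (an `std::optional`, per the `has_value()` check above). A minimal caller-side sketch; the surrounding names (`chunk_selector`, `nblks`) are assumptions:

```cpp
// Hypothetical caller: open a shard on an existing pg. The pdev_id_hint field
// temporarily carries the pg_id (see the "Temporary bypass" comment above).
homestore::blk_alloc_hints hints;
hints.pdev_id_hint = pg_id;  // pg_id_t rides in the pdev hint for now
auto chunk = chunk_selector->select_chunk(nblks, hints);
if (chunk == nullptr) {
    // pg was never created via select_chunks_for_pg(), or its heap is empty
}
```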

-csharedChunk HeapChunkSelector::select_specific_chunk(const chunk_num_t chunkID) {
+csharedChunk HeapChunkSelector::select_specific_chunk(const pg_id_t pg_id, const chunk_num_t chunkID) {
     if (m_chunks.find(chunkID) == m_chunks.end()) {
         // sanity check
         LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID);
         return nullptr;
     }

-    auto const pdevID = VChunk(m_chunks[chunkID]).get_pdev_id();
-    auto it = m_per_dev_heap.find(pdevID);
-    if (it == m_per_dev_heap.end()) {
-        LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
+    auto pg_it = m_per_pg_heap.find(pg_id);
+    if (pg_it == m_per_pg_heap.end()) {
+        LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
         return nullptr;
     }

-    auto vchunk = VChunk(nullptr);
-    auto& heap = it->second->m_heap;
-    if (auto lock_guard = std::lock_guard< std::mutex >(it->second->mtx); !heap.empty()) {
+    VChunk vchunk(nullptr);
+    auto& heap = pg_it->second->m_heap;
+    if (auto lock_guard = std::lock_guard< std::mutex >(pg_it->second->mtx); !heap.empty()) {
         std::vector< VChunk > chunks;
         chunks.reserve(heap.size());
         while (!heap.empty()) {
@@ -140,28 +127,29 @@ csharedChunk HeapChunkSelector::select_specific_chunk(const chunk_num_t chunkID)
     }

     if (vchunk.get_internal_chunk()) {
-        auto& avalableBlkCounter = it->second->available_blk_count;
+        auto& avalableBlkCounter = pg_it->second->available_blk_count;
         avalableBlkCounter.fetch_sub(vchunk.available_blks());
         remove_chunk_from_defrag_heap(vchunk.get_chunk_id());
     }

     return vchunk.get_internal_chunk();
 }
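What the block above amounts to: `std::priority_queue` has no erase-by-key, so `select_specific_chunk` drains the pg heap, keeps the requested chunk out, and pushes everything else back, an O(n log n) walk per call. A standalone sketch of that pattern (assuming `VChunk` is copyable, as its use in `std::vector` above suggests):

```cpp
// Drain-and-restore on a priority_queue: pop until the target is found,
// set the rest aside, then push the rest back.
std::vector< VChunk > others;
VChunk found(nullptr);
while (!heap.empty()) {
    auto v = heap.top();
    heap.pop();
    if (!found.get_internal_chunk() && v.get_chunk_id() == chunkID) {
        found = v;             // keep the requested chunk out of the heap
    } else {
        others.push_back(v);   // remember everything else
    }
}
for (auto& v : others) { heap.emplace(v); }  // restore the untouched chunks
```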

+// Temporarily commented out; the subsequent GC implementation needs to be adapted to the fixed pg size
 // most_defrag_chunk will only be called when GC is triggered, and will return the chunk with the most
 // defrag blocks
 csharedChunk HeapChunkSelector::most_defrag_chunk() {
-    chunk_num_t chunkID{0};
+    // chunk_num_t chunkID{0};
     // the chunk might be selected for creating shard. if this happens, we need to select another chunk
-    for (;;) {
-        {
-            std::lock_guard< std::mutex > lg(m_defrag_mtx);
-            if (m_defrag_heap.empty()) break;
-            chunkID = m_defrag_heap.top().get_chunk_id();
-        }
-        auto chunk = select_specific_chunk(chunkID);
-        if (chunk) return chunk;
-    }
+    // for (;;) {
+    //     {
+    //         std::lock_guard< std::mutex > lg(m_defrag_mtx);
+    //         if (m_defrag_heap.empty()) break;
+    //         chunkID = m_defrag_heap.top().get_chunk_id();
+    //     }
+    //     auto chunk = select_specific_chunk(chunkID);
+    //     if (chunk) return chunk;
+    // }
     return nullptr;
 }

@@ -186,22 +174,151 @@ void HeapChunkSelector::foreach_chunks(std::function< void(csharedChunk&) >&& cb
                   [cb = std::move(cb)](auto& p) { cb(p.second); });
 }

-void HeapChunkSelector::release_chunk(const chunk_num_t chunkID) {
-    const auto& it = m_chunks.find(chunkID);
-    if (it == m_chunks.end()) {
+void HeapChunkSelector::release_chunk(const pg_id_t pg_id, const chunk_num_t chunkID) {
+    if (m_chunks.find(chunkID) == m_chunks.end()) {
         // sanity check
         LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID);
-    } else {
-        add_chunk_internal(chunkID);
+        return;
     }
+
+    auto pg_it = m_per_pg_heap.find(pg_id);
+    if (pg_it == m_per_pg_heap.end()) {
+        LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
+        return;
+    }
+
+    const auto& chunk = m_chunks[chunkID];
+    VChunk vchunk(chunk);
+    {
+        std::lock_guard< std::mutex > l(pg_it->second->mtx);
+        auto& pg_heap = pg_it->second->m_heap;
+        pg_heap.emplace(chunk);
+    }
+    auto& avalableBlkCounter = pg_it->second->available_blk_count;
+    avalableBlkCounter.fetch_add(vchunk.available_blks());
 }

-void HeapChunkSelector::build_per_dev_chunk_heap(const std::unordered_set< chunk_num_t >& excludingChunks) {
-    for (const auto& p : m_chunks) {
+uint32_t HeapChunkSelector::get_chunk_size() const {
+    const auto& chunk = m_chunks.begin()->second;
+    auto vchunk = VChunk(chunk);
+    return vchunk.size();
+}

+std::optional< homestore::chunk_num_t > HeapChunkSelector::select_chunks_for_pg(pg_id_t pg_id, u_int64_t pg_size) {
+    std::unique_lock lock_guard(m_chunk_selector_mtx);
+    auto iter = m_per_pg_heap.find(pg_id);
+    RELEASE_ASSERT(iter == m_per_pg_heap.end(), "PG has already been created, pg_id {}", pg_id);
+
+    const auto chunk_size = get_chunk_size();
+    const uint32_t num_chunk = sisl::round_down(pg_size, chunk_size) / chunk_size;
+
+    // Select the pdev with the most available blocks, as the old HeapChunkSelector::select_chunk did
+    auto&& most_avail_dev_it =
+        std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(),
+                         [](const std::pair< const uint32_t, std::shared_ptr< ChunkHeap > >& lhs,
+                            const std::pair< const uint32_t, std::shared_ptr< ChunkHeap > >& rhs) {
+                             return lhs.second->available_blk_count.load() < rhs.second->available_blk_count.load();
+                         });
+    auto& pdev_heap = most_avail_dev_it->second;
+    if (num_chunk > pdev_heap->size()) {
+        LOGWARNMOD(homeobject, "Pdev does not have enough space to create pg {} with num_chunk {}", pg_id, num_chunk);
+        return std::nullopt;
+    }
+    auto vchunk = VChunk(nullptr);
+    auto it = m_per_pg_heap.emplace(pg_id, std::make_shared< ChunkHeap >()).first;
+    auto v2r_vector = m_v2r_chunk_map.emplace(pg_id, std::make_shared< std::vector< chunk_num_t > >()).first->second;
+    auto r2v_map = m_r2v_chunk_map.emplace(pg_id, std::make_shared< ChunkIdMap >()).first->second;
+
+    auto& pg_heap = it->second;
+    std::scoped_lock lock(pdev_heap->mtx, pg_heap->mtx);
+    v2r_vector->reserve(num_chunk);
+    for (chunk_num_t i = 0; i < num_chunk; ++i) {
+        vchunk = pdev_heap->m_heap.top();
+        // sanity check
+        RELEASE_ASSERT(vchunk.get_total_blks() == vchunk.available_blks(), "vchunk should be empty");
+        pdev_heap->m_heap.pop();
+        pdev_heap->available_blk_count.fetch_sub(vchunk.available_blks());
+
+        pg_heap->m_heap.emplace(vchunk);
+        pg_heap->m_total_blks += vchunk.get_total_blks();
+        pg_heap->available_blk_count.fetch_add(vchunk.available_blks());
+        // v_chunk_id starts from 0.
+        chunk_num_t v_chunk_id = i;
+        chunk_num_t r_chunk_id = vchunk.get_chunk_id();
+        v2r_vector->emplace_back(r_chunk_id);
+        r2v_map->emplace(r_chunk_id, v_chunk_id);
+    }
+
+    return num_chunk;
+}
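The chunk count above is a plain round-down: a pg always occupies a whole number of chunks and any remainder of `pg_size` is ignored. A worked example with illustrative sizes:

```cpp
// Illustrative values only; the real chunk size comes from get_chunk_size().
uint64_t const pg_size = 9ULL << 30;              // 9 GiB requested
uint32_t const chunk_size = 2U << 30;             // 2 GiB per chunk
uint32_t const num_chunk = pg_size / chunk_size;  // round_down(9 GiB, 2 GiB) / 2 GiB = 4
// The pg gets 4 chunks (8 GiB); the trailing 1 GiB of the request is not allocated.
```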

+void HeapChunkSelector::set_pg_chunks(pg_id_t pg_id, std::vector< chunk_num_t >&& chunk_ids) {
+    std::unique_lock lock_guard(m_chunk_selector_mtx);
+    if (m_v2r_chunk_map.find(pg_id) != m_v2r_chunk_map.end()) {
+        LOGWARNMOD(homeobject, "PG {} has already been recovered", pg_id);
+        return;
+    }
+
+    auto v2r_vector = m_v2r_chunk_map.emplace(pg_id, std::make_shared< std::vector< chunk_num_t > >(std::move(chunk_ids))).first->second;
+    auto r2v_map = m_r2v_chunk_map.emplace(pg_id, std::make_shared< ChunkIdMap >()).first->second;
+
+    for (chunk_num_t i = 0; i < v2r_vector->size(); ++i) {
+        // v_chunk_id starts from 0.
+        chunk_num_t v_chunk_id = i;
+        chunk_num_t r_chunk_id = (*v2r_vector)[i];
+        r2v_map->emplace(r_chunk_id, v_chunk_id);
+    }
+}
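The two maps populated above index the same assignment in both directions: the v2r vector is positional (its index is the v_chunk_id), and r2v inverts it. A toy example with made-up IDs:

```cpp
// Suppose a pg's meta blk recorded real chunks {7, 3, 9}, in that order.
std::vector< homestore::chunk_num_t > v2r{7, 3, 9};  // index is the virtual chunk id
std::unordered_map< homestore::chunk_num_t, homestore::chunk_num_t > r2v;
for (size_t v = 0; v < v2r.size(); ++v) { r2v.emplace(v2r[v], v); }
// r2v == {7 -> 0, 3 -> 1, 9 -> 2}: replicas that agree on the v_chunk_id
// sequence get an identical layout even if their real chunk IDs differ.
```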

+void HeapChunkSelector::recover_per_dev_chunk_heap() {
+    std::unique_lock lock_guard(m_chunk_selector_mtx);
+    for (const auto& [chunk_id, _] : m_chunks) {
         bool add_to_heap = true;
-        if (excludingChunks.find(p.first) != excludingChunks.end()) { add_to_heap = false; }
-        add_chunk_internal(p.first, add_to_heap);
-    };
+        for (const auto& [_, chunk_map] : m_r2v_chunk_map) {
+            if (chunk_map->find(chunk_id) != chunk_map->end()) {
+                add_to_heap = false;
+                break;
+            }
+        }
+        add_chunk_internal(chunk_id, add_to_heap);
+    }
 }
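The recovery entry points above and below have an implied ordering, spelled out in the header comments: `set_pg_chunks()` once per pg meta blk found, then `recover_per_dev_chunk_heap()` once, then `recover_pg_chunk_heap()` per pg once open shards are known. A hypothetical driver, with the surrounding names (`recovered_pg_metas`, `open_shard_chunks`) assumed:

```cpp
// 1. per pg meta blk: register each pg's real-chunk list
for (auto& [pg_id, meta] : recovered_pg_metas) {
    selector->set_pg_chunks(pg_id, std::move(meta.chunk_ids));
}
// 2. after all pg meta blks: chunks owned by no pg fall back to their pdev heaps
selector->recover_per_dev_chunk_heap();
// 3. after ShardManager reports open shards: rebuild each pg heap, excluding
//    chunks that are still backing an open shard
for (auto& [pg_id, open_chunk_ids] : open_shard_chunks) {
    selector->recover_pg_chunk_heap(pg_id, open_chunk_ids);
}
```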

+void HeapChunkSelector::recover_pg_chunk_heap(pg_id_t pg_id, const std::unordered_set< chunk_num_t >& excludingChunks) {
+    std::unique_lock lock_guard(m_chunk_selector_mtx);
+    if (m_per_pg_heap.find(pg_id) != m_per_pg_heap.end()) {
+        LOGWARNMOD(homeobject, "pg_heap {} has already been recovered", pg_id);
+        return;
+    }
+    auto it = m_v2r_chunk_map.find(pg_id);
+    if (it == m_v2r_chunk_map.end()) {
+        LOGWARNMOD(homeobject, "No pg_chunk_map found for pg {}, nothing to recover", pg_id);
+        return;
+    }
+    const auto& chunk_ids = it->second;
+    auto& pg_heap = m_per_pg_heap.emplace(pg_id, std::make_shared< ChunkHeap >()).first->second;
+    for (const auto& chunk_id : *chunk_ids) {
+        if (excludingChunks.find(chunk_id) == excludingChunks.end()) {
+            const auto& chunk = m_chunks[chunk_id];
+            auto vchunk = VChunk(chunk);
+            pg_heap->m_heap.emplace(vchunk);
+            pg_heap->m_total_blks += vchunk.get_total_blks();
+            pg_heap->available_blk_count.fetch_add(vchunk.available_blks());
+        }
+    }
+}

+std::shared_ptr< const std::vector< homestore::chunk_num_t > > HeapChunkSelector::get_pg_chunks(pg_id_t pg_id) const {
+    std::shared_lock lock_guard(m_chunk_selector_mtx);
+    auto it = m_v2r_chunk_map.find(pg_id);
+    if (it != m_v2r_chunk_map.end()) {
+        return it->second;
+    } else {
+        LOGWARNMOD(homeobject, "PG {} has never been created", pg_id);
+        return nullptr;
+    }
+}

 homestore::blk_alloc_hints HeapChunkSelector::chunk_to_hints(chunk_num_t chunk_id) const {
40 changes: 35 additions & 5 deletions src/lib/homestore_backend/heap_chunk_selector.h
@@ -35,9 +35,10 @@ class HeapChunkSelector : public homestore::ChunkSelector {

     using VChunkHeap = std::priority_queue< VChunk, std::vector< VChunk >, VChunkComparator >;
     using VChunkDefragHeap = std::priority_queue< VChunk, std::vector< VChunk >, VChunkDefragComparator >;
+    using ChunkIdMap = std::unordered_map< homestore::chunk_num_t, homestore::chunk_num_t >; // used for the real chunk id -> virtual chunk id map
     using chunk_num_t = homestore::chunk_num_t;

-    struct PerDevHeap {
+    struct ChunkHeap {
         std::mutex mtx;
         VChunkHeap m_heap;
         std::atomic_size_t available_blk_count;
@@ -46,22 +47,41 @@
     };

     void add_chunk(csharedChunk&) override;

     void foreach_chunks(std::function< void(csharedChunk&) >&& cb) override;

     csharedChunk select_chunk([[maybe_unused]] homestore::blk_count_t nblks, const homestore::blk_alloc_hints& hints);

     // this function will be used by the GC flow or recovery flow to mark one specific chunk as busy; the caller is
     // responsible for calling the release_chunk() interface once the chunk is no longer in use.
-    csharedChunk select_specific_chunk(const chunk_num_t);
+    csharedChunk select_specific_chunk(const pg_id_t pg_id, const chunk_num_t);

     // this function will be used by the GC flow to select a chunk for GC
     csharedChunk most_defrag_chunk();

     // this function is used to return a chunk back to the ChunkSelector when sealing a shard, and will only be used
     // by HomeObject.
-    void release_chunk(const chunk_num_t);
+    void release_chunk(const pg_id_t pg_id, const chunk_num_t);

+    /**
+     * Select chunks for a pg; all chunks must reside on the same pdev.
+     *
+     * @param pg_id The ID of the pg.
+     * @param pg_size The fixed pg size.
+     * @return An optional chunk_num_t representing num_chunk, or std::nullopt if there is not enough space.
+     */
+    std::optional< chunk_num_t > select_chunks_for_pg(pg_id_t pg_id, u_int64_t pg_size);
+
+    std::shared_ptr< const std::vector< chunk_num_t > > get_pg_chunks(pg_id_t pg_id) const;
+
+    // this should be called on each pg meta blk found
+    void set_pg_chunks(pg_id_t pg_id, std::vector< chunk_num_t >&& chunk_ids);
+
+    // this should be called after all pg meta blks are recovered
+    void recover_per_dev_chunk_heap();
+
     // this should be called after ShardManager is initialized and has all the open shards
-    void build_per_dev_chunk_heap(const std::unordered_set< chunk_num_t >& excludingChunks);
+    void recover_pg_chunk_heap(pg_id_t pg_id, const std::unordered_set< chunk_num_t >& excludingChunks);

     /**
      * Retrieves the block allocation hints for a given chunk.
@@ -112,12 +132,22 @@
     */
     uint32_t total_chunks() const;

+    uint32_t get_chunk_size() const;
+
 private:
-    std::unordered_map< uint32_t, std::shared_ptr< PerDevHeap > > m_per_dev_heap;
+    std::unordered_map< uint32_t, std::shared_ptr< ChunkHeap > > m_per_dev_heap;
+    std::unordered_map< pg_id_t, std::shared_ptr< ChunkHeap > > m_per_pg_heap;
+
+    // These mappings ensure "identical layout" by providing bidirectional indexing between virtual and real chunk IDs.
+    // m_v2r_chunk_map: maps each pg_id to a vector of real chunk IDs (r_chunk_id); the index in the vector is the virtual chunk ID (v_chunk_id).
+    std::unordered_map< pg_id_t, std::shared_ptr< std::vector< chunk_num_t > > > m_v2r_chunk_map;
+    // m_r2v_chunk_map: maps each pg_id to a map that inversely maps real chunk IDs (r_chunk_id) to virtual chunk IDs (v_chunk_id).
+    std::unordered_map< pg_id_t, std::shared_ptr< ChunkIdMap > > m_r2v_chunk_map;
+
     // holds all the chunks, selected or not
     std::unordered_map< chunk_num_t, csharedChunk > m_chunks;

+    mutable std::shared_mutex m_chunk_selector_mtx;
     void add_chunk_internal(const chunk_num_t, bool add_to_heap = true);

     VChunkDefragHeap m_defrag_heap;
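Taken together, the new pg-aware surface of this header is used roughly as follows; a sketch under assumed call sites, error handling elided:

```cpp
// pg creation: reserve a whole number of empty chunks on one pdev
auto num_chunk = selector->select_chunks_for_pg(pg_id, pg_size);
if (!num_chunk.has_value()) { /* no pdev had pg_size worth of empty chunks */ }

// shard open: pick a chunk from the pg's heap (pg_id rides in pdev_id_hint for now)
homestore::blk_alloc_hints hints;
hints.pdev_id_hint = pg_id;
auto chunk = selector->select_chunk(nblks, hints);

// shard seal: hand the chunk back to the pg's heap
selector->release_chunk(pg_id, VChunk(chunk).get_chunk_id());
```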
