implement fix pg size #220

Merged · 1 commit · Nov 6, 2024
4 changes: 2 additions & 2 deletions conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.6"
version = "2.1.7"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
@@ -49,7 +49,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.4]@oss/master")
self.requires("homestore/[^6.5]@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/3.3.1", override=True)
234 changes: 180 additions & 54 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
@@ -31,7 +31,7 @@ void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID, bool add_t
auto pdevID = vchunk.get_pdev_id();
// add this find here, since we don't want to call make_shared in try_emplace every time.
auto it = m_per_dev_heap.find(pdevID);
if (it == m_per_dev_heap.end()) { it = m_per_dev_heap.emplace(pdevID, std::make_shared< PerDevHeap >()).first; }
if (it == m_per_dev_heap.end()) { it = m_per_dev_heap.emplace(pdevID, std::make_shared< ChunkHeap >()).first; }

// build total blks for every chunk on this device;
it->second->m_total_blks += vchunk.get_total_blks();
@@ -59,31 +59,20 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const
return nullptr;
}

// shardid -> chunkid map is maintained by ShardManager
// pg_id->pdev_id map is maintained by PgManager
// chunselector will not take care of the two maps for now.
uint32_t pdevID = 0;
auto& pdevIdHint = hint.pdev_id_hint;
if (!pdevIdHint.has_value()) {
// this is the first shard of this pg, select a pdev with the most available blocks for it
auto&& it =
std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(),
[](const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& lhs,
const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& rhs) {
return lhs.second->available_blk_count.load() < rhs.second->available_blk_count.load();
});
if (it == m_per_dev_heap.end()) {
LOGWARNMOD(homeobject, "No pdev found for new pg");
return nullptr;
}
pdevID = it->first;
std::shared_lock lock_guard(m_chunk_selector_mtx);
// FIXME @Hooper: Temporary bypass using pdev_id_hint to represent pg_id_hint, "identical layout" will change it
pg_id_t pg_id = 0;
auto& pg_id_hint = hint.pdev_id_hint;
if (!pg_id_hint.has_value()) {
LOGWARNMOD(homeobject, "should not allocated a chunk without exiting pg_id in hint!");
return nullptr;
} else {
pdevID = pdevIdHint.value();
pg_id = pg_id_hint.value();
}

auto it = m_per_dev_heap.find(pdevID);
if (it == m_per_dev_heap.end()) {
LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
auto it = m_per_pg_heap.find(pg_id);
if (it == m_per_pg_heap.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return nullptr;
}

@@ -99,29 +88,29 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const
avalableBlkCounter.fetch_sub(vchunk.available_blks());
remove_chunk_from_defrag_heap(vchunk.get_chunk_id());
} else {
LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
LOGWARNMOD(homeobject, "no available chunks left for pg {}", pg_id);
}

return vchunk.get_internal_chunk();
}
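With this change, select_chunk resolves the heap by pg_id (temporarily carried in hint.pdev_id_hint, as the FIXME notes) rather than by pdev id. Below is a minimal, self-contained sketch of that lookup-and-pop step; Chunk, ChunkHeap, pg_id_t and select_chunk_for_pg here are simplified illustrative stand-ins, not the actual HomeObject types.

    #include <atomic>
    #include <cstdint>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <optional>
    #include <queue>
    #include <vector>

    using pg_id_t = uint16_t;

    struct Chunk { uint32_t chunk_id; uint32_t avail_blks; };
    struct ByAvailBlks {
        bool operator()(const Chunk& a, const Chunk& b) const { return a.avail_blks < b.avail_blks; }
    };

    struct ChunkHeap {
        std::mutex mtx;
        std::priority_queue< Chunk, std::vector< Chunk >, ByAvailBlks > m_heap; // max-heap on available blocks
        std::atomic< uint64_t > available_blk_count{0};
    };

    // Pop the chunk with the most available blocks from the PG's heap, mirroring the
    // per-PG lookup that select_chunk() now performs.
    std::optional< Chunk > select_chunk_for_pg(std::map< pg_id_t, std::shared_ptr< ChunkHeap > >& per_pg_heap,
                                               pg_id_t pg_id) {
        auto it = per_pg_heap.find(pg_id);
        if (it == per_pg_heap.end()) return std::nullopt;    // unknown pg_id: caller logs and fails
        std::lock_guard< std::mutex > lg(it->second->mtx);
        if (it->second->m_heap.empty()) return std::nullopt; // no chunks left for this PG
        Chunk c = it->second->m_heap.top();                  // most available blocks first
        it->second->m_heap.pop();
        it->second->available_blk_count.fetch_sub(c.avail_blks);
        return c;
    }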

csharedChunk HeapChunkSelector::select_specific_chunk(const chunk_num_t chunkID) {
csharedChunk HeapChunkSelector::select_specific_chunk(const pg_id_t pg_id, const chunk_num_t chunkID) {
if (m_chunks.find(chunkID) == m_chunks.end()) {
// sanity check
LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID);
return nullptr;
}

auto const pdevID = VChunk(m_chunks[chunkID]).get_pdev_id();
auto it = m_per_dev_heap.find(pdevID);
if (it == m_per_dev_heap.end()) {
LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
std::shared_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_heap.find(pg_id);
if (pg_it == m_per_pg_heap.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return nullptr;
}

auto vchunk = VChunk(nullptr);
auto& heap = it->second->m_heap;
if (auto lock_guard = std::lock_guard< std::mutex >(it->second->mtx); !heap.empty()) {
VChunk vchunk(nullptr);
auto& heap = pg_it->second->m_heap;
if (auto lock_guard = std::lock_guard< std::mutex >(pg_it->second->mtx); !heap.empty()) {
std::vector< VChunk > chunks;
chunks.reserve(heap.size());
while (!heap.empty()) {
@@ -133,35 +122,35 @@ csharedChunk HeapChunkSelector::select_specific_chunk(const chunk_num_t chunkID)
}
chunks.push_back(std::move(c));
}

for (auto& c : chunks) {
heap.emplace(c);
}
}

if (vchunk.get_internal_chunk()) {
auto& avalableBlkCounter = it->second->available_blk_count;
auto& avalableBlkCounter = pg_it->second->available_blk_count;
avalableBlkCounter.fetch_sub(vchunk.available_blks());
remove_chunk_from_defrag_heap(vchunk.get_chunk_id());
}

return vchunk.get_internal_chunk();
}
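A std::priority_queue cannot be searched in place, so select_specific_chunk drains the PG heap until it finds the requested chunk id and then restores the remaining chunks. A sketch of that pop-and-restore pattern, reusing the illustrative Chunk/ChunkHeap types from the sketch above (the real code also updates available_blk_count and the defrag heap):

    // Drain the heap looking for one chunk id; everything else is pushed back afterwards.
    std::optional< Chunk > take_specific_chunk(ChunkHeap& pg_heap, uint32_t wanted_chunk_id) {
        std::lock_guard< std::mutex > lg(pg_heap.mtx);
        std::optional< Chunk > found;
        std::vector< Chunk > survivors;
        survivors.reserve(pg_heap.m_heap.size());
        while (!pg_heap.m_heap.empty()) {
            Chunk c = pg_heap.m_heap.top();
            pg_heap.m_heap.pop();
            if (!found && c.chunk_id == wanted_chunk_id) {
                found = c;                 // keep it out of the heap: it is now in use
            } else {
                survivors.push_back(c);    // goes back into the heap below
            }
        }
        for (const auto& c : survivors) pg_heap.m_heap.push(c);
        return found;
    }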

// Temporarily commented out; the subsequent GC implementation needs to be adapted to the fixed pg size
// most_defrag_chunk will only be called when GC is triggered, and will return the chunk with the most
// defrag blocks
csharedChunk HeapChunkSelector::most_defrag_chunk() {
chunk_num_t chunkID{0};
// chunk_num_t chunkID{0};
// the chunk might be selected for creating a shard. if this happens, we need to select another chunk
for (;;) {
{
std::lock_guard< std::mutex > lg(m_defrag_mtx);
if (m_defrag_heap.empty()) break;
chunkID = m_defrag_heap.top().get_chunk_id();
}
auto chunk = select_specific_chunk(chunkID);
if (chunk) return chunk;
}
// for (;;) {
// {
// std::lock_guard< std::mutex > lg(m_defrag_mtx);
// if (m_defrag_heap.empty()) break;
// chunkID = m_defrag_heap.top().get_chunk_id();
// }
// auto chunk = select_specific_chunk(chunkID);
// if (chunk) return chunk;
// }
return nullptr;
}

@@ -186,22 +175,155 @@ void HeapChunkSelector::foreach_chunks(std::function< void(csharedChunk&) >&& cb
[cb = std::move(cb)](auto& p) { cb(p.second); });
}

void HeapChunkSelector::release_chunk(const chunk_num_t chunkID) {
const auto& it = m_chunks.find(chunkID);
if (it == m_chunks.end()) {
void HeapChunkSelector::release_chunk(const pg_id_t pg_id, const chunk_num_t chunkID) {
std::shared_lock lock_guard(m_chunk_selector_mtx);
if (m_chunks.find(chunkID) == m_chunks.end()) {
// sanity check
LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID);
} else {
add_chunk_internal(chunkID);
return;
}

auto pg_it = m_per_pg_heap.find(pg_id);
if (pg_it == m_per_pg_heap.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return;
}

const auto& chunk = m_chunks[chunkID];
VChunk vchunk(chunk);
{
std::lock_guard< std::mutex > l(pg_it->second->mtx);
auto& pg_heap = pg_it->second->m_heap;
pg_heap.emplace(chunk);
}
auto& avalableBlkCounter = pg_it->second->available_blk_count;
avalableBlkCounter += vchunk.available_blks();

}
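release_chunk is the inverse of selection: the chunk is pushed back onto its PG's heap and the PG's available-block counter is credited again. A minimal sketch, again on the illustrative types above:

    // Return a chunk to its PG so it becomes selectable again.
    void release_chunk_to_pg(ChunkHeap& pg_heap, const Chunk& c) {
        {
            std::lock_guard< std::mutex > lg(pg_heap.mtx);
            pg_heap.m_heap.push(c);                            // back in the heap
        }
        pg_heap.available_blk_count.fetch_add(c.avail_blks);   // credit the blocks back
    }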

uint32_t HeapChunkSelector::get_chunk_size() const {
const auto& chunk = m_chunks.begin()->second;
auto vchunk = VChunk(chunk);
return vchunk.size();
}

void HeapChunkSelector::build_per_dev_chunk_heap(const std::unordered_set< chunk_num_t >& excludingChunks) {
for (const auto& p : m_chunks) {
std::optional< uint32_t > HeapChunkSelector::select_chunks_for_pg(pg_id_t pg_id, u_int64_t pg_size) {
std::unique_lock lock_guard(m_chunk_selector_mtx);
if (m_per_pg_heap.find(pg_id) != m_per_pg_heap.end()) {
LOGWARNMOD(homeobject, "PG had already created, pg_id {}", pg_id);
return std::nullopt;
}

const auto chunk_size = get_chunk_size();
const uint32_t num_chunk = sisl::round_down(pg_size, chunk_size) / chunk_size;

// Select the pdev with the most available chunks
auto &&most_avail_dev_it =
std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(),
[](const std::pair< const uint32_t, std::shared_ptr< ChunkHeap > >& lhs,
const std::pair< const uint32_t, std::shared_ptr< ChunkHeap > >& rhs) {
return lhs.second->size() < rhs.second->size();
});
auto& pdev_heap = most_avail_dev_it->second;
if (num_chunk > pdev_heap->size()) {
LOGWARNMOD(homeobject, "Pdev has no enough space to create pg {} with num_chunk {}", pg_id, num_chunk);
return std::nullopt;
}
auto vchunk = VChunk(nullptr);
auto it = m_per_pg_heap.emplace(pg_id, std::make_shared< ChunkHeap >()).first;
auto v2r_vector = m_v2r_chunk_map.emplace(pg_id, std::make_shared< std::vector < chunk_num_t > >()).first->second;
auto r2v_map = m_r2v_chunk_map.emplace(pg_id, std::make_shared< ChunkIdMap >()).first->second;

auto& pg_heap = it->second;
std::scoped_lock lock(pdev_heap->mtx, pg_heap->mtx);
v2r_vector->reserve(num_chunk);
for (chunk_num_t i = 0; i < num_chunk; ++i) {
vchunk = pdev_heap->m_heap.top();
//sanity check
RELEASE_ASSERT(vchunk.get_total_blks() == vchunk.available_blks(), "vchunk should be empty");
pdev_heap->m_heap.pop();
pdev_heap->available_blk_count -= vchunk.available_blks();

pg_heap->m_heap.emplace(vchunk);
pg_heap->m_total_blks += vchunk.get_total_blks();
pg_heap->available_blk_count += vchunk.available_blks();
// v_chunk_id starts from 0.
chunk_num_t v_chunk_id = i;
chunk_num_t r_chunk_id = vchunk.get_chunk_id();
v2r_vector->emplace_back(r_chunk_id);
r2v_map->emplace(r_chunk_id, v_chunk_id);
}

return num_chunk;
}
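The chunk budget of a PG is derived as num_chunk = sisl::round_down(pg_size, chunk_size) / chunk_size; assuming round_down rounds down to a multiple of chunk_size, this is plain integer division, so any remainder smaller than one chunk is simply not allocated. A small worked example with made-up sizes:

    #include <cstdint>
    #include <iostream>

    int main() {
        // Illustrative values only; the real chunk size comes from HomeStore.
        const uint64_t chunk_size = 2ull * 1024 * 1024 * 1024;   // 2 GiB per chunk
        const uint64_t pg_size    = 9ull * 1024 * 1024 * 1024;   // PG asks for 9 GiB

        // Equivalent of sisl::round_down(pg_size, chunk_size) / chunk_size.
        const uint64_t num_chunk = pg_size / chunk_size;          // integer division == round down
        std::cout << num_chunk << " chunks reserved\n";           // prints 4: the trailing 1 GiB is dropped
    }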

void HeapChunkSelector::set_pg_chunks(pg_id_t pg_id, std::vector<chunk_num_t>&& chunk_ids) {
std::unique_lock lock_guard(m_chunk_selector_mtx);
if (m_v2r_chunk_map.find(pg_id) != m_v2r_chunk_map.end()) {
LOGWARNMOD(homeobject, "PG {} had been recovered", pg_id);
return;
}

auto v2r_vector = m_v2r_chunk_map.emplace(pg_id, std::make_shared< std::vector < chunk_num_t > >(std::move(chunk_ids))).first->second;
auto r2v_map = m_r2v_chunk_map.emplace(pg_id, std::make_shared< ChunkIdMap >()).first->second;

for (chunk_num_t i = 0; i < v2r_vector->size(); ++i) {
// v_chunk_id starts from 0.
chunk_num_t v_chunk_id = i;
chunk_num_t r_chunk_id = (*v2r_vector)[i];
r2v_map->emplace(r_chunk_id, v_chunk_id);
}
}
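Both select_chunks_for_pg and set_pg_chunks end up building the same two-way mapping: a vector indexed by virtual chunk id (v2r) and a hash map from real chunk id back to virtual id (r2v). A self-contained sketch of that construction with made-up ids:

    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    using chunk_num_t = uint16_t;

    int main() {
        // Real chunk ids handed to this PG, in virtual-id order (virtual ids start at 0).
        std::vector< chunk_num_t > v2r{17, 4, 42};                // v_chunk_id 0 -> r_chunk_id 17, 1 -> 4, 2 -> 42
        std::unordered_map< chunk_num_t, chunk_num_t > r2v;
        for (chunk_num_t v = 0; v < v2r.size(); ++v) {
            r2v.emplace(v2r[v], v);                               // reverse lookup: real id -> virtual id
        }
        // r2v[42] == 2, r2v[17] == 0, and so on.
    }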

void HeapChunkSelector::recover_per_dev_chunk_heap() {
std::unique_lock lock_guard(m_chunk_selector_mtx);
for (const auto& [chunk_id, _] : m_chunks) {
bool add_to_heap = true;
if (excludingChunks.find(p.first) != excludingChunks.end()) { add_to_heap = false; }
add_chunk_internal(p.first, add_to_heap);
};
for (const auto& [_, chunk_map] : m_r2v_chunk_map) {
if (chunk_map->find(chunk_id) != chunk_map->end()) {
add_to_heap = false;
break;
}
}
add_chunk_internal(chunk_id, add_to_heap);

}
}
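On recovery, the per-pdev heap must only receive chunks that no PG owns, so recover_per_dev_chunk_heap skips any chunk id present in one of the r2v maps. The membership test reduces to the sketch below; the type aliases are illustrative assumptions, not the exact HomeObject definitions:

    #include <cstdint>
    #include <memory>
    #include <unordered_map>

    using pg_id_t = uint16_t;
    using chunk_num_t = uint16_t;
    using ChunkIdMap = std::unordered_map< chunk_num_t, chunk_num_t >;   // real id -> virtual id

    // True when some PG already owns this chunk; such chunks stay out of the per-pdev heap.
    bool owned_by_any_pg(const std::unordered_map< pg_id_t, std::shared_ptr< ChunkIdMap > >& r2v_chunk_map,
                         chunk_num_t chunk_id) {
        for (const auto& [_, chunk_map] : r2v_chunk_map) {
            if (chunk_map->find(chunk_id) != chunk_map->end()) return true;
        }
        return false;
    }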

void HeapChunkSelector::recover_pg_chunk_heap(pg_id_t pg_id, const std::unordered_set< chunk_num_t >& excludingChunks)
{
std::unique_lock lock_guard(m_chunk_selector_mtx);
if (m_per_pg_heap.find(pg_id) != m_per_pg_heap.end()) {
LOGWARNMOD(homeobject, "Pg_heap {} had been recovered", pg_id);
return;
}
auto it = m_v2r_chunk_map.find(pg_id);
if (it == m_v2r_chunk_map.end()) {
LOGWARNMOD(homeobject, "Pg_chunk_map {} had never been recovered", pg_id);
return;
}
const auto& chunk_ids = it->second;
auto& pg_heap = m_per_pg_heap.emplace(pg_id, std::make_shared< ChunkHeap >()).first->second;
for (const auto& chunk_id : *chunk_ids) {
if (excludingChunks.find(chunk_id) == excludingChunks.end()) {
const auto& chunk = m_chunks[chunk_id];
auto vchunk = VChunk(chunk);
pg_heap->m_heap.emplace(vchunk);
pg_heap->m_total_blks += vchunk.get_total_blks();
pg_heap->available_blk_count += vchunk.available_blks();
}
}
}

std::shared_ptr< const std::vector <homestore::chunk_num_t> > HeapChunkSelector::get_pg_chunks(pg_id_t pg_id) const {
std::shared_lock lock_guard(m_chunk_selector_mtx);
auto it = m_v2r_chunk_map.find(pg_id);
if (it != m_v2r_chunk_map.end()) {
return it->second;
} else {
LOGWARNMOD(homeobject, "PG {} had never been created", pg_id);
return nullptr;
}
}

homestore::blk_alloc_hints HeapChunkSelector::chunk_to_hints(chunk_num_t chunk_id) const {
@@ -217,6 +339,7 @@ homestore::blk_alloc_hints HeapChunkSelector::chunk_to_hints(chunk_num_t chunk_i

// return the maximum number of chunks that can be allocated on pdev
uint32_t HeapChunkSelector::most_avail_num_chunks() const {
std::shared_lock lock_guard(m_chunk_selector_mtx);
uint32_t max_avail_num_chunks = 0ul;
for (auto const& [_, pdev_heap] : m_per_dev_heap) {
max_avail_num_chunks = std::max(max_avail_num_chunks, pdev_heap->size());
@@ -226,6 +349,7 @@ uint32_t HeapChunkSelector::most_avail_num_chunks() const {
}

uint32_t HeapChunkSelector::avail_num_chunks(uint32_t dev_id) const {
std::shared_lock lock_guard(m_chunk_selector_mtx);
auto it = m_per_dev_heap.find(dev_id);
if (it == m_per_dev_heap.end()) {
LOGWARNMOD(homeobject, "No pdev found for pdev {}", dev_id);
@@ -238,6 +362,7 @@ uint32_t HeapChunkSelector::avail_num_chunks(uint32_t dev_id) const {
uint32_t HeapChunkSelector::total_chunks() const { return m_chunks.size(); }

uint64_t HeapChunkSelector::avail_blks(std::optional< uint32_t > dev_it) const {
std::shared_lock lock_guard(m_chunk_selector_mtx);
if (!dev_it.has_value()) {
uint64_t max_avail_blks = 0ull;
for (auto const& [_, heap] : m_per_dev_heap) {
@@ -257,6 +382,7 @@ uint64_t HeapChunkSelector::avail_blks(std::optional< uint32_t > dev_it) const {
}

uint64_t HeapChunkSelector::total_blks(uint32_t dev_id) const {
std::shared_lock lock_guard(m_chunk_selector_mtx);
auto it = m_per_dev_heap.find(dev_id);
if (it == m_per_dev_heap.end()) {
LOGWARNMOD(homeobject, "No pdev found for pdev {}", dev_id);