Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Sep 17, 2023
1 parent 828e534 commit 7f32734
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 29 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def build_requirements(self):
self.build_requires("gtest/1.14.0")

def requirements(self):
self.requires("homestore/[~=4, include_prerelease=True]@oss/master")
self.requires("homestore/[~=4, include_prerelease=True]@user/testing")
self.requires("sisl/[~=10, include_prerelease=True]@oss/master")
# Remove when HomeStore Replication Service is mature
self.requires("nuraft_mesg/[~=1, include_prerelease=True]@oss/main")
Expand Down
73 changes: 52 additions & 21 deletions src/lib/homestore/heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
@@ -1,38 +1,69 @@
#include "heap_chunk_selector.h"

#include <execution>
#include <algorithm>
#include <sisl/logging/logging.h>

namespace homeobject {

//add_chunk may be called in Homeobject(when sealing a shard) or in Homestore(when adding chunks to vdev) concurently
void HeapChunkSelector::add_chunk(csharedChunk& chunk) {
uint32_t pdevId = VChunk(chunk).get_pdev_id();
auto it = _pdev_heap_map.find(pdevId);
if (it == _pdev_heap_map.end()) it = _pdev_heap_map.try_emplace(pdevId, std::make_shared<std::pair<std::mutex, VChunkHeap>>()).first;
std::unique_lock<std::mutex> l(it->second->first);
VChunk vchunk(chunk);
auto&& pdevId = vchunk.get_pdev_id();
auto&& [it, ok] = m_pdev_heap_map.try_emplace(pdevId, std::make_shared<std::pair<std::mutex, VChunkHeap>>());

{
std::lock_guard<std::mutex> l(lock);
if(ok) m_pdev_avalable_blk_map.emplace(pdevId, vchunk.available_blks());
else m_pdev_avalable_blk_map[pdevId] += vchunk.available_blks();
m_chunks.emplace(chunk);
}

std::lock_guard<std::mutex> l(it->second->first);
it->second->second.emplace(chunk);
}

//select_chunk will only be called in homestore when allocating a block
csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const homestore::blk_alloc_hints& hint) {
auto it = _pdev_heap_map.find(hint.dev_id_hint);
if (it == _pdev_heap_map.end()) return nullptr;
std::unique_lock<std::mutex> l(it->second->first);
if(it->second->second.empty()) return nullptr;
auto pdevID = hint.dev_id_hint;
if(pdevID == homestore::INVALID_DEV_ID) {
// this is the first shard of this pg, select a pdev with the most available blocks for it
std::lock_guard<std::mutex> l(lock);
auto it = std::max_element(m_pdev_avalable_blk_map.begin(), m_pdev_avalable_blk_map.end(),
[](const std::pair<uint32_t, uint32_t>& lhs, const std::pair<uint32_t, uint32_t>& rhs) {
return lhs.second < rhs.second; }
);
pdevID = it->first;
}

auto it = m_pdev_heap_map.find(pdevID);
if (it == m_pdev_heap_map.end()) {
LOGINFO("No pdev found for pdev {}", pdevID);
return nullptr;
}


it->second->first.lock();
if(it->second->second.empty()) {
LOGINFO("No more available chunks found for pdev {}", pdevID);
return nullptr;
}
auto& vchunk = it->second->second.top();
it->second->second.pop();
it->second->first.unlock();

{
std::lock_guard<std::mutex> l(lock);
m_pdev_avalable_blk_map[pdevID] -= vchunk.available_blks();
}

return vchunk.get_internal_chunk();
}

void HeapChunkSelector::foreach_chunks(std::function< void(csharedChunk&) >&& cb) {
VChunkHeap tempHeap;
for(auto& p : _pdev_heap_map) {
std::unique_lock<std::mutex> l(p.second->first);
auto& heap = p.second->second;
while(!heap.empty()) {
auto& vchunk = heap.top();
heap.pop();
cb(vchunk.get_internal_chunk());
tempHeap.emplace(std::move(vchunk));
}
heap.swap(tempHeap);
}
//we should call `cb` on all the chunks, selected or not
std::lock_guard<std::mutex> l(lock);
std::for_each(std::execution::par_unseq, m_chunks.begin(), m_chunks.end(),
[cb = std::move(cb)](const VChunk& p){ cb(p.get_internal_chunk()); });
}

} // namespace homeobject
25 changes: 18 additions & 7 deletions src/lib/homestore/heap_chunk_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,27 @@
#include <mutex>
#include <utility>
#include <memory>
#include <map>
#include <functional>

#include <folly/concurrency/ConcurrentHashMap.h>

using VChunk = homestore::VChunk;

template <>
struct std::hash< VChunk > {
std::size_t operator()(VChunk const& v) const noexcept { return std::hash< uint16_t >()(v.get_chunk_id()); }
};

namespace homeobject {

using csharedChunk = homestore::cshared<homestore::Chunk>;

class HeapChunkSelector : public homestore::ChunkSelector {
public:
HeapChunkSelector() = default;
HeapChunkSelector(const HeapChunkSelector&) = delete;
HeapChunkSelector(HeapChunkSelector&&) noexcept = delete;
HeapChunkSelector& operator=(const HeapChunkSelector&) = delete;
HeapChunkSelector& operator=(HeapChunkSelector&&) noexcept = delete;
~HeapChunkSelector() = default;

using VChunk = homestore::VChunk;

class VChunkComparator {
public:
bool operator()(VChunk& lhs, VChunk& rhs) {
Expand All @@ -41,6 +44,14 @@ class HeapChunkSelector : public homestore::ChunkSelector {
csharedChunk select_chunk([[maybe_unused]]homestore::blk_count_t nblks, const homestore::blk_alloc_hints& hints);

private:
folly::ConcurrentHashMap< uint32_t, std::shared_ptr<std::pair< std::mutex, VChunkHeap > > >_pdev_heap_map;
folly::ConcurrentHashMap< uint32_t, std::shared_ptr< std::pair< std::mutex, VChunkHeap > > > m_pdev_heap_map;
//for now, uint32_t is enough for the sum of all the available blocks of a pdev.
//if necessary , we can change this to uint64_t to hold a larger sum.
std::unordered_map< uint32_t, uint32_t> m_pdev_avalable_blk_map;

//hold all the chunks , selected or not
std::unordered_set< VChunk > m_chunks;
std::mutex lock;

};
} // namespace homeobject

0 comments on commit 7f32734

Please sign in to comment.