Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into HeapChunkSelector
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Sep 19, 2023
2 parents a9603af + 1cbf09d commit b2682fd
Show file tree
Hide file tree
Showing 24 changed files with 1,147 additions and 178 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "0.9.3"
version = "0.10.2"
homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
topics = ("ebay")
Expand Down
7 changes: 5 additions & 2 deletions src/include/homeobject/blob_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ namespace homeobject {
ENUM(BlobError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_SHARD, UNKNOWN_BLOB,
CHECKSUM_MISMATCH);

using unique_buffer = std::unique_ptr< sisl::byte_array_impl >;
struct Blob {
unique_buffer body;
Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o) : body(std::move(b)), user_key(u), object_off(o) {}

Blob clone() const;

sisl::io_blob_safe body;
std::string user_key;
uint64_t object_off;
std::optional< peer_id > current_leader{std::nullopt};
Expand Down
2 changes: 2 additions & 0 deletions src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ struct ShardInfo {
uint64_t deleted_capacity_bytes;
std::optional< peer_id > current_leader{std::nullopt};
};

using InfoList = std::list< ShardInfo >;

class ShardManager : public Manager< ShardError > {
public:
static uint64_t max_shard_size(); // Static function forces runtime evaluation.
static uint64_t max_shard_num_in_pg();

virtual AsyncResult< ShardInfo > get_shard(shard_id id) const = 0;
virtual AsyncResult< InfoList > list_shards(pg_id id) const = 0;
Expand Down
14 changes: 10 additions & 4 deletions src/lib/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,30 @@ BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id shard, blob_id con
uint64_t len) const {
return _get_shard(shard).thenValue([this, blob](auto const e) -> BlobManager::Result< Blob > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
return _get_blob(e.value(), blob);
return _get_blob(e.value().info, blob);
});
}

BlobManager::AsyncResult< blob_id > HomeObjectImpl::put(shard_id shard, Blob&& blob) {
return _get_shard(shard).thenValue(
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::Result< blob_id > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value(), std::move(blob));
if (ShardInfo::State::SEALED == e.value().info.state) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value().info, std::move(blob));
});
}

BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id shard, blob_id const& blob) {
return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullResult {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
return _del_blob(e.value(), blob);
return _del_blob(e.value().info, blob);
});
}

Blob Blob::clone() const {
auto new_body = sisl::io_blob_safe(body.size);
std::memcpy(new_body.bytes, body.bytes, body.size);
return Blob(std::move(new_body), user_key, object_off);
}

} // namespace homeobject
49 changes: 32 additions & 17 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@

#include <sisl/logging/logging.h>

template <>
struct std::hash< homeobject::ShardInfo > {
std::size_t operator()(homeobject::ShardInfo const& i) const noexcept { return std::hash< uint64_t >()(i.id); }
};

namespace home_replication {
class ReplicationService;
}
Expand All @@ -21,15 +16,29 @@ namespace homeobject {
inline bool operator<(ShardInfo const& lhs, ShardInfo const& rhs) { return lhs.id < rhs.id; }
inline bool operator==(ShardInfo const& lhs, ShardInfo const& rhs) { return lhs.id == rhs.id; }

struct Shard {
explicit Shard(ShardInfo info) : info(std::move(info)) {}
ShardInfo info;
uint16_t chunk_id;
void* metablk_cookie{nullptr};
};

using ShardList = std::list< Shard >;
using ShardIterator = ShardList::iterator;

struct PG {
explicit PG(PGInfo info) : pg_info(std::move(info)) {}
PGInfo pg_info;
uint64_t shard_sequence_num{0};
ShardList shards;
};

class HomeObjectImpl : public HomeObject,
public BlobManager,
public PGManager,
public ShardManager,
public std::enable_shared_from_this< HomeObjectImpl > {

std::mutex _repl_lock;
std::shared_ptr< home_replication::ReplicationService > _repl_svc;

/// Implementation defines these
virtual ShardManager::Result< ShardInfo > _create_shard(pg_id, uint64_t size_bytes) = 0;
virtual ShardManager::Result< ShardInfo > _seal_shard(shard_id) = 0;
Expand All @@ -38,35 +47,41 @@ class HomeObjectImpl : public HomeObject,
virtual BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const = 0;
virtual BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) = 0;
///

folly::Future< ShardManager::Result< ShardInfo > > _get_shard(shard_id id) const;
auto _defer() const { return folly::makeSemiFuture().via(folly::getGlobalCPUExecutor()); }
folly::Future< ShardManager::Result< Shard > > _get_shard(shard_id id) const;
auto _defer() const { return folly::makeSemiFuture().via(folly::getGlobalCPUExecutor());}

protected:
std::mutex _repl_lock;
std::shared_ptr< home_replication::ReplicationService > _repl_svc;
//std::shared_ptr<homestore::ReplicationService> _repl_svc;
peer_id _our_id;

/// Our SvcId retrieval and SvcId->IP mapping
std::weak_ptr< HomeObjectApplication > _application;

///
mutable std::shared_mutex _pg_lock;
using shard_set = std::unordered_set< ShardInfo >;
using pg_pair = std::pair< PGInfo, shard_set >;
std::map< pg_id, pg_pair > _pg_map;
std::map< pg_id, PG > _pg_map;

mutable std::shared_mutex _shard_lock;
std::map< shard_id, shard_set::const_iterator > _shard_map;
std::map< shard_id, ShardIterator > _shard_map;
///

PGManager::Result< PG > _get_pg(pg_id pg);
public:
explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) :
_application(std::move(application)) {}

~HomeObjectImpl() override = default;
HomeObjectImpl(const HomeObjectImpl&) = delete;
HomeObjectImpl(HomeObjectImpl&&) noexcept = delete;
HomeObjectImpl& operator=(const HomeObjectImpl&) = delete;
HomeObjectImpl& operator=(HomeObjectImpl&&) noexcept = delete;

// This is public but not exposed in the API above
void init_repl_svc();

std::shared_ptr< home_replication::ReplicationService > get_repl_svc() { return _repl_svc;}

std::shared_ptr< BlobManager > blob_manager() final;
std::shared_ptr< PGManager > pg_manager() final;
std::shared_ptr< ShardManager > shard_manager() final;
Expand All @@ -82,7 +97,7 @@ class HomeObjectImpl : public HomeObject,
ShardManager::AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) final;
ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final;
ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) final;

uint64_t get_current_timestamp();
/// BlobManager
BlobManager::AsyncResult< blob_id > put(shard_id shard, Blob&&) final;
BlobManager::AsyncResult< Blob > get(shard_id shard, blob_id const& blob, uint64_t off, uint64_t len) const final;
Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
blob_manager.cpp
shard_manager.cpp
heap_chunk_selector.cpp
replication_state_machine.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}_core>
)
target_link_libraries("${PROJECT_NAME}_homestore"
Expand Down
31 changes: 30 additions & 1 deletion src/lib/homestore/homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

#include <homestore/homestore.hpp>
#include <homestore/index_service.hpp>
#include <homestore/meta_service.hpp>
#include <iomgr/io_environment.hpp>

namespace homeobject {

const std::string HSHomeObject::s_shard_info_sub_type = "shard_info";

extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
LOGINFOMOD(homeobject, "Initializing HomeObject");
auto instance = std::make_shared< HSHomeObject >(std::move(application));
Expand Down Expand Up @@ -40,7 +43,10 @@ void HSHomeObject::init_homestore() {
uint32_t services = HS_SERVICE::META | HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL | HS_SERVICE::DATA;

bool need_format = HomeStore::instance()->start(
hs_input_params{.devices = device_info, .app_mem_size = app_mem_size, .services = services});
hs_input_params{.devices = device_info, .app_mem_size = app_mem_size, .services = services},
[this]() {
register_homestore_metablk_callback();
});

/// TODO how should this work?
LOGWARN("Persistence Looks Vacant, Formatting!!");
Expand All @@ -55,6 +61,17 @@ void HSHomeObject::init_homestore() {
}
}

void HSHomeObject::register_homestore_metablk_callback() {
//register some callbacks for metadata recovery;
using namespace homestore;
HomeStore::instance()->meta_service().register_handler(HSHomeObject::s_shard_info_sub_type,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_shard_meta_blk_found(mblk, buf, size);
},
nullptr,
true);
}

void HomeObjectImpl::init_repl_svc() {
auto lg = std::scoped_lock(_repl_lock);
if (!_repl_svc) {
Expand All @@ -72,4 +89,16 @@ HSHomeObject::~HSHomeObject() {
iomanager.stop();
}

void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
std::string shard_info_str;
shard_info_str.append(r_cast<const char*>(buf.bytes()), size);

auto shard = deserialize_shard(shard_info_str);
shard.metablk_cookie = mblk;

//As shard info in the homestore metablk is always the latest state(OPEN or SEALED),
//we can always create a shard from this shard info and once shard is deleted, the associated metablk will be deleted too.
do_commit_new_shard(shard);
}

} // namespace homeobject
30 changes: 29 additions & 1 deletion src/lib/homestore/homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

#include "lib/homeobject_impl.hpp"

namespace homestore {
struct meta_blk;
}

namespace homeobject {

class HSHomeObject : public HomeObjectImpl {
Expand All @@ -18,12 +22,36 @@ class HSHomeObject : public HomeObjectImpl {
BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const override;
BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override;
///

mutable std::shared_mutex _flying_shard_lock;
std::map< int64_t, Shard > _flying_shards;
private:
shard_id generate_new_shard_id(pg_id pg);
shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num) const;
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id);
std::string serialize_shard(const Shard& shard) const;
Shard deserialize_shard(const std::string& shard_info_str) const;
void do_commit_new_shard(const Shard& shard);
void do_commit_seal_shard(const Shard& shard);
void register_homestore_metablk_callback();
void* get_shard_metablk(shard_id id);
public:
using HomeObjectImpl::HomeObjectImpl;
~HSHomeObject();

void init_homestore();

static const std::string s_shard_info_sub_type;
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size);

bool precheck_and_decode_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
std::string* msg);

void on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
void* user_ctx);
void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
void* user_ctx);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
void* user_ctx);
};

} // namespace homeobject
33 changes: 33 additions & 0 deletions src/lib/homestore/replication_message.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include "homeobject/common.hpp"

#include <sisl/utility/enum.hpp>
#include <isa-l/crc.h>

namespace homeobject {

ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSAGE, UNKNOWN_MESSAGE);

// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;
static constexpr uint32_t HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 = 0x01;
static constexpr uint32_t init_crc32 = 0;

#pragma pack(1)
struct ReplicationMessageHeader {
uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC};
uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1};
ReplicationMessageType message_type;
uint32_t payload_size;
uint32_t payload_crc;
uint8_t reserved_pad[6]{};
uint32_t header_crc;
uint32_t calculate_crc() const {
return crc32_ieee(init_crc32, r_cast<const unsigned char*>(this), sizeof(*this) - sizeof(header_crc));
}
};

#pragma pack()

}
60 changes: 60 additions & 0 deletions src/lib/homestore/replication_state_machine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#include "replication_message.hpp"
#include "replication_state_machine.hpp"

namespace homeobject {

void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const home_replication::pba_list_t& pbas, void* ctx) {
LOGINFO("applying raft log commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_shard_message_commit(lsn, header, key,ctx);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
break;
}
}
}

void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {
LOGINFO("on_pre_commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_pre_commit_shard_msg(lsn, header, key,ctx);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
break;
}
}
}

void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {
LOGINFO("rollback with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_rollback_shard_msg(lsn, header, key,ctx);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
break;
}
}
}

void ReplicationStateMachine::on_replica_stop() {}

}
Loading

0 comments on commit b2682fd

Please sign in to comment.