Skip to content

Commit

Permalink
snapshot dump and load extended by generic_index::_next_id to prevent…
Browse files Browse the repository at this point in the history
… update_proposal failures on nodes configured using snapshot.
  • Loading branch information
vogel76 committed Oct 20, 2022
1 parent 69a9a74 commit 4a7df5f
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 25 deletions.
14 changes: 12 additions & 2 deletions libraries/chainbase/include/chainbase/chainbase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,15 @@ namespace chainbase {
const index_type& indicies()const { return _indices; }
int64_t revision()const { return _revision; }

id_type get_next_id() const
{
return _next_id;
}

void store_next_id(id_type next_id)
{
_next_id = next_id;
}

/**
* Restores the state to how it was prior to the current session discarding all changes
Expand Down Expand Up @@ -892,14 +901,15 @@ namespace chainbase {
virtual void dump_snapshot(snapshot_writer& writer) const override final
{
generic_index_snapshot_dumper<BaseIndex> dumper(_base, writer);
dumper.dump();
dumper.dump(_base.get_next_id());
}

virtual void load_snapshot(snapshot_reader& reader) override final
{
clear();
generic_index_snapshot_loader<BaseIndex> loader(_base, reader);
loader.load();
auto next_id = loader.load();
_base.store_next_id(next_id);
}

private:
Expand Down
32 changes: 22 additions & 10 deletions libraries/chainbase/include/chainbase/state_snapshot_support.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class snapshot_writer : public snapshot_base_serializer

typedef std::vector<worker*> workers;

virtual workers prepare(const std::string& indexDescription, size_t firstId, size_t lastId, size_t indexSize, snapshot_converter_t converter) = 0;
virtual workers prepare(const std::string& indexDescription, size_t firstId, size_t lastId, size_t indexSize, size_t indexNextId, snapshot_converter_t converter) = 0;
virtual void start(const workers& workers) = 0;

protected:
Expand Down Expand Up @@ -153,7 +153,7 @@ class snapshot_reader : public snapshot_base_serializer

typedef std::vector<worker*> workers;

virtual workers prepare(const std::string& indexDescription, snapshot_converter_t converter) = 0;
virtual workers prepare(const std::string& indexDescription, snapshot_converter_t converter, size_t* snapshot_index_next_id) = 0;
virtual void start(const workers& workers) = 0;

protected:
Expand Down Expand Up @@ -182,13 +182,15 @@ template <class GenericIndexType>
class generic_index_snapshot_dumper final : public generic_index_serialize_base
{
public:
using id_type = typename GenericIndexType::id_type;

generic_index_snapshot_dumper(const GenericIndexType& index, snapshot_writer& writer) :
_index(index),
_writer(writer) {}

void dump() const
void dump(id_type index_next_id) const
{
dump_index(_index.indices());
dump_index(index_next_id, _index.indices());
}

private:
Expand Down Expand Up @@ -292,7 +294,7 @@ class generic_index_snapshot_dumper final : public generic_index_serialize_base
};

template <class MultiIndexType>
void dump_index(const MultiIndexType& index) const
void dump_index(id_type index_next_id, const MultiIndexType& index) const
{
typedef dumper_data< MultiIndexType> dumper_t;

Expand All @@ -315,7 +317,7 @@ class generic_index_snapshot_dumper final : public generic_index_serialize_base
lastId = byIdIdx.rbegin()->get_id();
}

auto workers = _writer.prepare(indexName, firstId, lastId, index.size(), converter);
auto workers = _writer.prepare(indexName, firstId, lastId, index.size(), index_next_id, converter);

std::vector<std::unique_ptr<dumper_t>> workerData;

Expand All @@ -337,13 +339,19 @@ template <class GenericIndexType>
class generic_index_snapshot_loader final : public generic_index_serialize_base
{
public:
using id_type = typename GenericIndexType::id_type;

generic_index_snapshot_loader(GenericIndexType& index, snapshot_reader& reader) :
_index(index),
_reader(reader) {}

void load()
/// <summary>
/// Allows to load index contents from snapshot.
/// Returns index next_id value.
/// </summary>
id_type load()
{
load_index(_index.mutable_indices());
return load_index(_index.mutable_indices());
}

private:
Expand Down Expand Up @@ -411,7 +419,7 @@ class generic_index_snapshot_loader final : public generic_index_serialize_base
};

template <class MultiIndexType>
void load_index(MultiIndexType& index)
id_type load_index(MultiIndexType& index)
{
typedef loader_data<MultiIndexType> loader_t;

Expand All @@ -424,7 +432,9 @@ class generic_index_snapshot_loader final : public generic_index_serialize_base
actualData->doConversion(w);
};

auto workers = _reader.prepare(indexName, converter);
size_t index_next_id = 0;

auto workers = _reader.prepare(indexName, converter, &index_next_id);

std::vector<std::unique_ptr<loader_t>> workerData;

Expand All @@ -435,6 +445,8 @@ class generic_index_snapshot_loader final : public generic_index_serialize_base
}

_reader.start(workers);

return id_type(index_next_id);
}

private:
Expand Down
71 changes: 58 additions & 13 deletions libraries/plugins/state_snapshot/state_snapshot_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

namespace bpo = boost::program_options;

#define SNAPSHOT_FORMAT_VERSION "2.0"

namespace {

namespace bfs = boost::filesystem;
Expand All @@ -57,6 +59,10 @@ struct index_manifest_info
size_t dumpedItems = 0;
size_t firstId = 0;
size_t lastId = 0;
/** \warning indexNextId must be then explicitly loaded to generic_index::next_id to conform proposal ID rules. next_id can be different (higher)
than last_object_id + 1 due to removing proposals, which does not correct next_id
*/
size_t indexNextId = 0;
std::vector<index_manifest_file_info> storage_files;
};

Expand Down Expand Up @@ -230,7 +236,7 @@ class snapshot_load_supplement_helper final : public hive::plugins::chain::snaps

} /// namespace anonymous

FC_REFLECT(index_manifest_info, (name)(dumpedItems)(firstId)(lastId)(storage_files))
FC_REFLECT(index_manifest_info, (name)(dumpedItems)(firstId)(lastId)(indexNextId)(storage_files))
FC_REFLECT(index_manifest_file_info, (relative_path)(file_size))

namespace hive { namespace plugins { namespace state_snapshot {
Expand Down Expand Up @@ -413,14 +419,14 @@ class index_dump_writer final : public snapshot_processor_data<chainbase::snapsh
index_dump_writer(const chain::database& mainDb, const chainbase::abstract_index& index, const bfs::path& outputRootPath,
bool allow_concurrency) :
snapshot_processor_data<chainbase::snapshot_writer>(outputRootPath), _mainDb(mainDb), _index(index), _firstId(0), _lastId(0),
_allow_concurrency(allow_concurrency) {}
_nextId(0), _allow_concurrency(allow_concurrency) {}

index_dump_writer(const index_dump_writer&) = delete;
index_dump_writer& operator=(const index_dump_writer&) = delete;

virtual ~index_dump_writer() = default;

virtual workers prepare(const std::string& indexDescription, size_t firstId, size_t lastId, size_t indexSize,
virtual workers prepare(const std::string& indexDescription, size_t firstId, size_t lastId, size_t indexSize, size_t indexNextId,
snapshot_converter_t converter) override;
virtual void start(const workers& workers) override;

Expand All @@ -437,6 +443,7 @@ class index_dump_writer final : public snapshot_processor_data<chainbase::snapsh
std::vector <std::unique_ptr<dumping_worker>> _builtWorkers;
size_t _firstId;
size_t _lastId;
size_t _nextId;
bool _allow_concurrency;
};

Expand All @@ -452,7 +459,7 @@ class index_dump_reader final : public snapshot_processor_data<chainbase::snapsh

virtual ~index_dump_reader() = default;

virtual workers prepare(const std::string& indexDescription, snapshot_converter_t converter) override;
virtual workers prepare(const std::string& indexDescription, snapshot_converter_t converter, size_t* snapshot_index_next_id) override;
virtual void start(const workers& workers) override;

size_t getCurrentlyProcessedId() const;
Expand Down Expand Up @@ -604,18 +611,19 @@ void dumping_worker::prepareWriter()

chainbase::snapshot_writer::workers
index_dump_writer::prepare(const std::string& indexDescription, size_t firstId, size_t lastId, size_t indexSize,
snapshot_converter_t converter)
size_t indexNextId, snapshot_converter_t converter)
{
if(indexSize == 0 || process_index(indexDescription) == false)
return workers();

ilog("Preparing snapshot writer to store index holding `${d}' items. Index size: ${s}. Index id range: <${f}, ${l}>.",
("d", indexDescription)("s", indexSize)("f", firstId)("l", lastId));
ilog("Preparing snapshot writer to store index holding `${d}' items. Index size: ${s}. Index next_id: ${indexNextId}.Index id range: <${f}, ${l}>.",
("d", indexDescription)("s", indexSize)(indexNextId)("f", firstId)("l", lastId));

_converter = converter;
_indexDescription = indexDescription;
_firstId = firstId;
_lastId = lastId;
_nextId = indexNextId;

if(indexSize == 0 || process_index(indexDescription) == false)
return workers();

chainbase::snapshot_writer::workers retVal;

Expand Down Expand Up @@ -706,6 +714,7 @@ void index_dump_writer::store_index_manifest(index_manifest_info* manifest) cons
manifest->dumpedItems = _index.size();
manifest->firstId = _firstId;
manifest->lastId = _lastId;
manifest->indexNextId = _nextId;

size_t totalWrittenEntries = 0;

Expand All @@ -721,6 +730,8 @@ void index_dump_writer::store_index_manifest(index_manifest_info* manifest) cons

FC_ASSERT(_index.size() == totalWrittenEntries, "Mismatch between written entries: ${e} and size ${s} of index: `${i}",
("e", totalWrittenEntries)("s", _index.size())("i", _indexDescription));

ilog("Saved manifest for index: '${d}' containing ${s} items and ${n} saved as next_id", ("d", _indexDescription)("s", manifest->dumpedItems)("n", manifest->indexNextId));
}

class loading_worker final : public chainbase::snapshot_reader::worker
Expand Down Expand Up @@ -833,7 +844,7 @@ void loading_worker::perform_load()
}

chainbase::snapshot_reader::workers
index_dump_reader::prepare(const std::string& indexDescription, snapshot_converter_t converter)
index_dump_reader::prepare(const std::string& indexDescription, snapshot_converter_t converter, size_t* snapshot_index_next_id)
{
_converter = converter;
_indexDescription = indexDescription;
Expand All @@ -850,6 +861,8 @@ index_dump_reader::prepare(const std::string& indexDescription, snapshot_convert

const index_manifest_info& manifestInfo = *snapshotIt;

*snapshot_index_next_id = manifestInfo.indexNextId;

_builtWorkers.emplace_back(std::make_unique<loading_worker>(manifestInfo, _rootPath, *this));

workers retVal;
Expand Down Expand Up @@ -1059,6 +1072,20 @@ void state_snapshot_plugin::impl::store_snapshot_manifest(const bfs::path& actua
}
}

{
Slice key("SNAPSHOT_VERSION");
Slice value(SNAPSHOT_FORMAT_VERSION);
auto status = db->Put(writeOptions, snapshotManifestCF, key, value);

if (status.ok() == false)
{
elog("Cannot write an index manifest entry to output file: `${p}'. Error details: `${e}'.", ("p", manifestDbPath.string())("e", status.ToString()));
ilog("Failing key value: \"SNAPSHOT_VERSION\"");

throw std::exception();
}
}

db.close();
}

Expand Down Expand Up @@ -1124,7 +1151,7 @@ std::tuple<snapshot_manifest, plugin_external_data_index, uint32_t> state_snapsh

FC_ASSERT(keySlice.ToString() == info.name);

ilog("Loaded manifest info for index ${i} having storage files: ${sf}", ("i", info.name)("sf", info.storage_files));
ilog("Loaded manifest info for index ${i}, containing ${s} items, next_id: ${n}, having storage files: ${sf}", ("i", info.name)("s", info.dumpedItems)("n", info.indexNextId)("sf", info.storage_files));

retVal.emplace(std::move(info));

Expand Down Expand Up @@ -1180,15 +1207,33 @@ std::tuple<snapshot_manifest, plugin_external_data_index, uint32_t> state_snapsh
FC_ASSERT(irreversibleStateIterator->Valid(), "No entry for IRREVERSIBLE_STATE. Probably used old snapshot format (must be regenerated).");

std::vector<char> buffer;
Slice keySlice = irreversibleStateIterator->key();
auto valueSlice = irreversibleStateIterator->value();

std::string keyName = keySlice.ToString();;

FC_ASSERT(keyName == "LAST_IRREVERSIBLE_BLOCK", "Broken snapshot - no entry for LAST_IRREVERSIBLE_BLOCK");

buffer.insert(buffer.end(), valueSlice.data(), valueSlice.data() + valueSlice.size());
chainbase::serialization::unpack_from_buffer(lib, buffer);
buffer.clear();
//ilog("lib: ${s}", ("s", lib));

irreversibleStateIterator->Next();
FC_ASSERT(irreversibleStateIterator->Valid() == false, "Multiple entries specifying irreversible block ?");
FC_ASSERT(irreversibleStateIterator->Valid(), "Expected multiple entries specifying irreversible block.");

keySlice = irreversibleStateIterator->key();
valueSlice = irreversibleStateIterator->value();
keyName = keySlice.ToString();

FC_ASSERT(keyName == "SNAPSHOT_VERSION", "Broken snapshot - no entry for SNAPSHOT_VERSION, ${k} found.", ("k", keyName));

std::string versionValue = valueSlice.ToString();
FC_ASSERT(versionValue == SNAPSHOT_FORMAT_VERSION, "Snapshot version mismatch - ${f} found, ${e} expected.", ("f", versionValue)("e", SNAPSHOT_FORMAT_VERSION));

irreversibleStateIterator->Next();

FC_ASSERT(irreversibleStateIterator->Valid() == false, "Unexpected entries specifying irreversible block ?");
}

for(auto* cfh : cfHandles)
Expand Down

0 comments on commit 4a7df5f

Please sign in to comment.