Skip to content

Commit

Permalink
sparse: remove raw data cache in sparse inverted index
Browse files Browse the repository at this point in the history
To reduce memory usage, remove the raw data cache in sparse inverted index.
Note that config param `drop_ratio_build` and GetVectorByIds() are also be
removed.

Signed-off-by: Shawn Wang <[email protected]>
  • Loading branch information
sparknack committed Jan 3, 2025
1 parent 1cb3f0e commit 75ee982
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 338 deletions.
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ include(cmake/libs/libhnsw.cmake)

include_directories(thirdparty/faiss)

find_package(Boost REQUIRED)
include_directories(${Boost_INCLUDE_DIRS})

find_package(OpenMP REQUIRED)

find_package(folly REQUIRED)
Expand Down Expand Up @@ -177,6 +180,7 @@ endif()
include_directories(src)
include_directories(include)

list(APPEND KNOWHERE_LINKER_LIBS Boost::boost)
list(APPEND KNOWHERE_LINKER_LIBS faiss)
list(APPEND KNOWHERE_LINKER_LIBS glog::glog)
list(APPEND KNOWHERE_LINKER_LIBS nlohmann_json::nlohmann_json)
Expand Down
95 changes: 93 additions & 2 deletions include/knowhere/sparse_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <cstddef>
#include <cstdint>
#include <cstring>
Expand Down Expand Up @@ -59,6 +60,31 @@ GetDocValueBM25Computer(float k1, float b, float avgdl) {
};
}

class DocIdFilterByVector {
public:
DocIdFilterByVector(std::vector<table_t>&& docids) : docids_(std::move(docids)) {
std::sort(docids_.begin(), docids_.end());
}

[[nodiscard]] bool
test(const table_t id) {
// find the first id that is greater than or equal to the specific id
while (pos_ < docids_.size() && docids_[pos_] < id) {
++pos_;
}
return !(pos_ < docids_.size() && docids_[pos_] == id);
}

[[nodiscard]] bool
empty() const {
return docids_.empty();
}

private:
std::vector<table_t> docids_;
size_t pos_ = 0;
};

template <typename T>
class SparseRow {
static_assert(std::is_same_v<T, fp32>, "SparseRow supports float only");
Expand All @@ -72,6 +98,15 @@ class SparseRow {
SparseRow(size_t count, uint8_t* data, bool own_data) : data_(data), count_(count), own_data_(own_data) {
}

SparseRow(const std::vector<std::pair<table_t, T>>& data) : count_(data.size()), own_data_(true) {
data_ = new uint8_t[count_ * element_size()];
for (size_t i = 0; i < count_; ++i) {
auto* elem = reinterpret_cast<ElementProxy*>(data_) + i;
elem->index = data[i].first;
elem->value = data[i].second;
}
}

// copy constructor and copy assignment operator perform deep copy
SparseRow(const SparseRow<T>& other) : SparseRow(other.count_) {
std::memcpy(data_, other.data_, data_byte_size());
Expand Down Expand Up @@ -147,6 +182,9 @@ class SparseRow {

void
set_at(size_t i, table_t index, T value) {
if (i >= count_) {
throw std::out_of_range("set_at on a SparseRow with invalid index");
}
auto* elem = reinterpret_cast<ElementProxy*>(data_) + i;
elem->index = index;
elem->value = value;
Expand Down Expand Up @@ -300,12 +338,12 @@ class GrowableVectorView {
mmap_element_count_ = 0;
}

size_type
[[nodiscard]] size_type
capacity() const {
return mmap_byte_size_ / sizeof(T);
}

size_type
[[nodiscard]] size_type
size() const {
return mmap_element_count_;
}
Expand Down Expand Up @@ -346,6 +384,59 @@ class GrowableVectorView {
return reinterpret_cast<const T*>(mmap_data_)[i];
}

class iterator : public boost::iterator_facade<iterator, T, boost::random_access_traversal_tag, T&> {
public:
iterator() = default;
explicit iterator(T* ptr) : ptr_(ptr) {
}

friend class GrowableVectorView;
friend class boost::iterator_core_access;

T&
dereference() const {
return *ptr_;
}

void
increment() {
++ptr_;
}

void
decrement() {
--ptr_;
}

void
advance(std::ptrdiff_t n) {
ptr_ += n;
}

std::ptrdiff_t
distance_to(const iterator& other) const {
return other.ptr_ - ptr_;
}

bool
equal(const iterator& other) const {
return ptr_ == other.ptr_;
}

private:
T* ptr_ = nullptr;
};

iterator
begin() const {
return iterator(reinterpret_cast<T*>(mmap_data_));
}

iterator
end() const {
return iterator(reinterpret_cast<T*>(mmap_data_) + mmap_element_count_);
}

private:
void* mmap_data_ = nullptr;
size_type mmap_byte_size_ = 0;
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_readme():
get_numpy_include(),
os.path.join("..", "include"),
os.path.join("..", "thirdparty"),
get_thirdparty_prefix("boost-headers") + "/include",
get_thirdparty_prefix("nlohmann_json") + "/include",
get_thirdparty_prefix("libglog") + "/include",
get_thirdparty_prefix("gflags") + "/include"
Expand Down
58 changes: 14 additions & 44 deletions src/index/sparse/sparse_index_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,12 @@ class SparseInvertedIndexNode : public IndexNode {
LOG_KNOWHERE_ERROR_ << Type() << " only support metric_type IP or BM25";
return Status::invalid_metric_type;
}
auto drop_ratio_build = cfg.drop_ratio_build.value_or(0.0f);
auto index_or = CreateIndex</*mmapped=*/false>(cfg);
if (!index_or.has_value()) {
return index_or.error();
}
auto index = index_or.value();
index->Train(static_cast<const sparse::SparseRow<T>*>(dataset->GetTensor()), dataset->GetRows(),
drop_ratio_build);
index->Train(static_cast<const sparse::SparseRow<T>*>(dataset->GetTensor()), dataset->GetRows());
if (index_ != nullptr) {
LOG_KNOWHERE_WARNING_ << Type() << " has already been created, deleting old";
DeleteExistingIndex();
Expand Down Expand Up @@ -209,37 +207,17 @@ class SparseInvertedIndexNode : public IndexNode {

[[nodiscard]] expected<DataSetPtr>
GetVectorByIds(const DataSetPtr dataset) const override {
if (!index_) {
return expected<DataSetPtr>::Err(Status::empty_index, "index not loaded");
}

auto rows = dataset->GetRows();
auto ids = dataset->GetIds();

auto data = std::make_unique<sparse::SparseRow<T>[]>(rows);
int64_t dim = 0;
try {
for (int64_t i = 0; i < rows; ++i) {
auto& target = data[i];
index_->GetVectorById(ids[i], target);
dim = std::max(dim, target.dim());
}
} catch (std::exception& e) {
return expected<DataSetPtr>::Err(Status::invalid_args, "GetVectorByIds failed");
}
auto res = GenResultDataSet(rows, dim, data.release());
res->SetIsSparse(true);
return res;
return expected<DataSetPtr>::Err(Status::not_implemented, "GetVectorByIds not implemented");
}

[[nodiscard]] bool
HasRawData(const std::string& metric_type) const override {
return true;
return false;
}

[[nodiscard]] expected<DataSetPtr>
GetIndexMeta(std::unique_ptr<Config> cfg) const override {
throw std::runtime_error("GetIndexMeta not supported for current index type");
return expected<DataSetPtr>::Err(Status::not_implemented, "GetIndexMeta not supported for current index type");
}

Status
Expand Down Expand Up @@ -284,29 +262,33 @@ class SparseInvertedIndexNode : public IndexNode {
}
auto cfg = static_cast<const knowhere::SparseInvertedIndexConfig&>(*config);
auto reader = knowhere::FileReader(filename);
map_size_ = reader.size();
size_t map_size = reader.size();
int map_flags = MAP_SHARED;
#ifdef MAP_POPULATE
if (cfg.enable_mmap_pop.has_value() && cfg.enable_mmap_pop.value()) {
map_flags |= MAP_POPULATE;
}
#endif
map_ = static_cast<char*>(mmap(nullptr, map_size_, PROT_READ, map_flags, reader.descriptor(), 0));
if (map_ == MAP_FAILED) {
char* map = static_cast<char*>(mmap(nullptr, map_size, PROT_READ, map_flags, reader.descriptor(), 0));
if (map == MAP_FAILED) {
LOG_KNOWHERE_ERROR_ << "Failed to mmap file: " << strerror(errno);
return Status::disk_file_error;
}
if (madvise(map_, map_size_, MADV_RANDOM) != 0) {
if (madvise(map, map_size, MADV_RANDOM) != 0) {
LOG_KNOWHERE_WARNING_ << "Failed to madvise file: " << strerror(errno);
}
auto index_or = CreateIndex</*mmapped=*/true>(cfg);
if (!index_or.has_value()) {
return index_or.error();
}
index_ = index_or.value();
MemoryIOReader map_reader((uint8_t*)map_, map_size_);
MemoryIOReader map_reader((uint8_t*)map, map_size);
auto supplement_target_filename = filename + ".knowhere_sparse_index_supplement";
return index_->Load(map_reader, map_flags, supplement_target_filename);
auto status = index_->Load(map_reader, map_flags, supplement_target_filename);
if (munmap(map, map_size) != 0) {
LOG_KNOWHERE_ERROR_ << "Failed to munmap when trying to delete index: " << strerror(errno);
}
return status;
}

static std::unique_ptr<BaseConfig>
Expand Down Expand Up @@ -367,23 +349,11 @@ class SparseInvertedIndexNode : public IndexNode {
delete index_;
index_ = nullptr;
}
if (map_ != nullptr) {
auto res = munmap(map_, map_size_);
if (res != 0) {
LOG_KNOWHERE_ERROR_ << "Failed to munmap when trying to delete index: " << strerror(errno);
}
map_ = nullptr;
map_size_ = 0;
}
}

sparse::BaseInvertedIndex<T>* index_{};
std::shared_ptr<ThreadPool> search_pool_;
std::shared_ptr<ThreadPool> build_pool_;

// if map_ is not nullptr, it means the index is mmapped from disk.
char* map_ = nullptr;
size_t map_size_ = 0;
}; // class SparseInvertedIndexNode

// Concurrent version of SparseInvertedIndexNode
Expand Down
Loading

0 comments on commit 75ee982

Please sign in to comment.