Skip to content

Commit

Permalink
Fix bugs. (infiniflow#2188)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix bug issues.

Issue link:
Not solved: infiniflow#2119
infiniflow#2112
infiniflow#2066

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
  • Loading branch information
small-turtle-1 authored Nov 7, 2024
1 parent b9179a3 commit 292cf84
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 24 deletions.
33 changes: 18 additions & 15 deletions python/restart_test/test_memidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ def part1(infinity_obj):
table_obj1.insert(
[{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)]
)
table_obj1.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)])

infinity_obj.drop_database("db2", conflict_type=ConflictType.Ignore)
db_obj2 = infinity_obj.create_database("db2")
Expand All @@ -177,6 +176,10 @@ def part1(infinity_obj):
table_obj2.insert(
[{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)]
)

# wait memidx1 dump
time.sleep(1)
table_obj1.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)])
table_obj2.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)])

part1()
Expand All @@ -186,25 +189,25 @@ def part2(infinity_obj):
# wait for optimize
time.sleep(3)

idx1_files = list(pathlib.Path(data_dir).rglob(f"*{idx1_name}*"))
idx2_files = list(pathlib.Path(data_dir).rglob(f"*{idx2_name}*"))
assert len(idx1_files) == 1
assert len(idx2_files) == 1
idx1_dirs = list(pathlib.Path(data_dir).rglob(f"*{idx1_name}*"))
idx2_dirs = list(pathlib.Path(data_dir).rglob(f"*{idx2_name}*"))
assert len(idx1_dirs) == 1
assert len(idx2_dirs) == 1

idx1_dir = idx1_files[0]
idx1_files_in_dir = list(idx1_dir.glob("*"))
assert len(idx1_files_in_dir) == 3
idx1_dir = idx1_dirs[0]
idx1_files = list(idx1_dir.glob("*"))
assert len(idx1_files) == 3

idx2_dir = idx2_files[0]
idx2_files_in_dir = list(idx2_dir.glob("*"))
assert len(idx2_files_in_dir) == 3
idx2_dir = idx2_dirs[0]
idx2_files = list(idx2_dir.glob("*"))
assert len(idx2_files) == 3

infinity_obj.cleanup()
idx1_files_in_dir = list(idx1_dir.glob("*"))
assert len(idx1_files_in_dir) == 1
idx1_files = list(idx1_dir.glob("*"))
assert len(idx1_files) == 1

idx2_files_in_dir = list(idx2_dir.glob("*"))
assert len(idx2_files_in_dir) == 1
idx2_files = list(idx2_dir.glob("*"))
assert len(idx2_files) == 1

part2()

Expand Down
4 changes: 2 additions & 2 deletions scripts/collect_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
print("Error: stdout file not found")
else:
if failure:
shutil.copy(stdout_path, f"{output_dir}/{random_name}_1.log")
shutil.copy(stdout_path, f"{output_dir}/{random_name}_stdout.log")
print(f"Last {show_lines} lines from {stdout_path}:")
with open(stdout_path, "r") as f:
lines = f.readlines()
Expand All @@ -63,7 +63,7 @@
print("Error: stderror file not found")
else:
if failure:
shutil.copy(stderror_path, f"{output_dir}/{random_name}_2.log")
shutil.copy(stderror_path, f"{output_dir}/{random_name}_stderror.log")
print(f"Last {show_lines} lines from {stderror_path}:")
with open(stderror_path, "r") as f:
lines = f.readlines()
Expand Down
2 changes: 2 additions & 0 deletions src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,14 @@ void InfinityContext::SetIndexThreadPool(SizeT thread_num) {
thread_num = thread_num / 2;
if (thread_num < 2)
thread_num = 2;
LOG_TRACE(fmt::format("Set index thread pool size to {}", thread_num));
inverting_thread_pool_.resize(thread_num);
commiting_thread_pool_.resize(thread_num);
hnsw_build_thread_pool_.resize(thread_num);
}

void InfinityContext::RestoreIndexThreadPoolToDefault() {
LOG_TRACE("Restore index thread pool size to default");
inverting_thread_pool_.resize(4);
commiting_thread_pool_.resize(2);
hnsw_build_thread_pool_.resize(4);
Expand Down
3 changes: 3 additions & 0 deletions src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ private:
template <typename Iter, typename Index>
static void InsertVecs(Index &index, Iter &&iter, const HnswInsertConfig &config, SizeT &mem_usage) {
auto &thread_pool = InfinityContext::instance().GetHnswBuildThreadPool();
if (thread_pool.size() == 0) {
UnrecoverableError("Hnsw build thread pool is not initialized.");
}
using T = std::decay_t<decltype(index)>;
if constexpr (!std::is_same_v<T, std::nullptr_t>) {
SizeT mem1 = index->mem_usage();
Expand Down
14 changes: 7 additions & 7 deletions src/storage/persistence/persistence_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) {
}
result.cached_ = true;
} else if (ObjStat *obj_stat = objects_->Get(it->second.obj_key_); obj_stat != nullptr) {
LOG_TRACE(fmt::format("GetObjCache object {} ref count {}", it->second.obj_key_, obj_stat->ref_count_));
LOG_TRACE(fmt::format("GetObjCache object {}, file_path: {}, ref count {}", it->second.obj_key_, file_path, obj_stat->ref_count_));
String read_path = GetObjPath(result.obj_addr_.obj_key_);
if (!VirtualStore::Exists(read_path)) {
obj_stat->cached_ = false;
Expand Down Expand Up @@ -406,12 +406,6 @@ void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr,
return;
}
}
if (check_ref_count) {
if (obj_stat->ref_count_ > 0) {
String error_message = fmt::format("CleanupNoLock object {} ref count is {}", object_addr.obj_key_, obj_stat->ref_count_);
UnrecoverableError(error_message);
}
}
Range orig_range(object_addr.part_offset_, object_addr.part_offset_ + object_addr.part_size_);
Range range(orig_range);
auto inst_it = obj_stat->deleted_ranges_.lower_bound(range);
Expand Down Expand Up @@ -473,6 +467,12 @@ void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr,
String error_message = fmt::format("Failed to find object key");
UnrecoverableError(error_message);
}
if (check_ref_count) {
if (obj_stat->ref_count_ > 0) {
String error_message = fmt::format("CleanupNoLock object {} ref count is {}", object_addr.obj_key_, obj_stat->ref_count_);
UnrecoverableError(error_message);
}
}
drop_from_remote_keys.emplace_back(object_addr.obj_key_);
objects_->Invalidate(object_addr.obj_key_);
LOG_TRACE(fmt::format("Deleted object {}", object_addr.obj_key_));
Expand Down

0 comments on commit 292cf84

Please sign in to comment.