From 67094b73b8556a3cbe6255c5984ab7d1363ea7f8 Mon Sep 17 00:00:00 2001 From: zh Wang Date: Wed, 22 Feb 2023 17:37:04 +0800 Subject: [PATCH] Split queries in IVF and BF Signed-off-by: zh Wang --- knowhere/archive/BruteForce.cpp | 205 +++++++++++------- knowhere/common/ThreadPool.h | 11 + knowhere/index/VecIndexFactory.cpp | 8 +- knowhere/index/vector_index/FaissBaseIndex.h | 3 + knowhere/index/vector_index/IndexIDMAP.cpp | 52 ++++- knowhere/index/vector_index/IndexIVF.cpp | 54 ++++- .../index/vector_offset_index/IndexIVF_NM.cpp | 58 ++++- .../vector_offset_index/OffsetBaseIndex.h | 3 + thirdparty/ctpl/ctpl-std.h | 1 - 9 files changed, 276 insertions(+), 119 deletions(-) diff --git a/knowhere/archive/BruteForce.cpp b/knowhere/archive/BruteForce.cpp index 19cd9603f..6e40e73df 100644 --- a/knowhere/archive/BruteForce.cpp +++ b/knowhere/archive/BruteForce.cpp @@ -49,53 +49,67 @@ BruteForce::Search(const DatasetPtr base_dataset, const DatasetPtr query_dataset auto distances = new float[nq * topk]; auto pool = ThreadPool::GetGlobalThreadPool(); - auto future = pool->push([&] { - switch (faiss_metric_type) { - case faiss::METRIC_L2: { - faiss::float_maxheap_array_t buf{(size_t)nq, (size_t)topk, labels, distances}; - faiss::knn_L2sqr((const float*)xq, (const float*)xb, dim, nq, nb, &buf, nullptr, bitset); - break; - } - case faiss::METRIC_INNER_PRODUCT: { - faiss::float_minheap_array_t buf{(size_t)nq, (size_t)topk, labels, distances}; - faiss::knn_inner_product((const float*)xq, (const float*)xb, dim, nq, nb, &buf, bitset); - break; - } - case faiss::METRIC_Jaccard: - case faiss::METRIC_Tanimoto: { - faiss::float_maxheap_array_t res = {size_t(nq), size_t(topk), labels, distances}; - binary_distance_knn_hc(faiss::METRIC_Jaccard, &res, (const uint8_t*)xq, (const uint8_t*)xb, nb, dim / 8, - bitset); - - if (faiss_metric_type == faiss::METRIC_Tanimoto) { - for (int i = 0; i < topk * nq; i++) { - distances[i] = faiss::Jaccard_2_Tanimoto(distances[i]); + std::vector> futs; + for 
(int i = 0; i < nq; ++i) { + futs.push_back(pool->push([&, index = i] { + ThreadPool::ScopedOmpSetter setter(1); + auto cur_labels = labels + topk * index; + auto cur_distances = distances + topk * index; + switch (faiss_metric_type) { + case faiss::METRIC_L2: { + auto cur_query = (const float*)xq + dim * index; + faiss::float_maxheap_array_t buf{(size_t)1, (size_t)topk, cur_labels, cur_distances}; + faiss::knn_L2sqr(cur_query, (const float*)xb, dim, 1, nb, &buf, nullptr, bitset); + break; + } + case faiss::METRIC_INNER_PRODUCT: { + auto cur_query = (const float*)xq + dim * index; + faiss::float_minheap_array_t buf{(size_t)1, (size_t)topk, cur_labels, cur_distances}; + faiss::knn_inner_product(cur_query, (const float*)xb, dim, 1, nb, &buf, bitset); + break; + } + case faiss::METRIC_Jaccard: + case faiss::METRIC_Tanimoto: { + auto cur_query = (const uint8_t*)xq + (dim / 8) * index; + faiss::float_maxheap_array_t res = {size_t(1), size_t(topk), cur_labels, cur_distances}; + binary_distance_knn_hc(faiss::METRIC_Jaccard, &res, cur_query, (const uint8_t*)xb, nb, dim / 8, + bitset); + + if (faiss_metric_type == faiss::METRIC_Tanimoto) { + for (int i = 0; i < topk; i++) { + cur_distances[i] = faiss::Jaccard_2_Tanimoto(cur_distances[i]); /* convert this query's own slice in place; distances[i] would read query 0's results */ + } + } + break; } - break; - } - case faiss::METRIC_Hamming: { - std::vector int_distances(nq * topk); - faiss::int_maxheap_array_t res = {size_t(nq), size_t(topk), labels, int_distances.data()}; - binary_distance_knn_hc(faiss::METRIC_Hamming, &res, (const uint8_t*)xq, (const uint8_t*)xb, nb, dim / 8, - bitset); - for (int i = 0; i < nq * topk; ++i) { - distances[i] = int_distances[i]; + case faiss::METRIC_Hamming: { + auto cur_query = (const uint8_t*)xq + (dim / 8) * index; + std::vector int_distances(topk); + faiss::int_maxheap_array_t res = {size_t(1), size_t(topk), cur_labels, int_distances.data()}; + binary_distance_knn_hc(faiss::METRIC_Hamming, &res, (const uint8_t*)cur_query, (const uint8_t*)xb, + nb, dim / 8, bitset); + for (int i = 0; 
i < topk; ++i) { + cur_distances[i] = int_distances[i]; + } + break; } - break; - } - case faiss::METRIC_Substructure: - case faiss::METRIC_Superstructure: { - // only matched ids will be chosen, not to use heap - binary_distance_knn_mc(faiss_metric_type, (const uint8_t*)xq, (const uint8_t*)xb, nq, nb, topk, dim / 8, - distances, labels, bitset); - break; + case faiss::METRIC_Substructure: + case faiss::METRIC_Superstructure: { + // only matched ids will be chosen, not to use heap + auto cur_query = (const uint8_t*)xq + (dim / 8) * index; + binary_distance_knn_mc(faiss_metric_type, cur_query, (const uint8_t*)xb, 1, nb, topk, dim / 8, + cur_distances, cur_labels, bitset); + break; + } + default: + KNOWHERE_THROW_MSG("BruteForce search not support metric type: " + metric_type); } - default: - KNOWHERE_THROW_MSG("BruteForce search not support metric type: " + metric_type); - } - }); - future.get(); + })); + } + for (auto& fut : futs) { + fut.get(); + } + return GenResultDataset(labels, distances); } @@ -118,52 +132,79 @@ BruteForce::RangeSearch(const DatasetPtr base_dataset, auto faiss_metric_type = GetFaissMetricType(metric_type); bool is_ip = false; - - faiss::RangeSearchResult res(nq); + bool range_filter_exist = CheckKeyInConfig(config, meta::RANGE_FILTER); + float range_filter = range_filter_exist ? 
GetMetaRangeFilter(config) : (1.0 / 0.0); auto pool = ThreadPool::GetGlobalThreadPool(); - auto future = pool->push([&] { - switch (faiss_metric_type) { - case faiss::METRIC_L2: - faiss::range_search_L2sqr((const float*)xq, (const float*)xb, dim, nq, nb, radius, &res, bitset); - break; - case faiss::METRIC_INNER_PRODUCT: - is_ip = true; - faiss::range_search_inner_product((const float*)xq, (const float*)xb, dim, nq, nb, radius, &res, - bitset); - break; - case faiss::METRIC_Jaccard: - faiss::binary_range_search, float>(faiss::METRIC_Jaccard, - (const uint8_t*)xq, (const uint8_t*)xb, - nq, nb, radius, dim / 8, &res, bitset); - break; - case faiss::METRIC_Tanimoto: - faiss::binary_range_search, float>(faiss::METRIC_Tanimoto, - (const uint8_t*)xq, (const uint8_t*)xb, - nq, nb, radius, dim / 8, &res, bitset); - break; - case faiss::METRIC_Hamming: - faiss::binary_range_search, int>(faiss::METRIC_Hamming, (const uint8_t*)xq, - (const uint8_t*)xb, nq, nb, (int)radius, - dim / 8, &res, bitset); - break; - default: - KNOWHERE_THROW_MSG("BruteForce range search not support metric type: " + metric_type); - } - }); - future.get(); + + std::vector> result_id_array(nq); + std::vector> result_dist_array(nq); + std::vector result_size(nq); + std::vector result_lims(nq + 1); + std::vector> futs; + futs.reserve(nq); + for (int i = 0; i < nq; ++i) { + futs.push_back(pool->push([&, index = i] { + ThreadPool::ScopedOmpSetter setter(1); + faiss::RangeSearchResult res(1); + switch (faiss_metric_type) { + case faiss::METRIC_L2: { + auto cur_query = (const float*)xq + dim * index; + faiss::range_search_L2sqr(cur_query, (const float*)xb, dim, 1, nb, radius, &res, bitset); + break; + } + case faiss::METRIC_INNER_PRODUCT: { + is_ip = true; + auto cur_query = (const float*)xq + dim * index; + faiss::range_search_inner_product(cur_query, (const float*)xb, dim, 1, nb, radius, &res, bitset); + break; + } + case faiss::METRIC_Jaccard: { + auto cur_query = (const uint8_t*)xq + (dim / 8) * index; + 
faiss::binary_range_search, float>( + faiss::METRIC_Jaccard, cur_query, (const uint8_t*)xb, 1, nb, radius, dim / 8, &res, bitset); + break; + } + case faiss::METRIC_Tanimoto: { + auto cur_query = (const uint8_t*)xq + (dim / 8) * index; + faiss::binary_range_search, float>( + faiss::METRIC_Tanimoto, cur_query, (const uint8_t*)xb, 1, nb, radius, dim / 8, &res, bitset); + break; + } + case faiss::METRIC_Hamming: { + auto cur_query = (const uint8_t*)xq + (dim / 8) * index; + faiss::binary_range_search, int>(faiss::METRIC_Hamming, cur_query, + (const uint8_t*)xb, 1, nb, (int)radius, + dim / 8, &res, bitset); + break; + } + default: + KNOWHERE_THROW_MSG("BruteForce range search not support metric type: " + metric_type); + } + + auto elem_cnt = res.lims[1]; + result_dist_array[index].resize(elem_cnt); + result_id_array[index].resize(elem_cnt); + result_size[index] = elem_cnt; + for (size_t j = 0; j < elem_cnt; j++) { + result_dist_array[index][j] = res.distances[j]; + result_id_array[index][j] = res.labels[j]; + } + if (range_filter_exist) { + FilterRangeSearchResultForOneNq(result_dist_array[index], result_id_array[index], is_ip, radius, + range_filter); + } + })); + } + for (auto& fut : futs) { + fut.get(); + } float* distances = nullptr; int64_t* labels = nullptr; size_t* lims = nullptr; - if (CheckKeyInConfig(config, meta::RANGE_FILTER)) { - auto range_filter = GetMetaRangeFilter(config); - GetRangeSearchResult(res, is_ip, nq, radius, range_filter, distances, labels, lims, bitset); - } else { - GetRangeSearchResult(res, is_ip, nq, radius, distances, labels, lims); - } - + GetRangeSearchResult(result_dist_array, result_id_array, is_ip, nq, radius, range_filter, distances, labels, lims); return GenResultDataset(labels, distances, lims); } diff --git a/knowhere/common/ThreadPool.h b/knowhere/common/ThreadPool.h index 569b9709c..c7fd414bf 100644 --- a/knowhere/common/ThreadPool.h +++ b/knowhere/common/ThreadPool.h @@ -58,6 +58,17 @@ class ThreadPool { static 
std::shared_ptr GetGlobalThreadPool(); + class ScopedOmpSetter { + int omp_before; + public: + explicit ScopedOmpSetter(int num_threads = 1) : omp_before(omp_get_max_threads()) { /* save the thread-count setting to restore; omp_get_num_threads() is 1 outside a parallel region */ + omp_set_num_threads(num_threads); + } + ~ScopedOmpSetter() { + omp_set_num_threads(omp_before); + } + }; + private: std::unique_ptr pool_; }; diff --git a/knowhere/index/VecIndexFactory.cpp b/knowhere/index/VecIndexFactory.cpp index b59e8fd55..66c42d8dc 100644 --- a/knowhere/index/VecIndexFactory.cpp +++ b/knowhere/index/VecIndexFactory.cpp @@ -47,13 +47,13 @@ VecIndexFactory::CreateVecIndex(const IndexType& type, const IndexMode mode) { } else if (type == IndexEnum::INDEX_FAISS_BIN_IVFFLAT) { return std::make_shared(std::make_unique()); } else if (type == IndexEnum::INDEX_FAISS_IDMAP) { - return std::make_shared(std::make_unique()); + return std::make_shared(); } else if (type == IndexEnum::INDEX_FAISS_IVFFLAT) { - return std::make_shared(std::make_unique()); + return std::make_shared(); } else if (type == IndexEnum::INDEX_FAISS_IVFPQ) { - return std::make_shared(std::make_unique()); + return std::make_shared(); } else if (type == IndexEnum::INDEX_FAISS_IVFSQ8) { - return std::make_shared(std::make_unique()); + return std::make_shared(); } else if (type == IndexEnum::INDEX_ANNOY) { return std::make_shared(); } else if (type == IndexEnum::INDEX_HNSW) { diff --git a/knowhere/index/vector_index/FaissBaseIndex.h b/knowhere/index/vector_index/FaissBaseIndex.h index 76513e7b8..dd45f1556 100644 --- a/knowhere/index/vector_index/FaissBaseIndex.h +++ b/knowhere/index/vector_index/FaissBaseIndex.h @@ -17,6 +17,7 @@ #include #include "knowhere/common/BinarySet.h" +#include "knowhere/common/ThreadPool.h" #include "knowhere/index/IndexType.h" namespace knowhere { @@ -24,6 +25,7 @@ namespace knowhere { class FaissBaseIndex { protected: explicit FaissBaseIndex(std::shared_ptr index) : index_(std::move(index)) { + pool_ = ThreadPool::GetGlobalThreadPool(); } virtual BinarySet @@ -37,6 +39,7 @@ class 
FaissBaseIndex { public: std::shared_ptr index_ = nullptr; + std::shared_ptr pool_; }; } // namespace knowhere diff --git a/knowhere/index/vector_index/IndexIDMAP.cpp b/knowhere/index/vector_index/IndexIDMAP.cpp index b44958406..66114ae5e 100644 --- a/knowhere/index/vector_index/IndexIDMAP.cpp +++ b/knowhere/index/vector_index/IndexIDMAP.cpp @@ -219,7 +219,16 @@ IDMAP::QueryImpl(int64_t n, int64_t* labels, const Config& config, const faiss::BitsetView bitset) { - index_->search(n, data, k, distances, labels, bitset); + std::vector> futs; + for (int i = 0; i < n; ++i) { + futs.push_back(pool_->push([&, index = i] { + ThreadPool::ScopedOmpSetter setter(1); + index_->search(1, data + index * Dim(), k, distances + index * k, labels + index * k, bitset); + })); + } + for (auto& fut : futs) { + fut.get(); + } } void @@ -233,16 +242,39 @@ IDMAP::QueryByRangeImpl(int64_t n, auto idmap_index = dynamic_cast(index_.get()); float radius = GetMetaRadius(config); bool is_ip = (idmap_index->metric_type == faiss::METRIC_INNER_PRODUCT); - - faiss::RangeSearchResult res(n); - idmap_index->range_search(n, reinterpret_cast(data), radius, &res, bitset); - - if (CheckKeyInConfig(config, meta::RANGE_FILTER)) { - float range_filter = GetMetaRangeFilter(config); - GetRangeSearchResult(res, is_ip, n, radius, range_filter, distances, labels, lims, bitset); - } else { - GetRangeSearchResult(res, is_ip, n, radius, distances, labels, lims); + bool range_filter_exist = CheckKeyInConfig(config, meta::RANGE_FILTER); + float range_filter = range_filter_exist ? 
GetMetaRangeFilter(config) : (1.0 / 0.0); + + std::vector> result_id_array(n); + std::vector> result_dist_array(n); + std::vector result_size(n); + std::vector result_lims(n + 1); + std::vector> futs; + futs.reserve(n); + for (int i = 0; i < n; ++i) { + futs.push_back(pool_->push([&, index = i] { + ThreadPool::ScopedOmpSetter setter(1); + faiss::RangeSearchResult ret(1); + idmap_index->range_search(1, data + index * Dim(), radius, &ret, bitset); + auto elem_cnt = ret.lims[1]; + result_dist_array[index].resize(elem_cnt); + result_id_array[index].resize(elem_cnt); + result_size[index] = elem_cnt; + for (size_t j = 0; j < elem_cnt; j++) { + result_dist_array[index][j] = ret.distances[j]; + result_id_array[index][j] = ret.labels[j]; + } + if (range_filter_exist) { + FilterRangeSearchResultForOneNq(result_dist_array[index], result_id_array[index], is_ip, radius, + range_filter); + } + })); + } + for (auto& fut : futs) { + fut.get(); } + + GetRangeSearchResult(result_dist_array, result_id_array, is_ip, n, radius, range_filter, distances, labels, lims); } } // namespace knowhere diff --git a/knowhere/index/vector_index/IndexIVF.cpp b/knowhere/index/vector_index/IndexIVF.cpp index aa9b54278..16de8b420 100644 --- a/knowhere/index/vector_index/IndexIVF.cpp +++ b/knowhere/index/vector_index/IndexIVF.cpp @@ -319,7 +319,17 @@ IVF::QueryImpl(int64_t n, } size_t max_codes = 0; auto ivf_stats = std::dynamic_pointer_cast(stats); - ivf_index->search_thread_safe(n, xq, k, distances, labels, params->nprobe, parallel_mode, max_codes, bitset); + std::vector> futs; + for (int i = 0; i < n; ++i) { + futs.push_back(pool_->push([&, index = i] { + ThreadPool::ScopedOmpSetter setter(1); + ivf_index->search_thread_safe(1, xq + index * Dim(), k, distances + index * k, labels + index * k, + params->nprobe, parallel_mode, max_codes, bitset); + })); + } + for (auto& fut : futs) { + fut.get(); + } #if 0 stdclock::time_point after = stdclock::now(); double search_cost = (std::chrono::duration(after 
- before)).count(); @@ -367,16 +377,40 @@ IVF::QueryByRangeImpl(int64_t n, float radius = GetMetaRadius(config); bool is_ip = (ivf_index->metric_type == faiss::METRIC_INNER_PRODUCT); - - faiss::RangeSearchResult res(n); - ivf_index->range_search_thread_safe(n, xq, radius, &res, params->nprobe, parallel_mode, max_codes, bitset); - - if (CheckKeyInConfig(config, meta::RANGE_FILTER)) { - float range_filter = GetMetaRangeFilter(config); - GetRangeSearchResult(res, is_ip, n, radius, range_filter, distances, labels, lims, bitset); - } else { - GetRangeSearchResult(res, is_ip, n, radius, distances, labels, lims); + bool range_filter_exist = CheckKeyInConfig(config, meta::RANGE_FILTER); + float range_filter = range_filter_exist ? GetMetaRangeFilter(config) : (1.0 / 0.0); + + std::vector> result_id_array(n); + std::vector> result_dist_array(n); + std::vector result_size(n); + std::vector result_lims(n + 1); + std::vector> futs; + futs.reserve(n); + for (int i = 0; i < n; ++i) { + futs.push_back(pool_->push([&, index = i] { + ThreadPool::ScopedOmpSetter setter(1); + faiss::RangeSearchResult ret(1); + ivf_index->range_search_thread_safe(1, xq + index * Dim(), radius, &ret, params->nprobe, parallel_mode, + max_codes, bitset); + auto elem_cnt = ret.lims[1]; + result_dist_array[index].resize(elem_cnt); + result_id_array[index].resize(elem_cnt); + result_size[index] = elem_cnt; + for (size_t j = 0; j < elem_cnt; j++) { + result_dist_array[index][j] = ret.distances[j]; + result_id_array[index][j] = ret.labels[j]; + } + if (range_filter_exist) { + FilterRangeSearchResultForOneNq(result_dist_array[index], result_id_array[index], is_ip, radius, + range_filter); + } + })); } + for (auto& fut : futs) { + fut.get(); + } + + GetRangeSearchResult(result_dist_array, result_id_array, is_ip, n, radius, range_filter, distances, labels, lims); } void diff --git a/knowhere/index/vector_offset_index/IndexIVF_NM.cpp b/knowhere/index/vector_offset_index/IndexIVF_NM.cpp index cde0a76e7..8d440e225 
100644 --- a/knowhere/index/vector_offset_index/IndexIVF_NM.cpp +++ b/knowhere/index/vector_offset_index/IndexIVF_NM.cpp @@ -376,8 +376,19 @@ IVF_NM::QueryImpl(int64_t n, auto arranged_data = static_cast(ro_codes_->data); #endif auto ivf_stats = std::dynamic_pointer_cast(stats); - ivf_index->search_without_codes_thread_safe(n, xq, k, distances, labels, params->nprobe, parallel_mode, max_codes, - bitset); + + std::vector> futs; + for (int i = 0; i < n; ++i) { + futs.push_back(pool_->push([&, index = i] { + ThreadPool::ScopedOmpSetter setter(1); + ivf_index->search_without_codes_thread_safe(1, xq + index * Dim(), k, distances + index * k, + labels + index * k, params->nprobe, parallel_mode, max_codes, + bitset); + })); + } + for (auto& fut : futs) { + fut.get(); + } #if 0 stdclock::time_point after = stdclock::now(); double search_cost = (std::chrono::duration(after - before)).count(); @@ -426,17 +437,40 @@ IVF_NM::QueryByRangeImpl(int64_t n, float radius = GetMetaRadius(config); bool is_ip = (ivf_index->metric_type == faiss::METRIC_INNER_PRODUCT); - - faiss::RangeSearchResult res(n); - ivf_index->range_search_without_codes_thread_safe(n, xq, radius, &res, params->nprobe, parallel_mode, max_codes, - bitset); - - if (CheckKeyInConfig(config, meta::RANGE_FILTER)) { - float range_filter = GetMetaRangeFilter(config); - GetRangeSearchResult(res, is_ip, n, radius, range_filter, distances, labels, lims, bitset); - } else { - GetRangeSearchResult(res, is_ip, n, radius, distances, labels, lims); + bool range_filter_exist = CheckKeyInConfig(config, meta::RANGE_FILTER); + float range_filter = range_filter_exist ? 
GetMetaRangeFilter(config) : (1.0 / 0.0); + + std::vector> result_id_array(n); + std::vector> result_dist_array(n); + std::vector result_size(n); + std::vector result_lims(n + 1); + std::vector> futs; + futs.reserve(n); + for (int i = 0; i < n; ++i) { + futs.push_back(pool_->push([&, index = i] { + ThreadPool::ScopedOmpSetter setter(1); + faiss::RangeSearchResult ret(1); + ivf_index->range_search_without_codes_thread_safe(1, xq + index * Dim(), radius, &ret, params->nprobe, + parallel_mode, max_codes, bitset); + auto elem_cnt = ret.lims[1]; + result_dist_array[index].resize(elem_cnt); + result_id_array[index].resize(elem_cnt); + result_size[index] = elem_cnt; + for (size_t j = 0; j < elem_cnt; j++) { + result_dist_array[index][j] = ret.distances[j]; + result_id_array[index][j] = ret.labels[j]; + } + if (range_filter_exist) { + FilterRangeSearchResultForOneNq(result_dist_array[index], result_id_array[index], is_ip, radius, + range_filter); + } + })); } + for (auto& fut : futs) { + fut.get(); + } + + GetRangeSearchResult(result_dist_array, result_id_array, is_ip, n, radius, range_filter, distances, labels, lims); } void diff --git a/knowhere/index/vector_offset_index/OffsetBaseIndex.h b/knowhere/index/vector_offset_index/OffsetBaseIndex.h index b7e6b77da..3e389d47f 100644 --- a/knowhere/index/vector_offset_index/OffsetBaseIndex.h +++ b/knowhere/index/vector_offset_index/OffsetBaseIndex.h @@ -17,6 +17,7 @@ #include #include "knowhere/common/BinarySet.h" +#include "knowhere/common/ThreadPool.h" #include "knowhere/index/IndexType.h" namespace knowhere { @@ -24,6 +25,7 @@ namespace knowhere { class OffsetBaseIndex { protected: explicit OffsetBaseIndex(std::shared_ptr index) : index_(std::move(index)) { + pool_ = ThreadPool::GetGlobalThreadPool(); } virtual BinarySet @@ -38,6 +40,7 @@ class OffsetBaseIndex { public: std::shared_ptr index_ = nullptr; + std::shared_ptr pool_; }; } // namespace knowhere diff --git a/thirdparty/ctpl/ctpl-std.h b/thirdparty/ctpl/ctpl-std.h 
index 38d3edf5b..ac4173e8b 100644 --- a/thirdparty/ctpl/ctpl-std.h +++ b/thirdparty/ctpl/ctpl-std.h @@ -210,7 +210,6 @@ namespace ctpl { void set_thread(int i) { std::shared_ptr> flag(this->flags[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { - omp_set_num_threads(1); std::atomic & _flag = *flag; std::function * _f; bool isPop = this->q.pop(_f);