From 81ca5f990982d202e654fa585f6b64ffc4f01883 Mon Sep 17 00:00:00 2001 From: Pingan Ren Date: Mon, 26 Feb 2024 17:14:00 +0100 Subject: [PATCH] Implement new idea --- .../duckpgq/functions/scalar/CMakeLists.txt | 2 + .../scalar/iterativelength_lowerbound.cpp | 37 +- .../scalar/iterativelength_two_phase.cpp | 249 +++++++++++++ .../scalar/shortest_path_lowerbound.cpp | 57 ++- .../scalar/shortest_path_two_phase.cpp | 334 ++++++++++++++++++ test/sql/path-finding/iterativelength.test | 196 ++++++++++ .../sql/path-finding/shortest_path_bound.test | 82 +++-- 7 files changed, 868 insertions(+), 89 deletions(-) create mode 100644 duckpgq/src/duckpgq/functions/scalar/iterativelength_two_phase.cpp create mode 100644 duckpgq/src/duckpgq/functions/scalar/shortest_path_two_phase.cpp create mode 100644 test/sql/path-finding/iterativelength.test diff --git a/duckpgq/src/duckpgq/functions/scalar/CMakeLists.txt b/duckpgq/src/duckpgq/functions/scalar/CMakeLists.txt index 44c3eea2..e03d89e9 100644 --- a/duckpgq/src/duckpgq/functions/scalar/CMakeLists.txt +++ b/duckpgq/src/duckpgq/functions/scalar/CMakeLists.txt @@ -6,11 +6,13 @@ set(EXTENSION_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/csr_get_w_type.cpp ${CMAKE_CURRENT_SOURCE_DIR}/iterativelength.cpp ${CMAKE_CURRENT_SOURCE_DIR}/iterativelength_lowerbound.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/iterativelength_two_phase.cpp ${CMAKE_CURRENT_SOURCE_DIR}/iterativelength2.cpp ${CMAKE_CURRENT_SOURCE_DIR}/iterativelength_bidirectional.cpp ${CMAKE_CURRENT_SOURCE_DIR}/reachability.cpp ${CMAKE_CURRENT_SOURCE_DIR}/shortest_path.cpp ${CMAKE_CURRENT_SOURCE_DIR}/shortest_path_lowerbound.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/shortest_path_two_phase.cpp ${CMAKE_CURRENT_SOURCE_DIR}/csr_creation.cpp PARENT_SCOPE ) \ No newline at end of file diff --git a/duckpgq/src/duckpgq/functions/scalar/iterativelength_lowerbound.cpp b/duckpgq/src/duckpgq/functions/scalar/iterativelength_lowerbound.cpp index 1fe6d52d..8c8c5f89 100644 --- a/duckpgq/src/duckpgq/functions/scalar/iterativelength_lowerbound.cpp +++ b/duckpgq/src/duckpgq/functions/scalar/iterativelength_lowerbound.cpp @@ -8,13 +8,14 @@ namespace duckdb { static bool IterativeLengthLowerBound(int64_t v_size, int64_t *v, vector &e, - vector>> &parents_v, + // vector>> &parents_v, vector> &seen, vector> &visit, vector> &next) { bool change = false; for (auto i = 0; i < v_size; i++) { next[i] = 0; + seen[i] = 0; } for (auto lane = 0; lane < LANE_LIMIT; lane++) { @@ -22,11 +23,12 @@ static bool IterativeLengthLowerBound(int64_t v_size, int64_t *v, vectorget_index(search_num); - int64_t dst_pos = vdata_dst.sel->get_index(search_num); if (!vdata_src.validity.RowIsValid(src_pos)) { result_validity.SetInvalid(search_num); result_data[search_num] = (int64_t)-1; /* no path */ - } else if (src_data[src_pos] == dst_data[dst_pos]) { - result_data[search_num] = (int64_t)-1; /* no path */ - visit1[src_data[src_pos]][lane] = true; - lane_to_num[lane] = search_num; // active lane - active++; - break; } else { result_data[search_num] = (int64_t)-1; /* initialize to no path */ - seen[src_data[src_pos]][lane] = true; visit1[src_data[src_pos]][lane] = true; lane_to_num[lane] = search_num; // active lane active++; @@ -150,8 +144,10 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState & // make passes while a lane is still active for (int64_t iter = 1; active && iter <= upper_bound; iter++) { - bool stop = !IterativeLengthLowerBound(v_size, v, e, parents_v, seen, (iter & 1) ? visit1 : visit2, - (iter & 1) ? visit2 : visit1); + if (!IterativeLengthLowerBound(v_size, v, e, seen, (iter & 1) ? visit1 : visit2, + (iter & 1) ? visit2 : visit1)) { + break; + } // detect lanes that finished for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { int64_t search_num = lane_to_num[lane]; @@ -164,9 +160,9 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState & if (iter < lower_bound) { // when reach the destination too early, treat destination as null // looks like the graph does not have that vertex - seen[dst_data[dst_pos]][lane] = false; - (iter & 1) ? visit2[dst_data[dst_pos]][lane] = false - : visit1[dst_data[dst_pos]][lane] = false; + // seen[dst_data[dst_pos]][lane] = false; + // (iter & 1) ? visit2[dst_data[dst_pos]][lane] = false + // : visit1[dst_data[dst_pos]][lane] = false; continue; } else { result_data[search_num] = @@ -178,9 +174,6 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState & } } } - if (stop) { - break; - } } // no changes anymore: any still active searches have no path diff --git a/duckpgq/src/duckpgq/functions/scalar/iterativelength_two_phase.cpp b/duckpgq/src/duckpgq/functions/scalar/iterativelength_two_phase.cpp new file mode 100644 index 00000000..e270f5d5 --- /dev/null +++ b/duckpgq/src/duckpgq/functions/scalar/iterativelength_two_phase.cpp @@ -0,0 +1,249 @@ +#include +#include "duckdb/main/client_data.hpp" +#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp" +#include "duckdb/planner/expression/bound_function_expression.hpp" +#include "duckpgq/common.hpp" +#include "duckpgq/duckpgq_functions.hpp" + +namespace duckdb { + +static bool IterativeLengthPhaseOne(int64_t v_size, int64_t *v, vector &e, + vector> &visit, + vector> &next) { + bool change = false; + for (auto i = 0; i < v_size; i++) { + next[i] = 0; + } + + for (auto i = 0; i < v_size; i++) { + if (visit[i].any()) { + for (auto offset = v[i]; offset < v[i + 1]; offset++) { + auto n = e[offset]; + next[n] = next[n] | visit[i]; + } + } + } + + for (auto i = 0; i < v_size; i++) { + change |= next[i].any(); + } + + return change; +} + +static bool IterativeLengthPhaseTwo(int64_t v_size, int64_t *v, vector &e, + vector> &seen, + vector> &visit, + vector> &next) { + bool change = false; + for (auto i = 0; i < v_size; i++) { + next[i] = 0; + } + + for (auto i = 0; i < v_size; i++) { + if (visit[i].any()) { + for (auto offset = v[i]; offset < v[i + 1]; offset++) { + auto n = e[offset]; + next[n] = next[n] | visit[i]; + } + } + } + + for (auto i = 0; i < v_size; i++) { + next[i] = next[i] & ~seen[i]; + seen[i] = seen[i] | next[i]; + change |= next[i].any(); + } + + return change; + +} + +static int64_t IterativeLengthInternal(int64_t lane, int64_t v_size, int64_t destination, + int64_t bound, + int64_t *v, vector &e, + vector> &visit) { + vector src; + for (int64_t v = 0; v < v_size; v++) { + if (visit[v][lane]) { + src.push_back(v); + } + } + vector> seen(v_size); + vector> visit1(v_size); + vector> visit2(v_size); + + idx_t started_searches = 0; + while (started_searches < src.size()) { + for (auto i = 0; i < v_size; i++) { + seen[i] = 0; + visit1[i] = 0; + } + // add search jobs to free lanes + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + while (started_searches < src.size()) { + int64_t search_num = started_searches++; + visit1[src[search_num]][lane] = true; + } + } + + for (int64_t iter = 1; iter <= bound; iter++) { + if (!IterativeLengthPhaseTwo(v_size, v, e, seen, (iter & 1) ? visit1 : visit2, + (iter & 1) ? visit2 : visit1)) { + break; + } + // detect lanes that found the destination + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + if (seen[destination][lane]) { + return iter; + } + } + } + } + return -1; +} + +static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &state, + Vector &result) { + auto &func_expr = (BoundFunctionExpression &)state.expr; + auto &info = (IterativeLengthFunctionData &)*func_expr.bind_info; + auto duckpgq_state_entry = info.context.registered_state.find("duckpgq"); + if (duckpgq_state_entry == info.context.registered_state.end()) { + //! Wondering how you can get here if the extension wasn't loaded, but + //! leaving this check in anyways + throw MissingExtensionException( + "The DuckPGQ extension has not been loaded"); + } + auto duckpgq_state = + reinterpret_cast(duckpgq_state_entry->second.get()); + + D_ASSERT(duckpgq_state->csr_list[info.csr_id]); + + if ((uint64_t)info.csr_id + 1 > duckpgq_state->csr_list.size()) { + throw ConstraintException("Invalid ID"); + } + auto csr_entry = duckpgq_state->csr_list.find((uint64_t)info.csr_id); + if (csr_entry == duckpgq_state->csr_list.end()) { + throw ConstraintException( + "Need to initialize CSR before doing shortest path"); + } + + if (!(csr_entry->second->initialized_v && csr_entry->second->initialized_e)) { + throw ConstraintException( + "Need to initialize CSR before doing shortest path"); + } + int64_t v_size = args.data[1].GetValue(0).GetValue(); + int64_t *v = (int64_t *)duckpgq_state->csr_list[info.csr_id]->v; + vector &e = duckpgq_state->csr_list[info.csr_id]->e; + + // get src and dst vectors for searches + auto &src = args.data[2]; + auto &dst = args.data[3]; + UnifiedVectorFormat vdata_src; + UnifiedVectorFormat vdata_dst; + src.ToUnifiedFormat(args.size(), vdata_src); + dst.ToUnifiedFormat(args.size(), vdata_dst); + auto src_data = (int64_t *)vdata_src.data; + auto dst_data = (int64_t *)vdata_dst.data; + + // get lowerbound and upperbound + auto &lower = args.data[4]; + auto &upper = args.data[5]; + UnifiedVectorFormat vdata_lower_bound; + UnifiedVectorFormat vdata_upper_bound; + lower.ToUnifiedFormat(args.size(), vdata_lower_bound); + upper.ToUnifiedFormat(args.size(), vdata_upper_bound); + auto lower_bound = ((int64_t *)vdata_lower_bound.data)[0]; + auto upper_bound = ((int64_t *)vdata_upper_bound.data)[0]; + + ValidityMask &result_validity = FlatVector::Validity(result); + + // create result vector + result.SetVectorType(VectorType::FLAT_VECTOR); + auto result_data = FlatVector::GetData(result); + + // create temp SIMD arrays + vector> seen(v_size); + vector> visit1(v_size); + vector> visit2(v_size); + + // maps lane to search number + short lane_to_num[LANE_LIMIT]; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; // inactive + } + + idx_t started_searches = 0; + while (started_searches < args.size()) { + + // empty visit vectors + for (auto i = 0; i < v_size; i++) { + seen[i] = 0; + visit1[i] = 0; + } + + // add search jobs to free lanes + uint64_t active = 0; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; + while (started_searches < args.size()) { + int64_t search_num = started_searches++; + int64_t src_pos = vdata_src.sel->get_index(search_num); + if (!vdata_src.validity.RowIsValid(src_pos)) { + result_validity.SetInvalid(search_num); + result_data[search_num] = (int64_t)-1; /* no path */ + } else { + result_data[search_num] = (int64_t)-1; /* initialize to no path */ + visit1[src_data[src_pos]][lane] = true; + lane_to_num[lane] = search_num; // active lane + active++; + break; + } + } + } + + int64_t iter = 1; + // phase one: search without seen until lower bound - 1 + for (; iter < lower_bound; iter++) { + IterativeLengthPhaseOne(v_size, v, e, (iter & 1) ? visit1 : visit2, + (iter & 1) ? visit2 : visit1); + } + + // phase two: search with seen until upper bound + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + auto search_num = lane_to_num[lane]; + if (search_num >= 0) { + int64_t dst_pos = vdata_dst.sel->get_index(search_num); + auto length = IterativeLengthInternal(lane, v_size, + dst_data[dst_pos], upper_bound - lower_bound + 1, v, e, (iter & 1) ? visit1 : visit2); + if (length >= 0) { + result_data[search_num] = length + lower_bound - 1; + lane_to_num[lane] = -1; // mark inactive + } + } + } + + // no changes anymore: any still active searches have no path + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + int64_t search_num = lane_to_num[lane]; + if (search_num >= 0) { // active lane + result_validity.SetInvalid(search_num); + result_data[search_num] = (int64_t)-1; /* no path */ + lane_to_num[lane] = -1; // mark inactive + } + } + } + duckpgq_state->csr_to_delete.insert(info.csr_id); +} + +// CreateScalarFunctionInfo DuckPGQFunctions::GetIterativeLengthLowerBoundFunction() { +// auto fun = ScalarFunction("iterativelength_lowerbound", +// {LogicalType::INTEGER, LogicalType::BIGINT, +// LogicalType::BIGINT, LogicalType::BIGINT, +// LogicalType::BIGINT, LogicalType::BIGINT}, +// LogicalType::BIGINT, IterativeLengthLowerBoundFunction, +// IterativeLengthFunctionData::IterativeLengthBind); +// return CreateScalarFunctionInfo(fun); +// } + +} // namespace duckdb diff --git a/duckpgq/src/duckpgq/functions/scalar/shortest_path_lowerbound.cpp b/duckpgq/src/duckpgq/functions/scalar/shortest_path_lowerbound.cpp index 63daba93..c1af6859 100644 --- a/duckpgq/src/duckpgq/functions/scalar/shortest_path_lowerbound.cpp +++ b/duckpgq/src/duckpgq/functions/scalar/shortest_path_lowerbound.cpp @@ -13,18 +13,19 @@ namespace duckdb { static bool IterativeLengthLowerBound(int64_t v_size, int64_t *V, vector &E, vector &edge_ids, - vector>> &parents_v, + // vector>> &parents_v, vector>> &paths_v, vector>> &paths_e, vector> &seen, vector> &visit, vector> &next) { bool change = false; - map, unordered_set> parents_v_cache; + // map, unordered_set> parents_v_cache; map, vector> paths_v_cache; map, vector> paths_e_cache; for (auto v = 0; v < v_size; v++) { next[v] = 0; + seen[v] = 0; } //! Keep track of edge id through which the node was reached for (auto v = 0; v < v_size; v++) { @@ -36,17 +37,17 @@ static bool IterativeLengthLowerBound(int64_t v_size, int64_t *V, vector> visit1(v_size); vector> visit2(v_size); - vector>> parents_v(v_size, std::vector>(LANE_LIMIT)); + // vector>> parents_v(v_size, std::vector>(LANE_LIMIT)); vector>> paths_v(v_size, std::vector>(LANE_LIMIT)); vector>> paths_e(v_size, std::vector>(LANE_LIMIT)); @@ -156,17 +157,10 @@ static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &sta while (started_searches < args.size()) { int64_t search_num = started_searches++; int64_t src_pos = vdata_src.sel->get_index(search_num); - int64_t dst_pos = vdata_dst.sel->get_index(search_num); if (!vdata_src.validity.RowIsValid(src_pos)) { result_validity.SetInvalid(search_num); - } else if (src_data[src_pos] == dst_data[dst_pos]) { - visit1[src_data[src_pos]][lane] = true; - lane_to_num[lane] = search_num; // active lane - active++; - break; } else { visit1[src_data[src_pos]][lane] = true; - seen[src_data[src_pos]][lane] = true; lane_to_num[lane] = search_num; // active lane active++; break; @@ -177,7 +171,7 @@ static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &sta //! make passes while a lane is still active for (int64_t iter = 1; active && iter <= upper_bound; iter++) { //! Perform one step of bfs exploration - if (!IterativeLengthLowerBound(v_size, v, e, edge_ids, parents_v, paths_v, paths_e, seen, + if (!IterativeLengthLowerBound(v_size, v, e, edge_ids, paths_v, paths_e, seen, (iter & 1) ? visit1 : visit2, (iter & 1) ? visit2 : visit1)) { break; @@ -192,11 +186,6 @@ static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &sta // check if the path length is within bounds // bound vector is either a constant or a flat vector if (iter < lower_bound) { - // when reach the destination too early, treat destination as null - // looks like the graph does not have that vertex - seen[dst_data[dst_pos]][lane] = false; - (iter & 1) ? visit2[dst_data[dst_pos]][lane] = false - : visit1[dst_data[dst_pos]][lane] = false; continue; } else { vector output_vector; @@ -240,15 +229,15 @@ static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &sta duckpgq_state->csr_to_delete.insert(info.csr_id); } -CreateScalarFunctionInfo DuckPGQFunctions::GetShortestPathLowerBoundFunction() { - auto fun = ScalarFunction("shortestpath_lowerbound", - {LogicalType::INTEGER, LogicalType::BIGINT, - LogicalType::BIGINT, LogicalType::BIGINT, - LogicalType::BIGINT, LogicalType::BIGINT}, - LogicalType::LIST(LogicalType::BIGINT), - ShortestPathLowerBoundFunction, - IterativeLengthFunctionData::IterativeLengthBind); - return CreateScalarFunctionInfo(fun); -} +// CreateScalarFunctionInfo DuckPGQFunctions::GetShortestPathLowerBoundFunction() { +// auto fun = ScalarFunction("shortestpath_lowerbound", +// {LogicalType::INTEGER, LogicalType::BIGINT, +// LogicalType::BIGINT, LogicalType::BIGINT, +// LogicalType::BIGINT, LogicalType::BIGINT}, +// LogicalType::LIST(LogicalType::BIGINT), +// ShortestPathLowerBoundFunction, +// IterativeLengthFunctionData::IterativeLengthBind); +// return CreateScalarFunctionInfo(fun); +// } } // namespace duckdb diff --git a/duckpgq/src/duckpgq/functions/scalar/shortest_path_two_phase.cpp b/duckpgq/src/duckpgq/functions/scalar/shortest_path_two_phase.cpp new file mode 100644 index 00000000..93adb3d7 --- /dev/null +++ b/duckpgq/src/duckpgq/functions/scalar/shortest_path_two_phase.cpp @@ -0,0 +1,334 @@ +#include "duckdb/common/fstream.hpp" +#include "duckdb/common/profiler.hpp" +#include "duckdb/main/client_data.hpp" +#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp" +#include "duckdb/planner/expression/bound_function_expression.hpp" +#include "duckpgq/common.hpp" +#include "duckpgq/duckpgq_functions.hpp" + +#include +#include + +namespace duckdb { + +static bool IterativeLengthPhaseOne(int64_t v_size, int64_t *V, vector &E, + int64_t iter, vector &edge_ids, + vector>> &paths_v, + vector>> &paths_e, + vector> &seen, + vector> &visit, + vector> &next) { + bool change = false; + for (auto v = 0; v < v_size; v++) { + next[v] = 0; + seen[v] = 0; + } + //! Keep track of edge id through which the node was reached + for (auto v = 0; v < v_size; v++) { + if (visit[v].any()) { + for (auto e = V[v]; e < V[v + 1]; e++) { + auto n = E[e]; + auto edge_id = edge_ids[e]; + next[n] = next[n] | visit[v]; + for (auto lane = 0; lane < LANE_LIMIT; lane++) { + if (visit[v][lane]) { + paths_v[n][lane][iter] = v; + paths_e[n][lane][iter] = edge_id; + } + } + } + } + } + + for (auto v = 0; v < v_size; v++) { + seen[v] = seen[v] | next[v]; + change |= next[v].any(); + } + return change; +} + +static bool IterativeLengthPhaseTwo(int64_t v_size, int64_t *V, vector &E, + vector &edge_ids, + vector> &parents_v, + vector> &parents_e, + vector> &seen, + vector> &visit, + vector> &next) { + bool change = false; + for (auto v = 0; v < v_size; v++) { + next[v] = 0; + } + //! Keep track of edge id through which the node was reached + for (auto v = 0; v < v_size; v++) { + if (visit[v].any()) { + for (auto e = V[v]; e < V[v + 1]; e++) { + auto n = E[e]; + auto edge_id = edge_ids[e]; + next[n] = next[n] | visit[v]; + for (auto l = 0; l < LANE_LIMIT; l++) { + parents_v[n][l] = + ((parents_v[n][l] == -1) && visit[v][l]) ? v : parents_v[n][l]; + parents_e[n][l] = ((parents_e[n][l] == -1) && visit[v][l]) + ? edge_id + : parents_e[n][l]; + } + } + } + } + + for (auto v = 0; v < v_size; v++) { + next[v] = next[v] & ~seen[v]; + seen[v] = seen[v] | next[v]; + change |= next[v].any(); + } + return change; +} + +static std::tuple> ShortestPathInternal(int64_t lane, int64_t v_size, int64_t destination, + int64_t bound, + int64_t *v, vector &e, vector &edge_ids, + vector> &visit) { + vector src; + vector result; + for (int64_t v = 0; v < v_size; v++) { + if (visit[v][lane]) { + src.push_back(v); + } + } + vector> seen(v_size); + vector> visit1(v_size); + vector> visit2(v_size); + + vector> parents_v(v_size, + std::vector(LANE_LIMIT, -1)); + vector> parents_e(v_size, + std::vector(LANE_LIMIT, -1)); + + + // maps lane to search number + int16_t lane_to_num[LANE_LIMIT]; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; // inactive + } + + idx_t started_searches = 0; + while (started_searches < src.size()) { + for (auto i = 0; i < v_size; i++) { + seen[i] = 0; + visit1[i] = 0; + } + // add search jobs to free lanes + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + if (started_searches < src.size()) { + int64_t search_num = started_searches++; + visit1[src[search_num]][lane] = true; + lane_to_num[lane] = search_num; + } else { + break; + } + } + + for (int64_t iter = 1; iter <= bound; iter++) { + if (!IterativeLengthPhaseTwo(v_size, v, e, edge_ids, parents_v, parents_e, + seen, (iter & 1) ? visit1 : visit2, + (iter & 1) ? visit2 : visit1)) { + break; + } + // detect lanes that found the destination + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + if (seen[destination][lane]) { + auto search_num = lane_to_num[lane]; + + // found the destination, reconstruct the path + auto parent_vertex = parents_v[destination][lane]; + auto parent_edge = parents_e[destination][lane]; + + result.push_back(destination); + result.push_back(parent_edge); + while (parent_vertex != src[search_num]) { + result.push_back(parent_vertex); + parent_edge = parents_e[parent_vertex][lane]; + parent_vertex = parents_v[parent_vertex][lane]; + result.push_back(parent_edge); + } + result.push_back(src[search_num]); + std::reverse(result.begin(), result.end()); + return std::make_tuple(src[search_num], result); + } + } + } + } + return std::make_tuple(-1, result); +} + +static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &state, + Vector &result) { + auto &func_expr = (BoundFunctionExpression &)state.expr; + auto &info = (IterativeLengthFunctionData &)*func_expr.bind_info; + auto duckpgq_state_entry = info.context.registered_state.find("duckpgq"); + if (duckpgq_state_entry == info.context.registered_state.end()) { + //! Wondering how you can get here if the extension wasn't loaded, but + //! leaving this check in anyways + throw MissingExtensionException( + "The DuckPGQ extension has not been loaded"); + } + auto duckpgq_state = + reinterpret_cast(duckpgq_state_entry->second.get()); + + D_ASSERT(duckpgq_state->csr_list[info.csr_id]); + int32_t id = args.data[0].GetValue(0).GetValue(); + int64_t v_size = args.data[1].GetValue(0).GetValue(); + + int64_t *v = (int64_t *)duckpgq_state->csr_list[id]->v; + vector &e = duckpgq_state->csr_list[id]->e; + vector &edge_ids = duckpgq_state->csr_list[id]->edge_ids; + + auto &src = args.data[2]; + auto &target = args.data[3]; + + UnifiedVectorFormat vdata_src, vdata_dst; + src.ToUnifiedFormat(args.size(), vdata_src); + target.ToUnifiedFormat(args.size(), vdata_dst); + + auto src_data = (int64_t *)vdata_src.data; + auto dst_data = (int64_t *)vdata_dst.data; + + // get lowerbound and upperbound + auto &lower = args.data[4]; + auto &upper = args.data[5]; + UnifiedVectorFormat vdata_lower_bound; + UnifiedVectorFormat vdata_upper_bound; + lower.ToUnifiedFormat(args.size(), vdata_lower_bound); + upper.ToUnifiedFormat(args.size(), vdata_upper_bound); + auto lower_bound = ((int64_t *)vdata_lower_bound.data)[0]; + auto upper_bound = ((int64_t *)vdata_upper_bound.data)[0]; + + result.SetVectorType(VectorType::FLAT_VECTOR); + auto result_data = FlatVector::GetData(result); + ValidityMask &result_validity = FlatVector::Validity(result); + + // create temp SIMD arrays + vector> seen(v_size); + vector> visit1(v_size); + vector> visit2(v_size); + + vector>> paths_v(v_size, + std::vector>(LANE_LIMIT)); + vector>> paths_e(v_size, + std::vector>(LANE_LIMIT)); + + + // maps lane to search number + int16_t lane_to_num[LANE_LIMIT]; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; // inactive + } + int64_t total_len = 0; + + idx_t started_searches = 0; + while (started_searches < args.size()) { + + // empty visit vectors + for (auto i = 0; i < v_size; i++) { + seen[i] = 0; + visit1[i] = 0; + } + + // add search jobs to free lanes + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; + while (started_searches < args.size()) { + int64_t search_num = started_searches++; + int64_t src_pos = vdata_src.sel->get_index(search_num); + if (!vdata_src.validity.RowIsValid(src_pos)) { + result_validity.SetInvalid(search_num); + } else { + visit1[src_data[src_pos]][lane] = true; + lane_to_num[lane] = search_num; // active lane + break; + } + } + } + + int64_t iter = 1; + for (; iter < lower_bound; iter++) { + IterativeLengthPhaseOne(v_size, v, e, iter, edge_ids, paths_v, paths_e, seen, + (iter & 1) ? visit1 : visit2, + (iter & 1) ? visit2 : visit1); + } + + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + auto search_num = lane_to_num[lane]; + if (search_num >= 0) { + int64_t src_pos = vdata_src.sel->get_index(search_num); + int64_t dst_pos = vdata_dst.sel->get_index(search_num); + auto phase_two_result = ShortestPathInternal(lane, v_size, dst_data[dst_pos], + upper_bound - lower_bound + 1, v, e, edge_ids, (iter & 1) ? visit1 : visit2); + auto phase_two_src = std::get<0>(phase_two_result); + auto phase_two_path = std::get<1>(phase_two_result); + if (phase_two_src >= 0) { + vector output_vector; + // construct the path of phase one + if (paths_v[phase_two_src][lane].size() > 0) { + auto parent_vertex = paths_v[phase_two_src][lane][lower_bound - 1]; + auto parent_edge = paths_e[phase_two_src][lane][lower_bound - 1]; + + output_vector.push_back(parent_edge); + while (parent_vertex != src_data[src_pos]) { + output_vector.push_back(parent_vertex); + parent_edge = paths_e[parent_vertex][lane][lower_bound - 1]; + parent_vertex = paths_v[parent_vertex][lane][lower_bound - 1]; + output_vector.push_back(parent_edge); + } + output_vector.push_back(src_data[src_pos]); + std::reverse(output_vector.begin(), output_vector.end()); + } + + // construct the path of phase two + for (auto val : phase_two_path) { + output_vector.push_back(val); + } + + // construct the output + auto output = make_uniq(LogicalType::LIST(LogicalType::BIGINT)); + for (auto val : output_vector) { + Value value_to_insert = val; + ListVector::PushBack(*output, value_to_insert); + } + result_data[search_num].length = ListVector::GetListSize(*output); + result_data[search_num].offset = total_len; + ListVector::Append(result, ListVector::GetEntry(*output), + ListVector::GetListSize(*output)); + total_len += result_data[search_num].length; + lane_to_num[lane] = -1; // mark inactive + } else { + result_validity.SetInvalid(search_num); + lane_to_num[lane] = -1; // mark inactive + } + } + } + + // no changes anymore: any still active searches have no path + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + int64_t search_num = lane_to_num[lane]; + if (search_num >= 0) { // active lane + result_validity.SetInvalid(search_num); + lane_to_num[lane] = -1; // mark inactive + } + } + } + duckpgq_state->csr_to_delete.insert(info.csr_id); +} + +CreateScalarFunctionInfo DuckPGQFunctions::GetShortestPathLowerBoundFunction() { + auto fun = ScalarFunction("shortestpath_lowerbound", + {LogicalType::INTEGER, LogicalType::BIGINT, + LogicalType::BIGINT, LogicalType::BIGINT, + LogicalType::BIGINT, LogicalType::BIGINT}, + LogicalType::LIST(LogicalType::BIGINT), + ShortestPathLowerBoundFunction, + IterativeLengthFunctionData::IterativeLengthBind); + return CreateScalarFunctionInfo(fun); +} + +} // namespace duckdb diff --git a/test/sql/path-finding/iterativelength.test b/test/sql/path-finding/iterativelength.test new file mode 100644 index 00000000..a93fb82c --- /dev/null +++ b/test/sql/path-finding/iterativelength.test @@ -0,0 +1,196 @@ +# name: test/sql/path-finding/iterativelength.test +# group: [iterative] + +statement ok +pragma enable_verification + +require duckpgq + +# Graph to test shortest path bound with a cycle +# (0) --> (1) <-> (2) + +statement ok +CREATE TABLE Point3(id BIGINT); INSERT INTO Point3 VALUES (0), (1), (2); + +statement ok +CREATE TABLE know3(src BIGINT, dst BIGINT); INSERT INTO know3 VALUES (0, 1), (1, 2), (2, 1); + +statement ok +-CREATE PROPERTY GRAPH pg3 +VERTEX TABLES ( + Point3 PROPERTIES ( id ) LABEL Pnt + ) +EDGE TABLES ( + know3 SOURCE KEY ( src ) REFERENCES Point3 ( id ) + DESTINATION KEY ( dst ) REFERENCES Point3 ( id ) + LABEL Knows + ); + +query III +WITH cte1 AS ( + SELECT CREATE_CSR_EDGE( + 0, + (SELECT count(a.id) FROM Point3 a), + CAST ( + (SELECT sum(CREATE_CSR_VERTEX( + 0, + (SELECT count(a.id) FROM Point3 a), + sub.dense_id, + sub.cnt) + ) + FROM ( + SELECT a.rowid as dense_id, count(k.src) as cnt + FROM Point3 a + LEFT JOIN know3 k ON k.src = a.id + GROUP BY a.rowid) sub + ) + AS BIGINT), + a.rowid, + c.rowid, + k.rowid) as temp + FROM know3 k + JOIN Point3 a on a.id = k.src + JOIN Point3 c on c.id = k.dst +) SELECT a.id, b.id, iterativelength_lowerbound(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3) as path_length + FROM Point3 a, Point3 b, (select count(cte1.temp) * 0 as temp from cte1) __x + WHERE __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3); +---- +0 1 3 +0 2 2 +1 1 2 +1 2 3 +2 1 3 +2 2 2 + +# (0) --> (1) --> (4) +# ↗ ↘ +# (3) <-- (2) + +statement ok +CREATE TABLE Point4(id BIGINT); INSERT INTO Point4 VALUES (0), (1), (2), (3), (4); + +statement ok +CREATE TABLE know4(src BIGINT, dst BIGINT); INSERT INTO know4 VALUES (0, 1), (1, 4), (1, 2), (2, 3), (3, 1); + +statement ok +-CREATE PROPERTY GRAPH pg4 +VERTEX TABLES ( + Point4 PROPERTIES ( id ) LABEL Pnt + ) +EDGE TABLES ( + know4 SOURCE KEY ( src ) REFERENCES Point4 ( id ) + DESTINATION KEY ( dst ) REFERENCES Point4 ( id ) + LABEL Knows + ); + +query III +WITH cte1 AS ( + SELECT CREATE_CSR_EDGE( + 0, + (SELECT count(a.id) FROM Point4 a), + CAST ( + (SELECT sum(CREATE_CSR_VERTEX( + 0, + (SELECT count(a.id) FROM Point4 a), + sub.dense_id, + sub.cnt) + ) + FROM ( + SELECT a.rowid as dense_id, count(k.src) as cnt + FROM Point4 a + LEFT JOIN know4 k ON k.src = a.id + GROUP BY a.rowid) sub + ) + AS BIGINT), + a.rowid, + c.rowid, + k.rowid) as temp + FROM know4 k + JOIN Point4 a on a.id = k.src + JOIN Point4 c on c.id = k.dst +) SELECT a.id, b.id, iterativelength_lowerbound(0, (select count(*) from Point4), a.rowid, b.rowid, 3, 10) as path_length + FROM Point4 a, Point4 b, (select count(cte1.temp) * 0 as temp from cte1) __x + WHERE __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point4), a.rowid, b.rowid, 3, 10); +---- +0 1 4 +0 2 5 +0 3 3 +0 4 5 +1 1 3 +1 2 4 +1 3 5 +1 4 4 +2 1 5 +2 2 3 +2 3 4 +2 4 3 +3 1 4 +3 2 5 +3 3 3 +3 4 5 + +# (0) --> (1) --> (4) +# ↗ ↘ +# (3) <-- (2) + +statement ok +CREATE TABLE Point5(id BIGINT); INSERT INTO Point5 VALUES (0), (1), (2), (3), (4); + +statement ok +CREATE TABLE know5(src BIGINT, dst BIGINT); INSERT INTO know5 VALUES (0, 1), (1, 4), (0, 2), (2, 3), (3, 0); + +statement ok +-CREATE PROPERTY GRAPH pg5 +VERTEX TABLES ( + Point5 PROPERTIES ( id ) LABEL Pnt + ) +EDGE TABLES ( + know5 SOURCE KEY ( src ) REFERENCES Point5 ( id ) + DESTINATION KEY ( dst ) REFERENCES Point5 ( id ) + LABEL Knows + ); + +query III +WITH cte1 AS ( + SELECT CREATE_CSR_EDGE( + 0, + (SELECT count(a.id) FROM Point5 a), + CAST ( + (SELECT sum(CREATE_CSR_VERTEX( + 0, + (SELECT count(a.id) FROM Point5 a), + sub.dense_id, + sub.cnt) + ) + FROM ( + SELECT a.rowid as dense_id, count(k.src) as cnt + FROM Point5 a + LEFT JOIN know5 k ON k.src = a.id + GROUP BY a.rowid) sub + ) + AS BIGINT), + a.rowid, + c.rowid, + k.rowid) as temp + FROM know5 k + JOIN Point5 a on a.id = k.src + JOIN Point5 c on c.id = k.dst +) SELECT a.id, b.id, iterativelength_lowerbound(0, (select count(*) from Point5), a.rowid, b.rowid, 3, 10) as path_length + FROM Point5 a, Point5 b, (select count(cte1.temp) * 0 as temp from cte1) __x + WHERE __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point5), a.rowid, b.rowid, 3, 10); +---- +0 0 3 +0 1 4 +0 2 4 +0 3 5 +0 4 5 +2 0 5 +2 1 3 +2 2 3 +2 3 4 +2 4 4 +3 0 4 +3 1 5 +3 2 5 +3 3 3 +3 4 3 \ No newline at end of file diff --git a/test/sql/path-finding/shortest_path_bound.test b/test/sql/path-finding/shortest_path_bound.test index 97ee93ba..9d32f589 100644 --- a/test/sql/path-finding/shortest_path_bound.test +++ b/test/sql/path-finding/shortest_path_bound.test @@ -53,12 +53,12 @@ WITH cte1 AS ( FROM Know k JOIN Point a on a.id = k.src JOIN Point c on c.id = k.dst -) SELECT a.id as srd_id, b.id as dst_id, iterativelength_lowerbound(0, (select count(*) from Point), a.rowid, b.rowid, 2, 3) as path_length +) SELECT a.id as srd_id, b.id as dst_id, shortestpath_lowerbound(0, (select count(*) from Point), a.rowid, b.rowid, 2, 3) as path FROM Point a, Point b, (select count(cte1.temp) * 0 as temp from cte1) __x WHERE a.id = 0 and __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point), a.rowid, b.rowid, 2, 3); ---- -0 1 3 -0 3 2 +0 1 [0, 1, 2, 2, 3, 3, 1] +0 3 [0, 1, 2, 2, 3] query III WITH cte1 AS ( @@ -85,13 +85,13 @@ WITH cte1 AS ( FROM Know k JOIN Point a on a.id = k.src JOIN Point c on c.id = k.dst -) SELECT a.id, b.id, iterativelength(0, (select count(*) from Point), a.rowid, b.rowid, 1, 3) as path_length +) SELECT a.id, b.id, shortestpath_lowerbound(0, (select count(*) from Point), a.rowid, b.rowid, 1, 3) as path FROM Point a, Point b, (select count(cte1.temp) * 0 as temp from cte1) __x WHERE a.id = 0 and __x.temp * 0 + iterativelength(0, (select count(*) from Point), a.rowid, b.rowid, 1, 3); ---- -0 1 1 -0 2 1 -0 3 2 +0 1 [0, 0, 1] +0 2 [0, 1, 2] +0 3 [0, 1, 2, 2, 3] # Graph to test shortest path bound with a cycle # (0) --> (1) <-> (2) @@ -139,11 +139,12 @@ WITH cte1 AS ( FROM know2 k JOIN Point2 a on a.id = k.src JOIN Point2 c on c.id = k.dst -) SELECT a.id, b.id, iterativelength_lowerbound(0, (select count(*) from Point2), a.rowid, b.rowid, 2, 30) as path_length +) SELECT a.id, b.id, shortestpath_lowerbound(0, (select count(*) from Point2), a.rowid, b.rowid, 2, 30) as path FROM Point2 a, Point2 b, (select count(cte1.temp) * 0 as temp from cte1) __x WHERE a.id = 0 and __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point2), a.rowid, b.rowid, 2, 30); ---- -0 2 2 +0 1 [0, 0, 1, 1, 2, 2, 1] +0 2 [0, 0, 1, 1, 2] # Graph to test shortest path bound with a cycle @@ -192,11 +193,16 @@ WITH cte1 AS ( FROM know3 k JOIN Point3 a on a.id = k.src JOIN Point3 c on c.id = k.dst -) SELECT a.id, b.id, iterativelength_lowerbound(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3) as path_length +) SELECT a.id, b.id, shortestpath_lowerbound(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3) as path FROM Point3 a, Point3 b, (select count(cte1.temp) * 0 as temp from cte1) __x - WHERE a.id = 0 and __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3); + WHERE __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3); ---- -0 0 2 +0 0 [0, 1, 2, 2, 0] +0 1 [0, 1, 2, 2, 0, 0, 1] +0 2 [0, 1, 2, 2, 0, 1, 2] +2 0 [2, 2, 0, 1, 2, 2, 0] +2 1 [2, 2, 0, 0, 1] +2 2 [2, 2, 0, 1, 2] # Graph to test shortest path bound with a cycle # (1) <- (0) <-> (2) @@ -223,12 +229,16 @@ query III -FROM GRAPH_TABLE (pg4 MATCH p = ANY SHORTEST (a:Point4)-[k:know4]->{2,3}(b:Point4) - COLUMNS (a.id, b.id, vertices(p)) - ) tmp; + COLUMNS (a.id AS id1, b.id AS id2, element_id(p)) + ) tmp + ORDER BY tmp.id1, tmp.id2; ---- -0 0 [0, 2, 0] -2 1 [2, 0, 1] -2 2 [2, 0, 2] +0 0 [0, 1, 2, 2, 0] +0 1 [0, 1, 2, 2, 0, 0, 1] +0 2 [0, 1, 2, 2, 0, 1, 2] +2 0 [2, 2, 0, 1, 2, 2, 0] +2 1 [2, 2, 0, 0, 1] +2 2 [2, 2, 0, 1, 2] # Description: Test algorithm's capability to ignore isolated nodes. @@ -256,11 +266,13 @@ query III -FROM GRAPH_TABLE (pg5 MATCH p = ANY SHORTEST (a:Point5)-[k:know5]->{2,3}(b:Point5) - COLUMNS (a.id, b.id, vertices(p)) + COLUMNS (a.id, b.id, element_id(p)) ) tmp; ---- -0 0 [0, 2, 0] -2 2 [2, 0, 2] +0 0 [0, 0, 2, 1, 0] +2 0 [2, 1, 0, 0, 2, 1, 0] +0 2 [0, 0, 2, 1, 0, 0, 2] +2 2 [2, 1, 0, 0, 2] # Description: Test shortest paths in a graph with cycles. # Graph Structure: @@ -289,21 +301,25 @@ query III -FROM GRAPH_TABLE (pg6 MATCH p = ANY SHORTEST (a:Point6)-[k:know6]->{2,4}(b:Point6) - COLUMNS (a.id as id1, b.id as id2, vertices(p)) + COLUMNS (a.id as id1, b.id as id2, element_id(p)) ) tmp order by tmp.id1, tmp.id2; ---- -0 0 [0, 2, 0] -0 1 [0, 2, 3, 1] -0 3 [0, 2, 3] -1 1 [1, 0, 2, 3, 1] -1 2 [1, 0, 2] -1 3 [1, 0, 2, 3] -2 0 [2, 3, 1, 0] -2 1 [2, 3, 1] -2 2 [2, 3, 2] -3 0 [3, 2, 0] -3 2 [3, 1, 0, 2] -3 3 [3, 2, 3] +0 0 [0, 0, 2, 1, 0] +0 1 [0, 0, 2, 2, 3, 4, 1] +0 2 [0, 0, 2, 1, 0, 0, 2] +0 3 [0, 0, 2, 2, 3] +1 0 [1, 5, 0, 0, 2, 1, 0] +1 1 [1, 5, 0, 0, 2, 2, 3, 4, 1] +1 2 [1, 5, 0, 0, 2] +1 3 [1, 5, 0, 0, 2, 2, 3] +2 0 [2, 1, 0, 0, 2, 1, 0] +2 1 [2, 2, 3, 4, 1] +2 2 [2, 1, 0, 0, 2] +2 3 [2, 1, 0, 0, 2, 2, 3] +3 0 [3, 4, 1, 5, 0] +3 1 [3, 3, 2, 2, 3, 4, 1] +3 2 [3, 4, 1, 5, 0, 0, 2] +3 3 [3, 3, 2, 2, 3]