Skip to content

Commit

Permalink
Updating the path record of the old algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
SiberiaWolfP committed Feb 28, 2024
1 parent 40530e3 commit 49c957f
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 151 deletions.
62 changes: 22 additions & 40 deletions duckpgq/src/duckpgq/functions/scalar/iterativelength_lowerbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@ static bool IterativeLengthLowerBound(int64_t v_size, int64_t *v, vector<int64_t
next[i] = 0;
seen[i] = 0;
}

for (auto lane = 0; lane < LANE_LIMIT; lane++) {
for (auto i = 0; i < v_size; i++) {
if (visit[i][lane]) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
next[n][lane] = true;
}
for (auto i = 0; i < v_size; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
next[n] = next[n] | visit[i];
}
}
}

for (auto i = 0; i < v_size; i++) {
seen[i] = seen[i] | next[i];
change |= next[i].any();
Expand Down Expand Up @@ -100,8 +96,6 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args,
vector<std::bitset<LANE_LIMIT>> seen(v_size);
vector<std::bitset<LANE_LIMIT>> visit1(v_size);
vector<std::bitset<LANE_LIMIT>> visit2(v_size);
vector<vector<unordered_set<int64_t>>> parents_v(
v_size, std::vector<unordered_set<int64_t>>(LANE_LIMIT));

// maps lane to search number
short lane_to_num[LANE_LIMIT];
Expand All @@ -119,7 +113,6 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args,
}

// add search jobs to free lanes
uint64_t active = 0;
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1;
while (started_searches < args.size()) {
Expand All @@ -132,41 +125,30 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args,
result_data[search_num] = (int64_t)-1; /* initialize to no path */
visit1[src_data[src_pos]][lane] = true;
lane_to_num[lane] = search_num; // active lane
active++;
break;
}
}
}

// make passes while a lane is still active
for (int64_t iter = 1; active && iter <= upper_bound; iter++) {
for (int64_t iter = 1; iter <= upper_bound; iter++) {
if (!IterativeLengthLowerBound(v_size, v, e, seen,
(iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
break;
}
if (iter < lower_bound) {
continue;
}
// detect lanes that finished
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = lane_to_num[lane];
if (search_num >= 0) { // active lane
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
if (seen[dst_data[dst_pos]][lane]) {

// check if the path length is within bounds
// bound vector is either a constant or a flat vector
if (iter < lower_bound) {
// when reach the destination too early, treat destination as null
// looks like the graph does not have that vertex
// seen[dst_data[dst_pos]][lane] = false;
// (iter & 1) ? visit2[dst_data[dst_pos]][lane] = false
// : visit1[dst_data[dst_pos]][lane] = false;
continue;
} else {
result_data[search_num] =
iter; /* found at iter => iter = path length */
lane_to_num[lane] = -1; // mark inactive
active--;
}
result_data[search_num] =
iter; /* found at iter => iter = path length */
lane_to_num[lane] = -1; // mark inactive
}
}
}
Expand All @@ -185,15 +167,15 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args,
duckpgq_state->csr_to_delete.insert(info.csr_id);
}

// CreateScalarFunctionInfo
// DuckPGQFunctions::GetIterativeLengthLowerBoundFunction() {
// auto fun = ScalarFunction(
// "iterativelength_lowerbound",
// {LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
// LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
// LogicalType::BIGINT, IterativeLengthLowerBoundFunction,
// IterativeLengthFunctionData::IterativeLengthBind);
// return CreateScalarFunctionInfo(fun);
// }
CreateScalarFunctionInfo
DuckPGQFunctions::GetIterativeLengthLowerBoundFunction() {
auto fun = ScalarFunction(
"iterativelength_lowerbound",
{LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::BIGINT, IterativeLengthLowerBoundFunction,
IterativeLengthFunctionData::IterativeLengthBind);
return CreateScalarFunctionInfo(fun);
}

} // namespace duckdb
22 changes: 10 additions & 12 deletions duckpgq/src/duckpgq/functions/scalar/iterativelength_two_phase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &
}

// add search jobs to free lanes
uint64_t active = 0;
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1;
while (started_searches < args.size()) {
Expand All @@ -194,7 +193,6 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &
result_data[search_num] = (int64_t)-1; /* initialize to no path */
visit1[src_data[src_pos]][lane] = true;
lane_to_num[lane] = search_num; // active lane
active++;
break;
}
}
Expand Down Expand Up @@ -234,15 +232,15 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &
duckpgq_state->csr_to_delete.insert(info.csr_id);
}

CreateScalarFunctionInfo
DuckPGQFunctions::GetIterativeLengthLowerBoundFunction() {
auto fun = ScalarFunction(
"iterativelength_lowerbound",
{LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::BIGINT, IterativeLengthLowerBoundFunction,
IterativeLengthFunctionData::IterativeLengthBind);
return CreateScalarFunctionInfo(fun);
}
// CreateScalarFunctionInfo
// DuckPGQFunctions::GetIterativeLengthLowerBoundFunction() {
// auto fun = ScalarFunction(
// "iterativelength_lowerbound",
// {LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
// LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
// LogicalType::BIGINT, IterativeLengthLowerBoundFunction,
// IterativeLengthFunctionData::IterativeLengthBind);
// return CreateScalarFunctionInfo(fun);
// }

} // namespace duckdb
128 changes: 51 additions & 77 deletions duckpgq/src/duckpgq/functions/scalar/shortest_path_lowerbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@
namespace duckdb {

static bool IterativeLengthLowerBound(int64_t v_size, int64_t *V, vector<int64_t> &E,
vector<int64_t> &edge_ids,
vector<vector<vector<int64_t>>> &paths_v,
vector<vector<vector<int64_t>>> &paths_e,
int64_t iter, vector<int64_t> &edge_ids,
vector<vector<unordered_map<int64_t, int64_t>>> &paths_v,
vector<vector<unordered_map<int64_t, int64_t>>> &paths_e,
vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next) {
bool change = false;
// map<pair<int64_t, int64_t>, unordered_set<int64_t>> parents_v_cache;
map<pair<int64_t, int64_t>, vector<int64_t>> paths_v_cache;
map<pair<int64_t, int64_t>, vector<int64_t>> paths_e_cache;
for (auto v = 0; v < v_size; v++) {
next[v] = 0;
seen[v] = 0;
Expand All @@ -32,42 +29,17 @@ static bool IterativeLengthLowerBound(int64_t v_size, int64_t *V, vector<int64_t
for (auto e = V[v]; e < V[v + 1]; e++) {
auto n = E[e];
auto edge_id = edge_ids[e];

next[n] = next[n] | visit[v];
for (auto lane = 0; lane < LANE_LIMIT; lane++) {
if (visit[v][lane]) {
//! If the node has not been visited, then update the parent and
//! edge
if (seen[n][lane] == false) {
if (visit[n][lane]) {
// parents_v_cache[make_pair(n, lane)] = parents_v[v][lane];
// parents_v_cache[make_pair(n, lane)].insert(v);
paths_v_cache[make_pair(n, lane)] = paths_v[v][lane];
paths_v_cache[make_pair(n, lane)].push_back(v);
paths_e_cache[make_pair(n, lane)] = paths_e[v][lane];
paths_e_cache[make_pair(n, lane)].push_back(edge_id);
} else {
// parents_v[n][lane] = parents_v[v][lane];
// parents_v[n][lane].insert(v);
paths_v[n][lane] = paths_v[v][lane];
paths_v[n][lane].push_back(v);
paths_e[n][lane] = paths_e[v][lane];
paths_e[n][lane].push_back(edge_id);
}
next[n][lane] = true;
}
paths_v[n][lane][iter] = v;
paths_e[n][lane][iter] = edge_id;
}
}
}
}
}

for (auto const &cache : paths_v_cache) {
paths_v[cache.first.first][cache.first.second] = cache.second;
}
for (auto const &cache : paths_e_cache) {
paths_e[cache.first.first][cache.first.second] = cache.second;
}

for (auto v = 0; v < v_size; v++) {
seen[v] = seen[v] | next[v];
change |= next[v].any();
Expand Down Expand Up @@ -127,10 +99,10 @@ static void ShortestPathLowerBoundFunction(DataChunk &args,
vector<std::bitset<LANE_LIMIT>> visit1(v_size);
vector<std::bitset<LANE_LIMIT>> visit2(v_size);

vector<vector<vector<int64_t>>> paths_v(
v_size, std::vector<vector<int64_t>>(LANE_LIMIT));
vector<vector<vector<int64_t>>> paths_e(
v_size, std::vector<vector<int64_t>>(LANE_LIMIT));
vector<vector<unordered_map<int64_t, int64_t>>> paths_v(v_size,
std::vector<unordered_map<int64_t, int64_t>>(LANE_LIMIT));
vector<vector<unordered_map<int64_t, int64_t>>> paths_e(v_size,
std::vector<unordered_map<int64_t, int64_t>>(LANE_LIMIT));

// maps lane to search number
int16_t lane_to_num[LANE_LIMIT];
Expand All @@ -146,6 +118,10 @@ static void ShortestPathLowerBoundFunction(DataChunk &args,
for (auto i = 0; i < v_size; i++) {
seen[i] = 0;
visit1[i] = 0;
for (auto j = 0; j < LANE_LIMIT; j++) {
paths_v[i][j].clear();
paths_v[i][j].clear();
}
}

// add search jobs to free lanes
Expand All @@ -170,47 +146,45 @@ static void ShortestPathLowerBoundFunction(DataChunk &args,
for (int64_t iter = 1; active && iter <= upper_bound; iter++) {
//! Perform one step of bfs exploration
if (!IterativeLengthLowerBound(
v_size, v, e, edge_ids, paths_v, paths_e, seen,
v_size, v, e, iter, edge_ids, paths_v, paths_e, seen,
(iter & 1) ? visit1 : visit2, (iter & 1) ? visit2 : visit1)) {
break;
}
if (iter < lower_bound) {
continue;
}
// detect lanes that finished
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = lane_to_num[lane];
if (search_num >= 0) { // active lane
//! Check if dst for a source has been seen
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
if (seen[dst_data[dst_pos]][lane]) {
// check if the path length is within bounds
// bound vector is either a constant or a flat vector
if (iter < lower_bound) {
continue;
} else {
vector<int64_t> output_vector;
auto it_v = paths_v[dst_data[dst_pos]][lane].begin(),
end_v = paths_v[dst_data[dst_pos]][lane].end();
auto it_e = paths_e[dst_data[dst_pos]][lane].begin(),
end_e = paths_e[dst_data[dst_pos]][lane].end();
while (it_v != end_v && it_e != end_e) {
output_vector.push_back(*it_v);
output_vector.push_back(*it_e);
it_v++;
it_e++;
}
output_vector.push_back(dst_data[dst_pos]);
auto output =
make_uniq<Vector>(LogicalType::LIST(LogicalType::BIGINT));
for (auto val : output_vector) {
Value value_to_insert = val;
ListVector::PushBack(*output, value_to_insert);
}
result_data[search_num].length = ListVector::GetListSize(*output);
result_data[search_num].offset = total_len;
ListVector::Append(result, ListVector::GetEntry(*output),
ListVector::GetListSize(*output));
total_len += result_data[search_num].length;
lane_to_num[lane] = -1; // mark inactive
vector<int64_t> output_vector;
auto iterations = iter;
auto parent_vertex = paths_v[dst_data[dst_pos]][lane][iterations];
auto parent_edge = paths_e[dst_data[dst_pos]][lane][iterations];
output_vector.push_back(dst_data[dst_pos]);
while (iterations > 0) {
output_vector.push_back(parent_edge);
output_vector.push_back(parent_vertex);
iterations--;
parent_edge = paths_e[parent_vertex][lane][iterations];
parent_vertex = paths_v[parent_vertex][lane][iterations];
}
std::reverse(output_vector.begin(), output_vector.end());
auto output =
make_uniq<Vector>(LogicalType::LIST(LogicalType::BIGINT));
for (auto val : output_vector) {
Value value_to_insert = val;
ListVector::PushBack(*output, value_to_insert);
}
result_data[search_num].length = ListVector::GetListSize(*output);
result_data[search_num].offset = total_len;
ListVector::Append(result, ListVector::GetEntry(*output),
ListVector::GetListSize(*output));
total_len += result_data[search_num].length;
lane_to_num[lane] = -1; // mark inactive
}
}
}
Expand All @@ -228,14 +202,14 @@ static void ShortestPathLowerBoundFunction(DataChunk &args,
duckpgq_state->csr_to_delete.insert(info.csr_id);
}

// CreateScalarFunctionInfo DuckPGQFunctions::GetShortestPathLowerBoundFunction() {
// auto fun = ScalarFunction(
// "shortestpath_lowerbound",
// {LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
// LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
// LogicalType::LIST(LogicalType::BIGINT), ShortestPathLowerBoundFunction,
// IterativeLengthFunctionData::IterativeLengthBind);
// return CreateScalarFunctionInfo(fun);
// }
CreateScalarFunctionInfo DuckPGQFunctions::GetShortestPathLowerBoundFunction() {
auto fun = ScalarFunction(
"shortestpath_lowerbound",
{LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::LIST(LogicalType::BIGINT), ShortestPathLowerBoundFunction,
IterativeLengthFunctionData::IterativeLengthBind);
return CreateScalarFunctionInfo(fun);
}

} // namespace duckdb
Loading

0 comments on commit 49c957f

Please sign in to comment.