Skip to content

Commit

Permalink
make bfs support parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
SiberiaWolfP committed Apr 4, 2024
1 parent e31b833 commit 8642326
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ class PhysicalPathFinding : public PhysicalComparisonJoin {

void BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) override;


// //! Schedules tasks to calculate the next iteration of the path-finding
static void ScheduleBFSTasks(Pipeline &pipeline, Event &event, GlobalSinkState &state);
};

} // namespace duckdb
226 changes: 221 additions & 5 deletions duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "duckdb/parallel/event.hpp"
#include "duckdb/parallel/meta_pipeline.hpp"
#include "duckdb/parallel/thread_context.hpp"
#include "duckdb/parallel/base_pipeline_event.hpp"
#include "duckdb/common/types/column/column_data_collection.hpp"
#include <thread>

Expand Down Expand Up @@ -115,6 +116,49 @@ void PathFindingLocalState::CreateCSR(DataChunk &input,
// global_csr.Print(); // Debug print
}

class GlobalBFSState {
public:
GlobalBFSState() = default;

Check warning on line 121 in duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / MacOS (osx_arm64, arm64, arm64-osx)

explicitly defaulted default constructor is implicitly deleted [-Wdefaulted-function-deleted]

Check warning on line 121 in duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / DuckDB-Wasm (wasm_eh, wasm32-emscripten)

explicitly defaulted default constructor is implicitly deleted [-Wdefaulted-function-deleted]

Check warning on line 121 in duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / DuckDB-Wasm (wasm_eh, wasm32-emscripten)

explicitly defaulted default constructor is implicitly deleted [-Wdefaulted-function-deleted]

Check warning on line 121 in duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / DuckDB-Wasm (wasm_mvp, wasm32-emscripten)

explicitly defaulted default constructor is implicitly deleted [-Wdefaulted-function-deleted]

Check warning on line 121 in duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / DuckDB-Wasm (wasm_mvp, wasm32-emscripten)

explicitly defaulted default constructor is implicitly deleted [-Wdefaulted-function-deleted]

Check warning on line 121 in duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / DuckDB-Wasm (wasm_threads, wasm32-emscripten)

explicitly defaulted default constructor is implicitly deleted [-Wdefaulted-function-deleted]

Check warning on line 121 in duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / DuckDB-Wasm (wasm_threads, wasm32-emscripten)

explicitly defaulted default constructor is implicitly deleted [-Wdefaulted-function-deleted]

GlobalBFSState(int64_t v_size_, idx_t pairs_size_, int64_t *src_, int64_t *dst_,
UnifiedVectorFormat &vdata_src_, UnifiedVectorFormat &vdata_dst_)
: iter(1), v_size(v_size_), src(src_), dst(dst_), vdata_src(std::move(vdata_src_)),
vdata_dst(std::move(vdata_dst_)), started_searches(0),
seen(v_size_), visit1(v_size_), visit2(v_size_),
change(false), result(LogicalTypeId::BIGINT, true, true, pairs_size_) {
for (auto i = 0; i < LANE_LIMIT; i++) {
lane_to_num[i] = -1;
}
}

void clear() {
iter = 1;
change = false;
for (auto i = 0; i < LANE_LIMIT; i++) {
lane_to_num[i] = -1;
}
// empty visit vectors
for (auto i = 0; i < v_size; i++) {
seen[i] = 0;
visit1[i] = 0;
}
}
public:
int64_t iter;
int64_t v_size;
int64_t *src;
int64_t *dst;
UnifiedVectorFormat vdata_src;
UnifiedVectorFormat vdata_dst;
idx_t started_searches;
idx_t lane_to_num[LANE_LIMIT];
vector<std::bitset<LANE_LIMIT>> seen;
vector<std::bitset<LANE_LIMIT>> visit1;
vector<std::bitset<LANE_LIMIT>> visit2;
bool change;
Vector result;
};

class PathFindingGlobalState : public GlobalSinkState {
public:
using GlobalCompressedSparseRow = PhysicalPathFinding::GlobalCompressedSparseRow;
Expand All @@ -129,7 +173,9 @@ class PathFindingGlobalState : public GlobalSinkState {

PathFindingGlobalState(PathFindingGlobalState &prev)
: GlobalSinkState(prev), global_tasks(prev.global_tasks),
global_csr(std::move(prev.global_csr)), child(prev.child + 1) {}
global_csr(std::move(prev.global_csr)), child(prev.child + 1) {

}

void Sink(DataChunk &input, PathFindingLocalState &lstate) const {
lstate.Sink(input, *global_csr);
Expand All @@ -144,6 +190,9 @@ class PathFindingGlobalState : public GlobalSinkState {

unique_ptr<GlobalCompressedSparseRow> global_csr;
size_t child;

// state for BFS
unique_ptr<GlobalBFSState> global_bfs_state;
};

unique_ptr<GlobalSinkState>
Expand Down Expand Up @@ -312,6 +361,114 @@ static void IterativeLengthFunction(const unique_ptr<PathFindingGlobalState::Glo
}
}

class PhysicalBFSTask : public ExecutorTask {
public:
PhysicalBFSTask(shared_ptr<Event> event_p, ClientContext &context, PathFindingGlobalState &state, idx_t start, idx_t end)
: ExecutorTask(context, std::move(event_p)), context(context), state(state), start(start), end(end) {
}

TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
auto& change = state.global_bfs_state->change;
auto& v_size = state.global_bfs_state->v_size;
auto& seen = state.global_bfs_state->seen;
auto& visit = state.global_bfs_state->iter & 1 ? state.global_bfs_state->visit1 : state.global_bfs_state->visit2;
auto& next = state.global_bfs_state->iter & 1 ? state.global_bfs_state->visit2 : state.global_bfs_state->visit1;
int64_t *v = (int64_t *)state.global_csr->v;
vector<int64_t> &e = state.global_csr->e;

for (auto i = 0; i < v_size; i++) {
next[i] = 0;
}
for (auto i = 0; i < v_size; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
next[n] = next[n] | visit[i];
}
}
}
for (auto i = 0; i < v_size; i++) {
next[i] = next[i] & ~seen[i];
seen[i] = seen[i] | next[i];
change |= next[i].any();
}

event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}

private:
ClientContext &context;
PathFindingGlobalState &state;
idx_t start;
idx_t end;
};

class BFSIterativeEvent : public BasePipelineEvent {
public:
BFSIterativeEvent(PathFindingGlobalState &gstate_p, Pipeline &pipeline_p)
: BasePipelineEvent(pipeline_p), gstate(gstate_p) {
}

PathFindingGlobalState &gstate;

public:
void Schedule() override {
auto &context = pipeline->GetClientContext();

// Schedule tasks equal to the number of threads, which will each merge multiple partitions
auto &ts = TaskScheduler::GetScheduler(context);
idx_t num_threads = ts.NumberOfThreads();

vector<shared_ptr<Task>> bfs_tasks;
bfs_tasks.push_back(make_uniq<PhysicalBFSTask>(shared_from_this(), context, gstate, 0, 0));
// for (idx_t tnum = 0; tnum < num_threads; tnum++) {
// bfs_tasks.push_back(make_uniq<PhysicalBFSTask>(shared_from_this(), context, gstate));
// }
SetTasks(std::move(bfs_tasks));
}

void FinishEvent() override {
auto& bfs_state = gstate.global_bfs_state;

auto result_data = FlatVector::GetData<int64_t>(bfs_state->result);
ValidityMask &result_validity = FlatVector::Validity(bfs_state->result);

if (bfs_state->change) {
// detect lanes that finished
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = bfs_state->lane_to_num[lane];
if (search_num >= 0) { // active lane
int64_t dst_pos = bfs_state->vdata_dst.sel->get_index(search_num);
if (bfs_state->seen[bfs_state->dst[dst_pos]][lane]) {
result_data[search_num] =
bfs_state->iter; /* found at iter => iter = path length */
bfs_state->lane_to_num[lane] = -1; // mark inactive
}
}
}
// into the next iteration
auto bfs_event = std::make_shared<BFSIterativeEvent>(gstate, *pipeline);
this->InsertEvent(std::dynamic_pointer_cast<BasePipelineEvent>(bfs_event));
} else {
// no changes anymore: any still active searches have no path
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = bfs_state->lane_to_num[lane];
if (search_num >= 0) { // active lane
result_validity.SetInvalid(search_num);
result_data[search_num] = (int64_t)-1; /* no path */
bfs_state->lane_to_num[lane] = -1; // mark inactive
}
}

// if remaining pairs, schedule the BFS for the next batch
if (bfs_state->started_searches < gstate.global_tasks.Count()) {
PhysicalPathFinding::ScheduleBFSTasks(*pipeline, *this, gstate);
}
}
}
};


SinkFinalizeType
PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event,
Expand All @@ -321,24 +478,83 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event,
auto &csr = gstate.global_csr;
auto &global_tasks = gstate.global_tasks;
if (global_tasks.Count() != 0) {
DataChunk all_pairs;
DataChunk pairs;
global_tasks.InitializeScanChunk(pairs);
ColumnDataScanState scan_state;
global_tasks.InitializeScan(scan_state);
while (global_tasks.Scan(scan_state, pairs)) {
Vector result(LogicalType::BIGINT, true, true);
IterativeLengthFunction(csr, pairs, result);
// debug print
Printer::Print(result.ToString(pairs.size()));
all_pairs.Append(pairs, true);
}
// debug print
all_pairs.Print();

auto &src = all_pairs.data[0];
auto &dst = all_pairs.data[1];
UnifiedVectorFormat vdata_src;
UnifiedVectorFormat vdata_dst;
src.ToUnifiedFormat(all_pairs.size(), vdata_src);
dst.ToUnifiedFormat(all_pairs.size(), vdata_dst);
auto src_data = FlatVector::GetData<int64_t>(src);
auto dst_data = FlatVector::GetData<int64_t>(dst);

gstate.global_bfs_state = make_uniq<GlobalBFSState>(csr->v_size,
global_tasks.Count(), src_data, dst_data, vdata_src, vdata_dst);

// Schedule the first round of BFS tasks
if (all_pairs.size() > 0) {
ScheduleBFSTasks(pipeline, event, gstate);
}
}

// debug print
gstate.global_bfs_state->result.Print(global_tasks.Count());

// Move to the next input child
++gstate.child;

return SinkFinalizeType::READY;
}

void ScheduleBFSTasks(Pipeline &pipeline, Event &event, GlobalSinkState &state) {
auto &gstate = state.Cast<PathFindingGlobalState>();
auto &bfs_state = gstate.global_bfs_state;

// for every batch of pairs, schedule a BFS task
bfs_state->clear();

// remaining pairs
if (bfs_state->started_searches < gstate.global_tasks.Count()) {

auto result_data = FlatVector::GetData<int64_t>(bfs_state->result);
auto& result_validity = FlatVector::Validity(bfs_state->result);

for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
bfs_state->lane_to_num[lane] = -1;
while (bfs_state->started_searches < gstate.global_tasks.Count()) {
int64_t search_num = bfs_state->started_searches++;
int64_t src_pos = bfs_state->vdata_src.sel->get_index(search_num);
int64_t dst_pos = bfs_state->vdata_src.sel->get_index(search_num);
if (!bfs_state->vdata_src.validity.RowIsValid(src_pos)) {
result_validity.SetInvalid(search_num);
result_data[search_num] = (uint64_t)-1; /* no path */
} else if (bfs_state->src[src_pos] == bfs_state->dst[dst_pos]) {
result_data[search_num] =
(uint64_t)0; // path of length 0 does not require a search
} else {
bfs_state->visit1[bfs_state->src[src_pos]][lane] = true;
bfs_state->lane_to_num[lane] = search_num; // active lane
break;
}
}
}

auto bfs_event = make_shared<BFSIterativeEvent>(gstate, pipeline);
event.InsertEvent(std::move(std::dynamic_pointer_cast<BasePipelineEvent>(bfs_event)));
}
}


//===--------------------------------------------------------------------===//
// Operator
//===--------------------------------------------------------------------===//
Expand Down

0 comments on commit 8642326

Please sign in to comment.