Skip to content

Commit

Permalink
Comment out prints
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Nov 7, 2024
1 parent 0bd0c19 commit 4c0a671
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 61 deletions.
3 changes: 3 additions & 0 deletions src/core/operator/event/shortest_path_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ void ParallelShortestPathEvent::FinishEvent() {
auto &bfs_state = gstate.global_bfs_state;

// if remaining pairs, schedule the BFS for the next batch
// std::cout << "Started searches: " << bfs_state->started_searches << std::endl;
// std::cout << "Total pairs: " << gstate.global_pairs->Count() << std::endl;
if (bfs_state->started_searches < gstate.global_pairs->Count()) {
op.ScheduleBFSEvent(*pipeline, *this, gstate);
}
// std::cout << "Finished event" << std::endl;
};

} // namespace core
Expand Down
1 change: 1 addition & 0 deletions src/core/operator/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ void GlobalBFSState::CreateTasks() {
if (current_task_start < (idx_t)v_size) {
global_task_queue.push_back({current_task_start, v_size});
}
// std::cout << "Set the number of tasks to " << global_task_queue.size() << std::endl;
}


Expand Down
129 changes: 68 additions & 61 deletions src/core/operator/task/shortest_path_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl
auto &barrier = bfs_state->barrier;

do {
// std::cout << worker_id << ": Starting iterative path" << std::endl;
IterativePath();
// std::cout << worker_id << ": Finished iterative path" << std::endl;
barrier->Wait([&]() {
bfs_state->ResetTaskIndex(); // Reset task index safely
});

barrier->Wait();

if (worker_id == 0) {
// std::cout << worker_id << ": Starting reach detected" << std::endl;
ReachDetect();
// std::cout << worker_id << ": Finished reach detected" << std::endl;
}

barrier->Wait([&]() {
Expand All @@ -33,12 +37,14 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl

barrier->Wait();
} while (bfs_state->change);

if (worker_id == 0) {
// std::cout << worker_id << ": Starting path construction" << std::endl;
PathConstruction();
// std::cout << worker_id << ": Finished path construction" << std::endl;
}

event->FinishTask();
// std::cout << worker_id << ": Finished event" << std::endl;
return TaskExecutionResult::TASK_FINISHED;
}

Expand All @@ -53,79 +59,80 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl
}

void PhysicalShortestPathTask::IterativePath() {
auto &bfs_state = state.global_bfs_state;
auto &seen = bfs_state->seen;
auto &visit = bfs_state->iter & 1 ? bfs_state->visit1 : bfs_state->visit2;
auto &next = bfs_state->iter & 1 ? bfs_state->visit2 : bfs_state->visit1;
auto &barrier = bfs_state->barrier;
int64_t *v = (int64_t *)state.csr->v;
vector<int64_t> &e = state.csr->e;
auto &edge_ids = state.csr->edge_ids;
auto &parents_ve = bfs_state->parents_ve;
auto &change = bfs_state->change;

if (!SetTaskRange()) {
return; // no more tasks
}
// clear next before each iteration
for (auto i = left; i < right; i++) {
next[i] = 0;
}
auto &bfs_state = state.global_bfs_state;
auto &seen = bfs_state->seen;
auto &visit = bfs_state->iter & 1 ? bfs_state->visit1 : bfs_state->visit2;
auto &next = bfs_state->iter & 1 ? bfs_state->visit2 : bfs_state->visit1;
auto &barrier = bfs_state->barrier;
int64_t *v = (int64_t *)state.csr->v;
vector<int64_t> &e = state.csr->e;
auto &edge_ids = state.csr->edge_ids;
auto &parents_ve = bfs_state->parents_ve;
auto &change = bfs_state->change;


barrier->Wait();

while (true) {
if (!SetTaskRange()) {
return; // no more tasks
}
// clear next before each iteration
for (auto i = left; i < right; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
auto edge_id = edge_ids[offset];
{
std::lock_guard<std::mutex> lock(bfs_state->element_locks[n]);
next[n] |= visit[i];
}
for (auto l = 0; l < LANE_LIMIT; l++) {
if (parents_ve[n][l].GetV() == -1 && visit[i][l]) {
parents_ve[n][l] = {static_cast<int64_t>(i), edge_id};
next[i] = 0;
}
// std::cout << worker_id << " Finished clearing next" << std::endl;

barrier->Wait();

while (true) {
for (auto i = left; i < right; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
auto edge_id = edge_ids[offset];
{
std::lock_guard<std::mutex> lock(bfs_state->element_locks[n]);
next[n] |= visit[i];
}
for (auto l = 0; l < LANE_LIMIT; l++) {
if (parents_ve[n][l].GetV() == -1 && visit[i][l]) {
parents_ve[n][l] = {static_cast<int64_t>(i), edge_id};
}
}
}
}
}
if (!SetTaskRange()) {
// std::cout << worker_id << ": No more tasks found to explore" << std::endl;
break; // no more tasks
}
}
if (!SetTaskRange()) {
break; // no more tasks
}
}
change = false;
barrier->Wait([&]() {
bfs_state->ResetTaskIndex(); // Reset task index safely
});
barrier->Wait([&]() {
bfs_state->ResetTaskIndex(); // Reset task index safely
});

barrier->Wait();

if (!SetTaskRange()) {
return; // no more tasks
}
barrier->Wait();
//
// if (!SetTaskRange()) {
// return; // no more tasks
// }

while (true) {
for (auto i = left; i < right; i++) {
if (next[i].any()) {
next[i] &= ~seen[i];
seen[i] |= next[i];
change |= next[i].any();
change = false;
do {
for (auto i = left; i < right; i++) {
if (next[i].any()) {
next[i] &= ~seen[i];
seen[i] |= next[i];
change |= next[i].any();
}
}
}
} while (SetTaskRange());

if (!SetTaskRange()) {
break; // no more tasks
}
}
barrier->Wait([&]() {
bfs_state->ResetTaskIndex(); // Reset task index safely
});
barrier->Wait([&]() {
bfs_state->ResetTaskIndex(); // Reset task index safely
});

barrier->Wait();
}
barrier->Wait();
}

void PhysicalShortestPathTask::ReachDetect() {
auto &bfs_state = state.global_bfs_state;
Expand Down

0 comments on commit 4c0a671

Please sign in to comment.