Skip to content

Commit

Permalink
Taking out csr of path finding operator
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Nov 5, 2024
1 parent 3e262a0 commit 805b3cf
Show file tree
Hide file tree
Showing 15 changed files with 332 additions and 487 deletions.
4 changes: 2 additions & 2 deletions src/core/functions/scalar/csr_creation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ static void CreateCsrEdgeFunction(DataChunk &args, ExpressionState &state,
auto pos = ++csr_entry->second->v[src + 1];
csr_entry->second->e[(int64_t)pos - 1] = dst;
csr_entry->second->edge_ids[(int64_t)pos - 1] = edge_id;
return 1;
return info.id;
});
return;
}
Expand Down Expand Up @@ -189,7 +189,7 @@ static void CreateCsrEdgeFunction(DataChunk &args, ExpressionState &state,
csr_entry->second->e[(int64_t)pos - 1] = dst;
csr_entry->second->edge_ids[(int64_t)pos - 1] = edge_id;
csr_entry->second->w_double[(int64_t)pos - 1] = weight;
return weight;
return info.id;
});
}

Expand Down
68 changes: 34 additions & 34 deletions src/core/operator/event/csr_edge_creation_event.cpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
#include "duckpgq/core/operator/event/csr_edge_creation_event.hpp"
#include <duckpgq/core/operator/task/csr_edge_creation_task.hpp>

namespace duckpgq {
namespace core {

CSREdgeCreationEvent::CSREdgeCreationEvent(PathFindingGlobalState &gstate_p, Pipeline &pipeline_p, const PhysicalOperator &op_p)
: BasePipelineEvent(pipeline_p), gstate(gstate_p), op(op_p) {}

void CSREdgeCreationEvent::Schedule() {
auto &context = pipeline->GetClientContext();
auto &ts = TaskScheduler::GetScheduler(context);
idx_t num_threads = ts.NumberOfThreads();
auto &scan_state = gstate.scan_state;
auto &global_inputs = gstate.global_inputs;

global_inputs->InitializeScan(scan_state);

vector<shared_ptr<Task>> tasks;
for (idx_t tnum = 0; tnum < num_threads; tnum++) {
tasks.push_back(make_uniq<PhysicalCSREdgeCreationTask>(shared_from_this(),
context, gstate, op));
}
SetTasks(std::move(tasks));
}

void CSREdgeCreationEvent::FinishEvent() {
auto &gstate = this->gstate;
auto &global_csr = gstate.global_csr;
global_csr->is_ready = true;
}

} // namespace core
} // namespace duckpgq
// #include "duckpgq/core/operator/event/csr_edge_creation_event.hpp"
// #include <duckpgq/core/operator/task/csr_edge_creation_task.hpp>
//
// namespace duckpgq {
// namespace core {
//
// CSREdgeCreationEvent::CSREdgeCreationEvent(PathFindingGlobalState &gstate_p, Pipeline &pipeline_p, const PhysicalOperator &op_p)
// : BasePipelineEvent(pipeline_p), gstate(gstate_p), op(op_p) {}
//
// void CSREdgeCreationEvent::Schedule() {
// auto &context = pipeline->GetClientContext();
// auto &ts = TaskScheduler::GetScheduler(context);
// idx_t num_threads = ts.NumberOfThreads();
// auto &scan_state = gstate.scan_state;
// auto &global_inputs = gstate.global_csr_id;
//
// global_inputs->InitializeScan(scan_state);
//
// vector<shared_ptr<Task>> tasks;
// for (idx_t tnum = 0; tnum < num_threads; tnum++) {
// tasks.push_back(make_uniq<PhysicalCSREdgeCreationTask>(shared_from_this(),
// context, gstate, op));
// }
// SetTasks(std::move(tasks));
// }
//
// void CSREdgeCreationEvent::FinishEvent() {
// auto &gstate = this->gstate;
// auto &global_csr = gstate.global_csr;
// global_csr->is_ready = true;
// }
//
// } // namespace core
// } // namespace duckpgq
4 changes: 2 additions & 2 deletions src/core/operator/event/iterative_length_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace duckpgq {
namespace core {

ParallelIterativeEvent::ParallelIterativeEvent(PathFindingGlobalState &gstate_p, Pipeline &pipeline_p, const PhysicalPathFinding &op_p)
ParallelIterativeEvent::ParallelIterativeEvent(PathFindingGlobalSinkState &gstate_p, Pipeline &pipeline_p, const PhysicalPathFinding &op_p)
: BasePipelineEvent(pipeline_p), gstate(gstate_p), op(op_p) {}


Expand All @@ -27,7 +27,7 @@ void ParallelIterativeEvent::FinishEvent() {
auto &bfs_state = gstate.global_bfs_state;

// if remaining pairs, schedule the BFS for the next batch
if (bfs_state->started_searches < gstate.global_tasks->Count()) {
if (bfs_state->started_searches < gstate.global_pairs->Count()) {
op.ScheduleBFSEvent(*pipeline, *this, gstate);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/operator/event/shortest_path_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace duckpgq {
namespace core {

ParallelShortestPathEvent::ParallelShortestPathEvent(PathFindingGlobalState &gstate_p,
ParallelShortestPathEvent::ParallelShortestPathEvent(PathFindingGlobalSinkState &gstate_p,
Pipeline &pipeline_p, const PhysicalPathFinding &op_p)
: BasePipelineEvent(pipeline_p), gstate(gstate_p), op(op_p) {

Expand All @@ -29,7 +29,7 @@ void ParallelShortestPathEvent::FinishEvent() {
auto &bfs_state = gstate.global_bfs_state;

// if remaining pairs, schedule the BFS for the next batch
if (bfs_state->started_searches < gstate.global_tasks->Count()) {
if (bfs_state->started_searches < gstate.global_pairs->Count()) {
op.ScheduleBFSEvent(*pipeline, *this, gstate);
}
};
Expand Down
Loading

0 comments on commit 805b3cf

Please sign in to comment.