From 54f4e42d6fe69884c0bbd39b44cc68dc4021aee5 Mon Sep 17 00:00:00 2001 From: dtenwolde Date: Wed, 20 Mar 2024 16:54:14 +0100 Subject: [PATCH] Changed to ColumnDataCollection --- .../physical_path_finding_operator.hpp | 2 - .../physical_path_finding_operator.cpp | 212 ++++-------------- 2 files changed, 48 insertions(+), 166 deletions(-) diff --git a/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp b/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp index 734469c2..c519096e 100644 --- a/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp +++ b/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp @@ -77,9 +77,7 @@ class PhysicalPathFinding : public PhysicalComparisonJoin { LocalCompressedSparseRow(ClientContext &context, const PhysicalPathFinding &op); - void Sink(DataChunk &input, GlobalCompressedSparseRow &global_csr); - static void CreateCSR(DataChunk &input, GlobalCompressedSparseRow &global_csr); static bool IterativeLength(int64_t v_size, int64_t *v, vector &e, vector> &seen, diff --git a/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp b/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp index 591407a0..51e4bf38 100644 --- a/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp +++ b/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp @@ -6,9 +6,9 @@ #include "duckdb/parallel/event.hpp" #include "duckdb/parallel/meta_pipeline.hpp" #include "duckdb/parallel/thread_context.hpp" +#include "duckdb/common/types/column/column_data_collection.hpp" #include -#include namespace duckdb { @@ -55,20 +55,45 @@ void PhysicalPathFinding::GlobalCompressedSparseRow::InitializeEdge( initialized_e = true; } -PhysicalPathFinding::LocalCompressedSparseRow::LocalCompressedSparseRow( - duckdb::ClientContext &context, const duckdb::PhysicalPathFinding &op) : - op(op), executor(context) {} +//===--------------------------------------------------------------------===// +// Sink +//===--------------------------------------------------------------------===// +class PathFindingLocalState : public LocalSinkState { +public: + using GlobalCompressedSparseRow = + PhysicalPathFinding::GlobalCompressedSparseRow; + PathFindingLocalState(ClientContext &context, const PhysicalPathFinding &op, + const idx_t child) : local_tasks(context, op.children[0]->GetTypes()) { + } + + void Sink(DataChunk &input, GlobalCompressedSparseRow &global_csr) { + if (global_csr.is_ready) { + // Add the tasks (src, dst) to sink + // Optimizations: Eliminate duplicate sources/destinations + input.Print(); + local_tasks.Append(input); + local_tasks.Print(); + return; + } + CreateCSR(input, global_csr); + } + + static void CreateCSR(DataChunk &input, + GlobalCompressedSparseRow &global_csr); -void PhysicalPathFinding::LocalCompressedSparseRow::CreateCSR(DataChunk &input, - PhysicalPathFinding::GlobalCompressedSparseRow &global_csr) { + ColumnDataCollection local_tasks; +}; + +void PathFindingLocalState::CreateCSR(DataChunk &input, + GlobalCompressedSparseRow &global_csr) { if (!global_csr.initialized_v) { const auto v_size = input.data[8].GetValue(0).GetValue(); global_csr.InitializeVertex(v_size); } - Vector result = Vector(LogicalTypeId::BIGINT); + auto result = Vector(LogicalTypeId::BIGINT); BinaryExecutor::Execute( input.data[6], input.data[5], result, input.size(), - [&](int64_t src, int64_t cnt) { + [&](const int64_t src, const int64_t cnt) { int64_t edge_count = 0; global_csr.v[src + 2] = cnt; edge_count = edge_count + cnt; @@ -82,162 +107,19 @@ void PhysicalPathFinding::LocalCompressedSparseRow::CreateCSR(DataChunk &input, TernaryExecutor::Execute( input.data[6], input.data[4], input.data[2], result, input.size(), [&](int64_t src, int64_t dst, int64_t edge_id) { - auto pos = ++global_csr.v[src + 1]; - global_csr.e[(int64_t)pos - 1] = dst; - global_csr.edge_ids[(int64_t)pos - 1] = edge_id; + const auto pos = ++global_csr.v[src + 1]; + global_csr.e[static_cast(pos) - 1] = dst; + global_csr.edge_ids[static_cast(pos) - 1] = edge_id; return 1; }); - global_csr.Print(); -} - -bool PhysicalPathFinding::LocalCompressedSparseRow::IterativeLength(int64_t v_size, int64_t *v, vector &e, - vector> &seen, - vector> &visit, - vector> &next) { - bool change = false; - for (auto i = 0; i < v_size; i++) { - next[i] = 0; - } - for (auto i = 0; i < v_size; i++) { - if (visit[i].any()) { - for (auto offset = v[i]; offset < v[i + 1]; offset++) { - auto n = e[offset]; - next[n] = next[n] | visit[i]; - } - } - } - for (auto i = 0; i < v_size; i++) { - next[i] = next[i] & ~seen[i]; - seen[i] = seen[i] | next[i]; - change |= next[i].any(); - } - return change; + global_csr.Print(); // Debug print } - -void PhysicalPathFinding::LocalCompressedSparseRow::Sink( - DataChunk &input, - PhysicalPathFinding::GlobalCompressedSparseRow &global_csr) { - if (global_csr.is_ready) { - // Go to path-finding --> CSR is ready - input.Print(); - global_csr.Print(); - auto &src = input.data[0]; - auto &dst = input.data[1]; - - - auto v_size = global_csr.v_size; - auto *v = (int64_t *)global_csr.v; - vector &e = global_csr.e; - - UnifiedVectorFormat vdata_src; - UnifiedVectorFormat vdata_dst; - src.ToUnifiedFormat(input.size(), vdata_src); - dst.ToUnifiedFormat(input.size(), vdata_dst); - auto src_data = (int32_t*)vdata_src.data; - auto dst_data = (int32_t*)vdata_dst.data; - Vector result = Vector(src.GetType()); - ValidityMask &result_validity = FlatVector::Validity(result); - result.SetVectorType(VectorType::FLAT_VECTOR); - auto result_data = FlatVector::GetData(result); - - vector> seen(v_size); - vector> visit1(v_size); - vector> visit2(v_size); - short lane_to_num[LANE_LIMIT]; - for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { - lane_to_num[lane] = -1; // inactive - } - idx_t started_searches = 0; - while (started_searches < input.size()) { - // empty visit vectors - for (auto i = 0; i < v_size; i++) { - seen[i] = 0; - visit1[i] = 0; - } - - // add search jobs to free lanes - uint64_t active = 0; - for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { - lane_to_num[lane] = -1; - while (started_searches < input.size()) { - int64_t search_num = started_searches++; - int64_t src_pos = vdata_src.sel->get_index(search_num); - int64_t dst_pos = vdata_dst.sel->get_index(search_num); - if (!vdata_src.validity.RowIsValid(src_pos)) { - result_validity.SetInvalid(search_num); - result_data[search_num] = (uint64_t)-1; /* no path */ - } else if (src_data[src_pos] == dst_data[dst_pos]) { - result_data[search_num] = - (uint64_t)0; // path of length 0 does not require a search - } else { - visit1[src_data[src_pos]][lane] = true; - lane_to_num[lane] = search_num; // active lane - active++; - break; - } - } - } - - // make passes while a lane is still active - for (int64_t iter = 1; active; iter++) { - if (!IterativeLength(v_size, v, e, seen, (iter & 1) ? visit1 : visit2, - (iter & 1) ? visit2 : visit1)) { - break; - } - // detect lanes that finished - for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { - int64_t search_num = lane_to_num[lane]; - if (search_num >= 0) { // active lane - int64_t dst_pos = vdata_dst.sel->get_index(search_num); - if (seen[dst_data[dst_pos]][lane]) { - result_data[search_num] = - iter; /* found at iter => iter = path length */ - lane_to_num[lane] = -1; // mark inactive - active--; - } - } - } - } - - // no changes anymore: any still active searches have no path - for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { - int64_t search_num = lane_to_num[lane]; - if (search_num >= 0) { // active lane - result_validity.SetInvalid(search_num); - result_data[search_num] = (int64_t)-1; /* no path */ - lane_to_num[lane] = -1; // mark inactive - } - } - } - local_results.data.emplace_back(result); - local_results.SetCardinality(input); - local_results.Print(); - return; - } - CreateCSR(input, global_csr); -} - -//===--------------------------------------------------------------------===// -// Sink -//===--------------------------------------------------------------------===// -class PathFindingLocalState : public LocalSinkState { -public: - using LocalCompressedSparseRow = PhysicalPathFinding::LocalCompressedSparseRow; - PathFindingLocalState(ClientContext &context, const PhysicalPathFinding &op, - const idx_t child) - : local_csr(context, op) { - } - - //! local csr - LocalCompressedSparseRow local_csr; -}; - class PathFindingGlobalState : public GlobalSinkState { public: using GlobalCompressedSparseRow = PhysicalPathFinding::GlobalCompressedSparseRow; PathFindingGlobalState(ClientContext &context, - const PhysicalPathFinding &op) { + const PhysicalPathFinding &op) : global_tasks(context, op.children[0]->GetTypes()) { RowLayout lhs_layout; lhs_layout.Initialize(op.children[0]->types); RowLayout rhs_layout; @@ -246,17 +128,19 @@ class PathFindingGlobalState : public GlobalSinkState { } PathFindingGlobalState(PathFindingGlobalState &prev) - : GlobalSinkState(prev), global_csr(std::move(prev.global_csr)), child(prev.child+1) {} + : GlobalSinkState(prev), global_tasks(prev.global_tasks), + global_csr(std::move(prev.global_csr)), child(prev.child + 1) {} - void Sink(DataChunk &input, PathFindingLocalState &lstate) { - lstate.local_csr.Sink(input, *global_csr); + void Sink(DataChunk &input, PathFindingLocalState &lstate) const { + lstate.Sink(input, *global_csr); } unique_ptr global_csr; size_t child; - DataChunk result; - + ColumnDataCollection global_tasks; + ColumnDataScanState scan_state; + ColumnDataAppendState append_state; }; unique_ptr @@ -292,9 +176,9 @@ PhysicalPathFinding::Combine(ExecutionContext &context, auto &lstate = input.local_state.Cast(); auto &client_profiler = QueryProfiler::Get(context.client); - gstate.result.Move(lstate.local_csr.local_results); + gstate.global_tasks.Combine(lstate.local_tasks); client_profiler.Flush(context.thread.profiler); - + gstate.global_tasks.Print(); return SinkCombineResultType::FINISHED; } @@ -379,7 +263,7 @@ PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result, auto &pf_sink = sink_state->Cast(); auto &pf_gstate = input.global_state.Cast(); auto &pf_lstate = input.local_state.Cast(); - result.Move(pf_sink.result); + pf_sink.global_tasks.Scan(pf_sink.scan_state, result); result.Print(); pf_gstate.Initialize(pf_sink);