From 76218ce7040c8f2ea62e571f5b9797a0fdac7cb2 Mon Sep 17 00:00:00 2001 From: dtenwolde Date: Wed, 6 Nov 2024 10:37:34 +0100 Subject: [PATCH] Update v size --- .../physical_path_finding_operator.cpp | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/core/operator/physical_path_finding_operator.cpp b/src/core/operator/physical_path_finding_operator.cpp index 8228c90..0ff2193 100644 --- a/src/core/operator/physical_path_finding_operator.cpp +++ b/src/core/operator/physical_path_finding_operator.cpp @@ -204,10 +204,10 @@ PhysicalPathFinding::Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const { auto &gstate = input.global_state.Cast(); auto &lstate = input.local_state.Cast(); - auto &client_profiler = QueryProfiler::Get(context.client); - + if (gstate.child == 0) { + return SinkCombineResultType::FINISHED; + } gstate.global_pairs->Combine(lstate.local_pairs); - client_profiler.Flush(context.thread.profiler); return SinkCombineResultType::FINISHED; } @@ -223,11 +223,18 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event, auto &global_tasks = gstate.global_pairs; auto duckpgq_state = GetDuckPGQState(context); // TODO - auto csr = duckpgq_state->GetCSR(0); - // Check if we have to do anything for CSR child + if (gstate.csr == nullptr) { + throw InternalException("CSR not initialized"); + } + std::cout << gstate.csr->ToString() << std::endl; + // Check if we have to do anything for CSR child + if (gstate.child == 0) { + ++gstate.child; + return SinkFinalizeType::READY; + } - if (gstate.child == 1 && global_tasks->Count() > 0) { + if (global_tasks->Count() > 0) { auto all_pairs = make_shared_ptr(); DataChunk pairs; global_tasks->InitializeScanChunk(*all_pairs); @@ -242,7 +249,7 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event, idx_t num_threads = ts.NumberOfThreads(); idx_t mode = this->mode == "iterativelength" ? 0 : 1; // TODO - gstate.global_bfs_state = make_uniq(all_pairs, 0, + gstate.global_bfs_state = make_uniq(all_pairs, gstate.csr->vsize, num_threads, mode, context); Value task_size_value;