Skip to content

Commit

Permalink
Change type of pf result, move lstate to gstate
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Mar 12, 2024
1 parent 6833842 commit 737db39
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class PhysicalPathFinding : public PhysicalComparisonJoin {
//! Local copy of the expression executor
ExpressionExecutor executor;

//! Final result for the path-finding pairs
DataChunk local_results;

};

public:
Expand Down
16 changes: 8 additions & 8 deletions duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void PhysicalPathFinding::LocalCompressedSparseRow::Sink(
Vector result = Vector(src.GetType());
ValidityMask &result_validity = FlatVector::Validity(result);
result.SetVectorType(VectorType::FLAT_VECTOR);
auto result_data = FlatVector::GetData<int64_t>(result);
auto result_data = FlatVector::GetData<int32_t>(result);

vector<std::bitset<LANE_LIMIT>> seen(v_size);
vector<std::bitset<LANE_LIMIT>> visit1(v_size);
Expand Down Expand Up @@ -210,7 +210,9 @@ void PhysicalPathFinding::LocalCompressedSparseRow::Sink(
}
}
}
result.Print();
local_results.data.emplace_back(result);
local_results.SetCardinality(input);
local_results.Print();
return;
}
CreateCSR(input, global_csr);
Expand Down Expand Up @@ -253,6 +255,8 @@ class PathFindingGlobalState : public GlobalSinkState {
unique_ptr<GlobalCompressedSparseRow> global_csr;
size_t child;

DataChunk result;

};

unique_ptr<GlobalSinkState>
Expand Down Expand Up @@ -288,6 +292,7 @@ PhysicalPathFinding::Combine(ExecutionContext &context,
auto &lstate = input.local_state.Cast<PathFindingLocalState>();
auto &client_profiler = QueryProfiler::Get(context.client);

gstate.result.Move(lstate.local_csr.local_results);
client_profiler.Flush(context.thread.profiler);

return SinkCombineResultType::FINISHED;
Expand Down Expand Up @@ -326,8 +331,6 @@ class PathFindingLocalSourceState : public LocalSourceState {
}

const PhysicalPathFinding &op;

DataChunk pf_results;
};

class PathFindingGlobalSourceState : public GlobalSourceState {
Expand All @@ -345,10 +348,8 @@ class PathFindingGlobalSourceState : public GlobalSourceState {

public:
idx_t MaxThreads() override {
// We can't leverage any more threads than block pairs.
const auto &sink_state = (op.sink_state->Cast<PathFindingGlobalState>());
return 1;

}

void GetNextPair(ClientContext &client, PathFindingGlobalState &gstate,
Expand Down Expand Up @@ -378,11 +379,10 @@ PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result,
auto &pf_sink = sink_state->Cast<PathFindingGlobalState>();
auto &pf_gstate = input.global_state.Cast<PathFindingGlobalSourceState>();
auto &pf_lstate = input.local_state.Cast<PathFindingLocalSourceState>();

result.Move(pf_sink.result);
result.Print();
pf_gstate.Initialize(pf_sink);


return result.size() == 0 ? SourceResultType::FINISHED
: SourceResultType::HAVE_MORE_OUTPUT;
}
Expand Down

0 comments on commit 737db39

Please sign in to comment.