Skip to content

Commit

Permalink
Changed to ColumnDataCollection
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Mar 20, 2024
1 parent d6c801b commit 54f4e42
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ class PhysicalPathFinding : public PhysicalComparisonJoin {
LocalCompressedSparseRow(ClientContext &context,
const PhysicalPathFinding &op);

void Sink(DataChunk &input, GlobalCompressedSparseRow &global_csr);

static void CreateCSR(DataChunk &input, GlobalCompressedSparseRow &global_csr);

static bool IterativeLength(int64_t v_size, int64_t *v, vector<int64_t> &e,
vector<std::bitset<LANE_LIMIT>> &seen,
Expand Down
212 changes: 48 additions & 164 deletions duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#include "duckdb/parallel/event.hpp"
#include "duckdb/parallel/meta_pipeline.hpp"
#include "duckdb/parallel/thread_context.hpp"
#include "duckdb/common/types/column/column_data_collection.hpp"
#include <thread>

#include <iostream>
namespace duckdb {


Expand Down Expand Up @@ -55,20 +55,45 @@ void PhysicalPathFinding::GlobalCompressedSparseRow::InitializeEdge(
initialized_e = true;
}

PhysicalPathFinding::LocalCompressedSparseRow::LocalCompressedSparseRow(
duckdb::ClientContext &context, const duckdb::PhysicalPathFinding &op) :
op(op), executor(context) {}
//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
class PathFindingLocalState : public LocalSinkState {
public:
using GlobalCompressedSparseRow =
PhysicalPathFinding::GlobalCompressedSparseRow;
PathFindingLocalState(ClientContext &context, const PhysicalPathFinding &op,
const idx_t child) : local_tasks(context, op.children[0]->GetTypes()) {
}

void Sink(DataChunk &input, GlobalCompressedSparseRow &global_csr) {
if (global_csr.is_ready) {
// Add the tasks (src, dst) to sink
// Optimizations: Eliminate duplicate sources/destinations
input.Print();
local_tasks.Append(input);
local_tasks.Print();
return;
}
CreateCSR(input, global_csr);
}

static void CreateCSR(DataChunk &input,
GlobalCompressedSparseRow &global_csr);

void PhysicalPathFinding::LocalCompressedSparseRow::CreateCSR(DataChunk &input,
PhysicalPathFinding::GlobalCompressedSparseRow &global_csr) {
ColumnDataCollection local_tasks;
};

void PathFindingLocalState::CreateCSR(DataChunk &input,
GlobalCompressedSparseRow &global_csr) {
if (!global_csr.initialized_v) {
const auto v_size = input.data[8].GetValue(0).GetValue<int64_t>();
global_csr.InitializeVertex(v_size);
}
Vector result = Vector(LogicalTypeId::BIGINT);
auto result = Vector(LogicalTypeId::BIGINT);
BinaryExecutor::Execute<int64_t, int64_t, int64_t>(
input.data[6], input.data[5], result, input.size(),
[&](int64_t src, int64_t cnt) {
[&](const int64_t src, const int64_t cnt) {
int64_t edge_count = 0;
global_csr.v[src + 2] = cnt;
edge_count = edge_count + cnt;
Expand All @@ -82,162 +107,19 @@ void PhysicalPathFinding::LocalCompressedSparseRow::CreateCSR(DataChunk &input,
TernaryExecutor::Execute<int64_t, int64_t, int64_t, int32_t>(
input.data[6], input.data[4], input.data[2], result, input.size(),
[&](int64_t src, int64_t dst, int64_t edge_id) {
auto pos = ++global_csr.v[src + 1];
global_csr.e[(int64_t)pos - 1] = dst;
global_csr.edge_ids[(int64_t)pos - 1] = edge_id;
const auto pos = ++global_csr.v[src + 1];
global_csr.e[static_cast<int64_t>(pos) - 1] = dst;
global_csr.edge_ids[static_cast<int64_t>(pos) - 1] = edge_id;
return 1;
});
global_csr.Print();
}

bool PhysicalPathFinding::LocalCompressedSparseRow::IterativeLength(int64_t v_size, int64_t *v, vector<int64_t> &e,
vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next) {
bool change = false;
for (auto i = 0; i < v_size; i++) {
next[i] = 0;
}
for (auto i = 0; i < v_size; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
next[n] = next[n] | visit[i];
}
}
}
for (auto i = 0; i < v_size; i++) {
next[i] = next[i] & ~seen[i];
seen[i] = seen[i] | next[i];
change |= next[i].any();
}
return change;
global_csr.Print(); // Debug print
}


void PhysicalPathFinding::LocalCompressedSparseRow::Sink(
DataChunk &input,
PhysicalPathFinding::GlobalCompressedSparseRow &global_csr) {
if (global_csr.is_ready) {
// Go to path-finding --> CSR is ready
input.Print();
global_csr.Print();
auto &src = input.data[0];
auto &dst = input.data[1];


auto v_size = global_csr.v_size;
auto *v = (int64_t *)global_csr.v;
vector<int64_t> &e = global_csr.e;

UnifiedVectorFormat vdata_src;
UnifiedVectorFormat vdata_dst;
src.ToUnifiedFormat(input.size(), vdata_src);
dst.ToUnifiedFormat(input.size(), vdata_dst);
auto src_data = (int32_t*)vdata_src.data;
auto dst_data = (int32_t*)vdata_dst.data;
Vector result = Vector(src.GetType());
ValidityMask &result_validity = FlatVector::Validity(result);
result.SetVectorType(VectorType::FLAT_VECTOR);
auto result_data = FlatVector::GetData<int32_t>(result);

vector<std::bitset<LANE_LIMIT>> seen(v_size);
vector<std::bitset<LANE_LIMIT>> visit1(v_size);
vector<std::bitset<LANE_LIMIT>> visit2(v_size);
short lane_to_num[LANE_LIMIT];
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1; // inactive
}
idx_t started_searches = 0;
while (started_searches < input.size()) {
// empty visit vectors
for (auto i = 0; i < v_size; i++) {
seen[i] = 0;
visit1[i] = 0;
}

// add search jobs to free lanes
uint64_t active = 0;
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1;
while (started_searches < input.size()) {
int64_t search_num = started_searches++;
int64_t src_pos = vdata_src.sel->get_index(search_num);
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
if (!vdata_src.validity.RowIsValid(src_pos)) {
result_validity.SetInvalid(search_num);
result_data[search_num] = (uint64_t)-1; /* no path */
} else if (src_data[src_pos] == dst_data[dst_pos]) {
result_data[search_num] =
(uint64_t)0; // path of length 0 does not require a search
} else {
visit1[src_data[src_pos]][lane] = true;
lane_to_num[lane] = search_num; // active lane
active++;
break;
}
}
}

// make passes while a lane is still active
for (int64_t iter = 1; active; iter++) {
if (!IterativeLength(v_size, v, e, seen, (iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
break;
}
// detect lanes that finished
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = lane_to_num[lane];
if (search_num >= 0) { // active lane
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
if (seen[dst_data[dst_pos]][lane]) {
result_data[search_num] =
iter; /* found at iter => iter = path length */
lane_to_num[lane] = -1; // mark inactive
active--;
}
}
}
}

// no changes anymore: any still active searches have no path
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = lane_to_num[lane];
if (search_num >= 0) { // active lane
result_validity.SetInvalid(search_num);
result_data[search_num] = (int64_t)-1; /* no path */
lane_to_num[lane] = -1; // mark inactive
}
}
}
local_results.data.emplace_back(result);
local_results.SetCardinality(input);
local_results.Print();
return;
}
CreateCSR(input, global_csr);
}

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
class PathFindingLocalState : public LocalSinkState {
public:
using LocalCompressedSparseRow = PhysicalPathFinding::LocalCompressedSparseRow;
PathFindingLocalState(ClientContext &context, const PhysicalPathFinding &op,
const idx_t child)
: local_csr(context, op) {
}

//! local csr
LocalCompressedSparseRow local_csr;
};

class PathFindingGlobalState : public GlobalSinkState {
public:
using GlobalCompressedSparseRow = PhysicalPathFinding::GlobalCompressedSparseRow;
PathFindingGlobalState(ClientContext &context,
const PhysicalPathFinding &op) {
const PhysicalPathFinding &op) : global_tasks(context, op.children[0]->GetTypes()) {
RowLayout lhs_layout;
lhs_layout.Initialize(op.children[0]->types);
RowLayout rhs_layout;
Expand All @@ -246,17 +128,19 @@ class PathFindingGlobalState : public GlobalSinkState {
}

PathFindingGlobalState(PathFindingGlobalState &prev)
: GlobalSinkState(prev), global_csr(std::move(prev.global_csr)), child(prev.child+1) {}
: GlobalSinkState(prev), global_tasks(prev.global_tasks),
global_csr(std::move(prev.global_csr)), child(prev.child + 1) {}

void Sink(DataChunk &input, PathFindingLocalState &lstate) {
lstate.local_csr.Sink(input, *global_csr);
void Sink(DataChunk &input, PathFindingLocalState &lstate) const {
lstate.Sink(input, *global_csr);
}

unique_ptr<GlobalCompressedSparseRow> global_csr;
size_t child;

DataChunk result;

ColumnDataCollection global_tasks;
ColumnDataScanState scan_state;
ColumnDataAppendState append_state;
};

unique_ptr<GlobalSinkState>
Expand Down Expand Up @@ -292,9 +176,9 @@ PhysicalPathFinding::Combine(ExecutionContext &context,
auto &lstate = input.local_state.Cast<PathFindingLocalState>();
auto &client_profiler = QueryProfiler::Get(context.client);

gstate.result.Move(lstate.local_csr.local_results);
gstate.global_tasks.Combine(lstate.local_tasks);
client_profiler.Flush(context.thread.profiler);

gstate.global_tasks.Print();
return SinkCombineResultType::FINISHED;
}

Expand Down Expand Up @@ -379,7 +263,7 @@ PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result,
auto &pf_sink = sink_state->Cast<PathFindingGlobalState>();
auto &pf_gstate = input.global_state.Cast<PathFindingGlobalSourceState>();
auto &pf_lstate = input.local_state.Cast<PathFindingLocalSourceState>();
result.Move(pf_sink.result);
pf_sink.global_tasks.Scan(pf_sink.scan_state, result);
result.Print();
pf_gstate.Initialize(pf_sink);

Expand Down

0 comments on commit 54f4e42

Please sign in to comment.