Skip to content

Commit

Permalink
Trying to get copy to work
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Nov 15, 2024
1 parent 7492240 commit 1f70de2
Show file tree
Hide file tree
Showing 2 changed files with 2,833 additions and 2,797 deletions.
52 changes: 44 additions & 8 deletions src/core/operator/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ GlobalBFSState::GlobalBFSState(shared_ptr<DataChunk> pairs_, CSR* csr_, int64_t
} else {
throw NotImplementedException("Mode not supported");
}
result.SetCardinality(pairs_->size());

auto &src_data = pairs->data[0];
auto &dst_data = pairs->data[1];
src_data.ToUnifiedFormat(pairs->size(), vdata_src);
Expand Down Expand Up @@ -396,20 +398,54 @@ PhysicalPathFinding::GetLocalSourceState(ExecutionContext &context,
return make_uniq<PathFindingLocalSourceState>(context.client, *this);
}

SourceResultType
PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result,
SourceResultType PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result,
OperatorSourceInput &input) const {
auto &pf_sink = sink_state->Cast<PathFindingGlobalSinkState>();
auto &pf_bfs_state = pf_sink.global_bfs_state;

// If there are no pairs, we're done
if (pf_sink.global_pairs->Count() == 0) {
return SourceResultType::FINISHED;
}
pf_bfs_state->result.SetCardinality(*pf_bfs_state->pairs);
result.SetCapacity(pf_bfs_state->pairs->size());
result.Move(*pf_bfs_state->pairs);
result.Fuse(pf_bfs_state->result);
return result.size() == 0 ? SourceResultType::FINISHED
: SourceResultType::HAVE_MORE_OUTPUT;

// Track the current offset to handle batches larger than STANDARD_VECTOR_SIZE
static idx_t current_offset = 0;

// Determine the number of tuples to process in this call
idx_t remaining_tuples = pf_bfs_state->pairs->size() - current_offset;
auto tuples_to_copy = std::min<idx_t>(STANDARD_VECTOR_SIZE, pf_bfs_state->pairs->size() - current_offset);
// If there are no tuples left, we're finished
if (tuples_to_copy == 0) {
current_offset = 0; // Reset for future calls
return SourceResultType::FINISHED;
}

std::cout << "Current offset: " << current_offset
<< ", Tuples to copy: " << tuples_to_copy
<< ", Pairs size: " << pf_bfs_state->pairs->size()
<< ", Result size: " << pf_bfs_state->result.size() << std::endl;

// Slice the pairs and result data to the appropriate size for this batch
DataChunk temp_pairs;
temp_pairs.Initialize(context.client, pf_bfs_state->pairs->GetTypes());
pf_bfs_state->pairs->Copy(temp_pairs, current_offset);

DataChunk temp_result;
temp_result.Initialize(context.client, pf_bfs_state->result.GetTypes());
pf_bfs_state->result.Copy(temp_result, current_offset);

// Move the sliced data into the result DataChunk
result.SetCapacity(tuples_to_copy);
result.Move(temp_pairs);
result.Fuse(temp_result);
result.SetCardinality(tuples_to_copy);

// Update the current offset
current_offset += tuples_to_copy;

// Return appropriate status based on whether there are more tuples to process
return current_offset >= pf_bfs_state->pairs->size() ? SourceResultType::FINISHED
: SourceResultType::HAVE_MORE_OUTPUT;
}

//===--------------------------------------------------------------------===//
Expand Down
Loading

0 comments on commit 1f70de2

Please sign in to comment.