Skip to content

Commit

Permalink
Disabled prints. New synchronization now finished
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Nov 15, 2024
1 parent 3b60e94 commit 7492240
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
12 changes: 6 additions & 6 deletions src/core/operator/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ shared_ptr<std::pair<idx_t, idx_t>> GlobalBFSState::FetchTask() {
std::unique_lock<std::mutex> lock(queue_mutex); // Lock the mutex to access the queue

// Log entry into FetchTask
std::cout << "FetchTask: Checking tasks. Current index: " << current_task_index
<< ", Total tasks: " << global_task_queue.size() << std::endl;
// std::cout << "FetchTask: Checking tasks. Current index: " << current_task_index
// << ", Total tasks: " << global_task_queue.size() << std::endl;

// Avoid unnecessary waiting if no tasks are available
if (current_task_index >= global_task_queue.size()) {
std::cout << "FetchTask: No more tasks available. Exiting." << std::endl;
// std::cout << "FetchTask: No more tasks available. Exiting." << std::endl;
return nullptr; // No more tasks
}

Expand All @@ -133,14 +133,14 @@ shared_ptr<std::pair<idx_t, idx_t>> GlobalBFSState::FetchTask() {
current_task_index++;

// Log the fetched task
std::cout << "FetchTask: Fetched task " << current_task_index - 1
<< " -> [" << task->first << ", " << task->second << "]" << std::endl;
// std::cout << "FetchTask: Fetched task " << current_task_index - 1
// << " -> [" << task->first << ", " << task->second << "]" << std::endl;

return task;
}

// Log no tasks available after wait
std::cout << "FetchTask: No more tasks available after wait. Exiting." << std::endl;
// std::cout << "FetchTask: No more tasks available after wait. Exiting." << std::endl;
return nullptr;
}

Expand Down
6 changes: 3 additions & 3 deletions src/core/operator/task/iterative_length_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ bool PhysicalIterativeTask::SetTaskRange() {

// Attempt to get a task range
bool has_tasks = SetTaskRange();
std::cout << "Worker " << worker_id << ": Has tasks = " << has_tasks << std::endl;
// std::cout << "Worker " << worker_id << ": Has tasks = " << has_tasks << std::endl;

// Clear `next` array regardless of task availability
for (auto i = left; i < right; i++) {
Expand Down Expand Up @@ -92,7 +92,7 @@ bool PhysicalIterativeTask::SetTaskRange() {

// Synchronize at the end of the main processing
barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index." << std::endl;
// std::cout << "Worker " << worker_id << ": Resetting task index." << std::endl;
bfs_state->ResetTaskIndex();
});
barrier->Wait();
Expand All @@ -114,7 +114,7 @@ bool PhysicalIterativeTask::SetTaskRange() {

// Final synchronization after processing
barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index at second barrier." << std::endl;
// std::cout << "Worker " << worker_id << ": Resetting task index at second barrier." << std::endl;
bfs_state->ResetTaskIndex();
});
barrier->Wait();
Expand Down
24 changes: 12 additions & 12 deletions src/core/operator/task/shortest_path_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl
ReachDetect();
}

std::cout << "Worker " << worker_id << ": Waiting at barrier before ResetTaskIndex." << std::endl;
// std::cout << "Worker " << worker_id << ": Waiting at barrier before ResetTaskIndex." << std::endl;
barrier->Wait();
if (worker_id == 0) {
bfs_state->ResetTaskIndex();
std::cout << "Worker " << worker_id << ": ResetTaskIndex completed." << std::endl;
// std::cout << "Worker " << worker_id << ": ResetTaskIndex completed." << std::endl;
}
barrier->Wait();
std::cout << "Worker " << worker_id << ": Passed barrier after ResetTaskIndex." << std::endl;
// std::cout << "Worker " << worker_id << ": Passed barrier after ResetTaskIndex." << std::endl;
} while (bfs_state->change);

barrier->Wait();
if (worker_id == 0) {
std::cout << "Worker " << worker_id << " started path construction" << std::endl;
// std::cout << "Worker " << worker_id << " started path construction" << std::endl;
PathConstruction();
std::cout << "Worker " << worker_id << " finished path construction" << std::endl;
// std::cout << "Worker " << worker_id << " finished path construction" << std::endl;
}

// Final synchronization before finishing
barrier->Wait();
std::cout << "Worker " << worker_id << " finishing task" << std::endl;
// std::cout << "Worker " << worker_id << " finishing task" << std::endl;
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;

Expand Down Expand Up @@ -78,7 +78,7 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl

// Attempt to get a task range
bool has_tasks = SetTaskRange();
std::cout << "Worker " << worker_id << ": Has tasks = " << has_tasks << std::endl;
// std::cout << "Worker " << worker_id << ": Has tasks = " << has_tasks << std::endl;

// Clear next array regardless of whether the worker has tasks
for (auto i = left; i < right; i++) {
Expand All @@ -87,7 +87,7 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl

// Synchronize after clearing
barrier->Wait();
std::cout << "Worker " << worker_id << ": Passed first barrier." << std::endl;
// std::cout << "Worker " << worker_id << ": Passed first barrier." << std::endl;

// Main processing loop
while (has_tasks) {
Expand All @@ -112,13 +112,13 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl
// Check for a new task range
has_tasks = SetTaskRange();
if (!has_tasks) {
std::cout << "Worker " << worker_id << ": No more tasks found to explore." << std::endl;
// std::cout << "Worker " << worker_id << ": No more tasks found to explore." << std::endl;
}
}

// Synchronize at the end of the main processing
barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index." << std::endl;
// std::cout << "Worker " << worker_id << ": Resetting task index." << std::endl;
bfs_state->ResetTaskIndex();
});
barrier->Wait();
Expand All @@ -139,11 +139,11 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl

// Synchronize again
barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index at second barrier." << std::endl;
// std::cout << "Worker " << worker_id << ": Resetting task index at second barrier." << std::endl;
bfs_state->ResetTaskIndex();
});
barrier->Wait();
std::cout << "Worker " << worker_id << ": Passed second barrier." << std::endl;
// std::cout << "Worker " << worker_id << ": Passed second barrier." << std::endl;
}

void PhysicalShortestPathTask::ReachDetect() {
Expand Down
20 changes: 10 additions & 10 deletions src/core/utils/duckpgq_barrier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,34 @@ void Barrier::Wait(std::function<void()> resetAction) {
// Convert thread ID to a string representation (hash)
auto thread_id_str = std::to_string(std::hash<std::thread::id>{}(std::this_thread::get_id()));

std::cout << "Thread " << thread_id_str << " entering barrier: Current generation = "
<< lGen << ", mCount = " << mCount << ", mThreshold = " << mThreshold << std::endl;
// std::cout << "Thread " << thread_id_str << " entering barrier: Current generation = "
// << lGen << ", mCount = " << mCount << ", mThreshold = " << mThreshold << std::endl;

if (!--mCount) {
// Last thread to reach the barrier
mGeneration++;
mCount = mThreshold;

std::cout << "Thread " << thread_id_str
<< " is the last thread. Performing reset action and updating state." << std::endl;
// std::cout << "Thread " << thread_id_str
// << " is the last thread. Performing reset action and updating state." << std::endl;

if (resetAction) {
resetAction(); // Perform the reset action
}

std::cout << "Notifying all threads: New generation = " << mGeneration << ", mCount reset to "
<< mCount << "." << std::endl;
// std::cout << "Notifying all threads: New generation = " << mGeneration << ", mCount reset to "
// << mCount << "." << std::endl;

mCond.notify_all(); // Wake up all waiting threads
} else {
// Other threads wait for the generation to change
std::cout << "Thread " << thread_id_str
<< " waiting: Current generation = " << lGen << ", mCount = " << mCount << "." << std::endl;
// std::cout << "Thread " << thread_id_str
// << " waiting: Current generation = " << lGen << ", mCount = " << mCount << "." << std::endl;

mCond.wait(lLock, [this, lGen] { return lGen != mGeneration; });

std::cout << "Thread " << thread_id_str << " resumed: Current generation = "
<< mGeneration << ", mCount = " << mCount << "." << std::endl;
// std::cout << "Thread " << thread_id_str << " resumed: Current generation = "
// << mGeneration << ", mCount = " << mCount << "." << std::endl;
}
}

Expand Down

0 comments on commit 7492240

Please sign in to comment.