Skip to content

Commit

Permalink
add even more print statements
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinzwang committed Feb 14, 2024
1 parent bdbdc73 commit 5f3de4d
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,12 +518,18 @@ def place_in_queue(item):
except Full:
pass

print("==== start plan run ====")

with profiler(profile_filename):
print("==== 1 ====")
try:
next_step = next(tasks)
print("==== 2 ====")

while is_active(): # Loop: Dispatch -> await.
print("==== 3 ====")
while is_active(): # Loop: Dispatch (get tasks -> batch dispatch).
print("==== 4 ====")
tasks_to_dispatch: list[PartitionTask] = []

cores: int = max(next(num_cpus_provider) - self.reserved_cores, 0)
Expand All @@ -533,6 +539,7 @@ def place_in_queue(item):

# Loop: Get a batch of tasks.
while len(tasks_to_dispatch) < dispatches_allowed and is_active():
print("==== 5 ====")
if next_step is None:
# Blocked on already dispatched tasks; await some tasks.
break
Expand All @@ -557,6 +564,7 @@ def place_in_queue(item):
# Add the task to the batch.
tasks_to_dispatch.append(next_step)
next_step = next(tasks)
print("==== 6 ====")

# Dispatch the batch of tasks.
logger.debug(
Expand All @@ -568,6 +576,8 @@ def place_in_queue(item):
if not is_active():
break

print("==== 7 ====")

for task in tasks_to_dispatch:
results = _build_partitions(daft_execution_config, task)
logger.debug("%s -> %s", task, results)
Expand All @@ -577,12 +587,15 @@ def place_in_queue(item):

pbar.mark_task_start(task)

print("==== 8 ====")

if dispatches_allowed == 0 or next_step is None:
break

# Await a batch of tasks.
# (Awaits the next task, and then the next batch of tasks within 10ms.)

print("==== 9 ====")
dispatch = datetime.now()
completed_task_ids = []
for wait_for in ("next_one", "next_batch"):
Expand Down Expand Up @@ -620,6 +633,7 @@ def place_in_queue(item):

pbar.mark_task_done(task)
del inflight_tasks[task_id]
print("==== 10 ====")

logger.debug(
"%ss to await results from %s", (datetime.now() - dispatch).total_seconds(), completed_task_ids
Expand All @@ -628,6 +642,8 @@ def place_in_queue(item):
if next_step is None:
next_step = next(tasks)

print("==== 11 ====")

Check warning on line 645 in daft/runners/ray_runner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_runner.py#L645

Added line #L645 was not covered by tests

except StopIteration as e:
place_in_queue(e)

Expand All @@ -636,6 +652,8 @@ def place_in_queue(item):
place_in_queue(e)
pbar.close()
raise
print("==== 12 ====")
print("==== 13 ====")

pbar.close()

Expand Down Expand Up @@ -772,6 +790,8 @@ def run_iter(
else:
result = self.scheduler.next(result_uuid)

print("==== Got result ====")

if isinstance(result, StopIteration):
break
elif isinstance(result, Exception):
Expand Down

0 comments on commit 5f3de4d

Please sign in to comment.