From 5f3de4da38c94e3d9e8d5832e95f672f6c1def8c Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Wed, 14 Feb 2024 14:40:06 -0800 Subject: [PATCH] add even more print statements --- daft/runners/ray_runner.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index dc07f4ca5d..dd410e6da4 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -518,12 +518,18 @@ def place_in_queue(item): except Full: pass + print("==== start plan run ====") + with profiler(profile_filename): + print("==== 1 ====") try: next_step = next(tasks) + print("==== 2 ====") while is_active(): # Loop: Dispatch -> await. + print("==== 3 ====") while is_active(): # Loop: Dispatch (get tasks -> batch dispatch). + print("==== 4 ====") tasks_to_dispatch: list[PartitionTask] = [] cores: int = max(next(num_cpus_provider) - self.reserved_cores, 0) @@ -533,6 +539,7 @@ def place_in_queue(item): # Loop: Get a batch of tasks. while len(tasks_to_dispatch) < dispatches_allowed and is_active(): + print("==== 5 ====") if next_step is None: # Blocked on already dispatched tasks; await some tasks. break @@ -557,6 +564,7 @@ def place_in_queue(item): # Add the task to the batch. tasks_to_dispatch.append(next_step) next_step = next(tasks) + print("==== 6 ====") # Dispatch the batch of tasks. logger.debug( @@ -568,6 +576,8 @@ def place_in_queue(item): if not is_active(): break + print("==== 7 ====") + for task in tasks_to_dispatch: results = _build_partitions(daft_execution_config, task) logger.debug("%s -> %s", task, results) @@ -577,12 +587,15 @@ def place_in_queue(item): pbar.mark_task_start(task) + print("==== 8 ====") + if dispatches_allowed == 0 or next_step is None: break # Await a batch of tasks. # (Awaits the next task, and then the next batch of tasks within 10ms.) + print("==== 9 ====") dispatch = datetime.now() completed_task_ids = [] for wait_for in ("next_one", "next_batch"): @@ -620,6 +633,7 @@ def place_in_queue(item): pbar.mark_task_done(task) del inflight_tasks[task_id] + print("==== 10 ====") logger.debug( "%ss to await results from %s", (datetime.now() - dispatch).total_seconds(), completed_task_ids @@ -628,6 +642,8 @@ def place_in_queue(item): if next_step is None: next_step = next(tasks) + print("==== 11 ====") + except StopIteration as e: place_in_queue(e) @@ -636,6 +652,8 @@ def place_in_queue(item): place_in_queue(e) pbar.close() raise + print("==== 12 ====") + print("==== 13 ====") pbar.close() @@ -772,6 +790,8 @@ def run_iter( else: result = self.scheduler.next(result_uuid) + print("==== Got result ====") + if isinstance(result, StopIteration): break elif isinstance(result, Exception):