Skip to content

Commit

Permalink
Cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 25, 2024
1 parent 69092df commit 4b4cc01
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 57 deletions.
4 changes: 2 additions & 2 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from daft.context import execution_config_ctx, get_context
from daft.daft import PyTable as _PyTable
from daft.dependencies import np
from daft.runners import tracer
from daft.runners.progress_bar import ProgressBar
from daft.runners.tracer import tracer
from daft.series import Series, item_to_series
from daft.table import Table

Expand Down Expand Up @@ -700,7 +700,7 @@ def place_in_queue(item):
except Full:
pass

with profiler(profile_filename), tracer.RunnerTracer(trace_filename) as runner_tracer:
with profiler(profile_filename), tracer(trace_filename) as runner_tracer:
wave_count = 0
try:
next_step = next(tasks)
Expand Down
91 changes: 36 additions & 55 deletions daft/runners/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,53 @@

import contextlib
import json
import os
import time
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, TextIO

if TYPE_CHECKING:
from daft import ResourceRequest

Check warning on line 10 in daft/runners/tracer.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/tracer.py#L10

Added line #L10 was not covered by tests


class RunnerTracer:
def __init__(self, filepath: str):
self._filepath = filepath
@contextlib.contextmanager
def tracer(filepath: str):
if int(os.environ.get("DAFT_RUNNER_TRACING", 0)) == 1:
with open(filepath, "w") as f:

Check warning on line 16 in daft/runners/tracer.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/tracer.py#L16

Added line #L16 was not covered by tests
# Initialize the JSON file
f.write("[")

Check warning on line 18 in daft/runners/tracer.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/tracer.py#L18

Added line #L18 was not covered by tests

def __enter__(self) -> RunnerTracer:
self._file = open(self._filepath, "w")
self._file.write("[")
self._start = time.time()
return self
# Yield the tracer
runner_tracer = RunnerTracer(f)
yield runner_tracer

Check warning on line 22 in daft/runners/tracer.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/tracer.py#L21-L22

Added lines #L21 - L22 were not covered by tests

# Add the final touches to the file
f.write(

Check warning on line 25 in daft/runners/tracer.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/tracer.py#L25

Added line #L25 was not covered by tests
json.dumps({"name": "process_name", "ph": "M", "pid": 1, "args": {"name": "RayRunner dispatch loop"}})
)
f.write(",\n")
f.write(json.dumps({"name": "process_name", "ph": "M", "pid": 2, "args": {"name": "Ray Task Execution"}}))
f.write("\n]")

Check warning on line 30 in daft/runners/tracer.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/tracer.py#L28-L30

Added lines #L28 - L30 were not covered by tests
else:
runner_tracer = RunnerTracer(None)
yield runner_tracer

def __exit__(self, exc_type, exc_value, traceback):
self._file.write(
json.dumps({"name": "process_name", "ph": "M", "pid": 1, "args": {"name": "RayRunner dispatch loop"}})
)
self._file.write(",\n")
self._file.write(
json.dumps({"name": "process_name", "ph": "M", "pid": 2, "args": {"name": "Ray Task Execution"}})
)
self._file.write("\n]")
self._file.close()

class RunnerTracer:
def __init__(self, file: TextIO | None):
self._file = file
self._start = time.time()

def _write_event(self, event: dict[str, Any]):
self._file.write(
json.dumps(
{
**event,
"ts": int((time.time() - self._start) * 1000 * 1000),
}
if self._file is not None:
self._file.write(

Check warning on line 43 in daft/runners/tracer.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/tracer.py#L43

Added line #L43 was not covered by tests
json.dumps(
{
**event,
"ts": int((time.time() - self._start) * 1000 * 1000),
}
)
)
)
self._file.write(",\n")
self._file.write(",\n")

Check warning on line 51 in daft/runners/tracer.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/tracer.py#L51

Added line #L51 was not covered by tests

@contextlib.contextmanager
def dispatch_wave(self, wave_num: int):
Expand Down Expand Up @@ -70,16 +79,6 @@ def metrics_updater(**kwargs):
}
)

# def dispatch_wave_metrics(self, metrics: dict[str, int]):
# """Marks a counter event for various runner counters such as num cores, max inflight tasks etc"""
# self._write_event({
# "name": "dispatch_metrics",
# "ph": "C",
# "pid": 1,
# "tid": 1,
# "args": metrics,
# })

def count_inflight_tasks(self, count: int):
self._write_event(
{
Expand Down Expand Up @@ -116,15 +115,6 @@ def dispatch_batching(self):
}
)

# def count_dispatch_batch_size(self, dispatch_batch_size: int):
# self._write_event({
# "name": "dispatch_batch_size",
# "ph": "C",
# "pid": 1,
# "tid": 1,
# "args": {"dispatch_batch_size": dispatch_batch_size},
# })

def mark_noop_task_start(self):
"""Marks the start of running a no-op task"""
self._write_event(
Expand Down Expand Up @@ -248,15 +238,6 @@ def awaiting(self):
}
)

# def count_num_ready(self, num_ready: int):
# self._write_event({
# "name": "awaiting",
# "ph": "C",
# "pid": 1,
# "tid": 1,
# "args": {"num_ready": num_ready},
# })

###
# Tracing each individual task as an Async Event
###
Expand Down

0 comments on commit 4b4cc01

Please sign in to comment.