[CHORE] Refactor logging #1489

Merged: 5 commits, Oct 16, 2023
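This PR replaces loguru with Python's standard-library logging module throughout the codebase. The change is mechanical in most files: drop the loguru import, create a per-module logger, and switch brace-style deferred messages to f-strings. A minimal before/after sketch of the pattern (the message and variable names are illustrative, not taken from any one file):

# Before: loguru's shared logger with deferred brace-style formatting
from loguru import logger
logger.debug("blocked on: {sources}", sources=sources)

# After: a stdlib logger named after the module, with f-string formatting
import logging
logger = logging.getLogger(__name__)
logger.debug(f"blocked on: {sources}")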
4 changes: 3 additions & 1 deletion benchmarking/tpch/__main__.py
@@ -3,6 +3,7 @@
import argparse
import contextlib
import csv
import logging
import math
import os
import platform
@@ -13,14 +14,15 @@
from typing import Any, Callable

import ray
from loguru import logger

import daft
from benchmarking.tpch import answers, data_generation
from daft import DataFrame
from daft.context import get_context
from daft.runners.profiler import profiler

logger = logging.getLogger(__name__)

ALL_TABLES = [
"part",
"supplier",
5 changes: 3 additions & 2 deletions benchmarking/tpch/data_generation.py
@@ -1,17 +1,18 @@
from __future__ import annotations

import argparse
import logging
import math
import os
import shlex
import sqlite3
import subprocess
from glob import glob

from loguru import logger

import daft

logger = logging.getLogger(__name__)

SCHEMA = {
"part": [
"P_PARTKEY",
5 changes: 3 additions & 2 deletions benchmarking/tpch/pipelined_data_generation.py
@@ -15,17 +15,18 @@

import argparse
import glob
import logging
import os
import pathlib
import shlex
import shutil
import subprocess
from multiprocessing import Pool

from loguru import logger

from benchmarking.tpch.data_generation import gen_parquet

logger = logging.getLogger(__name__)

STATIC_TABLES = ["nation", "region"]


8 changes: 0 additions & 8 deletions daft/__init__.py
@@ -2,8 +2,6 @@

import os

from daft.logging import setup_logger

###
# Set up code coverage for when running code coverage with ray
###
@@ -20,12 +18,6 @@
"Environ: {!r} "
"Exception: {!r}\n".format({k: v for k, v in os.environ.items() if k.startswith("COV_CORE")}, exc)
)
###
# Setup logging
###


setup_logger()

###
# Get build constants from Rust .so
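Removing setup_logger() from the import path means Daft no longer configures global logging state as a side effect of `import daft`; log output is now under the application's control through the standard library. A sketch of what a caller might do (standard stdlib calls, not part of this diff):

import logging

# Opt in to Daft's debug output globally...
logging.basicConfig(level=logging.DEBUG)

# ...or tune only the daft.* loggers, since each module calls getLogger(__name__)
logging.getLogger("daft").setLevel(logging.WARNING)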
20 changes: 16 additions & 4 deletions daft/context.py
@@ -1,16 +1,17 @@
from __future__ import annotations

import dataclasses
import logging
import os
import warnings
from typing import TYPE_CHECKING, ClassVar

from loguru import logger

if TYPE_CHECKING:
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.runner import Runner

logger = logging.getLogger(__name__)


class _RunnerConfig:
name = ClassVar[str]
@@ -75,7 +76,6 @@
if self.runner_config.name == "ray":
from daft.runners.ray_runner import RayRunner

logger.info("Using RayRunner")
assert isinstance(self.runner_config, _RayRunnerConfig)
_RUNNER = RayRunner(
address=self.runner_config.address,
@@ -84,7 +84,19 @@
elif self.runner_config.name == "py":
from daft.runners.pyrunner import PyRunner

logger.info("Using PyRunner")
try:
import ray

if ray.is_initialized():
logger.warning(
"WARNING: Daft is NOT using Ray for execution!\n"
"Daft is using the PyRunner but we detected an active Ray connection. "
"If you intended to use the Daft RayRunner, please first run `daft.context.set_runner_ray()` "
"before executing Daft queries."
)
except ImportError:
pass

assert isinstance(self.runner_config, _PyRunnerConfig)
_RUNNER = PyRunner(use_thread_pool=self.runner_config.use_thread_pool)

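The try/except above also adds a guardrail: if a Ray session is already initialized but the PyRunner is selected, Daft now warns instead of silently running locally. A hypothetical session that would trigger it (daft.from_pydict is used purely for illustration):

import ray
ray.init()  # an active Ray connection, but no Daft runner configured

import daft
df = daft.from_pydict({"x": [1, 2, 3]})
df.collect()  # executes on the PyRunner and emits the warning above

# The remedy named in the warning text, run before any Daft queries:
# daft.context.set_runner_ray()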
3 changes: 2 additions & 1 deletion daft/dataframe/to_torch.py
@@ -1,8 +1,9 @@
from __future__ import annotations

import logging

from typing import Any, Iterable, Iterator

from loguru import logger
logger = logging.getLogger(__name__)


try:
# When available, subclass from the newer torchdata DataPipes instead of torch Datasets.
37 changes: 16 additions & 21 deletions daft/execution/physical_plan.py
@@ -13,13 +13,12 @@

from __future__ import annotations

import logging
import math
import pathlib
from collections import deque
from typing import Generator, Iterator, TypeVar, Union

from loguru import logger

from daft.daft import (
FileFormat,
FileFormatConfig,
@@ -40,6 +39,8 @@
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata

logger = logging.getLogger(__name__)

PartitionT = TypeVar("PartitionT")
T = TypeVar("T")

@@ -123,7 +124,7 @@

except StopIteration:
if len(materializations) > 0:
logger.debug("file_read blocked on completion of first source in: {sources}", sources=materializations)
logger.debug(f"file_read blocked on completion of first source in: {materializations}")

Check warning on line 127 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L127

Added line #L127 was not covered by tests
yield None
else:
return
@@ -231,10 +232,8 @@
if len(left_requests) + len(right_requests) > 0:
logger.debug(
"join blocked on completion of sources.\n"
"Left sources: {left_requests}\n"
"Right sources: {right_requests}",
left_requests=left_requests,
right_requests=right_requests,
f"Left sources: {left_requests}\n"
f"Right sources: {right_requests}",
)
yield None

@@ -339,7 +338,7 @@

# (Optimization. If we are doing limit(0) and already have a partition executing to use for it, just wait.)
if remaining_rows == 0 and len(materializations) > 0:
logger.debug("global_limit blocked on completion of: {source}", source=materializations[0])
logger.debug(f"global_limit blocked on completion of: {materializations[0]}")

Check warning on line 341 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L341

Added line #L341 was not covered by tests
yield None
continue

@@ -364,9 +363,7 @@

except StopIteration:
if len(materializations) > 0:
logger.debug(
"global_limit blocked on completion of first source in: {sources}", sources=materializations
)
logger.debug(f"global_limit blocked on completion of first source in: {materializations}")
yield None
else:
return
@@ -396,9 +393,7 @@

except StopIteration:
if len(materializations) > 0:
logger.debug(
"flatten_plan blocked on completion of first source in: {sources}", sources=materializations
)
logger.debug(f"flatten_plan blocked on completion of first source in: {materializations}")
yield None
else:
return
@@ -427,7 +422,7 @@
yield step

while any(not _.done() for _ in materializations):
logger.debug("split_to blocked on completion of all sources: {sources}", sources=materializations)
logger.debug(f"split_to blocked on completion of all sources: {materializations}")

Check warning on line 425 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L425

Added line #L425 was not covered by tests
yield None

splits_per_partition = deque([1 for _ in materializations])
@@ -517,7 +512,7 @@

except StopIteration:
if len(materializations) > 0:
logger.debug("coalesce blocked on completion of a task in: {sources}", sources=materializations)
logger.debug(f"coalesce blocked on completion of a task in: {materializations}")
yield None
else:
return
@@ -547,7 +542,7 @@
# All fanouts dispatched. Wait for all of them to materialize
# (since we need all of them to emit even a single reduce).
while any(not _.done() for _ in materializations):
logger.debug("reduce blocked on completion of all sources in: {sources}", sources=materializations)
logger.debug(f"reduce blocked on completion of all sources in: {materializations}")
yield None

inputs_to_reduce = [deque(_.partitions()) for _ in materializations]
@@ -587,7 +582,7 @@
sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
for source in source_materializations:
while not source.done():
logger.debug("sort blocked on completion of source: {source}", source=source)
logger.debug(f"sort blocked on completion of source: {source}")
yield None

sample = (
@@ -606,7 +601,7 @@

# Wait for samples to materialize.
while any(not _.done() for _ in sample_materializations):
logger.debug("sort blocked on completion of all samples: {samples}", samples=sample_materializations)
logger.debug(f"sort blocked on completion of all samples: {sample_materializations}")
yield None

# Reduce the samples to get sort boundaries.
@@ -628,7 +623,7 @@

# Wait for boundaries to materialize.
while not boundaries.done():
logger.debug("sort blocked on completion of boundary partition: {boundaries}", boundaries=boundaries)
logger.debug(f"sort blocked on completion of boundary partition: {boundaries}")
yield None

# Create a range fanout plan.
@@ -699,7 +694,7 @@

except StopIteration:
if len(materializations) > 0:
logger.debug("materialize blocked on completion of all sources: {sources}", sources=materializations)
logger.debug(f"materialize blocked on completion of all sources: {materializations}")
yield None
else:
return
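One nuance of this file's conversion: loguru only formats a brace-style message after its level check passes, whereas f-strings are evaluated eagerly at the call site even when DEBUG is disabled. The stdlib can still defer formatting via percent-style arguments if that overhead ever matters; both forms are sketched here for comparison (the list stands in for the deque of tasks):

import logging

logger = logging.getLogger(__name__)
materializations = ["task-1", "task-2"]

# Eager: the f-string is built unconditionally (the style this PR adopts)
logger.debug(f"reduce blocked on completion of all sources in: {materializations}")

# Deferred: formatting happens only if a handler actually emits the record
logger.debug("reduce blocked on completion of all sources in: %s", materializations)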
4 changes: 3 additions & 1 deletion daft/filesystem.py
@@ -12,12 +12,12 @@
else:
from typing import Literal

import logging
from typing import Any

import fsspec
import pyarrow as pa
from fsspec.registry import get_filesystem_class
from loguru import logger
from pyarrow.fs import (
FileSystem,
FSSpecHandler,
Expand All @@ -28,6 +28,8 @@
from daft.daft import FileFormat, FileInfos, NativeStorageConfig, StorageConfig
from daft.table import Table

logger = logging.getLogger(__name__)

_CACHED_FSES: dict[str, FileSystem] = {}


5 changes: 3 additions & 2 deletions daft/internal/rule_runner.py
@@ -1,13 +1,14 @@
from __future__ import annotations

import logging

from dataclasses import dataclass
from typing import Generic, TypeVar

from loguru import logger

from daft.internal.rule import Rule
from daft.internal.treenode import TreeNode

logger = logging.getLogger(__name__)


TreeNodeType = TypeVar("TreeNodeType", bound="TreeNode")


5 changes: 3 additions & 2 deletions daft/internal/treenode.py
@@ -1,14 +1,15 @@
from __future__ import annotations

import logging
import os
import typing
from typing import TYPE_CHECKING, Generic, List, TypeVar, cast

from loguru import logger

if TYPE_CHECKING:
from daft.internal.rule import Rule

logger = logging.getLogger(__name__)

TreeNodeType = TypeVar("TreeNodeType", bound="TreeNode")


39 changes: 0 additions & 39 deletions daft/logging.py

This file was deleted.
