From 9d20890e40079a065f9b77f8fe6716c7cc44249d Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Mon, 16 Oct 2023 15:19:12 +0900
Subject: [PATCH] [CHORE] Refactor logging (#1489)
1. Removes `loguru` as a dependency
2. Removes custom handler that forwards logs to loguru
3. Removes all custom formatting of our logs
4. Warning which runner is being used is only done now for when the
PyRunner is being used with an existing Ray connection
I did (3) because Daft is a library, and custom formatting of logs
should be done by the application. E.g. if a user was building a
webserver with their own custom logging setup, then Daft mangling log
formatting on the root logger would be very annoying.
This is what our logs look like now on a jupyter notebook:
Enabling more verbose logs at a higher level (e.g. INFO logs) is
performed by the user/embedding that uses Daft, e.g.
```python
import logging
logging.basicConfig(
format='%(asctime)s,%(msecs)03d %(levelname)-8s [%(pathname)s:%(lineno)d] %(message)s',
datefmt='%Y-%m-%d:%H:%M:%S',
level=logging.INFO,
)
# Outputs logs that look like:
# 2023-10-16:11:25:46,195 INFO [/Users/jaychia/.cargo/registry/src/index.crates.io-6f17d22bba15001f/aws-config-0.55.3/src/meta/region.rs:43] load_region; provider=EnvironmentVariableRegionProvider { env: Env(Real) }
```
Daft now respects normal Python logging and does not rely on loguru at
all to do any of this configurations. This lets us play much nicer with
applications that use normal Python logging.
---------
Co-authored-by: Jay Chia
---
benchmarking/tpch/__main__.py | 4 +-
benchmarking/tpch/data_generation.py | 5 ++-
.../tpch/pipelined_data_generation.py | 5 ++-
daft/__init__.py | 8 ----
daft/context.py | 20 ++++++++--
daft/dataframe/to_torch.py | 3 +-
daft/execution/physical_plan.py | 37 ++++++++----------
daft/filesystem.py | 4 +-
daft/internal/rule_runner.py | 5 ++-
daft/internal/treenode.py | 5 ++-
daft/logging.py | 39 -------------------
daft/logical/optimizer.py | 4 +-
daft/runners/profiler.py | 6 ++-
daft/runners/pyrunner.py | 15 ++++---
daft/runners/ray_runner.py | 10 ++---
daft/table/table.py | 11 ++----
daft/udf_library/url_udfs.py | 6 ++-
pyproject.toml | 1 -
src/daft-table/src/lib.rs | 9 ++++-
19 files changed, 86 insertions(+), 111 deletions(-)
delete mode 100644 daft/logging.py
diff --git a/benchmarking/tpch/__main__.py b/benchmarking/tpch/__main__.py
index 4be8e871e1..002d173c6b 100644
--- a/benchmarking/tpch/__main__.py
+++ b/benchmarking/tpch/__main__.py
@@ -3,6 +3,7 @@
import argparse
import contextlib
import csv
+import logging
import math
import os
import platform
@@ -13,7 +14,6 @@
from typing import Any, Callable
import ray
-from loguru import logger
import daft
from benchmarking.tpch import answers, data_generation
@@ -21,6 +21,8 @@
from daft.context import get_context
from daft.runners.profiler import profiler
+logger = logging.getLogger(__name__)
+
ALL_TABLES = [
"part",
"supplier",
diff --git a/benchmarking/tpch/data_generation.py b/benchmarking/tpch/data_generation.py
index d38af0c5c1..bc7f3ac576 100644
--- a/benchmarking/tpch/data_generation.py
+++ b/benchmarking/tpch/data_generation.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import argparse
+import logging
import math
import os
import shlex
@@ -8,10 +9,10 @@
import subprocess
from glob import glob
-from loguru import logger
-
import daft
+logger = logging.getLogger(__name__)
+
SCHEMA = {
"part": [
"P_PARTKEY",
diff --git a/benchmarking/tpch/pipelined_data_generation.py b/benchmarking/tpch/pipelined_data_generation.py
index 1e08ee7a1a..3fa3f2b701 100644
--- a/benchmarking/tpch/pipelined_data_generation.py
+++ b/benchmarking/tpch/pipelined_data_generation.py
@@ -15,6 +15,7 @@
import argparse
import glob
+import logging
import os
import pathlib
import shlex
@@ -22,10 +23,10 @@
import subprocess
from multiprocessing import Pool
-from loguru import logger
-
from benchmarking.tpch.data_generation import gen_parquet
+logger = logging.getLogger(__name__)
+
STATIC_TABLES = ["nation", "region"]
diff --git a/daft/__init__.py b/daft/__init__.py
index 00b6a8ed4c..173173adb9 100644
--- a/daft/__init__.py
+++ b/daft/__init__.py
@@ -2,8 +2,6 @@
import os
-from daft.logging import setup_logger
-
###
# Set up code coverage for when running code coverage with ray
###
@@ -20,12 +18,6 @@
"Environ: {!r} "
"Exception: {!r}\n".format({k: v for k, v in os.environ.items() if k.startswith("COV_CORE")}, exc)
)
-###
-# Setup logging
-###
-
-
-setup_logger()
###
# Get build constants from Rust .so
diff --git a/daft/context.py b/daft/context.py
index 848b0d9612..b8273c7c0f 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -1,16 +1,17 @@
from __future__ import annotations
import dataclasses
+import logging
import os
import warnings
from typing import TYPE_CHECKING, ClassVar
-from loguru import logger
-
if TYPE_CHECKING:
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.runner import Runner
+logger = logging.getLogger(__name__)
+
class _RunnerConfig:
name = ClassVar[str]
@@ -75,7 +76,6 @@ def runner(self) -> Runner:
if self.runner_config.name == "ray":
from daft.runners.ray_runner import RayRunner
- logger.info("Using RayRunner")
assert isinstance(self.runner_config, _RayRunnerConfig)
_RUNNER = RayRunner(
address=self.runner_config.address,
@@ -84,7 +84,19 @@ def runner(self) -> Runner:
elif self.runner_config.name == "py":
from daft.runners.pyrunner import PyRunner
- logger.info("Using PyRunner")
+ try:
+ import ray
+
+ if ray.is_initialized():
+ logger.warning(
+ "WARNING: Daft is NOT using Ray for execution!\n"
+ "Daft is using the PyRunner but we detected an active Ray connection. "
+ "If you intended to use the Daft RayRunner, please first run `daft.context.set_runner_ray()` "
+ "before executing Daft queries."
+ )
+ except ImportError:
+ pass
+
assert isinstance(self.runner_config, _PyRunnerConfig)
_RUNNER = PyRunner(use_thread_pool=self.runner_config.use_thread_pool)
diff --git a/daft/dataframe/to_torch.py b/daft/dataframe/to_torch.py
index f3b30b888f..1d637e83a4 100644
--- a/daft/dataframe/to_torch.py
+++ b/daft/dataframe/to_torch.py
@@ -1,8 +1,9 @@
from __future__ import annotations
+import logging
from typing import Any, Iterable, Iterator
-from loguru import logger
+logger = logging.getLogger(__name__)
try:
# When available, subclass from the newer torchdata DataPipes instead of torch Datasets.
diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py
index 1ff6de696c..e4bc572bc6 100644
--- a/daft/execution/physical_plan.py
+++ b/daft/execution/physical_plan.py
@@ -13,13 +13,12 @@
from __future__ import annotations
+import logging
import math
import pathlib
from collections import deque
from typing import Generator, Iterator, TypeVar, Union
-from loguru import logger
-
from daft.daft import (
FileFormat,
FileFormatConfig,
@@ -40,6 +39,8 @@
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata
+logger = logging.getLogger(__name__)
+
PartitionT = TypeVar("PartitionT")
T = TypeVar("T")
@@ -123,7 +124,7 @@ def file_read(
except StopIteration:
if len(materializations) > 0:
- logger.debug("file_read blocked on completion of first source in: {sources}", sources=materializations)
+ logger.debug(f"file_read blocked on completion of first source in: {materializations}")
yield None
else:
return
@@ -231,10 +232,8 @@ def join(
if len(left_requests) + len(right_requests) > 0:
logger.debug(
"join blocked on completion of sources.\n"
- "Left sources: {left_requests}\n"
- "Right sources: {right_requests}",
- left_requests=left_requests,
- right_requests=right_requests,
+ f"Left sources: {left_requests}\n"
+ f"Right sources: {right_requests}",
)
yield None
@@ -339,7 +338,7 @@ def global_limit(
# (Optimization. If we are doing limit(0) and already have a partition executing to use for it, just wait.)
if remaining_rows == 0 and len(materializations) > 0:
- logger.debug("global_limit blocked on completion of: {source}", source=materializations[0])
+ logger.debug(f"global_limit blocked on completion of: {materializations[0]}")
yield None
continue
@@ -364,9 +363,7 @@ def global_limit(
except StopIteration:
if len(materializations) > 0:
- logger.debug(
- "global_limit blocked on completion of first source in: {sources}", sources=materializations
- )
+ logger.debug(f"global_limit blocked on completion of first source in: {materializations}")
yield None
else:
return
@@ -396,9 +393,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
except StopIteration:
if len(materializations) > 0:
- logger.debug(
- "flatten_plan blocked on completion of first source in: {sources}", sources=materializations
- )
+ logger.debug(f"flatten_plan blocked on completion of first source in: {materializations}")
yield None
else:
return
@@ -427,7 +422,7 @@ def split(
yield step
while any(not _.done() for _ in materializations):
- logger.debug("split_to blocked on completion of all sources: {sources}", sources=materializations)
+ logger.debug(f"split_to blocked on completion of all sources: {materializations}")
yield None
splits_per_partition = deque([1 for _ in materializations])
@@ -517,7 +512,7 @@ def coalesce(
except StopIteration:
if len(materializations) > 0:
- logger.debug("coalesce blocked on completion of a task in: {sources}", sources=materializations)
+ logger.debug(f"coalesce blocked on completion of a task in: {materializations}")
yield None
else:
return
@@ -547,7 +542,7 @@ def reduce(
# All fanouts dispatched. Wait for all of them to materialize
# (since we need all of them to emit even a single reduce).
while any(not _.done() for _ in materializations):
- logger.debug("reduce blocked on completion of all sources in: {sources}", sources=materializations)
+ logger.debug(f"reduce blocked on completion of all sources in: {materializations}")
yield None
inputs_to_reduce = [deque(_.partitions()) for _ in materializations]
@@ -587,7 +582,7 @@ def sort(
sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
for source in source_materializations:
while not source.done():
- logger.debug("sort blocked on completion of source: {source}", source=source)
+ logger.debug(f"sort blocked on completion of source: {source}")
yield None
sample = (
@@ -606,7 +601,7 @@ def sort(
# Wait for samples to materialize.
while any(not _.done() for _ in sample_materializations):
- logger.debug("sort blocked on completion of all samples: {samples}", samples=sample_materializations)
+ logger.debug(f"sort blocked on completion of all samples: {sample_materializations}")
yield None
# Reduce the samples to get sort boundaries.
@@ -628,7 +623,7 @@ def sort(
# Wait for boundaries to materialize.
while not boundaries.done():
- logger.debug("sort blocked on completion of boundary partition: {boundaries}", boundaries=boundaries)
+ logger.debug(f"sort blocked on completion of boundary partition: {boundaries}")
yield None
# Create a range fanout plan.
@@ -699,7 +694,7 @@ def materialize(
except StopIteration:
if len(materializations) > 0:
- logger.debug("materialize blocked on completion of all sources: {sources}", sources=materializations)
+ logger.debug(f"materialize blocked on completion of all sources: {materializations}")
yield None
else:
return
diff --git a/daft/filesystem.py b/daft/filesystem.py
index b675c45ee4..d23747fb88 100644
--- a/daft/filesystem.py
+++ b/daft/filesystem.py
@@ -12,12 +12,12 @@
else:
from typing import Literal
+import logging
from typing import Any
import fsspec
import pyarrow as pa
from fsspec.registry import get_filesystem_class
-from loguru import logger
from pyarrow.fs import (
FileSystem,
FSSpecHandler,
@@ -28,6 +28,8 @@
from daft.daft import FileFormat, FileInfos, NativeStorageConfig, StorageConfig
from daft.table import Table
+logger = logging.getLogger(__name__)
+
_CACHED_FSES: dict[str, FileSystem] = {}
diff --git a/daft/internal/rule_runner.py b/daft/internal/rule_runner.py
index 22688790d0..0e0cb32249 100644
--- a/daft/internal/rule_runner.py
+++ b/daft/internal/rule_runner.py
@@ -1,13 +1,14 @@
from __future__ import annotations
+import logging
from dataclasses import dataclass
from typing import Generic, TypeVar
-from loguru import logger
-
from daft.internal.rule import Rule
from daft.internal.treenode import TreeNode
+logger = logging.getLogger(__name__)
+
TreeNodeType = TypeVar("TreeNodeType", bound="TreeNode")
diff --git a/daft/internal/treenode.py b/daft/internal/treenode.py
index 54fb4e30a3..9de36ac300 100644
--- a/daft/internal/treenode.py
+++ b/daft/internal/treenode.py
@@ -1,14 +1,15 @@
from __future__ import annotations
+import logging
import os
import typing
from typing import TYPE_CHECKING, Generic, List, TypeVar, cast
-from loguru import logger
-
if TYPE_CHECKING:
from daft.internal.rule import Rule
+logger = logging.getLogger(__name__)
+
TreeNodeType = TypeVar("TreeNodeType", bound="TreeNode")
diff --git a/daft/logging.py b/daft/logging.py
deleted file mode 100644
index 971849930b..0000000000
--- a/daft/logging.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from __future__ import annotations
-
-import sys
-
-
-def setup_logger() -> None:
- import inspect
- import logging
-
- from loguru import logger
- from loguru._defaults import env
-
- logger.remove()
- LOGURU_LEVEL = env("LOGURU_LEVEL", str, "INFO")
- logger.add(sys.stderr, level=LOGURU_LEVEL)
-
- class InterceptHandler(logging.Handler):
- def filter(self, record: logging.LogRecord) -> bool:
- parent = super().filter(record)
- return parent or record.pathname.startswith("src/")
-
- def emit(self, record: logging.LogRecord) -> None:
- # Get corresponding Loguru level if it exists.
- level: str | int
- try:
- level = logger.level(record.levelname).name
- except ValueError:
- level = record.levelno
-
- # Find caller from where originated the logged message.
- frame, depth = inspect.currentframe(), 0
- while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
- frame = frame.f_back
- depth += 1
-
- logger.opt(depth=depth - 1, exception=record.exc_info).log(level, record.getMessage())
-
- logging.getLogger().setLevel(logger.level(LOGURU_LEVEL).no)
- logging.getLogger().addHandler(InterceptHandler())
diff --git a/daft/logical/optimizer.py b/daft/logical/optimizer.py
index 4320d834a8..6b4db7537a 100644
--- a/daft/logical/optimizer.py
+++ b/daft/logical/optimizer.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-from loguru import logger
+import logging
from daft.daft import PartitionScheme, ResourceRequest
from daft.expressions import ExpressionsProjection, col
@@ -21,6 +21,8 @@
UnaryNode,
)
+logger = logging.getLogger(__name__)
+
class PushDownPredicates(Rule[LogicalPlan]):
"""Push Filter nodes down through its children when possible - run filters early to reduce amount of data processed"""
diff --git a/daft/runners/profiler.py b/daft/runners/profiler.py
index 095e6425f3..0336b29de2 100644
--- a/daft/runners/profiler.py
+++ b/daft/runners/profiler.py
@@ -1,17 +1,19 @@
from __future__ import annotations
+import logging
import os
from contextlib import contextmanager
from typing import TYPE_CHECKING
-from loguru import logger
-
if TYPE_CHECKING:
from viztracer import VizTracer
ACTIVE = False
+logger = logging.getLogger(__name__)
+
+
@contextmanager
def profiler(filename: str) -> VizTracer:
if int(os.environ.get("DAFT_PROFILING", 0)) == 1:
diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py
index f291727b13..2c34ae6b1f 100644
--- a/daft/runners/pyrunner.py
+++ b/daft/runners/pyrunner.py
@@ -1,12 +1,12 @@
from __future__ import annotations
+import logging
import multiprocessing
from concurrent import futures
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable, Iterator
import psutil
-from loguru import logger
from daft.daft import (
FileFormatConfig,
@@ -36,6 +36,9 @@
import fsspec
+logger = logging.getLogger(__name__)
+
+
@dataclass
class LocalPartitionSet(PartitionSet[Table]):
_partitions: dict[PartID, Table]
@@ -200,15 +203,13 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
and next_step.resource_request.num_gpus > 0
)
):
- logger.debug(
- "Running task synchronously in main thread: {next_step}", next_step=next_step
- )
+ logger.debug(f"Running task synchronously in main thread: {next_step}")
partitions = self.build_partitions(next_step.instructions, *next_step.inputs)
next_step.set_result([PyMaterializedResult(partition) for partition in partitions])
else:
# Submit the task for execution.
- logger.debug("Submitting task for execution: {next_step}", next_step=next_step)
+ logger.debug(f"Submitting task for execution: {next_step}")
future = thread_pool.submit(
self.build_partitions, next_step.instructions, *next_step.inputs
)
@@ -230,9 +231,7 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
done_task = inflight_tasks.pop(done_id)
partitions = done_future.result()
- logger.debug(
- "Task completed: {done_id} -> {partitions}", done_id=done_id, partitions=partitions
- )
+ logger.debug(f"Task completed: {done_id} -> {partitions}")
done_task.set_result([PyMaterializedResult(partition) for partition in partitions])
if next_step is None:
diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py
index a0aa7dc31e..c4b97a1738 100644
--- a/daft/runners/ray_runner.py
+++ b/daft/runners/ray_runner.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import logging
import threading
import time
import uuid
@@ -10,11 +11,12 @@
from typing import TYPE_CHECKING, Any, Generator, Iterable, Iterator
import pyarrow as pa
-from loguru import logger
from daft.logical.builder import LogicalPlanBuilder
from daft.planner import PhysicalPlanScheduler
+logger = logging.getLogger(__name__)
+
try:
import ray
except ImportError:
@@ -452,8 +454,6 @@ def _run_plan(
psets: dict[str, ray.ObjectRef],
result_uuid: str,
) -> None:
- from loguru import logger
-
# Get executable tasks from plan scheduler.
tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=True)
@@ -495,9 +495,7 @@ def _run_plan(
# If it is a no-op task, just run it locally immediately.
elif len(next_step.instructions) == 0:
- logger.debug(
- "Running task synchronously in main thread: {next_step}", next_step=next_step
- )
+ logger.debug(f"Running task synchronously in main thread: {next_step}")
assert isinstance(next_step, SingleOutputPartitionTask)
next_step.set_result(
[RayMaterializedResult(partition) for partition in next_step.inputs]
diff --git a/daft/table/table.py b/daft/table/table.py
index fb7366bfb8..ec865bd05d 100644
--- a/daft/table/table.py
+++ b/daft/table/table.py
@@ -1,10 +1,9 @@
from __future__ import annotations
-import sys
+import logging
from typing import TYPE_CHECKING, Any
import pyarrow as pa
-from loguru import logger
from daft.arrow_utils import ensure_table
from daft.daft import JoinType
@@ -20,11 +19,6 @@
from daft.logical.schema import Schema
from daft.series import Series
-if sys.version_info < (3, 8):
- pass
-else:
- pass
-
_NUMPY_AVAILABLE = True
try:
import numpy as np
@@ -45,6 +39,9 @@
from daft.io import IOConfig
+logger = logging.getLogger(__name__)
+
+
class Table:
_table: _PyTable
diff --git a/daft/udf_library/url_udfs.py b/daft/udf_library/url_udfs.py
index 989b87f016..b5a9c682d7 100644
--- a/daft/udf_library/url_udfs.py
+++ b/daft/udf_library/url_udfs.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import logging
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -19,6 +20,9 @@
thread_local = threading.local()
+logger = logging.getLogger(__name__)
+
+
def _worker_thread_initializer() -> None:
"""Initializes per-thread local state"""
thread_local.filesystems_cache = {}
@@ -27,8 +31,6 @@ def _worker_thread_initializer() -> None:
def _download(
path: str | None, on_error: Literal["raise"] | Literal["null"], fs: fsspec.AbstractFileSystem | None
) -> bytes | None:
- from loguru import logger
-
if path is None:
return None
protocol = filesystem.get_protocol_from_path(path)
diff --git a/pyproject.toml b/pyproject.toml
index 56f5c604ef..8ddc1e9fcc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,6 @@ authors = [{name = "Eventual Inc", email = "daft@eventualcomputing.com"}]
dependencies = [
"pyarrow >= 6.0.1",
"fsspec[http]",
- "loguru",
"psutil",
"typing-extensions >= 4.0.0; python_version < '3.8'",
"pickle5 >= 0.0.12; python_version < '3.8'"
diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs
index 98f2e9da54..1f4df77e47 100644
--- a/src/daft-table/src/lib.rs
+++ b/src/daft-table/src/lib.rs
@@ -498,7 +498,14 @@ impl Table {
let mut str_val = s.str_value(i).unwrap();
if let Some(max_col_width) = max_col_width {
if str_val.len() > max_col_width {
- str_val = format!("{}...", &str_val[..max_col_width - 3]);
+ str_val = format!(
+ "{}...",
+ &str_val
+ .char_indices()
+ .take(max_col_width - 3)
+ .map(|(_, c)| c)
+ .collect::()
+ );
}
}
str_val