[CHORE] [New Query Planner] Misc. user-facing error tweaks to improve UX. (#1358)

This PR makes miscellaneous tweaks to user-facing error messages in order to
improve UX, e.g., adding column name context to dtype validation errors so
that the messages are more actionable.
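
For illustration (drawn from the `Schema::get_field` change below), a missing-column lookup that previously dumped every field's full value:

    Field: foo not found in [...]

now names the column and lists only the schema's column names:

    Column "foo" not found in schema: [...]

(The column name `foo` and the elided schema contents here are hypothetical.)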

This PR is stacked on top of
#1357; see the final commit for the actual diff.

The only non-trivial change is moving `DataType::get_exploded_dtype()` to
`Field::to_exploded_field()`, in order to have the column name context
available for failure cases (e.g., when the dtype doesn't support exploding).
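
A minimal sketch of the resulting call-site change, taken from the `explode.rs` diff below; the evaluator no longer pulls the child dtype off the field and rebuilds a `Field` by hand:

    // Before: derive the child dtype, then reassemble the field manually.
    let field = input.to_field(schema)?;
    let exploded_dtype = field.dtype.get_exploded_dtype()?;
    Ok(Field::new(field.name, exploded_dtype.clone()))

    // After: the field carries its own name, so failure messages can cite the column.
    let field = input.to_field(schema)?;
    field.to_exploded_field()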

As a drive-by, this PR also fixes some suboptimal reprs for the new
logical plan, particularly around `Vec<Expr>`.
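
The repr fix follows one pattern throughout: instead of formatting a `Vec<Expr>` with the `{:?}` Debug formatter, each expression's Display form is joined into a comma-separated string. A sketch of the pattern from the `Aggregate` op below (the printed output in the comments is illustrative):

    // Debug repr: prints something like [Column("a"), Column("b")]
    res.push(format!("Group by = {:?}", self.groupby));

    // Display repr: prints something like col(a), col(b)
    res.push(format!(
        "Group by = {}",
        self.groupby
            .iter()
            .map(|e| e.to_string())
            .collect::<Vec<_>>()
            .join(", ")
    ));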
clarkzinzow authored Sep 12, 2023
1 parent db53efb commit fc10d6d
Showing 22 changed files with 191 additions and 85 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion daft/dataframe/dataframe.py
@@ -733,7 +733,7 @@ def concat(self, other: "DataFrame") -> "DataFrame":
"""
if self.schema() != other.schema():
raise ValueError(
f"DataFrames must have exactly the same schema for concatenation! Expected:\n{self.schema()}\n\nReceived:\n{other.schema()}"
f"DataFrames must have exactly the same schema for concatenation!\nExpected:\n{self.schema()}\n\nReceived:\n{other.schema()}"
)
builder = self._builder.concat(other._builder)
return DataFrame(builder)
28 changes: 28 additions & 0 deletions daft/logging.py
@@ -4,9 +4,37 @@


def setup_logger() -> None:
import inspect
import logging

from loguru import logger
from loguru._defaults import env

logger.remove()
LOGURU_LEVEL = env("LOGURU_LEVEL", str, "INFO")
logger.add(sys.stderr, level=LOGURU_LEVEL)

class InterceptHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
# Get corresponding Loguru level if it exists.
level: str | int
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno

# Find the caller from which the logged message originated.
frame, depth = inspect.currentframe(), 0
while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
frame = frame.f_back
depth += 1

logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())

# Clear out any existing standard loggers.
root = logging.root
for h in root.handlers[:]:
root.removeHandler(h)
h.close()
# Add handler that redirects logs to loguru.
logging.basicConfig(handlers=[InterceptHandler()], level=0)
12 changes: 0 additions & 12 deletions src/daft-core/src/datatypes/dtype.rs
@@ -361,18 +361,6 @@ impl DataType {
)
}

- #[inline]
- pub fn get_exploded_dtype(&self) -> DaftResult<&DataType> {
- match self {
- DataType::List(child_dtype) | DataType::FixedSizeList(child_dtype, _) => {
- Ok(child_dtype.as_ref())
- }
- _ => Err(DaftError::ValueError(format!(
- "Datatype cannot be exploded: {self}"
- ))),
- }
- }

pub fn to_json(&self) -> DaftResult<String> {
let payload = DataTypePayload::new(self);
Ok(serde_json::to_string(&payload)?)
18 changes: 17 additions & 1 deletion src/daft-core/src/datatypes/field.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use arrow2::datatypes::Field as ArrowField;

use crate::datatypes::dtype::DataType;
- use common_error::DaftResult;
+ use common_error::{DaftError, DaftResult};

use serde::{Deserialize, Serialize};

@@ -94,6 +94,22 @@ impl Field {
metadata: self.metadata.clone(),
})
}

pub fn to_exploded_field(&self) -> DaftResult<Self> {
match &self.dtype {
DataType::List(child_dtype) | DataType::FixedSizeList(child_dtype, _) => {
Ok(Self {
name: self.name.clone(),
dtype: child_dtype.as_ref().clone(),
metadata: self.metadata.clone(),
})
}
_ => Err(DaftError::ValueError(format!(
"Column \"{}\" with dtype {} cannot be exploded, must be a List or FixedSizeList column.",
self.name, self.dtype,
))),
}
}
}

impl From<&ArrowField> for Field {
8 changes: 4 additions & 4 deletions src/daft-core/src/schema.rs
@@ -46,9 +46,9 @@ impl Schema {
pub fn get_field(&self, name: &str) -> DaftResult<&Field> {
match self.fields.get(name) {
None => Err(DaftError::FieldNotFound(format!(
"Field: {} not found in {:?}",
"Column \"{}\" not found in schema: {:?}",
name,
- self.fields.values()
+ self.fields.keys()
))),
Some(val) => Ok(val),
}
@@ -57,9 +57,9 @@
pub fn get_index(&self, name: &str) -> DaftResult<usize> {
match self.fields.get_index_of(name) {
None => Err(DaftError::FieldNotFound(format!(
"Field: {} not found in {:?}",
"Column \"{}\" not found in schema: {:?}",
name,
- self.fields.values()
+ self.fields.keys()
))),
Some(val) => Ok(val),
}
14 changes: 9 additions & 5 deletions src/daft-dsl/src/expr.rs
@@ -154,9 +154,10 @@ impl AggExpr {
| DataType::UInt64 => DataType::UInt64,
DataType::Float32 => DataType::Float32,
DataType::Float64 => DataType::Float64,
- _other => {
+ other => {
return Err(DaftError::TypeError(format!(
- "Expected input to sum() to be numeric but received {field}",
+ "Expected input to sum() to be numeric but received dtype {} for column \"{}\"",
+ other, field.name,
)))
}
},
@@ -179,7 +180,8 @@
| DataType::Float64 => DataType::Float64,
other => {
return Err(DaftError::TypeError(format!(
"Numeric mean is not implemented for type {other}"
"Numeric mean is not implemented for column \"{}\" of type {}",
field.name, other,
)))
}
},
@@ -190,7 +192,8 @@
// TODO: [ISSUE-688] Make Binary type comparable
if field.dtype == DataType::Binary {
return Err(DaftError::TypeError(format!(
"Cannot get min/max of Binary type: {field}",
"Cannot get min/max of Binary type in column \"{}\"",
field.name,
)));
}
Ok(Field::new(field.name.as_str(), field.dtype))
@@ -203,7 +206,8 @@
#[cfg(feature = "python")]
DataType::Python => Ok(field),
_ => Err(DaftError::TypeError(format!(
"We can only perform List Concat Agg on List or Python Types, got: {field}",
"We can only perform List Concat Agg on List or Python Types, got dtype {} for column \"{}\"",
field.dtype, field.name
))),
}
}
3 changes: 1 addition & 2 deletions src/daft-dsl/src/functions/list/explode.rs
@@ -15,8 +15,7 @@ impl FunctionEvaluator for ExplodeEvaluator {
match inputs {
[input] => {
let field = input.to_field(schema)?;
- let exploded_dtype = field.dtype.get_exploded_dtype()?;
- Ok(Field::new(field.name, exploded_dtype.clone()))
+ field.to_exploded_field()
}
_ => Err(DaftError::SchemaMismatch(format!(
"Expected 1 input arg, got {}",
8 changes: 4 additions & 4 deletions src/daft-dsl/src/functions/list/join.rs
@@ -31,11 +31,11 @@ impl FunctionEvaluator for JoinEvaluator {

match input_field.dtype {
DataType::List(_) | DataType::FixedSizeList(_, _) => {
- let child_type = input_field.dtype.get_exploded_dtype().unwrap();
- if child_type != &DataType::Utf8 {
- return Err(DaftError::TypeError(format!("Expected input to be a list type with a Utf8 child, received child: {}", child_type)));
+ let exploded_field = input_field.to_exploded_field()?;
+ if exploded_field.dtype != DataType::Utf8 {
+ return Err(DaftError::TypeError(format!("Expected column \"{}\" to be a list type with a Utf8 child, received list type with child dtype {}", exploded_field.name, exploded_field.dtype)));
}
- Ok(Field::new(input.name()?, DataType::Utf8))
+ Ok(exploded_field)
}
_ => Err(DaftError::TypeError(format!(
"Expected input to be a list type, received: {}",
2 changes: 2 additions & 0 deletions src/daft-plan/Cargo.toml
@@ -7,7 +7,9 @@ daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
indexmap = {workspace = true}
+ log = {workspace = true}
pyo3 = {workspace = true, optional = true}
+ pyo3-log = {workspace = true, optional = true}
serde = {workspace = true, features = ["rc"]}
serde_json = {workspace = true}
snafu = {workspace = true}
22 changes: 21 additions & 1 deletion src/daft-plan/src/builder.rs
@@ -408,7 +408,27 @@ impl PyLogicalPlanBuilder {
pub fn optimize(&self) -> PyResult<Self> {
let optimizer = Optimizer::new(Default::default());
let unoptimized_plan = self.builder.build();
- let optimized_plan = optimizer.optimize(unoptimized_plan, |_, _, _, _, _| {})?;
+ let optimized_plan = optimizer.optimize(
+ unoptimized_plan,
+ |new_plan, rule_batch, pass, transformed, seen| {
+ if transformed {
+ log::debug!(
+ "Rule batch {:?} transformed plan on pass {}, and produced {} plan:\n{}",
+ rule_batch,
+ pass,
+ if seen { "an already seen" } else { "a new" },
+ new_plan.repr_ascii(true),
+ );
+ } else {
+ log::debug!(
+ "Rule batch {:?} did NOT transform plan on pass {} for plan:\n{}",
+ rule_batch,
+ pass,
+ new_plan.repr_ascii(true),
+ );
+ }
+ },
+ )?;
let builder = LogicalPlanBuilder::new(optimized_plan);
Ok(builder.into())
}
18 changes: 16 additions & 2 deletions src/daft-plan/src/logical_ops/agg.rs
@@ -65,9 +65,23 @@ impl Aggregate {

pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
res.push(format!("Aggregation: {:?}", self.aggregations));
res.push(format!(
"Aggregation: {}",
self.aggregations
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
));
if !self.groupby.is_empty() {
res.push(format!("Group by = {:?}", self.groupby));
res.push(format!(
"Group by = {}",
self.groupby
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
));
}
res.push(format!("Output schema = {}", self.schema().short_string()));
res
13 changes: 10 additions & 3 deletions src/daft-plan/src/logical_ops/sink.rs
@@ -41,12 +41,19 @@ impl Sink {
}) => {
res.push(format!("Sink: {:?}", file_format));
if let Some(partition_cols) = partition_cols {
res.push(format!("Partition cols = {:?}", partition_cols));
res.push(format!(
"Partition cols = {}",
partition_cols
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
));
}
if let Some(compression) = compression {
res.push(format!("Compression = {:?}", compression));
res.push(format!("Compression = {}", compression));
}
res.push(format!("Root dir = {:?}", root_dir));
res.push(format!("Root dir = {}", root_dir));
}
}
res.push(format!("Output schema = {}", self.schema.short_string()));
18 changes: 13 additions & 5 deletions src/daft-plan/src/logical_ops/source.rs
@@ -71,10 +71,11 @@ impl Source {
file_format_config,
storage_config,
}) => {
res.push(format!("Source: {:?}", file_format_config.var_name()));
for fp in file_infos.file_paths.iter() {
res.push(format!("File paths = {}", fp));
}
res.push(format!("Source: {}", file_format_config.var_name()));
res.push(format!(
"File paths = [{}]",
file_infos.file_paths.join(", ")
));
res.push(format!("File schema = {}", source_schema.short_string()));
res.push(format!("Format-specific config = {:?}", file_format_config));
res.push(format!("Storage config = {:?}", storage_config));
@@ -87,7 +88,14 @@
self.output_schema.short_string()
));
if !self.filters.is_empty() {
res.push(format!("Filters = {:?}", self.filters));
res.push(format!(
"Filters = {}",
self.filters
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
));
}
if let Some(limit) = self.limit {
res.push(format!("Limit = {}", limit));
9 changes: 8 additions & 1 deletion src/daft-plan/src/logical_plan.rs
@@ -296,7 +296,14 @@ impl LogicalPlan {
Self::Filter(Filter { predicate, .. }) => vec![format!("Filter: {predicate}")],
Self::Limit(Limit { limit, .. }) => vec![format!("Limit: {limit}")],
Self::Explode(Explode { to_explode, .. }) => {
vec![format!("Explode: {to_explode:?}")]
vec![format!(
"Explode: {}",
to_explode
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
)]
}
Self::Sort(sort) => sort.multiline_display(),
Self::Repartition(repartition) => repartition.multiline_display(),