[BUG] Use schema_hints as hints instead of definitive schema #1636

Merged: 5 commits, Dec 3, 2023
7 changes: 3 additions & 4 deletions daft/daft.pyi
@@ -497,7 +497,7 @@ class ScanOperatorHandle:
glob_path: list[str],
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
schema: PySchema | None = None,
schema_hint: PySchema | None = None,
) -> ScanOperatorHandle: ...
@staticmethod
def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ...
@@ -676,6 +676,7 @@ class PySchema:
def __getitem__(self, name: str) -> PyField: ...
def names(self) -> list[str]: ...
def union(self, other: PySchema) -> PySchema: ...
def apply_hints(self, other: PySchema) -> PySchema: ...
def eq(self, other: PySchema) -> bool: ...
@staticmethod
def from_field_name_and_types(names_and_types: list[tuple[str, PyDataType]]) -> PySchema: ...
@@ -955,9 +956,7 @@ class LogicalPlanBuilder:
partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, num_partitions: int
) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan_with_scan_operator(
scan_operator: ScanOperatorHandle, schema_hint: PySchema | None
) -> LogicalPlanBuilder: ...
def table_scan_with_scan_operator(scan_operator: ScanOperatorHandle) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan(
file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig
4 changes: 1 addition & 3 deletions daft/io/_iceberg.py
@@ -81,7 +81,5 @@
iceberg_operator = IcebergScanOperator(pyiceberg_table, storage_config=storage_config)

handle = ScanOperatorHandle.from_python_scan_operator(iceberg_operator)
builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(
scan_operator=handle, schema_hint=iceberg_operator.schema()
)
builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(scan_operator=handle)

[Codecov warning: added line daft/io/_iceberg.py#L84 was not covered by tests]
return DataFrame(builder)
19 changes: 9 additions & 10 deletions daft/io/common.py
@@ -55,38 +55,37 @@ def _get_tabular_files_scan(
path,
file_format_config,
storage_config,
schema=schema_hint._schema if schema_hint is not None else None,
schema_hint=schema_hint._schema if schema_hint is not None else None,
)
elif isinstance(path, str):
scan_op = ScanOperatorHandle.glob_scan(
[path],
file_format_config,
storage_config,
schema=schema_hint._schema if schema_hint is not None else None,
schema_hint=schema_hint._schema if schema_hint is not None else None,
)
else:
raise NotImplementedError(f"_get_tabular_files_scan cannot construct ScanOperatorHandle for input: {path}")

builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(
scan_operator=scan_op,
schema_hint=schema_hint,
)
return builder

paths = path if isinstance(path, list) else [str(path)]
runner_io = get_context().runner().runner_io()
file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)

# Infer schema if no hints provided
inferred_or_provided_schema = (
schema_hint
if schema_hint is not None
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
)
# Infer schema
schema = runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)

# Apply hints from schema_hints if provided
if schema_hint is not None:
schema = schema.apply_hints(schema_hint)
# Construct plan
builder = LogicalPlanBuilder.from_tabular_scan(
file_infos=file_infos,
schema=inferred_or_provided_schema,
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
)
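The net effect of the daft/io/common.py change is that schema inference always runs against the first file, and schema_hints only override the dtypes of columns that inference actually found, rather than replacing the inferred schema wholesale. A minimal sketch of the intended user-facing behavior, assuming the public daft.read_csv accepts schema_hints as a dict mapping column names to DataType (the reader surface is an assumption here, not something this diff shows):

import tempfile

import daft
from daft import DataType

# Write a tiny CSV so the example is self-contained.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as f:
    f.write("id,name,score\n1,alice,0.5\n2,bob,1.5\n")
    path = f.name

# Only `score` is hinted; `id` and `name` keep their inferred types.
df = daft.read_csv(path, schema_hints={"score": DataType.float32()})
print(df.schema())

Before this change, a partial hint like that was treated as the definitive schema, which is the bug the PR title refers to.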
4 changes: 1 addition & 3 deletions daft/logical/builder.py
@@ -83,10 +83,8 @@ def from_tabular_scan_with_scan_operator(
cls,
*,
scan_operator: ScanOperatorHandle,
schema_hint: Schema | None,
) -> LogicalPlanBuilder:
pyschema = schema_hint._schema if schema_hint is not None else None
builder = _LogicalPlanBuilder.table_scan_with_scan_operator(scan_operator, pyschema)
builder = _LogicalPlanBuilder.table_scan_with_scan_operator(scan_operator)
return cls(builder)

@classmethod
6 changes: 6 additions & 0 deletions daft/logical/schema.py
@@ -141,6 +141,12 @@

return Schema._from_pyschema(self._schema.union(other._schema))

def apply_hints(self, other: Schema) -> Schema:
if not isinstance(other, Schema):
raise ValueError(f"Expected Schema, got other: {type(other)}")

[Codecov warning: added line daft/logical/schema.py#L146 was not covered by tests]

return Schema._from_pyschema(self._schema.apply_hints(other._schema))

def __reduce__(self) -> tuple:
return Schema._from_pyschema, (self._schema,)

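To make the new Schema.apply_hints semantics concrete without touching Daft internals, here is a standalone Python model of the rule implemented in src/daft-core/src/schema.rs further down: the inferred schema keeps its own columns and ordering, a hint only overrides the type of a column that inference already produced, and hint-only columns are ignored (which is what distinguishes apply_hints from union). Plain dicts stand in for schemas; this is an illustration, not Daft's API.

# name -> dtype dicts model schemas; insertion order mirrors the Rust IndexMap.
def apply_hints(inferred: dict[str, str], hints: dict[str, str]) -> dict[str, str]:
    return {name: hints.get(name, dtype) for name, dtype in inferred.items()}

inferred = {"id": "int64", "name": "utf8", "score": "float64"}
hints = {"score": "float32", "extra": "bool"}  # "extra" is not added to the result

assert apply_hints(inferred, hints) == {"id": "int64", "name": "utf8", "score": "float32"}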
5 changes: 5 additions & 0 deletions src/daft-core/src/python/schema.rs
@@ -33,6 +33,11 @@ impl PySchema {
Ok(new_schema.into())
}

pub fn apply_hints(&self, hints: &PySchema) -> PyResult<PySchema> {
let new_schema = Arc::new(self.schema.apply_hints(&hints.schema)?);
Ok(new_schema.into())
}

pub fn eq(&self, other: &PySchema) -> PyResult<bool> {
Ok(self.schema.fields.eq(&other.schema.fields))
}
15 changes: 15 additions & 0 deletions src/daft-core/src/schema.rs
@@ -87,6 +87,21 @@ impl Schema {
}
}

pub fn apply_hints(&self, hints: &Schema) -> DaftResult<Schema> {
let applied_fields = self
.fields
.iter()
.map(|(name, field)| match hints.fields.get(name) {
None => (name.clone(), field.clone()),
Some(hint_field) => (name.clone(), hint_field.clone()),
})
.collect::<IndexMap<String, Field>>();

Ok(Schema {
fields: applied_fields,
})
}

pub fn to_arrow(&self) -> DaftResult<arrow2::datatypes::Schema> {
let arrow_fields: DaftResult<Vec<arrow2::datatypes::Field>> =
self.fields.iter().map(|(_, f)| f.to_arrow()).collect();
13 changes: 10 additions & 3 deletions src/daft-micropartition/src/micropartition.rs
@@ -381,8 +381,7 @@ impl MicroPartition {
.map(|cols| cols.iter().map(|s| s.as_str()).collect::<Vec<&str>>());

let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());

read_parquet_into_micropartition(
let mp = read_parquet_into_micropartition(
uris.as_slice(),
columns.as_deref(),
None,
@@ -399,7 +398,15 @@
coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
},
)
.context(DaftCoreComputeSnafu)
.context(DaftCoreComputeSnafu)?;

let applied_schema = Arc::new(
mp.schema
.apply_hints(&schema)
.context(DaftCoreComputeSnafu)?,
);
mp.cast_to_schema(applied_schema)
.context(DaftCoreComputeSnafu)
}

// CASE: Last resort fallback option
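The micropartition.rs change above reconciles the schema of the freshly read parquet data with the hinted schema and then casts the data to the result, so hinted dtypes win even when the file stores something different. A rough pyarrow analogue of that read-then-cast pattern (pyarrow is used purely for illustration; the hints dict and per-column cast are assumptions, not Daft's internals):

import pyarrow as pa

# Pretend this table is what the parquet reader produced with its native types.
table = pa.table(
    {
        "id": pa.array([1, 2], type=pa.int64()),
        "score": pa.array([0.5, 1.5], type=pa.float64()),
    }
)

hints = {"score": pa.float32()}  # only `score` is hinted
target = pa.schema([(f.name, hints.get(f.name, f.type)) for f in table.schema])
print(table.cast(target).schema)  # id stays int64, score becomes float32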
18 changes: 4 additions & 14 deletions src/daft-plan/src/builder.rs
@@ -68,11 +68,8 @@ impl LogicalPlanBuilder {
Ok(logical_plan.into())
}

pub fn table_scan_with_scan_operator(
scan_operator: ScanOperatorRef,
schema_hint: Option<SchemaRef>,
) -> DaftResult<Self> {
let schema = schema_hint.unwrap_or_else(|| scan_operator.0.schema());
pub fn table_scan_with_scan_operator(scan_operator: ScanOperatorRef) -> DaftResult<Self> {
let schema = scan_operator.0.schema();
let partitioning_keys = scan_operator.0.partitioning_keys();
let source_info =
SourceInfo::ExternalInfo(ExternalSourceInfo::Scan(ScanExternalInfo::new(
@@ -298,15 +295,8 @@ impl PyLogicalPlanBuilder {
}

#[staticmethod]
pub fn table_scan_with_scan_operator(
scan_operator: ScanOperatorHandle,
schema_hint: Option<PySchema>,
) -> PyResult<Self> {
Ok(LogicalPlanBuilder::table_scan_with_scan_operator(
scan_operator.into(),
schema_hint.map(|s| s.into()),
)?
.into())
pub fn table_scan_with_scan_operator(scan_operator: ScanOperatorHandle) -> PyResult<Self> {
Ok(LogicalPlanBuilder::table_scan_with_scan_operator(scan_operator.into())?.into())
}

#[staticmethod]
163 changes: 80 additions & 83 deletions src/daft-scan/src/glob.rs
@@ -121,7 +121,7 @@ impl GlobScanOperator {
glob_paths: &[&str],
file_format_config: Arc<FileFormatConfig>,
storage_config: Arc<StorageConfig>,
schema: Option<SchemaRef>,
schema_hint: Option<SchemaRef>,
) -> DaftResult<Self> {
let first_glob_path = match glob_paths.first() {
None => Err(DaftError::ValueError(
@@ -130,97 +130,94 @@
Some(path) => Ok(path),
}?;

let schema = match schema {
Some(s) => s,
None => {
let (io_runtime, io_client) = get_io_client_and_runtime(storage_config.as_ref())?;
let (io_runtime, io_client) = get_io_client_and_runtime(storage_config.as_ref())?;
let io_stats = IOStatsContext::new(format!(
"GlobScanOperator::try_new schema inference for {first_glob_path}"
));
let mut paths = run_glob(
first_glob_path,
Some(1),
io_client.clone(),
io_runtime.clone(),
Some(io_stats.clone()),
)?;
let FileMetadata {
filepath: first_filepath,
..
} = match paths.next() {
Some(file_metadata) => file_metadata,
None => Err(Error::GlobNoMatch {
glob_path: first_glob_path.to_string(),
}
.into()),
}?;
let inferred_schema = match file_format_config.as_ref() {
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
}) => {
let io_stats = IOStatsContext::new(format!(
"GlobScanOperator::try_new schema inference for {first_glob_path}"
"GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}"
));
let mut paths = run_glob(
first_glob_path,
Some(1),
daft_parquet::read::read_parquet_schema(
first_filepath.as_str(),
io_client.clone(),
io_runtime.clone(),
Some(io_stats.clone()),
Some(io_stats),
ParquetSchemaInferenceOptions {
coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
},
)?
}
FileFormatConfig::Csv(CsvSourceConfig {
delimiter,
has_headers,
double_quote,
quote,
escape_char,
comment,
..
}) => {
let (schema, _) = daft_csv::metadata::read_csv_schema(
first_filepath.as_str(),
Some(CsvParseOptions::new_with_defaults(
*has_headers,
*delimiter,
*double_quote,
*quote,
*escape_char,
*comment,
)?),
None,
io_client,
Some(io_stats),
)?;
let FileMetadata {
filepath: first_filepath,
..
} = match paths.next() {
Some(file_metadata) => file_metadata,
None => Err(Error::GlobNoMatch {
glob_path: first_glob_path.to_string(),
}
.into()),
}?;
let inferred_schema = match file_format_config.as_ref() {
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
}) => {
let io_stats = IOStatsContext::new(format!(
"GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}"
));
daft_parquet::read::read_parquet_schema(
first_filepath.as_str(),
io_client.clone(),
Some(io_stats),
ParquetSchemaInferenceOptions {
coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
},
)?
schema
}
FileFormatConfig::Json(JsonSourceConfig {}) => {
// NOTE: Native JSON reads not yet implemented, so we have to delegate to Python here or implement
// a daft_json crate that gives us native JSON schema inference
match storage_config.as_ref() {
StorageConfig::Native(_) => {
todo!("Implement native JSON schema inference in a daft_json crate.")
}
FileFormatConfig::Csv(CsvSourceConfig {
delimiter,
has_headers,
double_quote,
quote,
escape_char,
comment,
..
}) => {
let (schema, _) = daft_csv::metadata::read_csv_schema(
#[cfg(feature = "python")]
StorageConfig::Python(_) => Python::with_gil(|py| {
crate::python::pylib::read_json_schema(
py,
first_filepath.as_str(),
Some(CsvParseOptions::new_with_defaults(
*has_headers,
*delimiter,
*double_quote,
*quote,
*escape_char,
*comment,
)?),
None,
io_client,
Some(io_stats),
)?;
schema
}
FileFormatConfig::Json(JsonSourceConfig {}) => {
// NOTE: Native JSON reads not yet implemented, so we have to delegate to Python here or implement
// a daft_json crate that gives us native JSON schema inference
match storage_config.as_ref() {
StorageConfig::Native(_) => todo!(
"Implement native JSON schema inference in a daft_json crate."
),
#[cfg(feature = "python")]
StorageConfig::Python(_) => Python::with_gil(|py| {
crate::python::pylib::read_json_schema(
py,
first_filepath.as_str(),
storage_config.clone().into(),
)
.and_then(|s| {
Ok(Schema::new(s.schema.fields.values().cloned().collect())?)
})
.context(PyIOSnafu)
})?,
}
}
};
Arc::new(inferred_schema)
storage_config.clone().into(),
)
.and_then(|s| Ok(Schema::new(s.schema.fields.values().cloned().collect())?))
.context(PyIOSnafu)
})?,
}
}
};

let schema = match schema_hint {
None => Arc::new(inferred_schema),
Some(schema_hint) => Arc::new(inferred_schema.apply_hints(&schema_hint)?),
};

Ok(Self {
glob_paths: glob_paths.iter().map(|s| s.to_string()).collect(),
file_format_config,