diff --git a/daft/iceberg/iceberg_scan.py b/daft/iceberg/iceberg_scan.py index 4d26b954f1..8671b7cca1 100644 --- a/daft/iceberg/iceberg_scan.py +++ b/daft/iceberg/iceberg_scan.py @@ -111,6 +111,7 @@ def _make_scan_tasks(self) -> list[ScanTask]: schema=self._schema._schema, num_rows=record_count, storage_config=storage_config, + size_bytes=file.file_size_in_bytes, ) scan_tasks.append(st) return scan_tasks diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 81fdfc10f1..4abac4f6b1 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -6,7 +6,7 @@ use std::{ use common_error::{DaftError, DaftResult}; use daft_core::{datatypes::Field, schema::SchemaRef}; -use daft_dsl::{Expr, ExprRef, optimization::get_required_columns}; +use daft_dsl::{optimization::get_required_columns, Expr, ExprRef}; use daft_stats::{PartitionSpec, TableMetadata, TableStatistics}; use file_format::FileFormatConfig; use serde::{Deserialize, Serialize}; @@ -242,7 +242,6 @@ impl Display for PartitionField { } } - pub trait ScanOperator: Send + Sync + Display + Debug { fn schema(&self) -> SchemaRef; fn partitioning_keys(&self) -> &[PartitionField]; diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs index d51bc3b814..a8a9c3919b 100644 --- a/src/daft-scan/src/python.rs +++ b/src/daft-scan/src/python.rs @@ -91,8 +91,9 @@ pub mod pylib { #[staticmethod] pub fn from_python_abc(py_scan: PyObject) -> PyResult { - let scan_op = - ScanOperatorRef(Arc::new(PythonScanOperatorBridge::from_python_abc(py_scan)?)); + let scan_op = ScanOperatorRef(Arc::new(PythonScanOperatorBridge::from_python_abc( + py_scan, + )?)); Ok(ScanOperatorHandle { scan_op }) } } @@ -212,6 +213,7 @@ pub mod pylib { schema: PySchema, num_rows: i64, storage_config: PyStorageConfig, + size_bytes: Option, columns: Option>, limit: Option, ) -> PyResult { @@ -221,14 +223,20 @@ pub mod pylib { let data_source = DataFileSource::CatalogDataFile { path: file, chunk_spec: None, - size_bytes: None, + size_bytes: size_bytes, metadata: TableMetadata { length: num_rows as usize, }, partition_spec: empty_pspec, statistics: None, }; - let scan_task = ScanTask::new(vec![data_source], file_format.into(), schema.schema, storage_config.into(), Pushdowns::default()); + let scan_task = ScanTask::new( + vec![data_source], + file_format.into(), + schema.schema, + storage_config.into(), + Pushdowns::default(), + ); Ok(PyScanTask(scan_task.into())) }