From 5f96728c8c9bd02cc3eef272ea2def5c1d4045c4 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Mon, 26 Feb 2024 09:35:39 -0800
Subject: [PATCH] cleanup

---
 .github/workflows/python-package.yml          |  2 +-
 daft/iceberg/iceberg_scan.py                  |  1 -
 daft/utils.py                                 |  3 --
 src/daft-dsl/src/expr.rs                      |  1 -
 src/daft-micropartition/src/micropartition.rs |  2 +-
 src/daft-plan/src/planner.rs                  |  4 ++-
 src/daft-scan/src/lib.rs                      | 36 ++++---------------
 src/daft-scan/src/python.rs                   |  2 +-
 src/daft-scan/src/scan_task_iters.rs          |  8 +----
 tests/integration/sql/conftest.py             |  1 +
 tests/integration/sql/test_sql.py             |  7 ++--
 11 files changed, 18 insertions(+), 49 deletions(-)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 7a53add4be..1ce65e8be0 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -546,7 +546,7 @@ jobs:
                 "type": "section",
                 "text": {
                   "type": "mrkdwn",
-                  "text": ":rotating_light: [CI] Iceberg Integration Tests <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|workflow> *FAILED on main* :rotating_light:"
+                  "text": ":rotating_light: [CI] SQL Integration Tests <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|workflow> *FAILED on main* :rotating_light:"
                 }
               }
             ]
diff --git a/daft/iceberg/iceberg_scan.py b/daft/iceberg/iceberg_scan.py
index 84bf2e69d9..f660893110 100644
--- a/daft/iceberg/iceberg_scan.py
+++ b/daft/iceberg/iceberg_scan.py
@@ -169,7 +169,6 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
                 continue
             rows_left -= record_count
             scan_tasks.append(st)
-
         return iter(scan_tasks)
 
     def can_absorb_filter(self) -> bool:
diff --git a/daft/utils.py b/daft/utils.py
index 8580323f42..a8efc9bf2e 100644
--- a/daft/utils.py
+++ b/daft/utils.py
@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-import logging
 import pickle
 import random
 import statistics
@@ -8,8 +7,6 @@
 
 import pyarrow as pa
 
-logger = logging.getLogger(__name__)
-
 ARROW_VERSION = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric())
 
 
diff --git a/src/daft-dsl/src/expr.rs b/src/daft-dsl/src/expr.rs
index 825e6494a7..50c6b02278 100644
--- a/src/daft-dsl/src/expr.rs
+++ b/src/daft-dsl/src/expr.rs
@@ -610,7 +610,6 @@ impl Expr {
                 let inner_sql = inner.to_sql()?;
                 Some(format!("{} IS NOT NULL", inner_sql))
             }
-            // TODO: Implement SQL translations for these expressions
             Expr::IfElse {
                 if_true,
                 if_false,
diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index 6159018598..cf7bc10250 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -213,7 +213,7 @@ fn materialize_scan_task(
             }
             FileFormatConfig::Database(_) => {
                 return Err(common_error::DaftError::TypeError(
-                    "Native reads for Database file format not yet implemented".to_string(),
+                    "Native reads for Database file format not implemented".to_string(),
                 ))
                 .context(DaftCoreComputeSnafu);
             }
diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs
index 9914111e61..e4e2ddb349 100644
--- a/src/daft-plan/src/planner.rs
+++ b/src/daft-plan/src/planner.rs
@@ -724,7 +724,9 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe
                     input_physical.into(),
                 )))
             }
-            _ => unimplemented!(),
+            FileFormat::Database => Err(common_error::DaftError::ValueError(
+                "Database sink not yet implemented".to_string(),
+            )),
         }
     }
 }
diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs
index 752cd2d761..449a5d9acd 100644
--- a/src/daft-scan/src/lib.rs
+++ b/src/daft-scan/src/lib.rs
@@ -133,46 +133,32 @@ pub enum DataFileSource {
         partition_spec: PartitionSpec,
         statistics: Option<TableStatistics>,
     },
-    DatabaseDataFile {
-        path: String,
-        chunk_spec: Option<ChunkSpec>,
-        size_bytes: Option<u64>,
-        metadata: Option<TableMetadata>,
-        partition_spec: Option<PartitionSpec>,
-        statistics: Option<TableStatistics>,
-    },
 }
 
 impl DataFileSource {
     pub fn get_path(&self) -> &str {
         match self {
-            Self::AnonymousDataFile { path, .. }
-            | Self::CatalogDataFile { path, .. }
-            | Self::DatabaseDataFile { path, .. } => path,
+            Self::AnonymousDataFile { path, .. } | Self::CatalogDataFile { path, .. } => path,
         }
     }
 
     pub fn get_chunk_spec(&self) -> Option<&ChunkSpec> {
         match self {
             Self::AnonymousDataFile { chunk_spec, .. }
-            | Self::CatalogDataFile { chunk_spec, .. }
-            | Self::DatabaseDataFile { chunk_spec, .. } => chunk_spec.as_ref(),
+            | Self::CatalogDataFile { chunk_spec, .. } => chunk_spec.as_ref(),
         }
     }
 
     pub fn get_size_bytes(&self) -> Option<u64> {
         match self {
             Self::AnonymousDataFile { size_bytes, .. }
-            | Self::CatalogDataFile { size_bytes, .. }
-            | Self::DatabaseDataFile { size_bytes, .. } => *size_bytes,
+            | Self::CatalogDataFile { size_bytes, .. } => *size_bytes,
         }
     }
 
     pub fn get_metadata(&self) -> Option<&TableMetadata> {
         match self {
-            Self::AnonymousDataFile { metadata, .. } | Self::DatabaseDataFile { metadata, .. } => {
-                metadata.as_ref()
-            }
+            Self::AnonymousDataFile { metadata, .. } => metadata.as_ref(),
             Self::CatalogDataFile { metadata, .. } => Some(metadata),
         }
     }
@@ -180,15 +166,13 @@ impl DataFileSource {
     pub fn get_statistics(&self) -> Option<&TableStatistics> {
         match self {
             Self::AnonymousDataFile { statistics, .. }
-            | Self::CatalogDataFile { statistics, .. }
-            | Self::DatabaseDataFile { statistics, .. } => statistics.as_ref(),
+            | Self::CatalogDataFile { statistics, .. } => statistics.as_ref(),
         }
     }
 
     pub fn get_partition_spec(&self) -> Option<&PartitionSpec> {
         match self {
-            Self::AnonymousDataFile { partition_spec, .. }
-            | Self::DatabaseDataFile { partition_spec, .. } => partition_spec.as_ref(),
+            Self::AnonymousDataFile { partition_spec, .. } => partition_spec.as_ref(),
             Self::CatalogDataFile { partition_spec, .. } => Some(partition_spec),
         }
     }
@@ -203,14 +187,6 @@ impl DataFileSource {
                 metadata,
                 partition_spec,
                 statistics,
-            }
-            | Self::DatabaseDataFile {
-                path,
-                chunk_spec,
-                size_bytes,
-                metadata,
-                partition_spec,
-                statistics,
             } => {
                 res.push(format!("Path = {}", path));
                 if let Some(chunk_spec) = chunk_spec {
diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs
index 78e6fbffa2..2f465c1eca 100644
--- a/src/daft-scan/src/python.rs
+++ b/src/daft-scan/src/python.rs
@@ -310,7 +310,7 @@ pub mod pylib {
         storage_config: PyStorageConfig,
         pushdowns: Option<PyPushdowns>,
     ) -> PyResult<Self> {
-        let data_source = DataFileSource::DatabaseDataFile {
+        let data_source = DataFileSource::AnonymousDataFile {
             path: url,
             chunk_spec: None,
             size_bytes: None,
diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs
index a627d89105..fe3d470a2f 100644
--- a/src/daft-scan/src/scan_task_iters.rs
+++ b/src/daft-scan/src/scan_task_iters.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use common_error::{DaftError, DaftResult};
+use common_error::DaftResult;
 use daft_io::IOStatsContext;
 use daft_parquet::read::read_parquet_metadata;
 
@@ -191,12 +191,6 @@ pub fn split_by_row_groups(
                         curr_row_groups = Vec::new();
                         curr_size_bytes = 0;
                     }
-                    DataFileSource::DatabaseDataFile { .. } => {
-                        return Err(DaftError::ValueError(
-                            "Cannot split by row groups for database sources"
-                                .to_string(),
-                        ));
-                    }
                 };
 
                 new_tasks.push(Ok(ScanTask::new(
diff --git a/tests/integration/sql/conftest.py b/tests/integration/sql/conftest.py
index f0d5b8029a..a2864fac39 100644
--- a/tests/integration/sql/conftest.py
+++ b/tests/integration/sql/conftest.py
@@ -47,6 +47,7 @@ def generated_data() -> pd.DataFrame:
 def test_db(request: pytest.FixtureRequest, generated_data: pd.DataFrame) -> Generator[str, None, None]:
     db_url = request.param
     if db_url.startswith("sqlite"):
+        # No docker container for sqlite, so we need to create a temporary file
        with tempfile.NamedTemporaryFile(suffix=".db") as file:
             db_url += file.name
             setup_database(db_url, generated_data)
diff --git a/tests/integration/sql/test_sql.py b/tests/integration/sql/test_sql.py
index 9ea6cda088..84d814c9dd 100644
--- a/tests/integration/sql/test_sql.py
+++ b/tests/integration/sql/test_sql.py
@@ -97,12 +97,13 @@ def test_sql_read_with_all_pushdowns(test_db) -> None:
 
 
 @pytest.mark.integration()
-def test_sql_read_with_limit_pushdown(test_db) -> None:
+@pytest.mark.parametrize("limit", [0, 1, 10, 100, 200])
+def test_sql_read_with_limit_pushdown(test_db, limit) -> None:
     df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db)
-    df = df.limit(100)
+    df = df.limit(limit)
 
     df = df.collect()
-    assert len(df) == 100
+    assert len(df) == limit
 
 
 @pytest.mark.integration()