cleanup
colin-ho committed Feb 26, 2024
1 parent deb9a8c commit 5f96728
Showing 11 changed files with 18 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -546,7 +546,7 @@ jobs:
"type": "section",
"text": {
"type": "mrkdwn",
"text": ":rotating_light: [CI] Iceberg Integration Tests <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|workflow> *FAILED on main* :rotating_light:"
"text": ":rotating_light: [CI] SQL Integration Tests <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|workflow> *FAILED on main* :rotating_light:"
}
}
]
1 change: 0 additions & 1 deletion daft/iceberg/iceberg_scan.py
@@ -169,7 +169,6 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
continue
rows_left -= record_count
scan_tasks.append(st)
-
return iter(scan_tasks)

def can_absorb_filter(self) -> bool:
3 changes: 0 additions & 3 deletions daft/utils.py
@@ -1,15 +1,12 @@
from __future__ import annotations

- import logging
import pickle
import random
import statistics
from typing import Any, Callable

import pyarrow as pa

- logger = logging.getLogger(__name__)
-
ARROW_VERSION = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric())


1 change: 0 additions & 1 deletion src/daft-dsl/src/expr.rs
@@ -610,7 +610,6 @@ impl Expr {
let inner_sql = inner.to_sql()?;
Some(format!("{} IS NOT NULL", inner_sql))
}
- // TODO: Implement SQL translations for these expressions
Expr::IfElse {
if_true,
if_false,
2 changes: 1 addition & 1 deletion src/daft-micropartition/src/micropartition.rs
@@ -213,7 +213,7 @@ fn materialize_scan_task(
}
FileFormatConfig::Database(_) => {
return Err(common_error::DaftError::TypeError(
"Native reads for Database file format not yet implemented".to_string(),
"Native reads for Database file format not implemented".to_string(),
))
.context(DaftCoreComputeSnafu);
}
4 changes: 3 additions & 1 deletion src/daft-plan/src/planner.rs
@@ -724,7 +724,9 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
input_physical.into(),
)))
}
- _ => unimplemented!(),
+ FileFormat::Database => Err(common_error::DaftError::ValueError(
+ "Database sink not yet implemented".to_string(),
+ )),
}
}
}
36 changes: 6 additions & 30 deletions src/daft-scan/src/lib.rs
@@ -133,62 +133,46 @@ pub enum DataFileSource {
partition_spec: PartitionSpec,
statistics: Option<TableStatistics>,
},
- DatabaseDataFile {
- path: String,
- chunk_spec: Option<ChunkSpec>,
- size_bytes: Option<u64>,
- metadata: Option<TableMetadata>,
- partition_spec: Option<PartitionSpec>,
- statistics: Option<TableStatistics>,
- },
}

impl DataFileSource {
pub fn get_path(&self) -> &str {
match self {
- Self::AnonymousDataFile { path, .. }
- | Self::CatalogDataFile { path, .. }
- | Self::DatabaseDataFile { path, .. } => path,
+ Self::AnonymousDataFile { path, .. } | Self::CatalogDataFile { path, .. } => path,
}
}

pub fn get_chunk_spec(&self) -> Option<&ChunkSpec> {
match self {
Self::AnonymousDataFile { chunk_spec, .. }
- | Self::CatalogDataFile { chunk_spec, .. }
- | Self::DatabaseDataFile { chunk_spec, .. } => chunk_spec.as_ref(),
+ | Self::CatalogDataFile { chunk_spec, .. } => chunk_spec.as_ref(),
}
}

pub fn get_size_bytes(&self) -> Option<u64> {
match self {
Self::AnonymousDataFile { size_bytes, .. }
- | Self::CatalogDataFile { size_bytes, .. }
- | Self::DatabaseDataFile { size_bytes, .. } => *size_bytes,
+ | Self::CatalogDataFile { size_bytes, .. } => *size_bytes,
}
}

pub fn get_metadata(&self) -> Option<&TableMetadata> {
match self {
- Self::AnonymousDataFile { metadata, .. } | Self::DatabaseDataFile { metadata, .. } => {
- metadata.as_ref()
- }
+ Self::AnonymousDataFile { metadata, .. } => metadata.as_ref(),
Self::CatalogDataFile { metadata, .. } => Some(metadata),
}
}

pub fn get_statistics(&self) -> Option<&TableStatistics> {
match self {
Self::AnonymousDataFile { statistics, .. }
- | Self::CatalogDataFile { statistics, .. }
- | Self::DatabaseDataFile { statistics, .. } => statistics.as_ref(),
+ | Self::CatalogDataFile { statistics, .. } => statistics.as_ref(),
}
}

pub fn get_partition_spec(&self) -> Option<&PartitionSpec> {
match self {
- Self::AnonymousDataFile { partition_spec, .. }
- | Self::DatabaseDataFile { partition_spec, .. } => partition_spec.as_ref(),
+ Self::AnonymousDataFile { partition_spec, .. } => partition_spec.as_ref(),
Self::CatalogDataFile { partition_spec, .. } => Some(partition_spec),
}
}
@@ -203,14 +187,6 @@ impl DataFileSource {
metadata,
partition_spec,
statistics,
- }
- | Self::DatabaseDataFile {
- path,
- chunk_spec,
- size_bytes,
- metadata,
- partition_spec,
- statistics,
} => {
res.push(format!("Path = {}", path));
if let Some(chunk_spec) = chunk_spec {
2 changes: 1 addition & 1 deletion src/daft-scan/src/python.rs
@@ -310,7 +310,7 @@ pub mod pylib {
storage_config: PyStorageConfig,
pushdowns: Option<PyPushdowns>,
) -> PyResult<Self> {
- let data_source = DataFileSource::DatabaseDataFile {
+ let data_source = DataFileSource::AnonymousDataFile {
path: url,
chunk_spec: None,
size_bytes: None,
8 changes: 1 addition & 7 deletions src/daft-scan/src/scan_task_iters.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;

- use common_error::{DaftError, DaftResult};
+ use common_error::DaftResult;
use daft_io::IOStatsContext;
use daft_parquet::read::read_parquet_metadata;

@@ -191,12 +191,6 @@ pub fn split_by_row_groups(
curr_row_groups = Vec::new();
curr_size_bytes = 0;
}
- DataFileSource::DatabaseDataFile { .. } => {
- return Err(DaftError::ValueError(
- "Cannot split by row groups for database sources"
- .to_string(),
- ));
- }
};

new_tasks.push(Ok(ScanTask::new(
1 change: 1 addition & 0 deletions tests/integration/sql/conftest.py
@@ -47,6 +47,7 @@ def generated_data() -> pd.DataFrame:
def test_db(request: pytest.FixtureRequest, generated_data: pd.DataFrame) -> Generator[str, None, None]:
db_url = request.param
if db_url.startswith("sqlite"):
+ # No docker container for sqlite, so we need to create a temporary file
with tempfile.NamedTemporaryFile(suffix=".db") as file:
db_url += file.name
setup_database(db_url, generated_data)
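For readers unfamiliar with the fixture, here is a minimal sketch of why the sqlite branch above only needs a temporary file: unlike the dockerized databases, sqlite reads and writes a plain local file, so a throwaway .db path is enough to back a short-lived test database. The snippet below is illustrative only; it uses the standard library directly instead of the repo's setup_database helper, and the table and columns are made up.

```python
import sqlite3
import tempfile

# Illustrative sketch (not from the repo): sqlite needs no server, only a
# writable file, so a NamedTemporaryFile can back a short-lived test database.
with tempfile.NamedTemporaryFile(suffix=".db") as f:
    conn = sqlite3.connect(f.name)
    conn.execute("CREATE TABLE example (id INTEGER, name TEXT)")
    conn.execute("INSERT INTO example VALUES (1, 'a')")
    conn.commit()
    print(conn.execute("SELECT COUNT(*) FROM example").fetchone())  # prints (1,)
    conn.close()
```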
7 changes: 4 additions & 3 deletions tests/integration/sql/test_sql.py
@@ -97,12 +97,13 @@ def test_sql_read_with_all_pushdowns(test_db) -> None:


@pytest.mark.integration()
- def test_sql_read_with_limit_pushdown(test_db) -> None:
+ @pytest.mark.parametrize("limit", [0, 1, 10, 100, 200])
+ def test_sql_read_with_limit_pushdown(test_db, limit) -> None:
df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db)
- df = df.limit(100)
+ df = df.limit(limit)

df = df.collect()
- assert len(df) == 100
+ assert len(df) == limit


@pytest.mark.integration()
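As a usage note, the parametrized limit test above reduces to the pattern below. The connection URL and table name are placeholders rather than values from the test suite, and the sketch assumes the table already holds at least 10 rows.

```python
import daft

# Hypothetical connection URL and table name, for illustration only.
df = daft.read_sql("SELECT * FROM example_table", "sqlite:///example.db")
df = df.limit(10)     # the limit may be pushed down into the generated SQL query
df = df.collect()
assert len(df) == 10  # assumes example_table contains at least 10 rows
```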
