[FEAT] read_sql #1943

Merged Mar 13, 2024 (30 commits)

Commits
dbad9a3  init (colin-ho, Feb 22, 2024)
1e86d0f  int tests (colin-ho, Feb 23, 2024)
7d05a83  sql alchemy version (colin-ho, Feb 23, 2024)
dd9ebbf  fix test (colin-ho, Feb 23, 2024)
f501dd1  retry (colin-ho, Feb 23, 2024)
fa085de  retry all (colin-ho, Feb 23, 2024)
71af666  add try block (colin-ho, Feb 23, 2024)
c836819  move retries out of fixture (colin-ho, Feb 24, 2024)
f24fa9d  move retries out of fixture (colin-ho, Feb 24, 2024)
1237098  add text to query (colin-ho, Feb 24, 2024)
f05ce21  add text to query (colin-ho, Feb 24, 2024)
2cb90fa  fix assertion (colin-ho, Feb 24, 2024)
f1836f1  yay micropartitions always 1 (colin-ho, Feb 24, 2024)
dfc0fe9  add more integration tests + refactor (colin-ho, Feb 26, 2024)
2d51864  cleanup (colin-ho, Feb 26, 2024)
f53bbb2  everything except limit 0 (colin-ho, Feb 28, 2024)
eab61b0  fix math (colin-ho, Feb 29, 2024)
7e86e43  to_sql_inner (colin-ho, Mar 1, 2024)
2f48c55  rename to apply_limit_before_offset (colin-ho, Mar 1, 2024)
ff389d3  docs (colin-ho, Mar 1, 2024)
ccf1c4b  improve literal to_sql, use more equality tests, and add todos (colin-ho, Mar 4, 2024)
ff41a78  fix stuff from merge conflict (colin-ho, Mar 4, 2024)
b077bc5  disable pushdowns in sql reader (colin-ho, Mar 4, 2024)
3c49a8a  disable pushdowns in sql reader (colin-ho, Mar 4, 2024)
f7ec4c9  revise partioning algo (colin-ho, Mar 6, 2024)
2ca97fd  refactor (colin-ho, Mar 6, 2024)
7ac4131  refactor some string args (colin-ho, Mar 6, 2024)
5be0cd2  add datetime support (colin-ho, Mar 6, 2024)
0a85439  comment about timestamp (colin-ho, Mar 6, 2024)
bd65b05  refactor and add limit pushdown (colin-ho, Mar 13, 2024)
69 changes: 69 additions & 0 deletions .github/workflows/python-package.yml
@@ -491,6 +491,75 @@ jobs:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK

integration-test-sql:
runs-on: ubuntu-latest
timeout-minutes: 30
needs:
- integration-test-build
env:
package-name: getdaft
strategy:
fail-fast: false
matrix:
python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray]
steps:
- uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 0
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: pip
cache-dependency-path: |
pyproject.toml
requirements-dev.txt
- name: Download built wheels
uses: actions/download-artifact@v3
with:
name: wheels
path: dist
- name: Setup Virtual Env
run: |
python -m venv venv
echo "$GITHUB_WORKSPACE/venv/bin" >> $GITHUB_PATH
- name: Install Daft and dev dependencies
run: |
pip install --upgrade pip
pip install -r requirements-dev.txt dist/${{ env.package-name }}-*x86_64*.whl --force-reinstall
rm -rf daft
- name: Spin up services
run: |
pushd ./tests/integration/sql/docker-compose/
docker-compose -f ./docker-compose.yml up -d
popd
- name: Run sql integration tests
run: |
pytest tests/integration/sql -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
with:
payload: |
{
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": ":rotating_light: [CI] SQL Integration Tests <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|workflow> *FAILED on main* :rotating_light:"
}
}
]
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK

rust-tests:
runs-on: ${{ matrix.os }}-latest
timeout-minutes: 30
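For reference, a minimal local sketch of the kind of check this job runs, assuming a throwaway SQLite database instead of the docker-compose services the workflow provisions; the table name, data, and URL below are illustrative and not taken from the repository's test suite.

import tempfile
import daft
import sqlalchemy

def smoke_test_read_sql() -> None:
    # Throwaway on-disk SQLite database; the real CI job instead talks to
    # services brought up by docker-compose.
    tmp_dir = tempfile.mkdtemp()
    url = f"sqlite:///{tmp_dir}/example.db"
    engine = sqlalchemy.create_engine(url)
    with engine.connect() as conn:
        conn.execute(sqlalchemy.text("CREATE TABLE items (id INTEGER, name TEXT)"))
        conn.execute(sqlalchemy.text("INSERT INTO items VALUES (1, 'a'), (2, 'b')"))
        conn.commit()

    df = daft.read_sql("SELECT * FROM items", url)
    assert df.to_pydict() == {"id": [1, 2], "name": ["a", "b"]}

smoke_test_read_sql()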
2 changes: 2 additions & 0 deletions daft/__init__.py
@@ -76,6 +76,7 @@ def get_build_type() -> str:
read_iceberg,
read_json,
read_parquet,
read_sql,
)
from daft.series import Series
from daft.udf import udf
@@ -94,6 +95,7 @@ def get_build_type() -> str:
"read_parquet",
"read_iceberg",
"read_delta_lake",
"read_sql",
"DataCatalogType",
"DataCatalogTable",
"DataFrame",
3 changes: 3 additions & 0 deletions daft/context.py
@@ -254,6 +254,7 @@ def set_execution_config(
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values
are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -283,6 +284,7 @@
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
@@ -302,6 +304,7 @@
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
)

ctx._daft_execution_config = new_daft_execution_config
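As a usage sketch, the new knob is set like any other execution-config value; the 256 MB figure below is an arbitrary illustration, not a recommendation.

from daft.context import set_execution_config

# Lower the target partition size for SQL reads from the 512MB default to
# 256MB; the value is in bytes. Purely illustrative.
set_execution_config(read_sql_partition_size_bytes=256 * 1024 * 1024)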
37 changes: 36 additions & 1 deletion daft/daft.pyi
@@ -217,12 +217,21 @@ class JsonSourceConfig:
chunk_size: int | None = None,
): ...

class DatabaseSourceConfig:
"""
Configuration of a database data source.
"""

sql: str

def __init__(self, sql: str): ...

class FileFormatConfig:
"""
Configuration for parsing a particular file format (Parquet, CSV, JSON).
"""

config: ParquetSourceConfig | CsvSourceConfig | JsonSourceConfig
config: ParquetSourceConfig | CsvSourceConfig | JsonSourceConfig | DatabaseSourceConfig

@staticmethod
def from_parquet_config(config: ParquetSourceConfig) -> FileFormatConfig:
@@ -242,6 +251,12 @@ class FileFormatConfig:
Create a JSON file format config.
"""
...
@staticmethod
def from_database_config(config: DatabaseSourceConfig) -> FileFormatConfig:
"""
Create a database file format config.
"""
...
def file_format(self) -> FileFormat:
"""
Get the file format for this config.
@@ -583,6 +598,20 @@ class ScanTask:
Create a Catalog Scan Task
"""
...
@staticmethod
def sql_scan_task(
url: str,
file_format: FileFormatConfig,
schema: PySchema,
num_rows: int | None,
storage_config: StorageConfig,
size_bytes: int | None,
pushdowns: Pushdowns | None,
) -> ScanTask:
"""
Create a SQL Scan Task
"""
...

class ScanOperatorHandle:
"""
@@ -800,6 +829,7 @@ class PyDataType:
@staticmethod
def python() -> PyDataType: ...
def to_arrow(self, cast_tensor_type_for_ray: builtins.bool | None = None) -> pyarrow.DataType: ...
def is_numeric(self) -> builtins.bool: ...
def is_image(self) -> builtins.bool: ...
def is_fixed_shape_image(self) -> builtins.bool: ...
def is_tensor(self) -> builtins.bool: ...
@@ -826,6 +856,7 @@ class PySchema:
def names(self) -> list[str]: ...
def union(self, other: PySchema) -> PySchema: ...
def eq(self, other: PySchema) -> bool: ...
def estimate_row_size_bytes(self) -> float: ...
@staticmethod
def from_field_name_and_types(names_and_types: list[tuple[str, PyDataType]]) -> PySchema: ...
@staticmethod
@@ -875,6 +906,7 @@ class PyExpr:
def is_in(self, other: PyExpr) -> PyExpr: ...
def name(self) -> str: ...
def to_field(self, schema: PySchema) -> PyField: ...
def to_sql(self) -> str | None: ...
def __repr__(self) -> str: ...
def __hash__(self) -> int: ...
def __reduce__(self) -> tuple: ...
@@ -1218,6 +1250,7 @@ class PyDaftExecutionConfig:
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
) -> PyDaftExecutionConfig: ...
@property
def scan_tasks_min_size_bytes(self) -> int: ...
@@ -1243,6 +1276,8 @@
def csv_inflation_factor(self) -> float: ...
@property
def shuffle_aggregation_default_partitions(self) -> int: ...
@property
def read_sql_partition_size_bytes(self) -> int: ...

class PyDaftPlanningConfig:
def with_config_values(
5 changes: 5 additions & 0 deletions daft/datatype.py
@@ -375,6 +375,8 @@
return cls.decimal128(arrow_type.precision, arrow_type.scale)
elif pa.types.is_date32(arrow_type):
return cls.date()
elif pa.types.is_date64(arrow_type):
return cls.timestamp(TimeUnit.ms())
elif pa.types.is_time64(arrow_type):
timeunit = TimeUnit.from_str(pa.type_for_alias(str(arrow_type)).unit)
return cls.time(timeunit)
@@ -479,6 +481,9 @@
def _is_fixed_shape_image_type(self) -> builtins.bool:
return self._dtype.is_fixed_shape_image()

def _is_numeric_type(self) -> builtins.bool:
return self._dtype.is_numeric()

Codecov: added line daft/datatype.py#L485 was not covered by tests.

def _is_map(self) -> builtins.bool:
return self._dtype.is_map()

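A small illustration of why date64 maps to a millisecond timestamp rather than a plain date: Arrow's date64 type stores milliseconds since the UNIX epoch. The snippet uses only pyarrow and is independent of Daft.

import datetime
import pyarrow as pa

# Arrow date64 values are 64-bit millisecond counts since the UNIX epoch,
# so a millisecond-unit timestamp is the closest lossless Daft type.
arr = pa.array([datetime.date(2024, 3, 13)], type=pa.date64())
print(arr.type)                      # date64[ms]
print(arr.cast(pa.timestamp("ms")))  # midnight on 2024-03-13, millisecond unit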
3 changes: 3 additions & 0 deletions daft/expressions/expressions.py
@@ -473,6 +473,9 @@ def name(self) -> builtins.str:
def __repr__(self) -> builtins.str:
return repr(self._expr)

def _to_sql(self) -> builtins.str | None:
return self._expr.to_sql()

def _to_field(self, schema: Schema) -> Field:
return Field._from_pyfield(self._expr.to_field(schema._schema))

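The _to_sql hook is what lets the SQL scan render supported expressions as SQL fragments, falling back to None when an expression has no SQL equivalent. Below is a toy, standalone sketch of that idea; none of it is Daft's actual translator, and the operator table is invented purely for illustration.

from typing import Optional

# Toy translator: renders a (column, op, literal) predicate as a SQL fragment,
# returning None for anything it cannot express -- mirroring the str | None
# contract of Expression._to_sql, but not its implementation.
SUPPORTED_OPS = {"==": "=", "!=": "<>", "<": "<", "<=": "<=", ">": ">", ">=": ">="}

def predicate_to_sql(column: str, op: str, value) -> Optional[str]:
    sql_op = SUPPORTED_OPS.get(op)
    if sql_op is None:
        return None  # not translatable; the caller must filter in memory instead
    literal = f"'{value}'" if isinstance(value, str) else str(value)
    return f"{column} {sql_op} {literal}"

# A translated predicate can then be appended to the user's query, e.g.:
#   SELECT * FROM (<user query>) AS subquery WHERE <fragment>
print(predicate_to_sql("price", ">=", 100))  # price >= 100
print(predicate_to_sql("name", "~", "foo"))  # None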
2 changes: 2 additions & 0 deletions daft/io/__init__.py
@@ -14,6 +14,7 @@
from daft.io._iceberg import read_iceberg
from daft.io._json import read_json
from daft.io._parquet import read_parquet
from daft.io._sql import read_sql
from daft.io.catalog import DataCatalogTable, DataCatalogType
from daft.io.file_path import from_glob_path

@@ -39,6 +40,7 @@ def _set_linux_cert_paths():
"read_parquet",
"read_iceberg",
"read_delta_lake",
"read_sql",
"IOConfig",
"S3Config",
"AzureConfig",
55 changes: 55 additions & 0 deletions daft/io/_sql.py
@@ -0,0 +1,55 @@
# isort: dont-add-import: from __future__ import annotations


from typing import Optional

from daft import context
from daft.api_annotations import PublicAPI
from daft.daft import PythonStorageConfig, ScanOperatorHandle, StorageConfig
from daft.dataframe import DataFrame
from daft.logical.builder import LogicalPlanBuilder
from daft.sql.sql_scan import SQLScanOperator


@PublicAPI
def read_sql(
sql: str, url: str, partition_col: Optional[str] = None, num_partitions: Optional[int] = None
) -> DataFrame:
"""Creates a DataFrame from a SQL query.

Example:
>>> df = daft.read_sql("SELECT * FROM my_table", "sqlite:///my_database.db")

.. NOTE::
If partition_col is specified, this function will partition the query by the specified column. You may specify the number of partitions, or let Daft determine the number of partitions.
Daft will first calculate percentiles of the specified column. For example if num_partitions is 3, Daft will calculate the 33rd and 66th percentiles of the specified column, and use these values to partition the query.
If the database does not support the necessary SQL syntax to calculate percentiles, Daft will calculate the min and max of the specified column and partition the query into equal ranges.

Args:
sql (str): SQL query to execute
url (str): URL to the database
partition_col (Optional[str]): Column to partition the data by, defaults to None
num_partitions (Optional[int]): Number of partitions to read the data into,
    defaults to None, which lets Daft determine the number of partitions.

Returns:
DataFrame: Dataframe containing the results of the query
"""

if num_partitions is not None and partition_col is None:
raise ValueError("Failed to execute sql: partition_col must be specified when num_partitions is specified")

io_config = context.get_context().daft_planning_config.default_io_config
storage_config = StorageConfig.python(PythonStorageConfig(io_config))

sql_operator = SQLScanOperator(
    sql,
    url,
    storage_config,
    partition_col=partition_col,
    num_partitions=num_partitions,
)
handle = ScanOperatorHandle.from_python_scan_operator(sql_operator)
builder = LogicalPlanBuilder.from_tabular_scan(scan_operator=handle)

return DataFrame(builder)

Codecov: added lines daft/io/_sql.py#L39-L40, #L42-L43, #L45, #L52-L53, and #L55 were not covered by tests.
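Putting the new reader together with the partitioning behaviour described in the docstring above, a hedged usage sketch: the connection URL, table, and column names are placeholders, and the per-partition SQL in the comment is only a conceptual rendering of the percentile strategy, not the literal queries Daft issues.

import daft

# Placeholder connection string and table; `id` is assumed to be a numeric
# column suitable for range partitioning.
df = daft.read_sql(
    "SELECT * FROM transactions",
    "postgresql://user:pass@localhost:5432/mydb",
    partition_col="id",
    num_partitions=3,
)

# Conceptually, Daft computes the 33rd and 66th percentiles of `id` (falling
# back to equal min/max ranges if the database cannot compute percentiles)
# and reads three slices along the lines of:
#   ... WHERE id <  p33
#   ... WHERE id >= p33 AND id < p66
#   ... WHERE id >= p66
df.show()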
3 changes: 3 additions & 0 deletions daft/logical/schema.py
@@ -116,6 +116,9 @@
def column_names(self) -> list[str]:
return list(self._schema.names())

def estimate_row_size_bytes(self) -> float:
return self._schema.estimate_row_size_bytes()

Codecov: added line daft/logical/schema.py#L120 was not covered by tests.

def __iter__(self) -> Iterator[Field]:
col_names = self.column_names()
yield from (self[name] for name in col_names)
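estimate_row_size_bytes plausibly feeds the partitioning math together with read_sql_partition_size_bytes: an estimated total size determines how many scan tasks to create. The back-of-the-envelope form of that calculation is sketched below as an inference from these two additions, not as code from this PR.

import math

# Assumed relationship, for illustration only: partition count grows with the
# estimated total bytes and shrinks with the configured target partition size.
def estimate_num_partitions(approx_num_rows: int, row_size_bytes: float,
                            partition_size_bytes: int = 512 * 1024 * 1024) -> int:
    total_bytes = approx_num_rows * row_size_bytes
    return max(1, math.ceil(total_bytes / partition_size_bytes))

print(estimate_num_partitions(10_000_000, 200.0))  # ~4 partitions at the 512MB target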
11 changes: 11 additions & 0 deletions daft/runners/partitioning.py
@@ -72,6 +72,17 @@ class TableParseParquetOptions:
coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns()


@dataclass(frozen=True)
class TableReadSQLOptions:
"""Options for parsing SQL tables

Args:
predicate_expression: Expression predicate to apply to the table
"""

predicate_expression: Expression | None = None


@dataclass(frozen=True)
class PartialPartitionMetadata:
num_rows: None | int
Empty file added daft/sql/__init__.py