Merge branch 'main' into unity-azure-support
anilmenon14 committed Oct 10, 2024
2 parents eac67c3 + 73ff3f3 commit c4ad0dd
Showing 55 changed files with 1,483 additions and 856 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/python-package.yml
@@ -545,6 +545,10 @@ jobs:
matrix:
python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray]
enable-native-executor: [0, 1]
exclude:
- daft-runner: ray
enable-native-executor: 1
steps:
- uses: actions/checkout@v4
with:
@@ -583,6 +587,7 @@ jobs:
pytest tests/integration/sql -m 'integration or not integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
- name: Send Slack notification on failure
uses: slackapi/slack-github-action@...
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
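A note on the matrix above: GitHub Actions expands the matrix to the cross product of its axes and then drops any combination matched by an exclude entry, so the native executor is only exercised on the py runner. A quick sketch of the resulting combinations (plain Python, illustration only, not part of the workflow):

    from itertools import product

    # The matrix axes and the exclude rule from the workflow above.
    matrix = {"daft-runner": ["py", "ray"], "enable-native-executor": [0, 1]}
    exclude = [{"daft-runner": "ray", "enable-native-executor": 1}]

    combos = [dict(zip(matrix, values)) for values in product(*matrix.values())]
    combos = [c for c in combos if c not in exclude]
    print(combos)
    # [{'daft-runner': 'py', 'enable-native-executor': 0},
    #  {'daft-runner': 'py', 'enable-native-executor': 1},
    #  {'daft-runner': 'ray', 'enable-native-executor': 0}]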
2 changes: 2 additions & 0 deletions daft/daft/__init__.pyi
@@ -1051,6 +1051,7 @@ class PyExpr:
def approx_count_distinct(self) -> PyExpr: ...
def approx_percentiles(self, percentiles: float | list[float]) -> PyExpr: ...
def mean(self) -> PyExpr: ...
def stddev(self) -> PyExpr: ...
def min(self) -> PyExpr: ...
def max(self) -> PyExpr: ...
def any_value(self, ignore_nulls: bool) -> PyExpr: ...
@@ -1336,6 +1337,7 @@ class PySeries:
def count(self, mode: CountMode) -> PySeries: ...
def sum(self) -> PySeries: ...
def mean(self) -> PySeries: ...
def stddev(self) -> PySeries: ...
def min(self) -> PySeries: ...
def max(self) -> PySeries: ...
def agg_list(self) -> PySeries: ...
55 changes: 55 additions & 0 deletions daft/dataframe/dataframe.py
@@ -2118,6 +2118,33 @@ def mean(self, *cols: ColumnInputType) -> "DataFrame":
"""
return self._apply_agg_fn(Expression.mean, cols)

@DataframePublicAPI
def stddev(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs a global standard deviation on the DataFrame
Example:
>>> import daft
>>> df = daft.from_pydict({"col_a":[0,1,2]})
>>> df = df.stddev("col_a")
>>> df.show()
╭───────────────────╮
│ col_a │
│ --- │
│ Float64 │
╞═══════════════════╡
│ 0.816496580927726 │
╰───────────────────╯
<BLANKLINE>
(Showing first 1 of 1 rows)
Args:
*cols (Union[str, Expression]): columns to compute the standard deviation of
Returns:
DataFrame: Globally aggregated standard deviation. Should be a single row.
"""
return self._apply_agg_fn(Expression.stddev, cols)
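The value in the docstring example is the population standard deviation (ddof=0) of [0, 1, 2]; the sample standard deviation would be 1.0. A quick cross-check with numpy, shown for illustration:

    import numpy as np

    print(np.std([0, 1, 2]))          # 0.816496580927726, matches the docstring output
    print(np.std([0, 1, 2], ddof=1))  # 1.0, the sample standard deviation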

@DataframePublicAPI
def min(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs a global min on the DataFrame
@@ -2856,6 +2883,34 @@ def mean(self, *cols: ColumnInputType) -> "DataFrame":
"""
return self.df._apply_agg_fn(Expression.mean, cols, self.group_by)

def stddev(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs grouped standard deviation on this GroupedDataFrame.
Example:
>>> import daft
>>> df = daft.from_pydict({"keys": ["a", "a", "a", "b"], "col_a": [0,1,2,100]})
>>> df = df.groupby("keys").stddev()
>>> df.show()
╭──────┬───────────────────╮
│ keys ┆ col_a │
│ --- ┆ --- │
│ Utf8 ┆ Float64 │
╞══════╪═══════════════════╡
│ a ┆ 0.816496580927726 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ b ┆ 0 │
╰──────┴───────────────────╯
<BLANKLINE>
(Showing first 2 of 2 rows)
Args:
*cols (Union[str, Expression]): columns to compute the standard deviation of
Returns:
DataFrame: DataFrame with grouped standard deviation.
"""
return self.df._apply_agg_fn(Expression.stddev, cols, self.group_by)

def min(self, *cols: ColumnInputType) -> "DataFrame":
"""Perform grouped min on this GroupedDataFrame.
8 changes: 7 additions & 1 deletion daft/delta_lake/delta_lake_scan.py
@@ -23,12 +23,15 @@

if TYPE_CHECKING:
from collections.abc import Iterator
from datetime import datetime

logger = logging.getLogger(__name__)


class DeltaLakeScanOperator(ScanOperator):
def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
def __init__(
self, table_uri: str, storage_config: StorageConfig, version: int | str | datetime | None = None
) -> None:
super().__init__()

# Unfortunately delta-rs doesn't do very good inference of credentials for S3. Thus the current Daft behavior of passing
@@ -76,6 +79,9 @@ def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
table_uri, storage_options=io_config_to_storage_options(deltalake_sdk_io_config, table_uri)
)

if version is not None:
self._table.load_as_version(version)

self._storage_config = storage_config
self._schema = Schema.from_pyarrow_schema(self._table.schema().to_pyarrow())
partition_columns = set(self._table.metadata().partition_columns)
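For reference, load_as_version is delta-rs's time-travel entry point; it accepts an int version, an RFC 3339 timestamp string, or a datetime. A minimal standalone sketch, assuming the deltalake package and a hypothetical table path:

    from datetime import datetime, timezone
    from deltalake import DeltaTable

    dt = DeltaTable("path/to/delta_table")  # hypothetical path
    dt.load_as_version(3)  # pin the table to an explicit version number
    dt.load_as_version(datetime(2024, 9, 1, tzinfo=timezone.utc))  # or time-travel to a timestamp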
5 changes: 5 additions & 0 deletions daft/expressions/expressions.py
@@ -862,6 +862,11 @@ def mean(self) -> Expression:
expr = self._expr.mean()
return Expression._from_pyexpr(expr)

def stddev(self) -> Expression:
"""Calculates the standard deviation of the values in the expression"""
expr = self._expr.stddev()
return Expression._from_pyexpr(expr)

def min(self) -> Expression:
"""Calculates the minimum value in the expression"""
expr = self._expr.min()
12 changes: 9 additions & 3 deletions daft/io/_deltalake.py
@@ -11,12 +11,15 @@
from daft.logical.builder import LogicalPlanBuilder

if TYPE_CHECKING:
from datetime import datetime

from daft.unity_catalog import UnityCatalogTable


@PublicAPI
def read_deltalake(
table: Union[str, DataCatalogTable, "UnityCatalogTable"],
version: Optional[Union[int, str, "datetime"]] = None,
io_config: Optional["IOConfig"] = None,
_multithreaded_io: Optional[bool] = None,
) -> DataFrame:
@@ -37,8 +40,11 @@ def read_deltalake(
Args:
table: Either a URI for the Delta Lake table or a :class:`~daft.io.catalog.DataCatalogTable` instance
referencing a table in a data catalog, such as AWS Glue Data Catalog or Databricks Unity Catalog.
io_config: A custom :class:`~daft.daft.IOConfig` to use when accessing Delta Lake object storage data. Defaults to None.
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
version (optional): If an int is passed, read the table at that version number. If a string or datetime is passed,
read the table as of that timestamp. Strings must be in RFC 3339 / ISO 8601 date-time format.
Datetimes without an explicit timezone are assumed to be UTC. By default, reads the latest version of the table.
io_config (optional): A custom :class:`~daft.daft.IOConfig` to use when accessing Delta Lake object storage data. Defaults to None.
_multithreaded_io (optional): Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
the amount of system resources (number of connections and thread contention) when running in the Ray runner.
Defaults to None, which will let Daft decide based on the runner it is currently using.
@@ -69,7 +75,7 @@
raise ValueError(
f"table argument must be a table URI string, DataCatalogTable or UnityCatalogTable instance, but got: {type(table)}, {table}"
)
delta_lake_operator = DeltaLakeScanOperator(table_uri, storage_config=storage_config)
delta_lake_operator = DeltaLakeScanOperator(table_uri, storage_config=storage_config, version=version)

handle = ScanOperatorHandle.from_python_scan_operator(delta_lake_operator)
builder = LogicalPlanBuilder.from_tabular_scan(scan_operator=handle)
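A usage sketch for the new version parameter, using a hypothetical table URI:

    import daft

    # Read the table pinned at version 3.
    df = daft.read_deltalake("s3://bucket/path/to/delta-table", version=3)

    # Or time-travel with an RFC 3339 timestamp; naive times are treated as UTC.
    df = daft.read_deltalake("s3://bucket/path/to/delta-table", version="2024-09-01T00:00:00Z")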
4 changes: 4 additions & 0 deletions daft/series.py
@@ -512,6 +512,10 @@ def mean(self) -> Series:
assert self._series is not None
return Series._from_pyseries(self._series.mean())

def stddev(self) -> Series:
assert self._series is not None
return Series._from_pyseries(self._series.stddev())

def sum(self) -> Series:
assert self._series is not None
return Series._from_pyseries(self._series.sum())
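A usage sketch for the new Series.stddev, assuming Series.from_pylist to construct the input:

    import daft

    s = daft.Series.from_pylist([0, 1, 2])
    print(s.stddev())  # expected: a single-element Float64 Series, ~0.8165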
1 change: 1 addition & 0 deletions docs/source/api_docs/dataframe.rst
@@ -104,6 +104,7 @@ Aggregations
DataFrame.groupby
DataFrame.sum
DataFrame.mean
DataFrame.stddev
DataFrame.count
DataFrame.min
DataFrame.max
1 change: 1 addition & 0 deletions docs/source/api_docs/expressions.rst
@@ -113,6 +113,7 @@ The following can be used with DataFrame.agg or GroupedDataFrame.agg
Expression.count
Expression.sum
Expression.mean
Expression.stddev
Expression.min
Expression.max
Expression.any_value
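Per the note above, Expression.stddev also composes with DataFrame.agg and GroupedDataFrame.agg; a brief sketch:

    import daft
    from daft import col

    df = daft.from_pydict({"keys": ["a", "a", "b"], "vals": [0, 1, 2]})
    result = df.groupby("keys").agg(col("vals").stddev())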
7 changes: 7 additions & 0 deletions src/daft-core/src/array/fixed_size_list_array.rs
@@ -64,6 +64,13 @@ impl FixedSizeListArray {
self.validity.as_ref()
}

pub fn null_count(&self) -> usize {
match self.validity() {
None => 0,
Some(validity) => validity.unset_bits(),
}
}
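(Aside: this is standard Arrow validity-bitmap bookkeeping; the null count is the number of unset bits in the bitmap, or zero when no bitmap exists. The same accounting, illustrated with pyarrow rather than Daft's API:)

    import pyarrow as pa

    # One null value means one unset bit in the validity bitmap.
    arr = pa.array([1, None, 3])
    print(arr.null_count)  # 1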

pub fn concat(arrays: &[&Self]) -> DaftResult<Self> {
if arrays.is_empty() {
return Err(DaftError::ValueError(
136 changes: 57 additions & 79 deletions src/daft-core/src/array/ops/list_agg.rs
@@ -10,52 +10,72 @@ use crate::{
series::IntoSeries,
};

macro_rules! impl_daft_list_agg {
    () => {
        type Output = DaftResult<ListArray>;

        fn list(&self) -> Self::Output {
            let child_series = self.clone().into_series();
            let offsets =
                arrow2::offset::OffsetsBuffer::try_from(vec![0, child_series.len() as i64])?;
            let list_field = self.field.to_list_field()?;
            Ok(ListArray::new(list_field, child_series, offsets, None))
        }

        fn grouped_list(&self, groups: &GroupIndices) -> Self::Output {
            let mut offsets = Vec::with_capacity(groups.len() + 1);

            offsets.push(0);
            for g in groups {
                offsets.push(offsets.last().unwrap() + g.len() as i64);
            }

            let total_capacity = *offsets.last().unwrap();

            let mut growable: Box<dyn Growable> = Box::new(Self::make_growable(
                self.name(),
                self.data_type(),
                vec![self],
                self.null_count() > 0,
                total_capacity as usize,
            ));

            for g in groups {
                for idx in g {
                    growable.extend(0, *idx as usize, 1);
                }
            }
            let list_field = self.field.to_list_field()?;

            Ok(ListArray::new(
                list_field,
                growable.build()?,
                arrow2::offset::OffsetsBuffer::try_from(offsets)?,
                None,
            ))
        }
    };
}

impl<T> DaftListAggable for DataArray<T>
where
    T: DaftArrowBackedType,
    Self: IntoSeries,
    Self: GrowableArray,
{
    impl_daft_list_agg!();
}

impl DaftListAggable for ListArray {
    impl_daft_list_agg!();
}

impl DaftListAggable for FixedSizeListArray {
    impl_daft_list_agg!();
}

impl DaftListAggable for StructArray {
    impl_daft_list_agg!();
}

#[cfg(feature = "python")]
@@ -95,45 +115,3 @@ impl DaftListAggable for crate::datatypes::PythonArray {
Self::new(self.field().clone().into(), Box::new(arrow_array))
}
}

impl DaftListAggable for ListArray {
type Output = DaftResult<Self>;

fn list(&self) -> Self::Output {
// TODO(FixedSizeList)
todo!("Requires new ListArrays for implementation")
}

fn grouped_list(&self, _groups: &GroupIndices) -> Self::Output {
// TODO(FixedSizeList)
todo!("Requires new ListArrays for implementation")
}
}

impl DaftListAggable for FixedSizeListArray {
type Output = DaftResult<ListArray>;

fn list(&self) -> Self::Output {
// TODO(FixedSizeList)
todo!("Requires new ListArrays for implementation")
}

fn grouped_list(&self, _groups: &GroupIndices) -> Self::Output {
// TODO(FixedSizeList)
todo!("Requires new ListArrays for implementation")
}
}

impl DaftListAggable for StructArray {
type Output = DaftResult<ListArray>;

fn list(&self) -> Self::Output {
// TODO(FixedSizeList)
todo!("Requires new ListArrays for implementation")
}

fn grouped_list(&self, _groups: &GroupIndices) -> Self::Output {
// TODO(FixedSizeList)
todo!("Requires new ListArrays for implementation")
}
}