Skip to content

Commit

Permalink
Merge branch 'main' into charles/azure-ls
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiayue Charles Lin committed Sep 29, 2023
2 parents 2e91525 + 069432d commit 06f033e
Show file tree
Hide file tree
Showing 36 changed files with 1,051 additions and 126 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/python-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ jobs:

publish:
name: Publish wheels to PYPI and Anaconda
if: ${{ (github.ref == 'refs/heads/main') }}
runs-on: ubuntu-latest
needs:
- build-and-test
Expand Down Expand Up @@ -166,7 +165,7 @@ jobs:
run: conda install -q -y anaconda-client "urllib3<2.0"

- name: Upload wheels to anaconda nightly
if: ${{ success() && (env.IS_SCHEDULE_DISPATCH == 'true' || env.IS_PUSH == 'true') }}
if: ${{ success() && (((env.IS_SCHEDULE_DISPATCH == 'true') && (github.ref == 'refs/heads/main')) || env.IS_PUSH == 'true') }}
shell: bash -el {0}
env:
DAFT_STAGING_UPLOAD_TOKEN: ${{ secrets.DAFT_STAGING_UPLOAD_TOKEN }}
Expand Down
18 changes: 10 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ rand = "^0.8"
serde_json = "1.0.104"
snafu = "0.7.4"
tokio = {version = "1.32.0", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]}
tokio-stream = {version = "0.1.14", features = ["fs"]}

[workspace.dependencies.arrow2]
git = "https://github.com/Eventual-Inc/arrow2"
Expand Down
6 changes: 5 additions & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ class PyExpr:
def utf8_endswith(self, pattern: PyExpr) -> PyExpr: ...
def utf8_startswith(self, pattern: PyExpr) -> PyExpr: ...
def utf8_contains(self, pattern: PyExpr) -> PyExpr: ...
def utf8_split(self, pattern: PyExpr) -> PyExpr: ...
def utf8_length(self) -> PyExpr: ...
def image_decode(self) -> PyExpr: ...
def image_encode(self, image_format: ImageFormat) -> PyExpr: ...
Expand Down Expand Up @@ -617,6 +618,7 @@ class PySeries:
def utf8_endswith(self, pattern: PySeries) -> PySeries: ...
def utf8_startswith(self, pattern: PySeries) -> PySeries: ...
def utf8_contains(self, pattern: PySeries) -> PySeries: ...
def utf8_split(self, pattern: PySeries) -> PySeries: ...
def utf8_length(self) -> PySeries: ...
def is_nan(self) -> PySeries: ...
def dt_date(self) -> PySeries: ...
Expand Down Expand Up @@ -673,7 +675,9 @@ class PhysicalPlanScheduler:
A work scheduler for physical query plans.
"""

def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: ...
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan: ...

class LogicalPlanBuilder:
"""
Expand Down
4 changes: 2 additions & 2 deletions daft/execution/rust_physical_plan_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from typing import Iterator, TypeVar, cast

from daft.context import get_context
from daft.daft import (
FileFormat,
FileFormatConfig,
Expand All @@ -29,10 +28,11 @@ def tabular_scan(
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
limit: int,
is_ray_runner: bool,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
# TODO(Clark): Fix this Ray runner hack.
part = Table._from_pytable(file_info_table)
if get_context().is_ray_runner:
if is_ray_runner:
import ray

parts = [ray.put(part)]
Expand Down
18 changes: 17 additions & 1 deletion daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ def endswith(self, suffix: str | Expression) -> Expression:
suffix_expr = Expression._to_expression(suffix)
return Expression._from_pyexpr(self._expr.utf8_endswith(suffix_expr._expr))

def startswith(self, prefix: str) -> Expression:
def startswith(self, prefix: str | Expression) -> Expression:
"""Checks whether each string starts with the given pattern in a string column
Example:
Expand All @@ -587,6 +587,22 @@ def startswith(self, prefix: str) -> Expression:
prefix_expr = Expression._to_expression(prefix)
return Expression._from_pyexpr(self._expr.utf8_startswith(prefix_expr._expr))

def split(self, pattern: str | Expression) -> Expression:
"""Splits each string on the given pattern, into one or more strings.
Example:
>>> col("x").str.split(",")
>>> col("x").str.split(col("pattern"))
Args:
pattern: The pattern on which each string should be split, or a column to pick such patterns from.
Returns:
Expression: A List[Utf8] expression containing the string splits for each string in the column.
"""
pattern_expr = Expression._to_expression(pattern)
return Expression._from_pyexpr(self._expr.utf8_split(pattern_expr._expr))

def concat(self, other: str) -> Expression:
"""Concatenates two string expressions together
Expand Down
4 changes: 3 additions & 1 deletion daft/planner/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ class PhysicalPlanScheduler(ABC):
"""

@abstractmethod
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan:
pass
4 changes: 3 additions & 1 deletion daft/planner/py_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ class PyPhysicalPlanScheduler(PhysicalPlanScheduler):
def __init__(self, plan: logical_plan.LogicalPlan):
self._plan = plan

def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(physical_plan_factory._get_physical_plan(self._plan, psets))
6 changes: 4 additions & 2 deletions daft/planner/rust_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ class RustPhysicalPlanScheduler(PhysicalPlanScheduler):
def __init__(self, scheduler: _PhysicalPlanScheduler):
self._scheduler = scheduler

def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(self._scheduler.to_partition_tasks(psets))
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(self._scheduler.to_partition_tasks(psets, is_ray_runner))
2 changes: 1 addition & 1 deletion daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[Table]:
if entry.value is not None
}
# Get executable tasks from planner.
tasks = plan_scheduler.to_partition_tasks(psets)
tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=False)

with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
partitions_gen = self._physical_plan_to_partitions(tasks)
Expand Down
2 changes: 1 addition & 1 deletion daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ def _run_plan(
from loguru import logger

# Get executable tasks from plan scheduler.
tasks = plan_scheduler.to_partition_tasks(psets)
tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=True)

# Note: For autoscaling clusters, we will probably want to query cores dynamically.
# Keep in mind this call takes about 0.3ms.
Expand Down
6 changes: 6 additions & 0 deletions daft/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,12 @@ def contains(self, pattern: Series) -> Series:
assert self._series is not None and pattern._series is not None
return Series._from_pyseries(self._series.utf8_contains(pattern._series))

def split(self, pattern: Series) -> Series:
if not isinstance(pattern, Series):
raise ValueError(f"expected another Series but got {type(pattern)}")
assert self._series is not None and pattern._series is not None
return Series._from_pyseries(self._series.utf8_split(pattern._series))

def concat(self, other: Series) -> Series:
if not isinstance(other, Series):
raise ValueError(f"expected another Series but got {type(other)}")
Expand Down
5 changes: 4 additions & 1 deletion src/common/error/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum DaftError {
path: String,
source: GenericError,
},
InternalError(String),
External(GenericError),
}

Expand All @@ -31,7 +32,8 @@ impl std::error::Error for DaftError {
| DaftError::TypeError(_)
| DaftError::ComputeError(_)
| DaftError::ArrowError(_)
| DaftError::ValueError(_) => None,
| DaftError::ValueError(_)
| DaftError::InternalError(_) => None,
DaftError::IoError(io_error) => Some(io_error),
DaftError::FileNotFound { source, .. } | DaftError::External(source) => Some(&**source),
#[cfg(feature = "python")]
Expand Down Expand Up @@ -96,6 +98,7 @@ impl Display for DaftError {
Self::ComputeError(s) => write!(f, "DaftError::ComputeError {s}"),
Self::ArrowError(s) => write!(f, "DaftError::ArrowError {s}"),
Self::ValueError(s) => write!(f, "DaftError::ValueError {s}"),
Self::InternalError(s) => write!(f, "DaftError::InternalError {s}"),
#[cfg(feature = "python")]
Self::PyO3Error(e) => write!(f, "DaftError::PyO3Error {e}"),
Self::IoError(e) => write!(f, "DaftError::IoError {e}"),
Expand Down
3 changes: 2 additions & 1 deletion src/daft-core/src/array/ops/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ impl FullNull for ListArray {
Self::new(
Field::new(name, dtype.clone()),
empty_flat_child,
OffsetsBuffer::try_from(repeat(0).take(length).collect::<Vec<_>>()).unwrap(),
OffsetsBuffer::try_from(repeat(0).take(length + 1).collect::<Vec<_>>())
.unwrap(),
Some(validity),
)
}
Expand Down
117 changes: 116 additions & 1 deletion src/daft-core/src/array/ops/utf8.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,66 @@
use crate::datatypes::{BooleanArray, UInt64Array, Utf8Array};
use crate::{
array::ListArray,
datatypes::{BooleanArray, Field, UInt64Array, Utf8Array},
DataType, Series,
};
use arrow2;

use common_error::{DaftError, DaftResult};

use super::{as_arrow::AsArrow, full::FullNull};

fn split_array_on_patterns<'a, T, U>(
arr_iter: T,
pattern_iter: U,
buffer_len: usize,
name: &str,
) -> DaftResult<ListArray>
where
T: arrow2::trusted_len::TrustedLen + Iterator<Item = Option<&'a str>>,
U: Iterator<Item = Option<&'a str>>,
{
// This will overallocate by pattern_len * N_i, where N_i is the number of pattern occurences in the ith string in arr_iter.
let mut splits = arrow2::array::MutableUtf8Array::with_capacity(buffer_len);
// arr_iter implements TrustedLen, so we can always use size_hint().1 as the exact length of the iterator. The only
// time this would fail is if the length of the iterator exceeds usize::MAX, which should never happen for an i64
// offset array, since the array length can't exceed i64::MAX on 64-bit machines.
let arr_len = arr_iter.size_hint().1.unwrap();
let mut offsets = arrow2::offset::Offsets::new();
let mut validity = arrow2::bitmap::MutableBitmap::with_capacity(arr_len);
for (val, pat) in arr_iter.zip(pattern_iter) {
let mut num_splits = 0i64;
match (val, pat) {
(Some(val), Some(pat)) => {
for split in val.split(pat) {
splits.push(Some(split));
num_splits += 1;
}
validity.push(true);
}
(_, _) => {
validity.push(false);
}
}
offsets.try_push(num_splits)?;
}
// Shrink splits capacity to current length, since we will have overallocated if any of the patterns actually occurred in the strings.
splits.shrink_to_fit();
let splits: arrow2::array::Utf8Array<i64> = splits.into();
let offsets: arrow2::offset::OffsetsBuffer<i64> = offsets.into();
let validity: Option<arrow2::bitmap::Bitmap> = match validity.unset_bits() {
0 => None,
_ => Some(validity.into()),
};
let flat_child =
Series::try_from(("splits", Box::new(splits) as Box<dyn arrow2::array::Array>))?;
Ok(ListArray::new(
Field::new(name, DataType::List(Box::new(DataType::Utf8))),
flat_child,
offsets,
validity,
))
}

impl Utf8Array {
pub fn endswith(&self, pattern: &Utf8Array) -> DaftResult<BooleanArray> {
self.binary_broadcasted_compare(pattern, |data: &str, pat: &str| data.ends_with(pat))
Expand All @@ -18,6 +74,65 @@ impl Utf8Array {
self.binary_broadcasted_compare(pattern, |data: &str, pat: &str| data.contains(pat))
}

pub fn split(&self, pattern: &Utf8Array) -> DaftResult<ListArray> {
let self_arrow = self.as_arrow();
let pattern_arrow = pattern.as_arrow();
// Handle all-null cases.
if self_arrow
.validity()
.map_or(false, |v| v.unset_bits() == v.len())
|| pattern_arrow
.validity()
.map_or(false, |v| v.unset_bits() == v.len())
{
return Ok(ListArray::full_null(
self.name(),
&DataType::List(Box::new(DataType::Utf8)),
std::cmp::max(self.len(), pattern.len()),
));
// Handle empty cases.
} else if self.is_empty() || pattern.is_empty() {
return Ok(ListArray::empty(
self.name(),
&DataType::List(Box::new(DataType::Utf8)),
));
}
let buffer_len = self_arrow.values().len();
match (self.len(), pattern.len()) {
// Matching len case:
(self_len, pattern_len) if self_len == pattern_len => split_array_on_patterns(
self_arrow.into_iter(),
pattern_arrow.into_iter(),
buffer_len,
self.name(),
),
// Broadcast pattern case:
(self_len, 1) => {
let pattern_scalar_value = pattern.get(0).unwrap();
split_array_on_patterns(
self_arrow.into_iter(),
std::iter::repeat(Some(pattern_scalar_value)).take(self_len),
buffer_len,
self.name(),
)
}
// Broadcast self case:
(1, pattern_len) => {
let self_scalar_value = self.get(0).unwrap();
split_array_on_patterns(
std::iter::repeat(Some(self_scalar_value)).take(pattern_len),
pattern_arrow.into_iter(),
buffer_len * pattern_len,
self.name(),
)
}
// Mismatched len case:
(self_len, pattern_len) => Err(DaftError::ComputeError(format!(
"lhs and rhs have different length arrays: {self_len} vs {pattern_len}"
))),
}
}

pub fn length(&self) -> DaftResult<UInt64Array> {
let self_arrow = self.as_arrow();
let arrow_result = self_arrow
Expand Down
Loading

0 comments on commit 06f033e

Please sign in to comment.