Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Minimal indices dtype for FixedShapeSparseTensors #3149

Merged
merged 42 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
bac0507
modify indices field in daft dtypes
sagiahrac Oct 30, 2024
6273b05
fix cast with dynamic indices dtype
sagiahrac Oct 30, 2024
3d80fe4
fix FixedShapeTensorArray to FixedShapeSparseTensor
sagiahrac Oct 31, 2024
42b7ae4
adjust tests indices dtype
sagiahrac Oct 31, 2024
881c438
fix cast_sparse_to_dense_for_inner_dtype
sagiahrac Oct 31, 2024
e80802f
pytest minimal index dtype
sagiahrac Oct 31, 2024
84bf80d
move test
sagiahrac Nov 4, 2024
d1713be
add dynamic dtype test
sagiahrac Nov 4, 2024
5ab9b98
CR changes
sagiahrac Nov 4, 2024
6c46dc0
ruff format
sagiahrac Nov 4, 2024
d22041c
rustfmt
sagiahrac Nov 4, 2024
8735f4f
pre-commit-hooks
sagiahrac Nov 4, 2024
e69121e
backward pyarrow compatibility
sagiahrac Nov 4, 2024
f1f9cbb
improve test naming
sagiahrac Nov 4, 2024
ae4c1ad
[FEAT] Support SQL `INTERVAL` (#3146)
austin362667 Oct 30, 2024
e5dfe41
[BUG]: between panic on unsupported types (#3150)
universalmind303 Oct 30, 2024
9cd12c7
[FEAT] dec128 math (#3143)
samster25 Oct 30, 2024
051793d
[FEAT] enable decimal between (#3154)
samster25 Oct 30, 2024
5d40e65
[FEAT]: Sql common table expressions (CTE's) (#3137)
universalmind303 Oct 30, 2024
988bdfb
[FEATURE] add min_hash alternate hashers (#3052)
andrewgazelka Oct 30, 2024
1dce97d
Bump slackapi/slack-github-action from 1.26.0 to 1.27.0 (#2776)
dependabot[bot] Oct 30, 2024
a887f10
Bump image from 0.24.9 to 0.25.4 (#3088)
dependabot[bot] Oct 30, 2024
249febf
Bump adlfs from 2023.10.0 to 2024.7.0 (#2547)
dependabot[bot] Oct 30, 2024
0ec0c24
[CHORE] Enable debug in test profile (#3135)
advancedxy Oct 31, 2024
c790011
[FEAT]: sql concat and stddev (#3153)
universalmind303 Oct 31, 2024
d753b44
[FEAT]: Throw error for invalid ** usage outside folder segments (e.g…
conradsoon Oct 31, 2024
f5bcd4d
[FEAT] Streaming physical writes for native executor (#2992)
colin-ho Oct 31, 2024
802b086
[CHORE] Cancel tasks spawned on compute runtime (#3128)
colin-ho Oct 31, 2024
965a315
[BUG] Separate PartitionTask done from results (#3155)
jaychia Nov 1, 2024
d91a856
Temporal docs added to expressions.rst (#2487)
sunaysanghani Nov 1, 2024
cc73b36
[CHORE]: tpc-ds datagen (#3103)
universalmind303 Nov 1, 2024
d91105b
[FEAT] Add better detection of Ray Job environment (#3148)
jaychia Nov 1, 2024
eb3de8b
Merge branch 'main' into minimal-uint-type-for-indices
sagiahrac Nov 4, 2024
6a39354
remove test until cast impl
sagiahrac Nov 4, 2024
821b2d3
preserve indices dtype when casting fixed shape sparse tensor to python
sagiahrac Nov 4, 2024
ef7d5aa
add python test for indices minimal dtype
sagiahrac Nov 4, 2024
7778393
Update src/daft-core/src/array/ops/cast.rs
samster25 Nov 13, 2024
6072023
Update src/daft-schema/src/dtype.rs
samster25 Nov 13, 2024
1790fcc
Update src/daft-core/src/array/ops/cast.rs
samster25 Nov 13, 2024
e524651
Update src/daft-core/src/array/ops/cast.rs
samster25 Nov 13, 2024
029ba1b
Update src/daft-core/src/array/ops/sparse_tensor.rs
samster25 Nov 13, 2024
4f6dacd
Update sparse_tensor.rs
samster25 Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 61 additions & 13 deletions src/daft-core/src/array/ops/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,7 @@ fn cast_sparse_to_dense_for_inner_dtype(
if !is_valid {
continue;
}
let index_series: Series = non_zero_indices_array.get(i).unwrap();
let index_series: Series = non_zero_indices_array.get(i).unwrap().cast(&DataType::UInt64)?;
let index_array = index_series.u64().unwrap().as_arrow();
let values_series: Series = non_zero_values_array.get(i).unwrap();
let values_array = values_series.downcast::<<$T as DaftDataType>::ArrayType>()
Expand All @@ -1612,6 +1612,18 @@ fn cast_sparse_to_dense_for_inner_dtype(
Ok(item)
}

/// Returns the narrowest unsigned-integer `DataType` capable of representing
/// `value`, used to pick a compact physical dtype for sparse-tensor indices.
fn minimal_uint_dtype(value: u64) -> DataType {
    match value {
        v if v <= u64::from(u8::MAX) => DataType::UInt8,
        v if v <= u64::from(u16::MAX) => DataType::UInt16,
        v if v <= u64::from(u32::MAX) => DataType::UInt32,
        _ => DataType::UInt64,
    }
}

impl SparseTensorArray {
pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> {
match dtype {
Expand Down Expand Up @@ -1678,11 +1690,16 @@ impl SparseTensorArray {
shape,
)));
};

let largest_index = std::cmp::max(shape.iter().product::<u64>(), 1) - 1;
let indices_minimal_inner_dtype = minimal_uint_dtype(largest_index);
let values_array =
va.cast(&DataType::List(Box::new(inner_dtype.as_ref().clone())))?;
let indices_array =
ia.cast(&DataType::List(Box::new(indices_minimal_inner_dtype)))?;
let struct_array = StructArray::new(
Field::new(self.name(), dtype.to_physical()),
vec![values_array, ia.clone().into_series()],
vec![values_array, indices_array],
va.validity().cloned(),
);
let sparse_tensor_array = FixedShapeSparseTensorArray::new(
Expand Down Expand Up @@ -1760,6 +1777,7 @@ impl FixedShapeSparseTensorArray {

let values_arr =
va.cast(&DataType::List(Box::new(inner_dtype.as_ref().clone())))?;
let indices_arr = ia.cast(&DataType::List(Box::new(DataType::UInt64)))?;

// List -> Struct
let shape_offsets = arrow2::offset::OffsetsBuffer::try_from(shape_offsets)?;
Expand All @@ -1776,11 +1794,7 @@ impl FixedShapeSparseTensorArray {
let physical_type = dtype.to_physical();
let struct_array = StructArray::new(
Field::new(self.name(), physical_type),
vec![
values_arr,
ia.clone().into_series(),
shapes_array.into_series(),
],
vec![values_arr, indices_arr, shapes_array.into_series()],
validity.cloned(),
);
Ok(
Expand Down Expand Up @@ -1825,11 +1839,39 @@ impl FixedShapeSparseTensorArray {
Ok(fixed_shape_tensor_array.into_series())
}
#[cfg(feature = "python")]
(DataType::Python, DataType::FixedShapeSparseTensor(inner_dtype, _)) => {
let sparse_tensor_series =
self.cast(&DataType::SparseTensor(inner_dtype.clone()))?;
let sparse_pytensor_series = sparse_tensor_series.cast(&DataType::Python)?;
Ok(sparse_pytensor_series)
(DataType::Python, DataType::FixedShapeSparseTensor(_, tensor_shape)) => {
Python::with_gil(|py| {
let mut pydicts: Vec<Py<PyAny>> = Vec::with_capacity(self.len());
let va = self.values_array();
let ia = self.indices_array();
let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?;
for (values_array, indices_array) in va.into_iter().zip(ia.into_iter()) {
if let (Some(values_array), Some(indices_array)) =
(values_array, indices_array)
{
let py_values_array =
ffi::to_py_array(py, values_array.to_arrow(), &pyarrow)?
.call_method1(pyo3::intern!(py, "to_numpy"), (false,))?;
let py_indices_array =
ffi::to_py_array(py, indices_array.to_arrow(), &pyarrow)?
.call_method1(pyo3::intern!(py, "to_numpy"), (false,))?;
let pydict = pyo3::types::PyDict::new_bound(py);
pydict.set_item("values", py_values_array)?;
pydict.set_item("indices", py_indices_array)?;
pydict.set_item("shape", tensor_shape)?;
pydicts.push(pydict.unbind().into());
} else {
pydicts.push(py.None());
}
}
let py_objects_array =
PseudoArrowArray::new(pydicts.into(), self.physical.validity().cloned());
Ok(PythonArray::new(
Field::new(self.name(), dtype.clone()).into(),
py_objects_array.to_boxed(),
)?
.into_series())
})
}
(_, _) => self.physical.cast(dtype),
}
Expand Down Expand Up @@ -1966,9 +2008,15 @@ impl FixedShapeTensorArray {
offsets_cloned.into(),
validity.cloned(),
);

let largest_index = tensor_shape.iter().product::<u64>() - 1;
samster25 marked this conversation as resolved.
Show resolved Hide resolved
let indices_minimal_inner_dtype = minimal_uint_dtype(largest_index);
let casted_indices = indices_list_arr
.cast(&DataType::List(Box::new(indices_minimal_inner_dtype)))?;

let sparse_struct_array = StructArray::new(
Field::new(self.name(), dtype.to_physical()),
vec![data_list_arr.into_series(), indices_list_arr.into_series()],
vec![data_list_arr.into_series(), casted_indices],
validity.cloned(),
);
Ok(FixedShapeSparseTensorArray::new(
Expand Down
32 changes: 32 additions & 0 deletions src/daft-core/src/array/ops/sparse_tensor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,36 @@ mod tests {

Ok(())
}

#[test]
fn test_fixed_shape_sparse_datatype() -> DaftResult<()> {
    // Position of the "indices" field inside the sparse tensor's physical Struct.
    const INDICES_IDX: usize = 1;

    // Each case is (element count, expected minimal indices dtype). The largest
    // possible flat index is `count - 1`, so a count of 2^8 still fits in u8, etc.
    // NOTE: 2^64 is not representable as a u64 (`2u64.pow(64)` overflows and
    // panics in debug builds), so the UInt64 case uses `u64::MAX` elements,
    // whose largest index (u64::MAX - 1) still exceeds u32::MAX.
    let cases = [
        (1u64 << 8, DataType::UInt8),
        (1u64 << 16, DataType::UInt16),
        (1u64 << 32, DataType::UInt32),
        (u64::MAX, DataType::UInt64),
    ];

    for (n_elements, minimal_dtype) in cases {
        let dtype =
            DataType::FixedShapeSparseTensor(Box::new(DataType::Float32), vec![n_elements]);
        let physical_dtype = dtype.to_physical();
        if let DataType::Struct(fields) = &physical_dtype {
            assert_eq!(fields.len(), 2, "Expected exactly 2 fields in Struct");

            let indices_field = &fields[INDICES_IDX];
            assert_eq!(indices_field.name, "indices");
            assert_eq!(
                indices_field.dtype,
                DataType::List(Box::new(minimal_dtype))
            );
        } else {
            panic!("Expected Struct DataType, got {:?}", physical_dtype);
        }
    }

    Ok(())
}
}
18 changes: 16 additions & 2 deletions src/daft-schema/src/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,23 @@ impl DataType {
Field::new("indices", List(Box::new(Self::UInt64))),
Field::new("shape", List(Box::new(Self::UInt64))),
]),
FixedShapeSparseTensor(dtype, _) => Struct(vec![
FixedShapeSparseTensor(dtype, shape) => Struct(vec![
Field::new("values", List(Box::new(*dtype.clone()))),
Field::new("indices", List(Box::new(Self::UInt64))),
{
let largest_index = std::cmp::max(shape.iter().product::<u64>(), 1) - 1;
let minimal_indices_dtype = {
if u8::try_from(largest_index).is_ok() {
Self::UInt8
} else if u16::try_from(largest_index).is_ok() {
Self::UInt16
} else if u32::try_from(largest_index).is_ok() {
Self::UInt32
} else {
Self::UInt64
}
};
Field::new("indices", List(Box::new(minimal_indices_dtype)))
},
]),
_ => {
assert!(self.is_physical());
Expand Down
11 changes: 10 additions & 1 deletion tests/series/test_cast.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,11 +1166,20 @@ def test_series_cast_fixed_size_list_to_list() -> None:
### Sparse ###


def minimal_indices_dtype(shape: tuple[int, ...]) -> np.dtype:
    """Return the smallest unsigned-int numpy dtype able to hold any flat index into `shape`.

    The largest flat index of a tensor with the given shape is `prod(shape) - 1`;
    `np.min_scalar_type` maps that value to the narrowest dtype that can store it.
    """
    # NOTE: annotation was `tuple[int]` (a 1-tuple); callers pass shapes of any rank.
    largest_index_possible = np.prod(shape) - 1
    return np.min_scalar_type(largest_index_possible)


def to_coo_sparse_dict(ndarray: np.ndarray) -> dict[str, np.ndarray]:
    """Convert a dense ndarray into a COO-style dict of non-zero values, flat indices, and shape.

    Indices are downcast to the minimal unsigned dtype that can address the tensor.
    """
    shape = list(ndarray.shape)
    flattened = ndarray.ravel()
    nonzero_positions = np.flatnonzero(flattened)
    return {
        "values": flattened[nonzero_positions],
        "indices": nonzero_positions.astype(minimal_indices_dtype(shape)),
        "shape": shape,
    }


Expand Down
16 changes: 16 additions & 0 deletions tests/series/test_sparse_tensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,19 @@ def test_sparse_tensor_repr():
╰─────────────────────────────╯
"""
)


@pytest.mark.parametrize("indices_dtype", [np.uint8, np.uint16])
def test_minimal_indices_dtype_for_fixed_shape_sparse(indices_dtype: np.dtype):
    """Casting to a fixed-shape sparse tensor should store indices with the minimal uint dtype."""
    # Pick a shape whose largest flat index exactly saturates `indices_dtype`.
    max_index = np.iinfo(indices_dtype).max
    tensor_shape = (max_index + 1, 1)

    dense_series = Series.from_pylist([np.zeros(shape=tensor_shape)]).cast(
        DataType.tensor(DataType.float32(), shape=tensor_shape)
    )
    sparse_series = dense_series.cast(
        DataType.sparse_tensor(DataType.float32(), shape=tensor_shape)
    )

    received_tensor = sparse_series.to_pylist()[-1]
    assert received_tensor["values"].dtype == np.float32
    assert received_tensor["indices"].dtype == indices_dtype
    assert received_tensor["shape"] == list(tensor_shape)
Loading