diff --git a/Cargo.lock b/Cargo.lock index e567ba59bf..e6a439995f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1155,6 +1155,7 @@ dependencies = [ "indexmap 2.1.0", "lazy_static", "log", + "mur3", "ndarray", "num-derive", "num-traits", @@ -2454,6 +2455,12 @@ dependencies = [ "target-features", ] +[[package]] +name = "mur3" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97af489e1e21b68de4c390ecca6703318bc1aa16e9733bcb62c089b73c6fbb1b" + [[package]] name = "native-tls" version = "0.2.11" diff --git a/daft/daft.pyi b/daft/daft.pyi index 8cd1f86ceb..1f413dac16 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -935,6 +935,7 @@ class PySeries: def if_else(self, other: PySeries, predicate: PySeries) -> PySeries: ... def is_null(self) -> PySeries: ... def not_null(self) -> PySeries: ... + def murmur3_32(self) -> PySeries: ... def _debug_bincode_serialize(self) -> bytes: ... @staticmethod def _debug_bincode_deserialize(b: bytes) -> PySeries: ... diff --git a/daft/series.py b/daft/series.py index a416b29dfd..b2e2372250 100644 --- a/daft/series.py +++ b/daft/series.py @@ -320,6 +320,9 @@ def hash(self, seed: Series | None = None) -> Series: return Series._from_pyseries(self._series.hash(seed._series if seed is not None else None)) + def murmur3_32(self) -> Series: + return Series._from_pyseries(self._series.murmur3_32()) + def __repr__(self) -> str: return repr(self._series) diff --git a/src/daft-core/Cargo.toml b/src/daft-core/Cargo.toml index 4372178cdf..1e8df38bfc 100644 --- a/src/daft-core/Cargo.toml +++ b/src/daft-core/Cargo.toml @@ -12,6 +12,7 @@ html-escape = {workspace = true} indexmap = {workspace = true, features = ["serde"]} lazy_static = {workspace = true} log = {workspace = true} +mur3 = "0.1.0" ndarray = "0.15.6" num-derive = {workspace = true} num-traits = {workspace = true} diff --git a/src/daft-core/src/array/ops/hash.rs b/src/daft-core/src/array/ops/hash.rs index e72feda295..370c4208e6 100644 --- 
a/src/daft-core/src/array/ops/hash.rs +++ b/src/daft-core/src/array/ops/hash.rs @@ -1,6 +1,10 @@ use crate::{ array::DataArray, - datatypes::{BinaryArray, BooleanArray, DaftNumericType, NullArray, UInt64Array, Utf8Array}, + datatypes::{ + logical::{DateArray, Decimal128Array, TimestampArray}, + BinaryArray, BooleanArray, DaftNumericType, Int16Array, Int32Array, Int64Array, Int8Array, + NullArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, Utf8Array, + }, kernels, }; @@ -70,3 +74,138 @@ impl NullArray { Ok(DataArray::from((self.name(), Box::new(result)))) } } + +macro_rules! impl_int_murmur3_32 { + ($ArrayT:ty) => { + impl $ArrayT { + pub fn murmur3_32(&self) -> DaftResult { + let as_arrowed = self.as_arrow(); + let has_nulls = as_arrowed + .validity() + .map(|v| v.unset_bits() > 0) + .unwrap_or(false); + if has_nulls { + murmur3_32_hash_from_iter_with_nulls( + self.name(), + as_arrowed + .into_iter() + .map(|v| v.map(|v| (*v as i64).to_le_bytes())), + ) + } else { + murmur3_32_hash_from_iter_no_nulls( + self.name(), + as_arrowed.values_iter().map(|v| (*v as i64).to_le_bytes()), + ) + } + } + } + }; +} + +impl_int_murmur3_32!(Int8Array); +impl_int_murmur3_32!(Int16Array); +impl_int_murmur3_32!(Int32Array); +impl_int_murmur3_32!(Int64Array); + +impl_int_murmur3_32!(UInt8Array); +impl_int_murmur3_32!(UInt16Array); +impl_int_murmur3_32!(UInt32Array); +impl_int_murmur3_32!(UInt64Array); + +impl Utf8Array { + pub fn murmur3_32(&self) -> DaftResult { + let as_arrowed = self.as_arrow(); + let has_nulls = as_arrowed + .validity() + .map(|v| v.unset_bits() > 0) + .unwrap_or(false); + if has_nulls { + murmur3_32_hash_from_iter_with_nulls( + self.name(), + as_arrowed.into_iter().map(|v| v.map(|v| v.as_bytes())), + ) + } else { + murmur3_32_hash_from_iter_no_nulls( + self.name(), + as_arrowed.values_iter().map(|v| v.as_bytes()), + ) + } + } +} + +impl BinaryArray { + pub fn murmur3_32(&self) -> DaftResult { + let as_arrowed = self.as_arrow(); + let has_nulls = 
as_arrowed + .validity() + .map(|v| v.unset_bits() > 0) + .unwrap_or(false); + if has_nulls { + murmur3_32_hash_from_iter_with_nulls(self.name(), as_arrowed.into_iter()) + } else { + murmur3_32_hash_from_iter_no_nulls(self.name(), as_arrowed.values_iter()) + } + } +} + +impl DateArray { + pub fn murmur3_32(&self) -> DaftResult { + self.physical.murmur3_32() + } +} + +impl TimestampArray { + pub fn murmur3_32(&self) -> DaftResult { + let us = self.cast(&crate::DataType::Timestamp( + crate::datatypes::TimeUnit::Microseconds, + None, + ))?; + us.timestamp()?.physical.murmur3_32() + } +} + +impl Decimal128Array { + pub fn murmur3_32(&self) -> DaftResult { + let arr = self.physical.as_arrow(); + let hashes = arr.into_iter().map(|d| { + d.map(|d| { + let sign_run = if d.is_negative() { d.leading_ones() } else { d.leading_zeros() }; + let bits_needed = i128::BITS - (sign_run - 1); // minimal two's-complement width: keep exactly one sign bit + let bytes_needed = bits_needed.div_ceil(8) as usize; + let be_bytes = d.to_be_bytes(); + let unsigned = + mur3::murmurhash3_x86_32(&be_bytes[(be_bytes.len() - bytes_needed)..], 0); + i32::from_ne_bytes(unsigned.to_ne_bytes()) + }) + }); + let array = Box::new(arrow2::array::Int32Array::from_iter(hashes)); + Ok(Int32Array::from((self.name(), array))) + } +} + +fn murmur3_32_hash_from_iter_with_nulls>( + name: &str, + byte_iter: impl Iterator>, +) -> DaftResult { + let hashes = byte_iter.map(|b| { + b.map(|v| { + let unsigned = mur3::murmurhash3_x86_32(v.as_ref(), 0); + i32::from_ne_bytes(unsigned.to_ne_bytes()) + }) + }); + let array = Box::new(arrow2::array::Int32Array::from_iter(hashes)); + Ok(Int32Array::from((name, array))) +} + +fn murmur3_32_hash_from_iter_no_nulls>( + name: &str, + byte_iter: impl Iterator, +) -> DaftResult { + let hashes = byte_iter + .map(|b| { + let unsigned = mur3::murmurhash3_x86_32(b.as_ref(), 0); + i32::from_ne_bytes(unsigned.to_ne_bytes()) + }) + .collect::>(); + Ok(Int32Array::from((name, hashes))) +} diff --git a/src/daft-core/src/lib.rs 
b/src/daft-core/src/lib.rs index a52bb34ea9..4c9354ac4b 100644 --- a/src/daft-core/src/lib.rs +++ b/src/daft-core/src/lib.rs @@ -1,5 +1,5 @@ #![feature(let_chains)] - +#![feature(int_roundings)] pub mod array; pub mod count_mode; pub mod datatypes; diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index 99afad71c2..571052324e 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -303,6 +303,9 @@ impl PySeries { pub fn partitioning_years(&self) -> PyResult { Ok(self.series.partitioning_years()?.into()) } + pub fn murmur3_32(&self) -> PyResult { + Ok(self.series.murmur3_32()?.into_series().into()) + } pub fn list_lengths(&self) -> PyResult { Ok(self.series.list_lengths()?.into_series().into()) diff --git a/src/daft-core/src/series/ops/downcast.rs b/src/daft-core/src/series/ops/downcast.rs index dd3edc05a3..8d8469fe06 100644 --- a/src/daft-core/src/series/ops/downcast.rs +++ b/src/daft-core/src/series/ops/downcast.rs @@ -1,5 +1,5 @@ use crate::array::{FixedSizeListArray, ListArray, StructArray}; -use crate::datatypes::logical::FixedShapeImageArray; +use crate::datatypes::logical::{DateArray, Decimal128Array, FixedShapeImageArray, TimestampArray}; use crate::datatypes::*; use crate::series::array_impl::ArrayWrapper; use crate::series::Series; @@ -95,6 +95,18 @@ impl Series { self.downcast() } + pub fn date(&self) -> DaftResult<&DateArray> { + self.downcast() + } + + pub fn timestamp(&self) -> DaftResult<&TimestampArray> { + self.downcast() + } + + pub fn decimal128(&self) -> DaftResult<&Decimal128Array> { + self.downcast() + } + #[cfg(feature = "python")] pub fn python(&self) -> DaftResult<&PythonArray> { self.downcast() diff --git a/src/daft-core/src/series/ops/hash.rs b/src/daft-core/src/series/ops/hash.rs index 37aa74296a..80359f06e5 100644 --- a/src/daft-core/src/series/ops/hash.rs +++ b/src/daft-core/src/series/ops/hash.rs @@ -1,4 +1,8 @@ -use crate::{datatypes::UInt64Array, series::Series, 
with_match_comparable_daft_types}; +use crate::{ + datatypes::{Int32Array, UInt64Array}, + series::Series, + with_match_comparable_daft_types, +}; use common_error::DaftResult; impl Series { @@ -9,4 +13,24 @@ impl Series { downcasted.hash(seed) }) } + + pub fn murmur3_32(&self) -> DaftResult { + use crate::DataType::*; + match self.data_type() { + Int8 => self.i8()?.murmur3_32(), + Int16 => self.i16()?.murmur3_32(), + Int32 => self.i32()?.murmur3_32(), + Int64 => self.i64()?.murmur3_32(), + UInt8 => self.u8()?.murmur3_32(), + UInt16 => self.u16()?.murmur3_32(), + UInt32 => self.u32()?.murmur3_32(), + UInt64 => self.u64()?.murmur3_32(), + Utf8 => self.utf8()?.murmur3_32(), + Binary => self.binary()?.murmur3_32(), + Date => self.date()?.murmur3_32(), + Timestamp(..) => self.timestamp()?.murmur3_32(), + Decimal128(..) => self.decimal128()?.murmur3_32(), + v => panic!("murmur3 hash not implemented for datatype: {v}"), + } + } } diff --git a/tests/series/test_hash.py b/tests/series/test_hash.py index c75119684b..f2e3f8951a 100644 --- a/tests/series/test_hash.py +++ b/tests/series/test_hash.py @@ -1,7 +1,11 @@ from __future__ import annotations +import decimal +from datetime import date, datetime + import numpy as np import pytest +import pytz import xxhash from daft.datatype import DataType @@ -114,3 +118,102 @@ def test_hash_int_array_with_bad_length(): with pytest.raises(ValueError, match="seed length does not match array length"): arr.hash(bad_seed) + + +@pytest.mark.parametrize( + "dtype", + [ + DataType.uint8(), + DataType.uint16(), + DataType.uint32(), + DataType.uint64(), + DataType.int8(), + DataType.int16(), + DataType.int32(), + DataType.int64(), + ], +) +def test_murmur3_32_hash_int(dtype): + arr = Series.from_pylist([34, None]).cast(dtype) + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [2017239379, None] + + +@pytest.mark.parametrize( + "dtype", + [ + DataType.int8(), + DataType.int16(), + DataType.int32(), + DataType.int64(), + ], +) +def 
test_murmur3_32_hash_signed_int(dtype): + arr = Series.from_pylist([-1, 34, None]).cast(dtype) + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [1651860712, 2017239379, None] + + +def test_murmur3_32_hash_string(): + arr = Series.from_pylist(["iceberg", None]) + assert arr.datatype() == DataType.string() + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [1210000089, None] + + +def test_murmur3_32_hash_binary(): + arr = Series.from_pylist([b"\x00\x01\x02\x03", None]) + assert arr.datatype() == DataType.binary() + hashes = arr.murmur3_32() + java_answer = -188683207 + assert hashes.to_pylist() == [java_answer, None] + + +def test_murmur3_32_hash_date(): + arr = Series.from_pylist([date(2017, 11, 16), None]) + assert arr.datatype() == DataType.date() + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [-653330422, None] + + +def test_murmur3_32_hash_timestamp(): + arr = Series.from_pylist([datetime(2017, 11, 16, 22, 31, 8), None]) + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [-2047944441, None] + + +def test_murmur3_32_hash_timestamp_with_tz(): + dt = datetime(2017, 11, 16, 14, 31, 8) + pst = pytz.timezone("US/Pacific") + dt = pst.localize(dt) + arr = Series.from_pylist([dt, None]) + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [-2047944441, None] + + +def test_murmur3_32_hash_timestamp_with_tz_nanoseconds(): + dt = datetime(2017, 11, 16, 14, 31, 8) + pst = pytz.timezone("US/Pacific") + dt = pst.localize(dt) + arr = Series.from_pylist([dt, None]) + arr = arr.cast(DataType.timestamp("ns", "UTC")) + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [-2047944441, None] + + +def test_murmur3_32_hash_decimal_unscaled(): + arr = Series.from_pylist([decimal.Decimal(1420), None]) + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [-500754589, None] + + +def test_murmur3_32_hash_decimal_scaled(): + arr = Series.from_pylist([decimal.Decimal("14.20"), None]) + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [-500754589, None] + + +def test_murmur3_32_hash_decimal_full_scaled(): + arr = Series.from_pylist([decimal.Decimal(".00001420"), None]) + hashes = arr.murmur3_32() + assert hashes.to_pylist() 
== [-500754589, None] + + +def test_murmur3_32_hash_decimal_full_scaled(): + arr = Series.from_pylist([decimal.Decimal(".00001420"), None]) + hashes = arr.murmur3_32() + assert hashes.to_pylist() == [-500754589, None]