Skip to content

Commit

Permalink
[FEAT] Iceberg Murmur3 Hash function (#1778)
Browse files Browse the repository at this point in the history
* Implements the Iceberg 32-bit Murmur3 hash function, following this spec:
https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
  • Loading branch information
samster25 authored Jan 12, 2024
1 parent 3939fe4 commit 146e68b
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 4 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,7 @@ class PySeries:
def if_else(self, other: PySeries, predicate: PySeries) -> PySeries: ...
def is_null(self) -> PySeries: ...
def not_null(self) -> PySeries: ...
# Hash every element with the 32-bit Murmur3 function used by Apache Iceberg;
# the result is a new PySeries (the Rust implementation produces an Int32 array).
def murmur3_32(self) -> PySeries: ...
def _debug_bincode_serialize(self) -> bytes: ...
@staticmethod
def _debug_bincode_deserialize(b: bytes) -> PySeries: ...
Expand Down
3 changes: 3 additions & 0 deletions daft/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ def hash(self, seed: Series | None = None) -> Series:

return Series._from_pyseries(self._series.hash(seed._series if seed is not None else None))

def murmur3_32(self) -> Series:
    """Hash each element with the Iceberg-style 32-bit Murmur3 function.

    Returns:
        A new ``Series`` wrapping the result of ``murmur3_32`` on the
        underlying ``PySeries``.
    """
    hashed = self._series.murmur3_32()
    return Series._from_pyseries(hashed)

# Delegate to the repr() of the wrapped `_series` object.
def __repr__(self) -> str:
return repr(self._series)

Expand Down
1 change: 1 addition & 0 deletions src/daft-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ html-escape = {workspace = true}
indexmap = {workspace = true, features = ["serde"]}
lazy_static = {workspace = true}
log = {workspace = true}
mur3 = "0.1.0"
ndarray = "0.15.6"
num-derive = {workspace = true}
num-traits = {workspace = true}
Expand Down
141 changes: 140 additions & 1 deletion src/daft-core/src/array/ops/hash.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::{
array::DataArray,
datatypes::{BinaryArray, BooleanArray, DaftNumericType, NullArray, UInt64Array, Utf8Array},
datatypes::{
logical::{DateArray, Decimal128Array, TimestampArray},
BinaryArray, BooleanArray, DaftNumericType, Int16Array, Int32Array, Int64Array, Int8Array,
NullArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, Utf8Array,
},
kernels,
};

Expand Down Expand Up @@ -70,3 +74,138 @@ impl NullArray {
Ok(DataArray::from((self.name(), Box::new(result))))
}
}

macro_rules! impl_int_murmur3_32 {
($ArrayT:ty) => {
impl $ArrayT {
pub fn murmur3_32(&self) -> DaftResult<Int32Array> {
let as_arrowed = self.as_arrow();
let has_nulls = as_arrowed
.validity()
.map(|v| v.unset_bits() > 0)
.unwrap_or(false);
if has_nulls {
murmur3_32_hash_from_iter_with_nulls(
self.name(),
as_arrowed
.into_iter()
.map(|v| v.map(|v| (*v as i64).to_le_bytes())),
)
} else {
murmur3_32_hash_from_iter_no_nulls(
self.name(),
as_arrowed.values_iter().map(|v| (*v as i64).to_le_bytes()),
)
}
}
}
};
}

impl_int_murmur3_32!(Int8Array);
impl_int_murmur3_32!(Int16Array);
impl_int_murmur3_32!(Int32Array);
impl_int_murmur3_32!(Int64Array);

impl_int_murmur3_32!(UInt8Array);
impl_int_murmur3_32!(UInt16Array);
impl_int_murmur3_32!(UInt32Array);
impl_int_murmur3_32!(UInt64Array);

impl Utf8Array {
    /// Computes the 32-bit Murmur3 hash of each string's UTF-8 bytes.
    ///
    /// Null entries stay null in the returned [`Int32Array`], which carries
    /// the same name as this array.
    pub fn murmur3_32(&self) -> DaftResult<Int32Array> {
        let arrow_array = self.as_arrow();
        let null_count = arrow_array
            .validity()
            .map_or(0, |bitmap| bitmap.unset_bits());
        if null_count == 0 {
            // Every slot is valid, so iterate the raw values directly.
            let utf8_bytes = arrow_array.values_iter().map(str::as_bytes);
            return murmur3_32_hash_from_iter_no_nulls(self.name(), utf8_bytes);
        }
        let utf8_bytes = arrow_array
            .into_iter()
            .map(|slot| slot.map(str::as_bytes));
        murmur3_32_hash_from_iter_with_nulls(self.name(), utf8_bytes)
    }
}

impl BinaryArray {
    /// Computes the 32-bit Murmur3 hash of each binary value's raw bytes.
    ///
    /// Null entries stay null in the returned [`Int32Array`], which carries
    /// the same name as this array.
    pub fn murmur3_32(&self) -> DaftResult<Int32Array> {
        let arrow_array = self.as_arrow();
        let null_count = arrow_array
            .validity()
            .map_or(0, |bitmap| bitmap.unset_bits());
        if null_count == 0 {
            // Every slot is valid, so iterate the raw values directly.
            return murmur3_32_hash_from_iter_no_nulls(self.name(), arrow_array.values_iter());
        }
        murmur3_32_hash_from_iter_with_nulls(self.name(), arrow_array.into_iter())
    }
}

impl DateArray {
    /// Hashes each date by delegating to `murmur3_32` on the underlying
    /// physical array.
    pub fn murmur3_32(&self) -> DaftResult<Int32Array> {
        let physical = &self.physical;
        physical.murmur3_32()
    }
}

impl TimestampArray {
    /// Hashes each timestamp after normalising it to microsecond precision.
    ///
    /// The array is first cast to `Timestamp(Microseconds, None)` so that the
    /// hash does not depend on the original time unit; the physical values of
    /// the cast array are then hashed.
    ///
    /// # Errors
    /// Propagates any error from the cast or from the downcast back to a
    /// timestamp array.
    pub fn murmur3_32(&self) -> DaftResult<Int32Array> {
        let micros_type =
            crate::DataType::Timestamp(crate::datatypes::TimeUnit::Microseconds, None);
        let as_micros = self.cast(&micros_type)?;
        let micros_array = as_micros.timestamp()?;
        micros_array.physical.murmur3_32()
    }
}

impl Decimal128Array {
    /// Computes the 32-bit Murmur3 hash (seed 0) of each decimal's unscaled value.
    ///
    /// Following the Iceberg hash requirements, the unscaled value is hashed as
    /// the *minimum* number of big-endian two's-complement bytes that can hold
    /// it (the same bytes Java's `BigInteger.toByteArray()` produces). The
    /// minimal encoding always keeps one sign bit, so:
    ///
    /// * `0` is hashed as `[0x00]` (never as an empty slice),
    /// * `-1` is hashed as `[0xFF]` (not as sixteen `0xFF` bytes),
    /// * `128` is hashed as `[0x00, 0x80]` (the leading zero byte keeps it positive).
    ///
    /// Null entries stay null in the returned [`Int32Array`], which carries
    /// the same name as this array.
    pub fn murmur3_32(&self) -> DaftResult<Int32Array> {
        let arr = self.physical.as_arrow();
        let hashes = arr.into_iter().map(|d| {
            d.map(|d| {
                // Leading bits that merely repeat the sign carry no information.
                let redundant_sign_bits = if d.is_negative() {
                    d.leading_ones()
                } else {
                    d.leading_zeros()
                };
                // Keep exactly one sign bit in front of the significant bits;
                // this is always in 1..=128, so the slice below is in bounds.
                let bits_needed = i128::BITS - redundant_sign_bits + 1;
                let bytes_needed = bits_needed.div_ceil(8) as usize;
                let be_bytes = d.to_be_bytes();
                let unsigned =
                    mur3::murmurhash3_x86_32(&be_bytes[(be_bytes.len() - bytes_needed)..], 0);
                i32::from_ne_bytes(unsigned.to_ne_bytes())
            })
        });
        let array = Box::new(arrow2::array::Int32Array::from_iter(hashes));
        Ok(Int32Array::from((self.name(), array)))
    }
}

/// Hashes each `Some` byte slice with 32-bit Murmur3 (seed 0) and keeps `None`
/// entries as nulls, returning the result as an [`Int32Array`] called `name`.
fn murmur3_32_hash_from_iter_with_nulls<B: AsRef<[u8]>>(
    name: &str,
    byte_iter: impl Iterator<Item = Option<B>>,
) -> DaftResult<Int32Array> {
    let optional_hashes = byte_iter.map(|maybe_bytes| {
        // Same-width `as` cast: reinterprets the u32 hash bits as a signed i32.
        maybe_bytes.map(|bytes| mur3::murmurhash3_x86_32(bytes.as_ref(), 0) as i32)
    });
    let arrow_hashes = arrow2::array::Int32Array::from_iter(optional_hashes);
    Ok(Int32Array::from((name, Box::new(arrow_hashes))))
}

/// Hashes every byte slice with 32-bit Murmur3 (seed 0) and returns the hashes
/// as an [`Int32Array`] called `name`, built from a plain vector of values.
fn murmur3_32_hash_from_iter_no_nulls<B: AsRef<[u8]>>(
    name: &str,
    byte_iter: impl Iterator<Item = B>,
) -> DaftResult<Int32Array> {
    let hashes: Vec<i32> = byte_iter
        // Same-width `as` cast: reinterprets the u32 hash bits as a signed i32.
        .map(|bytes| mur3::murmurhash3_x86_32(bytes.as_ref(), 0) as i32)
        .collect();
    Ok(Int32Array::from((name, hashes)))
}
2 changes: 1 addition & 1 deletion src/daft-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![feature(let_chains)]

#![feature(int_roundings)]
pub mod array;
pub mod count_mode;
pub mod datatypes;
Expand Down
3 changes: 3 additions & 0 deletions src/daft-core/src/python/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ impl PySeries {
/// Python-facing wrapper: calls `partitioning_years` on the wrapped series and
/// converts the result into a `PySeries`.
pub fn partitioning_years(&self) -> PyResult<Self> {
    let years = self.series.partitioning_years()?;
    Ok(years.into())
}
/// Python-facing wrapper: computes the 32-bit Murmur3 hash of the wrapped
/// series and converts the resulting array back into a `PySeries`.
pub fn murmur3_32(&self) -> PyResult<Self> {
    let hashed = self.series.murmur3_32()?;
    Ok(hashed.into_series().into())
}

pub fn list_lengths(&self) -> PyResult<Self> {
Ok(self.series.list_lengths()?.into_series().into())
Expand Down
14 changes: 13 additions & 1 deletion src/daft-core/src/series/ops/downcast.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::array::{FixedSizeListArray, ListArray, StructArray};
use crate::datatypes::logical::FixedShapeImageArray;
use crate::datatypes::logical::{DateArray, Decimal128Array, FixedShapeImageArray, TimestampArray};
use crate::datatypes::*;
use crate::series::array_impl::ArrayWrapper;
use crate::series::Series;
Expand Down Expand Up @@ -95,6 +95,18 @@ impl Series {
self.downcast()
}

/// Borrows this series as a [`DateArray`] via `downcast`.
///
/// # Errors
/// Returns whatever error `downcast` produces (presumably when the series
/// does not hold a date array — confirm against `downcast`).
pub fn date(&self) -> DaftResult<&DateArray> {
self.downcast()
}

/// Borrows this series as a [`TimestampArray`] via `downcast`.
///
/// # Errors
/// Returns whatever error `downcast` produces (presumably when the series
/// does not hold a timestamp array — confirm against `downcast`).
pub fn timestamp(&self) -> DaftResult<&TimestampArray> {
self.downcast()
}

/// Borrows this series as a [`Decimal128Array`] via `downcast`.
///
/// # Errors
/// Returns whatever error `downcast` produces (presumably when the series
/// does not hold a decimal128 array — confirm against `downcast`).
pub fn decimal128(&self) -> DaftResult<&Decimal128Array> {
self.downcast()
}

#[cfg(feature = "python")]
pub fn python(&self) -> DaftResult<&PythonArray> {
self.downcast()
Expand Down
26 changes: 25 additions & 1 deletion src/daft-core/src/series/ops/hash.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::{datatypes::UInt64Array, series::Series, with_match_comparable_daft_types};
use crate::{
datatypes::{Int32Array, UInt64Array},
series::Series,
with_match_comparable_daft_types,
};
use common_error::DaftResult;

impl Series {
Expand All @@ -9,4 +13,24 @@ impl Series {
downcasted.hash(seed)
})
}

/// Computes the Iceberg-style 32-bit Murmur3 hash of every element.
///
/// Dispatches on the series' data type to the matching array implementation.
/// Supported types are the signed and unsigned integers (8 to 64 bits),
/// `Utf8`, `Binary`, `Date`, `Timestamp` and `Decimal128`.
///
/// # Errors
/// Returns `DaftError::TypeError` for any other data type, and propagates
/// errors from the downcast or from the per-type hash implementation.
pub fn murmur3_32(&self) -> DaftResult<Int32Array> {
    use crate::DataType::*;
    match self.data_type() {
        Int8 => self.i8()?.murmur3_32(),
        Int16 => self.i16()?.murmur3_32(),
        Int32 => self.i32()?.murmur3_32(),
        Int64 => self.i64()?.murmur3_32(),
        UInt8 => self.u8()?.murmur3_32(),
        UInt16 => self.u16()?.murmur3_32(),
        UInt32 => self.u32()?.murmur3_32(),
        UInt64 => self.u64()?.murmur3_32(),
        Utf8 => self.utf8()?.murmur3_32(),
        Binary => self.binary()?.murmur3_32(),
        Date => self.date()?.murmur3_32(),
        Timestamp(..) => self.timestamp()?.murmur3_32(),
        Decimal128(..) => self.decimal128()?.murmur3_32(),
        // An unsupported dtype is a recoverable caller error, so report it
        // through the Result instead of panicking.
        v => Err(common_error::DaftError::TypeError(format!(
            "murmur3 hash not implemented for datatype: {v}"
        ))),
    }
}
}
103 changes: 103 additions & 0 deletions tests/series/test_hash.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from __future__ import annotations

import decimal
from datetime import date, datetime

import numpy as np
import pytest
import pytz
import xxhash

from daft.datatype import DataType
Expand Down Expand Up @@ -114,3 +118,102 @@ def test_hash_int_array_with_bad_length():

with pytest.raises(ValueError, match="seed length does not match array length"):
arr.hash(bad_seed)


@pytest.mark.parametrize(
    "dtype",
    [
        DataType.uint8(),
        DataType.uint16(),
        DataType.uint32(),
        DataType.uint64(),
        DataType.int8(),
        DataType.int16(),
        DataType.int32(),
        DataType.int64(),
    ],
)
def test_murmur3_32_hash_int(dtype):
    """Every integer width must hash 34 to the same value and keep nulls as None."""
    series = Series.from_pylist([34, None]).cast(dtype)
    expected = [2017239379, None]
    assert series.murmur3_32().to_pylist() == expected


@pytest.mark.parametrize(
    "dtype",
    [
        DataType.int8(),
        DataType.int16(),
        DataType.int32(),
        DataType.int64(),
    ],
)
def test_murmur3_32_hash_signed_int(dtype):
    """Negative values must hash identically across all signed integer widths."""
    series = Series.from_pylist([-1, 34, None]).cast(dtype)
    expected = [1651860712, 2017239379, None]
    assert series.murmur3_32().to_pylist() == expected


def test_murmur3_32_hash_string():
    """A string series hashes "iceberg" to the expected value and keeps nulls as None."""
    series = Series.from_pylist(["iceberg", None])
    assert series.datatype() == DataType.string()
    expected = [1210000089, None]
    assert series.murmur3_32().to_pylist() == expected


def test_murmur3_32_hash_binary():
    """A binary series hashes its raw bytes and keeps nulls as None.

    This test needs its own name: defining it as ``test_murmur3_32_hash_string``
    would shadow the string test of that name, which would then never run.
    """
    arr = Series.from_pylist([b"\x00\x01\x02\x03", None])
    assert arr.datatype() == DataType.binary()
    hashes = arr.murmur3_32()
    java_answer = -188683207
    assert hashes.to_pylist() == [java_answer, None]


def test_murmur3_32_hash_date():
    """A date series hashes 2017-11-16 to the expected value and keeps nulls as None."""
    series = Series.from_pylist([date(2017, 11, 16), None])
    assert series.datatype() == DataType.date()
    expected = [-653330422, None]
    assert series.murmur3_32().to_pylist() == expected


def test_murmur3_32_hash_timestamp():
    """A timezone-naive timestamp hashes to the expected value; nulls stay None."""
    moment = datetime(2017, 11, 16, 22, 31, 8)
    series = Series.from_pylist([moment, None])
    expected = [-2047944441, None]
    assert series.murmur3_32().to_pylist() == expected


def test_murmur3_32_hash_timestamp_with_tz():
    """A US/Pacific-localized timestamp hashes to the same value as the naive 22:31:08 case."""
    pacific = pytz.timezone("US/Pacific")
    localized = pacific.localize(datetime(2017, 11, 16, 14, 31, 8))
    series = Series.from_pylist([localized, None])
    expected = [-2047944441, None]
    assert series.murmur3_32().to_pylist() == expected


def test_murmur3_32_hash_timestamp_with_tz_nanoseconds():
    """Casting the timestamp to nanosecond precision must not change its hash."""
    pacific = pytz.timezone("US/Pacific")
    localized = pacific.localize(datetime(2017, 11, 16, 14, 31, 8))
    nanos_type = DataType.timestamp("ns", "UTC")
    series = Series.from_pylist([localized, None]).cast(nanos_type)
    expected = [-2047944441, None]
    assert series.murmur3_32().to_pylist() == expected


def test_murmur3_32_hash_decimal_unscaled():
    """A decimal with no fractional digits (1420) hashes to the expected value."""
    value = decimal.Decimal(1420)
    series = Series.from_pylist([value, None])
    expected = [-500754589, None]
    assert series.murmur3_32().to_pylist() == expected


def test_murmur3_32_hash_decimal_scaled():
    """The hash ignores scale: 14.20 hashes to the same value as 1420."""
    value = decimal.Decimal("14.20")
    series = Series.from_pylist([value, None])
    expected = [-500754589, None]
    assert series.murmur3_32().to_pylist() == expected


def test_murmur3_32_hash_decimal_full_scaled():
    """The hash ignores scale: .00001420 hashes to the same value as 1420."""
    value = decimal.Decimal(".00001420")
    series = Series.from_pylist([value, None])
    expected = [-500754589, None]
    assert series.murmur3_32().to_pylist() == expected

0 comments on commit 146e68b

Please sign in to comment.