Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-ho committed Feb 23, 2024
1 parent 357f433 commit bde1d35
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 18 deletions.
2 changes: 1 addition & 1 deletion daft/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def date(cls) -> DataType:

@classmethod
def time(cls, timeunit: TimeUnit | str) -> DataType:
"""Time DataType."""
"""Time DataType. Supported timeunits are "us", "ns"."""
if isinstance(timeunit, str):
timeunit = TimeUnit.from_str(timeunit)
return cls._from_pydatatype(PyDataType.time(timeunit._timeunit))
Expand Down
1 change: 1 addition & 0 deletions src/daft-core/src/array/ops/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ impl TimestampArray {
match dtype {
DataType::Timestamp(..) => arrow_logical_cast(self, dtype),
DataType::Date => Ok(self.date()?.into_series()),
DataType::Time(tu) => Ok(self.time(tu)?.into_series()),
DataType::Utf8 => {
let DataType::Timestamp(unit, timezone) = self.data_type() else {
panic!("Wrong dtype for TimestampArray: {}", self.data_type())
Expand Down
76 changes: 74 additions & 2 deletions src/daft-core/src/array/ops/date.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
datatypes::{
logical::{DateArray, TimestampArray},
Field, Int32Array, UInt32Array,
logical::{DateArray, TimeArray, TimestampArray},
Field, Int32Array, Int64Array, TimeUnit, UInt32Array,
},
DataType,
};
Expand Down Expand Up @@ -108,6 +108,78 @@ impl TimestampArray {
))
}

pub fn time(&self, timeunit_for_cast: &TimeUnit) -> DaftResult<TimeArray> {
let physical = self.physical.as_arrow();
let DataType::Timestamp(timeunit, tz) = self.data_type() else {
unreachable!("Timestamp array must have Timestamp datatype")
};
let tu = timeunit.to_arrow();
let timeunit_for_cast = if timeunit_for_cast == &TimeUnit::Nanoseconds {
TimeUnit::Nanoseconds
} else {
TimeUnit::Microseconds // default to microseconds
};
let time_arrow = match tz {
Some(tz) => match arrow2::temporal_conversions::parse_offset(tz) {
Ok(tz) => Ok(arrow2::array::PrimitiveArray::<i64>::from_iter(
physical.iter().map(|ts| {
ts.map(|ts| {
let dt =
arrow2::temporal_conversions::timestamp_to_datetime(*ts, tu, &tz);
match timeunit_for_cast {
TimeUnit::Nanoseconds => {
let hour = dt.hour() as i64 * 3_600_000_000_000;
let minute = dt.minute() as i64 * 60_000_000_000;
let second = dt.second() as i64 * 1_000_000_000;
let nanosecond = dt.nanosecond() as i64;
hour + minute + second + nanosecond
}
_ => {
let hour = dt.hour() as i64 * 3_600_000_000;
let minute = dt.minute() as i64 * 60_000_000;
let second = dt.second() as i64 * 1_000_000;
let microsecond = dt.nanosecond() as i64 / 1_000;
hour + minute + second + microsecond
}
}
})
}),
)),
Err(e) => Err(DaftError::TypeError(format!(
"Cannot parse timezone in Timestamp datatype: {}, error: {}",
tz, e
))),
},
None => Ok(arrow2::array::PrimitiveArray::<i64>::from_iter(
physical.iter().map(|ts| {
ts.map(|ts| {
let dt = arrow2::temporal_conversions::timestamp_to_naive_datetime(*ts, tu);
match timeunit_for_cast {
TimeUnit::Nanoseconds => {
let hour = dt.hour() as i64 * 3_600_000_000_000;
let minute = dt.minute() as i64 * 60_000_000_000;
let second = dt.second() as i64 * 1_000_000_000;
let nanosecond = dt.nanosecond() as i64;
hour + minute + second + nanosecond
}
_ => {
let hour = dt.hour() as i64 * 3_600_000_000;
let minute = dt.minute() as i64 * 60_000_000;
let second = dt.second() as i64 * 1_000_000;
let microsecond = dt.nanosecond() as i64 / 1_000;
hour + minute + second + microsecond
}
}
})
}),
)),
}?;
Ok(TimeArray::new(
Field::new(self.name(), DataType::Time(timeunit_for_cast)),
Int64Array::from((self.name(), Box::new(time_arrow))),
))
}

pub fn hour(&self) -> DaftResult<UInt32Array> {
let physical = self.physical.as_arrow();
let DataType::Timestamp(timeunit, tz) = self.data_type() else {
Expand Down
5 changes: 4 additions & 1 deletion src/daft-core/src/array/ops/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ impl DateArray {

impl TimeArray {
pub fn murmur3_32(&self) -> DaftResult<Int32Array> {
self.physical.murmur3_32()
let us = self.cast(&crate::DataType::Time(
crate::datatypes::TimeUnit::Microseconds,
))?;
us.time()?.physical.murmur3_32()
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/expressions/test_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
(None, DataType.null()),
(Series.from_pylist([1, 2, 3]), DataType.int64()),
(date(2023, 1, 1), DataType.date()),
(time(1, 2, 3), DataType.time(timeunit=TimeUnit.from_str("us"))),
(time(1, 2, 3, 4), DataType.time(timeunit=TimeUnit.from_str("us"))),
(datetime(2023, 1, 1), DataType.timestamp(timeunit=TimeUnit.from_str("us"))),
(datetime(2022, 1, 1, tzinfo=pytz.utc), DataType.timestamp(timeunit=TimeUnit.from_str("us"), timezone="UTC")),
],
Expand Down
6 changes: 6 additions & 0 deletions tests/io/test_csv_roundtrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
DataType.time(TimeUnit.us()),
DataType.time(TimeUnit.us()),
),
(
[datetime.time(1, 2, 3, 4), datetime.time(5, 6, 7, 8), None],
pa.time64("ns"),
DataType.time(TimeUnit.ns()),
DataType.time(TimeUnit.us()),
),
(
[datetime.datetime(1994, 1, 1), datetime.datetime(1995, 1, 1), None],
pa.timestamp("ms"),
Expand Down
5 changes: 5 additions & 0 deletions tests/io/test_parquet_roundtrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
pa.time64("us"),
DataType.time(TimeUnit.us()),
),
(
[datetime.time(12, 1, 22, 4), datetime.time(13, 8, 45, 34), None],
pa.time64("ns"),
DataType.time(TimeUnit.ns()),
),
(
[datetime.datetime(1994, 1, 1), datetime.datetime(1995, 1, 1), None],
pa.timestamp("ms"),
Expand Down
19 changes: 19 additions & 0 deletions tests/series/test_cast.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,3 +773,22 @@ def test_cast_date_to_timestamp():

back = casted.dt.date()
assert (input == back).to_pylist() == [True]


def test_cast_timestamp_to_time():
from datetime import datetime, time

"""Microseconds"""
input = Series.from_pylist([datetime(2022, 1, 6, 12, 34, 56, 78)])
casted = input.cast(DataType.time("us"))
assert casted.to_pylist() == [time(12, 34, 56, 78)]

"""Nanoseconds"""
input = Series.from_pylist([datetime(2022, 1, 6, 12, 34, 56, 78)])
casted = input.cast(DataType.time("ns"))
assert casted.to_pylist() == [time(12, 34, 56, 78)]

"""Seconds"""
input = Series.from_pylist([datetime(2022, 1, 6, 12, 34, 56, 78)])
casted = input.cast(DataType.time("s"))
assert casted.to_pylist() == [time(12, 34, 56, 78)]
10 changes: 9 additions & 1 deletion tests/series/test_hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,20 @@ def test_murmur3_32_hash_date():


def test_murmur3_32_hash_time():
arr = Series.from_pylist([time(22, 31, 8), None])
arr = Series.from_pylist([time(22, 31, 8, 0), None])
assert arr.datatype() == DataType.time("us")
hashes = arr.murmur3_32()
assert hashes.to_pylist() == [-662762989, None]


def test_murmur3_32_hash_time_nanoseconds():
arr = Series.from_pylist([time(22, 31, 8, 0), None])
arr = arr.cast(DataType.time("ns"))
assert arr.datatype() == DataType.time("ns")
hashes = arr.murmur3_32()
assert hashes.to_pylist() == [-662762989, None]


def test_murmur3_32_hash_timestamp():
arr = Series.from_pylist([datetime(2017, 11, 16, 22, 31, 8), None])
hashes = arr.murmur3_32()
Expand Down
11 changes: 6 additions & 5 deletions tests/series/test_size_bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,20 @@ def test_series_date_size_bytes(size, with_nulls) -> None:

@pytest.mark.parametrize("size", [0, 1, 2, 8, 9, 16])
@pytest.mark.parametrize("with_nulls", [True, False])
def test_series_time_size_bytes(size, with_nulls) -> None:
@pytest.mark.parametrize("precision", ["us", "ns"])
def test_series_time_size_bytes(size, with_nulls, precision) -> None:
from datetime import time

pydata = [time(i, 0, 0) for i in range(size)]
pydata = [time(i, i, i, i) for i in range(size)]

if with_nulls and size > 0:
data = pa.array(pydata[:-1] + [None], pa.time64("us"))
data = pa.array(pydata[:-1] + [None], pa.time64(precision))
else:
data = pa.array(pydata, pa.time64("us"))
data = pa.array(pydata, pa.time64(precision))

s = Series.from_arrow(data)

assert s.datatype() == DataType.time("us")
assert s.datatype() == DataType.time(precision)
assert s.size_bytes() == get_total_buffer_size(data)


Expand Down
5 changes: 3 additions & 2 deletions tests/series/test_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ def date_maker(d):
assert taken.to_pylist() == sorted_order[::-1]


def test_series_time_sorting() -> None:
@pytest.mark.parametrize("timeunit", ["us", "ns"])
def test_series_time_sorting(timeunit) -> None:
from datetime import time

def time_maker(h, m, s, us):
Expand All @@ -125,7 +126,7 @@ def time_maker(h, m, s, us):
sorted_order = list(
map(time_maker, [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [1, 2, 4, 5, None, None])
)
s = s.cast(DataType.time("us"))
s = s.cast(DataType.time(timeunit))
s_sorted = s.sort()
assert len(s_sorted) == len(s)
assert s_sorted.datatype() == s.datatype()
Expand Down
6 changes: 4 additions & 2 deletions tests/series/test_take.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def date_maker(d):
assert taken.to_pylist() == days[::-1]


def test_series_time_take() -> None:
@pytest.mark.parametrize("time_unit", ["us", "ns"])
def test_series_time_take(time_unit) -> None:
from datetime import time

def time_maker(h, m, s, us):
Expand All @@ -55,8 +56,9 @@ def time_maker(h, m, s, us):

times = list(map(time_maker, [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [5, 4, 1, None, 2, None]))
s = Series.from_pylist(times)
s = s.cast(DataType.time(time_unit))
taken = s.take(Series.from_pylist([5, 4, 3, 2, 1, 0]))
assert taken.datatype() == DataType.time("us")
assert taken.datatype() == DataType.time(time_unit)
assert taken.to_pylist() == times[::-1]


Expand Down
8 changes: 5 additions & 3 deletions tests/table/test_from_py.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"str": ["foo", "bar"],
"binary": [b"foo", b"bar"],
"date": [datetime.date.today(), datetime.date.today()],
"time": [datetime.time(1, 2, 3), datetime.time(4, 5, 6)],
"time": [datetime.time(1, 2, 3, 4), datetime.time(5, 6, 7, 8)],
"list": [[1, 2], [3]],
"struct": [{"a": 1, "b": 2.0}, {"b": 3.0}],
"empty_struct": [{}, {}],
Expand Down Expand Up @@ -94,7 +94,8 @@
"binary": pa.array(PYTHON_TYPE_ARRAYS["binary"], pa.binary()),
"boolean": pa.array(PYTHON_TYPE_ARRAYS["bool"], pa.bool_()),
"date32": pa.array(PYTHON_TYPE_ARRAYS["date"], pa.date32()),
"time64": pa.array(PYTHON_TYPE_ARRAYS["time"], pa.time64("us")),
"time64_microseconds": pa.array(PYTHON_TYPE_ARRAYS["time"], pa.time64("us")),
"time64_nanoseconds": pa.array(PYTHON_TYPE_ARRAYS["time"], pa.time64("ns")),
"list": pa.array(PYTHON_TYPE_ARRAYS["list"], pa.list_(pa.int64())),
"fixed_size_list": pa.array([[1, 2], [3, 4]], pa.list_(pa.int64(), 2)),
"struct": pa.array(PYTHON_TYPE_ARRAYS["struct"], pa.struct([("a", pa.int64()), ("b", pa.float64())])),
Expand Down Expand Up @@ -144,7 +145,8 @@
"binary": pa.large_binary(),
"boolean": pa.bool_(),
"date32": pa.date32(),
"time64": pa.time64("us"),
"time64_microseconds": pa.time64("us"),
"time64_nanoseconds": pa.time64("ns"),
"list": pa.large_list(pa.int64()),
"fixed_size_list": pa.list_(pa.int64(), 2),
"struct": pa.struct([("a", pa.int64()), ("b", pa.float64())]),
Expand Down

0 comments on commit bde1d35

Please sign in to comment.