From 1498a4ed294c36c6d6346d0a4785446566e009fd Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Wed, 29 Nov 2023 14:56:15 -0800 Subject: [PATCH] Add temporal type inference and parsing test coverage, misc. fixes and extension thereof. --- src/daft-decoding/src/deserialize.rs | 26 +++++- src/daft-decoding/src/inference.rs | 28 +++++-- src/daft-json/src/decoding.rs | 63 ++++++++++++-- src/daft-json/src/inference.rs | 118 ++++++++++++++++++--------- src/daft-json/src/metadata.rs | 69 +++++++++++++++- src/daft-json/src/read.rs | 16 +++- src/daft-json/test/dtypes.jsonl | 8 +- 7 files changed, 265 insertions(+), 63 deletions(-) diff --git a/src/daft-decoding/src/deserialize.rs b/src/daft-decoding/src/deserialize.rs index 0b1ba034bf..4a0762b254 100644 --- a/src/daft-decoding/src/deserialize.rs +++ b/src/daft-decoding/src/deserialize.rs @@ -23,6 +23,10 @@ pub(crate) const ALL_NAIVE_TIMESTAMP_FMTS: &[&str] = &[ ]; pub(crate) const ALL_TIMESTAMP_FMTS: &[&str] = &[ISO8601, RFC3339_WITH_SPACE]; +pub(crate) const ISO8601_DATE: &str = "%Y-%m-%d"; +pub(crate) const ISO8601_DATE_SLASHES: &str = "%Y/%m/%d"; +pub(crate) const ALL_NAIVE_DATE_FMTS: &[&str] = &[ISO8601_DATE, ISO8601_DATE_SLASHES]; + // Ideally this trait should not be needed and both `csv` and `csv_async` crates would share // the same `ByteRecord` struct. Unfortunately, they do not and thus we must use generics // over this trait and materialize the generics for each struct. @@ -152,6 +156,20 @@ fn deserialize_null(rows: &[B], _: usize) -> Box Option { + // TODO(Clark): Parse as all candidate formats in a single pass. + for i in 0..ALL_NAIVE_DATE_FMTS.len() { + let idx = (i + *fmt_idx) % ALL_NAIVE_DATE_FMTS.len(); + let fmt = ALL_NAIVE_DATE_FMTS[idx]; + if let Ok(dt) = chrono::NaiveDate::parse_from_str(string, fmt) { + *fmt_idx = idx; + return Some(dt); + } + } + None +} + #[inline] pub fn deserialize_naive_datetime( string: &str, @@ -237,13 +255,15 @@ pub fn deserialize_column( lexical_core::parse::(bytes).ok() }), Date32 => deserialize_primitive(rows, column, datatype, |bytes| { + let mut last_fmt_idx = 0; to_utf8(bytes) - .and_then(|x| x.parse::().ok()) + .and_then(|x| deserialize_naive_date(x, &mut last_fmt_idx)) .map(|x| x.num_days_from_ce() - temporal_conversions::EPOCH_DAYS_FROM_CE) }), Date64 => deserialize_primitive(rows, column, datatype, |bytes| { + let mut last_fmt_idx = 0; to_utf8(bytes) - .and_then(|x| x.parse::().ok()) + .and_then(|x| deserialize_naive_datetime(x, &mut last_fmt_idx)) .map(|x| x.timestamp_millis()) }), Time32(time_unit) => deserialize_primitive(rows, column, datatype, |bytes| { @@ -313,7 +333,7 @@ pub fn deserialize_column( } // Return the factor by how small is a time unit compared to seconds -fn get_factor_from_timeunit(time_unit: TimeUnit) -> u32 { +pub fn get_factor_from_timeunit(time_unit: TimeUnit) -> u32 { match time_unit { TimeUnit::Second => 1, TimeUnit::Millisecond => 1_000, diff --git a/src/daft-decoding/src/inference.rs b/src/daft-decoding/src/inference.rs index 08d6c86494..b91630c9a0 100644 --- a/src/daft-decoding/src/inference.rs +++ b/src/daft-decoding/src/inference.rs @@ -1,7 +1,7 @@ use arrow2::datatypes::{DataType, TimeUnit}; use chrono::Timelike; -use crate::deserialize::{ALL_NAIVE_TIMESTAMP_FMTS, ALL_TIMESTAMP_FMTS}; +use crate::deserialize::{ALL_NAIVE_DATE_FMTS, ALL_NAIVE_TIMESTAMP_FMTS, ALL_TIMESTAMP_FMTS}; /// Infers [`DataType`] from `bytes` /// # Implementation @@ -35,12 +35,15 @@ pub fn infer(bytes: &[u8]) -> arrow2::datatypes::DataType { pub fn infer_string(string: &str) -> DataType { if is_date(string) { DataType::Date32 - } else if is_time(string) { - DataType::Time32(TimeUnit::Millisecond) - } else if let Some(time_unit) = is_naive_datetime(string) { - DataType::Timestamp(time_unit, None) + } else if let Some(time_unit) = is_time(string) { + DataType::Time32(time_unit) } else if let Some((time_unit, offset)) = is_datetime(string) { + // NOTE: We try to parse as a non-naive datatime (with timezone information) first, + // since is_datetime() will return false if timezone information is not present in the string, + // while is_naive_datetime() will ignore timezone information in the string. DataType::Timestamp(time_unit, Some(offset)) + } else if let Some(time_unit) = is_naive_datetime(string) { + DataType::Timestamp(time_unit, None) } else { DataType::Utf8 } @@ -63,11 +66,20 @@ fn is_integer(bytes: &[u8]) -> bool { } fn is_date(string: &str) -> bool { - string.parse::().is_ok() + for fmt in ALL_NAIVE_DATE_FMTS { + if chrono::NaiveDate::parse_from_str(string, fmt).is_ok() { + return true; + } + } + false } -fn is_time(string: &str) -> bool { - string.parse::().is_ok() +fn is_time(string: &str) -> Option { + if let Ok(t) = string.parse::() { + let time_unit = nanoseconds_to_time_unit(t.nanosecond()); + return Some(time_unit); + } + None } fn is_naive_datetime(string: &str) -> Option { diff --git a/src/daft-json/src/decoding.rs b/src/daft-json/src/decoding.rs index a000fddbed..7c23a906fa 100644 --- a/src/daft-json/src/decoding.rs +++ b/src/daft-json/src/decoding.rs @@ -10,7 +10,11 @@ use arrow2::error::{Error, Result}; use arrow2::offset::Offsets; use arrow2::temporal_conversions; use arrow2::types::{f16, NativeType, Offset}; -use daft_decoding::deserialize::{deserialize_datetime, deserialize_naive_datetime}; +use chrono::{Datelike, Timelike}; +use daft_decoding::deserialize::{ + deserialize_datetime, deserialize_naive_date, deserialize_naive_datetime, + get_factor_from_timeunit, +}; use indexmap::IndexMap; use json_deserializer::{Number, Value}; @@ -135,20 +139,20 @@ fn deserialize_into<'a, A: Borrow>>(target: &mut Box } DataType::Int8 => deserialize_primitive_into::<_, i8>(target, rows, deserialize_int_into), DataType::Int16 => deserialize_primitive_into::<_, i16>(target, rows, deserialize_int_into), - DataType::Int32 - | DataType::Date32 - | DataType::Time32(_) - | DataType::Interval(IntervalUnit::YearMonth) => { + DataType::Int32 | DataType::Interval(IntervalUnit::YearMonth) => { deserialize_primitive_into::<_, i32>(target, rows, deserialize_int_into) } + DataType::Date32 | DataType::Time32(_) => { + deserialize_primitive_into::<_, i32>(target, rows, deserialize_date_into) + } DataType::Interval(IntervalUnit::DayTime) => { unimplemented!("There is no natural representation of DayTime in JSON.") } - DataType::Int64 | DataType::Date64 | DataType::Time64(_) | DataType::Duration(_) => { + DataType::Int64 | DataType::Duration(_) => { deserialize_primitive_into::<_, i64>(target, rows, deserialize_int_into) } - DataType::Timestamp(..) => { - deserialize_primitive_into::<_, i64>(target, rows, deserialize_timestamp_into) + DataType::Timestamp(..) | DataType::Date64 | DataType::Time64(_) => { + deserialize_primitive_into::<_, i64>(target, rows, deserialize_datetime_into) } DataType::UInt8 => deserialize_primitive_into::<_, u8>(target, rows, deserialize_int_into), DataType::UInt16 => { @@ -369,7 +373,34 @@ where } } -fn deserialize_timestamp_into<'a, A: Borrow>>( +fn deserialize_date_into<'a, A: Borrow>>( + target: &mut MutablePrimitiveArray, + rows: &[A], +) { + let dtype = target.data_type().clone(); + let mut last_fmt_idx = 0; + let iter = rows.iter().map(|row| match row.borrow() { + Value::Number(v) => Some(deserialize_int_single(*v)), + Value::String(v) => match dtype { + DataType::Time32(tu) => { + let factor = get_factor_from_timeunit(tu); + v.parse::().ok().map(|x| { + (x.hour() * 3_600 * factor + + x.minute() * 60 * factor + + x.second() * factor + + x.nanosecond() / (1_000_000_000 / factor)) as i32 + }) + } + DataType::Date32 => deserialize_naive_date(v, &mut last_fmt_idx) + .map(|x| x.num_days_from_ce() - temporal_conversions::EPOCH_DAYS_FROM_CE), + _ => unreachable!(), + }, + _ => None, + }); + target.extend_trusted_len(iter); +} + +fn deserialize_datetime_into<'a, A: Borrow>>( target: &mut MutablePrimitiveArray, rows: &[A], ) { @@ -378,6 +409,19 @@ fn deserialize_timestamp_into<'a, A: Borrow>>( let iter = rows.iter().map(|row| match row.borrow() { Value::Number(v) => Some(deserialize_int_single(*v)), Value::String(v) => match dtype { + DataType::Time64(tu) => { + let factor = get_factor_from_timeunit(tu) as u64; + v.parse::().ok().map(|x| { + (x.hour() as u64 * 3_600 * factor + + x.minute() as u64 * 60 * factor + + x.second() as u64 * factor + + x.nanosecond() as u64 / (1_000_000_000 / factor)) + as i64 + }) + } + DataType::Date64 => { + deserialize_naive_datetime(v, &mut last_fmt_idx).map(|x| x.timestamp_millis()) + } DataType::Timestamp(tu, None) => deserialize_naive_datetime(v, &mut last_fmt_idx) .and_then(|dt| match tu { TimeUnit::Second => Some(dt.timestamp()), @@ -386,6 +430,7 @@ fn deserialize_timestamp_into<'a, A: Borrow>>( TimeUnit::Nanosecond => dt.timestamp_nanos_opt(), }), DataType::Timestamp(tu, Some(ref tz)) => { + let tz = if tz == "Z" { "UTC" } else { tz }; let tz = temporal_conversions::parse_offset(tz).unwrap(); deserialize_datetime(v, &tz, &mut last_fmt_idx).and_then(|dt| match tu { TimeUnit::Second => Some(dt.timestamp()), diff --git a/src/daft-json/src/inference.rs b/src/daft-json/src/inference.rs index 69c186feb0..eb59470f54 100644 --- a/src/daft-json/src/inference.rs +++ b/src/daft-json/src/inference.rs @@ -71,7 +71,7 @@ fn infer_array(values: &[Value]) -> Result { let dt = if !types.is_empty() { let types = types.into_iter().collect::>(); - coerce_data_type(&types) + coerce_data_type(types) } else { DataType::Null }; @@ -101,7 +101,7 @@ pub(crate) fn column_types_map_to_fields( .map(|(name, dtype_set)| { let dtypes = dtype_set.into_iter().collect::>(); // Get consolidated dtype for column. - let dtype = coerce_data_type(dtypes.as_slice()); + let dtype = coerce_data_type(dtypes); arrow2::datatypes::Field::new(name, dtype, true) }) .collect::>() @@ -113,10 +113,10 @@ pub(crate) fn column_types_map_to_fields( /// * Lists and scalars are coerced to a list of a compatible scalar /// * Structs contain the union of all fields /// * All other types are coerced to `Utf8` -pub(crate) fn coerce_data_type + std::fmt::Debug>(datatypes: &[A]) -> DataType { +pub(crate) fn coerce_data_type(datatypes: Vec) -> DataType { // Drop null dtype from the dtype set. let datatypes = datatypes - .iter() + .into_iter() .filter(|dt| !matches!((*dt).borrow(), DataType::Null)) .collect::>(); @@ -124,10 +124,10 @@ pub(crate) fn coerce_data_type + std::fmt::Debug>(datatypes: return DataType::Null; } - let are_all_equal = datatypes.windows(2).all(|w| w[0].borrow() == w[1].borrow()); + let are_all_equal = datatypes.windows(2).all(|w| w[0] == w[1]); if are_all_equal { - return datatypes[0].borrow().clone(); + return datatypes.into_iter().next().unwrap(); } let are_all_structs = datatypes @@ -136,23 +136,23 @@ pub(crate) fn coerce_data_type + std::fmt::Debug>(datatypes: if are_all_structs { // All structs => union of all field dtypes (these may have equal names). - let fields = datatypes.iter().fold(vec![], |mut acc, dt| { - if let DataType::Struct(new_fields) = (*dt).borrow() { + let fields = datatypes.into_iter().fold(vec![], |mut acc, dt| { + if let DataType::Struct(new_fields) = dt { acc.extend(new_fields); }; acc }); // Group fields by unique names. - let fields = fields.iter().fold( - IndexMap::<&String, HashSet<&DataType>>::new(), + let fields = fields.into_iter().fold( + IndexMap::>::new(), |mut acc, field| { - match acc.entry(&field.name) { + match acc.entry(field.name) { indexmap::map::Entry::Occupied(mut v) => { - v.get_mut().insert(&field.data_type); + v.get_mut().insert(field.data_type); } indexmap::map::Entry::Vacant(v) => { let mut a = HashSet::new(); - a.insert(&field.data_type); + a.insert(field.data_type); v.insert(a); } } @@ -164,32 +164,76 @@ pub(crate) fn coerce_data_type + std::fmt::Debug>(datatypes: .into_iter() .map(|(name, dts)| { let dts = dts.into_iter().collect::>(); - Field::new(name, coerce_data_type(&dts), true) + Field::new(name, coerce_data_type(dts), true) }) .collect(); return DataType::Struct(fields); - } else if datatypes.len() > 2 { - // TODO(Clark): Return an error for uncoercible types. - return DataType::Utf8; } - let (lhs, rhs) = (datatypes[0].borrow(), datatypes[1].borrow()); - - return match (lhs, rhs) { - (lhs, rhs) if lhs == rhs => lhs.clone(), - (DataType::List(lhs), DataType::List(rhs)) => { - let inner = coerce_data_type(&[lhs.data_type(), rhs.data_type()]); - DataType::List(Box::new(Field::new(ITEM_NAME, inner, true))) - } - (scalar, DataType::List(list)) | (DataType::List(list), scalar) => { - let inner = coerce_data_type(&[scalar, list.data_type()]); - DataType::List(Box::new(Field::new(ITEM_NAME, inner, true))) - } - (DataType::Float64, DataType::Int64) | (DataType::Int64, DataType::Float64) => { - DataType::Float64 - } - (DataType::Int64, DataType::Boolean) | (DataType::Boolean, DataType::Int64) => { - DataType::Int64 - } - (_, _) => DataType::Utf8, - }; + datatypes + .into_iter() + .reduce(|lhs, rhs| { + match (lhs, rhs) { + (lhs, rhs) if lhs == rhs => lhs, + (DataType::Utf8, _) | (_, DataType::Utf8) => DataType::Utf8, + (DataType::List(lhs), DataType::List(rhs)) => { + let inner = + coerce_data_type(vec![lhs.data_type().clone(), rhs.data_type().clone()]); + DataType::List(Box::new(Field::new(ITEM_NAME, inner, true))) + } + (scalar, DataType::List(list)) | (DataType::List(list), scalar) => { + let inner = coerce_data_type(vec![scalar, list.data_type().clone()]); + DataType::List(Box::new(Field::new(ITEM_NAME, inner, true))) + } + (DataType::Float64, DataType::Int64) | (DataType::Int64, DataType::Float64) => { + DataType::Float64 + } + (DataType::Int64, DataType::Boolean) | (DataType::Boolean, DataType::Int64) => { + DataType::Int64 + } + (DataType::Time32(left_tu), DataType::Time32(right_tu)) => { + // Set unified time unit to the highest granularity time unit. + let unified_tu = if left_tu == right_tu + || time_unit_to_ordinal(&left_tu) > time_unit_to_ordinal(&right_tu) + { + left_tu + } else { + right_tu + }; + DataType::Time32(unified_tu) + } + ( + DataType::Timestamp(left_tu, left_tz), + DataType::Timestamp(right_tu, right_tz), + ) => { + // Set unified time unit to the highest granularity time unit. + let unified_tu = if left_tu == right_tu + || time_unit_to_ordinal(&left_tu) > time_unit_to_ordinal(&right_tu) + { + left_tu + } else { + right_tu + }; + // Set unified time zone to UTC. + let unified_tz = if left_tz == right_tz { + left_tz.clone() + } else { + Some("Z".to_string()) + }; + DataType::Timestamp(unified_tu, unified_tz) + } + (_, _) => DataType::Utf8, + } + }) + .unwrap() +} + +fn time_unit_to_ordinal(tu: &arrow2::datatypes::TimeUnit) -> usize { + use arrow2::datatypes::TimeUnit; + + match tu { + TimeUnit::Second => 0, + TimeUnit::Millisecond => 1, + TimeUnit::Microsecond => 2, + TimeUnit::Nanosecond => 3, + } } diff --git a/src/daft-json/src/metadata.rs b/src/daft-json/src/metadata.rs index 493da274c1..10cd46108e 100644 --- a/src/daft-json/src/metadata.rs +++ b/src/daft-json/src/metadata.rs @@ -200,7 +200,11 @@ mod tests { use std::sync::Arc; use common_error::DaftResult; - use daft_core::{datatypes::Field, schema::Schema, DataType}; + use daft_core::{ + datatypes::{Field, TimeUnit}, + schema::Schema, + DataType, + }; use daft_io::{IOClient, IOConfig}; use rstest::rstest; @@ -256,6 +260,69 @@ mod tests { Ok(()) } + #[rstest] + fn test_json_schema_local_dtypes() -> DaftResult<()> { + let file = format!("{}/test/dtypes.jsonl", env!("CARGO_MANIFEST_DIR"),); + + let mut io_config = IOConfig::default(); + io_config.s3.anonymous = true; + + let io_client = Arc::new(IOClient::new(io_config.into())?); + + let schema = read_json_schema(file.as_ref(), None, None, io_client, None)?; + assert_eq!( + schema, + Schema::new(vec![ + Field::new("int", DataType::Int64), + Field::new("float", DataType::Float64), + Field::new("bool", DataType::Boolean), + Field::new("str", DataType::Utf8), + Field::new("null", DataType::Null), + Field::new("date", DataType::Date), + // TODO(Clark): Add coverage for time parsing once we add support for representing time series in Daft. + // // Timezone should be finest granularity found in file, i.e. nanoseconds. + // Field::new("time", DataType::Time(TimeUnit::Nanoseconds)), + // Timezone should be finest granularity found in file, i.e. microseconds. + Field::new( + "naive_timestamp", + DataType::Timestamp(TimeUnit::Microseconds, None) + ), + // Timezone should be UTC due to field having multiple different timezones across records. + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Milliseconds, Some("Z".to_string())) + ), + Field::new("list", DataType::List(Box::new(DataType::Int64))), + Field::new( + "obj", + DataType::Struct(vec![ + Field::new("a", DataType::Int64), + Field::new("b", DataType::Boolean) + ]) + ), + Field::new( + "nested_list", + DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(vec![ + Field::new("a", DataType::Utf8), + ]))))) + ), + Field::new( + "nested_obj", + DataType::Struct(vec![ + Field::new( + "obj", + DataType::Struct(vec![Field::new("a", DataType::Int64)]) + ), + Field::new("list", DataType::List(Box::new(DataType::Int64))), + ]) + ), + ])? + .into(), + ); + + Ok(()) + } + #[test] fn test_json_schema_local_nulls() -> DaftResult<()> { let file = format!("{}/test/iris_tiny_nulls.jsonl", env!("CARGO_MANIFEST_DIR"),); diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index 94658e587e..d9f485a119 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -368,7 +368,7 @@ mod tests { use common_error::DaftResult; use daft_core::{ - datatypes::Field, + datatypes::{Field, TimeUnit}, schema::Schema, utils::arrow::{cast_array_for_daft_if_needed, cast_array_from_daft_if_needed}, DataType, @@ -534,6 +534,20 @@ mod tests { Field::new("bool", DataType::Boolean), Field::new("str", DataType::Utf8), Field::new("null", DataType::Null), + Field::new("date", DataType::Date), + // TODO(Clark): Add coverage for time parsing once we add support for representing time series in Daft. + // // Timezone should be finest granularity found in file, i.e. nanoseconds. + // Field::new("time", DataType::Time(TimeUnit::Nanoseconds)), + // Timezone should be finest granularity found in file, i.e. microseconds. + Field::new( + "naive_timestamp", + DataType::Timestamp(TimeUnit::Microseconds, None) + ), + // Timezone should be UTC due to field having multiple different timezones across records. + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Milliseconds, Some("Z".to_string())) + ), Field::new("list", DataType::List(Box::new(DataType::Int64))), Field::new( "obj", diff --git a/src/daft-json/test/dtypes.jsonl b/src/daft-json/test/dtypes.jsonl index 1b15ea15c6..118f19d73c 100644 --- a/src/daft-json/test/dtypes.jsonl +++ b/src/daft-json/test/dtypes.jsonl @@ -1,4 +1,4 @@ -{"int": 1, "float": 2.3, "bool": false, "str": "foo", "null": null, "list": [1, 2, 3], "obj": {"a": 1, "b": false}, "nested_list": [[{"a": "foo"}]], "nested_obj": {"obj": {"a": 4}, "list": [1, 2]}} -{"int": 2, "float": 3.3, "bool": true, "str": "bar", "null": null, "list": [4, 5], "obj": {"a": 2, "b": false}, "nested_list": [[{"a": "bar"}]], "nested_obj": {"obj": {"a": 6}, "list": [3, 4]}} -{"int": null, "float": null, "bool": null, "str": null, "null": null, "list": null, "obj": null, "nested_list": null, "nested_obj": null} -{"int": 3, "float": 4.3, "bool": false, "str": "baz", "null": null, "list": [6, 7, null, 9], "obj": {"a": null, "b": false}, "nested_list": [[{"a": null}]], "nested_obj": {"obj": {"a": null}, "list": [5, null]}} +{"int": 1, "float": 2.3, "bool": false, "str": "foo", "null": null, "date": "2023-11-29", "naive_timestamp": "2023-11-29T06:31:52", "timestamp": "2023-11-29T06:31:52Z", "list": [1, 2, 3], "obj": {"a": 1, "b": false}, "nested_list": [[{"a": "foo"}]], "nested_obj": {"obj": {"a": 4}, "list": [1, 2]}} +{"int": 2, "float": 3.3, "bool": true, "str": "bar", "null": null, "date": "2023/11/28", "naive_timestamp": "2023-11-29T06:31:52.342", "timestamp": "2023-11-29T06:31:52.342+07:00", "list": [4, 5], "obj": {"a": 2, "b": false}, "nested_list": [[{"a": "bar"}]], "nested_obj": {"obj": {"a": 6}, "list": [3, 4]}} +{"int": null, "float": null, "bool": null, "str": null, "null": null, "date": null, "naive_timestamp": null, "timestamp": null, "list": null, "obj": null, "nested_list": null, "nested_obj": null} +{"int": 3, "float": 4.3, "bool": false, "str": "baz", "null": null, "date": "2023-11-27", "naive_timestamp": "2023-11-29 06:31:52.342567", "timestamp": "2023-11-29 06:31:52.342-07:00", "list": [6, 7, null, 9], "obj": {"a": null, "b": false}, "nested_list": [[{"a": null}]], "nested_obj": {"obj": {"a": null}, "list": [5, null]}}