Skip to content

Commit

Permalink
[BUG] [CSV Reader] Fix CSV parsing bugs around nulls and timestamps. (#…
Browse files Browse the repository at this point in the history
…1523)

This PR fixes bugs around parsing nulls and timestamps. Namely, we fork
Arrow2's CSV parsing (pulling it into source) in order to make some
fixes and opinionated semantics changes.

## Summary of Arrow2 Parsing Changes
- Adds support for `DataType::Null` dtype parsing, i.e. where if a
`Null` dtype is specified by the user or inferred by schema inference,
we will parse such null columns as all null without scanning the data.
- Fixes timestamp parsing to support ISO 8601, RFC 3339, and variants
thereof sans time zone (offset) and sans fractional seconds; this should
cover common variants of the mentioned standards, as well as the format
that Arrow writes CSV in (RFC 3339 with space date/time separator +
optional time zone + optional fractional seconds).
- Propagates the minimum time granularity required to represent the
timestamps, as determined while parsing during schema inference, and uses
it as the `Timestamp()` time unit.
  • Loading branch information
clarkzinzow authored Oct 25, 2023
1 parent 03d27f0 commit 1a4684b
Show file tree
Hide file tree
Showing 10 changed files with 609 additions and 50 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ async-compat = "0.2.1"
async-compression = {version = "0.4.4", features = ["tokio", "all-algorithms"]}
async-stream = "0.3.5"
bytes = "1.4.0"
chrono = "0.4.26"
chrono-tz = "0.8.3"
futures = "0.3.28"
html-escape = "0.2.13"
indexmap = "2.0.0"
Expand Down
4 changes: 2 additions & 2 deletions src/daft-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
arrow2 = {workspace = true, features = ["chrono-tz", "compute_take", "compute_cast", "compute_aggregate", "compute_if_then_else", "compute_sort", "compute_filter", "compute_temporal", "compute_comparison", "compute_arithmetics", "compute_concatenate", "io_ipc"]}
base64 = "0.21.2"
bincode = {workspace = true}
chrono = "0.4.26"
chrono-tz = "0.8.3"
chrono = {workspace = true}
chrono-tz = {workspace = true}
common-error = {path = "../common/error", default-features = false}
dyn-clone = "1.0.12"
fnv = "1.0.7"
Expand Down
4 changes: 4 additions & 0 deletions src/daft-csv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@ async-compat = {workspace = true}
async-compression = {workspace = true}
async-stream = {workspace = true}
bytes = {workspace = true}
chrono = {workspace = true}
chrono-tz = {workspace = true}
common-error = {path = "../common/error", default-features = false}
csv-async = "1.2.6"
daft-core = {path = "../daft-core", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
lexical-core = {version = "0.8"}
log = {workspace = true}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true, optional = true}
rayon = {workspace = true}
simdutf8 = "0.1.3"
snafu = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}
Expand Down
320 changes: 320 additions & 0 deletions src/daft-csv/src/deserialize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
use arrow2::{
array::*,
datatypes::*,
error::{Error, Result},
offset::Offset,
temporal_conversions,
types::NativeType,
};
use chrono::{Datelike, Timelike};
use csv_async::ByteRecord;

// Candidate timestamp format strings, in chrono `strftime` syntax.
// `%+` is chrono's built-in ISO 8601 / RFC 3339 parser (UTC offset required).
pub(crate) const ISO8601: &str = "%+";
// ISO 8601 ('T' separator) without a UTC offset, with and without fractional seconds.
pub(crate) const ISO8601_NO_TIME_ZONE: &str = "%Y-%m-%dT%H:%M:%S%.f";
pub(crate) const ISO8601_NO_TIME_ZONE_NO_FRACTIONAL: &str = "%Y-%m-%dT%H:%M:%S";
// RFC 3339 variants with a space as the date/time separator (the format Arrow
// writes CSV in), with and without offset / fractional seconds.
pub(crate) const RFC3339_WITH_SPACE: &str = "%Y-%m-%d %H:%M:%S%.f%:z";
pub(crate) const RFC3339_WITH_SPACE_NO_TIME_ZONE: &str = "%Y-%m-%d %H:%M:%S%.f";
pub(crate) const RFC3339_WITH_SPACE_NO_TIME_ZONE_NO_FRACTIONAL: &str = "%Y-%m-%d %H:%M:%S";
// Formats with no offset specifier, tried when parsing naive (timezone-less) timestamps.
pub(crate) const ALL_NAIVE_TIMESTAMP_FMTS: &[&str] = &[
    ISO8601_NO_TIME_ZONE,
    ISO8601_NO_TIME_ZONE_NO_FRACTIONAL,
    RFC3339_WITH_SPACE_NO_TIME_ZONE,
    RFC3339_WITH_SPACE_NO_TIME_ZONE_NO_FRACTIONAL,
];
// Formats that carry a UTC offset, tried when parsing timezone-aware timestamps.
pub(crate) const ALL_TIMESTAMP_FMTS: &[&str] = &[ISO8601, RFC3339_WITH_SPACE];

// Ideally this trait should not be needed and both `csv` and `csv_async` crates would share
// the same `ByteRecord` struct. Unfortunately, they do not and thus we must use generics
// over this trait and materialize the generics for each struct.
pub(crate) trait ByteRecordGeneric {
    /// Returns the raw bytes of the field at `index`, or `None` if the record
    /// has fewer than `index + 1` fields.
    fn get(&self, index: usize) -> Option<&[u8]>;
}

/// Materializes [`ByteRecordGeneric`] for `csv_async::ByteRecord`.
impl ByteRecordGeneric for ByteRecord {
    #[inline]
    fn get(&self, index: usize) -> Option<&[u8]> {
        // Calls the inherent `ByteRecord::get`, which takes precedence over
        // this trait method, so this is plain delegation, not recursion.
        self.get(index)
    }
}

/// Validates `bytes` as UTF-8, returning a borrowed `&str` on success and
/// `None` on invalid UTF-8 (callers turn that into a null value).
#[inline]
fn to_utf8(bytes: &[u8]) -> Option<&str> {
    match simdutf8::basic::from_utf8(bytes) {
        Ok(s) => Some(s),
        Err(_) => None,
    }
}

/// Parses `column` of `rows` into a `PrimitiveArray<T>` using `op` on each
/// field's raw bytes.
///
/// Missing fields and empty byte strings become nulls; for non-empty bytes,
/// `op` returning `None` also yields a null.
#[inline]
fn deserialize_primitive<T, B: ByteRecordGeneric, F>(
    rows: &[B],
    column: usize,
    datatype: DataType,
    mut op: F,
) -> Box<dyn Array>
where
    T: NativeType + lexical_core::FromLexical,
    F: FnMut(&[u8]) -> Option<T>,
{
    let values = rows.iter().map(|row| {
        row.get(column)
            .filter(|bytes| !bytes.is_empty())
            .and_then(|bytes| op(bytes))
    });
    Box::new(PrimitiveArray::<T>::from_trusted_len_iter(values).to(datatype))
}

/// Counts the bytes that are not the ASCII digit `'0'`; used as a proxy for
/// the number of significant digits when checking decimal precision/scale.
#[inline]
fn significant_bytes(bytes: &[u8]) -> usize {
    bytes.iter().filter(|&&byte| byte != b'0').count()
}

/// Deserializes bytes to a single i128 representing a decimal
/// The decimal precision and scale are not checked.
#[inline]
fn deserialize_decimal(bytes: &[u8], precision: usize, scale: usize) -> Option<i128> {
    // The integral part parses *with* its sign, so for negative inputs the
    // fractional part must be subtracted from the scaled integral part, not
    // added. We also need the sign explicitly for inputs like "-0.5", where
    // the parsed integral part (0) loses the sign.
    let negative = bytes.first() == Some(&b'-');
    let mut a = bytes.split(|x| *x == b'.');
    let lhs = a.next();
    let rhs = a.next();
    match (lhs, rhs) {
        // "123.45"-style input: integral and fractional parts.
        (Some(lhs), Some(rhs)) => lexical_core::parse::<i128>(lhs).ok().and_then(|x| {
            lexical_core::parse::<i128>(rhs)
                .ok()
                .map(|y| (x, lhs, y, rhs))
                .and_then(|(lhs, lhs_b, rhs, rhs_b)| {
                    // BUGFIX: exclude the leading '-' from the significant-byte
                    // count; it is not a digit and was previously inflating the
                    // precision check for negative values.
                    let lhs_digits = if negative { &lhs_b[1..] } else { lhs_b };
                    let lhs_s = significant_bytes(lhs_digits);
                    let rhs_s = significant_bytes(rhs_b);
                    // NOTE(review): `significant_bytes` counts non-'0' bytes, so
                    // interior zeros (e.g. "1.05") are neither counted toward
                    // precision/scale nor preserved by the scaling below — this
                    // quirk is inherited from the original (Arrow2) semantics.
                    if lhs_s + rhs_s > precision || rhs_s > scale {
                        None
                    } else {
                        Some((lhs, rhs, rhs_s))
                    }
                })
                .map(|(lhs, rhs, rhs_s)| {
                    let shifted = lhs * 10i128.pow(rhs_s as u32);
                    // BUGFIX: the fractional part was previously always added,
                    // so e.g. "-1.5" produced -5 instead of -15.
                    if negative {
                        shifted - rhs
                    } else {
                        shifted + rhs
                    }
                })
        }),
        // Defensive: `split` always yields at least one segment, so these
        // `(None, _)` arms are unreachable in practice; kept for totality.
        (None, Some(rhs)) => {
            if rhs.len() != precision || rhs.len() != scale {
                return None;
            }
            lexical_core::parse::<i128>(rhs).ok()
        }
        // "123"-style input: integral part only; scale must be zero.
        (Some(lhs), None) => {
            if lhs.len() != precision || scale != 0 {
                return None;
            }
            lexical_core::parse::<i128>(lhs).ok()
        }
        (None, None) => None,
    }
}

/// Parses `column` of `rows` into a `BooleanArray` using `op` on each field's
/// raw bytes. Missing fields and empty byte strings become nulls, as do
/// non-empty fields for which `op` returns `None`.
#[inline]
fn deserialize_boolean<B, F>(rows: &[B], column: usize, op: F) -> Box<dyn Array>
where
    B: ByteRecordGeneric,
    F: Fn(&[u8]) -> Option<bool>,
{
    let values = rows.iter().map(|row| {
        row.get(column)
            .filter(|bytes| !bytes.is_empty())
            .and_then(|bytes| op(bytes))
    });
    Box::new(BooleanArray::from_trusted_len_iter(values))
}

#[inline]
fn deserialize_utf8<O: Offset, B: ByteRecordGeneric>(rows: &[B], column: usize) -> Box<dyn Array> {
let iter = rows.iter().map(|row| match row.get(column) {
Some(bytes) => to_utf8(bytes),
None => None,
});
Box::new(Utf8Array::<O>::from_trusted_len_iter(iter))
}

/// Parses `column` of `rows` into a `BinaryArray<O>`.
///
/// Field bytes are stored verbatim (no validation); only missing fields
/// become nulls.
#[inline]
fn deserialize_binary<O: Offset, B: ByteRecordGeneric>(
    rows: &[B],
    column: usize,
) -> Box<dyn Array> {
    Box::new(BinaryArray::<O>::from_trusted_len_iter(
        rows.iter().map(|row| row.get(column)),
    ))
}

/// Produces an all-null array of length `rows.len()` without scanning the
/// column's data.
#[inline]
fn deserialize_null<B: ByteRecordGeneric>(rows: &[B], _: usize) -> Box<dyn Array> {
    // TODO(Clark): Once we add strict parsing mode, where failure to parse to an intended dtype fails instead
    // of resulting in a null, add an explicit parsing pass over the column.
    Box::new(NullArray::new(DataType::Null, rows.len()))
}

/// Parses `string` as a naive (timezone-less) datetime, trying every format in
/// `ALL_NAIVE_TIMESTAMP_FMTS`.
///
/// The rotation starts at `*fmt_idx` — the format that last succeeded — so a
/// column with a homogeneous format hits on the first attempt; on success the
/// winning index is written back to `*fmt_idx`.
#[inline]
fn deserialize_naive_datetime(string: &str, fmt_idx: &mut usize) -> Option<chrono::NaiveDateTime> {
    // TODO(Clark): Parse as all candidate formats in a single pass.
    let num_fmts = ALL_NAIVE_TIMESTAMP_FMTS.len();
    (0..num_fmts)
        .map(|offset| (offset + *fmt_idx) % num_fmts)
        .find_map(|idx| {
            chrono::NaiveDateTime::parse_from_str(string, ALL_NAIVE_TIMESTAMP_FMTS[idx])
                .ok()
                .map(|dt| {
                    *fmt_idx = idx;
                    dt
                })
        })
}

#[inline]
fn deserialize_datetime<T: chrono::TimeZone>(
string: &str,
tz: &T,
fmt_idx: &mut usize,
) -> Option<chrono::DateTime<T>> {
// TODO(Clark): Parse as all candidate formats in a single pass.
for i in 0..ALL_NAIVE_TIMESTAMP_FMTS.len() {
let idx = (i + *fmt_idx) % ALL_NAIVE_TIMESTAMP_FMTS.len();
let fmt = ALL_NAIVE_TIMESTAMP_FMTS[idx];
if let Ok(dt) = chrono::DateTime::parse_from_str(string, fmt) {
*fmt_idx = idx;
return Some(dt.with_timezone(tz));
}
}
None
}

/// Deserializes `column` of `rows` into an [`Array`] of [`DataType`] `datatype`.
///
/// Values that fail to parse become nulls (non-strict mode); unsupported
/// dtypes return `Error::NotYetImplemented`.
#[inline]
pub(crate) fn deserialize_column<B: ByteRecordGeneric>(
    rows: &[B],
    column: usize,
    datatype: DataType,
    _line_number: usize,
) -> Result<Box<dyn Array>> {
    use DataType::*;
    Ok(match datatype {
        // Case-insensitive "true"/"false"; anything else becomes null.
        Boolean => deserialize_boolean(rows, column, |bytes| {
            if bytes.eq_ignore_ascii_case(b"false") {
                Some(false)
            } else if bytes.eq_ignore_ascii_case(b"true") {
                Some(true)
            } else {
                None
            }
        }),
        // Integer and float dtypes: parsed with lexical-core; unparsable
        // values become nulls.
        Int8 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<i8>(bytes).ok()
        }),
        Int16 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<i16>(bytes).ok()
        }),
        Int32 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<i32>(bytes).ok()
        }),
        Int64 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<i64>(bytes).ok()
        }),
        UInt8 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<u8>(bytes).ok()
        }),
        UInt16 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<u16>(bytes).ok()
        }),
        UInt32 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<u32>(bytes).ok()
        }),
        UInt64 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<u64>(bytes).ok()
        }),
        Float32 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<f32>(bytes).ok()
        }),
        Float64 => deserialize_primitive(rows, column, datatype, |bytes| {
            lexical_core::parse::<f64>(bytes).ok()
        }),
        // Date32: days since the Unix epoch, converted from chrono's
        // days-from-Common-Era representation.
        Date32 => deserialize_primitive(rows, column, datatype, |bytes| {
            to_utf8(bytes)
                .and_then(|x| x.parse::<chrono::NaiveDate>().ok())
                .map(|x| x.num_days_from_ce() - temporal_conversions::EPOCH_DAYS_FROM_CE)
        }),
        // Date64: milliseconds since the Unix epoch.
        Date64 => deserialize_primitive(rows, column, datatype, |bytes| {
            to_utf8(bytes)
                .and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
                .map(|x| x.timestamp_millis())
        }),
        // Time32: time of day scaled to the given unit (seconds or millis);
        // `factor` is units-per-second, and the nanosecond term contributes
        // the sub-second component (it divides to 0 when factor == 1).
        Time32(time_unit) => deserialize_primitive(rows, column, datatype, |bytes| {
            let factor = get_factor_from_timeunit(time_unit);
            to_utf8(bytes)
                .and_then(|x| x.parse::<chrono::NaiveTime>().ok())
                .map(|x| {
                    (x.hour() * 3_600 * factor
                        + x.minute() * 60 * factor
                        + x.second() * factor
                        + x.nanosecond() / (1_000_000_000 / factor)) as i32
                })
        }),
        // Time64: same computation widened to u64 (micro/nano units would
        // overflow 32 bits).
        Time64(time_unit) => deserialize_primitive(rows, column, datatype, |bytes| {
            let factor: u64 = get_factor_from_timeunit(time_unit).into();
            to_utf8(bytes)
                .and_then(|x| x.parse::<chrono::NaiveTime>().ok())
                .map(|x| {
                    (x.hour() as u64 * 3_600 * factor
                        + x.minute() as u64 * 60 * factor
                        + x.second() as u64 * factor
                        + x.nanosecond() as u64 / (1_000_000_000 / factor))
                        as i64
                })
        }),
        // Naive (timezone-less) timestamps: `last_fmt_idx` caches the last
        // successful format across rows so homogeneous columns parse on the
        // first attempt.
        Timestamp(time_unit, None) => {
            let mut last_fmt_idx = 0;
            deserialize_primitive(rows, column, datatype, |bytes| {
                to_utf8(bytes)
                    .and_then(|s| deserialize_naive_datetime(s, &mut last_fmt_idx))
                    .and_then(|dt| match time_unit {
                        TimeUnit::Second => Some(dt.timestamp()),
                        TimeUnit::Millisecond => Some(dt.timestamp_millis()),
                        TimeUnit::Microsecond => Some(dt.timestamp_micros()),
                        // Nanos can overflow i64 for far-out dates, hence Option.
                        TimeUnit::Nanosecond => dt.timestamp_nanos_opt(),
                    })
            })
        }
        // Timezone-aware timestamps: values are parsed with their offset and
        // converted to the column's target timezone.
        // NOTE(review): `parse_offset` only handles fixed numeric offsets
        // (e.g. "+02:00"); a named zone like "America/New_York" makes this
        // return Err even though chrono-tz is a dependency — confirm whether
        // named-timezone columns are resolved upstream.
        Timestamp(time_unit, Some(ref tz)) => {
            let tz = temporal_conversions::parse_offset(tz)?;
            let mut last_fmt_idx = 0;
            deserialize_primitive(rows, column, datatype, |bytes| {
                to_utf8(bytes)
                    .and_then(|x| deserialize_datetime(x, &tz, &mut last_fmt_idx))
                    .and_then(|dt| match time_unit {
                        TimeUnit::Second => Some(dt.timestamp()),
                        TimeUnit::Millisecond => Some(dt.timestamp_millis()),
                        TimeUnit::Microsecond => Some(dt.timestamp_micros()),
                        TimeUnit::Nanosecond => dt.timestamp_nanos_opt(),
                    })
            })
        }
        // Decimals are stored as i128; precision/scale are enforced by the
        // helper, with failures becoming nulls.
        Decimal(precision, scale) => deserialize_primitive(rows, column, datatype, |x| {
            deserialize_decimal(x, precision, scale)
        }),
        Utf8 => deserialize_utf8::<i32, _>(rows, column),
        LargeUtf8 => deserialize_utf8::<i64, _>(rows, column),
        Binary => deserialize_binary::<i32, _>(rows, column),
        LargeBinary => deserialize_binary::<i64, _>(rows, column),
        // Null dtype: all-null output without scanning the data.
        Null => deserialize_null(rows, column),
        other => {
            return Err(Error::NotYetImplemented(format!(
                "Deserializing type \"{other:?}\" is not implemented"
            )))
        }
    })
}

// Return the factor by how small is a time unit compared to seconds
fn get_factor_from_timeunit(time_unit: TimeUnit) -> u32 {
match time_unit {
TimeUnit::Second => 1,
TimeUnit::Millisecond => 1_000,
TimeUnit::Microsecond => 1_000_000,
TimeUnit::Nanosecond => 1_000_000_000,
}
}
Loading

0 comments on commit 1a4684b

Please sign in to comment.