Skip to content

Commit

Permalink
AVRO-3853: [Rust] Support Local timestamp logical types for Rust SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
sarutak committed Sep 11, 2023
1 parent e3b47eb commit f0164ef
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 2 deletions.
13 changes: 13 additions & 0 deletions lang/rust/avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ fn main() -> Result<(), Error> {
1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
1. Date, Time (milli) as `i32` and Time (micro) as `i64`
1. Timestamp (milli and micro) as `i64`
1. Local timestamp (milli and micro) as `i64`
1. Duration as a custom type with `months`, `days` and `millis` accessor methods each of which returns an `i32`

Note that the on-disk representation is identical to the underlying primitive/complex type.
Expand Down Expand Up @@ -499,6 +500,16 @@ fn main() -> Result<(), Error> {
"type": "long",
"logicalType": "timestamp-micros"
},
{
"name": "local_timestamp_millis",
"type": "long",
"logicalType": "local-timestamp-millis"
},
{
"name": "local_timestamp_micros",
"type": "long",
"logicalType": "local-timestamp-micros"
},
{
"name": "duration",
"type": {
Expand Down Expand Up @@ -527,6 +538,8 @@ fn main() -> Result<(), Error> {
record.put("time_micros", Value::TimeMicros(3));
record.put("timestamp_millis", Value::TimestampMillis(4));
record.put("timestamp_micros", Value::TimestampMicros(5));
record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));

writer.append(record)?;
Expand Down
50 changes: 48 additions & 2 deletions lang/rust/avro/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
Value::Long(i)
| Value::TimeMicros(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i) => visitor.visit_i64(*i),
| Value::TimestampMicros(i)
| Value::LocalTimestampMillis(i)
| Value::LocalTimestampMicros(i) => visitor.visit_i64(*i),
&Value::Float(f) => visitor.visit_f32(f),
&Value::Double(d) => visitor.visit_f64(d),
Value::Union(_i, u) => match **u {
Expand All @@ -254,7 +256,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
Value::Long(i)
| Value::TimeMicros(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i) => visitor.visit_i64(i),
| Value::TimestampMicros(i)
| Value::LocalTimestampMillis(i)
| Value::LocalTimestampMicros(i) => visitor.visit_i64(i),
Value::Float(f) => visitor.visit_f32(f),
Value::Double(d) => visitor.visit_f64(d),
Value::Record(ref fields) => visitor.visit_map(RecordDeserializer::new(fields)),
Expand Down Expand Up @@ -1073,6 +1077,24 @@ mod tests {
Ok(())
}

#[test]
fn test_avro_3853_local_timestamp_millis() -> TestResult {
let raw_value = 1;
let value = Value::LocalTimestampMillis(raw_value);
let result = crate::from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}

#[test]
fn test_avro_3853_local_timestamp_micros() -> TestResult {
let raw_value = 1;
let value = Value::LocalTimestampMicros(raw_value);
let result = crate::from_value::<i64>(&value)?;
assert_eq!(result, raw_value);
Ok(())
}

#[test]
fn test_from_value_uuid_str() -> TestResult {
let raw_value = "9ec535ff-3e2a-45bd-91d3-0a01321b5a49";
Expand Down Expand Up @@ -1116,6 +1138,8 @@ mod tests {
("time_micros_a".to_string(), 123),
("timestamp_millis_b".to_string(), 234),
("timestamp_micros_c".to_string(), 345),
("local_timestamp_millis_d".to_string(), 678),
("local_timestamp_micros_e".to_string(), 789),
]
.iter()
.cloned()
Expand All @@ -1132,6 +1156,12 @@ mod tests {
key if key.starts_with("timestamp_micros_") => {
(k.clone(), Value::TimestampMicros(*v))
}
key if key.starts_with("local_timestamp_millis_") => {
(k.clone(), Value::LocalTimestampMillis(*v))
}
key if key.starts_with("local_timestamp_micros_") => {
(k.clone(), Value::LocalTimestampMicros(*v))
}
_ => unreachable!("unexpected key: {:?}", k),
})
.collect();
Expand Down Expand Up @@ -1181,6 +1211,22 @@ mod tests {
"a_non_existing_timestamp_micros".to_string(),
Value::Union(0, Box::new(Value::TimestampMicros(-345))),
),
(
"a_local_timestamp_millis".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampMillis(678))),
),
(
"a_non_existing_local_timestamp_millis".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampMillis(-678))),
),
(
"a_local_timestamp_micros".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampMicros(789))),
),
(
"a_non_existing_local_timestamp_micros".to_string(),
Value::Union(0, Box::new(Value::LocalTimestampMicros(-789))),
),
(
"a_record".to_string(),
Value::Union(
Expand Down
2 changes: 2 additions & 0 deletions lang/rust/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
Schema::LocalTimestampMillis => zag_i64(reader).map(Value::LocalTimestampMillis),
Schema::LocalTimestampMicros => zag_i64(reader).map(Value::LocalTimestampMicros),
Schema::Duration => {
let mut buf = [0u8; 12];
reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
Expand Down
2 changes: 2 additions & 0 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
Value::Long(i)
| Value::TimestampMillis(i)
| Value::TimestampMicros(i)
| Value::LocalTimestampMillis(i)
| Value::LocalTimestampMicros(i)
| Value::TimeMicros(i) => encode_long(*i, buffer),
Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
Expand Down
6 changes: 6 additions & 0 deletions lang/rust/avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ pub enum Error {
#[error("TimestampMicros expected, got {0:?}")]
GetTimestampMicros(ValueKind),

#[error("LocalTimestampMillis expected, got {0:?}")]
GetLocalTimestampMillis(ValueKind),

#[error("LocalTimestampMicros expected, got {0:?}")]
GetLocalTimestampMicros(ValueKind),

#[error("Null expected, got {0:?}")]
GetNull(ValueKind),

Expand Down
13 changes: 13 additions & 0 deletions lang/rust/avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@
//! 1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate
//! 1. Date, Time (milli) as `i32` and Time (micro) as `i64`
//! 1. Timestamp (milli and micro) as `i64`
//! 1. Local timestamp (milli and micro) as `i64`
//! 1. Duration as a custom type with `months`, `days` and `millis` accessor methods each of which returns an `i32`
//!
//! Note that the on-disk representation is identical to the underlying primitive/complex type.
Expand Down Expand Up @@ -613,6 +614,16 @@
//! "logicalType": "timestamp-micros"
//! },
//! {
//! "name": "local_timestamp_millis",
//! "type": "long",
//! "logicalType": "local-timestamp-millis"
//! },
//! {
//! "name": "local_timestamp_micros",
//! "type": "long",
//! "logicalType": "local-timestamp-micros"
//! },
//! {
//! "name": "duration",
//! "type": {
//! "type": "fixed",
Expand Down Expand Up @@ -640,6 +651,8 @@
//! record.put("time_micros", Value::TimeMicros(3));
//! record.put("timestamp_millis", Value::TimestampMillis(4));
//! record.put("timestamp_micros", Value::TimestampMicros(5));
//! record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
//! record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
//! record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));
//!
//! writer.append(record)?;
Expand Down
38 changes: 38 additions & 0 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ pub enum Schema {
TimestampMillis,
/// An instant in time represented as the number of microseconds after the UNIX epoch.
TimestampMicros,
/// An instant in localtime represented as the number of milliseconds after the UNIX epoch.
LocalTimestampMillis,
/// An instant in local time represented as the number of microseconds after the UNIX epoch.
LocalTimestampMicros,
/// An amount of time defined by a number of months, days and milliseconds.
Duration,
/// A reference to another schema.
Expand Down Expand Up @@ -191,6 +195,8 @@ impl From<&types::Value> for SchemaKind {
Value::TimeMicros(_) => Self::TimeMicros,
Value::TimestampMillis(_) => Self::TimestampMillis,
Value::TimestampMicros(_) => Self::TimestampMicros,
Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
Value::Duration { .. } => Self::Duration,
}
}
Expand Down Expand Up @@ -1388,6 +1394,26 @@ impl Parser {
enclosing_namespace,
);
}
"local-timestamp-millis" => {
return try_logical_type(
"local-timestamp-millis",
complex,
&[SchemaKind::Long],
Schema::LocalTimestampMillis,
self,
enclosing_namespace,
);
}
"local-timestamp-micros" => {
return try_logical_type(
"local-timestamp-micros",
complex,
&[SchemaKind::Long],
Schema::LocalTimestampMicros,
self,
enclosing_namespace,
);
}
"duration" => {
logical_verify_type(complex, &[SchemaKind::Fixed], self, enclosing_namespace)?;
return Ok(Schema::Duration);
Expand Down Expand Up @@ -1901,6 +1927,18 @@ impl Serialize for Schema {
map.serialize_entry("logicalType", "timestamp-micros")?;
map.end()
}
Schema::LocalTimestampMillis => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "long")?;
map.serialize_entry("logicalType", "local-timestamp-millis")?;
map.end()
}
Schema::LocalTimestampMicros => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "long")?;
map.serialize_entry("logicalType", "local-timestamp-micros")?;
map.end()
}
Schema::Duration => {
let mut map = serializer.serialize_map(None)?;

Expand Down
56 changes: 56 additions & 0 deletions lang/rust/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ pub enum Value {
TimestampMillis(i64),
/// Timestamp in microseconds.
TimestampMicros(i64),
/// Local timestamp in milliseconds.
LocalTimestampMillis(i64),
/// Local timestamp in microseconds.
LocalTimestampMicros(i64),
/// Avro Duration. An amount of time defined by months, days and milliseconds.
Duration(Duration),
/// Universally unique identifier.
Expand Down Expand Up @@ -327,6 +331,8 @@ impl TryFrom<Value> for JsonValue {
Value::TimeMicros(t) => Ok(Self::Number(t.into())),
Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
Value::Duration(d) => Ok(Self::Array(
<[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
)),
Expand Down Expand Up @@ -409,8 +415,12 @@ impl Value {
(&Value::Long(_), &Schema::TimeMicros) => None,
(&Value::Long(_), &Schema::TimestampMillis) => None,
(&Value::Long(_), &Schema::TimestampMicros) => None,
(&Value::Long(_), &Schema::LocalTimestampMillis) => None,
(&Value::Long(_), &Schema::LocalTimestampMicros) => None,
(&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
(&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
(&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
(&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
(&Value::TimeMicros(_), &Schema::TimeMicros) => None,
(&Value::TimeMillis(_), &Schema::TimeMillis) => None,
(&Value::Date(_), &Schema::Date) => None,
Expand Down Expand Up @@ -669,6 +679,8 @@ impl Value {
Schema::TimeMicros => self.resolve_time_micros(),
Schema::TimestampMillis => self.resolve_timestamp_millis(),
Schema::TimestampMicros => self.resolve_timestamp_micros(),
Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
Schema::Duration => self.resolve_duration(),
Schema::Uuid => self.resolve_uuid(),
}
Expand Down Expand Up @@ -784,6 +796,22 @@ impl Value {
}
}

fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
match self {
Value::LocalTimestampMillis(ts) | Value::Long(ts) => Ok(Value::LocalTimestampMillis(ts)),
Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
other => Err(Error::GetLocalTimestampMillis(other.into())),
}
}

fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
match self {
Value::LocalTimestampMicros(ts) | Value::Long(ts) => Ok(Value::LocalTimestampMicros(ts)),
Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
other => Err(Error::GetLocalTimestampMicros(other.into())),
}
}

fn resolve_null(self) -> Result<Self, Error> {
match self {
Value::Null => Ok(Value::Null),
Expand Down Expand Up @@ -1681,6 +1709,26 @@ Field with name '"b"' is not a member of the map items"#,
assert!(value.resolve(&Schema::TimestampMicros).is_err());
}

#[test]
fn test_avro_3853_resolve_timestamp_millis() {
let value = Value::LocalTimestampMillis(10);
assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
assert!(value.resolve(&Schema::Float).is_err());

let value = Value::Float(10.0f32);
assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
}

#[test]
fn test_avro_3853_resolve_timestamp_micros() {
let value = Value::LocalTimestampMicros(10);
assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
assert!(value.resolve(&Schema::Int).is_err());

let value = Value::Double(10.0);
assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
}

#[test]
fn resolve_duration() {
let value = Value::Duration(Duration::new(
Expand Down Expand Up @@ -1886,6 +1934,14 @@ Field with name '"b"' is not a member of the map items"#,
JsonValue::try_from(Value::TimestampMicros(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::LocalTimestampMillis(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::LocalTimestampMicros(1))?,
JsonValue::Number(1.into())
);
assert_eq!(
JsonValue::try_from(Value::Duration(
[1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into()
Expand Down
Loading

0 comments on commit f0164ef

Please sign in to comment.