From f0164eff8a05ad01ae4727e9b9f9af0d58a92bc9 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 12 Sep 2023 05:08:02 +0900 Subject: [PATCH] AVRO-3853: [Rust] Support Local timestamp logical types for Rust SDK --- lang/rust/avro/README.md | 13 ++++++++ lang/rust/avro/src/de.rs | 50 ++++++++++++++++++++++++++++-- lang/rust/avro/src/decode.rs | 2 ++ lang/rust/avro/src/encode.rs | 2 ++ lang/rust/avro/src/error.rs | 6 ++++ lang/rust/avro/src/lib.rs | 13 ++++++++ lang/rust/avro/src/schema.rs | 38 +++++++++++++++++++++++ lang/rust/avro/src/types.rs | 56 ++++++++++++++++++++++++++++++++++ lang/rust/avro/tests/schema.rs | 38 +++++++++++++++++++++++ 9 files changed, 216 insertions(+), 2 deletions(-) diff --git a/lang/rust/avro/README.md b/lang/rust/avro/README.md index 4155785a2a8..ad5ec70689f 100644 --- a/lang/rust/avro/README.md +++ b/lang/rust/avro/README.md @@ -432,6 +432,7 @@ fn main() -> Result<(), Error> { 1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate 1. Date, Time (milli) as `i32` and Time (micro) as `i64` 1. Timestamp (milli and micro) as `i64` +1. Local timestamp (milli and micro) as `i64` 1. Duration as a custom type with `months`, `days` and `millis` accessor methods each of which returns an `i32` Note that the on-disk representation is identical to the underlying primitive/complex type. 
@@ -499,6 +500,16 @@ fn main() -> Result<(), Error> { "type": "long", "logicalType": "timestamp-micros" }, + { + "name": "local_timestamp_millis", + "type": "long", + "logicalType": "local-timestamp-millis" + }, + { + "name": "local_timestamp_micros", + "type": "long", + "logicalType": "local-timestamp-micros" + }, { "name": "duration", "type": { @@ -527,6 +538,8 @@ fn main() -> Result<(), Error> { record.put("time_micros", Value::TimeMicros(3)); record.put("timestamp_millis", Value::TimestampMillis(4)); record.put("timestamp_micros", Value::TimestampMicros(5)); + record.put("local_timestamp_millis", Value::LocalTimestampMillis(4)); + record.put("local_timestamp_micros", Value::LocalTimestampMicros(5)); record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8))); writer.append(record)?; diff --git a/lang/rust/avro/src/de.rs b/lang/rust/avro/src/de.rs index 601a909610d..6600564489a 100644 --- a/lang/rust/avro/src/de.rs +++ b/lang/rust/avro/src/de.rs @@ -244,7 +244,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> { Value::Long(i) | Value::TimeMicros(i) | Value::TimestampMillis(i) - | Value::TimestampMicros(i) => visitor.visit_i64(*i), + | Value::TimestampMicros(i) + | Value::LocalTimestampMillis(i) + | Value::LocalTimestampMicros(i) => visitor.visit_i64(*i), &Value::Float(f) => visitor.visit_f32(f), &Value::Double(d) => visitor.visit_f64(d), Value::Union(_i, u) => match **u { @@ -254,7 +256,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> { Value::Long(i) | Value::TimeMicros(i) | Value::TimestampMillis(i) - | Value::TimestampMicros(i) => visitor.visit_i64(i), + | Value::TimestampMicros(i) + | Value::LocalTimestampMillis(i) + | Value::LocalTimestampMicros(i) => visitor.visit_i64(i), Value::Float(f) => visitor.visit_f32(f), Value::Double(d) => visitor.visit_f64(d), Value::Record(ref fields) => visitor.visit_map(RecordDeserializer::new(fields)), @@ -1073,6 +1077,24 @@ mod tests { Ok(()) } + #[test] + fn 
test_avro_3853_local_timestamp_millis() -> TestResult { + let raw_value = 1; + let value = Value::LocalTimestampMillis(raw_value); + let result = crate::from_value::<i64>(&value)?; + assert_eq!(result, raw_value); + Ok(()) + } + + #[test] + fn test_avro_3853_local_timestamp_micros() -> TestResult { + let raw_value = 1; + let value = Value::LocalTimestampMicros(raw_value); + let result = crate::from_value::<i64>(&value)?; + assert_eq!(result, raw_value); + Ok(()) + } + + #[test] fn test_from_value_uuid_str() -> TestResult { let raw_value = "9ec535ff-3e2a-45bd-91d3-0a01321b5a49"; @@ -1116,6 +1138,8 @@ mod tests { ("time_micros_a".to_string(), 123), ("timestamp_millis_b".to_string(), 234), ("timestamp_micros_c".to_string(), 345), + ("local_timestamp_millis_d".to_string(), 678), + ("local_timestamp_micros_e".to_string(), 789), ] .iter() .cloned() @@ -1132,6 +1156,12 @@ mod tests { key if key.starts_with("timestamp_micros_") => { (k.clone(), Value::TimestampMicros(*v)) } + key if key.starts_with("local_timestamp_millis_") => { + (k.clone(), Value::LocalTimestampMillis(*v)) + } + key if key.starts_with("local_timestamp_micros_") => { + (k.clone(), Value::LocalTimestampMicros(*v)) + } _ => unreachable!("unexpected key: {:?}", k), }) .collect(); @@ -1181,6 +1211,22 @@ mod tests { "a_non_existing_timestamp_micros".to_string(), Value::Union(0, Box::new(Value::TimestampMicros(-345))), ), + ( + "a_local_timestamp_millis".to_string(), + Value::Union(0, Box::new(Value::LocalTimestampMillis(678))), + ), + ( + "a_non_existing_local_timestamp_millis".to_string(), + Value::Union(0, Box::new(Value::LocalTimestampMillis(-678))), + ), + ( + "a_local_timestamp_micros".to_string(), + Value::Union(0, Box::new(Value::LocalTimestampMicros(789))), + ), + ( + "a_non_existing_local_timestamp_micros".to_string(), + Value::Union(0, Box::new(Value::LocalTimestampMicros(-789))), + ), ( "a_record".to_string(), Value::Union( diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs index 
debb38076d8..b13c76739b9 100644 --- a/lang/rust/avro/src/decode.rs +++ b/lang/rust/avro/src/decode.rs @@ -130,6 +130,8 @@ pub(crate) fn decode_internal>( Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros), Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis), Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros), + Schema::LocalTimestampMillis => zag_i64(reader).map(Value::LocalTimestampMillis), + Schema::LocalTimestampMicros => zag_i64(reader).map(Value::LocalTimestampMicros), Schema::Duration => { let mut buf = [0u8; 12]; reader.read_exact(&mut buf).map_err(Error::ReadDuration)?; diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs index e8080f04be1..6e52e0c3b1e 100644 --- a/lang/rust/avro/src/encode.rs +++ b/lang/rust/avro/src/encode.rs @@ -77,6 +77,8 @@ pub(crate) fn encode_internal>( Value::Long(i) | Value::TimestampMillis(i) | Value::TimestampMicros(i) + | Value::LocalTimestampMillis(i) + | Value::LocalTimestampMicros(i) | Value::TimeMicros(i) => encode_long(*i, buffer), Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()), Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()), diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 447d2711acc..bf066b8a5ee 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -151,6 +151,12 @@ pub enum Error { #[error("TimestampMicros expected, got {0:?}")] GetTimestampMicros(ValueKind), + #[error("LocalTimestampMillis expected, got {0:?}")] + GetLocalTimestampMillis(ValueKind), + + #[error("LocalTimestampMicros expected, got {0:?}")] + GetLocalTimestampMicros(ValueKind), + #[error("Null expected, got {0:?}")] GetNull(ValueKind), diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs index cde0251c582..35b1b431a11 100644 --- a/lang/rust/avro/src/lib.rs +++ b/lang/rust/avro/src/lib.rs @@ -545,6 +545,7 @@ //! 1. UUID using the [`uuid`](https://docs.rs/uuid/1.0.0/uuid) crate //! 1. 
Date, Time (milli) as `i32` and Time (micro) as `i64` //! 1. Timestamp (milli and micro) as `i64` +//! 1. Local timestamp (milli and micro) as `i64` //! 1. Duration as a custom type with `months`, `days` and `millis` accessor methods each of which returns an `i32` //! //! Note that the on-disk representation is identical to the underlying primitive/complex type. @@ -613,6 +614,16 @@ //! "logicalType": "timestamp-micros" //! }, //! { +//! "name": "local_timestamp_millis", +//! "type": "long", +//! "logicalType": "local-timestamp-millis" +//! }, +//! { +//! "name": "local_timestamp_micros", +//! "type": "long", +//! "logicalType": "local-timestamp-micros" +//! }, +//! { //! "name": "duration", //! "type": { //! "type": "fixed", @@ -640,6 +651,8 @@ //! record.put("time_micros", Value::TimeMicros(3)); //! record.put("timestamp_millis", Value::TimestampMillis(4)); //! record.put("timestamp_micros", Value::TimestampMicros(5)); +//! record.put("local_timestamp_millis", Value::LocalTimestampMillis(4)); +//! record.put("local_timestamp_micros", Value::LocalTimestampMicros(5)); //! record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8))); //! //! writer.append(record)?; diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index 139a6259740..e811f91bdaf 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -127,6 +127,10 @@ pub enum Schema { TimestampMillis, /// An instant in time represented as the number of microseconds after the UNIX epoch. TimestampMicros, + /// An instant in local time represented as the number of milliseconds after the UNIX epoch. + LocalTimestampMillis, + /// An instant in local time represented as the number of microseconds after the UNIX epoch. + LocalTimestampMicros, /// An amount of time defined by a number of months, days and milliseconds. Duration, /// A reference to another schema. 
@@ -191,6 +195,8 @@ impl From<&types::Value> for SchemaKind { Value::TimeMicros(_) => Self::TimeMicros, Value::TimestampMillis(_) => Self::TimestampMillis, Value::TimestampMicros(_) => Self::TimestampMicros, + Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis, + Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros, Value::Duration { .. } => Self::Duration, } } @@ -1388,6 +1394,26 @@ impl Parser { enclosing_namespace, ); } + "local-timestamp-millis" => { + return try_logical_type( + "local-timestamp-millis", + complex, + &[SchemaKind::Long], + Schema::LocalTimestampMillis, + self, + enclosing_namespace, + ); + } + "local-timestamp-micros" => { + return try_logical_type( + "local-timestamp-micros", + complex, + &[SchemaKind::Long], + Schema::LocalTimestampMicros, + self, + enclosing_namespace, + ); + } "duration" => { logical_verify_type(complex, &[SchemaKind::Fixed], self, enclosing_namespace)?; return Ok(Schema::Duration); @@ -1901,6 +1927,18 @@ impl Serialize for Schema { map.serialize_entry("logicalType", "timestamp-micros")?; map.end() } + Schema::LocalTimestampMillis => { + let mut map = serializer.serialize_map(None)?; + map.serialize_entry("type", "long")?; + map.serialize_entry("logicalType", "local-timestamp-millis")?; + map.end() + } + Schema::LocalTimestampMicros => { + let mut map = serializer.serialize_map(None)?; + map.serialize_entry("type", "long")?; + map.serialize_entry("logicalType", "local-timestamp-micros")?; + map.end() + } Schema::Duration => { let mut map = serializer.serialize_map(None)?; diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs index 11bf6356419..fea262b58a4 100644 --- a/lang/rust/avro/src/types.rs +++ b/lang/rust/avro/src/types.rs @@ -108,6 +108,10 @@ pub enum Value { TimestampMillis(i64), /// Timestamp in microseconds. TimestampMicros(i64), + /// Local timestamp in milliseconds. + LocalTimestampMillis(i64), + /// Local timestamp in microseconds. 
+ LocalTimestampMicros(i64), /// Avro Duration. An amount of time defined by months, days and milliseconds. Duration(Duration), /// Universally unique identifier. @@ -327,6 +331,8 @@ impl TryFrom<Value> for JsonValue { Value::TimeMicros(t) => Ok(Self::Number(t.into())), Value::TimestampMillis(t) => Ok(Self::Number(t.into())), Value::TimestampMicros(t) => Ok(Self::Number(t.into())), + Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())), + Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())), Value::Duration(d) => Ok(Self::Array( <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(), )), @@ -409,8 +415,12 @@ impl Value { (&Value::Long(_), &Schema::TimeMicros) => None, (&Value::Long(_), &Schema::TimestampMillis) => None, (&Value::Long(_), &Schema::TimestampMicros) => None, + (&Value::Long(_), &Schema::LocalTimestampMillis) => None, + (&Value::Long(_), &Schema::LocalTimestampMicros) => None, (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None, (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None, + (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None, + (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None, (&Value::TimeMicros(_), &Schema::TimeMicros) => None, (&Value::TimeMillis(_), &Schema::TimeMillis) => None, (&Value::Date(_), &Schema::Date) => None, @@ -669,6 +679,8 @@ impl Value { Schema::TimeMicros => self.resolve_time_micros(), Schema::TimestampMillis => self.resolve_timestamp_millis(), Schema::TimestampMicros => self.resolve_timestamp_micros(), + Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(), + Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(), Schema::Duration => self.resolve_duration(), Schema::Uuid => self.resolve_uuid(), } @@ -784,6 +796,22 @@ impl Value { } } + fn resolve_local_timestamp_millis(self) -> Result<Self, Error> { + match self { + Value::LocalTimestampMillis(ts) | Value::Long(ts) => Ok(Value::LocalTimestampMillis(ts)), + Value::Int(ts) => 
Ok(Value::LocalTimestampMillis(i64::from(ts))), + other => Err(Error::GetLocalTimestampMillis(other.into())), + } + } + + fn resolve_local_timestamp_micros(self) -> Result<Self, Error> { + match self { + Value::LocalTimestampMicros(ts) | Value::Long(ts) => Ok(Value::LocalTimestampMicros(ts)), + Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))), + other => Err(Error::GetLocalTimestampMicros(other.into())), + } + } + fn resolve_null(self) -> Result<Self, Error> { match self { Value::Null => Ok(Value::Null), @@ -1681,6 +1709,26 @@ Field with name '"b"' is not a member of the map items"#, assert!(value.resolve(&Schema::TimestampMicros).is_err()); } + #[test] + fn test_avro_3853_resolve_timestamp_millis() { + let value = Value::LocalTimestampMillis(10); + assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok()); + assert!(value.resolve(&Schema::Float).is_err()); + + let value = Value::Float(10.0f32); + assert!(value.resolve(&Schema::LocalTimestampMillis).is_err()); + } + + #[test] + fn test_avro_3853_resolve_timestamp_micros() { + let value = Value::LocalTimestampMicros(10); + assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok()); + assert!(value.resolve(&Schema::Int).is_err()); + + let value = Value::Double(10.0); + assert!(value.resolve(&Schema::LocalTimestampMicros).is_err()); + } + #[test] fn resolve_duration() { let value = Value::Duration(Duration::new( @@ -1886,6 +1934,14 @@ Field with name '"b"' is not a member of the map items"#, JsonValue::try_from(Value::TimestampMicros(1))?, JsonValue::Number(1.into()) ); + assert_eq!( + JsonValue::try_from(Value::LocalTimestampMillis(1))?, + JsonValue::Number(1.into()) + ); + assert_eq!( + JsonValue::try_from(Value::LocalTimestampMicros(1))?, + JsonValue::Number(1.into()) + ); assert_eq!( JsonValue::try_from(Value::Duration( [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into() diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs index 1d2cb8b4d98..63b73056084 
100644 --- a/lang/rust/avro/tests/schema.rs +++ b/lang/rust/avro/tests/schema.rs @@ -592,6 +592,42 @@ const TIMESTAMPMICROS_LOGICAL_TYPE: &[(&str, bool)] = &[ ), ]; +const LOCAL_TIMESTAMPMILLIS_LOGICAL_TYPE: &[(&str, bool)] = &[ + ( + r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#, + true, + ), + // this is valid even though its logical type is "local-timestamp-milis" (missing the second "l"), because + // unknown logical types are ignored + ( + r#"{"type": "long", "logicalType": "local-timestamp-milis"}"#, + true, + ), + ( + // this is still valid: "local-timestamp-millis" only applies to "long", so on an "int" the logical type is ignored + r#"{"type": "int", "logicalType": "local-timestamp-millis"}"#, + true, + ), +]; + +const LOCAL_TIMESTAMPMICROS_LOGICAL_TYPE: &[(&str, bool)] = &[ + ( + r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#, + true, + ), + // this is valid even though its logical type is "local-timestamp-micro" (missing the last "s"), because + // unknown logical types are ignored + ( + r#"{"type": "long", "logicalType": "local-timestamp-micro"}"#, + true, + ), + ( + // this is still valid: "local-timestamp-micros" only applies to "long", so on an "int" the logical type is ignored + r#"{"type": "int", "logicalType": "local-timestamp-micros"}"#, + true, + ), +]; + lazy_static! { static ref EXAMPLES: Vec<(&'static str, bool)> = Vec::new() .iter() @@ -612,6 +648,8 @@ lazy_static! { .chain(TIMEMICROS_LOGICAL_TYPE.iter().copied()) .chain(TIMESTAMPMILLIS_LOGICAL_TYPE.iter().copied()) .chain(TIMESTAMPMICROS_LOGICAL_TYPE.iter().copied()) + .chain(LOCAL_TIMESTAMPMILLIS_LOGICAL_TYPE.iter().copied()) + .chain(LOCAL_TIMESTAMPMICROS_LOGICAL_TYPE.iter().copied()) .collect(); static ref VALID_EXAMPLES: Vec<(&'static str, bool)> = EXAMPLES.iter().copied().filter(|s| s.1).collect();