From e9f10b8d8922dc4938f7cdfa55f43941a2d98a35 Mon Sep 17 00:00:00 2001 From: Martin Grigorov Date: Fri, 8 Dec 2023 09:31:45 +0200 Subject: [PATCH] AVRO-3916: [Rust] Add [Local]TimestampNanos types (#2611) Signed-off-by: Martin Tzvetanov Grigorov --- lang/rust/avro/README.md | 2 ++ lang/rust/avro/src/de.rs | 54 +++++++++++++++++++++++++++++++++--- lang/rust/avro/src/decode.rs | 2 ++ lang/rust/avro/src/encode.rs | 2 ++ lang/rust/avro/src/error.rs | 6 ++++ lang/rust/avro/src/lib.rs | 2 ++ lang/rust/avro/src/schema.rs | 18 ++++++++++++ lang/rust/avro/src/types.rs | 54 ++++++++++++++++++++++++++++++++++++ 8 files changed, 136 insertions(+), 4 deletions(-) diff --git a/lang/rust/avro/README.md b/lang/rust/avro/README.md index a349847fac7..bb3fd2b2edf 100644 --- a/lang/rust/avro/README.md +++ b/lang/rust/avro/README.md @@ -538,8 +538,10 @@ fn main() -> Result<(), Error> { record.put("time_micros", Value::TimeMicros(3)); record.put("timestamp_millis", Value::TimestampMillis(4)); record.put("timestamp_micros", Value::TimestampMicros(5)); + record.put("timestamp_nanos", Value::TimestampNanos(6)); record.put("local_timestamp_millis", Value::LocalTimestampMillis(4)); record.put("local_timestamp_micros", Value::LocalTimestampMicros(5)); + record.put("local_timestamp_nanos", Value::LocalTimestampMicros(6)); record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8))); writer.append(record)?; diff --git a/lang/rust/avro/src/de.rs b/lang/rust/avro/src/de.rs index aba2b541fff..ceffadbd3bd 100644 --- a/lang/rust/avro/src/de.rs +++ b/lang/rust/avro/src/de.rs @@ -245,8 +245,10 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> { | Value::TimeMicros(i) | Value::TimestampMillis(i) | Value::TimestampMicros(i) + | Value::TimestampNanos(i) | Value::LocalTimestampMillis(i) - | Value::LocalTimestampMicros(i) => visitor.visit_i64(*i), + | Value::LocalTimestampMicros(i) + | Value::LocalTimestampNanos(i) => visitor.visit_i64(*i), &Value::Float(f) => visitor.visit_f32(f), &Value::Double(d) => visitor.visit_f64(d), Value::Union(_i, u) => match **u { @@ -257,8 +259,10 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> { | Value::TimeMicros(i) | Value::TimestampMillis(i) | Value::TimestampMicros(i) + | Value::TimestampNanos(i) | Value::LocalTimestampMillis(i) - | Value::LocalTimestampMicros(i) => visitor.visit_i64(i), + | Value::LocalTimestampMicros(i) + | Value::LocalTimestampNanos(i) => visitor.visit_i64(i), Value::Float(f) => visitor.visit_f32(f), Value::Double(d) => visitor.visit_f64(d), Value::Record(ref fields) => visitor.visit_map(RecordDeserializer::new(fields)), @@ -1080,7 +1084,16 @@ mod tests { fn test_timestamp_micros() -> TestResult { let raw_value = 1; let value = Value::TimestampMicros(raw_value); - let result = crate::from_value::(&value)?; + let result = from_value::(&value)?; + assert_eq!(result, raw_value); + Ok(()) + } + + #[test] + fn test_avro_3916_timestamp_nanos() -> TestResult { + let raw_value = 1; + let value = Value::TimestampNanos(raw_value); + let result = from_value::(&value)?; assert_eq!(result, raw_value); Ok(()) } @@ -1089,7 +1102,7 @@ mod tests { fn test_avro_3853_local_timestamp_millis() -> TestResult { let raw_value = 1; let value = Value::LocalTimestampMillis(raw_value); - let result = crate::from_value::(&value)?; + let result = from_value::(&value)?; assert_eq!(result, raw_value); Ok(()) } @@ -1103,6 +1116,15 @@ mod tests { Ok(()) } + #[test] + fn test_avro_3916_local_timestamp_nanos() -> TestResult { + let raw_value = 1; + let value = Value::LocalTimestampNanos(raw_value); + let result = crate::from_value::(&value)?; + assert_eq!(result, raw_value); + Ok(()) + } + #[test] fn test_from_value_uuid_str() -> TestResult { let raw_value = "9ec535ff-3e2a-45bd-91d3-0a01321b5a49"; @@ -1146,8 +1168,10 @@ mod tests { ("time_micros_a".to_string(), 123), ("timestamp_millis_b".to_string(), 234), ("timestamp_micros_c".to_string(), 345), + ("timestamp_nanos_d".to_string(), 345_001), ("local_timestamp_millis_d".to_string(), 678), ("local_timestamp_micros_e".to_string(), 789), + ("local_timestamp_nanos_f".to_string(), 345_002), ] .iter() .cloned() @@ -1164,12 +1188,18 @@ mod tests { key if key.starts_with("timestamp_micros_") => { (k.clone(), Value::TimestampMicros(*v)) } + key if key.starts_with("timestamp_nanos_") => { + (k.clone(), Value::TimestampNanos(*v)) + } key if key.starts_with("local_timestamp_millis_") => { (k.clone(), Value::LocalTimestampMillis(*v)) } key if key.starts_with("local_timestamp_micros_") => { (k.clone(), Value::LocalTimestampMicros(*v)) } + key if key.starts_with("local_timestamp_nanos_") => { + (k.clone(), Value::LocalTimestampNanos(*v)) + } _ => unreachable!("unexpected key: {:?}", k), }) .collect(); @@ -1219,6 +1249,14 @@ mod tests { "a_non_existing_timestamp_micros".to_string(), Value::Union(0, Box::new(Value::TimestampMicros(-345))), ), + ( + "a_timestamp_nanos".to_string(), + Value::Union(0, Box::new(Value::TimestampNanos(345))), + ), + ( + "a_non_existing_timestamp_nanos".to_string(), + Value::Union(0, Box::new(Value::TimestampNanos(-345))), + ), ( "a_local_timestamp_millis".to_string(), Value::Union(0, Box::new(Value::LocalTimestampMillis(678))), @@ -1235,6 +1273,14 @@ mod tests { "a_non_existing_local_timestamp_micros".to_string(), Value::Union(0, Box::new(Value::LocalTimestampMicros(-789))), ), + ( + "a_local_timestamp_nanos".to_string(), + Value::Union(0, Box::new(Value::LocalTimestampNanos(789))), + ), + ( + "a_non_existing_local_timestamp_nanos".to_string(), + Value::Union(0, Box::new(Value::LocalTimestampNanos(-789))), + ), ( "a_record".to_string(), Value::Union( diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs index 8c50e77026a..bf8477fb70a 100644 --- a/lang/rust/avro/src/decode.rs +++ b/lang/rust/avro/src/decode.rs @@ -137,8 +137,10 @@ pub(crate) fn decode_internal>( Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros), Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis), Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros), + Schema::TimestampNanos => zag_i64(reader).map(Value::TimestampNanos), Schema::LocalTimestampMillis => zag_i64(reader).map(Value::LocalTimestampMillis), Schema::LocalTimestampMicros => zag_i64(reader).map(Value::LocalTimestampMicros), + Schema::LocalTimestampNanos => zag_i64(reader).map(Value::LocalTimestampNanos), Schema::Duration => { let mut buf = [0u8; 12]; reader.read_exact(&mut buf).map_err(Error::ReadDuration)?; diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs index 5dd08777177..23f94664c89 100644 --- a/lang/rust/avro/src/encode.rs +++ b/lang/rust/avro/src/encode.rs @@ -90,8 +90,10 @@ pub(crate) fn encode_internal>( Value::Long(i) | Value::TimestampMillis(i) | Value::TimestampMicros(i) + | Value::TimestampNanos(i) | Value::LocalTimestampMillis(i) | Value::LocalTimestampMicros(i) + | Value::LocalTimestampNanos(i) | Value::TimeMicros(i) => encode_long(*i, buffer), Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()), Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()), diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 13cec65c7c6..36c8d94a91a 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -154,12 +154,18 @@ pub enum Error { #[error("TimestampMicros expected, got {0:?}")] GetTimestampMicros(ValueKind), + #[error("TimestampNanos expected, got {0:?}")] + GetTimestampNanos(ValueKind), + #[error("LocalTimestampMillis expected, got {0:?}")] GetLocalTimestampMillis(ValueKind), #[error("LocalTimestampMicros expected, got {0:?}")] GetLocalTimestampMicros(ValueKind), + #[error("LocalTimestampNanos expected, got {0:?}")] + GetLocalTimestampNanos(ValueKind), + #[error("Null expected, got {0:?}")] GetNull(ValueKind), diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs index a613d93c0a0..2d9d79a7688 100644 --- a/lang/rust/avro/src/lib.rs +++ b/lang/rust/avro/src/lib.rs @@ -651,8 +651,10 @@ //! record.put("time_micros", Value::TimeMicros(3)); //! record.put("timestamp_millis", Value::TimestampMillis(4)); //! record.put("timestamp_micros", Value::TimestampMicros(5)); +//! record.put("timestamp_nanos", Value::TimestampNanos(6)); //! record.put("local_timestamp_millis", Value::LocalTimestampMillis(4)); //! record.put("local_timestamp_micros", Value::LocalTimestampMicros(5)); +//! record.put("local_timestamp_nanos", Value::LocalTimestampMicros(6)); //! record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8))); //! //! writer.append(record)?; diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index f2487e316f4..90cdeb1b275 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -130,10 +130,14 @@ pub enum Schema { TimestampMillis, /// An instant in time represented as the number of microseconds after the UNIX epoch. TimestampMicros, + /// An instant in time represented as the number of nanoseconds after the UNIX epoch. + TimestampNanos, /// An instant in localtime represented as the number of milliseconds after the UNIX epoch. LocalTimestampMillis, /// An instant in local time represented as the number of microseconds after the UNIX epoch. LocalTimestampMicros, + /// An instant in local time represented as the number of nanoseconds after the UNIX epoch. + LocalTimestampNanos, /// An amount of time defined by a number of months, days and milliseconds. Duration, /// A reference to another schema. @@ -199,8 +203,10 @@ impl From<&types::Value> for SchemaKind { Value::TimeMicros(_) => Self::TimeMicros, Value::TimestampMillis(_) => Self::TimestampMillis, Value::TimestampMicros(_) => Self::TimestampMicros, + Value::TimestampNanos(_) => Self::TimestampNanos, Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis, Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros, + Value::LocalTimestampNanos(_) => Self::LocalTimestampNanos, Value::Duration { .. } => Self::Duration, } } @@ -1942,6 +1948,12 @@ impl Serialize for Schema { map.serialize_entry("logicalType", "timestamp-micros")?; map.end() } + Schema::TimestampNanos => { + let mut map = serializer.serialize_map(None)?; + map.serialize_entry("type", "long")?; + map.serialize_entry("logicalType", "timestamp-nanos")?; + map.end() + } Schema::LocalTimestampMillis => { let mut map = serializer.serialize_map(None)?; map.serialize_entry("type", "long")?; @@ -1954,6 +1966,12 @@ impl Serialize for Schema { map.serialize_entry("logicalType", "local-timestamp-micros")?; map.end() } + Schema::LocalTimestampNanos => { + let mut map = serializer.serialize_map(None)?; + map.serialize_entry("type", "long")?; + map.serialize_entry("logicalType", "local-timestamp-nanos")?; + map.end() + } Schema::Duration => { let mut map = serializer.serialize_map(None)?; diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs index 49fb9e6a35a..c3fde21e490 100644 --- a/lang/rust/avro/src/types.rs +++ b/lang/rust/avro/src/types.rs @@ -112,10 +112,14 @@ pub enum Value { TimestampMillis(i64), /// Timestamp in microseconds. TimestampMicros(i64), + /// Timestamp in nanoseconds. + TimestampNanos(i64), /// Local timestamp in milliseconds. LocalTimestampMillis(i64), /// Local timestamp in microseconds. LocalTimestampMicros(i64), + /// Local timestamp in nanoseconds. + LocalTimestampNanos(i64), /// Avro Duration. An amount of time defined by months, days and milliseconds. Duration(Duration), /// Universally unique identifier. @@ -340,8 +344,10 @@ impl TryFrom for JsonValue { Value::TimeMicros(t) => Ok(Self::Number(t.into())), Value::TimestampMillis(t) => Ok(Self::Number(t.into())), Value::TimestampMicros(t) => Ok(Self::Number(t.into())), + Value::TimestampNanos(t) => Ok(Self::Number(t.into())), Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())), Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())), + Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())), Value::Duration(d) => Ok(Self::Array( <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(), )), @@ -428,8 +434,10 @@ impl Value { (&Value::Long(_), &Schema::LocalTimestampMicros) => None, (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None, (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None, + (&Value::TimestampNanos(_), &Schema::TimestampNanos) => None, (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None, (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None, + (&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None, (&Value::TimeMicros(_), &Schema::TimeMicros) => None, (&Value::TimeMillis(_), &Schema::TimeMillis) => None, (&Value::Date(_), &Schema::Date) => None, @@ -689,8 +697,10 @@ impl Value { Schema::TimeMicros => self.resolve_time_micros(), Schema::TimestampMillis => self.resolve_timestamp_millis(), Schema::TimestampMicros => self.resolve_timestamp_micros(), + Schema::TimestampNanos => self.resolve_timestamp_nanos(), Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(), Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(), + Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(), Schema::Duration => self.resolve_duration(), Schema::Uuid => self.resolve_uuid(), } @@ -814,6 +824,14 @@ impl Value { } } + fn resolve_timestamp_nanos(self) -> Result { + match self { + Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)), + Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))), + other => Err(Error::GetTimestampNanos(other.into())), + } + } + fn resolve_local_timestamp_millis(self) -> Result { match self { Value::LocalTimestampMillis(ts) | Value::Long(ts) => { @@ -834,6 +852,14 @@ impl Value { } } + fn resolve_local_timestamp_nanos(self) -> Result { + match self { + Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)), + Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))), + other => Err(Error::GetLocalTimestampNanos(other.into())), + } + } + fn resolve_null(self) -> Result { match self { Value::Null => Ok(Value::Null), @@ -1738,6 +1764,16 @@ Field with name '"b"' is not a member of the map items"#, assert!(value.resolve(&Schema::TimestampMicros).is_err()); } + #[test] + fn test_avro_3914_resolve_timestamp_nanos() { + let value = Value::TimestampNanos(10); + assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok()); + assert!(value.resolve(&Schema::Int).is_err()); + + let value = Value::Double(10.0); + assert!(value.resolve(&Schema::TimestampNanos).is_err()); + } + #[test] fn test_avro_3853_resolve_timestamp_millis() { let value = Value::LocalTimestampMillis(10); @@ -1758,6 +1794,16 @@ Field with name '"b"' is not a member of the map items"#, assert!(value.resolve(&Schema::LocalTimestampMicros).is_err()); } + #[test] + fn test_avro_3916_resolve_timestamp_nanos() { + let value = Value::LocalTimestampNanos(10); + assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok()); + assert!(value.resolve(&Schema::Int).is_err()); + + let value = Value::Double(10.0); + assert!(value.resolve(&Schema::LocalTimestampNanos).is_err()); + } + #[test] fn resolve_duration() { let value = Value::Duration(Duration::new( @@ -1963,6 +2009,10 @@ Field with name '"b"' is not a member of the map items"#, JsonValue::try_from(Value::TimestampMicros(1))?, JsonValue::Number(1.into()) ); + assert_eq!( + JsonValue::try_from(Value::TimestampNanos(1))?, + JsonValue::Number(1.into()) + ); assert_eq!( JsonValue::try_from(Value::LocalTimestampMillis(1))?, JsonValue::Number(1.into()) @@ -1971,6 +2021,10 @@ Field with name '"b"' is not a member of the map items"#, JsonValue::try_from(Value::LocalTimestampMicros(1))?, JsonValue::Number(1.into()) ); + assert_eq!( + JsonValue::try_from(Value::LocalTimestampNanos(1))?, + JsonValue::Number(1.into()) + ); assert_eq!( JsonValue::try_from(Value::Duration( [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into()