From 807314512736c2a1c0a4203f4eb9b5d84aa021e4 Mon Sep 17 00:00:00 2001 From: ZENOTME <43447882+ZENOTME@users.noreply.github.com> Date: Fri, 27 Oct 2023 03:47:21 +0800 Subject: [PATCH] AVRO-3892: [Rust] Support to resolve fixed from bytes and deserialize bytes in deserialize_any (#2567) * support to resolve fixed from bytes * support to deserialize bytes, fixed, decimal. * fix clippy * AVRO-3892: Rename test method Signed-off-by: Martin Tzvetanov Grigorov * AVRO-3892: [Rust] Add unit tests for deserializing &str/String from Value::Bytes The tests are not really related to AVRO-3892. They do not cover the new changes in deserialize_any() Signed-off-by: Martin Tzvetanov Grigorov * add unit test for deserialize bytes from decimal and uuid * add more test --------- Signed-off-by: Martin Tzvetanov Grigorov Co-authored-by: ZENOTME Co-authored-by: Martin Tzvetanov Grigorov --- lang/rust/avro/examples/benchmark.rs | 2 +- lang/rust/avro/src/de.rs | 110 ++++++++++++++++++++++++++- lang/rust/avro/src/decimal.rs | 2 +- lang/rust/avro/src/types.rs | 46 +++++++++++ lang/rust/avro/src/writer.rs | 4 +- 5 files changed, 158 insertions(+), 6 deletions(-) diff --git a/lang/rust/avro/examples/benchmark.rs b/lang/rust/avro/examples/benchmark.rs index c3eac431393..53dfb1ddbda 100644 --- a/lang/rust/avro/examples/benchmark.rs +++ b/lang/rust/avro/examples/benchmark.rs @@ -60,7 +60,7 @@ fn benchmark( let start = Instant::now(); let mut writer = Writer::new(schema, BufWriter::new(Vec::new())); - writer.extend(records.into_iter())?; + writer.extend(records)?; let duration = Instant::now().duration_since(start); durations.push(duration); diff --git a/lang/rust/avro/src/de.rs b/lang/rust/avro/src/de.rs index 6600564489a..aba2b541fff 100644 --- a/lang/rust/avro/src/de.rs +++ b/lang/rust/avro/src/de.rs @@ -266,6 +266,8 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> { Value::String(ref s) => visitor.visit_borrowed_str(s), Value::Uuid(uuid) => 
visitor.visit_str(&uuid.to_string()), Value::Map(ref items) => visitor.visit_map(MapDeserializer::new(items)), + Value::Bytes(ref bytes) | Value::Fixed(_, ref bytes) => visitor.visit_bytes(bytes), + Value::Decimal(ref d) => visitor.visit_bytes(&d.to_vec()?), _ => Err(de::Error::custom(format!( "unsupported union: {:?}", self.input @@ -276,6 +278,8 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> { Value::String(ref s) => visitor.visit_borrowed_str(s), Value::Uuid(uuid) => visitor.visit_str(&uuid.to_string()), Value::Map(ref items) => visitor.visit_map(MapDeserializer::new(items)), + Value::Bytes(ref bytes) | Value::Fixed(_, ref bytes) => visitor.visit_bytes(bytes), + Value::Decimal(ref d) => visitor.visit_bytes(&d.to_vec()?), value => Err(de::Error::custom(format!( "incorrect value of type: {:?}", crate::schema::SchemaKind::from(value) @@ -350,8 +354,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> { Value::String(ref s) => visitor.visit_bytes(s.as_bytes()), Value::Bytes(ref bytes) | Value::Fixed(_, ref bytes) => visitor.visit_bytes(bytes), Value::Uuid(ref u) => visitor.visit_bytes(u.as_bytes()), + Value::Decimal(ref d) => visitor.visit_bytes(&d.to_vec()?), _ => Err(de::Error::custom(format!( - "Expected a String|Bytes|Fixed|Uuid, but got {:?}", + "Expected a String|Bytes|Fixed|Uuid|Decimal, but got {:?}", self.input ))), } @@ -654,6 +659,7 @@ pub fn from_value<'de, D: Deserialize<'de>>(value: &'de Value) -> Result TestResult { let raw_value = "9ec535ff-3e2a-45bd-91d3-0a01321b5a49"; let value = Value::Uuid(Uuid::parse_str(raw_value)?); - let result = crate::from_value::(&value)?; + let result = from_value::(&value)?; assert_eq!(result.to_string(), raw_value); Ok(()) } @@ -1315,4 +1323,102 @@ mod tests { Ok(()) } + + #[test] + fn test_avro_3892_deserialize_string_from_bytes() -> TestResult { + let raw_value = vec![1, 2, 3, 4]; + let value = Value::Bytes(raw_value.clone()); + let result = from_value::(&value)?; + assert_eq!(result, 
String::from_utf8(raw_value)?); + Ok(()) + } + + #[test] + fn test_avro_3892_deserialize_str_from_bytes() -> TestResult { + let raw_value = &[1, 2, 3, 4]; + let value = Value::Bytes(raw_value.to_vec()); + let result = from_value::<&str>(&value)?; + assert_eq!(result, std::str::from_utf8(raw_value)?); + Ok(()) + } + + #[derive(Debug)] + struct Bytes(Vec<u8>); + + impl<'de> Deserialize<'de> for Bytes { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct BytesVisitor; + impl<'de> serde::de::Visitor<'de> for BytesVisitor { + type Value = Bytes; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a byte array") + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(Bytes(v.to_vec())) + } + } + deserializer.deserialize_bytes(BytesVisitor) + } + } + + #[test] + fn test_avro_3892_deserialize_bytes_from_decimal() -> TestResult { + let expected_bytes = BigInt::from(123456789).to_signed_bytes_be(); + let value = Value::Decimal(Decimal::from(&expected_bytes)); + let raw_bytes = from_value::<Bytes>(&value)?; + assert_eq!(raw_bytes.0, expected_bytes); + + let value = Value::Union(0, Box::new(Value::Decimal(Decimal::from(&expected_bytes)))); + let raw_bytes = from_value::<Option<Bytes>>(&value)?; + assert_eq!(raw_bytes.unwrap().0, expected_bytes); + Ok(()) + } + + #[test] + fn test_avro_3892_deserialize_bytes_from_uuid() -> TestResult { + let uuid_str = "10101010-2020-2020-2020-101010101010"; + let expected_bytes = Uuid::parse_str(uuid_str)?.as_bytes().to_vec(); + let value = Value::Uuid(Uuid::parse_str(uuid_str)?); + let raw_bytes = from_value::<Bytes>(&value)?; + assert_eq!(raw_bytes.0, expected_bytes); + + let value = Value::Union(0, Box::new(Value::Uuid(Uuid::parse_str(uuid_str)?))); + let raw_bytes = from_value::<Option<Bytes>>(&value)?; + assert_eq!(raw_bytes.unwrap().0, expected_bytes); + Ok(()) + } + + #[test] + fn test_avro_3892_deserialize_bytes_from_fixed() -> TestResult { + let 
expected_bytes = vec![1, 2, 3, 4]; + let value = Value::Fixed(4, expected_bytes.clone()); + let raw_bytes = from_value::<Bytes>(&value)?; + assert_eq!(raw_bytes.0, expected_bytes); + + let value = Value::Union(0, Box::new(Value::Fixed(4, expected_bytes.clone()))); + let raw_bytes = from_value::<Option<Bytes>>(&value)?; + assert_eq!(raw_bytes.unwrap().0, expected_bytes); + Ok(()) + } + + #[test] + fn test_avro_3892_deserialize_bytes_from_bytes() -> TestResult { + let expected_bytes = vec![1, 2, 3, 4]; + let value = Value::Bytes(expected_bytes.clone()); + let raw_bytes = from_value::<Bytes>(&value)?; + assert_eq!(raw_bytes.0, expected_bytes); + + let value = Value::Union(0, Box::new(Value::Bytes(expected_bytes.clone()))); + let raw_bytes = from_value::<Option<Bytes>>(&value)?; + assert_eq!(raw_bytes.unwrap().0, expected_bytes); + Ok(()) + } } diff --git a/lang/rust/avro/src/decimal.rs b/lang/rust/avro/src/decimal.rs index 4237d710878..7188127f620 100644 --- a/lang/rust/avro/src/decimal.rs +++ b/lang/rust/avro/src/decimal.rs @@ -44,7 +44,7 @@ impl Decimal { self.len } - fn to_vec(&self) -> AvroResult<Vec<u8>> { + pub(crate) fn to_vec(&self) -> AvroResult<Vec<u8>> { self.to_sign_extended_bytes_with_len(self.len) } diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs index 715094f7ff2..060326f2e6c 100644 --- a/lang/rust/avro/src/types.rs +++ b/lang/rust/avro/src/types.rs @@ -917,6 +917,13 @@ impl Value { } } Value::String(s) => Ok(Value::Fixed(s.len(), s.into_bytes())), + Value::Bytes(s) => { + if s.len() == size { + Ok(Value::Fixed(size, s)) + } else { + Err(Error::CompareFixedSizes { size, n: s.len() }) + } + } other => Err(Error::GetStringForFixed(other.into())), } } @@ -2958,4 +2965,43 @@ Field with name '"b"' is not a member of the map items"#, Ok(()) } + + #[test] + fn test_avro_3892_resolve_fixed_from_bytes() -> TestResult { + let value = Value::Bytes(vec![97, 98, 99]); + assert_eq!( + value.resolve(&Schema::Fixed(FixedSchema { + name: "test".into(), + aliases: None, + doc: None, + size: 3, + 
attributes: Default::default() + }))?, + Value::Fixed(3, vec![97, 98, 99]) + ); + + let value = Value::Bytes(vec![97, 99]); + assert!(value + .resolve(&Schema::Fixed(FixedSchema { + name: "test".into(), + aliases: None, + doc: None, + size: 3, + attributes: Default::default() + })) + .is_err(),); + + let value = Value::Bytes(vec![97, 98, 99, 100]); + assert!(value + .resolve(&Schema::Fixed(FixedSchema { + name: "test".into(), + aliases: None, + doc: None, + size: 3, + attributes: Default::default() + })) + .is_err(),); + + Ok(()) + } } diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs index d968d28e053..b820885c6e3 100644 --- a/lang/rust/avro/src/writer.rs +++ b/lang/rust/avro/src/writer.rs @@ -895,7 +895,7 @@ mod tests { let record_copy = record.clone(); let records = vec![record, record_copy]; - let n1 = writer.extend(records.into_iter())?; + let n1 = writer.extend(records)?; let n2 = writer.flush()?; let result = writer.into_inner()?; @@ -970,7 +970,7 @@ mod tests { let record_copy = record.clone(); let records = vec![record, record_copy]; - let n1 = writer.extend_ser(records.into_iter())?; + let n1 = writer.extend_ser(records)?; let n2 = writer.flush()?; let result = writer.into_inner()?;