Skip to content

Commit

Permalink
AVRO-3927: [Rust]support map and array schema (#2681)
Browse files Browse the repository at this point in the history
* support map and array schema

* AVRO-3927: [Rust] Introduce factory methods for Map & Array schema

The user facing API is shorter than using the From trait

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

---------

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: ZENOTME <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
  • Loading branch information
3 people authored Jan 11, 2024
1 parent c193058 commit c1d5b97
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 66 deletions.
18 changes: 12 additions & 6 deletions lang/rust/avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(

items.reserve(len);
for _ in 0..len {
items.push(decode_internal(inner, names, enclosing_namespace, reader)?);
items.push(decode_internal(
&inner.items,
names,
enclosing_namespace,
reader,
)?);
}
}

Expand All @@ -215,7 +220,8 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
for _ in 0..len {
match decode_internal(&Schema::String, names, enclosing_namespace, reader)? {
Value::String(key) => {
let value = decode_internal(inner, names, enclosing_namespace, reader)?;
let value =
decode_internal(&inner.types, names, enclosing_namespace, reader)?;
items.insert(key, value);
}
value => return Err(Error::MapKeyType(value.into())),
Expand Down Expand Up @@ -321,7 +327,7 @@ mod tests {
#[test]
fn test_decode_array_without_size() -> TestResult {
let mut input: &[u8] = &[6, 2, 4, 6, 0];
let result = decode(&Schema::Array(Box::new(Schema::Int)), &mut input);
let result = decode(&Schema::array(Schema::Int), &mut input);
assert_eq!(Array(vec!(Int(1), Int(2), Int(3))), result?);

Ok(())
Expand All @@ -330,7 +336,7 @@ mod tests {
#[test]
fn test_decode_array_with_size() -> TestResult {
let mut input: &[u8] = &[5, 6, 2, 4, 6, 0];
let result = decode(&Schema::Array(Box::new(Schema::Int)), &mut input);
let result = decode(&Schema::array(Schema::Int), &mut input);
assert_eq!(Array(vec!(Int(1), Int(2), Int(3))), result?);

Ok(())
Expand All @@ -339,7 +345,7 @@ mod tests {
#[test]
fn test_decode_map_without_size() -> TestResult {
let mut input: &[u8] = &[0x02, 0x08, 0x74, 0x65, 0x73, 0x74, 0x02, 0x00];
let result = decode(&Schema::Map(Box::new(Schema::Int)), &mut input);
let result = decode(&Schema::map(Schema::Int), &mut input);
let mut expected = HashMap::new();
expected.insert(String::from("test"), Int(1));
assert_eq!(Map(expected), result?);
Expand All @@ -350,7 +356,7 @@ mod tests {
#[test]
fn test_decode_map_with_size() -> TestResult {
let mut input: &[u8] = &[0x01, 0x0C, 0x08, 0x74, 0x65, 0x73, 0x74, 0x02, 0x00];
let result = decode(&Schema::Map(Box::new(Schema::Int)), &mut input);
let result = decode(&Schema::map(Schema::Int), &mut input);
let mut expected = HashMap::new();
expected.insert(String::from("test"), Int(1));
assert_eq!(Map(expected), result?);
Expand Down
18 changes: 6 additions & 12 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
if !items.is_empty() {
encode_long(items.len() as i64, buffer);
for item in items.iter() {
encode_internal(item, inner, names, enclosing_namespace, buffer)?;
encode_internal(item, &inner.items, names, enclosing_namespace, buffer)?;
}
}
buffer.push(0u8);
Expand All @@ -205,7 +205,7 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
encode_long(items.len() as i64, buffer);
for (key, value) in items {
encode_bytes(key, buffer);
encode_internal(value, inner, names, enclosing_namespace, buffer)?;
encode_internal(value, &inner.types, names, enclosing_namespace, buffer)?;
}
}
buffer.push(0u8);
Expand Down Expand Up @@ -309,13 +309,10 @@ pub(crate) mod tests {
let empty: Vec<Value> = Vec::new();
encode(
&Value::Array(empty.clone()),
&Schema::Array(Box::new(Schema::Int)),
&Schema::array(Schema::Int),
&mut buf,
)
.expect(&success(
&Value::Array(empty),
&Schema::Array(Box::new(Schema::Int)),
));
.expect(&success(&Value::Array(empty), &Schema::array(Schema::Int)));
assert_eq!(vec![0u8], buf);
}

Expand All @@ -325,13 +322,10 @@ pub(crate) mod tests {
let empty: HashMap<String, Value> = HashMap::new();
encode(
&Value::Map(empty.clone()),
&Schema::Map(Box::new(Schema::Int)),
&Schema::map(Schema::Int),
&mut buf,
)
.expect(&success(
&Value::Map(empty),
&Schema::Map(Box::new(Schema::Int)),
));
.expect(&success(&Value::Map(empty), &Schema::map(Schema::Int)));
assert_eq!(vec![0u8], buf);
}

Expand Down
2 changes: 1 addition & 1 deletion lang/rust/avro/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<'r, R: Read> Block<'r, R> {
/// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
/// its content.
fn read_header(&mut self) -> AvroResult<()> {
let meta_schema = Schema::Map(Box::new(Schema::Bytes));
let meta_schema = Schema::map(Schema::Bytes);

let mut buf = [0u8; 4];
self.reader
Expand Down
142 changes: 112 additions & 30 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ pub enum Schema {
String,
/// An `array` Avro schema. Avro arrays are required to have the same type for each element.
/// This variant holds the `Schema` for the array element type.
Array(Box<Schema>),
Array(ArraySchema),
/// A `map` Avro schema.
/// `Map` holds a pointer to the `Schema` of its values, which must all be the same schema.
/// `Map` keys are assumed to be `string`.
Map(Box<Schema>),
Map(MapSchema),
/// A `union` Avro schema.
Union(UnionSchema),
/// A `record` Avro schema.
Expand Down Expand Up @@ -159,6 +159,18 @@ pub enum Schema {
Ref { name: Name },
}

/// Payload of the `Schema::Map` variant: the schema of the map's values together with
/// any custom attributes attached to the map schema.
///
/// Map keys are not described here; `Schema::Map` assumes them to be `string`.
#[derive(Clone, Debug, PartialEq)]
pub struct MapSchema {
/// Schema of the map's values, which must all share this one schema.
pub types: Box<Schema>,
/// Custom attributes of the map schema. `Serialize for Schema` writes each entry
/// as an additional key next to `type` and `values`.
// NOTE(review): the map parser shown in this change builds the schema through
// `Schema::map`, which leaves this empty — confirm whether parsing is expected to
// populate it.
pub custom_attributes: BTreeMap<String, Value>,
}

/// Payload of the `Schema::Array` variant: the schema of the array's elements together
/// with any custom attributes attached to the array schema.
#[derive(Clone, Debug, PartialEq)]
pub struct ArraySchema {
/// Schema of the array's elements; Avro arrays require the same type for each element.
pub items: Box<Schema>,
/// Custom attributes of the array schema. `Serialize for Schema` writes each entry
/// as an additional key next to `type` and `items`.
// NOTE(review): the array parser shown in this change builds the schema through
// `Schema::array`, which leaves this empty — confirm whether parsing is expected to
// populate it.
pub custom_attributes: BTreeMap<String, Value>,
}

impl PartialEq for Schema {
/// Assess equality of two `Schema` based on [Parsing Canonical Form].
///
Expand Down Expand Up @@ -495,8 +507,11 @@ impl<'s> ResolvedSchema<'s> {
) -> AvroResult<()> {
for schema in schemata {
match schema {
Schema::Array(schema) | Schema::Map(schema) => {
self.resolve(vec![schema], enclosing_namespace, known_schemata)?
Schema::Array(schema) => {
self.resolve(vec![&schema.items], enclosing_namespace, known_schemata)?
}
Schema::Map(schema) => {
self.resolve(vec![&schema.types], enclosing_namespace, known_schemata)?
}
Schema::Union(UnionSchema { schemas, .. }) => {
for schema in schemas {
Expand Down Expand Up @@ -581,9 +596,8 @@ impl ResolvedOwnedSchema {
enclosing_namespace: &Namespace,
) -> AvroResult<()> {
match schema {
Schema::Array(schema) | Schema::Map(schema) => {
Self::from_internal(schema, names, enclosing_namespace)
}
Schema::Array(schema) => Self::from_internal(&schema.items, names, enclosing_namespace),
Schema::Map(schema) => Self::from_internal(&schema.types, names, enclosing_namespace),
Schema::Union(UnionSchema { schemas, .. }) => {
for schema in schemas {
Self::from_internal(schema, names, enclosing_namespace)?
Expand Down Expand Up @@ -1160,6 +1174,41 @@ impl Schema {
_ => None,
}
}

/// Returns a Schema::Map with the given types.
pub fn map(types: Schema) -> Self {
Schema::Map(MapSchema {
types: Box::new(types),
custom_attributes: Default::default(),
})
}

/// Returns a Schema::Map with the given types and custom attributes.
pub fn map_with_attributes(types: Schema, custom_attributes: BTreeMap<String, Value>) -> Self {
Schema::Map(MapSchema {
types: Box::new(types),
custom_attributes,
})
}

/// Returns a Schema::Array with the given items.
pub fn array(items: Schema) -> Self {
Schema::Array(ArraySchema {
items: Box::new(items),
custom_attributes: Default::default(),
})
}

/// Returns a Schema::Array with the given items and custom attributes.
pub fn array_with_attributes(
items: Schema,
custom_attributes: BTreeMap<String, Value>,
) -> Self {
Schema::Array(ArraySchema {
items: Box::new(items),
custom_attributes,
})
}
}

impl Parser {
Expand Down Expand Up @@ -1723,7 +1772,7 @@ impl Parser {
.get("items")
.ok_or(Error::GetArrayItemsField)
.and_then(|items| self.parse(items, enclosing_namespace))
.map(|schema| Schema::Array(Box::new(schema)))
.map(Schema::array)
}

/// Parse a `serde_json::Value` representing an Avro map type into a
Expand All @@ -1737,7 +1786,7 @@ impl Parser {
.get("values")
.ok_or(Error::GetMapValuesField)
.and_then(|items| self.parse(items, enclosing_namespace))
.map(|schema| Schema::Map(Box::new(schema)))
.map(Schema::map)
}

/// Parse a `serde_json::Value` representing an Avro union type into a
Expand Down Expand Up @@ -1847,15 +1896,21 @@ impl Serialize for Schema {
Schema::Bytes => serializer.serialize_str("bytes"),
Schema::String => serializer.serialize_str("string"),
Schema::Array(ref inner) => {
let mut map = serializer.serialize_map(Some(2))?;
let mut map = serializer.serialize_map(Some(2 + inner.custom_attributes.len()))?;
map.serialize_entry("type", "array")?;
map.serialize_entry("items", &*inner.clone())?;
map.serialize_entry("items", &*inner.items.clone())?;
for attr in &inner.custom_attributes {
map.serialize_entry(attr.0, attr.1)?;
}
map.end()
}
Schema::Map(ref inner) => {
let mut map = serializer.serialize_map(Some(2))?;
let mut map = serializer.serialize_map(Some(2 + inner.custom_attributes.len()))?;
map.serialize_entry("type", "map")?;
map.serialize_entry("values", &*inner.clone())?;
map.serialize_entry("values", &*inner.types.clone())?;
for attr in &inner.custom_attributes {
map.serialize_entry(attr.0, attr.1)?;
}
map.end()
}
Schema::Union(ref inner) => {
Expand Down Expand Up @@ -2270,10 +2325,7 @@ pub mod derive {
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
Schema::Array(Box::new(T::get_schema_in_ctxt(
named_schemas,
enclosing_namespace,
)))
Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
}
}

Expand Down Expand Up @@ -2305,10 +2357,7 @@ pub mod derive {
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
Schema::Map(Box::new(T::get_schema_in_ctxt(
named_schemas,
enclosing_namespace,
)))
Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
}
}

Expand All @@ -2320,10 +2369,7 @@ pub mod derive {
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
Schema::Map(Box::new(T::get_schema_in_ctxt(
named_schemas,
enclosing_namespace,
)))
Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
}
}

Expand Down Expand Up @@ -2387,14 +2433,14 @@ mod tests {
#[test]
fn test_array_schema() -> TestResult {
let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#)?;
assert_eq!(Schema::Array(Box::new(Schema::String)), schema);
assert_eq!(Schema::array(Schema::String), schema);
Ok(())
}

#[test]
fn test_map_schema() -> TestResult {
let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#)?;
assert_eq!(Schema::Map(Box::new(Schema::Double)), schema);
assert_eq!(Schema::map(Schema::Double), schema);
Ok(())
}

Expand Down Expand Up @@ -2748,9 +2794,9 @@ mod tests {
doc: None,
default: None,
aliases: None,
schema: Schema::Array(Box::new(Schema::Ref {
schema: Schema::array(Schema::Ref {
name: Name::new("Node")?,
})),
}),
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
Expand Down Expand Up @@ -4442,7 +4488,7 @@ mod tests {
assert_eq!(union.schemas[0], Schema::Null);

if let Schema::Array(ref array_schema) = union.schemas[1] {
if let Schema::Long = **array_schema {
if let Schema::Long = *array_schema.items {
// OK
} else {
panic!("Expected a Schema::Array of type Long");
Expand Down Expand Up @@ -6529,4 +6575,40 @@ mod tests {

Ok(())
}

/// An array schema built with a custom attribute must serialize that attribute next to
/// `items` and `type`, and the serialized JSON must parse back to an equal schema.
#[test]
fn test_avro_3927_serialize_array_with_custom_attributes() -> TestResult {
    let attributes = BTreeMap::from([("field-id".to_string(), "1".into())]);
    let expected = Schema::array_with_attributes(Schema::Long, attributes);

    // Serialize via an intermediate `serde_json::Value` before rendering the JSON text.
    let serialized = serde_json::to_string(&serde_json::to_value(&expected)?)?;
    assert_eq!(
        r#"{"field-id":"1","items":"long","type":"array"}"#,
        &serialized
    );

    let reparsed = Schema::parse_str(&serialized)?;
    assert_eq!(expected, reparsed);

    Ok(())
}

/// A map schema built with a custom attribute must serialize that attribute next to
/// `type` and `values`, and the serialized JSON must parse back to an equal schema.
#[test]
fn test_avro_3927_serialize_map_with_custom_attributes() -> TestResult {
    let attributes = BTreeMap::from([("field-id".to_string(), "1".into())]);
    let expected = Schema::map_with_attributes(Schema::Long, attributes);

    // Serialize via an intermediate `serde_json::Value` before rendering the JSON text.
    let serialized = serde_json::to_string(&serde_json::to_value(&expected)?)?;
    assert_eq!(
        r#"{"field-id":"1","type":"map","values":"long"}"#,
        &serialized
    );

    let reparsed = Schema::parse_str(&serialized)?;
    assert_eq!(expected, reparsed);

    Ok(())
}
}
Loading

0 comments on commit c1d5b97

Please sign in to comment.