From f3b6ee2d32ae5200675e345b4d26b151caf3034b Mon Sep 17 00:00:00 2001 From: Michael Spector Date: Wed, 3 Jul 2024 17:19:27 +0300 Subject: [PATCH] AVRO-4010: [Rust] Avoid re-resolving schema on every read() (#2995) Co-authored-by: Michael Spector --- lang/rust/avro/src/reader.rs | 26 ++++++--- lang/rust/avro/src/schema.rs | 101 +++++++++++++++++++---------------- 2 files changed, 75 insertions(+), 52 deletions(-) diff --git a/lang/rust/avro/src/reader.rs b/lang/rust/avro/src/reader.rs index adefed2030d..121f8e25758 100644 --- a/lang/rust/avro/src/reader.rs +++ b/lang/rust/avro/src/reader.rs @@ -20,7 +20,10 @@ use crate::{ decode::{decode, decode_internal}, from_value, rabin::Rabin, - schema::{AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema}, + schema::{ + resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema, + ResolvedSchema, Schema, + }, types::Value, util, AvroResult, Codec, Error, }; @@ -47,6 +50,7 @@ struct Block<'r, R> { writer_schema: Schema, schemata: Vec<&'r Schema>, user_metadata: HashMap>, + names_refs: Names, } impl<'r, R: Read> Block<'r, R> { @@ -61,6 +65,7 @@ impl<'r, R: Read> Block<'r, R> { message_count: 0, marker: [0; 16], user_metadata: Default::default(), + names_refs: Default::default(), }; block.read_header()?; @@ -179,13 +184,18 @@ impl<'r, R: Read> Block<'r, R> { let mut block_bytes = &self.buf[self.buf_idx..]; let b_original = block_bytes.len(); - let schemata = if self.schemata.is_empty() { - vec![&self.writer_schema] - } else { - self.schemata.clone() + + let item = decode_internal( + &self.writer_schema, + &self.names_refs, + &None, + &mut block_bytes, + )?; + let item = match read_schema { + Some(schema) => item.resolve(schema)?, + None => item, }; - let item = - from_avro_datum_schemata(&self.writer_schema, schemata, &mut block_bytes, read_schema)?; + if b_original == block_bytes.len() { // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop return Err(Error::ReadBlock); @@ -214,8 +224,10 @@ impl<'r, R: Read> Block<'r, R> { .map(|(name, schema)| (name.clone(), (*schema).clone())) .collect(); self.writer_schema = Schema::parse_with_names(&json, names)?; + resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?; } else { self.writer_schema = Schema::parse(&json)?; + resolve_names(&self.writer_schema, &mut self.names_refs, &None)?; } Ok(()) } diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index b8a64dffd4f..f58892ca09d 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -545,7 +545,7 @@ impl TryFrom for ResolvedOwnedSchema { names, root_schema: schema, }; - Self::from_internal(&rs.root_schema, &mut rs.names, &None)?; + resolve_names(&rs.root_schema, &mut rs.names, &None)?; Ok(rs) } } @@ -557,57 +557,68 @@ impl ResolvedOwnedSchema { pub(crate) fn get_names(&self) -> &Names { &self.names } +} - fn from_internal( - schema: &Schema, - names: &mut Names, - enclosing_namespace: &Namespace, - ) -> AvroResult<()> { - match schema { - Schema::Array(schema) => Self::from_internal(&schema.items, names, enclosing_namespace), - Schema::Map(schema) => Self::from_internal(&schema.types, names, enclosing_namespace), - Schema::Union(UnionSchema { schemas, .. }) => { - for schema in schemas { - Self::from_internal(schema, names, enclosing_namespace)? - } - Ok(()) +pub(crate) fn resolve_names( + schema: &Schema, + names: &mut Names, + enclosing_namespace: &Namespace, +) -> AvroResult<()> { + match schema { + Schema::Array(schema) => resolve_names(&schema.items, names, enclosing_namespace), + Schema::Map(schema) => resolve_names(&schema.types, names, enclosing_namespace), + Schema::Union(UnionSchema { schemas, .. }) => { + for schema in schemas { + resolve_names(schema, names, enclosing_namespace)? } - Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => { - let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); - if names - .insert(fully_qualified_name.clone(), schema.clone()) - .is_some() - { - Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) - } else { - Ok(()) - } + Ok(()) + } + Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => { + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + if names + .insert(fully_qualified_name.clone(), schema.clone()) + .is_some() + { + Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) + } else { + Ok(()) } - Schema::Record(RecordSchema { name, fields, .. }) => { - let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); - if names - .insert(fully_qualified_name.clone(), schema.clone()) - .is_some() - { - Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) - } else { - let record_namespace = fully_qualified_name.namespace; - for field in fields { - Self::from_internal(&field.schema, names, &record_namespace)? - } - Ok(()) + } + Schema::Record(RecordSchema { name, fields, .. }) => { + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + if names + .insert(fully_qualified_name.clone(), schema.clone()) + .is_some() + { + Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) + } else { + let record_namespace = fully_qualified_name.namespace; + for field in fields { + resolve_names(&field.schema, names, &record_namespace)? } + Ok(()) } - Schema::Ref { name } => { - let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); - names - .get(&fully_qualified_name) - .map(|_| ()) - .ok_or(Error::SchemaResolutionError(fully_qualified_name)) - } - _ => Ok(()), } + Schema::Ref { name } => { + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + names + .get(&fully_qualified_name) + .map(|_| ()) + .ok_or(Error::SchemaResolutionError(fully_qualified_name)) + } + _ => Ok(()), + } +} + +pub(crate) fn resolve_names_with_schemata( + schemata: &Vec<&Schema>, + names: &mut Names, + enclosing_namespace: &Namespace, +) -> AvroResult<()> { + for schema in schemata { + resolve_names(schema, names, enclosing_namespace)?; } + Ok(()) } /// Represents a `field` in a `record` Avro schema.