Skip to content

Commit

Permalink
AVRO-3479: [rust] Avro Schema Derive Proc Macro (#1631)
Browse files Browse the repository at this point in the history
* port crate

* namespace port

* dev depends

* resolved against main

* Cons list tests

* rebased onto master resolution

* namespace attribute in derive

* std pointers

* References, testing, and refactoring

* [AVRO-3479] Clean up for PR

* AVRO-3479: Add missing ASL2 headers

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Minor improvements

Add TODOs

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* Schema assertions and PR comments

* test failure fixing

* add readme

* README + implementation guide + bug fix with enclosing namespaces

* AVRO-3479: Minor improvements

Fix typos.
Format the code/doc.
Apply suggestions by the IDE to use assert_eq!() instead of assert!()

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Fix typos

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Use darling crate to parse derive attributes

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* darling for NamedTypes and fields

* AVRO-3479 pr review naming

* AVRO-3479 doc comment doc and small tests

* AVRO-3479 featurize

* AVRO-3479 cargo engineering

* Fix a docu warning:

warning: unresolved link to `AvroSchemaComponent`
    --> avro/src/schema.rs:1524:70
     |
1524 | /// through `derive` feature. Do not implement directly, implement [`AvroSchemaComponent`]
     |                                                                      ^^^^^^^^^^^^^^^^^^^ no item named `AvroSchemaComponent` in scope
     |
     = note: `#[warn(rustdoc::broken_intra_doc_links)]` on by default
     = help: to escape `[` and `]` characters, add '\' before them like `\[` or `\]`

warning: `apache-avro` (lib doc) generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 10.13s

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Rename avro_derive to apache-avro-derive

For consistency.
Add Cargo.toml metadata

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Use fqn for Mutex

For some reason Rustdoc build sometimes (not always!) complain that the
import of std::sync::Mutex is not used ...

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Update darling to 0.14.0

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Fix the version of apache-avro-derive

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Minor cleanups

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Inline a pub function that is used only in avro_derive

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Derive Schema::Long for u32

Validate successfully Value::Int into Schema::Long

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3479: Bump dependencies to their latest versions

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: Martin Grigorov <[email protected]>
(cherry picked from commit cbc4372)
  • Loading branch information
jklamer authored and martin-g committed Apr 16, 2022
1 parent 72ccbf8 commit 2602398
Show file tree
Hide file tree
Showing 8 changed files with 1,852 additions and 18 deletions.
1 change: 1 addition & 0 deletions lang/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
[workspace]
members = [
"avro",
"avro_derive"
]
6 changes: 4 additions & 2 deletions lang/rust/avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ snappy = ["crc32fast", "snap"]
zstandard = ["zstd"]
bzip = ["bzip2"]
xz = ["xz2"]
derive = ["apache-avro-derive" ]

[lib]
path = "src/lib.rs"
Expand All @@ -56,7 +57,7 @@ byteorder = "1.4.3"
bzip2 = { version = "0.4.3", optional = true }
crc32fast = { version = "1.3.2", optional = true }
digest = "0.10.3"
libflate = "1.1.2"
libflate = "1.2.0"
xz2 = { version = "0.1.6", optional = true }
num-bigint = "0.4.3"
rand = "0.8.5"
Expand All @@ -72,7 +73,8 @@ uuid = { version = "0.8.2", features = ["serde", "v4"] }
zerocopy = "0.6.1"
lazy_static = "1.4.0"
log = "0.4.16"
zstd = { version = "0.11.0+zstd.1.5.2", optional = true }
zstd = { version = "0.11.1+zstd.1.5.2", optional = true }
apache-avro-derive = { version= "0.14.0", path = "../avro_derive", optional = true }

[dev-dependencies]
md-5 = "0.10.1"
Expand Down
231 changes: 215 additions & 16 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl Name {
/// Name::new("some_namespace.some_name").unwrap()
/// );
/// ```
pub(crate) fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
Name {
name: self.name.clone(),
namespace: self
Expand Down Expand Up @@ -1006,7 +1006,8 @@ impl Parser {
schema: &Schema,
aliases: &Aliases,
) {
// FIXME, this should be globally aware, so if there is something overwriting something else then there is an ambiguois schema definition. An apropriate error should be thrown
// FIXME, this should be globally aware, so if there is something overwriting something
// else then there is an ambiguous schema definition. An appropriate error should be thrown
self.parsed_schemas
.insert(fully_qualified_name.clone(), schema.clone());
self.resolving_schemas.remove(fully_qualified_name);
Expand Down Expand Up @@ -1526,6 +1527,204 @@ fn field_ordering_position(field: &str) -> Option<usize> {
.map(|pos| pos + 1)
}

/// Trait for types that serve as an Avro data model. Derive implementation available
/// through `derive` feature. Do not implement directly, implement [`derive::AvroSchemaComponent`]
/// to get this trait through a blanket implementation.
pub trait AvroSchema {
fn get_schema() -> Schema;
}

#[cfg(feature = "derive")]
pub mod derive {
use super::*;

/// Trait for types that serve as fully defined components inside an Avro data model. Derive
/// implementation available through `derive` feature. This is what is implemented by
/// the `derive(AvroSchema)` macro.
///
/// # Implementation guide
///
///### Simple implementation
/// To construct a non named simple schema, it is possible to ignore the input argument making the
/// general form implementation look like
/// ```ignore
/// impl AvroSchemaComponent for AType {
/// fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
/// Schema::?
/// }
///}
/// ```
/// ### Passthrough implementation
/// To construct a schema for a Type that acts as in "inner" type, such as for smart pointers, simply
/// pass through the arguments to the inner type
/// ```ignore
/// impl AvroSchemaComponent for PassthroughType {
/// fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
/// InnerType::get_schema_in_ctxt(names, enclosing_namespace)
/// }
///}
/// ```
///### Complex implementation
/// To implement this for Named schema there is a general form needed to avoid creating invalid
/// schemas or infinite loops.
/// ```ignore
/// impl AvroSchemaComponent for ComplexType {
/// fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
/// // Create the fully qualified name for your type given the enclosing namespace
/// let name = apache_avro::schema::Name::new("MyName")
/// .expect("Unable to parse schema name")
/// .fully_qualified_name(enclosing_namespace);
/// let enclosing_namespace = &name.namespace;
/// // Check, if your name is already defined, and if so, return a ref to that name
/// if named_schemas.contains_key(&name) {
/// apache_avro::schema::Schema::Ref{name: name.clone()}
/// } else {
/// named_schemas.insert(name.clone(), apache_avro::schema::Schema::Ref{name: name.clone()});
/// // YOUR SCHEMA DEFINITION HERE with the name equivalent to "MyName".
/// // For non-simple sub types delegate to their implementation of AvroSchemaComponent
/// }
/// }
///}
/// ```
pub trait AvroSchemaComponent {
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace)
-> Schema;
}

impl<T> AvroSchema for T
where
T: AvroSchemaComponent,
{
fn get_schema() -> Schema {
T::get_schema_in_ctxt(&mut HashMap::default(), &Option::None)
}
}

macro_rules! impl_schema(
($type:ty, $variant_constructor:expr) => (
impl AvroSchemaComponent for $type {
fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
$variant_constructor
}
}
);
);

impl_schema!(i8, Schema::Int);
impl_schema!(i16, Schema::Int);
impl_schema!(i32, Schema::Int);
impl_schema!(i64, Schema::Long);
impl_schema!(u8, Schema::Int);
impl_schema!(u16, Schema::Int);
impl_schema!(u32, Schema::Long);
impl_schema!(f32, Schema::Float);
impl_schema!(f64, Schema::Double);
impl_schema!(String, Schema::String);
impl_schema!(uuid::Uuid, Schema::Uuid);
impl_schema!(core::time::Duration, Schema::Duration);

impl<T> AvroSchemaComponent for Vec<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
Schema::Array(Box::new(T::get_schema_in_ctxt(
named_schemas,
enclosing_namespace,
)))
}
}

impl<T> AvroSchemaComponent for Option<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
let inner_schema = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
Schema::Union(UnionSchema {
schemas: vec![Schema::Null, inner_schema.clone()],
variant_index: vec![Schema::Null, inner_schema]
.iter()
.enumerate()
.map(|(idx, s)| (SchemaKind::from(s), idx))
.collect(),
})
}
}

impl<T> AvroSchemaComponent for Map<String, T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
Schema::Map(Box::new(T::get_schema_in_ctxt(
named_schemas,
enclosing_namespace,
)))
}
}

impl<T> AvroSchemaComponent for HashMap<String, T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
Schema::Map(Box::new(T::get_schema_in_ctxt(
named_schemas,
enclosing_namespace,
)))
}
}

impl<T> AvroSchemaComponent for Box<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}

impl<T> AvroSchemaComponent for std::sync::Mutex<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}

impl<T> AvroSchemaComponent for Cow<'_, T>
where
T: AvroSchemaComponent + Clone,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -2589,7 +2788,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_record_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2628,7 +2827,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_record_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2662,7 +2861,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_enum_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2696,7 +2895,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_enum_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2730,7 +2929,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_fixed_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2764,7 +2963,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_fixed_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2804,7 +3003,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "inner_space.inner_record_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2839,7 +3038,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "inner_space.inner_enum_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2874,7 +3073,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "inner_space.inner_fixed_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2925,7 +3124,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 3);
assert_eq!(rs.get_names().len(), 3);
for s in &[
"space.record_name",
"space.middle_record_name",
Expand Down Expand Up @@ -2981,7 +3180,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 3);
assert_eq!(rs.get_names().len(), 3);
for s in &[
"space.record_name",
"middle_namespace.middle_record_name",
Expand Down Expand Up @@ -3038,7 +3237,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 3);
assert_eq!(rs.get_names().len(), 3);
for s in &[
"space.record_name",
"middle_namespace.middle_record_name",
Expand Down Expand Up @@ -3081,7 +3280,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.in_array_record"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -3120,7 +3319,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.in_map_record"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down
1 change: 1 addition & 0 deletions lang/rust/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl Value {
(&Value::Int(_), &Schema::Int) => None,
(&Value::Int(_), &Schema::Date) => None,
(&Value::Int(_), &Schema::TimeMillis) => None,
(&Value::Int(_), &Schema::Long) => None,
(&Value::Long(_), &Schema::Long) => None,
(&Value::Long(_), &Schema::TimeMicros) => None,
(&Value::Long(_), &Schema::TimestampMillis) => None,
Expand Down
Loading

0 comments on commit 2602398

Please sign in to comment.