Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3479: [rust] Avro Schema Derive Proc Macro #1631

Merged
merged 34 commits into from
Apr 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d55efdf
port crate
jklamer Mar 4, 2022
99bd6d7
namespace port
jklamer Mar 4, 2022
2d4e9e8
dev depends
jklamer Mar 4, 2022
5f7db8f
resolved against main
jklamer Mar 14, 2022
019ff71
Cons list tests
jklamer Mar 6, 2022
490195c
rebased onto master resolution
jklamer Mar 14, 2022
15edb59
namespace attribute in derive
jklamer Mar 30, 2022
dabd3f2
std pointers
jklamer Mar 30, 2022
1cf8d10
References, testing, and refactoring
jklamer Apr 2, 2022
b98512b
[AVRO-3479] Clean up for PR
jklamer Apr 2, 2022
a81b358
AVRO-3479: Add missing ASL2 headers
martin-g Apr 8, 2022
05b1286
AVRO-3479: Minor improvements
martin-g Apr 8, 2022
5a43cd2
Schema assertions and PR comments
jklamer Apr 9, 2022
52b1c42
test failure fixing
jklamer Apr 9, 2022
39ac767
add readme
jklamer Apr 10, 2022
113f31f
README + implementation guide + bug fix with enclosing namespaces
jklamer Apr 10, 2022
b82e00d
AVRO-3479: Minor improvements
martin-g Apr 11, 2022
892b249
AVRO-3479: Fix typos
martin-g Apr 11, 2022
ff75150
AVRO-3479: Use darling crate to parse derive attributes
martin-g Apr 11, 2022
47ee2d1
darling for NamedTypes and fields
jklamer Apr 12, 2022
9f7d9a6
AVRO-3479 pr review naming
jklamer Apr 12, 2022
ed4d649
AVRO-3479 doc comment doc and small tests
jklamer Apr 15, 2022
8da964a
AVRO-3479 featurize
jklamer Apr 15, 2022
78db295
AVRO-3479 cargo engineering
jklamer Apr 15, 2022
cc9fe3c
Fix a docu warning:
martin-g Apr 15, 2022
c553bbd
AVRO-3479: Rename avro_derive to apache-avro-derive
martin-g Apr 15, 2022
4be7583
AVRO-3479: Use fqn for Mutex
martin-g Apr 15, 2022
a10e19a
AVRO-3479: Update darling to 0.14.0
martin-g Apr 15, 2022
632aabf
AVRO-3479: Fix the version of apache-avro-derive
martin-g Apr 15, 2022
bfb43da
AVRO-3479: Minor cleanups
martin-g Apr 15, 2022
984f0fa
AVRO-3479: Inline a pub function that is used only in avro_derive
martin-g Apr 15, 2022
65d0a04
AVRO-3479: Derive Schema::Long for u32
martin-g Apr 16, 2022
7574a05
Merge branch 'master' into jklamer/avro-derive
martin-g Apr 16, 2022
c59e961
AVRO-3479: Bump dependencies to their latest versions
martin-g Apr 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lang/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
[workspace]
members = [
"avro",
"avro_derive"
]
6 changes: 4 additions & 2 deletions lang/rust/avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ snappy = ["crc32fast", "snap"]
zstandard = ["zstd"]
bzip = ["bzip2"]
xz = ["xz2"]
derive = ["apache-avro-derive" ]

[lib]
path = "src/lib.rs"
Expand All @@ -56,7 +57,7 @@ byteorder = "1.4.3"
bzip2 = { version = "0.4.3", optional = true }
crc32fast = { version = "1.3.2", optional = true }
digest = "0.10.3"
libflate = "1.1.2"
libflate = "1.2.0"
xz2 = { version = "0.1.6", optional = true }
num-bigint = "0.4.3"
rand = "0.8.5"
Expand All @@ -72,7 +73,8 @@ uuid = { version = "0.8.2", features = ["serde", "v4"] }
zerocopy = "0.6.1"
lazy_static = "1.4.0"
log = "0.4.16"
zstd = { version = "0.11.0+zstd.1.5.2", optional = true }
zstd = { version = "0.11.1+zstd.1.5.2", optional = true }
apache-avro-derive = { version= "0.14.0", path = "../avro_derive", optional = true }

[dev-dependencies]
md-5 = "0.10.1"
Expand Down
231 changes: 215 additions & 16 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl Name {
/// Name::new("some_namespace.some_name").unwrap()
/// );
/// ```
pub(crate) fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
Name {
name: self.name.clone(),
namespace: self
Expand Down Expand Up @@ -1006,7 +1006,8 @@ impl Parser {
schema: &Schema,
aliases: &Aliases,
) {
// FIXME, this should be globally aware, so if there is something overwriting something else then there is an ambiguois schema definition. An apropriate error should be thrown
// FIXME, this should be globally aware, so if there is something overwriting something
// else then there is an ambiguous schema definition. An appropriate error should be thrown
self.parsed_schemas
.insert(fully_qualified_name.clone(), schema.clone());
self.resolving_schemas.remove(fully_qualified_name);
Expand Down Expand Up @@ -1526,6 +1527,204 @@ fn field_ordering_position(field: &str) -> Option<usize> {
.map(|pos| pos + 1)
}

/// Trait for types that serve as an Avro data model. Derive implementation available
/// through `derive` feature. Do not implement directly, implement [`derive::AvroSchemaComponent`]
/// to get this trait through a blanket implementation.
pub trait AvroSchema {
fn get_schema() -> Schema;
}

#[cfg(feature = "derive")]
pub mod derive {
use super::*;

/// Trait for types that serve as fully defined components inside an Avro data model. Derive
/// implementation available through `derive` feature. This is what is implemented by
/// the `derive(AvroSchema)` macro.
///
/// # Implementation guide
///
///### Simple implementation
/// To construct a non named simple schema, it is possible to ignore the input argument making the
/// general form implementation look like
/// ```ignore
/// impl AvroSchemaComponent for AType {
/// fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
/// Schema::?
/// }
///}
/// ```
/// ### Passthrough implementation
/// To construct a schema for a Type that acts as in "inner" type, such as for smart pointers, simply
/// pass through the arguments to the inner type
/// ```ignore
/// impl AvroSchemaComponent for PassthroughType {
/// fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
/// InnerType::get_schema_in_ctxt(names, enclosing_namespace)
/// }
///}
/// ```
///### Complex implementation
/// To implement this for Named schema there is a general form needed to avoid creating invalid
/// schemas or infinite loops.
/// ```ignore
/// impl AvroSchemaComponent for ComplexType {
/// fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
/// // Create the fully qualified name for your type given the enclosing namespace
/// let name = apache_avro::schema::Name::new("MyName")
/// .expect("Unable to parse schema name")
/// .fully_qualified_name(enclosing_namespace);
/// let enclosing_namespace = &name.namespace;
/// // Check, if your name is already defined, and if so, return a ref to that name
/// if named_schemas.contains_key(&name) {
/// apache_avro::schema::Schema::Ref{name: name.clone()}
/// } else {
/// named_schemas.insert(name.clone(), apache_avro::schema::Schema::Ref{name: name.clone()});
/// // YOUR SCHEMA DEFINITION HERE with the name equivalent to "MyName".
/// // For non-simple sub types delegate to their implementation of AvroSchemaComponent
/// }
/// }
///}
/// ```
pub trait AvroSchemaComponent {
fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace)
-> Schema;
}

impl<T> AvroSchema for T
where
T: AvroSchemaComponent,
{
fn get_schema() -> Schema {
T::get_schema_in_ctxt(&mut HashMap::default(), &Option::None)
}
}

macro_rules! impl_schema(
($type:ty, $variant_constructor:expr) => (
impl AvroSchemaComponent for $type {
fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
$variant_constructor
}
}
);
);

impl_schema!(i8, Schema::Int);
impl_schema!(i16, Schema::Int);
impl_schema!(i32, Schema::Int);
impl_schema!(i64, Schema::Long);
impl_schema!(u8, Schema::Int);
impl_schema!(u16, Schema::Int);
impl_schema!(u32, Schema::Long);
impl_schema!(f32, Schema::Float);
impl_schema!(f64, Schema::Double);
impl_schema!(String, Schema::String);
impl_schema!(uuid::Uuid, Schema::Uuid);
impl_schema!(core::time::Duration, Schema::Duration);

impl<T> AvroSchemaComponent for Vec<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
Schema::Array(Box::new(T::get_schema_in_ctxt(
named_schemas,
enclosing_namespace,
)))
}
}

impl<T> AvroSchemaComponent for Option<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
let inner_schema = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
Schema::Union(UnionSchema {
schemas: vec![Schema::Null, inner_schema.clone()],
variant_index: vec![Schema::Null, inner_schema]
.iter()
.enumerate()
.map(|(idx, s)| (SchemaKind::from(s), idx))
.collect(),
})
}
}

impl<T> AvroSchemaComponent for Map<String, T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
Schema::Map(Box::new(T::get_schema_in_ctxt(
named_schemas,
enclosing_namespace,
)))
}
}

impl<T> AvroSchemaComponent for HashMap<String, T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
Schema::Map(Box::new(T::get_schema_in_ctxt(
named_schemas,
enclosing_namespace,
)))
}
}

impl<T> AvroSchemaComponent for Box<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}

impl<T> AvroSchemaComponent for std::sync::Mutex<T>
where
T: AvroSchemaComponent,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}

impl<T> AvroSchemaComponent for Cow<'_, T>
where
T: AvroSchemaComponent + Clone,
{
fn get_schema_in_ctxt(
named_schemas: &mut Names,
enclosing_namespace: &Namespace,
) -> Schema {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -2589,7 +2788,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_record_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2628,7 +2827,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_record_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2662,7 +2861,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_enum_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2696,7 +2895,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_enum_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2730,7 +2929,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_fixed_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2764,7 +2963,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.inner_fixed_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2804,7 +3003,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "inner_space.inner_record_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2839,7 +3038,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "inner_space.inner_enum_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2874,7 +3073,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "inner_space.inner_fixed_name"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -2925,7 +3124,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 3);
assert_eq!(rs.get_names().len(), 3);
for s in &[
"space.record_name",
"space.middle_record_name",
Expand Down Expand Up @@ -2981,7 +3180,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 3);
assert_eq!(rs.get_names().len(), 3);
for s in &[
"space.record_name",
"middle_namespace.middle_record_name",
Expand Down Expand Up @@ -3038,7 +3237,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 3);
assert_eq!(rs.get_names().len(), 3);
for s in &[
"space.record_name",
"middle_namespace.middle_record_name",
Expand Down Expand Up @@ -3081,7 +3280,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.in_array_record"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down Expand Up @@ -3120,7 +3319,7 @@ mod tests {
"#;
let schema = Schema::parse_str(schema).unwrap();
let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
assert!(rs.get_names().len() == 2);
assert_eq!(rs.get_names().len(), 2);
for s in &["space.record_name", "space.in_map_record"] {
assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
}
Expand Down
1 change: 1 addition & 0 deletions lang/rust/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl Value {
(&Value::Int(_), &Schema::Int) => None,
(&Value::Int(_), &Schema::Date) => None,
(&Value::Int(_), &Schema::TimeMillis) => None,
(&Value::Int(_), &Schema::Long) => None,
(&Value::Long(_), &Schema::Long) => None,
(&Value::Long(_), &Schema::TimeMicros) => None,
(&Value::Long(_), &Schema::TimestampMillis) => None,
Expand Down
Loading