diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 171f7d40ce4..e46a9975c3a 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -503,7 +503,7 @@ pub enum CompatibilityError { #[error("Incompatible schema types! The {schema_type} should have been {expected_type:?}")] TypeExpected { schema_type: String, - expected_type: &'static [SchemaKind], + expected_type: Vec, }, #[error("Incompatible schemata! Field '{0}' in reader schema does not match the type in the writer schema")] diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs index 09c302036e2..5ee7c060beb 100644 --- a/lang/rust/avro/src/schema_compatibility.rs +++ b/lang/rust/avro/src/schema_compatibility.rs @@ -62,7 +62,19 @@ impl Checker { let w_type = SchemaKind::from(writers_schema); let r_type = SchemaKind::from(readers_schema); - if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed) { + if w_type != SchemaKind::Union + && (r_type.is_primitive() + || r_type == SchemaKind::Fixed + || r_type == SchemaKind::Date + || r_type == SchemaKind::TimeMillis + || r_type == SchemaKind::TimeMicros + || r_type == SchemaKind::TimestampMillis + || r_type == SchemaKind::TimestampMicros + || r_type == SchemaKind::TimestampNanos + || r_type == SchemaKind::LocalTimestampMillis + || r_type == SchemaKind::LocalTimestampMicros + || r_type == SchemaKind::LocalTimestampNanos) + { return Ok(()); } @@ -80,7 +92,7 @@ impl Checker { } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Record], + expected_type: vec![SchemaKind::Record], }) } } @@ -96,7 +108,7 @@ impl Checker { } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Array], + expected_type: vec![SchemaKind::Array], }) } } @@ -143,7 +155,7 @@ impl Checker { if w_type == SchemaKind::Union { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Record], + expected_type: vec![SchemaKind::Record], }); } @@ -278,6 +290,53 @@ impl SchemaCompatibility { writers_schema: &Schema, readers_schema: &Schema, ) -> Result<(), CompatibilityError> { + fn check_reader_type_multi( + reader_type: SchemaKind, + allowed_reader_types: Vec, + writer_type: SchemaKind, + ) -> Result<(), CompatibilityError> { + if allowed_reader_types.iter().any(|&t| t == reader_type) { + Ok(()) + } else { + let mut allowed_types: Vec = vec![writer_type]; + allowed_types.extend_from_slice(allowed_reader_types.as_slice()); + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: allowed_types, + }) + } + } + + fn check_reader_type( + reader_type: SchemaKind, + allowed_reader_type: SchemaKind, + writer_type: SchemaKind, + ) -> Result<(), CompatibilityError> { + if reader_type == allowed_reader_type { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: vec![writer_type, allowed_reader_type], + }) + } + } + + fn check_writer_type( + writers_schema: &Schema, + allowed_schema: Schema, + expected_schema_types: Vec, + ) -> Result<(), CompatibilityError> { + if allowed_schema == *writers_schema { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: expected_schema_types, + }) + } + } + let w_type = SchemaKind::from(writers_schema); let r_type = SchemaKind::from(readers_schema); @@ -305,13 +364,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Record], + expected_type: vec![SchemaKind::Record], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Record], + expected_type: vec![SchemaKind::Record], }); } } @@ -338,7 +397,7 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Fixed], + expected_type: vec![SchemaKind::Fixed], }); } } @@ -357,13 +416,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Enum], + expected_type: vec![SchemaKind::Enum], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Enum], + expected_type: vec![SchemaKind::Enum], }); } } @@ -374,13 +433,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Map], + expected_type: vec![SchemaKind::Map], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Map], + expected_type: vec![SchemaKind::Map], }); } } @@ -391,13 +450,86 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Array], + expected_type: vec![SchemaKind::Array], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Array], + expected_type: vec![SchemaKind::Array], + }); + } + } + SchemaKind::Date => { + return check_writer_type( + writers_schema, + Schema::Date, + vec![SchemaKind::Date, SchemaKind::Int], + ); + } + SchemaKind::TimeMillis => { + return check_writer_type( + writers_schema, + Schema::TimeMillis, + vec![SchemaKind::Date, SchemaKind::Int], + ); + } + SchemaKind::TimeMicros => { + return check_writer_type( + writers_schema, + Schema::TimeMicros, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); + } + SchemaKind::TimestampNanos => { + return check_writer_type( + writers_schema, + Schema::TimestampNanos, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); + } + SchemaKind::TimestampMillis => { + return check_writer_type( + writers_schema, + Schema::TimestampMillis, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); + } + SchemaKind::TimestampMicros => { + return check_writer_type( + writers_schema, + Schema::TimestampMicros, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); + } + SchemaKind::LocalTimestampMillis => { + return check_writer_type( + writers_schema, + Schema::LocalTimestampMillis, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); + } + SchemaKind::LocalTimestampMicros => { + return check_writer_type( + writers_schema, + Schema::LocalTimestampMicros, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); + } + SchemaKind::LocalTimestampNanos => { + return check_writer_type( + writers_schema, + Schema::TimeMicros, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); + } + SchemaKind::Duration => { + if let Schema::Duration = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: vec![SchemaKind::Duration, SchemaKind::Fixed], }); } } @@ -411,64 +543,48 @@ impl SchemaCompatibility { // Here are the checks for primitive types match w_type { - SchemaKind::Int => { - if [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) - { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::Float, SchemaKind::Double], - }) - } - } - SchemaKind::Long => { - if [SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) - { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Float, SchemaKind::Double], - }) - } - } + SchemaKind::Int => check_reader_type_multi( + r_type, + vec![ + SchemaKind::Long, + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::Date, + SchemaKind::TimeMillis, + ], + w_type, + ), + SchemaKind::Long => check_reader_type_multi( + r_type, + vec![ + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::TimeMicros, + SchemaKind::TimestampMillis, + SchemaKind::TimestampMicros, + SchemaKind::TimestampNanos, + SchemaKind::LocalTimestampMillis, + SchemaKind::LocalTimestampMicros, + SchemaKind::LocalTimestampNanos, + ], + w_type, + ), SchemaKind::Float => { - if [SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) - { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Float, SchemaKind::Double], - }) - } + check_reader_type_multi(r_type, vec![SchemaKind::Float, SchemaKind::Double], w_type) } - SchemaKind::String => { - if r_type == SchemaKind::Bytes { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Bytes], - }) - } + SchemaKind::String => check_reader_type(r_type, SchemaKind::Bytes, w_type), + SchemaKind::Bytes => check_reader_type(r_type, SchemaKind::String, w_type), + SchemaKind::Date | SchemaKind::TimeMillis => { + check_reader_type(r_type, SchemaKind::Int, w_type) } - SchemaKind::Bytes => { - if r_type == SchemaKind::String { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::String], - }) - } + SchemaKind::TimeMicros + | SchemaKind::TimestampMicros + | SchemaKind::TimestampMillis + | SchemaKind::TimestampNanos + | SchemaKind::LocalTimestampMillis + | SchemaKind::LocalTimestampMicros + | SchemaKind::LocalTimestampNanos => { + check_reader_type(r_type, SchemaKind::Long, w_type) } _ => Err(CompatibilityError::Inconclusive(String::from( "writers_schema", @@ -659,6 +775,16 @@ mod tests { // bytes (Schema::Bytes, Schema::Null), (Schema::Bytes, Schema::Int), + // logical types + (Schema::TimeMicros, Schema::Int), + (Schema::TimestampMillis, Schema::Int), + (Schema::TimestampMicros, Schema::Int), + (Schema::TimestampNanos, Schema::Int), + (Schema::LocalTimestampMillis, Schema::Int), + (Schema::LocalTimestampMicros, Schema::Int), + (Schema::LocalTimestampNanos, Schema::Int), + (Schema::Date, Schema::Long), + (Schema::TimeMillis, Schema::Long), // array and maps (int_array_schema(), long_array_schema()), (int_map_schema(), int_array_schema()), @@ -698,6 +824,25 @@ mod tests { (Schema::Double, Schema::Float), (Schema::String, Schema::Bytes), (Schema::Bytes, Schema::String), + // logical types + (Schema::Date, Schema::Int), + (Schema::TimeMillis, Schema::Int), + (Schema::TimeMicros, Schema::Long), + (Schema::TimestampMillis, Schema::Long), + (Schema::TimestampMicros, Schema::Long), + (Schema::TimestampNanos, Schema::Long), + (Schema::LocalTimestampMillis, Schema::Long), + (Schema::LocalTimestampMicros, Schema::Long), + (Schema::LocalTimestampNanos, Schema::Long), + (Schema::Int, Schema::Date), + (Schema::Int, Schema::TimeMillis), + (Schema::Long, Schema::TimeMicros), + (Schema::Long, Schema::TimestampMillis), + (Schema::Long, Schema::TimestampMicros), + (Schema::Long, Schema::TimestampNanos), + (Schema::Long, Schema::LocalTimestampMillis), + (Schema::Long, Schema::LocalTimestampMicros), + (Schema::Long, Schema::LocalTimestampNanos), (int_array_schema(), int_array_schema()), (long_array_schema(), int_array_schema()), (int_map_schema(), int_map_schema()), @@ -859,7 +1004,14 @@ mod tests { assert_eq!( CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::Float, SchemaKind::Double], + expected_type: vec![ + SchemaKind::Int, + SchemaKind::Long, + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::Date, + SchemaKind::TimeMillis + ], }, SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err() ); @@ -901,7 +1053,7 @@ mod tests { "field1".to_owned(), Box::new(CompatibilityError::TypeExpected { schema_type: "readers_schema".to_owned(), - expected_type: &[SchemaKind::Bytes] + expected_type: vec![SchemaKind::String, SchemaKind::Bytes] }) ), SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err()