From 1d61ff3a589f3d4bf4bc104633af0352f2459d19 Mon Sep 17 00:00:00 2001 From: Ralf Grubenmann Date: Fri, 9 Feb 2024 16:31:56 +0100 Subject: [PATCH 1/2] add support for logical types in rust avro --- lang/rust/avro/src/schema_compatibility.rs | 273 ++++++++++++++++++++- 1 file changed, 265 insertions(+), 8 deletions(-) diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs index 09c302036e2..5dd45d17ab3 100644 --- a/lang/rust/avro/src/schema_compatibility.rs +++ b/lang/rust/avro/src/schema_compatibility.rs @@ -62,7 +62,19 @@ impl Checker { let w_type = SchemaKind::from(writers_schema); let r_type = SchemaKind::from(readers_schema); - if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed) { + if w_type != SchemaKind::Union + && (r_type.is_primitive() + || r_type == SchemaKind::Fixed + || r_type == SchemaKind::TimeMicros + || r_type == SchemaKind::TimestampMillis + || r_type == SchemaKind::TimestampMicros + || r_type == SchemaKind::TimestampNanos + || r_type == SchemaKind::LocalTimestampMillis + || r_type == SchemaKind::LocalTimestampMicros + || r_type == SchemaKind::LocalTimestampNanos + || r_type == SchemaKind::Date + || r_type == SchemaKind::TimeMillis) + { return Ok(()); } @@ -401,6 +413,106 @@ impl SchemaCompatibility { }); } } + SchemaKind::TimeMillis => { + if let Schema::TimeMillis = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimeMillis, SchemaKind::Int], + }); + } + } + SchemaKind::TimeMicros => { + if let Schema::TimeMicros = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimeMicros, SchemaKind::Long], + }); + } + } + SchemaKind::TimestampNanos => { + if let Schema::TimestampNanos = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimestampNanos, SchemaKind::Long], + }); + } + } + SchemaKind::TimestampMillis => { + if let Schema::TimestampMillis = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimestampMillis, SchemaKind::Long], + }); + } + } + SchemaKind::TimestampMicros => { + if let Schema::TimestampMicros = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::TimestampMicros, SchemaKind::Long], + }); + } + } + SchemaKind::Date => { + if let Schema::Date = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::Date, SchemaKind::Int], + }); + } + } + SchemaKind::LocalTimestampMillis => { + if let Schema::LocalTimestampMillis = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::LocalTimestampMillis, SchemaKind::Long], + }); + } + } + SchemaKind::LocalTimestampMicros => { + if let Schema::LocalTimestampMicros = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::LocalTimestampMicros, SchemaKind::Long], + }); + } + } + SchemaKind::LocalTimestampNanos => { + if let Schema::LocalTimestampNanos = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::LocalTimestampNanos, SchemaKind::Long], + }); + } + } + SchemaKind::Duration => { + if let Schema::Duration = writers_schema { + return Ok(()); + } else { + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: &[SchemaKind::Duration, SchemaKind::Fixed], + }); + } + } _ => { return Err(CompatibilityError::Inconclusive(String::from( "readers_schema", @@ -412,9 +524,15 @@ impl SchemaCompatibility { // Here are the checks for primitive types match w_type { SchemaKind::Int => { - if [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) + if [ + SchemaKind::Long, + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::TimeMillis, + SchemaKind::Date, + ] + .iter() + .any(|&t| t == r_type) { Ok(()) } else { @@ -425,15 +543,35 @@ impl SchemaCompatibility { } } SchemaKind::Long => { - if [SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) + if [ + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::TimeMicros, + SchemaKind::TimestampMillis, + SchemaKind::TimestampMicros, + SchemaKind::TimestampNanos, + SchemaKind::LocalTimestampMillis, + SchemaKind::LocalTimestampMicros, + SchemaKind::LocalTimestampNanos, + ] + .iter() + .any(|&t| t == r_type) { Ok(()) } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Float, SchemaKind::Double], + expected_type: &[ + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::TimeMicros, + SchemaKind::TimestampMillis, + SchemaKind::TimestampMicros, + SchemaKind::TimestampNanos, + SchemaKind::LocalTimestampMillis, + SchemaKind::LocalTimestampMicros, + SchemaKind::LocalTimestampNanos, + ], }) } } @@ -470,6 +608,96 @@ impl SchemaCompatibility { }) } } + SchemaKind::TimeMillis => { + if r_type == SchemaKind::Int { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Int, SchemaKind::TimeMillis], + }) + } + } + SchemaKind::Date => { + if r_type == SchemaKind::Int { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Int, SchemaKind::Date], + }) + } + } + SchemaKind::TimeMicros => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::TimeMicros], + }) + } + } + SchemaKind::TimestampMicros => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::TimestampMicros], + }) + } + } + SchemaKind::TimestampMillis => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::TimestampMillis], + }) + } + } + SchemaKind::TimestampNanos => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::TimestampNanos], + }) + } + } + SchemaKind::LocalTimestampMillis => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampMillis], + }) + } + } + SchemaKind::LocalTimestampMicros => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampMicros], + }) + } + } + SchemaKind::LocalTimestampNanos => { + if r_type == SchemaKind::Long { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampNanos], + }) + } + } _ => Err(CompatibilityError::Inconclusive(String::from( "writers_schema", ))), @@ -659,6 +887,16 @@ mod tests { // bytes (Schema::Bytes, Schema::Null), (Schema::Bytes, Schema::Int), + // logical types + (Schema::TimeMicros, Schema::Int), + (Schema::TimestampMillis, Schema::Int), + (Schema::TimestampMicros, Schema::Int), + (Schema::TimestampNanos, Schema::Int), + (Schema::LocalTimestampMillis, Schema::Int), + (Schema::LocalTimestampMicros, Schema::Int), + (Schema::LocalTimestampNanos, Schema::Int), + (Schema::Date, Schema::Long), + (Schema::TimeMillis, Schema::Long), // array and maps (int_array_schema(), long_array_schema()), (int_map_schema(), int_array_schema()), @@ -698,6 +936,25 @@ mod tests { (Schema::Double, Schema::Float), (Schema::String, Schema::Bytes), (Schema::Bytes, Schema::String), + // logical types + (Schema::TimeMicros, Schema::Long), + (Schema::TimestampMillis, Schema::Long), + (Schema::TimestampMicros, Schema::Long), + (Schema::TimestampNanos, Schema::Long), + (Schema::LocalTimestampMillis, Schema::Long), + (Schema::LocalTimestampMicros, Schema::Long), + (Schema::LocalTimestampNanos, Schema::Long), + (Schema::Date, Schema::Int), + (Schema::TimeMillis, Schema::Int), + (Schema::Long, Schema::TimeMicros), + (Schema::Long, Schema::TimestampMillis), + (Schema::Long, Schema::TimestampMicros), + (Schema::Long, Schema::TimestampNanos), + (Schema::Long, Schema::LocalTimestampMillis), + (Schema::Long, Schema::LocalTimestampMicros), + (Schema::Long, Schema::LocalTimestampNanos), + (Schema::Int, Schema::Date), + (Schema::Int, Schema::TimeMillis), (int_array_schema(), int_array_schema()), (long_array_schema(), int_array_schema()), (int_map_schema(), int_map_schema()), From e49bed9df96652187ef6d5ac7a0d0eea33cb89bc Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Tue, 13 Feb 2024 09:59:58 +0200 Subject: [PATCH 2/2] AVRO-3935: [Rust] Extract common logic for checking compatible reader/writer types Signed-off-by: Martin Tzvetanov Grigorov --- lang/rust/avro/src/error.rs | 2 +- lang/rust/avro/src/schema_compatibility.rs | 403 ++++++++------------- 2 files changed, 150 insertions(+), 255 deletions(-) diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 171f7d40ce4..e46a9975c3a 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -503,7 +503,7 @@ pub enum CompatibilityError { #[error("Incompatible schema types! The {schema_type} should have been {expected_type:?}")] TypeExpected { schema_type: String, - expected_type: &'static [SchemaKind], + expected_type: Vec, }, #[error("Incompatible schemata! Field '{0}' in reader schema does not match the type in the writer schema")] diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs index 5dd45d17ab3..5ee7c060beb 100644 --- a/lang/rust/avro/src/schema_compatibility.rs +++ b/lang/rust/avro/src/schema_compatibility.rs @@ -65,15 +65,15 @@ impl Checker { if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed + || r_type == SchemaKind::Date + || r_type == SchemaKind::TimeMillis || r_type == SchemaKind::TimeMicros || r_type == SchemaKind::TimestampMillis || r_type == SchemaKind::TimestampMicros || r_type == SchemaKind::TimestampNanos || r_type == SchemaKind::LocalTimestampMillis || r_type == SchemaKind::LocalTimestampMicros - || r_type == SchemaKind::LocalTimestampNanos - || r_type == SchemaKind::Date - || r_type == SchemaKind::TimeMillis) + || r_type == SchemaKind::LocalTimestampNanos) { return Ok(()); } @@ -92,7 +92,7 @@ impl Checker { } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Record], + expected_type: vec![SchemaKind::Record], }) } } @@ -108,7 +108,7 @@ impl Checker { } else { Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Array], + expected_type: vec![SchemaKind::Array], }) } } @@ -155,7 +155,7 @@ impl Checker { if w_type == SchemaKind::Union { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Record], + expected_type: vec![SchemaKind::Record], }); } @@ -290,6 +290,53 @@ impl SchemaCompatibility { writers_schema: &Schema, readers_schema: &Schema, ) -> Result<(), CompatibilityError> { + fn check_reader_type_multi( + reader_type: SchemaKind, + allowed_reader_types: Vec, + writer_type: SchemaKind, + ) -> Result<(), CompatibilityError> { + if allowed_reader_types.iter().any(|&t| t == reader_type) { + Ok(()) + } else { + let mut allowed_types: Vec = vec![writer_type]; + allowed_types.extend_from_slice(allowed_reader_types.as_slice()); + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: allowed_types, + }) + } + } + + fn check_reader_type( + reader_type: SchemaKind, + allowed_reader_type: SchemaKind, + writer_type: SchemaKind, + ) -> Result<(), CompatibilityError> { + if reader_type == allowed_reader_type { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: vec![writer_type, allowed_reader_type], + }) + } + } + + fn check_writer_type( + writers_schema: &Schema, + allowed_schema: Schema, + expected_schema_types: Vec, + ) -> Result<(), CompatibilityError> { + if allowed_schema == *writers_schema { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: expected_schema_types, + }) + } + } + let w_type = SchemaKind::from(writers_schema); let r_type = SchemaKind::from(readers_schema); @@ -317,13 +364,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Record], + expected_type: vec![SchemaKind::Record], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Record], + expected_type: vec![SchemaKind::Record], }); } } @@ -350,7 +397,7 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Fixed], + expected_type: vec![SchemaKind::Fixed], }); } } @@ -369,13 +416,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Enum], + expected_type: vec![SchemaKind::Enum], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Enum], + expected_type: vec![SchemaKind::Enum], }); } } @@ -386,13 +433,13 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Map], + expected_type: vec![SchemaKind::Map], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Map], + expected_type: vec![SchemaKind::Map], }); } } @@ -403,105 +450,78 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Array], + expected_type: vec![SchemaKind::Array], }); } } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Array], + expected_type: vec![SchemaKind::Array], }); } } + SchemaKind::Date => { + return check_writer_type( + writers_schema, + Schema::Date, + vec![SchemaKind::Date, SchemaKind::Int], + ); + } SchemaKind::TimeMillis => { - if let Schema::TimeMillis = writers_schema { - return Ok(()); - } else { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::TimeMillis, SchemaKind::Int], - }); - } + return check_writer_type( + writers_schema, + Schema::TimeMillis, + vec![SchemaKind::Date, SchemaKind::Int], + ); } SchemaKind::TimeMicros => { - if let Schema::TimeMicros = writers_schema { - return Ok(()); - } else { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::TimeMicros, SchemaKind::Long], - }); - } + return check_writer_type( + writers_schema, + Schema::TimeMicros, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); } SchemaKind::TimestampNanos => { - if let Schema::TimestampNanos = writers_schema { - return Ok(()); - } else { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::TimestampNanos, SchemaKind::Long], - }); - } + return check_writer_type( + writers_schema, + Schema::TimestampNanos, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); } SchemaKind::TimestampMillis => { - if let Schema::TimestampMillis = writers_schema { - return Ok(()); - } else { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::TimestampMillis, SchemaKind::Long], - }); - } + return check_writer_type( + writers_schema, + Schema::TimestampMillis, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); } SchemaKind::TimestampMicros => { - if let Schema::TimestampMicros = writers_schema { - return Ok(()); - } else { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::TimestampMicros, SchemaKind::Long], - }); - } - } - SchemaKind::Date => { - if let Schema::Date = writers_schema { - return Ok(()); - } else { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Date, SchemaKind::Int], - }); - } + return check_writer_type( + writers_schema, + Schema::TimestampMicros, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); } SchemaKind::LocalTimestampMillis => { - if let Schema::LocalTimestampMillis = writers_schema { - return Ok(()); - } else { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::LocalTimestampMillis, SchemaKind::Long], - }); - } + return check_writer_type( + writers_schema, + Schema::LocalTimestampMillis, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); } SchemaKind::LocalTimestampMicros => { - if let Schema::LocalTimestampMicros = writers_schema { - return Ok(()); - } else { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::LocalTimestampMicros, SchemaKind::Long], - }); - } + return check_writer_type( + writers_schema, + Schema::LocalTimestampMicros, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); } SchemaKind::LocalTimestampNanos => { - if let Schema::LocalTimestampNanos = writers_schema { - return Ok(()); - } else { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::LocalTimestampNanos, SchemaKind::Long], - }); - } + return check_writer_type( + writers_schema, + Schema::TimeMicros, + vec![SchemaKind::TimeMicros, SchemaKind::Long], + ); } SchemaKind::Duration => { if let Schema::Duration = writers_schema { @@ -509,7 +529,7 @@ impl SchemaCompatibility { } else { return Err(CompatibilityError::TypeExpected { schema_type: String::from("writers_schema"), - expected_type: &[SchemaKind::Duration, SchemaKind::Fixed], + expected_type: vec![SchemaKind::Duration, SchemaKind::Fixed], }); } } @@ -523,27 +543,20 @@ impl SchemaCompatibility { // Here are the checks for primitive types match w_type { - SchemaKind::Int => { - if [ + SchemaKind::Int => check_reader_type_multi( + r_type, + vec![ SchemaKind::Long, SchemaKind::Float, SchemaKind::Double, - SchemaKind::TimeMillis, SchemaKind::Date, - ] - .iter() - .any(|&t| t == r_type) - { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::Float, SchemaKind::Double], - }) - } - } - SchemaKind::Long => { - if [ + SchemaKind::TimeMillis, + ], + w_type, + ), + SchemaKind::Long => check_reader_type_multi( + r_type, + vec![ SchemaKind::Float, SchemaKind::Double, SchemaKind::TimeMicros, @@ -553,150 +566,25 @@ impl SchemaCompatibility { SchemaKind::LocalTimestampMillis, SchemaKind::LocalTimestampMicros, SchemaKind::LocalTimestampNanos, - ] - .iter() - .any(|&t| t == r_type) - { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[ - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::TimeMicros, - SchemaKind::TimestampMillis, - SchemaKind::TimestampMicros, - SchemaKind::TimestampNanos, - SchemaKind::LocalTimestampMillis, - SchemaKind::LocalTimestampMicros, - SchemaKind::LocalTimestampNanos, - ], - }) - } - } + ], + w_type, + ), SchemaKind::Float => { - if [SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) - { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Float, SchemaKind::Double], - }) - } + check_reader_type_multi(r_type, vec![SchemaKind::Float, SchemaKind::Double], w_type) } - SchemaKind::String => { - if r_type == SchemaKind::Bytes { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Bytes], - }) - } + SchemaKind::String => check_reader_type(r_type, SchemaKind::Bytes, w_type), + SchemaKind::Bytes => check_reader_type(r_type, SchemaKind::String, w_type), + SchemaKind::Date | SchemaKind::TimeMillis => { + check_reader_type(r_type, SchemaKind::Int, w_type) } - SchemaKind::Bytes => { - if r_type == SchemaKind::String { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::String], - }) - } - } - SchemaKind::TimeMillis => { - if r_type == SchemaKind::Int { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Int, SchemaKind::TimeMillis], - }) - } - } - SchemaKind::Date => { - if r_type == SchemaKind::Int { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Int, SchemaKind::Date], - }) - } - } - SchemaKind::TimeMicros => { - if r_type == SchemaKind::Long { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::TimeMicros], - }) - } - } - SchemaKind::TimestampMicros => { - if r_type == SchemaKind::Long { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::TimestampMicros], - }) - } - } - SchemaKind::TimestampMillis => { - if r_type == SchemaKind::Long { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::TimestampMillis], - }) - } - } - SchemaKind::TimestampNanos => { - if r_type == SchemaKind::Long { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::TimestampNanos], - }) - } - } - SchemaKind::LocalTimestampMillis => { - if r_type == SchemaKind::Long { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampMillis], - }) - } - } - SchemaKind::LocalTimestampMicros => { - if r_type == SchemaKind::Long { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampMicros], - }) - } - } - SchemaKind::LocalTimestampNanos => { - if r_type == SchemaKind::Long { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::LocalTimestampNanos], - }) - } + SchemaKind::TimeMicros + | SchemaKind::TimestampMicros + | SchemaKind::TimestampMillis + | SchemaKind::TimestampNanos + | SchemaKind::LocalTimestampMillis + | SchemaKind::LocalTimestampMicros + | SchemaKind::LocalTimestampNanos => { + check_reader_type(r_type, SchemaKind::Long, w_type) } _ => Err(CompatibilityError::Inconclusive(String::from( "writers_schema", @@ -937,6 +825,8 @@ mod tests { (Schema::String, Schema::Bytes), (Schema::Bytes, Schema::String), // logical types + (Schema::Date, Schema::Int), + (Schema::TimeMillis, Schema::Int), (Schema::TimeMicros, Schema::Long), (Schema::TimestampMillis, Schema::Long), (Schema::TimestampMicros, Schema::Long), @@ -944,8 +834,8 @@ mod tests { (Schema::LocalTimestampMillis, Schema::Long), (Schema::LocalTimestampMicros, Schema::Long), (Schema::LocalTimestampNanos, Schema::Long), - (Schema::Date, Schema::Int), - (Schema::TimeMillis, Schema::Int), + (Schema::Int, Schema::Date), + (Schema::Int, Schema::TimeMillis), (Schema::Long, Schema::TimeMicros), (Schema::Long, Schema::TimestampMillis), (Schema::Long, Schema::TimestampMicros), @@ -953,8 +843,6 @@ mod tests { (Schema::Long, Schema::LocalTimestampMillis), (Schema::Long, Schema::LocalTimestampMicros), (Schema::Long, Schema::LocalTimestampNanos), - (Schema::Int, Schema::Date), - (Schema::Int, Schema::TimeMillis), (int_array_schema(), int_array_schema()), (long_array_schema(), int_array_schema()), (int_map_schema(), int_map_schema()), @@ -1116,7 +1004,14 @@ mod tests { assert_eq!( CompatibilityError::TypeExpected { schema_type: String::from("readers_schema"), - expected_type: &[SchemaKind::Long, SchemaKind::Float, SchemaKind::Double], + expected_type: vec![ + SchemaKind::Int, + SchemaKind::Long, + SchemaKind::Float, + SchemaKind::Double, + SchemaKind::Date, + SchemaKind::TimeMillis + ], }, SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err() ); @@ -1158,7 +1053,7 @@ mod tests { "field1".to_owned(), Box::new(CompatibilityError::TypeExpected { schema_type: "readers_schema".to_owned(), - expected_type: &[SchemaKind::Bytes] + expected_type: vec![SchemaKind::String, SchemaKind::Bytes] }) ), SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err()