Skip to content

Commit

Permalink
Faster avro nulls (backport of #6881 to 1.17) (#6883)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrp authored Sep 17, 2024
1 parent 5016d52 commit 0aa6bf4
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
5 changes: 5 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

#### Highlights

### 1.17.1 (Not released yet)

* [#6880](https://github.com/TouK/nussknacker/pull/6880) Performance optimization of generating Avro messages with unions
- shorter message in logs

### 1.17.0 (12 September 2024)

* [#6658](https://github.com/TouK/nussknacker/pull/6658) Bump up circe-yaml lib to 0.15.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
case (Schema.Type.RECORD, map: util.Map[String @unchecked, _]) =>
encodeRecord(map, schema)
case (Schema.Type.ENUM, symbol: CharSequence) =>
encodeEnumOrError(symbol.toString, schema, fieldName)
encodeEnum(symbol.toString, schema, fieldName)
case (Schema.Type.ENUM, symbol: EnumSymbol) =>
encodeEnumOrError(symbol.toString, schema, fieldName)
encodeEnum(symbol.toString, schema, fieldName)
case (Schema.Type.ARRAY, collection: Iterable[_]) =>
encodeCollection(collection, schema)
case (Schema.Type.ARRAY, collection: util.Collection[_]) =>
Expand All @@ -60,6 +60,8 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
case (Schema.Type.MAP, map: util.Map[_, _]) =>
encodeMap(map.asScala, schema)
case (Schema.Type.UNION, _) =>
// Note: calling 'toString' on Avro schema is expensive, especially when we reject some messages.
// Error messages should be lazily evaluated, and materialized only when exiting public functions.
schema.getTypes.asScala
.to(LazyList)
.flatMap { subTypeSchema =>
Expand Down Expand Up @@ -138,9 +140,9 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
case (Schema.Type.NULL, None) =>
Valid(null)
case (_, null) =>
error(s"Not expected null for field: $fieldName with schema: $schema")
error(s"Not expected null for field: $fieldName with schema: ${schema.getFullName}")
case (_, _) =>
error(s"Not expected type: ${value.getClass.getName} for field: $fieldName with schema: $schema")
error(s"Not expected type: ${value.getClass.getName} for field: $fieldName with schema: ${schema.getFullName}")
}
}

Expand All @@ -149,11 +151,15 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
decimal.setScale(decimalLogicalType.getScale, RoundingMode.DOWN).bigDecimal
}

def encodeEnumOrError(symbol: String, schema: Schema, fieldName: Option[String]): WithError[EnumSymbol] =
if (!schema.hasEnumSymbol(symbol))
error(s"Not expected symbol: $symbol for field: $fieldName with schema: $schema")
else
private def encodeEnum(symbol: String, schema: Schema, fieldName: Option[String]): WithError[EnumSymbol] =
if (!schema.hasEnumSymbol(symbol)) {
val allowedEnumValues = schema.getEnumSymbols.asScala.mkString(", ")
error(
s"Not expected symbol: $symbol for field: $fieldName with schema: ${schema.getFullName}, allowed values: $allowedEnumValues"
)
} else {
Valid(new EnumSymbol(schema, symbol))
}

def encodeRecordOrError(fields: collection.Map[String, _], schema: Schema): GenericData.Record = {
encodeRecordOrError(fields.asJava, schema)
Expand Down Expand Up @@ -221,7 +227,7 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat

private def encodeFixed(bytes: Array[Byte], schema: Schema): WithError[GenericData.Fixed] = {
if (bytes.length != schema.getFixedSize) {
error(s"Fixed size not matches: ${bytes.length} != ${schema.getFixedSize} for schema: $schema")
error(s"Fixed size not matches: ${bytes.length} != ${schema.getFixedSize} for schema: ${schema.getFullName}")
} else {
val fixed = new GenericData.Fixed(schema)
fixed.bytes(bytes)
Expand Down

0 comments on commit 0aa6bf4

Please sign in to comment.