From b07509cb67d61d993858d446ed1677a75f45e3c0 Mon Sep 17 00:00:00 2001 From: Yoann Vernageau <6807151+yvrng@users.noreply.github.com> Date: Tue, 15 Oct 2024 09:37:23 +0200 Subject: [PATCH] fix: handle numeric types when generating Avro values (#96) * fix: handle numeric types when generating Avro values * fix: add support for logical types --- .../serdes/MapToGenericRecordSerializer.java | 6 +- .../io/kestra/plugin/kafka/KafkaTest.java | 66 +++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/kestra/plugin/kafka/serdes/MapToGenericRecordSerializer.java b/src/main/java/io/kestra/plugin/kafka/serdes/MapToGenericRecordSerializer.java index ab13a4e..f7b13e0 100644 --- a/src/main/java/io/kestra/plugin/kafka/serdes/MapToGenericRecordSerializer.java +++ b/src/main/java/io/kestra/plugin/kafka/serdes/MapToGenericRecordSerializer.java @@ -47,7 +47,11 @@ private static Object buildValue(Schema schema, Object data) { case ARRAY -> buildArrayValue(schema, (Collection) data); case ENUM -> buildEnumValue(schema, (String) data); case FIXED -> buildFixedValue(schema, (byte[]) data); - case STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL -> data; + case INT -> data instanceof Number number ? number.intValue() : data; + case LONG -> data instanceof Number number ? number.longValue() : data; + case FLOAT -> data instanceof Number number ? number.floatValue() : data; + case DOUBLE -> data instanceof Number number ? number.doubleValue() : data; + case STRING, BYTES, BOOLEAN, NULL -> data; }; } diff --git a/src/test/java/io/kestra/plugin/kafka/KafkaTest.java b/src/test/java/io/kestra/plugin/kafka/KafkaTest.java index 40b12be..39ffe04 100644 --- a/src/test/java/io/kestra/plugin/kafka/KafkaTest.java +++ b/src/test/java/io/kestra/plugin/kafka/KafkaTest.java @@ -413,6 +413,72 @@ void produceComplexAvro() throws Exception { assertThat(reproduceRunOutput.getMessagesCount(), is(1)); } + @Test + void produceAvro_withIntegerAsLong() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + String topic = "tu_" + IdUtils.create(); + + Map value = Map.of("number", 42); + + Produce task = Produce.builder() + .properties(Map.of("bootstrap.servers", this.bootstrap)) + .serdeProperties(Map.of("schema.registry.url", this.registry)) + .keySerializer(SerdeType.STRING) + .valueSerializer(SerdeType.AVRO) + .topic(topic) + .valueAvroSchema(""" + { + "type": "record", + "name": "Sample", + "namespace": "io.kestra.examples", + "fields": [ + { + "name": "number", + "type": "long" + } + ] + } + """) + .from(Map.of("value", value)) + .build(); + + Produce.Output output = task.run(runContext); + assertThat(output.getMessagesCount(), is(1)); + } + + @Test + void produceAvro_withDoubleAsFloat() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + String topic = "tu_" + IdUtils.create(); + + Map value = Map.of("number", 42.0d); + + Produce task = Produce.builder() + .properties(Map.of("bootstrap.servers", this.bootstrap)) + .serdeProperties(Map.of("schema.registry.url", this.registry)) + .keySerializer(SerdeType.STRING) + .valueSerializer(SerdeType.AVRO) + .topic(topic) + .valueAvroSchema(""" + { + "type": "record", + "name": "Sample", + "namespace": "io.kestra.examples", + "fields": [ + { + "name": "number", + "type": "float" + } + ] + } + """) + .from(Map.of("value", value)) + .build(); + + Produce.Output output = task.run(runContext); + assertThat(output.getMessagesCount(), is(1)); + } + @Test void produceAvro_withUnion_andRecord() throws Exception { RunContext runContext = runContextFactory.of(ImmutableMap.of());