diff --git a/src/main/java/io/kestra/plugin/kafka/serdes/MapToGenericRecordSerializer.java b/src/main/java/io/kestra/plugin/kafka/serdes/MapToGenericRecordSerializer.java index ab13a4e..f7b13e0 100644 --- a/src/main/java/io/kestra/plugin/kafka/serdes/MapToGenericRecordSerializer.java +++ b/src/main/java/io/kestra/plugin/kafka/serdes/MapToGenericRecordSerializer.java @@ -47,7 +47,11 @@ private static Object buildValue(Schema schema, Object data) { case ARRAY -> buildArrayValue(schema, (Collection) data); case ENUM -> buildEnumValue(schema, (String) data); case FIXED -> buildFixedValue(schema, (byte[]) data); - case STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL -> data; + case INT -> data instanceof Number number ? number.intValue() : data; + case LONG -> data instanceof Number number ? number.longValue() : data; + case FLOAT -> data instanceof Number number ? number.floatValue() : data; + case DOUBLE -> data instanceof Number number ? number.doubleValue() : data; + case STRING, BYTES, BOOLEAN, NULL -> data; }; } diff --git a/src/test/java/io/kestra/plugin/kafka/KafkaTest.java b/src/test/java/io/kestra/plugin/kafka/KafkaTest.java index 40b12be..39ffe04 100644 --- a/src/test/java/io/kestra/plugin/kafka/KafkaTest.java +++ b/src/test/java/io/kestra/plugin/kafka/KafkaTest.java @@ -413,6 +413,72 @@ void produceComplexAvro() throws Exception { assertThat(reproduceRunOutput.getMessagesCount(), is(1)); } + @Test + void produceAvro_withIntegerAsLong() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + String topic = "tu_" + IdUtils.create(); + + Map value = Map.of("number", 42); + + Produce task = Produce.builder() + .properties(Map.of("bootstrap.servers", this.bootstrap)) + .serdeProperties(Map.of("schema.registry.url", this.registry)) + .keySerializer(SerdeType.STRING) + .valueSerializer(SerdeType.AVRO) + .topic(topic) + .valueAvroSchema(""" + { + "type": "record", + "name": "Sample", + "namespace": "io.kestra.examples", + "fields": [ + { + "name": "number", + "type": "long" + } + ] + } + """) + .from(Map.of("value", value)) + .build(); + + Produce.Output output = task.run(runContext); + assertThat(output.getMessagesCount(), is(1)); + } + + @Test + void produceAvro_withDoubleAsFloat() throws Exception { + RunContext runContext = runContextFactory.of(ImmutableMap.of()); + String topic = "tu_" + IdUtils.create(); + + Map value = Map.of("number", 42.0d); + + Produce task = Produce.builder() + .properties(Map.of("bootstrap.servers", this.bootstrap)) + .serdeProperties(Map.of("schema.registry.url", this.registry)) + .keySerializer(SerdeType.STRING) + .valueSerializer(SerdeType.AVRO) + .topic(topic) + .valueAvroSchema(""" + { + "type": "record", + "name": "Sample", + "namespace": "io.kestra.examples", + "fields": [ + { + "name": "number", + "type": "float" + } + ] + } + """) + .from(Map.of("value", value)) + .build(); + + Produce.Output output = task.run(runContext); + assertThat(output.getMessagesCount(), is(1)); + } + @Test void produceAvro_withUnion_andRecord() throws Exception { RunContext runContext = runContextFactory.of(ImmutableMap.of());