Skip to content

Commit

Permalink
fix: handle processing of an empty list or file
Browse files Browse the repository at this point in the history
  • Loading branch information
yvrng authored and loicmathieu committed Sep 5, 2024
1 parent e04d3ff commit 05d8944
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/kestra/plugin/kafka/Produce.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,15 @@ public Output run(RunContext runContext) throws Exception {

count = resultFlowable
.reduce(Integer::sum)
.block();
.blockOptional().orElse(0);
}
} else {
flowable = Flux.fromArray(((List<Object>) this.from).toArray());
resultFlowable = this.buildFlowable(flowable, runContext, producer);

count = resultFlowable
.reduce(Integer::sum)
.block();
.blockOptional().orElse(0);
}
} else {
producer.send(this.producerRecord(runContext, producer, (Map<String, Object>) this.from));
Expand Down

0 comments on commit 05d8944

Please sign in to comment.