From b8f9a0b6875d447d32c78ca49d11cb44165eda06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Tue, 20 Aug 2024 17:35:17 +0200 Subject: [PATCH] fix(sqs,sns): from with a list of objects --- src/main/java/io/kestra/plugin/aws/sns/Publish.java | 4 ++-- src/main/java/io/kestra/plugin/aws/sqs/Publish.java | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index d1f417e9..59903e09 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -83,8 +83,8 @@ public Publish.Output run(RunContext runContext) throws Exception { } else if (this.from instanceof List) { flowable = Flux - .fromArray(((List) this.from).toArray()) - .cast(Message.class); + .fromIterable((List) this.from) + .map(map -> JacksonMapper.toMap(map, Message.class)); resultFlowable = this.buildFlowable(flowable, snsClient, topicArn, runContext); diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java index 61145ce8..1bcdaa34 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java @@ -22,6 +22,8 @@ import java.io.InputStreamReader; import java.net.URI; import java.util.List; +import java.util.Map; + import jakarta.validation.constraints.NotNull; import static io.kestra.core.utils.Rethrow.throwFunction; @@ -85,8 +87,8 @@ public Output run(RunContext runContext) throws Exception { } else if (this.from instanceof List) { flowable = Flux - .fromArray(((List) this.from).toArray()) - .cast(Message.class); + .fromIterable((List) this.from) + .map(map -> JacksonMapper.toMap(map, Message.class)); resultFlowable = this.buildFlowable(flowable, sqsClient, queueUrl, runContext);