diff --git a/plugin-script-groovy/src/main/java/io/kestra/plugin/scripts/groovy/FileTransform.java b/plugin-script-groovy/src/main/java/io/kestra/plugin/scripts/groovy/FileTransform.java index 693044b..049bc50 100644 --- a/plugin-script-groovy/src/main/java/io/kestra/plugin/scripts/groovy/FileTransform.java +++ b/plugin-script-groovy/src/main/java/io/kestra/plugin/scripts/groovy/FileTransform.java @@ -9,6 +9,9 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.runners.RunContext; +import java.util.Collection; +import java.util.List; + @SuperBuilder @ToString @EqualsAndHashCode @@ -26,14 +29,14 @@ inputs: - id: file type: FILE - + tasks: - id: file_transform type: io.kestra.plugin.scripts.groovy.FileTransform from: "{{ inputs.file }}" script: | logger.info('row: {}', row) - + if (row.get('name') == 'richard') { row = null } else { @@ -48,7 +51,7 @@ id: groovy_file_transform namespace: company.team - inputs: + inputs: - id: file type: FILE @@ -68,7 +71,7 @@ id: groovy_file_transform namespace: company.team - inputs: + inputs: - id: json type: JSON defaults: [{"name":"jane"}, {"name":"richard"}] @@ -79,7 +82,7 @@ from: "{{ inputs.json }}" script: | logger.info('row: {}', row) - + if (row.get('name') == 'richard') { row = null } else { @@ -97,7 +100,7 @@ tasks: - id: file_transform type: io.kestra.plugin.scripts.groovy.FileTransform - from: "[{\"name\":\"John Doe\", \"age\":99, \"embedded\":{\"foo\":\"bar\"}}]" + from: "[{"name":"John Doe", "age":99, "embedded":{"foo":"bar"}}]" script: | import com.fasterxml.jackson.* @@ -108,7 +111,7 @@ def typeRef = new core.type.TypeReference>() {}; data = mapper.readValue(jsonStr, typeRef); - + logger.info('json object: {}', data); logger.info('embedded field: {}', data.embedded.foo) """ @@ -120,4 +123,10 @@ public class FileTransform extends io.kestra.plugin.scripts.jvm.FileTransform { public Output run(RunContext runContext) throws Exception { return this.run(runContext, "groovy"); } + + @Override + protected Collection convertRows(Object rows) { + //noinspection unchecked + return (Collection) rows; + } } diff --git a/plugin-script-groovy/src/test/java/io/kestra/plugin/scripts/groovy/FileTransformTest.java b/plugin-script-groovy/src/test/java/io/kestra/plugin/scripts/groovy/FileTransformTest.java index bc03dff..626cdf8 100644 --- a/plugin-script-groovy/src/test/java/io/kestra/plugin/scripts/groovy/FileTransformTest.java +++ b/plugin-script-groovy/src/test/java/io/kestra/plugin/scripts/groovy/FileTransformTest.java @@ -37,40 +37,14 @@ protected FileTransform task(String source) { .build(); } - - @SuppressWarnings("unchecked") - @Test - void rows() throws Exception { - File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); - OutputStream output = new FileOutputStream(tempFile); - - FileSerde.write(output, ImmutableMap.of( - "id", "1", - "name", "john" - )); - - URI source = storageInterface.put( - null, - new URI("/" + IdUtils.create()), - new FileInputStream(tempFile) - ); - - FileTransform task = io.kestra.plugin.scripts.groovy.FileTransform.builder() + @Override + protected FileTransform multipleRows(String source) { + return io.kestra.plugin.scripts.groovy.FileTransform.builder() .id("unit-test") .type(FileTransform.class.getName()) - .from(source.toString()) + .from(source) .concurrent(10) - .script("rows = [1,2,3, [\"action\": \"insert\"]]\n") + .script("rows = [1, 2, row, [\"action\": \"insert\"]]\n") .build(); - - RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()); - FileTransform.Output runOutput = task.run(runContext); - - BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, runOutput.getUri()))); - List result = new ArrayList<>(); - FileSerde.reader(inputStream, result::add); - - assertThat(result.size(), is(4)); - assertThat(result, hasItems(1, 2, 3)); } } diff --git a/plugin-script-jython/src/main/java/io/kestra/plugin/scripts/jython/FileTransform.java b/plugin-script-jython/src/main/java/io/kestra/plugin/scripts/jython/FileTransform.java index 1665bd9..268d96a 100644 --- a/plugin-script-jython/src/main/java/io/kestra/plugin/scripts/jython/FileTransform.java +++ b/plugin-script-jython/src/main/java/io/kestra/plugin/scripts/jython/FileTransform.java @@ -9,6 +9,9 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.runners.RunContext; +import java.util.Collection; +import java.util.List; + @SuperBuilder @ToString @EqualsAndHashCode @@ -67,13 +70,33 @@ from: "{{ inputs.file }}" script: | logger.info('row: {}', row) - - if row['name'] == 'richard': + + if row['name'] == 'richard': row = None - else: + else: row['email'] = row['name'] + '@kestra.io' """ ), + @Example( + title = "Create multiple rows from one row.", + full = true, + code = """ + id: jython_file_transform + namespace: company.team + + inputs: + - id: file + type: FILE + + tasks: + - id: file_transform + type: io.kestra.plugin.scripts.jython.FileTransform + from: "{{ inputs.file }}" + script: | + logger.info('row: {}', row) + rows = [{"action": "insert"}, row] + """ + ), @Example( title = "Transform with file from JSON string.", full = true, @@ -92,8 +115,8 @@ from: "{{ inputs.json }}" script: | logger.info('row: {}', row) - - if row['name'] == 'richard': + + if row['name'] == 'richard': row = None else: row['email'] = row['name'] + '@kestra.io' @@ -106,4 +129,10 @@ public class FileTransform extends io.kestra.plugin.scripts.jvm.FileTransform { public Output run(RunContext runContext) throws Exception { return this.run(runContext, "python"); } + + @Override + protected Collection convertRows(Object rows) { + //noinspection unchecked + return (Collection) rows; + } } diff --git a/plugin-script-jython/src/test/java/io/kestra/plugin/scripts/jython/FileTransformTest.java b/plugin-script-jython/src/test/java/io/kestra/plugin/scripts/jython/FileTransformTest.java index 7dbff61..bf90306 100644 --- a/plugin-script-jython/src/test/java/io/kestra/plugin/scripts/jython/FileTransformTest.java +++ b/plugin-script-jython/src/test/java/io/kestra/plugin/scripts/jython/FileTransformTest.java @@ -19,4 +19,14 @@ protected FileTransform task(String source) { ) .build(); } + + @Override + protected FileTransform multipleRows(String source) { + return io.kestra.plugin.scripts.jython.FileTransform.builder() + .id("unit-test") + .type(Eval.class.getName()) + .from(source) + .script("rows = [1, 2 , row, {\"action\": \"insert\"}]\n") + .build(); + } } diff --git a/plugin-script-nashorn/src/main/java/io/kestra/plugin/scripts/nashorn/FileTransform.java b/plugin-script-nashorn/src/main/java/io/kestra/plugin/scripts/nashorn/FileTransform.java index a5b884a..c6c44ef 100644 --- a/plugin-script-nashorn/src/main/java/io/kestra/plugin/scripts/nashorn/FileTransform.java +++ b/plugin-script-nashorn/src/main/java/io/kestra/plugin/scripts/nashorn/FileTransform.java @@ -6,6 +6,10 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.runners.RunContext; +import org.openjdk.nashorn.api.scripting.ScriptObjectMirror; + +import java.util.Collection; +import java.util.List; @SuperBuilder @ToString @@ -30,12 +34,32 @@ from: "{{ outputs['avro-to-gcs'] }}" script: | logger.info('row: {}', row) - + if (row['name'] === 'richard') { row = null } else { row['email'] = row['name'] + '@kestra.io' - } + } + """ + ), + @Example( + title = "Create multiple rows from one row.", + full = true, + code = """ + id: nashorn_file_transform + namespace: company.team + + inputs: + - id: file + type: FILE + + tasks: + - id: file_transform + type: io.kestra.plugin.scripts.nashorn.FileTransform + from: "{{ inputs.file }}" + script: | + logger.info('row: {}', row) + rows = [{"action": "insert"}, row] """ ), @Example( @@ -51,7 +75,7 @@ from: "[{\"name":\"jane\"}, {\"name\":\"richard\"}]" script: | logger.info('row: {}', row) - + if (row['name'] === 'richard') { row = null } else { @@ -66,4 +90,9 @@ public class FileTransform extends io.kestra.plugin.scripts.jvm.FileTransform { public Output run(RunContext runContext) throws Exception { return this.run(runContext, "nashorn"); } + + @Override + protected Collection convertRows(Object rows) { + return ((ScriptObjectMirror) rows).values(); + } } diff --git a/plugin-script-nashorn/src/test/java/io/kestra/plugin/scripts/nashorn/FileTransformTest.java b/plugin-script-nashorn/src/test/java/io/kestra/plugin/scripts/nashorn/FileTransformTest.java index 8a1b1a0..cd34246 100644 --- a/plugin-script-nashorn/src/test/java/io/kestra/plugin/scripts/nashorn/FileTransformTest.java +++ b/plugin-script-nashorn/src/test/java/io/kestra/plugin/scripts/nashorn/FileTransformTest.java @@ -20,4 +20,14 @@ protected FileTransform task(String source) { ) .build(); } + + @Override + protected io.kestra.plugin.scripts.jvm.FileTransform multipleRows(String source) { + return io.kestra.plugin.scripts.nashorn.FileTransform.builder() + .id("unit-test") + .type(Eval.class.getName()) + .from(source) + .script("rows = [1, 2, row, {\"action\": \"insert\"}]\n") + .build(); + } } diff --git a/plugin-script/src/main/java/io/kestra/plugin/scripts/jvm/FileTransform.java b/plugin-script/src/main/java/io/kestra/plugin/scripts/jvm/FileTransform.java index c4183da..603620c 100644 --- a/plugin-script/src/main/java/io/kestra/plugin/scripts/jvm/FileTransform.java +++ b/plugin-script/src/main/java/io/kestra/plugin/scripts/jvm/FileTransform.java @@ -139,6 +139,8 @@ protected void finalize( runContext.metric(Counter.of("records", lineCount)); } + abstract protected Collection convertRows(Object rows); + @SuppressWarnings("unchecked") protected Function> convert(ScriptEngineService.CompiledScript script) throws ScriptException { return throwFunction(row -> { @@ -148,7 +150,7 @@ protected Function> convert(ScriptEngineService.Compil script.getScript().eval(bindings); if (bindings.get("rows") != null) { - return Flux.fromIterable((Collection) bindings.get("rows")); + return Flux.fromIterable(this.convertRows(bindings.get("rows"))); } if (bindings.get("row") != null) { diff --git a/plugin-script/src/test/java/io/kestra/plugin/scripts/jvm/FileTransformTest.java b/plugin-script/src/test/java/io/kestra/plugin/scripts/jvm/FileTransformTest.java index 707175f..86a46d6 100644 --- a/plugin-script/src/test/java/io/kestra/plugin/scripts/jvm/FileTransformTest.java +++ b/plugin-script/src/test/java/io/kestra/plugin/scripts/jvm/FileTransformTest.java @@ -22,6 +22,7 @@ import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.is; @KestraTest @@ -34,6 +35,8 @@ public abstract class FileTransformTest { abstract protected FileTransform task(String source); + abstract protected FileTransform multipleRows(String source); + @Test void run() throws Exception { File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); @@ -123,4 +126,37 @@ void test(String source, int size) throws Exception { ))); } } + + @Test + void rows() throws Exception { + File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs"); + OutputStream output = new FileOutputStream(tempFile); + + var map = Map.of( + "id", "1", + "name", "john" + ); + + FileSerde.write(output, map); + + URI source = storageInterface.put( + null, + new URI("/" + IdUtils.create()), + new FileInputStream(tempFile) + ); + + io.kestra.plugin.scripts.jvm.FileTransform task = this.multipleRows(source.toString()); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()); + io.kestra.plugin.scripts.jvm.FileTransform.Output runOutput = task.run(runContext); + + BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, runOutput.getUri()))); + List result = new ArrayList<>(); + FileSerde.reader(inputStream, result::add); + + assertThat(result.size(), is(4)); + assertThat(result, hasItems(1, 2)); + assertThat(result.get(2), is(map)); + assertThat(result.get(3), is(Map.of("action", "insert"))); + } }