Skip to content

Commit

Permalink
fix: FileTransform with multiple row failed on nashorn
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Nov 1, 2024
1 parent c4100e5 commit 0c74d51
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.runners.RunContext;

import java.util.Collection;
import java.util.List;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand All @@ -26,14 +29,14 @@
inputs:
- id: file
type: FILE
tasks:
- id: file_transform
type: io.kestra.plugin.scripts.groovy.FileTransform
from: "{{ inputs.file }}"
script: |
logger.info('row: {}', row)
if (row.get('name') == 'richard') {
row = null
} else {
Expand All @@ -48,7 +51,7 @@
id: groovy_file_transform
namespace: company.team
inputs:
inputs:
- id: file
type: FILE
Expand All @@ -68,7 +71,7 @@
id: groovy_file_transform
namespace: company.team
inputs:
inputs:
- id: json
type: JSON
defaults: [{"name":"jane"}, {"name":"richard"}]
Expand All @@ -79,7 +82,7 @@
from: "{{ inputs.json }}"
script: |
logger.info('row: {}', row)
if (row.get('name') == 'richard') {
row = null
} else {
Expand All @@ -97,7 +100,7 @@
tasks:
- id: file_transform
type: io.kestra.plugin.scripts.groovy.FileTransform
from: "[{\"name\":\"John Doe\", \"age\":99, \"embedded\":{\"foo\":\"bar\"}}]"
from: "[{"name":"John Doe", "age":99, "embedded":{"foo":"bar"}}]"
script: |
import com.fasterxml.jackson.*
Expand All @@ -108,7 +111,7 @@
def typeRef = new core.type.TypeReference<HashMap<String,Object>>() {};
data = mapper.readValue(jsonStr, typeRef);
logger.info('json object: {}', data);
logger.info('embedded field: {}', data.embedded.foo)
"""
Expand All @@ -120,4 +123,10 @@ public class FileTransform extends io.kestra.plugin.scripts.jvm.FileTransform {
public Output run(RunContext runContext) throws Exception {
return this.run(runContext, "groovy");
}

@Override
protected Collection<Object> convertRows(Object rows) {
//noinspection unchecked
return (Collection<Object>) rows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,40 +37,14 @@ protected FileTransform task(String source) {
.build();
}


@SuppressWarnings("unchecked")
@Test
void rows() throws Exception {
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);

FileSerde.write(output, ImmutableMap.of(
"id", "1",
"name", "john"
));

URI source = storageInterface.put(
null,
new URI("/" + IdUtils.create()),
new FileInputStream(tempFile)
);

FileTransform task = io.kestra.plugin.scripts.groovy.FileTransform.builder()
@Override
protected FileTransform multipleRows(String source) {
return io.kestra.plugin.scripts.groovy.FileTransform.builder()
.id("unit-test")
.type(FileTransform.class.getName())
.from(source.toString())
.from(source)
.concurrent(10)
.script("rows = [1,2,3, [\"action\": \"insert\"]]\n")
.script("rows = [1, 2, row, [\"action\": \"insert\"]]\n")
.build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
FileTransform.Output runOutput = task.run(runContext);

BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, runOutput.getUri())));
List<Object> result = new ArrayList<>();
FileSerde.reader(inputStream, result::add);

assertThat(result.size(), is(4));
assertThat(result, hasItems(1, 2, 3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.runners.RunContext;

import java.util.Collection;
import java.util.List;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -67,13 +70,33 @@
from: "{{ inputs.file }}"
script: |
logger.info('row: {}', row)
if row['name'] == 'richard':
if row['name'] == 'richard':
row = None
else:
else:
row['email'] = row['name'] + '@kestra.io'
"""
),
@Example(
title = "Create multiple rows from one row.",
full = true,
code = """
id: jython_file_transform
namespace: company.team
inputs:
- id: file
type: FILE
tasks:
- id: file_transform
type: io.kestra.plugin.scripts.jython.FileTransform
from: "{{ inputs.file }}"
script: |
logger.info('row: {}', row)
rows = [{"action": "insert"}, row]
"""
),
@Example(
title = "Transform with file from JSON string.",
full = true,
Expand All @@ -92,8 +115,8 @@
from: "{{ inputs.json }}"
script: |
logger.info('row: {}', row)
if row['name'] == 'richard':
if row['name'] == 'richard':
row = None
else:
row['email'] = row['name'] + '@kestra.io'
Expand All @@ -106,4 +129,10 @@ public class FileTransform extends io.kestra.plugin.scripts.jvm.FileTransform {
public Output run(RunContext runContext) throws Exception {
return this.run(runContext, "python");
}

@Override
protected Collection<Object> convertRows(Object rows) {
//noinspection unchecked
return (Collection<Object>) rows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,14 @@ protected FileTransform task(String source) {
)
.build();
}

@Override
protected FileTransform multipleRows(String source) {
return io.kestra.plugin.scripts.jython.FileTransform.builder()
.id("unit-test")
.type(Eval.class.getName())
.from(source)
.script("rows = [1, 2 , row, {\"action\": \"insert\"}]\n")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.runners.RunContext;
import org.openjdk.nashorn.api.scripting.ScriptObjectMirror;

import java.util.Collection;
import java.util.List;

@SuperBuilder
@ToString
Expand All @@ -30,12 +34,32 @@
from: "{{ outputs['avro-to-gcs'] }}"
script: |
logger.info('row: {}', row)
if (row['name'] === 'richard') {
row = null
} else {
row['email'] = row['name'] + '@kestra.io'
}
}
"""
),
@Example(
title = "Create multiple rows from one row.",
full = true,
code = """
id: nashorn_file_transform
namespace: company.team
inputs:
- id: file
type: FILE
tasks:
- id: file_transform
type: io.kestra.plugin.scripts.nashorn.FileTransform
from: "{{ inputs.file }}"
script: |
logger.info('row: {}', row)
rows = [{"action": "insert"}, row]
"""
),
@Example(
Expand All @@ -51,7 +75,7 @@
from: "[{\"name":\"jane\"}, {\"name\":\"richard\"}]"
script: |
logger.info('row: {}', row)
if (row['name'] === 'richard') {
row = null
} else {
Expand All @@ -66,4 +90,9 @@ public class FileTransform extends io.kestra.plugin.scripts.jvm.FileTransform {
public Output run(RunContext runContext) throws Exception {
return this.run(runContext, "nashorn");
}

@Override
protected Collection<Object> convertRows(Object rows) {
return ((ScriptObjectMirror) rows).values();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,14 @@ protected FileTransform task(String source) {
)
.build();
}

@Override
protected io.kestra.plugin.scripts.jvm.FileTransform multipleRows(String source) {
return io.kestra.plugin.scripts.nashorn.FileTransform.builder()
.id("unit-test")
.type(Eval.class.getName())
.from(source)
.script("rows = [1, 2, row, {\"action\": \"insert\"}]\n")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ protected void finalize(
runContext.metric(Counter.of("records", lineCount));
}

abstract protected Collection<Object> convertRows(Object rows);

@SuppressWarnings("unchecked")
protected Function<Object, Publisher<Object>> convert(ScriptEngineService.CompiledScript script) throws ScriptException {
return throwFunction(row -> {
Expand All @@ -148,7 +150,7 @@ protected Function<Object, Publisher<Object>> convert(ScriptEngineService.Compil
script.getScript().eval(bindings);

if (bindings.get("rows") != null) {
return Flux.fromIterable((Collection<Object>) bindings.get("rows"));
return Flux.fromIterable(this.convertRows(bindings.get("rows")));
}

if (bindings.get("row") != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import jakarta.inject.Inject;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;

@KestraTest
Expand All @@ -34,6 +35,8 @@ public abstract class FileTransformTest {

abstract protected FileTransform task(String source);

abstract protected FileTransform multipleRows(String source);

@Test
void run() throws Exception {
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
Expand Down Expand Up @@ -123,4 +126,37 @@ void test(String source, int size) throws Exception {
)));
}
}

@Test
void rows() throws Exception {
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
OutputStream output = new FileOutputStream(tempFile);

var map = Map.of(
"id", "1",
"name", "john"
);

FileSerde.write(output, map);

URI source = storageInterface.put(
null,
new URI("/" + IdUtils.create()),
new FileInputStream(tempFile)
);

io.kestra.plugin.scripts.jvm.FileTransform task = this.multipleRows(source.toString());

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
io.kestra.plugin.scripts.jvm.FileTransform.Output runOutput = task.run(runContext);

BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, runOutput.getUri())));
List<Object> result = new ArrayList<>();
FileSerde.reader(inputStream, result::add);

assertThat(result.size(), is(4));
assertThat(result, hasItems(1, 2));
assertThat(result.get(2), is(map));
assertThat(result.get(3), is(Map.of("action", "insert")));
}
}

0 comments on commit 0c74d51

Please sign in to comment.