Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: storage now have a namespace parameter #188

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void task() throws Exception {
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/hello.java"),
IOUtils.toInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void task() throws Exception {
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
IOUtils.toInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void task() throws Exception {
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
IOUtils.toInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void task() throws Exception {
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
IOUtils.toInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.kestra.core.models.tasks.runners.TaskException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.kestra.plugin.scripts.exec.scripts.models.RunnerType;
Expand All @@ -27,8 +26,6 @@ class PythonTest {
@Inject
RunContextFactory runContextFactory;

@Inject
StorageInterface storageInterface;

@Test
void run() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void task() throws Exception {
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
IOUtils.toInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ void requirements(RunnerType runner, DockerOptions dockerOptions) throws Excepti
@MethodSource("source")
void inputs(RunnerType runner, DockerOptions dockerOptions) throws Exception {
URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
IOUtils.toInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void task() throws Exception {
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
IOUtils.toInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void task() throws Exception {
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
IOUtils.toInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ void files() throws Exception {
assertThat(run.getStdOutLineCount(), is(1));
assertThat(run.getVars().get("extract"), is("I'm here"));

InputStream get = storageInterface.get(null, run.getOutputFiles().get("xml"));
InputStream get = storageInterface.get(null, null, run.getOutputFiles().get("xml"));

assertThat(
CharStreams.toString(new InputStreamReader(get)),
is("1\n3\n")
);

get = storageInterface.get(null, run.getOutputFiles().get("csv"));
get = storageInterface.get(null, null, run.getOutputFiles().get("csv"));

assertThat(
CharStreams.toString(new InputStreamReader(get)),
Expand Down Expand Up @@ -112,13 +112,13 @@ void outputDirs() throws Exception {
assertThat(run.getStdErrLineCount(), is(0));
assertThat(run.getStdOutLineCount(), is(0));

InputStream get = storageInterface.get(null, run.getOutputFiles().get("xml/file1.txt"));
InputStream get = storageInterface.get(null, null, run.getOutputFiles().get("xml/file1.txt"));
assertThat(CharStreams.toString(new InputStreamReader(get)), is("1\n"));

get = storageInterface.get(null, run.getOutputFiles().get("xml/sub/sub2/file2.txt"));
get = storageInterface.get(null, null, run.getOutputFiles().get("xml/sub/sub2/file2.txt"));
assertThat(CharStreams.toString(new InputStreamReader(get)), is("2\n"));

get = storageInterface.get(null, run.getOutputFiles().get("csv/file1.txt"));
get = storageInterface.get(null, null, run.getOutputFiles().get("csv/file1.txt"));
assertThat(CharStreams.toString(new InputStreamReader(get)), is("3\n"));
}
@Test
Expand Down Expand Up @@ -197,6 +197,7 @@ void useInputFilesFromKestraFs() throws Exception {
URL resource = AbstractBashTest.class.getClassLoader().getResource("application.yml");

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
new FileInputStream(Objects.requireNonNull(resource).getFile())
Expand All @@ -220,7 +221,7 @@ void useInputFilesFromKestraFs() throws Exception {
ScriptOutput run = bash.run(runContext);

assertThat(run.getExitCode(), is(0));
InputStream get = storageInterface.get(null, run.getOutputFiles().get("out"));
InputStream get = storageInterface.get(null, null, run.getOutputFiles().get("out"));
String outputContent = CharStreams.toString(new InputStreamReader(get));
String fileContent = String.join("\n", Files.readAllLines(new File(resource.getPath()).toPath(), StandardCharsets.UTF_8));
assertThat(outputContent, is(fileContent + "\n"));
Expand All @@ -231,12 +232,14 @@ void useInputFilesAsVariable() throws Exception {
URL resource = AbstractBashTest.class.getClassLoader().getResource("application.yml");

URI put1 = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
new FileInputStream(Objects.requireNonNull(resource).getFile())
);

URI put2 = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
new FileInputStream(Objects.requireNonNull(resource).getFile())
Expand All @@ -261,7 +264,7 @@ void useInputFilesAsVariable() throws Exception {
ScriptOutput run = bash.run(runContext);

assertThat(run.getExitCode(), is(0));
InputStream get = storageInterface.get(null, run.getOutputFiles().get("out"));
InputStream get = storageInterface.get(null, null, run.getOutputFiles().get("out"));
String outputContent = CharStreams.toString(new InputStreamReader(get));
String fileContent = String.join("\n", Files.readAllLines(new File(resource.getPath()).toPath(), StandardCharsets.UTF_8));
assertThat(outputContent, is(fileContent + "\n" + fileContent + "\n"));
Expand All @@ -272,6 +275,7 @@ void preventRelativeFile() throws Exception {
URL resource = AbstractBashTest.class.getClassLoader().getResource("application.yml");

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
new FileInputStream(Objects.requireNonNull(resource).getFile())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ void dontStopOnFirstFailed(RunnerType runner, DockerOptions dockerOptions) throw
@MethodSource("source")
void files(RunnerType runner, DockerOptions dockerOptions) throws Exception {
URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
IOUtils.toInputStream("I'm here", StandardCharsets.UTF_8)
Expand Down Expand Up @@ -171,14 +172,14 @@ void files(RunnerType runner, DockerOptions dockerOptions) throws Exception {
assertThat(run.getVars().get("extract"), is("I'm here"));
assertThat(run.getOutputFiles().size(), is(2));

InputStream get = storageInterface.get(null, run.getOutputFiles().get("file.xml"));
InputStream get = storageInterface.get(null, null, run.getOutputFiles().get("file.xml"));

assertThat(
CharStreams.toString(new InputStreamReader(get)),
is("1\n3\n")
);

get = storageInterface.get(null, run.getOutputFiles().get("sub/dir/file.csv"));
get = storageInterface.get(null, null, run.getOutputFiles().get("sub/dir/file.csv"));

assertThat(
CharStreams.toString(new InputStreamReader(get)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void inputOutputFiles() throws Exception {
assertThat(run.getExitCode(), is(0));
assertThat(run.getOutputFiles().get("out/bla.yml").toString(), startsWith("kestra://"));
assertThat(
new String(storageInterface.get(null, run.getOutputFiles().get("out/bla.yml")).readAllBytes()),
new String(storageInterface.get(null, null, run.getOutputFiles().get("out/bla.yml")).readAllBytes()),
containsString("base-path: /tmp/unittest")
);
}
Expand All @@ -169,6 +169,7 @@ private URI internalFiles(String path) throws IOException, URISyntaxException {
var resource = ScriptTest.class.getClassLoader().getResource("application.yml");

return storageInterface.put(
null,
null,
new URI(path),
new FileInputStream(Objects.requireNonNull(resource).getFile())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected FileTransform.Output run(RunContext runContext, String engineName) thr
.build();
}

protected void finalize(
private void finalize(
RunContext runContext,
Flux<Object> flowable,
ScriptEngineService.CompiledScript scripts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ void run() throws Exception {
));

URI source = storageInterface.put(
null,
null,
new URI("/" + IdUtils.create()),
new FileInputStream(tempFile)
Expand Down Expand Up @@ -104,7 +105,7 @@ void test(String source, int size) throws Exception {
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
FileTransform.Output runOutput = task.run(runContext);

BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, runOutput.getUri())));
BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, null, runOutput.getUri())));
List<Object> result = new ArrayList<>();
FileSerde.reader(inputStream, result::add);

Expand Down Expand Up @@ -140,6 +141,7 @@ void rows() throws Exception {
FileSerde.write(output, map);

URI source = storageInterface.put(
null,
null,
new URI("/" + IdUtils.create()),
new FileInputStream(tempFile)
Expand All @@ -150,7 +152,7 @@ void rows() throws Exception {
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of());
io.kestra.plugin.scripts.jvm.FileTransform.Output runOutput = task.run(runContext);

BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, runOutput.getUri())));
BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, null, runOutput.getUri())));
List<Object> result = new ArrayList<>();
FileSerde.reader(inputStream, result::add);

Expand Down