From 3cfe66f01d6fdd1818c5c06a418400efe61718b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Mon, 3 Apr 2023 16:53:24 +0200 Subject: [PATCH] fix(ssh): Join thread when the output streams are closed (#56) --- .../java/io/kestra/plugin/fs/ssh/Command.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/kestra/plugin/fs/ssh/Command.java b/src/main/java/io/kestra/plugin/fs/ssh/Command.java index 20fd793..8f4bb98 100644 --- a/src/main/java/io/kestra/plugin/fs/ssh/Command.java +++ b/src/main/java/io/kestra/plugin/fs/ssh/Command.java @@ -74,6 +74,8 @@ public class Command extends Task implements AbstractVfsInterface, RunnableTask< public VoidOutput run(RunContext runContext) throws Exception { Session session = null; ChannelExec channel = null; + AbstractLogThread stdOut = null; + AbstractLogThread stdErr = null; try( var outStream = new PipedOutputStream(); @@ -97,17 +99,14 @@ public VoidOutput run(RunContext runContext) throws Exception { channel.setCommand(String.join("\n", renderedCommands)); channel.setOutputStream(new BufferedOutputStream(outStream), true); channel.setErrStream(new BufferedOutputStream(errStream), true); - var stdOut = threadLogSupplier(runContext).call(inStream, false); - var stdErr = threadLogSupplier(runContext).call(inErrStream, true); + stdOut = threadLogSupplier(runContext).call(inStream, false); + stdErr = threadLogSupplier(runContext).call(inErrStream, true); channel.connect(); while (channel.isConnected()) { Thread.sleep(SLEEP_DELAY_MS); } - stdOut.join(); - stdErr.join(); - if(channel.getExitStatus() != 0) { throw new Exception("SSH command fails with exit status " + channel.getExitStatus()); } @@ -120,6 +119,12 @@ public VoidOutput run(RunContext runContext) throws Exception { if (session != null) { session.disconnect(); } + if (stdOut != null) { + stdOut.join(); + } + if (stdErr != null) { + stdErr.join(); + } } }