Skip to content

Commit

Permalink
Switch from jsch to sshd-sftp fo SSH impl and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dadoonet committed Aug 30, 2024
1 parent 6c270a6 commit 9b7a9ea
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 135 deletions.
6 changes: 3 additions & 3 deletions crawler/crawler-ssh/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
<artifactId>fscrawler-crawler-abstract</artifactId>
</dependency>

<!--Dependency for parsing remote ssh directory [http://www.jcraft.com/jsch/]-->
<!-- Dependency for parsing remote ssh directory https://github.com/apache/mina-sshd/ -->
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<groupId>org.apache.sshd</groupId>
<artifactId>sshd-sftp</artifactId>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@

package fr.pilato.elasticsearch.crawler.fs.crawler.ssh;

import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractModel;
import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractor;
import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings;
Expand All @@ -32,71 +27,80 @@
import org.apache.commons.io.FilenameUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.keyprovider.FileKeyPairProvider;
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClientFactory;

import java.io.InputStream;
import java.time.Instant;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Vector;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class FileAbstractorSSH extends FileAbstractor<ChannelSftp.LsEntry> {
public class FileAbstractorSSH extends FileAbstractor<SftpClient.DirEntry> {
private final Logger logger = LogManager.getLogger(FileAbstractorSSH.class);

private ChannelSftp sftp;
private ClientSession session;
private SshClient sshClient;
private SftpClient sftpClient;

public FileAbstractorSSH(FsSettings fsSettings) {
super(fsSettings);
}

@Override
public FileAbstractModel toFileAbstractModel(String path, ChannelSftp.LsEntry file) {
public FileAbstractModel toFileAbstractModel(String path, SftpClient.DirEntry file) {
return new FileAbstractModel(
file.getFilename(),
!file.getAttrs().isDir(),
!file.getAttributes().isDirectory(),
// We are using here the local TimeZone as a reference. If the remote system is under another TZ, this might cause issues
LocalDateTime.ofInstant(Instant.ofEpochMilli(file.getAttrs().getMTime()*1000L), ZoneId.systemDefault()),
LocalDateTime.ofInstant(file.getAttributes().getModifyTime().toInstant(), ZoneId.systemDefault()),
// We don't have the creation date
null,
// We are using here the local TimeZone as a reference. If the remote system is under another TZ, this might cause issues
LocalDateTime.ofInstant(Instant.ofEpochMilli(file.getAttrs().getATime()*1000L), ZoneId.systemDefault()),
LocalDateTime.ofInstant(file.getAttributes().getAccessTime().toInstant(), ZoneId.systemDefault()),
FilenameUtils.getExtension(file.getFilename()),
path,
path.equals("/") ? path.concat(file.getFilename()) : path.concat("/").concat(file.getFilename()),
file.getAttrs().getSize(),
Integer.toString(file.getAttrs().getUId()),
Integer.toString(file.getAttrs().getGId()),
file.getAttrs().getPermissions());
file.getAttributes().getSize(),
Integer.toString(file.getAttributes().getUserId()),
Integer.toString(file.getAttributes().getGroupId()),
file.getAttributes().getPermissions());
}

@Override
public InputStream getInputStream(FileAbstractModel file) throws Exception {
return sftp.get(file.getFullpath());
return sftpClient.read(file.getFullpath());
}

@Override
public void closeInputStream(InputStream inputStream) throws IOException {
inputStream.close();
}

@SuppressWarnings("unchecked")
@Override
public Collection<FileAbstractModel> getFiles(String dir) throws Exception {
logger.debug("Listing local files from {}", dir);
Vector<ChannelSftp.LsEntry> ls;

ls = sftp.ls(dir);
if (ls == null) return null;
Iterable<SftpClient.DirEntry> ls;

Collection<FileAbstractModel> result = new ArrayList<>(ls.size());
// Iterate other files
// We ignore here all files like . and ..
result.addAll(ls.stream().filter(file -> !".".equals(file.getFilename()) &&
!"..".equals(file.getFilename()))
ls = sftpClient.readDir(dir);

/*
Iterate other files
We ignore here all files like "." and ".."
*/
Collection<FileAbstractModel> result = StreamSupport.stream(ls.spliterator(), false)
.filter(file -> !".".equals(file.getFilename()) &&
!"..".equals(file.getFilename()))
.map(file -> toFileAbstractModel(dir, file))
.collect(Collectors.toList()));
.collect(Collectors.toList());

logger.debug("{} local files found", result.size());
return result;
Expand All @@ -105,7 +109,7 @@ public Collection<FileAbstractModel> getFiles(String dir) throws Exception {
@Override
public boolean exists(String dir) {
try {
sftp.ls(dir);
sftpClient.readDir(dir);
} catch (Exception e) {
return false;
}
Expand All @@ -114,52 +118,62 @@ public boolean exists(String dir) {

@Override
public void open() throws Exception {
sftp = openSSHConnection(fsSettings.getServer());
sshClient = createSshClient();
session = openSshSession(sshClient, fsSettings.getServer());
sftpClient = createSftpClient(session);
}

@Override
public void close() throws Exception {
if (sftp != null) {
logger.debug("Closing SSH connection");
sftp.getSession().disconnect();
sftp.disconnect();
if (sshClient != null) {
if (session != null) {
if (sftpClient != null) {
logger.debug("Closing SFTP Client");
sftpClient.close();
}
logger.debug("Closing SSH Session");
session.close();
}
logger.debug("Closing SSH Client");
sshClient.close();
}
}

private ChannelSftp openSSHConnection(Server server) throws Exception {
private SshClient createSshClient() {
logger.debug("Create and start SSH client");

SshClient client = SshClient.setUpDefaultClient();
client.setServerKeyVerifier(AcceptAllServerKeyVerifier.INSTANCE);
client.start();
return client;
}

private ClientSession openSshSession(SshClient sshClient, Server server) throws Exception {
logger.debug("Opening SSH connection to {}@{}", server.getUsername(), server.getHostname());

JSch jsch = new JSch();
Session session = jsch.getSession(server.getUsername(), server.getHostname(), server.getPort());
java.util.Properties config = new java.util.Properties();
config.put("StrictHostKeyChecking", "no");
if (server.getPemPath() != null) {
jsch.addIdentity(server.getPemPath());
}
session.setConfig(config);
ClientSession session = sshClient.connect(server.getUsername(), server.getHostname(), server.getPort()).verify().getSession();

if (server.getPassword() != null) {
session.setPassword(server.getPassword());
session.addPasswordIdentity(server.getPassword()); // for password-based authentication
}

try {
session.connect();
} catch (JSchException e) {
logger.warn("Cannot connect with SSH to {}@{}: {}", server.getUsername(),
server.getHostname(), e.getMessage());
throw e;
if (server.getPemPath() != null) {
// for password-less authentication
FileKeyPairProvider fileKeyPairProvider = new FileKeyPairProvider(Paths.get(server.getPemPath()));
Iterable<KeyPair> keyPairs = fileKeyPairProvider.loadKeys(null);
for (KeyPair keyPair : keyPairs) {
session.addPublicKeyIdentity(keyPair);
}
}

//Open a new session for SFTP.
Channel channel = session.openChannel("sftp");
channel.connect();
session.auth().verify();

//checking SSH client connection.
if (!channel.isConnected()) {
logger.warn("Cannot connect with SSH to {}@{}", server.getUsername(),
server.getHostname());
throw new RuntimeException("Can not connect to " + server.getUsername() + "@" + server.getHostname());
}
logger.debug("SSH connection successful");
return (ChannelSftp) channel;
return session;
}

private SftpClient createSftpClient(ClientSession session) throws Exception {
logger.debug("Create SFTP client");
return SftpClientFactory.instance().createSftpClient(session);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package fr.pilato.elasticsearch.crawler.fs.client;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
package fr.pilato.elasticsearch.crawler.fs.test.integration;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.KeyPair;
import fr.pilato.elasticsearch.crawler.fs.FsCrawlerImpl;
import fr.pilato.elasticsearch.crawler.fs.client.ESSearchRequest;
import fr.pilato.elasticsearch.crawler.fs.client.ESSearchResponse;
Expand All @@ -39,12 +35,6 @@
import fr.pilato.elasticsearch.crawler.fs.test.framework.AbstractFSCrawlerTestCase;
import jakarta.ws.rs.ProcessingException;
import org.apache.logging.log4j.Level;
import org.apache.sshd.common.keyprovider.FileKeyPairProvider;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.session.ServerSession;
import org.apache.sshd.sftp.server.SftpFileSystemAccessor;
import org.apache.sshd.sftp.server.SftpSubsystemFactory;
import org.apache.sshd.sftp.server.SftpSubsystemProxy;
import org.hamcrest.Matcher;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -57,7 +47,6 @@
import java.net.URL;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
Expand All @@ -69,8 +58,6 @@

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiAlphanumOfLength;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
import static com.jcraft.jsch.KeyPair.RSA;
import static com.jcraft.jsch.KeyPair.genKeyPair;
import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.*;
import static fr.pilato.elasticsearch.crawler.fs.settings.ServerUrl.decodeCloudId;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -93,7 +80,6 @@
*
* mvn verify -DskipIntegTests
*/
@ThreadLeakFilters(filters = {SshdThreadFilter.class})
public abstract class AbstractITCase extends AbstractFSCrawlerTestCase {

protected static Path metadataDir = null;
Expand All @@ -105,9 +91,6 @@ public abstract class AbstractITCase extends AbstractFSCrawlerTestCase {
private final static String DEFAULT_TEST_CLUSTER_URL = "https://127.0.0.1:9200";
private final static String DEFAULT_USERNAME = "elastic";
private final static String DEFAULT_PASSWORD = "changeme";
private final static Integer DEFAULT_TEST_REST_PORT = 8080;
protected final static String SSH_USERNAME = "USERNAME";
protected final static String SSH_PASSWORD = "PASSWORD";

protected static String testClusterUrl = null;
@Deprecated
Expand Down Expand Up @@ -525,39 +508,4 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx
}
});
}

protected SshServer startSshServer() throws IOException, JSchException {
SftpSubsystemFactory factory = new SftpSubsystemFactory.Builder()
.withFileSystemAccessor(new SftpFileSystemAccessor() {
@Override
public Path resolveLocalFilePath(ServerSession session, SftpSubsystemProxy subsystem, Path rootDir,
String remotePath) throws InvalidPathException {
String path = remotePath;
if (remotePath.startsWith("/")) {
path = remotePath.substring(1);
}
return currentTestResourceDir.resolve(path);
}
})
.build();

// Generate the key files
JSch jSch = new JSch();
KeyPair keyPair = genKeyPair(jSch, RSA);
keyPair.writePrivateKey(rootTmpDir.resolve("private.key").toString());
keyPair.writePublicKey(rootTmpDir.resolve("public.key").toString(), "Fake public key for FSCrawler tests");

SshServer sshd = SshServer.setUpDefaultServer();
sshd.setPasswordAuthenticator((username, password, session) ->
SSH_USERNAME.equals(username) && SSH_PASSWORD.equals(password));

sshd.setHost("0.0.0.0");
sshd.setKeyPairProvider(new FileKeyPairProvider(rootTmpDir.resolve("private.key")));
sshd.setSubsystemFactories(Collections.singletonList(factory));
sshd.start();

logger.info(" -> Started fake SSHD service on {}:{}", sshd.getHost(), sshd.getPort());

return sshd;
}
}
Loading

0 comments on commit 9b7a9ea

Please sign in to comment.