Skip to content

Commit

Permalink
v24.42.0 (#109)
Browse files Browse the repository at this point in the history
* Moved counter for checking handler classes to outside the condition (#104)

* Add recursive checking when creating a destination directory (#105)

* 106 unexplained errors deleting gpg keystore after decryption (#107)

* Move exception print statement earlier to avoid confusing log messages. Check for directories before trying to delete them.

* Update changelog

* Add way to enable host key validation for SSH/SFTP (#108)

* bump version v24.37.2 -> v24.42.0
  • Loading branch information
adammcdonagh authored Oct 19, 2024
1 parent 5c3da8d commit 3331e11
Show file tree
Hide file tree
Showing 20 changed files with 393 additions and 21 deletions.
10 changes: 10 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@
"args": ["-t", "scp-basic", "-c", "test/cfg", "-v3"],
"justMyCode": false
},
{
"name": "Python: Transfer - SFTP Basic",
"type": "debugpy",
"request": "launch",
"preLaunchTask": "Build Test containers",
"program": "src/opentaskpy/cli/task_run.py",
"console": "integratedTerminal",
"args": ["-t", "sftp-basic", "-c", "test/cfg", "-v3"],
"justMyCode": false
},
{
"name": "Python: Transfer - Basic - As job",
"type": "debugpy",
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

# v24.42.0

- Fix issue where different protocols were not being detected properly, and proxy had to be explicitly defined when it was unnecessary
- When creating a destination directory with SFTP, it will now check whether lower level directories exist, and create them if not
- Always check that a directory exists before trying to delete it, instead of throwing an exception when it doesn't exist.
- Moved exception printing for transfers to earlier in the code to ensure log messages aren't confusing.
- Add ability to check for SSH host key and validate it before proceeding with connection

# No release

- Bump `black` to 24.10.0
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "opentaskpy"
version = "v24.37.2"
version = "v24.42.0"
authors = [{ name = "Adam McDonagh", email = "[email protected]" }]
license = { text = "GPLv3" }
classifiers = [
Expand Down Expand Up @@ -71,7 +71,7 @@ otf-batch-validator = "opentaskpy.cli.batch_validator:main"
profile = 'black'

[tool.bumpver]
current_version = "v24.37.2"
current_version = "v24.42.0"
version_pattern = "vYY.WW.PATCH[-TAG]"
commit_message = "bump version {old_version} -> {new_version}"
commit = true
Expand Down
6 changes: 6 additions & 0 deletions src/opentaskpy/config/schemas/execution/ssh/protocol.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
"port": {
"type": "integer"
},
"hostKeyValidation": {
"type": "boolean"
},
"knownHostsFile": {
"type": "string"
},
"credentials": {
"type": "object",
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
"port": {
"type": "integer"
},
"hostKeyValidation": {
"type": "boolean"
},
"knownHostsFile": {
"type": "string"
},
"supportsPosixRename": {
"type": "boolean",
"default": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
"port": {
"type": "integer"
},
"hostKeyValidation": {
"type": "boolean"
},
"knownHostsFile": {
"type": "string"
},
"supportsPosixRename": {
"type": "boolean",
"default": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
"port": {
"type": "integer"
},
"hostKeyValidation": {
"type": "boolean"
},
"knownHostsFile": {
"type": "string"
},
"credentials": {
"type": "object",
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
"port": {
"type": "integer"
},
"hostKeyValidation": {
"type": "boolean"
},
"knownHostsFile": {
"type": "string"
},
"credentials": {
"type": "object",
"properties": {
Expand Down
35 changes: 31 additions & 4 deletions src/opentaskpy/remotehandlers/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,20 @@
from io import StringIO
from shlex import quote

from paramiko import AutoAddPolicy, Channel, RSAKey, SFTPClient, SSHClient
from tenacity import retry, stop_after_attempt, wait_exponential
from paramiko import Channel, RSAKey, SFTPClient, SSHClient
from tenacity import (
retry,
retry_if_exception,
retry_if_not_exception_message,
stop_after_attempt,
wait_exponential,
)

import opentaskpy.otflogging
from opentaskpy.remotehandlers.remotehandler import RemoteTransferHandler

from .ssh_utils import setup_host_key_validation


class SFTPTransfer(RemoteTransferHandler):
"""SFTP Transfer Handler."""
Expand Down Expand Up @@ -103,6 +111,12 @@ def connect(self, hostname: str) -> None:
reraise=True,
stop=stop_after_attempt(6),
wait=wait_exponential(multiplier=2, min=5, max=60),
retry=(
retry_if_not_exception_message(
match=r".*(not found in known_hosts|Name or service not known).*"
)
& retry_if_exception(Exception)
),
)
def connect_with_retry(self, client_kwargs: dict) -> SSHClient:
"""Connect to the remote host with retry.
Expand All @@ -118,7 +132,7 @@ def connect_with_retry(self, client_kwargs: dict) -> SSHClient:
ssh_client.set_log_channel(
f"{__name__}.{ self.spec['task_id']}.paramiko.transport"
)
ssh_client.set_missing_host_key_policy(AutoAddPolicy())
setup_host_key_validation(ssh_client, self.spec, self.logger)
self.logger.info(f"Connecting to {client_kwargs['hostname']}")

# Set additional timeout options to match the standard timeout
Expand Down Expand Up @@ -317,7 +331,20 @@ def push_files_from_worker(
f"[{self.spec['hostname']}] Creating destination directory:"
f" {destination_directory}"
)
self.sftp_client.mkdir(destination_directory)

# We need to check recursively if the destination directory is nested
current_dir = ""
for dir_part in destination_directory.split("/"):
if not dir_part:
continue
current_dir += f"/{dir_part}"
try:
self.sftp_client.stat(current_dir)
except OSError:
self.logger.info(
f"[{self.spec['hostname']}] Destination directory {current_dir} does not exist. Creating it."
)
self.sftp_client.mkdir(current_dir)

# Transfer the files
result = 0
Expand Down
23 changes: 19 additions & 4 deletions src/opentaskpy/remotehandlers/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@
from io import StringIO
from shlex import quote

from paramiko import AutoAddPolicy, RSAKey, SFTPClient, SSHClient, Transport
from paramiko import RSAKey, SFTPClient, SSHClient, Transport
from paramiko.channel import ChannelFile, ChannelStderrFile
from tenacity import retry, stop_after_attempt, wait_exponential
from tenacity import (
retry,
retry_if_exception,
retry_if_not_exception_message,
stop_after_attempt,
wait_exponential,
)

import opentaskpy.otflogging
from opentaskpy.exceptions import SSHClientError
Expand All @@ -24,6 +30,8 @@
RemoteTransferHandler,
)

from .ssh_utils import setup_host_key_validation

SSH_OPTIONS: str = "-o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=5"
REMOTE_SCRIPT_BASE_DIR: str = "/tmp" # nosec B108

Expand Down Expand Up @@ -52,7 +60,7 @@ def __init__(self, spec: dict):

client = SSHClient()
client.set_log_channel(f"{__name__}.{ spec['task_id']}.paramiko.transport")
client.set_missing_host_key_policy(AutoAddPolicy())
setup_host_key_validation(client, spec, self.logger)
self.ssh_client = client

# Handle default values
Expand Down Expand Up @@ -136,6 +144,12 @@ def connect(self, hostname: str, ssh_client: SSHClient | None = None) -> None:
reraise=True,
stop=stop_after_attempt(6),
wait=wait_exponential(multiplier=2, min=5, max=60),
retry=(
retry_if_not_exception_message(
match=r".*(not found in known_hosts|Name or service not known).*"
)
& retry_if_exception(Exception)
),
)
def connect_with_retry(self, ssh_client: SSHClient, kwargs: dict) -> None:
"""Connect to the remote host with retry.
Expand Down Expand Up @@ -916,7 +930,8 @@ def __init__(self, remote_host: str, spec: dict):

client = SSHClient()
client.set_log_channel(f"{__name__}.{ spec['task_id']}.paramiko.transport")
client.set_missing_host_key_policy(AutoAddPolicy())

setup_host_key_validation(client, spec, self.logger)

self.ssh_client = client

Expand Down
28 changes: 28 additions & 0 deletions src/opentaskpy/remotehandlers/ssh_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Utility functions for SSH."""

from logging import Logger

from paramiko import AutoAddPolicy, SSHClient


def setup_host_key_validation(client: SSHClient, spec: dict, logger: Logger) -> None:
    """Configure how an SSH client treats the remote server's host key.

    The system host keys are always loaded first. What happens next depends
    on the ``protocol`` section of the spec:

    * ``hostKeyValidation`` missing or falsy: an ``AutoAddPolicy`` is
      installed as the client's missing-host-key policy.
    * ``hostKeyValidation`` truthy: no policy is installed here, and if
      ``knownHostsFile`` is present the host keys in that file are loaded
      as well.

    Args:
        client (SSHClient): The SSH client to configure.
        spec (dict): The connection spec; it is indexed with ``"protocol"``.
        logger (logging.Logger): The logger that receives progress messages.
    """
    logger.info("Loading system host keys")
    client.load_system_host_keys()

    protocol = spec["protocol"]

    # Validation not requested: install the auto-add policy and stop here.
    if not protocol.get("hostKeyValidation"):
        client.set_missing_host_key_policy(AutoAddPolicy())
        return

    # Validation requested. The client's policy is deliberately left alone.
    # NOTE(review): presumably paramiko's default policy then rejects unknown
    # hosts - confirm against the paramiko documentation.
    if "knownHostsFile" in protocol:
        known_hosts_path = protocol["knownHostsFile"]
        logger.info(f"Loading host keys from {known_hosts_path}")
        client.load_host_keys(known_hosts_path)
22 changes: 14 additions & 8 deletions src/opentaskpy/taskhandlers/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ def return_result(
Returns:
bool: The result of the task run.
"""
# Log the exception
if exception:
self.logger.exception(exception)

# Delete the remote connection objects
if self.source_remote_handler:
self.logger.info("Closing source connection")
Expand All @@ -125,9 +129,6 @@ def return_result(
)

# Call super to do the rest
# Log the exception
if exception:
self.logger.exception(exception)
return super().return_result(status, message, exception) # type: ignore[no-any-return]

def _get_default_class(self, protocol_name: str) -> type:
Expand Down Expand Up @@ -403,7 +404,6 @@ def run(self, kill_event: threading.Event | None = None) -> bool: # noqa: C901
):
different_protocols = True
any_different_protocols = True
i += 1

# If there are differences, download the file locally first
# so it's ready to upload to multiple destinations at once
Expand Down Expand Up @@ -431,6 +431,7 @@ def run(self, kill_event: threading.Event | None = None) -> bool: # noqa: C901
"Pull to worker from remote source errored",
exception=exceptions.RemoteTransferError,
)
i += 1

# Before doing any file movements, check to see if file decryption or encryption is
# required on the source or destination. For any unsupported transferTypes we need to fail here first
Expand Down Expand Up @@ -639,7 +640,8 @@ def run(self, kill_event: threading.Event | None = None) -> bool: # noqa: C901
and self.local_staging_dir != self.source_file_spec["directory"]
):
self.logger.debug("Removing local staging directory")
shutil.rmtree(self.local_staging_dir)
if path.exists(self.local_staging_dir):
shutil.rmtree(self.local_staging_dir)
else:
self.logger.info("Performing filewatch only")

Expand Down Expand Up @@ -769,7 +771,8 @@ def encrypt_files(
encrypted_files[output_filename] = files[file]

# Remove the temporary gnupg keychain files under f"{tmpdir}/.gnupg"
shutil.rmtree(f"{tmpdir}/.gnupg")
if path.exists(f"{tmpdir}/.gnupg"):
shutil.rmtree(f"{tmpdir}/.gnupg")

return encrypted_files

Expand Down Expand Up @@ -830,7 +833,8 @@ def decrypt_files(self, files: dict, private_key: str) -> dict:
self.logger.error(f"GPG STDERR: {decryption_data.stderr}")

# Remove the temporary gnupg keychain files under f"{tmpdir}/.gnupg"
shutil.rmtree(f"{tmpdir}/.gnupg")
if path.exists(f"{tmpdir}/.gnupg"):
shutil.rmtree(f"{tmpdir}/.gnupg")

raise exceptions.DecryptionError(
f"Error decrypting file {file}: {decryption_data.status}"
Expand All @@ -839,7 +843,9 @@ def decrypt_files(self, files: dict, private_key: str) -> dict:
decrypted_files[output_filename] = files[file]

# Remove the temporary gnupg keychain files under f"{tmpdir}/.gnupg"
shutil.rmtree(f"{tmpdir}/.gnupg")
# Check if the directory exists first
if path.exists(f"{tmpdir}/.gnupg"):
shutil.rmtree(f"{tmpdir}/.gnupg")

self.logger.debug(f"Returning decrypted files: {decrypted_files}")

Expand Down
27 changes: 27 additions & 0 deletions test/cfg/transfers/sftp-basic.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"type": "transfer",
"source": {
"hostname": "127.0.0.1",
"directory": "/home/application/testFiles/src",
"fileRegex": ".*\\.txt",
"protocol": {
"name": "sftp",
"port": 1234,
"credentials": {
"username": "{{ SSH_USERNAME }}"
}
}
},
"destination": [
{
"hostname": "{{ HOST_D }}",
"directory": "/home/application/testFiles/dest",
"protocol": {
"name": "sftp",
"credentials": {
"username": "{{ SSH_USERNAME }}"
}
}
}
]
}
2 changes: 2 additions & 0 deletions test/cfg/variables.json.j2
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"HOST_A": "172.16.0.11",
"HOST_B": "172.16.0.12",
"HOST_C": "172.16.0.21",
"HOST_D": "172.16.0.22",
"SSH_USERNAME": "application",
"TEMP_SOURCE_FOLDER": "/tmp",
"MY_FOLDER": "{{ FOLDER1 }}",
Expand Down
6 changes: 6 additions & 0 deletions test/createTestDirectories.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ rm -fr $DIR/testFiles/ssh_2/src $DIR/testFiles/ssh_2/dest $DIR/testFiles/ssh_2/a

mkdir -p $DIR/testFiles/ssh_1/dest $DIR/testFiles/ssh_1/src $DIR/testFiles/ssh_1/archive $DIR/testFiles/ssh_1/ssh
mkdir -p $DIR/testFiles/ssh_2/dest $DIR/testFiles/ssh_2/src $DIR/testFiles/ssh_2/archive $DIR/testFiles/ssh_2/ssh

rm -fr $DIR/testFiles/sftp_1/src $DIR/testFiles/sftp_1/dest $DIR/testFiles/sftp_1/archive 2>/dev/null
rm -fr $DIR/testFiles/sftp_2/src $DIR/testFiles/sftp_2/dest $DIR/testFiles/sftp_2/archive 2>/dev/null

mkdir -p $DIR/testFiles/sftp_1/dest $DIR/testFiles/sftp_1/src $DIR/testFiles/sftp_1/archive $DIR/testFiles/sftp_1/ssh
mkdir -p $DIR/testFiles/sftp_2/dest $DIR/testFiles/sftp_2/src $DIR/testFiles/sftp_2/archive $DIR/testFiles/sftp_2/ssh
Loading

0 comments on commit 3331e11

Please sign in to comment.