Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fail_on_file_not_exist option to SFTPToS3Operator #44320

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class SFTPToS3Operator(BaseOperator):
uploading the file to S3.
:param use_temp_file: If True, copies file first to local,
if False streams file from SFTP to S3.
:param fail_on_file_not_exist: If True, operator fails when file does not exist,
if False, operator will not fail and skips transfer. Default is True.
"""

template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket")
Expand All @@ -62,6 +64,7 @@ def __init__(
sftp_conn_id: str = "ssh_default",
s3_conn_id: str = "aws_default",
use_temp_file: bool = True,
fail_on_file_not_exist: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -71,6 +74,7 @@ def __init__(
self.s3_key = s3_key
self.s3_conn_id = s3_conn_id
self.use_temp_file = use_temp_file
self.fail_on_file_not_exist = fail_on_file_not_exist

@staticmethod
def get_s3_key(s3_key: str) -> str:
Expand All @@ -85,6 +89,14 @@ def execute(self, context: Context) -> None:

sftp_client = ssh_hook.get_conn().open_sftp()

try:
sftp_client.stat(self.sftp_path)
except FileNotFoundError:
if self.fail_on_file_not_exist:
raise
self.log.info("File %s not found on SFTP server. Skipping transfer.", self.sftp_path)
return
Guaqamole marked this conversation as resolved.
Show resolved Hide resolved

if self.use_temp_file:
with NamedTemporaryFile("w") as f:
sftp_client.get(self.sftp_path, f.name)
Expand Down
37 changes: 37 additions & 0 deletions providers/tests/amazon/aws/transfers/test_sftp_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,40 @@ def test_sftp_to_s3_operation(self, use_temp_file):
conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
conn.delete_bucket(Bucket=self.s3_bucket)
assert not s3_hook.check_for_bucket(self.s3_bucket)

@pytest.mark.parametrize("fail_on_file_not_exist", [True, False])
@mock_aws
@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_sftp_to_s3_fail_on_file_not_exist(self, fail_on_file_not_exist):
s3_hook = S3Hook(aws_conn_id=None)
conn = boto3.client("s3")
conn.create_bucket(Bucket=self.s3_bucket)
assert s3_hook.check_for_bucket(self.s3_bucket)

if fail_on_file_not_exist:
with pytest.raises(FileNotFoundError):
SFTPToS3Operator(
s3_bucket=self.s3_bucket,
s3_key=self.s3_key,
sftp_path="/tmp/wrong_path.txt",
sftp_conn_id=SFTP_CONN_ID,
s3_conn_id=S3_CONN_ID,
fail_on_file_not_exist=fail_on_file_not_exist,
task_id="test_sftp_to_s3",
dag=self.dag,
).execute(None)
else:
SFTPToS3Operator(
s3_bucket=self.s3_bucket,
s3_key=self.s3_key,
sftp_path=self.sftp_path,
sftp_conn_id=SFTP_CONN_ID,
s3_conn_id=S3_CONN_ID,
fail_on_file_not_exist=fail_on_file_not_exist,
task_id="test_sftp_to_s3",
dag=self.dag,
).execute(None)

conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
conn.delete_bucket(Bucket=self.s3_bucket)
assert not s3_hook.check_for_bucket(self.s3_bucket)