Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[16.0] fs_storage: support SSH private keys authentication #331

Open
wants to merge 4 commits into
base: 16.0
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions fs_storage/models/fs_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import base64
import functools
import inspect
import io
import json
import logging
import os.path
Expand All @@ -21,6 +22,19 @@

_logger = logging.getLogger(__name__)

try:
import paramiko

SSH_PKEYS = {

Check warning on line 28 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L28

Added line #L28 was not covered by tests
"DSS": paramiko.DSSKey,
"RSA": paramiko.RSAKey,
"EC": paramiko.ECDSAKey,
"OPENSSH": paramiko.Ed25519Key,
}
except ImportError: # pragma: no cover
_logger.debug("Cannot `import paramiko`.")
SSH_PKEYS = {}


# TODO: useful for the whole OCA?
def deprecated(reason):
Expand Down Expand Up @@ -358,13 +372,53 @@
and isinstance(options["auth"], list)
):
options["auth"] = tuple(options["auth"])
if (
self.protocol in ("sftp", "ssh")
and "pkey" in options
and isinstance(options["pkey"], str)
):
# Handle SSH private keys by replacing 'pkey' parameter by a
# paramiko.pkey.PKey object
pkey_file = io.StringIO(options["pkey"])
pkey = self._get_ssh_private_key(

Check warning on line 383 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L382-L383

Added lines #L382 - L383 were not covered by tests
pkey_file,
passphrase=options.get("passphrase"),
)
options["pkey"] = pkey

Check warning on line 387 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L387

Added line #L387 was not covered by tests
options = self._recursive_add_odoo_storage_path(options)
fs = fsspec.filesystem(self.protocol, **options)
directory_path = self.directory_path
if directory_path:
fs = fsspec.filesystem("rooted_dir", path=directory_path, fs=fs)
return fs

def _detect_ssh_private_key_type(self, pkey_file):
"""Detect SSH private key type (RSA, DSS...)."""
# Code copied and adapted from 'paramiko.pkey.PKey._read_private_key' method
# https://github.com/paramiko/paramiko/blob/main/paramiko/pkey.py#L498C9-L498C26
pkey_file.seek(0)
lines = pkey_file.readlines()
pkey_file.seek(0)

Check warning on line 401 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L399-L401

Added lines #L399 - L401 were not covered by tests
if not lines:
raise paramiko.SSHException("no lines in private key file")
start = 0
m = paramiko.pkey.PKey.BEGIN_TAG.match(lines[start])
line_range = len(lines) - 1

Check warning on line 406 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L403-L406

Added lines #L403 - L406 were not covered by tests
while start < line_range and not m:
start += 1
m = paramiko.pkey.PKey.BEGIN_TAG.match(lines[start])
start += 1
keytype = m.group(1) if m else None

Check warning on line 411 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L408-L411

Added lines #L408 - L411 were not covered by tests
if keytype:
return keytype

Check warning on line 413 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L413

Added line #L413 was not covered by tests

def _get_ssh_private_key(self, pkey_file, passphrase=None):
"""Build the expected `paramiko.pkey.PKey` object."""
pkey_type = self._detect_ssh_private_key_type(pkey_file)

Check warning on line 417 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L417

Added line #L417 was not covered by tests
if not pkey_type:
raise paramiko.SSHException("not a valid private key file")
return SSH_PKEYS[pkey_type].from_private_key(pkey_file, password=passphrase)

Check warning on line 420 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L419-L420

Added lines #L419 - L420 were not covered by tests

# Deprecated methods used to ease the migration from the storage_backend addons
# to the fs_storage addons. These methods will be removed in the future (Odoo 18)
@deprecated("Please use _get_filesystem() instead and the fsspec API directly.")
Expand Down
Loading