+

Source code for exasol.bucketfs._path

+from __future__ import annotations
+from typing import Protocol, ByteString, BinaryIO, Iterable, Generator, Optional
+from enum import Enum, auto
+from pathlib import PurePath, PureWindowsPath
+import errno
+import os
+from io import IOBase
+from exasol.bucketfs._buckets import BucketLike, SaaSBucket, MountedBucket
+from exasol.bucketfs._service import Service
+from exasol.bucketfs._error import BucketFsError
+
# File suffixes that are treated as archive extensions. They are stripped by
# `_remove_archive_suffix` when a path is translated to its UDF view.
# (The original list contained ".tar" twice; the duplicate had no effect on
# membership tests and has been dropped.)
ARCHIVE_SUFFIXES = [".tar", ".gz", ".tgz", ".zip"]
+
+
class StorageBackend(Enum):
    """
    Enumeration of the BucketFS storage backends.

    The member names are what `build_path` accepts as the string form of
    its `backend` argument.
    """
    # Values are the ones `auto()` would assign: consecutive integers from 1.
    onprem = 1
    saas = 2
    mounted = 3
+
+
+
class PathLike(Protocol):
    """
    Definition of the PathLike view of the files in a Bucket.
    """

    @property
    def name(self) -> str:
        """
        A string representing the final path component, excluding the drive and root, if any.
        """

    @property
    def suffix(self) -> str:
        """
        The file extension of the final component, if any.
        """

    @property
    def root(self) -> str:
        """
        A string representing the root, if any.
        """

    @property
    def parent(self) -> str:
        """
        The logical parent of this path.
        """

    def as_uri(self) -> str:
        """
        Represent the path as a file URI. Can be used to reconstruct the location/path.
        """

    def as_udf_path(self) -> str:
        """
        This method is specific to a BucketFS flavour of the PathLike.
        It returns a corresponding path, as it's seen from a UDF.
        """

    def exists(self) -> bool:
        """
        Return True if the path points to an existing file or directory.
        """

    def is_dir(self) -> bool:
        """
        Return True if the path points to a directory, False if it points to another kind of file.
        """

    def is_file(self) -> bool:
        """
        Return True if the path points to a regular file, False if it points to another kind of file.
        """

    def read(self, chunk_size: int = 8192) -> Iterable[ByteString]:
        """
        Read the content of the file behind this path.

        Only works for PathLike objects which return True for `is_file()`.

        Args:
            chunk_size: size of the chunks which will be yielded by the iterator.

        Returns:
            Returns an iterator which can be used to read the contents of the path in chunks.

        Raises:
            FileNotFoundError: If the file does not exist.
            IsADirectoryError: if the pathlike object points to a directory.
        """

    def write(self, data: ByteString | BinaryIO | Iterable[ByteString]) -> None:
        """
        Writes data to this path.

        Q. Should it create the parent directory if it doesn't exist?
        A. Yes, it should.

        After successfully writing to this path `exists` will yield true for this path.
        If the file already existed it will be overwritten.

        Args:
            data: which shall be written to the path.

        Raises:
            NotAFileError: if the pathlike object is not a file path.
        """

    def rm(self) -> None:
        """
        Remove this file.

        Note:
            If `exists()` and is_file yields true for this path, the path will be deleted,
            otherwise exception will be thrown.

        Raises:
            FileNotFoundError: If the file does not exist.
        """

    def rmdir(self, recursive: bool = False) -> None:
        """
        Removes this directory.

        Note: In order to stay close to pathlib, by default `rmdir` with `recursive`
              set to `False` won't delete non-empty directories.

        Args:
            recursive: if true the directory itself and its entire contents (files and subdirs)
                       will be deleted. If false and the directory is not empty an error will be thrown.

        Raises:
            FileNotFoundError: If the file does not exist.
            PermissionError: If recursive is false and the directory is not empty.
        """

    def joinpath(self, *path_segments) -> "PathLike":
        """
        Calling this method is equivalent to combining the path with each of the given path segments in turn.

        Returns:
            A new pathlike object pointing to the combined path.
        """

    def walk(self, top_down: bool = True) -> Generator[tuple["PathLike", list[str], list[str]], None, None]:
        """
        Generate the file names in a directory tree by walking the tree either top-down or bottom-up.

        Note:
            Try to mimic https://docs.python.org/3/library/pathlib.html#pathlib.Path.walk as closely as possible,
            except the functionality associated with the parameters of the `pathlib` walk.

        Yields:
            A 3-tuple of (dirpath, dirnames, filenames).
        """

    def iterdir(self) -> Generator["PathLike", None, None]:
        """
        When the path points to a directory, yield path objects of the directory contents.

        Note:
            If `path` points to a file then `iterdir()` will yield nothing.

        Yields:
            All direct children of the pathlike object.
        """

    def __truediv__(self, other):
        """
        Overload / for joining, see also joinpath or `pathlib.Path`.
        """
def _remove_archive_suffix(path: PurePath) -> PurePath:
    """
    Strips every trailing suffix listed in ARCHIVE_SUFFIXES from the path.

    The loop repeats until the last suffix is not an archive suffix, so
    stacked extensions such as 'data.tar.gz' are removed completely.
    """
    while path.suffix in ARCHIVE_SUFFIXES:
        path = path.with_suffix('')
    return path


class _BucketFile:
    """
    A node in a perceived file structure of a bucket.
    This can be a file, a directory or both.
    """

    def __init__(self, name: str, parent: str = ''):
        """
        :param name:   Name of this node (a single path component).
        :param parent: Full path of the parent node, '/'-separated. An empty
                       string means this node has no parent prefix.
        """
        self._name = name
        self._path = f'{parent}/{name}' if parent else name
        # Children are created lazily by get_child(); None means "none yet".
        self._children: Optional[dict[str, "_BucketFile"]] = None
        self.is_file = False

    @property
    def name(self):
        """The name of this node (its last path component)."""
        return self._name

    @property
    def path(self):
        """The full '/'-separated path of this node."""
        return self._path

    @property
    def is_dir(self):
        # The node can be a directory as well as a file,
        # hence is the is_dir property, independent of is_file.
        return bool(self._children)

    def __iter__(self):
        """Iterates over the direct children of this node."""
        if self._children is None:
            return iter(())
        return iter(self._children.values())

    def get_child(self, child_name: str) -> "_BucketFile":
        """
        Returns a child object with the specified name.
        Creates one if it hasn't been created yet.
        """
        if self._children is None:
            self._children = {}
            child: Optional["_BucketFile"] = None
        else:
            child = self._children.get(child_name)
        if child is None:
            child = _BucketFile(child_name, self._path)
            self._children[child_name] = child
        return child


class BucketPath:
    """
    Implementation of the PathLike view for files in a bucket.
    """

    def __init__(self, path: str | PurePath, bucket_api: BucketLike):
        """
        :param path:        A pure path of a file or directory. The path is assumed to
                            be relative to the bucket. It is also permissible to have
                            this path in an absolute form, e.g. '/dir1/...'
                            or '\\\\abc\\...\\'.

                            All Pure Path methods of the PathLike protocol will be
                            delegated to this object.

        :param bucket_api:  An object supporting the Bucket API protocol.
        """
        self._path = PurePath(path)
        self._bucket_api = bucket_api

    def _get_relative_posix(self):
        """
        Returns the pure path of this object as a string, in the format of a bucket
        file: 'dir/subdir/.../filename'.
        """
        # Drop the anchor (drive and/or root) so the result is bucket-relative.
        path_str = str(self._path)[len(self._path.anchor):]
        if isinstance(self._path, PureWindowsPath):
            path_str = path_str.replace('\\', '/')
        # An empty PurePath renders as '.'; the bucket root is the empty string.
        if path_str == '.':
            path_str = ''
        return path_str

    def _navigate(self) -> Optional[_BucketFile]:
        """
        Reads the bucket file structure and navigates to the node corresponding to the
        pure path of this object. Returns None if such node doesn't exist, otherwise
        returns this node.

        A bucket file belongs to the node only if it is the node itself or lies
        below it, i.e. the match must end on a path-component boundary. A mere
        string prefix ('dir/fi' vs. 'dir/file.txt') does not count.
        """
        path_str = self._get_relative_posix()
        path_len = len(path_str)
        # Build the root node from the full relative path, so that the `path`
        # of the node and of all its descendants is a complete bucket file name,
        # no matter how deep this path is.
        parent_str, _, node_name = path_str.rpartition('/')
        path_root: Optional[_BucketFile] = None
        for file_name in self._bucket_api.files:
            if not file_name.startswith(path_str):
                continue
            remainder = file_name[path_len:]
            # Reject partial matches of the last component. The bucket root
            # (empty path_str) matches every file.
            if path_str and remainder and not remainder.startswith('/'):
                continue
            if path_root is None:
                path_root = _BucketFile(node_name, parent_str)
            node = path_root
            for part in remainder.split('/'):
                if part:
                    node = node.get_child(part)
            node.is_file = True
        return path_root

    @property
    def name(self) -> str:
        """The final component of the path."""
        return self._path.name

    @property
    def suffix(self) -> str:
        """The file extension of the final component, if any."""
        return self._path.suffix

    @property
    def root(self) -> str:
        """A string representing the root of the path, if any."""
        return self._path.root

    @property
    def parent(self) -> str:
        """The name (final component) of the logical parent of this path."""
        return self._path.parent.name

    def as_uri(self) -> str:
        """Represents the path as a URI, delegating to `PurePath.as_uri`."""
        return self._path.as_uri()

    def as_udf_path(self) -> str:
        """
        Returns the path as it is seen from a UDF: the bucket's `udf_path`
        joined with this path, with archive suffixes removed.
        """
        return str(PurePath(self._bucket_api.udf_path) /
                   _remove_archive_suffix(self._path))

    def exists(self) -> bool:
        """Returns True if the path points to an existing file or directory."""
        return self._navigate() is not None

    def is_dir(self) -> bool:
        """Returns True if the path points to a directory."""
        current_node = self._navigate()
        return (current_node is not None) and current_node.is_dir

    def is_file(self) -> bool:
        """Returns True if the path points to a file."""
        current_node = self._navigate()
        return (current_node is not None) and current_node.is_file

    def read(self, chunk_size: int = 8192) -> Iterable[ByteString]:
        """
        Downloads the file behind this path via the bucket API.

        :param chunk_size: Chunk size passed on to the bucket's `download`.
        """
        return self._bucket_api.download(str(self._path), chunk_size)

    def write(self, data: ByteString | BinaryIO | Iterable[ByteString]) -> None:
        """
        Uploads the data to this path via the bucket API.

        An iterable consisting solely of byte strings is concatenated into a
        single bytes object before the upload. Anything else (a byte string,
        a file object derived from IOBase) is passed on unchanged.

        :param data: The content to be written.
        """
        if not isinstance(data, IOBase) and isinstance(data, Iterable):
            if iter(data) is data:
                # A one-shot iterator (e.g. a generator) would be exhausted by
                # the type check below, leaving nothing to upload. Materialise
                # it first so the chunks can be inspected and then joined.
                data = list(data)
            if all(isinstance(chunk, ByteString) for chunk in data):
                data = b''.join(data)
        self._bucket_api.upload(str(self._path), data)

    def rm(self) -> None:
        """
        Deletes the file behind this path.

        :raises FileNotFoundError: if the path doesn't exist.
        :raises IsADirectoryError: if the path exists, but is not a file.
        """
        current_node = self._navigate()
        if current_node is None:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(self._path))
        if not current_node.is_file:
            raise IsADirectoryError(errno.EISDIR, os.strerror(errno.EISDIR), str(self._path))
        self._bucket_api.delete(str(self._path))

    def rmdir(self, recursive: bool = False) -> None:
        """
        Removes the directory behind this path.

        :param recursive: If True, all files below this path are deleted.
                          If False, an existing directory is always reported
                          as non-empty.

        :raises NotADirectoryError: if the path exists, but is not a directory.
        :raises OSError: (ENOTEMPTY) if the directory exists and `recursive`
                         is False.
        """
        current_node = self._navigate()
        if current_node is None:
            # There is no such thing as an empty directory. So, for the sake of
            # compatibility with the PathLike, any directory that doesn't exist
            # is considered empty.
            return
        if not current_node.is_dir:
            raise NotADirectoryError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), str(self._path))
        if recursive:
            self._rmdir_recursive(current_node)
        else:
            raise OSError(errno.ENOTEMPTY, os.strerror(errno.ENOTEMPTY), str(self._path))

    def _rmdir_recursive(self, node: _BucketFile):
        """Deletes every file at or below the node, children first."""
        for child in node:
            self._rmdir_recursive(child)
        if node.is_file:
            self._bucket_api.delete(node.path)

    def joinpath(self, *path_segments) -> PathLike:
        """
        Combines this path with each of the given segments in turn and returns
        a new object of the same type, bound to the same bucket.
        """
        # The path segments can be of either this type or an os.PathLike.
        cls = type(self)
        seg_paths = [seg._path if isinstance(seg, cls) else seg for seg in path_segments]
        new_path = self._path.joinpath(*seg_paths)
        return cls(new_path, self._bucket_api)

    def walk(self, top_down: bool = True) -> Generator[tuple[PathLike, list[str], list[str]], None, None]:
        """
        Walks the directory tree below this path, yielding 3-tuples of
        (dirpath, dirnames, filenames). Yields nothing if the path is not
        a directory.

        :param top_down: If True, a directory is yielded before its
                         subdirectories, otherwise after them.

        :raises FileNotFoundError: if the path doesn't exist.
        """
        current_node = self._navigate()
        if current_node is None:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(self._path))

        if current_node.is_dir:
            yield from self._walk_recursive(current_node, top_down)

    def _walk_recursive(self, node: _BucketFile, top_down: bool) -> \
            Generator[tuple[PathLike, list[str], list[str]], None, None]:
        """Yields the walk tuple of the node and, recursively, of its subdirectories."""

        bucket_path = BucketPath(node.path, self._bucket_api)
        dir_list: list[str] = []
        file_list: list[str] = []
        for child in node:
            # A child can be both a file and a directory; list it in both then.
            if child.is_file:
                file_list.append(child.name)
            if child.is_dir:
                dir_list.append(child.name)

        # The difference between the top_down and bottom_up is in the order of
        # yielding the current node and its children. Top down - current node first,
        # bottom_up - children first.
        if top_down:
            yield bucket_path, dir_list, file_list
        for child in node:
            if child.is_dir:
                yield from self._walk_recursive(child, top_down)
        if not top_down:
            yield bucket_path, dir_list, file_list

    def iterdir(self) -> Generator[PathLike, None, None]:
        """
        Yields path objects for the direct children of this directory.

        :raises FileNotFoundError: if the path doesn't exist.
        :raises NotADirectoryError: if the path exists, but is not a directory.
        """
        current_node = self._navigate()
        if current_node is None:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(self._path))
        if not current_node.is_dir:
            raise NotADirectoryError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), str(self._path))

        for child in current_node:
            yield BucketPath(self._path / child.name, self._bucket_api)

    def __truediv__(self, other):
        """Overloads / for joining, see also joinpath."""
        # The other object can be of either this type or an os.PathLike.
        cls = type(self)
        new_path = self._path / (other._path if isinstance(other, cls) else other)
        return cls(new_path, self._bucket_api)

    def __str__(self):
        return str(self._path)


def _create_onprem_bucket(url: str,
                          username: str,
                          password: str,
                          bucket_name: str = 'default',
                          verify: bool | str = True,
                          service_name: Optional[str] = None
                          ) -> BucketLike:
    """
    Creates an on-prem bucket.

    :raises BucketFsError: if the service has no bucket with the given name.
    """
    credentials = {bucket_name: {'username': username, 'password': password}}
    service = Service(url, credentials, verify, service_name)
    buckets = service.buckets
    if bucket_name not in buckets:
        raise BucketFsError(f'Bucket {bucket_name} does not exist.')
    return buckets[bucket_name]


def _create_saas_bucket(account_id: str,
                        database_id: str,
                        pat: str,
                        url: str = 'https://cloud.exasol.com'
                        ) -> BucketLike:
    """
    Creates a SaaS bucket.
    """
    return SaaSBucket(url=url, account_id=account_id, database_id=database_id, pat=pat)


def _create_mounted_bucket(service_name: str = 'bfsdefault',
                           bucket_name: str = 'default',
                           base_path: Optional[str] = None
                           ) -> BucketLike:
    """
    Creates a bucket mounted to a UDF.

    :raises BucketFsError: if the root of the mounted bucket doesn't exist.
    """
    bucket = MountedBucket(service_name, bucket_name, base_path)
    if not bucket.root.exists():
        raise BucketFsError(f'Service {service_name} or bucket {bucket_name} do not exist.')
    return bucket
def build_path(**kwargs) -> PathLike:
    """
    Creates a PathLike object based on a bucket in one of the BucketFS storage backends.
    It provides the same interface for the following BucketFS implementations:
    - On-Premises
    - SaaS
    - BucketFS files mounted as read-only directory in a UDF.

    Arguments:
        backend:
            Indicates the BucketFS storage backend. The available backends are
            defined in the StorageBackend enumeration. Currently, these are
            "onprem", "saas" and "mounted". The value can be provided either as
            a string (case-insensitive), e.g. "onprem", or as an enum, e.g.
            StorageBackend.onprem. If the argument is omitted, the on-prem
            backend is used.
        path:
            Optional parameter that selects a path within the bucket. If not provided
            the returned PathLike object corresponds to the root of the bucket. Hence,
            an alternative way of creating a PathLike pointing to a particular file or
            directory is as in the code below.
            path = build_path(...) / "the_desired_path"

    The rest of the arguments are backend specific.

    On-prem arguments:
        url:
            Url of the BucketFS service, e.g. `http(s)://127.0.0.1:2580`.
        username:
            BucketFS username (generally, different from the DB username).
        password:
            BucketFS user password.
        bucket_name:
            Name of the bucket. Currently, a PathLike cannot span multiple buckets.
        verify:
            Either a boolean, in which case it controls whether we verify the server's
            TLS certificate, or a string, in which case it must be a path to a CA bundle
            to use. Defaults to ``True``.
        service_name:
            Optional name of the BucketFS service.

    SaaS arguments:
        url:
            Url of the Exasol SaaS. Defaults to 'https://cloud.exasol.com'.
        account_id:
            SaaS user account ID, e.g. 'org_LVeOj4pwXhPatNz5'
            (given example is not a valid ID of an existing account).
        database_id:
            Database ID, e.g. 'msduZKlMR8QCP_MsLsVRwy'
            (given example is not a valid ID of an existing database).
        pat:
            Personal Access Token, e.g. 'exa_pat_aj39AsM3bYR9bQ4qk2wiG8SWHXbRUGNCThnep5YV73az6A'
            (given example is not a valid PAT).

    Mounted BucketFS directory arguments:
        service_name:
            Name of the BucketFS service (not a service url). Defaults to 'bfsdefault'.
        bucket_name:
            Name of the bucket. Currently, a PathLike cannot span multiple buckets.
        base_path:
            Explicitly specified root path in a file system. This is an alternative to
            providing the service_name and the bucket_name.
    """

    # Take the generic arguments out; what remains in kwargs is backend specific.
    backend = kwargs.pop('backend', StorageBackend.onprem)
    path = kwargs.pop('path', '')

    # A backend given by name is resolved to the enum member.
    if isinstance(backend, str):
        backend = StorageBackend[backend.lower()]

    # Select the bucket factory first, then call it in one place. Anything
    # that is neither on-prem nor SaaS is handled as a mounted bucket.
    if backend == StorageBackend.onprem:
        bucket_factory = _create_onprem_bucket
    elif backend == StorageBackend.saas:
        bucket_factory = _create_saas_bucket
    else:
        bucket_factory = _create_mounted_bucket

    return BucketPath(path, bucket_factory(**kwargs))
+
+