diff --git a/doc/changes/unreleased.md b/doc/changes/unreleased.md index c68c1bbd..176e7a47 100644 --- a/doc/changes/unreleased.md +++ b/doc/changes/unreleased.md @@ -27,3 +27,4 @@ - Update abatilo/actions-poetry from `v2.1.4` to `v3.0.0` - Update actions/setup-python from `v2` to `v5` - Added build system section to project `pyproject.toml` +- Restructure internals of bucketfs package diff --git a/doc/examples/logger.py b/doc/examples/logger.py index b44bb078..3fb47af4 100644 --- a/doc/examples/logger.py +++ b/doc/examples/logger.py @@ -1,10 +1,12 @@ import logging + from exasol.bucketfs import Service logging.basicConfig(level=logging.INFO) # Advanced Logging import logging + from exasol.bucketfs import Service # Attention: @@ -16,7 +18,7 @@ # Explicityly Configure the bucketfs logger if you need to. # # 1. Get a reference to the bucketfs logger -bucketfs_logger = logging.getLogger('exasol.bucketfs') +bucketfs_logger = logging.getLogger("exasol.bucketfs") # 2. Configure the bucketfs logger as needed # Note: diff --git a/exasol/bucketfs/__init__.py b/exasol/bucketfs/__init__.py index 059ed41b..57a7bf42 100644 --- a/exasol/bucketfs/__init__.py +++ b/exasol/bucketfs/__init__.py @@ -45,23 +45,18 @@ """ from __future__ import annotations -import hashlib -import logging -from collections import defaultdict -from pathlib import Path -from typing import ( - BinaryIO, - ByteString, - Iterable, - Iterator, - Mapping, - MutableMapping, +from exasol.bucketfs._buckets import ( + Bucket, + MappedBucket, ) -from urllib.parse import urlparse - -import requests -from requests import HTTPError -from requests.auth import HTTPBasicAuth +from exasol.bucketfs._convert import ( + as_bytes, + as_file, + as_hash, + as_string, +) +from exasol.bucketfs._error import BucketFsError +from exasol.bucketfs._service import Service __all__ = [ "Service", @@ -73,388 +68,3 @@ "as_file", "as_hash", ] - -_logger = logging.getLogger("exasol.bucketfs") - - -class BucketFsError(Exception): - """Error occurred while interacting with the bucket fs service.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - -def _lines(response): - lines = (line for line in response.text.split("\n") if not line.isspace()) - return (line for line in lines if line != "") - - -def _build_url(service_url, bucket=None, path=None) -> str: - info = urlparse(service_url) - url = f"{info.scheme}://{info.hostname}:{info.port}" - if bucket is not None: - url += f"/{bucket}" - if path is not None: - url += f"/{path}" - return url - - -def _parse_service_url(url: str) -> str: - supported_schemes = ("http", "https") - elements = urlparse(url) - if elements.scheme not in supported_schemes: - raise BucketFsError( - f"Invalid scheme: {elements.scheme}. Supported schemes [{', '.join(supported_schemes)}]" - ) - if not elements.netloc: - raise BucketFsError(f"Invalid location: {elements.netloc}") - # use bucket fs default port if no explicit port was specified - port = elements.port if elements.port else 2580 - return f"{elements.scheme}://{elements.hostname}:{port}" - - -class Service: - """Provides a simple to use api to access a bucketfs service. - - Attributes: - buckets: lists all available buckets. - """ - - def __init__( - self, - url: str, - credentials: Mapping[str, Mapping[str, str]] | None = None, - verify: bool | str = True, - ): - """Create a new Service instance. - - Args: - url: - Url of the bucketfs service, e.g. `http(s)://127.0.0.1:2580`. - credentials: - A mapping containing credentials (username and password) for buckets. - E.g. {"bucket1": { "username": "foo", "password": "bar" }} - verify: - Either a boolean, in which case it controls whether we verify - the server's TLS certificate, or a string, in which case it must be a path - to a CA bundle to use. Defaults to ``True``. - """ - self._url = _parse_service_url(url) - self._authenticator = defaultdict( - lambda: {"username": "r", "password": "read"}, - credentials if credentials is not None else {}, - ) - self._verify = verify - - @property - def buckets(self) -> MutableMapping[str, Bucket]: - """List all available buckets.""" - url = _build_url(service_url=self._url) - response = requests.get(url, verify=self._verify) - try: - _logger.info(f"Retrieving bucket list from {url}") - response.raise_for_status() - except HTTPError as ex: - raise BucketFsError( - f"Couldn't list of all buckets from: {self._url}" - ) from ex - - buckets = _lines(response) - return { - name: Bucket( - name=name, - service=self._url, - username=self._authenticator[name]["username"], - password=self._authenticator[name]["password"], - ) - for name in buckets - } - - def __str__(self) -> str: - return f"Service<{self._url}>" - - def __iter__(self) -> Iterator[str]: - yield from self.buckets - - def __getitem__(self, item: str) -> Bucket: - return self.buckets[item] - - -class Bucket: - def __init__( - self, - name: str, - service: str, - username: str, - password: str, - verify: bool | str = True, - ): - """ - Create a new bucket instance. - - Args: - name: - Name of the bucket. - service: - Url where this bucket is hosted on. - username: - Username used for authentication. - password: - Password used for authentication. - verify: - Either a boolean, in which case it controls whether we verify - the server's TLS certificate, or a string, in which case it must be a path - to a CA bundle to use. Defaults to ``True``. - """ - self._name = name - self._service = _parse_service_url(service) - self._username = username - self._password = password - self._verify = verify - - def __str__(self): - return f"Bucket<{self.name} | on: {self._service}>" - - @property - def name(self) -> str: - return self._name - - @property - def _auth(self) -> HTTPBasicAuth: - return HTTPBasicAuth(username=self._username, password=self._password) - - @property - def files(self) -> Iterable[str]: - url = _build_url(service_url=self._service, bucket=self.name) - _logger.info(f"Retrieving bucket listing for {self.name}.") - response = requests.get(url, auth=self._auth, verify=self._verify) - try: - response.raise_for_status() - except HTTPError as ex: - raise BucketFsError( - f"Couldn't retrieve file list form bucket: {self.name}" - ) from ex - return {line for line in _lines(response)} - - def __iter__(self) -> Iterator[str]: - yield from self.files - - def upload( - self, path: str, data: ByteString | BinaryIO | Iterable[ByteString] - ) -> None: - """ - Uploads a file onto this bucket - - Args: - path: in the bucket the file shall be associated with. - data: raw content of the file. - """ - url = _build_url(service_url=self._service, bucket=self.name, path=path) - _logger.info(f"Uploading {path} to bucket {self.name}.") - response = requests.put(url, data=data, auth=self._auth, verify=self._verify) - try: - response.raise_for_status() - except HTTPError as ex: - raise BucketFsError(f"Couldn't upload file: {path}") from ex - - def delete(self, path) -> None: - """ - Deletes a specific file in this bucket. - - Args: - path: points to the file which shall be deleted. - - Raises: - A BucketFsError if the operation couldn't be executed successfully. - """ - url = _build_url(service_url=self._service, bucket=self.name, path=path) - _logger.info(f"Deleting {path} from bucket {self.name}.") - response = requests.delete(url, auth=self._auth, verify=self._verify) - try: - response.raise_for_status() - except HTTPError as ex: - raise BucketFsError(f"Couldn't delete: {path}") from ex - - def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]: - """ - Downloads a specific file of this bucket. - - Args: - path: which shall be downloaded. - chunk_size: which shall be used for downloading. - - Returns: - An iterable of binary chunks representing the downloaded file. - """ - url = _build_url(service_url=self._service, bucket=self.name, path=path) - _logger.info( - f"Downloading {path} using a chunk size of {chunk_size} bytes from bucket {self.name}." - ) - with requests.get( - url, stream=True, auth=self._auth, verify=self._verify - ) as response: - try: - response.raise_for_status() - except HTTPError as ex: - raise BucketFsError(f"Couldn't download: {path}") from ex - - yield from response.iter_content(chunk_size=chunk_size) - - -class MappedBucket: - """ - Wraps a bucket and provides various convenience features to it (e.g. index based access). - - Attention: - - Even though this class provides a very convenient interface, - the functionality of this class should be used with care. - Even though it may not be obvious, all the provided features do involve interactions with a bucketfs service - in the background (upload, download, sync, etc.). - Keep this in mind when using this class. - """ - - def __init__(self, bucket: Bucket, chunk_size: int = 8192): - """ - Creates a new MappedBucket. - - Args: - bucket: which shall be wrapped. - chunk_size: which shall be used for downloads. - """ - self._bucket = bucket - self._chunk_size = chunk_size - - @property - def chunk_size(self) -> int: - """Chunk size which will be used for downloads.""" - return self._chunk_size - - @chunk_size.setter - def chunk_size(self, value: int) -> None: - self._chunk_size = value - - def __iter__(self) -> Iterable[str]: - yield from self._bucket.files - - def __setitem__( - self, key: str, value: ByteString | BinaryIO | Iterable[ByteString] - ) -> None: - """ - Uploads a file onto this bucket. - - See also Bucket:upload - """ - self._bucket.upload(path=key, data=value) - - def __delitem__(self, key: str) -> None: - """ - Deletes a file from the bucket. - - See also Bucket:delete - """ - self._bucket.delete(path=key) - - def __getitem__(self, item: str) -> Iterable[ByteString]: - """ - Downloads a file from this bucket. - - See also Bucket::download - """ - return self._bucket.download(item, self._chunk_size) - - def __str__(self): - return f"MappedBucket<{self._bucket}>" - - -def _chunk_as_bytes(chunk: int | ByteString) -> ByteString: - """ - In some scenarios python converts single bytes to integers: - >>> chunks = [type(chunk) for chunk in b"abc"] - >>> chunks - ... [, , ] - in order to cope with this transparently this wrapper can be used. - """ - if not isinstance(chunk, Iterable): - chunk = bytes([chunk]) - return chunk - - -def _bytes(chunks: Iterable[ByteString]) -> ByteString: - chunks = (_chunk_as_bytes(c) for c in chunks) - data = bytearray() - for chunk in chunks: - data.extend(chunk) - return data - - -def as_bytes(chunks: Iterable[ByteString]) -> ByteString: - """ - Transforms a set of byte chunks into a bytes like object. - - Args: - chunks: which shall be concatenated. - - Return: - A single continues byte like object. - """ - return _bytes(chunks) - - -def as_string(chunks: Iterable[ByteString], encoding: str = "utf-8") -> str: - """ - Transforms a set of byte chunks into a string. - - Args: - chunks: which shall be converted into a single string. - encoding: which shall be used to convert the bytes to a string. - - Return: - A string representation of the converted bytes. - """ - return _bytes(chunks).decode(encoding) - - -def as_file(chunks: Iterable[ByteString], filename: str | Path) -> Path: - """ - Transforms a set of byte chunks into a string. - - Args: - chunks: which shall be written to file. - filename: for the file which is to be created. - - Return: - A path to the created file. - """ - chunks = (_chunk_as_bytes(c) for c in chunks) - filename = Path(filename) - with open(filename, "wb") as f: - for chunk in chunks: - f.write(chunk) - return filename - - -def as_hash(chunks: Iterable[ByteString], algorithm: str = "sha1") -> ByteString: - """ - Calculate the hash for a set of byte chunks. - - Args: - chunks: which shall be used as input for the checksum. - algorithm: which shall be used for calculating the checksum. - - Return: - A string representing the hex digest. - """ - try: - hasher = hashlib.new(algorithm) - except ValueError as ex: - raise BucketFsError( - "Algorithm ({algorithm}) is not available, please use [{algorithms}]".format( - algorithm=algorithm, algorithms=",".join(hashlib.algorithms_available) - ) - ) from ex - - chunks = (_chunk_as_bytes(c) for c in chunks) - for chunk in chunks: - hasher.update(chunk) - return hasher.digest() diff --git a/exasol/bucketfs/_buckets.py b/exasol/bucketfs/_buckets.py new file mode 100644 index 00000000..b65f13c5 --- /dev/null +++ b/exasol/bucketfs/_buckets.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +from typing import ( + BinaryIO, + ByteString, + Iterable, + Iterator, +) + +import requests +from requests import HTTPError +from requests.auth import HTTPBasicAuth + +from exasol.bucketfs._error import BucketFsError +from exasol.bucketfs._logging import LOGGER +from exasol.bucketfs._shared import ( + _build_url, + _lines, + _parse_service_url, +) + + +class Bucket: + def __init__( + self, + name: str, + service: str, + username: str, + password: str, + verify: bool | str = True, + ): + """ + Create a new bucket instance. + + Args: + name: + Name of the bucket. + service: + Url where this bucket is hosted on. + username: + Username used for authentication. + password: + Password used for authentication. + verify: + Either a boolean, in which case it controls whether we verify + the server's TLS certificate, or a string, in which case it must be a path + to a CA bundle to use. Defaults to ``True``. + """ + self._name = name + self._service = _parse_service_url(service) + self._username = username + self._password = password + self._verify = verify + + def __str__(self): + return f"Bucket<{self.name} | on: {self._service}>" + + @property + def name(self) -> str: + return self._name + + @property + def _auth(self) -> HTTPBasicAuth: + return HTTPBasicAuth(username=self._username, password=self._password) + + @property + def files(self) -> Iterable[str]: + url = _build_url(service_url=self._service, bucket=self.name) + LOGGER.info(f"Retrieving bucket listing for {self.name}.") + response = requests.get(url, auth=self._auth, verify=self._verify) + try: + response.raise_for_status() + except HTTPError as ex: + raise BucketFsError( + f"Couldn't retrieve file list form bucket: {self.name}" + ) from ex + return {line for line in _lines(response)} + + def __iter__(self) -> Iterator[str]: + yield from self.files + + def upload( + self, path: str, data: ByteString | BinaryIO | Iterable[ByteString] + ) -> None: + """ + Uploads a file onto this bucket + + Args: + path: in the bucket the file shall be associated with. + data: raw content of the file. + """ + url = _build_url(service_url=self._service, bucket=self.name, path=path) + LOGGER.info(f"Uploading {path} to bucket {self.name}.") + response = requests.put(url, data=data, auth=self._auth, verify=self._verify) + try: + response.raise_for_status() + except HTTPError as ex: + raise BucketFsError(f"Couldn't upload file: {path}") from ex + + def delete(self, path) -> None: + """ + Deletes a specific file in this bucket. + + Args: + path: points to the file which shall be deleted. + + Raises: + A BucketFsError if the operation couldn't be executed successfully. + """ + url = _build_url(service_url=self._service, bucket=self.name, path=path) + LOGGER.info(f"Deleting {path} from bucket {self.name}.") + response = requests.delete(url, auth=self._auth, verify=self._verify) + try: + response.raise_for_status() + except HTTPError as ex: + raise BucketFsError(f"Couldn't delete: {path}") from ex + + def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]: + """ + Downloads a specific file of this bucket. + + Args: + path: which shall be downloaded. + chunk_size: which shall be used for downloading. + + Returns: + An iterable of binary chunks representing the downloaded file. + """ + url = _build_url(service_url=self._service, bucket=self.name, path=path) + LOGGER.info( + f"Downloading {path} using a chunk size of {chunk_size} bytes from bucket {self.name}." + ) + with requests.get( + url, stream=True, auth=self._auth, verify=self._verify + ) as response: + try: + response.raise_for_status() + except HTTPError as ex: + raise BucketFsError(f"Couldn't download: {path}") from ex + + yield from response.iter_content(chunk_size=chunk_size) + + +class MappedBucket: + """ + Wraps a bucket and provides various convenience features to it (e.g. index based access). + + Attention: + + Even though this class provides a very convenient interface, + the functionality of this class should be used with care. + Even though it may not be obvious, all the provided features do involve interactions with a bucketfs service + in the background (upload, download, sync, etc.). + Keep this in mind when using this class. + """ + + def __init__(self, bucket: Bucket, chunk_size: int = 8192): + """ + Creates a new MappedBucket. + + Args: + bucket: which shall be wrapped. + chunk_size: which shall be used for downloads. + """ + self._bucket = bucket + self._chunk_size = chunk_size + + @property + def chunk_size(self) -> int: + """Chunk size which will be used for downloads.""" + return self._chunk_size + + @chunk_size.setter + def chunk_size(self, value: int) -> None: + self._chunk_size = value + + def __iter__(self) -> Iterable[str]: + yield from self._bucket.files + + def __setitem__( + self, key: str, value: ByteString | BinaryIO | Iterable[ByteString] + ) -> None: + """ + Uploads a file onto this bucket. + + See also Bucket:upload + """ + self._bucket.upload(path=key, data=value) + + def __delitem__(self, key: str) -> None: + """ + Deletes a file from the bucket. + + See also Bucket:delete + """ + self._bucket.delete(path=key) + + def __getitem__(self, item: str) -> Iterable[ByteString]: + """ + Downloads a file from this bucket. + + See also Bucket::download + """ + return self._bucket.download(item, self._chunk_size) + + def __str__(self): + return f"MappedBucket<{self._bucket}>" diff --git a/exasol/bucketfs/_convert.py b/exasol/bucketfs/_convert.py new file mode 100644 index 00000000..34e158d1 --- /dev/null +++ b/exasol/bucketfs/_convert.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import hashlib +from pathlib import Path +from typing import ( + ByteString, + Iterable, +) + + +def _chunk_as_bytes(chunk: int | ByteString) -> ByteString: + """ + In some scenarios python converts single bytes to integers: + >>> chunks = [type(chunk) for chunk in b"abc"] + >>> chunks + ... [, , ] + in order to cope with this transparently this wrapper can be used. + """ + if not isinstance(chunk, Iterable): + chunk = bytes([chunk]) + return chunk + + +def _bytes(chunks: Iterable[ByteString]) -> ByteString: + chunks = (_chunk_as_bytes(c) for c in chunks) + data = bytearray() + for chunk in chunks: + data.extend(chunk) + return data + + +def as_bytes(chunks: Iterable[ByteString]) -> ByteString: + """ + Transforms a set of byte chunks into a bytes like object. + + Args: + chunks: which shall be concatenated. + + Return: + A single continues byte like object. + """ + return _bytes(chunks) + + +def as_string(chunks: Iterable[ByteString], encoding: str = "utf-8") -> str: + """ + Transforms a set of byte chunks into a string. + + Args: + chunks: which shall be converted into a single string. + encoding: which shall be used to convert the bytes to a string. + + Return: + A string representation of the converted bytes. + """ + return _bytes(chunks).decode(encoding) + + +def as_file(chunks: Iterable[ByteString], filename: str | Path) -> Path: + """ + Transforms a set of byte chunks into a string. + + Args: + chunks: which shall be written to file. + filename: for the file which is to be created. + + Return: + A path to the created file. + """ + chunks = (_chunk_as_bytes(c) for c in chunks) + filename = Path(filename) + with open(filename, "wb") as f: + for chunk in chunks: + f.write(chunk) + return filename + + +def as_hash(chunks: Iterable[ByteString], algorithm: str = "sha1") -> ByteString: + """ + Calculate the hash for a set of byte chunks. + + Args: + chunks: which shall be used as input for the checksum. + algorithm: which shall be used for calculating the checksum. + + Return: + A string representing the hex digest. + """ + try: + hasher = hashlib.new(algorithm) + except ValueError as ex: + raise BucketFsError( + "Algorithm ({algorithm}) is not available, please use [{algorithms}]".format( + algorithm=algorithm, algorithms=",".join(hashlib.algorithms_available) + ) + ) from ex + + chunks = (_chunk_as_bytes(c) for c in chunks) + for chunk in chunks: + hasher.update(chunk) + return hasher.digest() diff --git a/exasol/bucketfs/_error.py b/exasol/bucketfs/_error.py new file mode 100644 index 00000000..91984f59 --- /dev/null +++ b/exasol/bucketfs/_error.py @@ -0,0 +1,8 @@ +from __future__ import annotations + + +class BucketFsError(Exception): + """Error occurred while interacting with the bucket fs service.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) diff --git a/exasol/bucketfs/_logging.py b/exasol/bucketfs/_logging.py new file mode 100644 index 00000000..878cc634 --- /dev/null +++ b/exasol/bucketfs/_logging.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +import logging + +LOGGER = logging.getLogger("exasol.bucketfs") diff --git a/exasol/bucketfs/_service.py b/exasol/bucketfs/_service.py new file mode 100644 index 00000000..8ea80ca7 --- /dev/null +++ b/exasol/bucketfs/_service.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +from collections import defaultdict +from typing import ( + Iterator, + Mapping, + MutableMapping, +) + +import requests +from requests import HTTPError + +from exasol.bucketfs._buckets import Bucket +from exasol.bucketfs._error import BucketFsError +from exasol.bucketfs._logging import LOGGER +from exasol.bucketfs._shared import ( + _build_url, + _lines, + _parse_service_url, +) + + +class Service: + """Provides a simple to use api to access a bucketfs service. + + Attributes: + buckets: lists all available buckets. + """ + + def __init__( + self, + url: str, + credentials: Mapping[str, Mapping[str, str]] | None = None, + verify: bool | str = True, + ): + """Create a new Service instance. + + Args: + url: + Url of the bucketfs service, e.g. `http(s)://127.0.0.1:2580`. + credentials: + A mapping containing credentials (username and password) for buckets. + E.g. {"bucket1": { "username": "foo", "password": "bar" }} + verify: + Either a boolean, in which case it controls whether we verify + the server's TLS certificate, or a string, in which case it must be a path + to a CA bundle to use. Defaults to ``True``. + """ + self._url = _parse_service_url(url) + self._authenticator = defaultdict( + lambda: {"username": "r", "password": "read"}, + credentials if credentials is not None else {}, + ) + self._verify = verify + + @property + def buckets(self) -> MutableMapping[str, Bucket]: + """List all available buckets.""" + url = _build_url(service_url=self._url) + response = requests.get(url, verify=self._verify) + try: + LOGGER.info(f"Retrieving bucket list from {url}") + response.raise_for_status() + except HTTPError as ex: + raise BucketFsError( + f"Couldn't list of all buckets from: {self._url}" + ) from ex + + buckets = _lines(response) + return { + name: Bucket( + name=name, + service=self._url, + username=self._authenticator[name]["username"], + password=self._authenticator[name]["password"], + ) + for name in buckets + } + + def __str__(self) -> str: + return f"Service<{self._url}>" + + def __iter__(self) -> Iterator[str]: + yield from self.buckets + + def __getitem__(self, item: str) -> Bucket: + return self.buckets[item] diff --git a/exasol/bucketfs/_shared.py b/exasol/bucketfs/_shared.py new file mode 100644 index 00000000..631216e4 --- /dev/null +++ b/exasol/bucketfs/_shared.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from urllib.parse import urlparse + +from exasol.bucketfs._error import BucketFsError + + +def _lines(response): + lines = (line for line in response.text.split("\n") if not line.isspace()) + return (line for line in lines if line != "") + + +def _build_url(service_url, bucket=None, path=None) -> str: + info = urlparse(service_url) + url = f"{info.scheme}://{info.hostname}:{info.port}" + if bucket is not None: + url += f"/{bucket}" + if path is not None: + url += f"/{path}" + return url + + +def _parse_service_url(url: str) -> str: + supported_schemes = ("http", "https") + elements = urlparse(url) + if elements.scheme not in supported_schemes: + raise BucketFsError( + f"Invalid scheme: {elements.scheme}. Supported schemes [{', '.join(supported_schemes)}]" + ) + if not elements.netloc: + raise BucketFsError(f"Invalid location: {elements.netloc}") + # use bucket fs default port if no explicit port was specified + port = elements.port if elements.port else 2580 + return f"{elements.scheme}://{elements.hostname}:{port}"