diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index a18dc27d86201a..87aec946f8e2d4 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -245,9 +245,7 @@
 powerbi_report_server = {"requests", "requests_ntlm"}
 
-slack = {
-    "slack-sdk==3.18.1"
-}
+slack = {"slack-sdk==3.18.1"}
 
 databricks = {
     # 0.1.11 appears to have authentication issues with azure databricks
@@ -671,10 +669,10 @@
         "file = datahub.ingestion.reporting.file_reporter:FileReporter",
     ],
     "datahub.custom_packages": [],
-    "datahub.ingestion.fs.plugins": [
-        "s3 = datahub.ingestion.source.fs.s3_fs:S3FileSystem",
-        "file = datahub.ingestion.source.fs.local_fs:LocalFileSystem",
-        "http = datahub.ingestion.source.fs.http_fs:HttpFileSystem",
+    "datahub.fs.plugins": [
+        "s3 = datahub.ingestion.fs.s3_fs:S3FileSystem",
+        "file = datahub.ingestion.fs.local_fs:LocalFileSystem",
+        "http = datahub.ingestion.fs.http_fs:HttpFileSystem",
     ],
 }
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fs/__init__.py b/metadata-ingestion/src/datahub/ingestion/fs/__init__.py
similarity index 100%
rename from metadata-ingestion/src/datahub/ingestion/source/fs/__init__.py
rename to metadata-ingestion/src/datahub/ingestion/fs/__init__.py
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fs/fs_base.py b/metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
similarity index 93%
rename from metadata-ingestion/src/datahub/ingestion/source/fs/fs_base.py
rename to metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
index d38e645eeadb1a..b099d4d332946a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fs/fs_base.py
+++ b/metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
@@ -35,5 +35,6 @@ def list(self, path: str) -> Iterable[FileInfo]:
 def get_path_schema(path: str) -> str:
     scheme = parse.urlparse(path).scheme
     if scheme == "":
+        # This makes the default scheme "file" for local paths.
         scheme = "file"
     return scheme
diff --git a/metadata-ingestion/src/datahub/ingestion/fs/fs_registry.py b/metadata-ingestion/src/datahub/ingestion/fs/fs_registry.py
new file mode 100644
index 00000000000000..cb2349723a4cdb
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/fs/fs_registry.py
@@ -0,0 +1,5 @@
+from datahub.ingestion.api.registry import PluginRegistry
+from datahub.ingestion.fs.fs_base import FileSystem
+
+fs_registry = PluginRegistry[FileSystem]()
+fs_registry.register_from_entrypoint("datahub.fs.plugins")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fs/http_fs.py b/metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
similarity index 92%
rename from metadata-ingestion/src/datahub/ingestion/source/fs/http_fs.py
rename to metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
index 537b7b379488a5..040eab18ddcb79 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fs/http_fs.py
+++ b/metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
@@ -3,7 +3,7 @@
 import requests
 import smart_open
 
-from datahub.ingestion.source.fs.fs_base import FileInfo, FileSystem
+from datahub.ingestion.fs.fs_base import FileInfo, FileSystem
 
 
 class HttpFileSystem(FileSystem):
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fs/local_fs.py b/metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
similarity index 82%
rename from metadata-ingestion/src/datahub/ingestion/source/fs/local_fs.py
rename to metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
index 4570f68980f69f..3f4467ee1c762f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fs/local_fs.py
+++ b/metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
@@ -2,9 +2,7 @@
 import pathlib
 from typing import Any, Iterable
 
-import smart_open
-
-from datahub.ingestion.source.fs.fs_base import FileInfo, FileSystem
+from datahub.ingestion.fs.fs_base import FileInfo, FileSystem
 
 
 class LocalFileSystem(FileSystem):
@@ -13,7 +11,8 @@ def create(cls, **kwargs):
         return LocalFileSystem()
 
     def open(self, path: str, **kwargs: Any) -> Any:
-        return smart_open.open(path, mode="rb", transport_params=kwargs)
+        # pathlib.Path.open does not accept smart_open's transport_params.
+        return pathlib.Path(path).open(mode="rb")
 
     def list(self, path: str) -> Iterable[FileInfo]:
         p = pathlib.Path(path)
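The S3ListIterator that moves into s3_fs.py below pages through list_objects_v2 with NextContinuationToken, so listings larger than one page (MAX_KEYS, 1000 objects) are fetched lazily instead of buffered up front. For comparison, a minimal sketch of the same pattern using boto3's built-in paginator; the bucket and prefix values are illustrative only, not part of this change:

    import boto3

    s3 = boto3.client("s3")
    # Each page carries up to PageSize keys; pages are requested on demand,
    # which is the same lazy behavior S3ListIterator implements by hand.
    pages = s3.get_paginator("list_objects_v2").paginate(
        Bucket="example-bucket",
        Prefix="some/prefix",
        PaginationConfig={"PageSize": 1000},
    )
    for page in pages:
        for obj in page.get("Contents", []):
            print(f"s3://example-bucket/{obj['Key']}", obj["Size"])

The hand-rolled iterator below yields FileInfo objects directly, which keeps the FileSystem.list contract uniform across backends.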
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fs/s3_fs.py b/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
similarity index 56%
rename from metadata-ingestion/src/datahub/ingestion/source/fs/s3_fs.py
rename to metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
index cb508e7232718e..a135b7b6ce8375 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fs/s3_fs.py
+++ b/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
@@ -1,3 +1,4 @@
+from collections.abc import Iterator
 from dataclasses import dataclass
 from typing import Any, Iterable
 from urllib.parse import urlparse
@@ -5,8 +6,8 @@
 import boto3
 import smart_open
 
-from datahub.ingestion.source.fs.fs_base import FileInfo, FileSystem
-from datahub.ingestion.source.fs.s3_list_iterator import S3ListIterator
+from datahub.ingestion.fs import s3_fs
+from datahub.ingestion.fs.fs_base import FileInfo, FileSystem
 
 
 def parse_s3_path(path: str) -> "S3Path":
@@ -30,6 +31,49 @@ def __str__(self):
         return f"S3Path({self.bucket}, {self.key})"
 
 
+class S3ListIterator(Iterator):
+
+    MAX_KEYS = 1000
+
+    def __init__(
+        self, s3_client: Any, bucket: str, prefix: str, max_keys: int = MAX_KEYS
+    ) -> None:
+        self._s3 = s3_client
+        self._bucket = bucket
+        self._prefix = prefix
+        self._max_keys = max_keys
+        self._file_statuses: Iterator = iter([])
+        self._token = ""
+        self.fetch()
+
+    def __next__(self) -> FileInfo:
+        try:
+            return next(self._file_statuses)
+        except StopIteration:
+            if self._token:
+                self.fetch()
+                return next(self._file_statuses)
+            else:
+                raise StopIteration()
+
+    def fetch(self):
+        params = dict(Bucket=self._bucket, Prefix=self._prefix, MaxKeys=self._max_keys)
+        if self._token:
+            params.update(ContinuationToken=self._token)
+
+        response = self._s3.list_objects_v2(**params)
+
+        s3_fs.assert_ok_status(response)
+
+        self._file_statuses = iter(
+            [
+                FileInfo(f"s3://{response['Name']}/{x['Key']}", x["Size"], is_file=True)
+                for x in response.get("Contents", [])
+            ]
+        )
+        self._token = response.get("NextContinuationToken")
+
+
 class S3FileSystem(FileSystem):
     def __init__(self, **kwargs):
         self.s3 = boto3.client("s3", **kwargs)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/file.py b/metadata-ingestion/src/datahub/ingestion/source/file.py
index aaee7a3a161c84..c3b2a99632daf5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/file.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/file.py
@@ -32,8 +32,8 @@
 )
 from datahub.ingestion.api.source_helpers import auto_workunit_reporter
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.fs.fs_base import FileInfo, get_path_schema
-from datahub.ingestion.source.fs.fs_registry import fs_registry
+from datahub.ingestion.fs.fs_base import FileInfo, get_path_schema
+from datahub.ingestion.fs.fs_registry import fs_registry
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
     MetadataChangeEvent,
     MetadataChangeProposal,
@@ -186,11 +186,11 @@ def get_filenames(self) -> Iterable[FileInfo]:
         schema = get_path_schema(path_str)
         fs_class = fs_registry.get(schema)
         fs = fs_class.create()
-        for file_status in fs.list(path_str):
-            if file_status.is_file and file_status.path.endswith(
+        for file_info in fs.list(path_str):
+            if file_info.is_file and file_info.path.endswith(
                 self.config.file_extension
             ):
-                yield file_status
+                yield file_info
 
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         # No super() call, as we don't want helpers that create / remove workunits
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fs/fs_registry.py b/metadata-ingestion/src/datahub/ingestion/source/fs/fs_registry.py
deleted file mode 100644
index 7ef4bd30e19ab2..00000000000000
--- a/metadata-ingestion/src/datahub/ingestion/source/fs/fs_registry.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from datahub.ingestion.api.registry import PluginRegistry
-from datahub.ingestion.source.fs.fs_base import FileSystem
-
-fs_registry = PluginRegistry[FileSystem]()
-fs_registry.register_from_entrypoint("datahub.ingestion.fs.plugins")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fs/s3_list_iterator.py b/metadata-ingestion/src/datahub/ingestion/source/fs/s3_list_iterator.py
deleted file mode 100644
index b1a6c63e856a03..00000000000000
--- a/metadata-ingestion/src/datahub/ingestion/source/fs/s3_list_iterator.py
+++ /dev/null
@@ -1,48 +0,0 @@
-from collections.abc import Iterator
-from typing import Any
-
-from datahub.ingestion.source.fs import s3_fs
-from datahub.ingestion.source.fs.fs_base import FileInfo
-
-
-class S3ListIterator(Iterator):
-
-    MAX_KEYS = 1000
-
-    def __init__(
-        self, s3_client: Any, bucket: str, prefix: str, max_keys: int = MAX_KEYS
-    ) -> None:
-        self._s3 = s3_client
-        self._bucket = bucket
-        self._prefix = prefix
-        self._max_keys = max_keys
-        self._file_statuses: Iterator = iter([])
-        self._token = ""
-        self.fetch()
-
-    def __next__(self) -> FileInfo:
-        try:
-            return next(self._file_statuses)
-        except StopIteration:
-            if self._token:
-                self.fetch()
-                return next(self._file_statuses)
-            else:
-                raise StopIteration()
-
-    def fetch(self):
-        params = dict(Bucket=self._bucket, Prefix=self._prefix, MaxKeys=self._max_keys)
-        if self._token:
-            params.update(ContinuationToken=self._token)
-
-        response = self._s3.list_objects_v2(**params)
-
-        s3_fs.assert_ok_status(response)
-
-        self._file_statuses = iter(
-            [
-                FileInfo(f"s3://{response['Name']}/{x['Key']}", x["Size"], is_file=True)
-                for x in response.get("Contents", [])
-            ]
-        )
-        self._token = response.get("NextContinuationToken")
diff --git a/metadata-ingestion/tests/unit/test_plugin_system.py b/metadata-ingestion/tests/unit/test_plugin_system.py
index 4d1ebce2be849f..0e12416325bf9a 100644
--- a/metadata-ingestion/tests/unit/test_plugin_system.py
+++ b/metadata-ingestion/tests/unit/test_plugin_system.py
@@ -7,6 +7,7 @@
 from datahub.ingestion.api.registry import PluginRegistry
 from datahub.ingestion.api.sink import Sink
 from datahub.ingestion.extractor.extractor_registry import extractor_registry
+from datahub.ingestion.fs.fs_registry import fs_registry
 from datahub.ingestion.reporting.reporting_provider_registry import (
     reporting_provider_registry,
 )
@@ -54,6 +55,7 @@
         (reporting_provider_registry, ["datahub", "file"]),
         (ingestion_checkpoint_provider_registry, ["datahub"]),
         (lite_registry, ["duckdb"]),
+        (fs_registry, ["file", "http", "s3"]),
     ],
 )
 def test_registry_defaults(registry: PluginRegistry, expected: List[str]) -> None:
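With this change, filesystem implementations move from datahub.ingestion.source.fs to datahub.ingestion.fs, and the entry-point group is renamed from "datahub.ingestion.fs.plugins" to "datahub.fs.plugins". A minimal sketch of the consumer-side flow after the move, mirroring get_filenames in file.py; the sample path is a hypothetical value for illustration:

    from datahub.ingestion.fs.fs_base import get_path_schema
    from datahub.ingestion.fs.fs_registry import fs_registry

    path = "s3://example-bucket/ingest/"  # hypothetical input
    schema = get_path_schema(path)        # "s3"; bare local paths default to "file"
    fs = fs_registry.get(schema).create() # resolve the plugin class, then instantiate
    for file_info in fs.list(path):
        if file_info.is_file:
            print(file_info.path)

An out-of-tree filesystem would register under the renamed group the same way the built-ins do in setup.py; the package, module, and class names below are hypothetical:

    entry_points={
        "datahub.fs.plugins": [
            "gcs = my_plugin.gcs_fs:GCSFileSystem",
        ],
    }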