Commit da4ce74

move out of ingestion.source.fs -> ingestion.fs + add test + remove smart_open dep for local

hsheth2 committed Feb 9, 2024
1 parent 8d01a6c commit da4ce74
Showing 11 changed files with 67 additions and 72 deletions.
12 changes: 5 additions & 7 deletions metadata-ingestion/setup.py
@@ -245,9 +245,7 @@

 powerbi_report_server = {"requests", "requests_ntlm"}
 
-slack = {
-    "slack-sdk==3.18.1"
-}
+slack = {"slack-sdk==3.18.1"}
 
 databricks = {
     # 0.1.11 appears to have authentication issues with azure databricks
@@ -671,10 +669,10 @@
"file = datahub.ingestion.reporting.file_reporter:FileReporter",
],
"datahub.custom_packages": [],
"datahub.ingestion.fs.plugins": [
"s3 = datahub.ingestion.source.fs.s3_fs:S3FileSystem",
"file = datahub.ingestion.source.fs.local_fs:LocalFileSystem",
"http = datahub.ingestion.source.fs.http_fs:HttpFileSystem",
"datahub.fs.plugins": [
"s3 = datahub.ingestion.fs.s3_fs:S3FileSystem",
"file = datahub.ingestion.fs.local_fs:LocalFileSystem",
"http = datahub.ingestion.fs.http_fs:HttpFileSystem",
],
}

metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
@@ -35,5 +35,6 @@ def list(self, path: str) -> Iterable[FileInfo]:
 def get_path_schema(path: str) -> str:
     scheme = parse.urlparse(path).scheme
     if scheme == "":
+        # This makes the default schema "file" for local paths.
         scheme = "file"
     return scheme
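
For reference, get_path_schema is just urlparse's scheme with a local-file fallback; a quick sketch of the behavior (example paths are illustrative, not from this commit):

from urllib import parse  # as imported in fs_base

assert parse.urlparse("s3://bucket/key.json").scheme == "s3"
assert parse.urlparse("http://host/events.json").scheme == "http"
assert parse.urlparse("/tmp/events.json").scheme == ""  # get_path_schema maps this to "file"
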
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/fs_registry.py
@@ -0,0 +1,5 @@
+from datahub.ingestion.api.registry import PluginRegistry
+from datahub.ingestion.fs.fs_base import FileSystem
+
+fs_registry = PluginRegistry[FileSystem]()
+fs_registry.register_from_entrypoint("datahub.fs.plugins")
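
Taken together with get_path_schema, scheme-based dispatch now looks roughly like this (mirroring the file source change further down; the s3 path is illustrative):

from datahub.ingestion.fs.fs_base import get_path_schema
from datahub.ingestion.fs.fs_registry import fs_registry

path = "s3://my-bucket/mce-files/"  # hypothetical input
schema = get_path_schema(path)      # -> "s3"
fs_class = fs_registry.get(schema)  # -> S3FileSystem, via the "datahub.fs.plugins" entry point
fs = fs_class.create()
for file_info in fs.list(path):
    print(file_info.path)
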
metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
@@ -3,7 +3,7 @@
 import requests
 import smart_open
 
-from datahub.ingestion.source.fs.fs_base import FileInfo, FileSystem
+from datahub.ingestion.fs.fs_base import FileInfo, FileSystem
 
 
 class HttpFileSystem(FileSystem):
metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
@@ -2,9 +2,7 @@
 import pathlib
 from typing import Any, Iterable
 
-import smart_open
-
-from datahub.ingestion.source.fs.fs_base import FileInfo, FileSystem
+from datahub.ingestion.fs.fs_base import FileInfo, FileSystem
 
 
 class LocalFileSystem(FileSystem):
@@ -13,7 +11,7 @@ def create(cls, **kwargs):
         return LocalFileSystem()
 
     def open(self, path: str, **kwargs: Any) -> Any:
-        return smart_open.open(path, mode="rb", transport_params=kwargs)
+        return pathlib.Path(path).open(mode="rb")
 
     def list(self, path: str) -> Iterable[FileInfo]:
         p = pathlib.Path(path)
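
With smart_open gone, a minimal usage sketch of the local filesystem (directory and extension are hypothetical):

fs = LocalFileSystem.create()
for file_info in fs.list("/tmp/mce-files"):
    if file_info.is_file and file_info.path.endswith(".json"):
        with fs.open(file_info.path) as f:  # plain pathlib.Path.open in binary mode
            payload = f.read()
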
metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
@@ -1,12 +1,13 @@
+from collections.abc import Iterator
 from dataclasses import dataclass
 from typing import Any, Iterable
 from urllib.parse import urlparse
 
 import boto3
 import smart_open
 
-from datahub.ingestion.source.fs.fs_base import FileInfo, FileSystem
-from datahub.ingestion.source.fs.s3_list_iterator import S3ListIterator
+from datahub.ingestion.fs import s3_fs
+from datahub.ingestion.fs.fs_base import FileInfo, FileSystem
 
 
 def parse_s3_path(path: str) -> "S3Path":
@@ -30,6 +31,49 @@ def __str__(self):
         return f"S3Path({self.bucket}, {self.key})"
 
 
+class S3ListIterator(Iterator):
+
+    MAX_KEYS = 1000
+
+    def __init__(
+        self, s3_client: Any, bucket: str, prefix: str, max_keys: int = MAX_KEYS
+    ) -> None:
+        self._s3 = s3_client
+        self._bucket = bucket
+        self._prefix = prefix
+        self._max_keys = max_keys
+        self._file_statuses: Iterator = iter([])
+        self._token = ""
+        self.fetch()
+
+    def __next__(self) -> FileInfo:
+        try:
+            return next(self._file_statuses)
+        except StopIteration:
+            if self._token:
+                self.fetch()
+                return next(self._file_statuses)
+            else:
+                raise StopIteration()
+
+    def fetch(self):
+        params = dict(Bucket=self._bucket, Prefix=self._prefix, MaxKeys=self._max_keys)
+        if self._token:
+            params.update(ContinuationToken=self._token)
+
+        response = self._s3.list_objects_v2(**params)
+
+        s3_fs.assert_ok_status(response)
+
+        self._file_statuses = iter(
+            [
+                FileInfo(f"s3://{response['Name']}/{x['Key']}", x["Size"], is_file=True)
+                for x in response.get("Contents", [])
+            ]
+        )
+        self._token = response.get("NextContinuationToken")
+
+
 class S3FileSystem(FileSystem):
     def __init__(self, **kwargs):
         self.s3 = boto3.client("s3", **kwargs)
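
For context, the inlined S3ListIterator drives list_objects_v2 pagination itself: each fetch() pulls up to MAX_KEYS objects, and __next__ follows NextContinuationToken when a page is exhausted. A rough usage sketch (bucket and prefix are made up):

import boto3

it = S3ListIterator(boto3.client("s3"), "my-bucket", "data/")
for file_info in it:  # the Iterator mixin supplies __iter__
    print(file_info.path)
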
10 changes: 5 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
@@ -32,8 +32,8 @@
 )
 from datahub.ingestion.api.source_helpers import auto_workunit_reporter
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.fs.fs_base import FileInfo, get_path_schema
-from datahub.ingestion.source.fs.fs_registry import fs_registry
+from datahub.ingestion.fs.fs_base import FileInfo, get_path_schema
+from datahub.ingestion.fs.fs_registry import fs_registry
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
     MetadataChangeEvent,
     MetadataChangeProposal,
@@ -186,11 +186,11 @@ def get_filenames(self) -> Iterable[FileInfo]:
         schema = get_path_schema(path_str)
         fs_class = fs_registry.get(schema)
         fs = fs_class.create()
-        for file_status in fs.list(path_str):
-            if file_status.is_file and file_status.path.endswith(
+        for file_info in fs.list(path_str):
+            if file_info.is_file and file_info.path.endswith(
                 self.config.file_extension
             ):
-                yield file_status
+                yield file_info
 
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         # No super() call, as we don't want helpers that create / remove workunits

This file was deleted.

This file was deleted.

2 changes: 2 additions & 0 deletions metadata-ingestion/tests/unit/test_plugin_system.py
@@ -7,6 +7,7 @@
 from datahub.ingestion.api.registry import PluginRegistry
 from datahub.ingestion.api.sink import Sink
 from datahub.ingestion.extractor.extractor_registry import extractor_registry
+from datahub.ingestion.fs.fs_registry import fs_registry
 from datahub.ingestion.reporting.reporting_provider_registry import (
     reporting_provider_registry,
 )
@@ -54,6 +55,7 @@
         (reporting_provider_registry, ["datahub", "file"]),
         (ingestion_checkpoint_provider_registry, ["datahub"]),
         (lite_registry, ["duckdb"]),
+        (fs_registry, ["file", "http", "s3"]),
     ],
 )
 def test_registry_defaults(registry: PluginRegistry, expected: List[str]) -> None:
