Skip to content

Commit

Permalink
feat(ingest): add and use file system abstraction in file source (datahub-project#8415)
Browse files Browse the repository at this point in the history

Co-authored-by: oleksandrsimonchuk <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
Co-authored-by: Tamas Nemeth <[email protected]>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
5 people authored Jul 1, 2024
1 parent b6ec52b commit 8b4e302
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 102 deletions.
5 changes: 5 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,11 @@
"file = datahub.ingestion.reporting.file_reporter:FileReporter",
],
"datahub.custom_packages": [],
"datahub.fs.plugins": [
"s3 = datahub.ingestion.fs.s3_fs:S3FileSystem",
"file = datahub.ingestion.fs.local_fs:LocalFileSystem",
"http = datahub.ingestion.fs.http_fs:HttpFileSystem",
],
}


Expand Down
Empty file.
40 changes: 40 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Any, Iterable
from urllib import parse


@dataclass
class FileInfo:
    """Metadata describing a single file-system entry."""

    # Full path (or URI) of the entry.
    path: str
    # Size in bytes; implementations use 0 when unknown or not a regular file.
    size: int
    # True when the entry is a regular file (as opposed to a directory/missing path).
    is_file: bool

    def __str__(self) -> str:
        return "FileInfo({}, {}, {})".format(self.path, self.size, self.is_file)


class FileSystem(metaclass=ABCMeta):
    """Abstract interface over a hierarchical file store (local disk, S3, HTTP, ...)."""

    @classmethod
    def create(cls, **kwargs: Any) -> "FileSystem":
        """Factory hook used by the plugin registry; implementations must override it."""
        raise NotImplementedError('File system implementations must implement "create"')

    @abstractmethod
    def open(self, path: str, **kwargs: Any) -> Any:
        """Return a readable binary file-like object for *path*."""

    @abstractmethod
    def file_status(self, path: str) -> FileInfo:
        """Return the FileInfo describing *path*."""

    @abstractmethod
    def list(self, path: str) -> Iterable[FileInfo]:
        """Yield FileInfo entries for *path* (the path itself, or its children)."""


def get_path_schema(path: str) -> str:
    """Return the URI scheme of *path*, defaulting to "file" for bare local paths."""
    scheme = parse.urlparse(path).scheme
    # Plain paths like "/tmp/x" have no scheme; treat them as local files.
    return scheme or "file"
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/fs_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from datahub.ingestion.api.registry import PluginRegistry
from datahub.ingestion.fs.fs_base import FileSystem

# Global registry of pluggable FileSystem implementations. Plugins are
# discovered from the "datahub.fs.plugins" entrypoint group declared in
# setup.py (s3, file, http), keyed by URI scheme.
fs_registry = PluginRegistry[FileSystem]()
fs_registry.register_from_entrypoint("datahub.fs.plugins")
28 changes: 28 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Any, Iterable

import requests
import smart_open

from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


class HttpFileSystem(FileSystem):
    """FileSystem backed by plain HTTP(S); paths are URLs."""

    @classmethod
    def create(cls, **kwargs):
        """Factory used by the plugin registry; HTTP needs no configuration."""
        return HttpFileSystem()

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for binary reading; kwargs become smart_open transport params."""
        return smart_open.open(path, mode="rb", transport_params=kwargs)

    def file_status(self, path: str) -> FileInfo:
        """HEAD the URL and build a FileInfo from the Content-Length header.

        Raises FileNotFoundError on 404, and IOError on any other failure —
        including a success response that omits Content-Length (e.g. chunked
        responses), which previously surfaced as a cryptic KeyError.
        """
        head = requests.head(path)
        if head.ok:
            # requests headers are case-insensitive, so this matches
            # "Content-length" / "content-length" too.
            content_length = head.headers.get("Content-Length")
            if content_length is None:
                raise IOError(f"Cannot get file status for the requested path {path}.")
            return FileInfo(path, int(content_length), is_file=True)
        elif head.status_code == 404:
            raise FileNotFoundError(f"Requested path {path} does not exist.")
        else:
            raise IOError(f"Cannot get file status for the requested path {path}.")

    def list(self, path: str) -> Iterable[FileInfo]:
        """A URL names a single resource; listing it yields just its own status."""
        status = self.file_status(path)
        return [status]
29 changes: 29 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os
import pathlib
from typing import Any, Iterable

from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


class LocalFileSystem(FileSystem):
    """FileSystem over the local disk; paths are ordinary filesystem paths."""

    @classmethod
    def create(cls, **kwargs):
        """Factory used by the plugin registry; local access needs no configuration."""
        return LocalFileSystem()

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for binary reading.

        Raises ValueError if transport kwargs are supplied — the local file
        system supports none. (The previous `assert not kwargs` would be
        silently stripped under `python -O`.)
        """
        if kwargs:
            raise ValueError(f"Local file system does not support kwargs: {kwargs}")
        return pathlib.Path(path).open(mode="rb")

    def list(self, path: str) -> Iterable[FileInfo]:
        """Return the status of *path* itself if it is a file, else of each direct child.

        Non-recursive: only the immediate children of a directory are listed.
        """
        p = pathlib.Path(path)
        if p.is_file():
            return [self.file_status(path)]
        else:
            return iter([self.file_status(str(x)) for x in p.iterdir()])

    def file_status(self, path: str) -> FileInfo:
        """Regular files report their size; directories and missing paths report 0, is_file=False."""
        if os.path.isfile(path):
            return FileInfo(path, os.path.getsize(path), is_file=True)
        else:
            return FileInfo(path, 0, is_file=False)
108 changes: 108 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Any, Iterable
from urllib.parse import urlparse

import boto3
import smart_open

from datahub.ingestion.fs import s3_fs
from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


def parse_s3_path(path: str) -> "S3Path":
    """Split an ``s3://bucket/key`` URI into its bucket and key components."""
    parts = urlparse(path)
    # netloc is the bucket; the path (minus its leading slash) is the object key.
    return S3Path(parts.netloc, parts.path.lstrip("/"))


def assert_ok_status(s3_response):
    """Raise AssertionError (with the S3 error message) unless the response is HTTP 200."""
    status_code = s3_response["ResponseMetadata"]["HTTPStatusCode"]
    assert (
        status_code == 200
    ), f"Failed to fetch S3 object, error message: {s3_response['Error']['Message']}"


@dataclass
class S3Path:
    """A decomposed S3 location: bucket name plus object key (or key prefix)."""

    # Bucket name, e.g. "my-bucket".
    bucket: str
    # Object key or key prefix within the bucket, without a leading slash.
    key: str

    def __str__(self) -> str:
        return "S3Path({}, {})".format(self.bucket, self.key)


class S3ListIterator(Iterator):
    """Iterator over FileInfo entries for every S3 object under a prefix.

    Transparently pages through ``list_objects_v2`` results using continuation
    tokens, fetching at most ``max_keys`` keys per request.
    """

    MAX_KEYS = 1000

    def __init__(
        self, s3_client: Any, bucket: str, prefix: str, max_keys: int = MAX_KEYS
    ) -> None:
        self._s3 = s3_client
        self._bucket = bucket
        self._prefix = prefix
        self._max_keys = max_keys
        # Iterator over the current page of results.
        self._file_statuses: Iterator = iter([])
        # Continuation token for the next page; falsy once the listing is exhausted.
        self._token = ""
        self.fetch()

    def __next__(self) -> FileInfo:
        # Loop (rather than refetch once) so that an empty intermediate page —
        # which list_objects_v2 may legally return — does not end iteration early.
        while True:
            try:
                return next(self._file_statuses)
            except StopIteration:
                if not self._token:
                    raise StopIteration()
                self.fetch()

    def fetch(self):
        """Fetch the next page of listings into ``self._file_statuses``."""
        params = dict(Bucket=self._bucket, Prefix=self._prefix, MaxKeys=self._max_keys)
        if self._token:
            params.update(ContinuationToken=self._token)

        response = self._s3.list_objects_v2(**params)

        # Call the module-level helper directly instead of through a self-import.
        assert_ok_status(response)

        self._file_statuses = iter(
            [
                FileInfo(f"s3://{response['Name']}/{x['Key']}", x["Size"], is_file=True)
                for x in response.get("Contents", [])
            ]
        )
        # NextContinuationToken is absent (None) on the final page.
        self._token = response.get("NextContinuationToken")


class S3FileSystem(FileSystem):
    """FileSystem over S3; paths are ``s3://bucket/key`` URIs."""

    def __init__(self, **kwargs):
        # kwargs are forwarded verbatim to boto3 (credentials, region, endpoint, ...).
        self.s3 = boto3.client("s3", **kwargs)

    @classmethod
    def create(cls, **kwargs):
        """Factory used by the plugin registry; kwargs configure the boto3 client."""
        return S3FileSystem(**kwargs)

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for binary reading via smart_open, using the configured client.

        Bug fix: the original assigned ``kwargs.update(...)`` — which returns
        None — to transport_params, so the pre-configured boto3 client was
        never actually passed to smart_open.
        """
        transport_params = {**kwargs, "client": self.s3}
        return smart_open.open(path, mode="rb", transport_params=transport_params)

    def file_status(self, path: str) -> FileInfo:
        """Return FileInfo for an S3 object; a missing key yields size 0, is_file=False."""
        s3_path = parse_s3_path(path)
        try:
            response = self.s3.get_object_attributes(
                Bucket=s3_path.bucket, Key=s3_path.key, ObjectAttributes=["ObjectSize"]
            )
            assert_ok_status(response)
            return FileInfo(path, response["ObjectSize"], is_file=True)
        except Exception as e:
            # botocore ClientError carries the HTTP status in e.response;
            # 404 means the key simply does not exist.
            if (
                hasattr(e, "response")
                and e.response["ResponseMetadata"]["HTTPStatusCode"] == 404
            ):
                return FileInfo(path, 0, is_file=False)
            else:
                # Bare re-raise preserves the original traceback.
                raise

    def list(self, path: str) -> Iterable[FileInfo]:
        """Iterate over all objects whose keys start with the path's key prefix."""
        s3_path = parse_s3_path(path)
        return S3ListIterator(self.s3, s3_path.bucket, s3_path.key)
Loading

0 comments on commit 8b4e302

Please sign in to comment.