Python: Create HadoopFileSystem from netloc (merge request !1060)
Support cross-HDFS-cluster access in PyIceberg, for reading production Iceberg tables.

The existing pyarrow module initializes the HadoopFileSystem from the catalog configuration:

```
catalog:
  ice:
    hdfs.host: xxxx
    hdfs.port: xxx
```

If the cluster actually being accessed is not the one configured in the catalog, a Wrong FS exception is raised:

```
java.lang.IllegalArgumentException:
        Wrong FS: hdfs://xxxx expected: hdfs://yyyy
```

This change initializes the filesystem from the specified URI first, falling back to the catalog configuration when the location carries no netloc.
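
For reference, a minimal sketch of the two initialization paths in pyarrow (hostnames are hypothetical; both calls need a reachable HDFS client environment):

```python
from pyarrow.fs import HadoopFileSystem

# Previous behavior: the filesystem is always built from the catalog
# properties, so every hdfs:// path is resolved against this one cluster.
fs_from_catalog = HadoopFileSystem(host="namenode-a", port=8020)

# This change: when the location carries a netloc, the filesystem is built
# from that URI instead, so paths on another cluster get their own instance.
fs_from_uri = HadoopFileSystem.from_uri("hdfs://namenode-b:8020")
```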

Tested in production.

TAPD: --story=887319139
frankliee committed Sep 20, 2023
1 parent e88d0f4 commit ed78cb3
Showing 2 changed files with 32 additions and 31 deletions.
python/pyiceberg/io/pyarrow.py: 19 additions, 17 deletions
```diff
@@ -284,24 +284,24 @@ def to_input_file(self) -> PyArrowFile:
 
 
 class PyArrowFileIO(FileIO):
-    fs_by_scheme: Callable[[str], FileSystem]
+    fs_by_scheme: Callable[[str, Optional[str]], FileSystem]
 
     def __init__(self, properties: Properties = EMPTY_DICT):
-        self.fs_by_scheme: Callable[[str], FileSystem] = lru_cache(self._initialize_fs)
+        self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs)
         super().__init__(properties=properties)
 
     @staticmethod
-    def parse_location(location: str) -> Tuple[str, str]:
+    def parse_location(location: str) -> Tuple[str, str, str]:
         """Return the path without the scheme."""
         uri = urlparse(location)
         if not uri.scheme:
-            return "file", os.path.abspath(location)
+            return "file", uri.netloc, os.path.abspath(location)
         elif uri.scheme == "hdfs":
-            return uri.scheme, location
+            return uri.scheme, uri.netloc, location
         else:
-            return uri.scheme, f"{uri.netloc}{uri.path}"
+            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
-    def _initialize_fs(self, scheme: str) -> FileSystem:
+    def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
         if scheme in {"s3", "s3a", "s3n"}:
             from pyarrow.fs import S3FileSystem
 
@@ -321,6 +321,8 @@ def _initialize_fs(self, scheme: str) -> FileSystem:
             from pyarrow.fs import HadoopFileSystem
 
             hdfs_kwargs: Dict[str, Any] = {}
+            if netloc:
+                return HadoopFileSystem.from_uri(f"hdfs://{netloc}")
             if host := self.properties.get(HDFS_HOST):
                 hdfs_kwargs["host"] = host
             if port := self.properties.get(HDFS_PORT):
@@ -364,9 +366,9 @@ def new_input(self, location: str) -> PyArrowFile:
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
-        scheme, path = self.parse_location(location)
+        scheme, netloc, path = self.parse_location(location)
         return PyArrowFile(
-            fs=self.fs_by_scheme(scheme),
+            fs=self.fs_by_scheme(scheme, netloc),
             location=location,
             path=path,
             buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
@@ -381,9 +383,9 @@ def new_output(self, location: str) -> PyArrowFile:
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
-        scheme, path = self.parse_location(location)
+        scheme, netloc, path = self.parse_location(location)
         return PyArrowFile(
-            fs=self.fs_by_scheme(scheme),
+            fs=self.fs_by_scheme(scheme, netloc),
             location=location,
             path=path,
             buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
@@ -402,8 +404,8 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
             an AWS error code 15.
         """
         str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
-        scheme, path = self.parse_location(str_location)
-        fs = self.fs_by_scheme(scheme)
+        scheme, netloc, path = self.parse_location(str_location)
+        fs = self.fs_by_scheme(scheme, netloc)
 
         try:
             fs.delete_file(path)
@@ -575,7 +577,7 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat:
 
 
 def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
-    _, path = PyArrowFileIO.parse_location(data_file.file_path)
+    _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
     return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)
 
 
@@ -797,7 +799,7 @@ def _task_to_table(
     if limit and sum(row_counts) >= limit:
         return None
 
-    _, path = PyArrowFileIO.parse_location(task.file.file_path)
+    _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with fs.open_input_file(path) as fin:
         fragment = arrow_format.make_fragment(fin)
@@ -906,9 +908,9 @@ def project_table(
     Raises:
         ResolveError: When an incompatible query is done.
     """
-    scheme, _ = PyArrowFileIO.parse_location(table.location())
+    scheme, netloc, _ = PyArrowFileIO.parse_location(table.location())
     if isinstance(table.io, PyArrowFileIO):
-        fs = table.io.fs_by_scheme(scheme)
+        fs = table.io.fs_by_scheme(scheme, netloc)
     else:
         try:
             from pyiceberg.io.fsspec import FsspecFileIO
```
python/tests/io/test_pyarrow.py: 13 additions, 14 deletions
```diff
@@ -1529,17 +1529,16 @@ def test_writing_avro_file_gcs(generated_manifest_entry_file: str, pyarrow_fileio_gcs: PyArrowFileIO) -> None:
     pyarrow_fileio_gcs.delete(f"gs://warehouse/{filename}")
 
 
-def test_parse_hdfs_location() -> None:
-    locations = ["hdfs://127.0.0.1:9000/root/foo.txt", "hdfs://127.0.0.1/root/foo.txt"]
-    for location in locations:
-        schema, path = PyArrowFileIO.parse_location(location)
-        assert schema == "hdfs"
-        assert location == path
-
-
-def test_parse_local_location() -> None:
-    locations = ["/root/foo.txt", "/root/tmp/foo.txt"]
-    for location in locations:
-        schema, path = PyArrowFileIO.parse_location(location)
-        assert schema == "file"
-        assert location == path
+def test_parse_location() -> None:
+    def check_results(location: str, expected_schema: str, expected_netloc: str, expected_uri: str) -> None:
+        schema, netloc, uri = PyArrowFileIO.parse_location(location)
+        assert schema == expected_schema
+        assert netloc == expected_netloc
+        assert uri == expected_uri
+
+    check_results("hdfs://127.0.0.1:9000/root/foo.txt", "hdfs", "127.0.0.1:9000", "hdfs://127.0.0.1:9000/root/foo.txt")
+    check_results("hdfs://127.0.0.1/root/foo.txt", "hdfs", "127.0.0.1", "hdfs://127.0.0.1/root/foo.txt")
+    check_results("hdfs://clusterA/root/foo.txt", "hdfs", "clusterA", "hdfs://clusterA/root/foo.txt")
+
+    check_results("/root/foo.txt", "file", "", "/root/foo.txt")
+    check_results("/root/tmp/foo.txt", "file", "", "/root/tmp/foo.txt")
```
