diff --git a/src/monty/io.py b/src/monty/io.py index 52bf2280..7e698d11 100644 --- a/src/monty/io.py +++ b/src/monty/io.py @@ -54,6 +54,40 @@ def zopen(filename: Union[str, Path], *args, **kwargs) -> IO: return open(filename, *args, **kwargs) +def _get_line_ending( + file: str | Path | io.TextIOWrapper, +) -> Literal["\r\n", "\n", "\r"]: + """Helper function to get line ending of a file. + + This function assumes the file has a single consistent line ending. + + Returns: + "\n": Unix line ending. + "\r\n": Windows line ending. + "\r": Classic MacOS line ending. + + Raises: + ValueError: If file is empty or line ending is unknown. + """ + if isinstance(file, (str, Path)): + with open(file, "rb") as f: + first_line = f.readline() + elif isinstance(file, io.TextIOWrapper): + first_line = file.buffer.readline() + + if not first_line: + raise ValueError("empty file.") + + if first_line.endswith(b"\r\n"): + return "\r\n" + elif first_line.endswith(b"\n"): + return "\n" + elif first_line.endswith(b"\r"): + return "\r" + else: + raise ValueError(f"Unknown line ending in file {repr(first_line)}.") + + def reverse_readfile( filename: Union[str, Path], l_end: Literal["AUTO"] | str = "AUTO", diff --git a/tests/test_io.py b/tests/test_io.py index 9daa17be..d55b9ad9 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -8,6 +8,7 @@ from monty.io import ( FileLock, FileLockException, + _get_line_ending, reverse_readfile, reverse_readline, zopen, @@ -17,6 +18,43 @@ TEST_DIR = os.path.join(os.path.dirname(__file__), "test_files") +class TestGetLineEnding: + @pytest.mark.parametrize("l_end", ["\n", "\r\n", "\r"]) + def test_get_line_ending(self, l_end): + """Test file with: + Unix line ending (\n) + Windows line ending (\r\n) + Classic MacOS line ending (\r) + """ + with ScratchDir("."): + test_file = "test_file.txt" + with open(test_file, "wb") as f: + f.write(f"This is a test{l_end}Second line{l_end}".encode()) + + assert _get_line_ending(test_file) == l_end + assert _get_line_ending(Path(test_file)) == l_end + + with open(test_file, "r") as f: + assert _get_line_ending(f) == l_end + + def test_empty_file(self): + with ScratchDir("."): + test_file = "empty_file.txt" + open(test_file, "w").close() + + with pytest.raises(ValueError, match="empty file"): + _get_line_ending(test_file) + + def test_unknown_line_ending(self): + with ScratchDir("."): + test_file = "test_unknown.txt" + with open(test_file, "wb") as f: + f.write(b"This is a test\036") + + with pytest.raises(ValueError, match="Unknown line ending"): + _get_line_ending(test_file) + + class TestReverseReadline: NUMLINES = 3000