From a8703b9b2005ff60708a16ca001b8da530cfc37a Mon Sep 17 00:00:00 2001 From: Uchechukwu Orji Date: Sat, 13 Apr 2024 11:56:05 +0100 Subject: [PATCH 1/3] Replace usage of os.path and path.py with pathlib --- CHANGELOG.md | 3 +- src/gutenberg2zim/constants.py | 2 +- src/gutenberg2zim/database.py | 2 +- src/gutenberg2zim/download.py | 94 ++++++++++--------- src/gutenberg2zim/entrypoint.py | 26 ++++-- src/gutenberg2zim/export.py | 160 ++++++++++++++++---------------- src/gutenberg2zim/rdf.py | 44 +++++---- src/gutenberg2zim/s3.py | 31 ++++--- src/gutenberg2zim/urls.py | 59 ++++++------ src/gutenberg2zim/utils.py | 40 ++++---- src/gutenberg2zim/zim.py | 52 ++++++----- 11 files changed, 266 insertions(+), 247 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 243459f..041444d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ as of 2.0.0. ### Changed - Insert as few rsync URLs as possible in DB when a book selection is made (#220) +- Replace usage of os.path and path.py with pathlib.Path (#195) ### Fixed @@ -102,7 +103,7 @@ as of 2.0.0. ## [1.1.6] - removed duplicate dependencies -- Added tag _category:gutenberg which was missing +- Added tag \_category:gutenberg which was missing - docker-only release with updated zimwriterfs (2.1.0-1) ## [1.1.5] diff --git a/src/gutenberg2zim/constants.py b/src/gutenberg2zim/constants.py index 659b925..6d2301a 100644 --- a/src/gutenberg2zim/constants.py +++ b/src/gutenberg2zim/constants.py @@ -21,4 +21,4 @@ logger = getLogger(NAME, level=logging.INFO) TMP_FOLDER = "tmp" -TMP_FOLDER_PATH = pathlib.Path(TMP_FOLDER) +TMP_FOLDER_PATH = pathlib.Path(TMP_FOLDER).resolve() diff --git a/src/gutenberg2zim/database.py b/src/gutenberg2zim/database.py index fa68bb1..9bf8f31 100644 --- a/src/gutenberg2zim/database.py +++ b/src/gutenberg2zim/database.py @@ -211,7 +211,7 @@ def load_fixtures(model): logger.debug(f"[fixtures] Created {f}") -def setup_database(*, wipe=False): +def setup_database(*, wipe: bool = False) -> None: logger.info("Setting up the database") for model in (License, Author, Book, BookFormat, Url): diff --git a/src/gutenberg2zim/download.py b/src/gutenberg2zim/download.py index 3a38c89..b579a51 100644 --- a/src/gutenberg2zim/download.py +++ b/src/gutenberg2zim/download.py @@ -1,17 +1,15 @@ -import os -import pathlib import shutil import tempfile import zipfile from multiprocessing.dummy import Pool +from pathlib import Path from pprint import pprint as pp import apsw import backoff from kiwixstorage import KiwixStorage -from path import Path -from gutenberg2zim.constants import TMP_FOLDER, logger +from gutenberg2zim.constants import TMP_FOLDER_PATH, logger from gutenberg2zim.database import Book, BookFormat from gutenberg2zim.export import fname_for, get_list_of_filtered_books from gutenberg2zim.s3 import download_from_cache @@ -36,24 +34,24 @@ # return False -def handle_zipped_epub(zippath, book, dst_dir: pathlib.Path): +def handle_zipped_epub(zippath: Path, book: Book, dst_dir: Path) -> bool: def clfn(fn): - return os.path.join(*os.path.split(fn)[1:]) + return Path(fn).name def is_safe(fname): - fname = ensure_unicode(clfn(fname)) - if Path(fname).basename() == fname: + name = ensure_unicode(clfn(fname)) + if Path(fname).name == name: return True - return fname == os.path.join("images", Path(fname).splitpath()[-1]) + return fname == f"images/{Path(fname).name}" zipped_files = [] # create temp directory to extract to - tmpd = tempfile.mkdtemp(dir=TMP_FOLDER) + tmpd = tempfile.mkdtemp(dir=TMP_FOLDER_PATH) try: with 
zipfile.ZipFile(zippath, "r") as zf: # check that there is no insecure data (absolute names) if sum([1 for n in zf.namelist() if not is_safe(ensure_unicode(n))]): - Path(tmpd).rmtree_p() + shutil.rmtree(tmpd, ignore_errors=True) return False # zipped_files = [clfn(fn) for fn in zf.namelist()] zipped_files = zf.namelist() @@ -64,7 +62,7 @@ def is_safe(fname): # file is not a zip file when it should be. # don't process it anymore as we don't know what to do. # could this be due to an incorrect/incomplete download? - return + return False # is there multiple HTML files in ZIP ? (rare) mhtml = ( @@ -73,25 +71,26 @@ def is_safe(fname): # move all extracted files to proper locations for zipped_file in zipped_files: # skip folders - if not Path(zipped_file).ext: + if not Path(zipped_file).resolve().is_file(): continue - src = os.path.join(tmpd, zipped_file) - if os.path.exists(src): - fname = Path(zipped_file).basename() + src = (Path(tmpd) / zipped_file).resolve() + if src.exists(): + fname = Path(zipped_file).name if fname.endswith(".html") or fname.endswith(".htm"): if mhtml: if fname.startswith(f"{book.id}-h."): - dst = dst_dir.joinpath(f"{book.id}.html") + dst = dst_dir / f"{book.id}.html" else: - dst = dst_dir.joinpath(f"{book.id}_{fname}") + dst = dst_dir / f"{book.id}_{fname}" else: - dst = dst_dir.joinpath(f"{book.id}.html") + dst = dst_dir / f"{book.id}.html" else: - dst = dst_dir.joinpath(f"{book.id}_{fname}") + dst = dst_dir / f"{book.id}_{fname}" + dst = dst.resolve() try: - Path(src).move(str(dst)) + src.rename(dst) except Exception as e: import traceback @@ -100,14 +99,14 @@ def is_safe(fname): raise # delete temp directory and zipfile - if Path(zippath).exists(): - os.unlink(zippath) - Path(tmpd).rmtree_p() + zippath.unlink(missing_ok=True) + shutil.rmtree(tmpd, ignore_errors=True) + return True def download_book( book: Book, - download_cache: str, + download_cache: Path, formats: list[str], *, force: bool, @@ -124,13 +123,15 @@ def download_book( if "html" not in formats: formats.append("html") - book_dir = pathlib.Path(download_cache).joinpath(str(book.id)) - optimized_dir = book_dir.joinpath("optimized") - unoptimized_dir = book_dir.joinpath("unoptimized") + book_dir = download_cache / str(book.id) + optimized_dir = book_dir / "optimized" + unoptimized_dir = book_dir / "unoptimized" + unsuccessful_formats = [] for book_format in formats: - unoptimized_fpath = unoptimized_dir.joinpath(fname_for(book, book_format)) - optimized_fpath = optimized_dir.joinpath(archive_name_for(book, book_format)) + unoptimized_fpath = unoptimized_dir / fname_for(book, book_format) + unoptimized_fpath = unoptimized_dir / fname_for(book, book_format) + optimized_fpath = optimized_dir / archive_name_for(book, book_format) # check if already downloaded if (unoptimized_fpath.exists() or optimized_fpath.exists()) and not force: @@ -141,12 +142,10 @@ def download_book( if book_format == "html": for fpath in book_dir.iterdir(): if fpath.is_file() and fpath.suffix not in [".pdf", ".epub"]: - fpath.unlink() + fpath.unlink(missing_ok=True) else: - if unoptimized_fpath.exists(): - unoptimized_fpath.unlink() - if optimized_fpath.exists(): - optimized_fpath.unlink() + unoptimized_fpath.unlink(missing_ok=True) + optimized_fpath.unlink(missing_ok=True) # delete dirs which are empty for dir_name in [optimized_dir, unoptimized_dir]: if not dir_name.exists(): @@ -233,7 +232,7 @@ def download_book( # HTML files are *sometime* available as ZIP files if url.endswith(".zip"): - zpath = 
unoptimized_dir.joinpath(f"{fname_for(book, book_format)}.zip") + zpath = unoptimized_dir / f"{fname_for(book, book_format)}.zip" etag = get_etag_from_url(url) if s3_storage: @@ -254,7 +253,11 @@ def download_book( book.html_etag = etag # type: ignore book.save() # extract zipfile - handle_zipped_epub(zippath=zpath, book=book, dst_dir=unoptimized_dir) + handle_zipped_epub( + zippath=zpath, + book=book, + dst_dir=unoptimized_dir, + ) else: if ( url.endswith(".htm") @@ -329,10 +332,9 @@ def download_cover(book, book_dir, s3_storage, optimizer_version): etag = get_etag_from_url(url) downloaded_from_cache = False cover = f"{book.id}_cover_image.jpg" - if ( - book_dir.joinpath("optimized").joinpath(cover).exists() - or book_dir.joinpath("unoptimized").joinpath(cover).exists() - ): + if (book_dir / "optimized" / cover).exists() or ( + book_dir / "unoptimized" / cover + ).exists(): logger.debug(f"Cover already exists for book #{book.id}") return if s3_storage: @@ -343,13 +345,13 @@ def download_cover(book, book_dir, s3_storage, optimizer_version): book=book, etag=etag, book_format="cover", - dest_dir=book_dir.joinpath("optimized"), + dest_dir=book_dir / "optimized", s3_storage=s3_storage, optimizer_version=optimizer_version, ) if not downloaded_from_cache: logger.debug(f"Downloading {url}") - if download_file(url, book_dir.joinpath("unoptimized").joinpath(cover)): + if download_file(url, book_dir / "unoptimized" / cover): book.cover_etag = etag book.save() else: @@ -357,11 +359,11 @@ def download_cover(book, book_dir, s3_storage, optimizer_version): def download_all_books( - download_cache: str, + download_cache: Path, concurrency: int, languages: list[str], formats: list[str], - only_books: list[str], + only_books: list[int], *, force: bool, s3_storage: KiwixStorage | None, @@ -372,7 +374,7 @@ def download_all_books( ) # ensure dir exist - Path(download_cache).mkdir_p() + download_cache.mkdir(parents=True, exist_ok=True) def backoff_busy_error_hdlr(details): logger.warning( diff --git a/src/gutenberg2zim/entrypoint.py b/src/gutenberg2zim/entrypoint.py index 2eec09d..3aeb0b9 100755 --- a/src/gutenberg2zim/entrypoint.py +++ b/src/gutenberg2zim/entrypoint.py @@ -1,9 +1,8 @@ import logging -import os import sys +from pathlib import Path from docopt import docopt -from path import Path from gutenberg2zim.checkdeps import check_dependencies from gutenberg2zim.constants import TMP_FOLDER_PATH, VERSION, logger @@ -94,7 +93,12 @@ def main(): arguments.get("--rdf-url") or "http://www.gutenberg.org/cache/epub/feeds/rdf-files.tar.bz2" ) - dl_cache = arguments.get("--dl-folder") or os.path.join("dl-cache") + + if dl_folder := arguments.get("--dl-folder"): + dl_cache = Path(dl_folder).resolve() + else: + dl_cache = Path("dl-cache").resolve() + books_csv = arguments.get("--books") or "" zim_title = arguments.get("--zim-title") zim_desc = arguments.get("--zim-desc") @@ -141,7 +145,7 @@ def main(): } ) - books = [] + books: list[int] = [] try: books_csv = books_csv.split(",") @@ -151,7 +155,7 @@ def f(x): for i in books_csv: blst = f(i) if len(blst) > 1: - blst = range(blst[0], blst[1] + 1) + blst = list(range(blst[0], blst[1] + 1)) books.extend(blst) books_csv = list(set(books)) except Exception as e: @@ -218,16 +222,17 @@ def f(x): for zim_lang in zims: if do_zim: logger.info("BUILDING ZIM dynamically") + if one_lang_one_zim_folder: + output_folder = Path(one_lang_one_zim_folder).resolve() + else: + output_folder = Path(".").resolve() build_zimfile( - output_folder=Path(one_lang_one_zim_folder or 
".").abspath(), + output_folder=output_folder, download_cache=dl_cache, concurrency=concurrency, languages=zim_lang, formats=formats, only_books=books, - force=force, - title_search=title_search, - add_bookshelves=bookshelves, s3_storage=s3_storage, optimizer_version=optimizer_version, zim_name=Path(zim_name).name if zim_name else None, @@ -235,4 +240,7 @@ def f(x): description=zim_desc, stats_filename=stats_filename, publisher=publisher, + force=force, + title_search=title_search, + add_bookshelves=bookshelves, ) diff --git a/src/gutenberg2zim/export.py b/src/gutenberg2zim/export.py index 20b0e63..dd45593 100644 --- a/src/gutenberg2zim/export.py +++ b/src/gutenberg2zim/export.py @@ -1,23 +1,22 @@ import json -import os -import pathlib import shutil import tempfile import traceback import urllib.parse import zipfile from multiprocessing.dummy import Pool +from pathlib import Path import bs4 from bs4 import BeautifulSoup from jinja2 import Environment, PackageLoader -from path import Path +from kiwixstorage import KiwixStorage from schedule import every from six import text_type from zimscraperlib.image.transformation import resize_image import gutenberg2zim -from gutenberg2zim.constants import TMP_FOLDER, TMP_FOLDER_PATH, logger +from gutenberg2zim.constants import TMP_FOLDER_PATH, logger from gutenberg2zim.database import Author, Book, BookFormat from gutenberg2zim.iso639 import language_name from gutenberg2zim.l10n import l10n_strings @@ -94,8 +93,8 @@ def save_bs_output(soup, fpath, encoding=UTF8): jinja_env.filters["urlencode"] = urlencode -def tmpl_path(): - return os.path.join(Path(gutenberg2zim.__file__).parent, "templates") +def tmpl_path() -> Path: + return (Path(gutenberg2zim.__file__).parent / "templates").resolve() def get_list_of_all_languages(): @@ -105,8 +104,8 @@ def get_list_of_all_languages(): def export_illustration(): logger.info("Adding illustration") - src_illus_fpath = pathlib.Path(tmpl_path(), "favicon.png") - tmp_illus_fpath = pathlib.Path(TMP_FOLDER_PATH, "illustration.png") + src_illus_fpath = tmpl_path() / "favicon.png" + tmp_illus_fpath = TMP_FOLDER_PATH / "illustration.png" shutil.copy(src_illus_fpath, tmp_illus_fpath) @@ -152,18 +151,17 @@ def export_skeleton( "datatables", "fonts", ): - src = os.path.join(src_folder, fname) + assets_root = src_folder / fname # recursively add our assets, at a path identical to position in repo - assets_root = pathlib.Path(src) if assets_root.is_file(): Global.add_item_for(path=fname, fpath=assets_root) else: for fpath in assets_root.glob("**/*"): if not fpath.is_file() or fpath.name == "l10n.js": continue - path = str(fpath.relative_to(src)) - Global.add_item_for(path=os.path.join(fname, path), fpath=fpath) + path = str(fpath.relative_to(assets_root)) + Global.add_item_for(path=str(Path(fname) / path), fpath=fpath) # export homepage tpl_path = "Home.html" @@ -178,19 +176,20 @@ def export_skeleton( def export_all_books( - project_id, - download_cache, - concurrency, - languages, - formats, - only_books, - force, - title_search, - add_bookshelves, - s3_storage, - optimizer_version, - stats_filename, -): + project_id: str, + download_cache: Path, + concurrency: int, + languages: list[str], + formats: list[str], + only_books: list[int], + s3_storage: KiwixStorage | None, + optimizer_version: dict[str, str], + stats_filename: str | None, + *, + force: bool, + title_search: bool, + add_bookshelves: bool, +) -> None: books = get_list_of_filtered_books( languages=languages, formats=formats, only_books=only_books ) @@ -273,15 +272,15 
@@ def nb_by_fmt(fmt): def dlb(b): export_book( b, - book_dir=pathlib.Path(download_cache).joinpath(str(b.id)), + book_dir=download_cache / str(b.id), formats=formats, books=books, project_id=project_id, + s3_storage=s3_storage, + optimizer_version=optimizer_version, force=force, title_search=title_search, add_bookshelves=add_bookshelves, - s3_storage=s3_storage, - optimizer_version=optimizer_version, ) Global.inc_progress() @@ -302,8 +301,8 @@ def report_progress(stats_filename=None): json.dump(progress, outfile, indent=2) -def html_content_for(book, src_dir): - html_fpath = src_dir.joinpath(fname_for(book, "html")) +def html_content_for(book: Book, src_dir): + html_fpath = src_dir / fname_for(book, "html") # is HTML file present? if not html_fpath.exists(): @@ -529,7 +528,7 @@ def cover_html_content_for( book, optimized_files_dir, books, project_id, title_search, add_bookshelves, formats ): cover_img = f"{book.id}_cover_image.jpg" - cover_img = cover_img if optimized_files_dir.joinpath(cover_img).exists() else None + cover_img = cover_img if (optimized_files_dir / cover_img).exists() else None translate_author = ( f' data-l10n-id="author-{book.author.name().lower()}"' if book.author.name() in ["Anonymous", "Various"] @@ -574,23 +573,24 @@ def save_author_file(author, books, project_id): def export_book( - book, - book_dir, - formats, - books, - project_id, - force, - title_search, - add_bookshelves, - s3_storage, - optimizer_version, + book: Book, + book_dir: Path, + formats: list[str], + books: list[Book], + project_id: str, + s3_storage: KiwixStorage | None, + optimizer_version: dict[str, str], + *, + force: bool, + title_search: bool, + add_bookshelves: bool, ): - optimized_files_dir = book_dir.joinpath("optimized") + optimized_files_dir = book_dir / "optimized" if optimized_files_dir.exists(): for fpath in optimized_files_dir.iterdir(): path = str(fpath.relative_to(optimized_files_dir)) Global.add_item_for(path=path, fpath=fpath) - unoptimized_files_dir = book_dir.joinpath("unoptimized") + unoptimized_files_dir = book_dir / "unoptimized" if unoptimized_files_dir.exists(): handle_unoptimized_files( book=book, @@ -614,14 +614,15 @@ def export_book( def handle_unoptimized_files( - book, - src_dir, - formats, - optimizer_version, - force, - s3_storage, + book: Book, + src_dir: Path, + formats: list[str], + optimizer_version: dict[str, str], + s3_storage: KiwixStorage | None, + *, + force: bool, ): - def copy_file(src, dst): + def copy_file(src: Path, dst: Path): logger.info(f"\t\tCopying from {src} to {dst}") try: shutil.copy2(src, dst) @@ -631,14 +632,13 @@ def copy_file(src, dst): print(line.strip()) # noqa: T201 return - def update_download_cache(unoptimized_file, optimized_file): + def update_download_cache(unoptimized_file: Path, optimized_file: Path): book_dir = unoptimized_file.parents[1] - optimized_dir = book_dir.joinpath("optimized") - unoptimized_dir = book_dir.joinpath("unoptimized") - if not optimized_dir.exists(): - optimized_dir.mkdir() - dst = optimized_dir.joinpath(optimized_file.name) - os.unlink(unoptimized_file) + optimized_dir = book_dir / "optimized" + unoptimized_dir = book_dir / "unoptimized" + optimized_dir.mkdir(exist_ok=True, parents=True) + dst = optimized_dir / optimized_file.name + unoptimized_file.unlink(missing_ok=True) copy_file(optimized_file.resolve(), dst.resolve()) if not list(unoptimized_dir.iterdir()): unoptimized_dir.rmdir() @@ -650,7 +650,7 @@ def update_download_cache(unoptimized_file, optimized_file): html_book_optimized_files = [] if html: 
article_name = article_name_for(book) - article_fpath = TMP_FOLDER_PATH.joinpath(article_name) + article_fpath = TMP_FOLDER_PATH / article_name if not article_fpath.exists() or force: logger.info(f"\t\tExporting to {article_fpath}") try: @@ -661,16 +661,14 @@ def update_download_cache(unoptimized_file, optimized_file): raise save_bs_output(new_html, article_fpath, UTF8) html_book_optimized_files.append(article_fpath) - update_download_cache( - src_dir.joinpath(fname_for(book, "html")), article_fpath - ) + update_download_cache(src_dir / fname_for(book, "html"), article_fpath) if not src_dir.exists(): return else: logger.info(f"\t\tSkipping HTML article {article_fpath}") Global.add_item_for(path=article_name, fpath=article_fpath) - def optimize_image(src, dst, *, force=False): + def optimize_image(src: Path, dst: Path, *, force: bool = False) -> Path | None: if dst.exists() and not force: logger.info(f"\tSkipping image optimization for {dst}") return dst @@ -699,7 +697,7 @@ def optimize_epub(src, dst): logger.info(f"\t\tCreating ePUB off {src} at {dst}") zipped_files = [] # create temp directory to extract to - tmpd = tempfile.mkdtemp(dir=TMP_FOLDER) + tmpd = Path(tempfile.mkdtemp(dir=TMP_FOLDER_PATH)).resolve() try: with zipfile.ZipFile(src, "r") as zf: @@ -711,23 +709,23 @@ def optimize_epub(src, dst): remove_cover = False for fname in zipped_files: - fnp = os.path.join(tmpd, fname) - if Path(fname).ext in (".png", ".jpeg", ".jpg", ".gif"): + fnp = tmpd / fname + if fnp.suffix in (".png", ".jpeg", ".jpg", ".gif"): # special case to remove ugly cover if fname.endswith("cover.jpg") and is_bad_cover(fnp): zipped_files.remove(fname) remove_cover = True else: - optimize_image(pathlib.Path(fnp), pathlib.Path(fnp), force=True) + optimize_image(fnp, fnp, force=True) - if Path(fname).ext in (".htm", ".html"): + if fnp.suffix in (".htm", ".html"): html_content, _ = read_file(fnp) html = update_html_for_static( book=book, html_content=html_content, formats=formats, epub=True ) save_bs_output(html, fnp, UTF8) - if Path(fname).ext == ".ncx": + if fnp.suffix == ".ncx": pattern = "*** START: FULL LICENSE ***" ncx, _ = read_file(fnp) soup = BeautifulSoup(ncx, "lxml-xml") @@ -744,11 +742,11 @@ def optimize_epub(src, dst): # delete {id}/cover.jpg if exist and update {id}/content.opf if remove_cover: # remove cover - Path(os.path.join(tmpd, text_type(book.id), "cover.jpg")).unlink_p() + (tmpd / text_type(book.id) / "cover.jpg").unlink(missing_ok=True) soup = None - opff = os.path.join(tmpd, text_type(book.id), "content.opf") - if os.path.exists(opff): + opff = tmpd / text_type(book.id) / "content.opf" + if opff.exists(): opff_content, _ = read_file(opff) soup = BeautifulSoup(opff_content, "lxml-xml") @@ -761,14 +759,14 @@ def optimize_epub(src, dst): # bundle epub as zip zip_epub(epub_fpath=dst, root_folder=tmpd, fpaths=zipped_files) - Path(tmpd).rmtree_p() + shutil.rmtree(tmpd, ignore_errors=True) def handle_companion_file( - fname, + fname: Path, book: Book, - dstfname=None, + dstfname: str | None = None, *, - force=False, + force: bool = False, as_ext=None, html_file_list=None, s3_storage=None, @@ -777,7 +775,7 @@ def handle_companion_file( src = fname if dstfname is None: dstfname = fname.name - dst = TMP_FOLDER_PATH.joinpath(dstfname) + dst = TMP_FOLDER_PATH / dstfname if dst.exists() and not force: logger.debug(f"\t\tSkipping already optimized companion {dstfname}") Global.add_item_for(path=dstfname, fpath=dst) @@ -804,7 +802,7 @@ def handle_companion_file( update_download_cache(src, dst) elif ext == 
".epub": logger.info(f"\tCreating optimized EPUB file {fname}") - tmp_epub = tempfile.NamedTemporaryFile(suffix=".epub", dir=TMP_FOLDER) + tmp_epub = tempfile.NamedTemporaryFile(suffix=".epub", dir=TMP_FOLDER_PATH) tmp_epub.close() try: optimize_epub(src, tmp_epub.name) @@ -821,7 +819,7 @@ def handle_companion_file( as_ext=".zip", ) else: - Path(tmp_epub.name).move(str(dst)) + Path(tmp_epub.name).resolve().rename(dst) Global.add_item_for(path=dstfname, fpath=dst) if s3_storage: upload_to_cache( @@ -850,7 +848,7 @@ def handle_companion_file( if fpath.is_file() and fpath.name.startswith(f"{book.id}_"): if fpath.suffix in (".html", ".htm"): src = fpath - dst = TMP_FOLDER_PATH.joinpath(fpath.name) + dst = TMP_FOLDER_PATH / fpath.name if dst.exists() and not force: logger.debug(f"\t\tSkipping already optimized HTML {dst}") Global.add_item_for(path=fpath.name, fpath=dst) @@ -890,7 +888,7 @@ def handle_companion_file( for other_format in formats: if other_format not in book.formats() or other_format == "html": continue - book_file = src_dir.joinpath(fname_for(book, other_format)) + book_file = src_dir / fname_for(book, other_format) if book_file.exists(): try: handle_companion_file( @@ -916,7 +914,7 @@ def write_book_presentation_article( formats, ): article_name = article_name_for(book=book, cover=True) - cover_fpath = TMP_FOLDER_PATH.joinpath(article_name) + cover_fpath = TMP_FOLDER_PATH / article_name if not cover_fpath.exists() or force: logger.info(f"\t\tExporting article presentation to {cover_fpath}") html = cover_html_content_for( diff --git a/src/gutenberg2zim/rdf.py b/src/gutenberg2zim/rdf.py index ef29ee8..edfbec2 100644 --- a/src/gutenberg2zim/rdf.py +++ b/src/gutenberg2zim/rdf.py @@ -1,7 +1,7 @@ -import os -import pathlib import re import tarfile +from pathlib import Path +from tarfile import TarFile, TarInfo import peewee from bs4 import BeautifulSoup @@ -16,13 +16,13 @@ ) -def get_rdf_fpath(): +def get_rdf_fpath() -> Path: fname = "rdf-files.tar.bz2" - fpath = pathlib.Path(fname).resolve() + fpath = Path(fname).resolve() return fpath -def download_rdf_file(rdf_path, rdf_url): +def download_rdf_file(rdf_path: Path, rdf_url: str) -> None: """Download rdf-files archive""" if rdf_path.exists(): logger.info(f"\trdf-files archive already exists in {rdf_path}") @@ -32,13 +32,13 @@ def download_rdf_file(rdf_path, rdf_url): download_file(rdf_url, rdf_path) -def parse_and_fill(rdf_path, only_books): +def parse_and_fill(rdf_path: Path, only_books: list[int]) -> None: logger.info(f"\tLooping throught RDF files in {rdf_path}") rdf_tarfile = tarfile.open(name=rdf_path, mode="r|bz2") for rdf_member in rdf_tarfile: - rdf_member_path = pathlib.Path(rdf_member.name) + rdf_member_path = Path(rdf_member.name) # skip books outside of requested list if ( @@ -51,13 +51,13 @@ def parse_and_fill(rdf_path, only_books): if rdf_member_path.name == "pg0.rdf": continue - if not str(rdf_member_path.name).endswith(".rdf"): + if not rdf_member_path.name.endswith(".rdf"): continue parse_and_process_file(rdf_tarfile, rdf_member) -def parse_and_process_file(rdf_tarfile, rdf_member): +def parse_and_process_file(rdf_tarfile: TarFile, rdf_member: TarInfo) -> None: gid = re.match(r".*/pg([0-9]+).rdf", rdf_member.name).groups()[0] # type: ignore if Book.get_or_none(id=int(gid)): @@ -67,11 +67,19 @@ def parse_and_process_file(rdf_tarfile, rdf_member): return logger.info(f"\tParsing file {rdf_member.name} for book id {gid}") - parser = RdfParser(rdf_tarfile.extractfile(rdf_member).read(), gid).parse() + rdf_data = 
rdf_tarfile.extractfile(rdf_member) + if rdf_data is None: + logger.warning( + f"Unable to extract member '{rdf_member.name}' from archive " + f"'{rdf_member.name}'" + ) + return + + parser = RdfParser(rdf_data.read(), gid).parse() if parser.license == "None": logger.info(f"\tWARN: Unusable book without any information {gid}") - elif parser.title == "": + elif not parser.title: logger.info(f"\tWARN: Unusable book without title {gid}") else: save_rdf_in_database(parser) @@ -96,8 +104,8 @@ def parse(self): # The tile of the book: this may or may not be divided # into a new-line-seperated title and subtitle. # If it is, then we will just split the title. - self.title = soup.find("dcterms:title") - self.title = self.title.text if self.title else "- No Title -" + title = soup.find("dcterms:title") + self.title = title.text if title else "- No Title -" self.title = self.title.split("\n")[0] self.subtitle = " ".join(self.title.split("\n")[1:]) self.author_id = None @@ -174,7 +182,7 @@ def parse(self): return self -def save_rdf_in_database(parser): +def save_rdf_in_database(parser: RdfParser) -> None: # Insert author, if it not exists if parser.author_id: try: @@ -276,7 +284,7 @@ def save_rdf_in_database(parser): ) -def get_formatted_number(num): +def get_formatted_number(num: str | None) -> str | None: """ Get a formatted string of a number from a not-predictable-string that may or may not actually contain a number. @@ -297,9 +305,9 @@ def get_formatted_number(num): nums = [f"{i:0=5d}" for i in range(21000, 40000)] for num in nums: print(num) # noqa: T201 - curd = os.path.dirname(os.path.realpath(__file__)) - rdf = os.path.join(curd, "..", "rdf-files", num, "pg" + num + ".rdf") - if os.path.isfile(rdf): + curd = Path(__file__).resolve().parent + rdf = curd.parent / "rdf-files" / num / f"pg{num}.rdf" + if rdf.is_file(): data = "" with open(rdf) as f: data = f.read() diff --git a/src/gutenberg2zim/s3.py b/src/gutenberg2zim/s3.py index fad6b0c..f124dbb 100644 --- a/src/gutenberg2zim/s3.py +++ b/src/gutenberg2zim/s3.py @@ -1,11 +1,11 @@ -import os -import pathlib import zipfile +from pathlib import Path from kiwixstorage import KiwixStorage from pif import get_public_ip from gutenberg2zim.constants import TMP_FOLDER, logger +from gutenberg2zim.database import Book from gutenberg2zim.utils import archive_name_for @@ -25,20 +25,26 @@ def s3_credentials_ok(s3_url_with_credentials): def download_from_cache( - book, etag, book_format, dest_dir, s3_storage, optimizer_version -): + book: Book, + etag: str | None, + book_format: str, + dest_dir: Path, + s3_storage: KiwixStorage, + optimizer_version: dict[str, str] | None, +) -> bool: """whether it successfully downloaded from cache""" key = f"{book.id}/{book_format}" if not s3_storage.has_object(key): return False meta = s3_storage.get_object_stat(key).meta - if meta.get("etag") != etag: + if meta.get("etag") != etag: # type: ignore logger.error( - f"etag doesn't match for {key}. Expected {etag}, got {meta.get('etag')}" + f"etag doesn't match for {key}. " + f"Expected {etag}, got {meta.get('etag')}" # type: ignore ) return False if optimizer_version is not None and ( - meta.get("optimizer_version") != optimizer_version[book_format] + meta.get("optimizer_version") != optimizer_version[book_format] # type: ignore ): logger.error( f"optimizer version doesn't match for {key}. 
Expected " @@ -47,17 +53,17 @@ def download_from_cache( return False dest_dir.mkdir(parents=True, exist_ok=True) if book_format == "cover": - fpath = dest_dir.joinpath(f"{book.id}_cover_image.jpg") + fpath = dest_dir / f"{book.id}_cover_image.jpg" else: if book_format == "html": book_format = "zip" - fpath = dest_dir.joinpath(archive_name_for(book, book_format)) + fpath = dest_dir / archive_name_for(book, book_format) try: s3_storage.download_file(key, fpath) if book_format == "zip": with zipfile.ZipFile(fpath, "r") as zipfl: zipfl.extractall(dest_dir) - os.unlink(fpath) + fpath.unlink(missing_ok=True) except Exception as exc: logger.error(f"{key} failed to download from cache: {exc}") return False @@ -69,7 +75,7 @@ def upload_to_cache(book_id, asset, etag, book_format, s3_storage, optimizer_ver """whether it successfully uploaded to cache""" fpath = asset key = f"{book_id}/{book_format}" - zippath = pathlib.Path(f"{TMP_FOLDER}/{book_id}.zip") + zippath = Path(f"{TMP_FOLDER}/{book_id}.zip") if isinstance(asset, list): with zipfile.ZipFile(zippath, "w") as zipfl: for fl in asset: @@ -88,7 +94,6 @@ def upload_to_cache(book_id, asset, etag, book_format, s3_storage, optimizer_ver logger.error(f"{key} failed to upload to cache: {exc}") return False finally: - if zippath.exists(): - os.unlink(zippath) + zippath.unlink(missing_ok=True) logger.info(f"uploaded {fpath} to cache at {key}") return True diff --git a/src/gutenberg2zim/urls.py b/src/gutenberg2zim/urls.py index 94817e8..c3496c8 100644 --- a/src/gutenberg2zim/urls.py +++ b/src/gutenberg2zim/urls.py @@ -1,4 +1,3 @@ -import os import urllib.parse as urlparse from collections import defaultdict @@ -19,14 +18,14 @@ class UrlBuilder: SERVER_NAME = "aleph_pglaf_org" RSYNC = "rsync://aleph.pglaf.org/gutenberg/" - BASE_ONE = "http://aleph.pglaf.org/" - BASE_TWO = "http://aleph.pglaf.org/cache/epub/" + BASE_ONE = "http://aleph.pglaf.org" + BASE_TWO = "http://aleph.pglaf.org/cache/epub" BASE_THREE = "http://aleph.pglaf.org/etext" def __init__(self): self.base = self.BASE_ONE - def build(self): + def build(self) -> str: """ Build either an url depending on whether the base url is `BASE_ONE` or `BASE_TWO`. 
@@ -40,25 +39,24 @@ def build(self): """ if self.base == self.BASE_ONE: if int(self.b_id) > 10: # noqa: PLR2004 - base_url = os.path.join( - os.path.join(*list(str(self.b_id))[:-1]), str(self.b_id) - ) + components = "/".join(self.b_id[:-1]) + base_url = f"{components}/{self.b_id}" else: - base_url = os.path.join(os.path.join("0", str(self.b_id))) - url = os.path.join(self.base, base_url) + base_url = f"0/{self.b_id}" + url = f"{self.base}/{base_url}" elif self.base == self.BASE_TWO: - url = os.path.join(self.base, str(self.b_id)) + url = f"{self.base}/{self.b_id}" elif self.base == self.BASE_THREE: url = self.base return url # type: ignore - def with_base(self, base): + def with_base(self, base: str) -> None: self.base = base - def with_id(self, b_id): - self.b_id = b_id + def with_id(self, b_id: str | int) -> None: + self.b_id = str(b_id) - def __str__(self): + def __str__(self) -> str: return self.build_url() # type: ignore @@ -145,9 +143,9 @@ def build_epub(files): return [] name = "".join(["pg", b_id]) - url = os.path.join(u.build(), name + ".epub") - url_images = os.path.join(u.build(), name + "-images.epub") - url_noimages = os.path.join(u.build(), name + "-noimages.epub") + url = f"{u.build()}/{name}.epub" + url_images = f"{u.build()}/{name}-images.epub" + url_noimages = f"{u.build()}/{name}-noimages.epub" urls.extend([url, url_images, url_noimages]) return urls @@ -171,13 +169,13 @@ def build_pdf(files): for i in files: if "images" not in i["name"]: - url = os.path.join(u.build(), i["name"]) + url = f'{u.build()}/{i["name"]}' urls.append(url) - url_dash1 = os.path.join(u1.build(), b_id + "-" + "pdf" + ".pdf") - url_dash = os.path.join(u.build(), b_id + "-" + "pdf" + ".pdf") - url_normal = os.path.join(u.build(), b_id + ".pdf") - url_pg = os.path.join(u.build(), "pg" + b_id + ".pdf") + url_dash1 = f"{u1.build()}/{b_id}-pdf.pdf" + url_dash = f"{u.build()}/{b_id}-pdf.pdf" + url_normal = f"{u.build()}/{b_id}.pdf" + url_pg = f"{u.build()}/pg{b_id}.pdf" urls.extend([url_dash, url_normal, url_pg, url_dash1]) return list(set(urls)) @@ -198,17 +196,16 @@ def build_html(files): if all(["-h.html" not in file_names, "-h.zip" in file_names]): for i in files: - url = os.path.join(u.build(), i["name"]) + url = f'{u.build()}/{i["name"]}' urls.append(url) - url_zip = os.path.join(u.build(), b_id + "-h" + ".zip") - # url_utf8 = os.path.join(u.build(), b_id + '-8' + '.zip') - url_html = os.path.join(u.build(), b_id + "-h" + ".html") - url_htm = os.path.join(u.build(), b_id + "-h" + ".htm") + url_zip = f"{u.build()}/{b_id}-h.zip" + url_html = f"{u.build()}/{b_id}-h.html" + url_htm = f"{u.build()}/{b_id}-h.htm" u.with_base(UrlBuilder.BASE_TWO) name = "".join(["pg", b_id]) - html_utf8 = os.path.join(u.build(), name + ".html.utf8") + html_utf8 = f"{u.build()}/{name}.html.utf8" u.with_base(UrlBuilder.BASE_THREE) file_index = index_of_substring(files, ["html", "htm"]) @@ -219,7 +216,7 @@ def build_html(files): etext_names = [f"{i:0=2d}" for i in etext_nums] etext_urls = [] for i in etext_names: - etext_urls.append(os.path.join(u.build() + i, file_name)) + etext_urls.append(f"{u.build()}{i}/{file_name}") urls.extend([url_zip, url_htm, url_html, html_utf8]) urls.extend(etext_urls) @@ -227,7 +224,7 @@ def build_html(files): def setup_urls(force, books): - file_with_url = TMP_FOLDER_PATH.joinpath(f"file_on_{UrlBuilder.SERVER_NAME}") + file_with_url = TMP_FOLDER_PATH / f"file_on_{UrlBuilder.SERVER_NAME}" if file_with_url.exists() and not force: logger.info( @@ -291,5 +288,5 @@ def setup_urls(force, books): if 
__name__ == "__main__": - book = Book.get(id=9) + book = Book.get(id=84) print(get_urls(book)) # noqa: T201 diff --git a/src/gutenberg2zim/utils.py b/src/gutenberg2zim/utils.py index 21bbabe..6ca49cb 100644 --- a/src/gutenberg2zim/utils.py +++ b/src/gutenberg2zim/utils.py @@ -1,15 +1,14 @@ import collections import hashlib -import os import subprocess import sys import unicodedata import zipfile +from pathlib import Path import chardet import requests import six -from path import Path from zimscraperlib.download import save_large_file from gutenberg2zim.constants import logger @@ -34,25 +33,25 @@ NB_MAIN_LANGS = 5 -def book_name_for_fs(book): - return book.title.strip().replace("/", "-")[:230] +def book_name_for_fs(book: Book) -> str: + return book.title.strip().replace("/", "-")[:230] # type: ignore -def article_name_for(book, *, cover=False): - cover = "_cover" if cover else "" +def article_name_for(book: Book, *, cover: bool = False) -> str: + cover_suffix = "_cover" if cover else "" title = book_name_for_fs(book) - return f"{title}{cover}.{book.id}.html" + return f"{title}{cover_suffix}.{book.id}.html" -def archive_name_for(book, book_format): +def archive_name_for(book: Book, book_format: str) -> str: return f"{book_name_for_fs(book)}.{book.id}.{book_format}" -def fname_for(book, book_format): +def fname_for(book: Book, book_format: str) -> str: return f"{book.id}.{book_format}" -def get_etag_from_url(url): +def get_etag_from_url(url: str) -> str | None: try: response_headers = requests.head( # noqa: S113 url=url, allow_redirects=True @@ -69,7 +68,7 @@ def critical_error(message): sys.exit(1) -def normalize(text=None): +def normalize(text: str | None = None) -> str | None: return None if text is None else unicodedata.normalize("NFC", text) @@ -91,15 +90,14 @@ def exec_cmd(cmd): return subprocess.run(args).returncode -def download_file(url, fpath): +def download_file(url: str, fpath: Path) -> bool: fpath.parent.mkdir(parents=True, exist_ok=True) try: save_large_file(url, fpath) return True except Exception as exc: logger.error(f"Error while downloading from {url}: {exc}") - if fpath.exists(): - os.unlink(fpath) + fpath.unlink(missing_ok=True) return False @@ -163,28 +161,28 @@ def md5sum(fpath): return hashlib.md5(read_file(fpath)[0].encode("utf-8")).hexdigest() # noqa: S324 -def is_bad_cover(fpath): +def is_bad_cover(fpath: Path) -> bool: bad_sizes = [19263] bad_sums = ["a059007e7a2e86f2bf92e4070b3e5c73"] - if Path(fpath).size not in bad_sizes: + if fpath.stat().st_size not in bad_sizes: return False return md5sum(fpath) in bad_sums -def read_file_as(fpath, encoding="utf-8"): +def read_file_as(fpath: Path, encoding="utf-8") -> str: # logger.debug("opening `{}` as `{}`".format(fpath, encoding)) with open(fpath, encoding=encoding) as f: return f.read() -def guess_file_encoding(fpath): +def guess_file_encoding(fpath: Path) -> str | None: with open(fpath, "rb") as f: return chardet.detect(f.read()).get("encoding") -def read_file(fpath): +def read_file(fpath: Path): for encoding in ["utf-8", "iso-8859-1"]: try: return read_file_as(fpath, encoding), encoding @@ -201,10 +199,10 @@ def save_file(content, fpath, encoding=UTF8): f.write(content) -def zip_epub(epub_fpath, root_folder, fpaths): +def zip_epub(epub_fpath: Path, root_folder: Path, fpaths: list[str]) -> None: with zipfile.ZipFile(epub_fpath, "w", zipfile.ZIP_DEFLATED) as zf: for fpath in fpaths: - zf.write(os.path.join(root_folder, fpath), fpath) + zf.write(root_folder / fpath, fpath) def ensure_unicode(v): diff --git 
a/src/gutenberg2zim/zim.py b/src/gutenberg2zim/zim.py index d6817e7..d69e6b8 100644 --- a/src/gutenberg2zim/zim.py +++ b/src/gutenberg2zim/zim.py @@ -1,6 +1,7 @@ import datetime +import pathlib -from path import Path +from kiwixstorage import KiwixStorage from peewee import fn from gutenberg2zim.constants import logger @@ -13,23 +14,24 @@ def build_zimfile( - output_folder, - download_cache, - concurrency, - languages, - formats, - only_books, - force, - title_search, - add_bookshelves, - s3_storage, - optimizer_version, - zim_name, - title, - description, - stats_filename, - publisher, -): + output_folder: pathlib.Path, + download_cache: pathlib.Path, + concurrency: int, + languages: list[str], + formats: list[str], + only_books: list[int], + s3_storage: KiwixStorage | None, + optimizer_version: dict[str, str], + zim_name: str | None, + title: str | None, + description: str | None, + stats_filename: str | None, + publisher: str, + *, + force: bool, + title_search: bool, + add_bookshelves: bool, +) -> None: # actual list of languages with books sorted by most used nb = fn.COUNT(Book.language).alias("nb") db_languages = [ @@ -62,14 +64,14 @@ zim_name = "{}_{}.zim".format( project_id, datetime.datetime.now().strftime("%Y-%m") # noqa: DTZ005 ) - zim_path = output_folder.joinpath(zim_name) + zim_path = output_folder / zim_name - if Path(zim_name).exists() and not force: + if zim_path.exists() and not force: logger.info(f"ZIM file `{zim_name}` already exist.") return - elif Path(zim_name).exists(): + elif zim_path.exists(): logger.info(f"Removing existing ZIM file {zim_name}") - Path(zim_name).unlink() + zim_path.unlink(missing_ok=True) Global.setup( filename=zim_path, @@ -90,12 +92,12 @@ languages=languages, formats=formats, only_books=only_books, - force=force, - title_search=title_search, - add_bookshelves=add_bookshelves, s3_storage=s3_storage, optimizer_version=optimizer_version, stats_filename=stats_filename, + force=force, + title_search=title_search, + add_bookshelves=add_bookshelves, ) except Exception as exc: From 800eb95ecae2e96008461e74337f6e05a9f7ec53 Mon Sep 17 00:00:00 2001 From: Uchechukwu Orji Date: Sat, 13 Apr 2024 12:05:30 +0100 Subject: [PATCH 2/3] document base URL convention --- src/gutenberg2zim/urls.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/gutenberg2zim/urls.py b/src/gutenberg2zim/urls.py index c3496c8..69ce956 100644 --- a/src/gutenberg2zim/urls.py +++ b/src/gutenberg2zim/urls.py @@ -18,6 +18,8 @@ class UrlBuilder: SERVER_NAME = "aleph_pglaf_org" RSYNC = "rsync://aleph.pglaf.org/gutenberg/" + # NOTE: the base URLs below must not end with a trailing slash + # as one is appended while building the URLs for a book.
BASE_ONE = "http://aleph.pglaf.org" BASE_TWO = "http://aleph.pglaf.org/cache/epub" BASE_THREE = "http://aleph.pglaf.org/etext" From 3a8250c4d53010a1b67a6afdf1f285f89fac3f8f Mon Sep 17 00:00:00 2001 From: Uchechukwu Orji Date: Mon, 22 Apr 2024 12:48:23 +0100 Subject: [PATCH 3/3] remove redundant calls to resolve --- src/gutenberg2zim/download.py | 4 ++-- src/gutenberg2zim/entrypoint.py | 8 +++----- src/gutenberg2zim/export.py | 2 +- src/gutenberg2zim/rdf.py | 2 +- src/gutenberg2zim/urls.py | 2 +- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/gutenberg2zim/download.py b/src/gutenberg2zim/download.py index b579a51..6a80f8c 100644 --- a/src/gutenberg2zim/download.py +++ b/src/gutenberg2zim/download.py @@ -71,10 +71,10 @@ def is_safe(fname): # move all extracted files to proper locations for zipped_file in zipped_files: # skip folders - if not Path(zipped_file).resolve().is_file(): + if not Path(zipped_file).is_file(): continue - src = (Path(tmpd) / zipped_file).resolve() + src = Path(tmpd) / zipped_file if src.exists(): fname = Path(zipped_file).name diff --git a/src/gutenberg2zim/entrypoint.py b/src/gutenberg2zim/entrypoint.py index 3aeb0b9..5a75ee0 100755 --- a/src/gutenberg2zim/entrypoint.py +++ b/src/gutenberg2zim/entrypoint.py @@ -222,12 +222,10 @@ def f(x): for zim_lang in zims: if do_zim: logger.info("BUILDING ZIM dynamically") - if one_lang_one_zim_folder: - output_folder = Path(one_lang_one_zim_folder).resolve() - else: - output_folder = Path(".").resolve() build_zimfile( - output_folder=output_folder, + output_folder=Path(one_lang_one_zim_folder).resolve() + if one_lang_one_zim_folder + else Path(".").resolve(), download_cache=dl_cache, concurrency=concurrency, languages=zim_lang, diff --git a/src/gutenberg2zim/export.py b/src/gutenberg2zim/export.py index dd45593..363d929 100644 --- a/src/gutenberg2zim/export.py +++ b/src/gutenberg2zim/export.py @@ -94,7 +94,7 @@ def save_bs_output(soup, fpath, encoding=UTF8): def tmpl_path() -> Path: - return (Path(gutenberg2zim.__file__).parent / "templates").resolve() + return Path(gutenberg2zim.__file__).parent / "templates" def get_list_of_all_languages(): diff --git a/src/gutenberg2zim/rdf.py b/src/gutenberg2zim/rdf.py index edfbec2..de37347 100644 --- a/src/gutenberg2zim/rdf.py +++ b/src/gutenberg2zim/rdf.py @@ -305,7 +305,7 @@ def get_formatted_number(num: str | None) -> str | None: nums = [f"{i:0=5d}" for i in range(21000, 40000)] for num in nums: print(num) # noqa: T201 - curd = Path(__file__).resolve().parent + curd = Path(__file__).parent rdf = curd.parent / "rdf-files" / num / f"pg{num}.rdf" if rdf.is_file(): data = "" diff --git a/src/gutenberg2zim/urls.py b/src/gutenberg2zim/urls.py index 69ce956..f0d2a92 100644 --- a/src/gutenberg2zim/urls.py +++ b/src/gutenberg2zim/urls.py @@ -290,5 +290,5 @@ def setup_urls(force, books): if __name__ == "__main__": - book = Book.get(id=84) + book = Book.get(id=9) print(get_urls(book)) # noqa: T201
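For quick reference, the pathlib idioms this series swaps in for the removed path.py and os.path calls can be sketched as below. This is an illustrative snippet only — the directory and file names are placeholders, not paths used by the scraper — and it simply restates the mappings applied in the diffs above.

from pathlib import Path
import shutil
import tempfile

# os.path.join(...) / Path.joinpath(...)  ->  the / operator
book_dir = Path("dl-cache") / "84"
# path.py mkdir_p()  ->  mkdir(parents=True, exist_ok=True)
book_dir.mkdir(parents=True, exist_ok=True)
fpath = book_dir / "84.html"
# path.py .basename() and .ext  ->  .name and .suffix
print(fpath.name, fpath.suffix)
fpath.touch()
# path.py .move()  ->  Path.rename()
fpath.rename(book_dir / "renamed.html")
# "if os.path.exists(...): os.unlink(...)"  ->  unlink(missing_ok=True)
(book_dir / "renamed.html").unlink(missing_ok=True)
# path.py .rmtree_p()  ->  shutil.rmtree(..., ignore_errors=True)
tmpd = tempfile.mkdtemp(dir=book_dir)
shutil.rmtree(tmpd, ignore_errors=True)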