diff --git a/.env.example b/.env.example
index c2f4db0..0983a05 100644
--- a/.env.example
+++ b/.env.example
@@ -1,2 +1,4 @@
 TOKEN_GITHUB=
-PERSPECTIVE_API_KEY=
\ No newline at end of file
+PERSPECTIVE_API_KEY=
+IMSLP_USERNAME=
+IMSLP_PASSWORD=
\ No newline at end of file
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 3c11445..2e2f55f 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -75,5 +75,7 @@ jobs:
         env:
           TOKEN_GITHUB: ${{ secrets.TOKEN_GITHUB }}
           PERSPECTIVE_API_KEY: ${{ secrets.PERSPECTIVE_API_KEY }}
+          IMSLP_USERNAME: ${{ secrets.IMSLP_USERNAME }}
+          IMSLP_PASSWORD: ${{ secrets.IMSLP_PASSWORD }}
           TEST: ${{ matrix.test }}
           VERSION: ${{ github.head_ref || 'main' }}
\ No newline at end of file
diff --git a/.github/workflows/update-requirements.yml b/.github/workflows/update-requirements.yml
index 17186e5..d37e774 100644
--- a/.github/workflows/update-requirements.yml
+++ b/.github/workflows/update-requirements.yml
@@ -76,6 +76,8 @@ jobs:
         env:
           TOKEN_GITHUB: ${{ secrets.TOKEN_GITHUB }}
           PERSPECTIVE_API_KEY: ${{ secrets.PERSPECTIVE_API_KEY }}
+          IMSLP_USERNAME: ${{ secrets.IMSLP_USERNAME }}
+          IMSLP_PASSWORD: ${{ secrets.IMSLP_PASSWORD }}
           TEST: ${{ matrix.test }}
           VERSION: ${{ github.head_ref || 'main' }}
       - name: Create pull request
diff --git a/src/image2structure/fetch/imslp_fetcher.py b/src/image2structure/fetch/imslp_fetcher.py
new file mode 100644
index 0000000..cb32678
--- /dev/null
+++ b/src/image2structure/fetch/imslp_fetcher.py
@@ -0,0 +1,280 @@
+from typing import Optional, List, Set
+from dotenv import load_dotenv
+from imslp import client
+from mwclient.page import Page
+from mwclient.image import Image
+
+from image2structure.fetch.fetcher import Fetcher, ScrapeResult, DownloadError
+
+
+import requests
+import datetime
+import os
+import imslp
+import urllib.parse
+import bs4
+import mwclient.page
+import re
+
+
+# Regular expression to extract the page count
+IMSLP_REGEXP_PAGE_COUNT = re.compile(r"(\d+)\s*pp*\.*")
+# NOTE(review): `Image` is mwclient.image.Image here, not PIL.Image, so this
+# probably does not raise PIL's decompression-bomb limit -- confirm the intent.
+Image.MAX_IMAGE_PIXELS = 700000000
+
+
+# Source: https://github.com/jlumbroso/imslp/blob/main/imslp/interfaces/scraping.py
+def fetch_images_metadata(page: mwclient.page.Page, timeout: float = 30) -> list:
+    """
+    Fetches the metadata associated with the images of an IMSLP page, as
+    specified by a `mwclient.page.Page` object. This contains the download
+    counter which is not available through the MediaWiki API and requires
+    scraping to obtain.
+
+    :param page: The IMSLP work page whose files should be described.
+    :param timeout: Seconds to wait for the IMSLP server before giving up.
+    :return: A list with one dict per file that could be matched in the
+        page's HTML, with the keys "id", "title", "url", "page_count",
+        "size" and "obj" (the `mwclient.image.Image`). The list is empty
+        if `page` is None, the request times out or the response is not OK.
+    """
+
+    if page is None:
+        return list()
+
+    esc_title = urllib.parse.quote(page.base_title.replace(" ", "_"))
+
+    u = "https://imslp.org/wiki/{}".format(esc_title)
+
+    # Without a timeout a stalled connection would block the scraper forever
+    try:
+        r = requests.get(u, timeout=timeout)
+    except requests.Timeout:
+        return list()
+    if not r.ok:
+        return list()
+
+    s = bs4.BeautifulSoup(r.content, features="html.parser")
+
+    images = []
+
+    for f in page.images():
+
+        f_title = f.base_title
+        f_esc_title = urllib.parse.quote(f_title.replace(" ", "_"))
+
+        # Hacky way of finding the relevant metadata
+        t1 = s.find(attrs={"href": "/wiki/File:{}".format(f_esc_title)})
+        t2 = s.find(attrs={"title": "File:{}".format(f_title)})
+
+        if t1 is None and t2 is None:
+            continue
+
+        t = t1 or t2
+        if t.text.strip() == "":
+            continue
+
+        page_count = None
+        m = IMSLP_REGEXP_PAGE_COUNT.search(t.parent.text)
+        if m is not None:
+            try:
+                page_count = int(m.group(1))
+            except ValueError:
+                pass
+
+        # The link text is expected to look like "#12345"; skip anything else
+        # instead of aborting the whole page on a single odd entry
+        try:
+            file_id = int(t.text.replace("#", ""))
+        except ValueError:
+            continue
+
+        # Skip files without a URL in their image info
+        url = f.imageinfo.get("url")
+        if not url:
+            continue
+
+        # Fix image URL
+        if url.startswith("/"):
+            # URL is //imslp.org/stuff...
+            url = "http:" + url
+            f.imageinfo["url"] = url
+
+        images.append(
+            {
+                "id": file_id,
+                "title": f_title,
+                "url": url,
+                "page_count": page_count,
+                "size": f.imageinfo.get("size"),
+                "obj": f,
+            }
+        )
+
+    return images
+
+
+class ImslpFetcher(Fetcher):
+    """Fetcher for music scores from IMSLP."""
+
+    IMSLP_URL: str = "https://imslp.org/wiki/"
+    LIST_WORKS_COUNT: int = 100
+
+    def __init__(
+        self,
+        date_created_after: datetime.datetime,
+        date_created_before: datetime.datetime,
+        timeout: int,
+        verbose: bool,
+    ):
+        super().__init__(date_created_after, date_created_before, timeout, verbose)
+        self._page: int = 0
+        load_dotenv()
+
+        # Get the IMSLP client
+        username: Optional[str] = os.environ.get("IMSLP_USERNAME")
+        password: Optional[str] = os.environ.get("IMSLP_PASSWORD")
+        # Raise explicitly: an `assert` would be stripped when running with -O
+        if username is None or password is None:
+            raise ValueError(
+                "IMSLP_USERNAME and IMSLP_PASSWORD must be set in the environment."
+            )
+        self._client = client.ImslpClient(username=username, password=password)
+
+        # metadata
+        self._metadata: Optional[
+            Set[imslp.interfaces.internal.HashablePageRecord]
+        ] = None
+
+    def notify_change_dates(self):
+        self.change_internal_dates(days=1)
+
+    def scrape(self, num_instances: int) -> List[ScrapeResult]:
+        """
+        Scrape num_instances data points.
+
+        Args:
+            num_instances: The number of instances to scrape.
+
+        Returns:
+            List[ScrapeResult]: The results of the scraping. At most
+                num_instances results are returned; fewer if IMSLP has
+                no more works to list.
+
+        Raises:
+            ScrapeError: If the scraping fails.
+        """
+        results: List[ScrapeResult] = []
+
+        while len(results) < num_instances:
+            if self._metadata is None or len(self._metadata) == 0:
+                if self._verbose:
+                    print(
+                        f"Fetching page {self._page} of IMSLP works ({self.LIST_WORKS_COUNT} per page)"
+                    )
+                self._metadata = set(
+                    imslp.interfaces.internal.list_works(
+                        start=self._page * self.LIST_WORKS_COUNT,
+                        count=self.LIST_WORKS_COUNT,
+                        cache=False,
+                    )
+                )
+                self._page += 1
+                if len(self._metadata) == 0:
+                    # No works left to list: stop instead of looping forever
+                    break
+
+            while len(self._metadata) > 0 and len(results) < num_instances:
+                result: imslp.interfaces.internal.HashablePageRecord = (
+                    self._metadata.pop()
+                )
+                url: str = result["permlink"]
+                if not url.startswith(self.IMSLP_URL):
+                    continue
+
+                name: str = url.replace(self.IMSLP_URL, "")
+                page = Page(self._client._site, name)
+                image_metadatas = fetch_images_metadata(page)
+
+                for metadata in image_metadatas:
+                    # A single work can hold several scores: never return
+                    # more than what was asked for
+                    if len(results) >= num_instances:
+                        break
+
+                    if "obj" not in metadata or metadata["obj"] is None:
+                        continue
+
+                    image: Image = metadata["obj"]
+                    timestamp: str = image.imageinfo["timestamp"]
+                    # Timestamp is formatted as "2021-10-10T10:10:10Z"
+                    date: datetime.datetime = datetime.datetime.strptime(
+                        timestamp, "%Y-%m-%dT%H:%M:%SZ"
+                    )
+                    if (
+                        date < self._date_created_after
+                        or date > self._date_created_before
+                    ):
+                        continue
+
+                    file_name: str = image.imageinfo["url"].split("/")[-1]
+                    if not file_name.endswith(".pdf"):
+                        continue
+
+                    total_num_pages: Optional[int] = metadata["page_count"]
+                    if total_num_pages is None:
+                        continue
+
+                    if self._verbose:
+                        print(
+                            f"\t- Found {file_name} with {total_num_pages} pages created at {date}"
+                        )
+                    results.append(
+                        ScrapeResult(
+                            download_url=image.imageinfo["url"],
+                            instance_name=file_name,
+                            date=date,
+                            additional_info={
+                                "total_num_pages": total_num_pages,
+                                "metadata": metadata,
+                            },
+                        )
+                    )
+
+        return results
+
+    def download(self, download_path: str, scrape_result: ScrapeResult) -> None:
+        """
+        Download the data from the given scrape result to the given destination path.
+
+        Args:
+            download_path: The path to save the downloaded data to.
+            scrape_result: The result of the scraping.
+
+        Returns:
+            None
+
+        Raises:
+            DownloadError: If the download fails.
+        """
+        if not os.path.exists(download_path):
+            raise DownloadError(f"Download path {download_path} does not exist.")
+
+        if (
+            "metadata" not in scrape_result.additional_info
+            or "obj" not in scrape_result.additional_info["metadata"]
+            or not isinstance(scrape_result.additional_info["metadata"]["obj"], Image)
+        ):
+            raise DownloadError("No metadata or invalid metadata in the scrape result.")
+
+        image: Image = scrape_result.additional_info["metadata"]["obj"]
+        file_path: str = os.path.join(download_path, scrape_result.instance_name)
+        try:
+            with open(file_path, "wb") as file:
+                image.download(file)
+        except Exception as e:
+            # Do not leave a truncated file behind
+            if os.path.exists(file_path):
+                os.remove(file_path)
+            raise DownloadError(f"Failed to download {file_path}: {e}") from e
diff --git a/src/image2structure/fetch/test_imslp_fetcher.py b/src/image2structure/fetch/test_imslp_fetcher.py
new file mode 100644
index 0000000..ff3e507
--- /dev/null
+++ b/src/image2structure/fetch/test_imslp_fetcher.py
@@ -0,0 +1,45 @@
+import pytest
+import datetime
+import os
+
+from image2structure.fetch.imslp_fetcher import ImslpFetcher
+from image2structure.fetch.fetcher import DownloadError, ScrapeResult
+
+
+class TestImslpFetcher:
+    def setup_method(self, method):
+        self.fetcher = ImslpFetcher(
+            # Set large dates as we cannot filter on precise dates,
+            # so if the range is too small it might take a long time to get results
+            date_created_after=datetime.datetime(2010, 1, 1),
+            date_created_before=datetime.datetime(2020, 1, 1),
+            timeout=30,
+            verbose=False,
+        )
+
+    def test_scrape_runs(self):
+        results = self.fetcher.scrape(1)
+        assert len(results) == 1
+
+    def test_download_runs(self):
+        results = self.fetcher.scrape(1)
+
+        # Download the first result
+        tmp_path = os.path.dirname(__file__)
+        self.fetcher.download(tmp_path, results[0])
+        pdf_path: str = os.path.join(tmp_path, results[0].instance_name)
+        assert os.path.exists(pdf_path)
+        os.remove(pdf_path)
+
+    def test_download_invalid_path(self):
+        result = ScrapeResult(
+            download_url="http://imslp.org/images/3/3d/fake.pdf",
+            instance_name="fake.pdf",
+            additional_info={"page_count": 10},
+            date=datetime.datetime.now(),
+        )
+        with pytest.raises(DownloadError):
+            self.fetcher.download("invalid_path", result)
+        # Checked outside the `with` block: statements placed after the
+        # raising call inside it would never run
+        assert not os.path.exists("invalid_path/fake.pdf")