diff --git a/nazurin/sites/kemono/api.py b/nazurin/sites/kemono/api.py index 9f39b400..05ac75e1 100644 --- a/nazurin/sites/kemono/api.py +++ b/nazurin/sites/kemono/api.py @@ -1,7 +1,7 @@ import os from datetime import datetime, timezone from mimetypes import guess_type -from typing import Tuple +from typing import ClassVar, Tuple, Union from bs4 import BeautifulSoup @@ -15,10 +15,12 @@ class Kemono: + API_BASE: ClassVar[str] = "https://kemono.su/api/v1" + @network_retry async def get_post(self, service: str, user_id: str, post_id: str) -> dict: """Fetch an post.""" - api = f"https://kemono.su/api/v1/{service}/user/{user_id}/post/{post_id}" + api = f"{self.API_BASE}/{service}/user/{user_id}/post/{post_id}" async with Request() as request: async with request.get(api) as response: response.raise_for_status() @@ -29,6 +31,27 @@ async def get_post(self, service: str, user_id: str, post_id: str) -> dict: post["username"] = username return post + @network_retry + async def get_post_revision( + self, service: str, user_id: str, post_id: str, revision_id: str + ) -> dict: + """Fetch a post revision.""" + api = f"{self.API_BASE}/{service}/user/{user_id}/post/{post_id}/revisions" + async with Request() as request: + async with request.get(api) as response: + response.raise_for_status() + revisions = await response.json() + post = None + for revision in revisions: + if str(revision["revision_id"]) == revision_id: + post = revision + break + if not post: + raise NazurinError("Post revision not found") + username = await self.get_username(service, user_id) + post["username"] = username + return post + @network_retry async def get_username(self, service: str, user_id: str) -> str: url = f"https://kemono.su/{service}/user/{user_id}" @@ -43,8 +66,13 @@ async def get_username(self, service: str, user_id: str) -> str: username = tag.get("content", "") return username - async def fetch(self, service: str, user_id: str, post_id: str) -> Illust: - post = await self.get_post(service, user_id, post_id) + async def fetch( + self, service: str, user_id: str, post_id: str, revision_id: Union[str, None] + ) -> Illust: + if revision_id: + post = await self.get_post_revision(service, user_id, post_id, revision_id) + else: + post = await self.get_post(service, user_id, post_id) caption = self.build_caption(post) images = [] @@ -113,16 +141,24 @@ def parse_time(time: str) -> str: filename = FILENAME.format_map(context) return (DESTINATION.format_map(context), filename + extension) + @staticmethod + def get_url(post: dict) -> str: + url = ( + f"https://kemono.su/{post['service']}" + f"/user/{post['user']}/post/{post['id']}" + ) + revision = post.get("revision_id") + if revision: + url += f"/revision/{post['revision_id']}" + return url + @staticmethod def build_caption(post) -> Caption: return Caption( { "title": post["title"], "author": "#" + post["username"], - "url": ( - f"https://kemono.su/{post['service']}" - f"/user/{post['user']}/post/{post['id']}" - ), + "url": Kemono.get_url(post), } ) diff --git a/nazurin/sites/kemono/interface.py b/nazurin/sites/kemono/interface.py index a5a6ca0e..02ebf106 100644 --- a/nazurin/sites/kemono/interface.py +++ b/nazurin/sites/kemono/interface.py @@ -14,7 +14,8 @@ # https://kemono.party/dlsite/user/RG12345/post/RE12345 # https://kemono.party/gumroad/user/12345/post/aBc1d2 # https://kemono.su/subscribestar/user/abcdef/post/12345 - r"kemono\.(?:party|su)/(\w+)/user/([\w-]+)/post/([\w-]+)" + # https://kemono.su/fanbox/user/12345/post/12345/revision/12345 + r"kemono\.(?:party|su)/(\w+)/user/([\w-]+)/post/([\w-]+)(?:/revision/(\d+))?" ] @@ -22,10 +23,12 @@ async def handle(match) -> Illust: service = match.group(1) user_id = match.group(2) post_id = match.group(3) + revision_id = match.group(4) db = Database().driver() collection = db.collection(COLLECTION) - illust = await Kemono().fetch(service, user_id, post_id) + illust = await Kemono().fetch(service, user_id, post_id, revision_id) illust.metadata["collected_at"] = time() - await collection.insert("_".join([service, user_id, post_id]), illust.metadata) + identifier = filter(lambda x: x, [service, user_id, post_id, revision_id]) + await collection.insert("_".join(identifier), illust.metadata) return illust