diff --git a/app/main.py b/app/main.py index 4f01642..631193a 100644 --- a/app/main.py +++ b/app/main.py @@ -7,7 +7,7 @@ from app.plugins.sonos import get_data as get_data_sonos, proxy from app.plugins.speedtest import get_data as get_data_speedtest # from app.plugins.album import album_uploade_page, upload_image, delete_image, get_data -from app.plugins.icloud_album import get_data +from app.plugins.icloud_album import get_data as get_data_album from app.plugins.weather import get_data as get_data_weather from app.plugins.publictransportation import get_data as get_data_publictransportation from app.plugins.eoguide import get_data as get_data_eoguide @@ -90,7 +90,7 @@ async def delete_file(filename: str = Form(...)): @app.get("/album") @cache(expire=1800) async def album(): - return get_data() + return get_data_album() @app.get("/weather") diff --git a/app/plugins/icloud_album.py b/app/plugins/icloud_album.py index 93f3bb0..47483ab 100644 --- a/app/plugins/icloud_album.py +++ b/app/plugins/icloud_album.py @@ -1,72 +1,176 @@ import httpx - +import json +from typing import Dict, Any, List, Union from app import config - -def get_data(): - album_code = client_key = config.get_attribute(["icloud_album_id"]) - - base_url = _get_base_url(album_code) - - webstream_url = f"{base_url}webstream" - webasseturls_url = f"{base_url}webasseturls" - - x = httpx.post(webstream_url, json={"streamCtag":None}) - webstream_data = x.json() - - if x.status_code == 330: - new_host = webstream_data.get("X-Apple-MMe-Host") - - base_url = f"https://{new_host}/{album_code}/sharedstreams/" - webstream_url = f"{base_url}webstream" - webasseturls_url = f"{base_url}webasseturls" - - x = httpx.post(webstream_url, json={"streamCtag":None}) - webstream_data = x.json() - - photos = [] - for item in webstream_data['photos']: - photos.append(item['photoGuid']) - - y = httpx.post(webasseturls_url, json={"photoGuids":photos}) - webasset_data = y.json() - - images = [] - for img_key in webasset_data['items'].keys(): - img = webasset_data['items'][img_key] - image_url = f"https://{img['url_location']}{img['url_path']}" - images.append(image_url) - - return {"images": images} - -def _get_base_url(token: str) -> str: +# Static headers +HEADERS = { + "Origin": "https://www.icloud.com", + "Accept-Language": "en-US,en;q=0.8", + "User-Agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36" + ), + "Content-Type": "text/plain", + "Accept": "*/*", + "Referer": "https://www.icloud.com/sharedalbum/", + "Connection": "keep-alive", +} + +def get_data() -> Dict[str, List[str]]: + """ + Fetches enriched image URLs from iCloud shared albums. + :return: A dictionary with a list of image URLs. + """ + token = config.get_attribute(["icloud_album_id"]) + images = get_images(token) + + urls = [ + max(photo["derivatives"].values(), key=lambda x: x["fileSize"])["url"] + for photo in images["photos"] + if "derivatives" in photo + ] + + return {"images": urls} + +def chunk_list(lst: List[Any], chunk_size: int) -> List[List[Any]]: + """ + Splits a list into smaller chunks of a specified size. + :param lst: The list to chunk. + :param chunk_size: Size of each chunk. + :return: List of chunks. + """ + return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)] + +def get_images(token: str) -> Dict[str, Any]: + """ + Retrieves images and their metadata enriched with URLs. + :param token: The authentication token. + :return: A dictionary containing metadata and enriched photos. + """ + base_url = get_base_url(token) + redirected_base_url = get_redirected_base_url(base_url, token) + api_response = get_api_response(redirected_base_url) + + chunks = chunk_list(api_response["photoGuids"], 25) + all_urls = {guid: url for chunk in chunks for guid, url in get_urls(redirected_base_url, chunk).items()} + + return { + "metadata": api_response["metadata"], + "photos": enrich_images_with_urls(api_response, all_urls), + } + +def get_base_url(token: str) -> str: + """ + Constructs the base URL for accessing shared streams. + :param token: The authentication token. + :return: The base URL. + """ BASE_62_CHAR_SET = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' - def base62_to_int(e: str) -> int: - t = 0 - for char in e: - t = t * 62 + BASE_62_CHAR_SET.index(char) - return t - - e = token - t = e[0] - n = base62_to_int(e[1]) if t == 'A' else base62_to_int(e[1:3]) - i = e.find(';') - r = e - s = None - - if i >= 0: - s = e[i + 1:] - r = r.replace(';' + s, '') - - server_partition = n - - base_url = 'https://p' - base_url += f"{server_partition:02d}-sharedstreams.icloud.com" - base_url += f"/{token}/sharedstreams/" + def base62_to_int(value: str) -> int: + return sum(BASE_62_CHAR_SET.index(char) * (62 ** idx) for idx, char in enumerate(reversed(value))) + partition = base62_to_int(token[1]) if token[0] == 'A' else base62_to_int(token[1:3]) + base_url = f"https://p{partition:02d}-sharedstreams.icloud.com/{token}/sharedstreams/" return base_url +def get_redirected_base_url(base_url: str, token: str) -> str: + """ + Resolves potential redirections for the base URL. + :param base_url: The original base URL. + :param token: The authentication token. + :return: The redirected URL or the original URL if no redirection occurred. + """ + url = f"{base_url}webstream" + response = httpx.post(url, headers=HEADERS, json={"streamCtag": None}, follow_redirects=False) + + if response.status_code == 330: + new_host = response.json()["X-Apple-MMe-Host"] + return f"https://{new_host}/{token}/sharedstreams/" + + response.raise_for_status() + return base_url -if __name__ == "__main__": - get_data() \ No newline at end of file +def get_api_response(base_url: str) -> Dict[str, Any]: + """ + Retrieves metadata and photos from the API. + :param base_url: The API base URL. + :return: Parsed JSON response containing metadata and photos. + """ + url = f"{base_url}webstream" + response = httpx.post(url, headers=HEADERS, json={"streamCtag": None}) + response.raise_for_status() + data = response.json() + + return { + "metadata": { + "streamName": data["streamName"], + "userFirstName": data["userFirstName"], + "userLastName": data["userLastName"], + "streamCtag": data["streamCtag"], + "itemsReturned": int(data["itemsReturned"]), + "locations": data["locations"], + }, + "photoGuids": [photo["photoGuid"] for photo in data["photos"]], + "photos": { + photo["photoGuid"]: { + **photo, + "batchDateCreated": parse_date(photo["batchDateCreated"]), + "dateCreated": parse_date(photo["dateCreated"]), + "height": int(photo["height"]), + "width": int(photo["width"]), + "derivatives": [ + {**value, "fileSize": int(value["fileSize"]), "width": int(value["width"]), "height": int(value["height"])} + for value in photo["derivatives"].values() + ], + } + for photo in data["photos"] + }, + } + +def parse_date(date: str) -> Union[str, None]: + """ + Parses a date string to ensure consistent format. + :param date: The date string. + :return: The parsed date or None on failure. + """ + try: + return date + except Exception: + return None + +def get_urls(base_url: str, photo_guids: List[str]) -> Dict[str, str]: + """ + Retrieves URLs for a batch of photo GUIDs. + :param base_url: The API base URL. + :param photo_guids: A list of photo GUIDs. + :return: A dictionary mapping GUIDs to URLs. + """ + url = f"{base_url}webasseturls" + response = httpx.post(url, headers=HEADERS, json={"photoGuids": photo_guids}) + response.raise_for_status() + return { + item_id: f"https://{item['url_location']}{item['url_path']}" + for item_id, item in response.json()["items"].items() + } + +def enrich_images_with_urls(api_response: Dict[str, Any], urls: Dict[str, str]) -> List[Dict[str, Any]]: + """ + Enriches photo metadata with derivative URLs. + :param api_response: The API response containing photos and metadata. + :param urls: A dictionary of checksums to URLs. + :return: A list of enriched photo objects. + """ + photos = list(api_response["photos"].values()) + enriched_photos = [] + + for photo in photos: + derivatives = { + str(derivative["height"]): {**derivative, "url": urls[derivative["checksum"]]} + for derivative in photo["derivatives"] + if derivative["checksum"] in urls + } + enriched_photos.append({**photo, "derivatives": derivatives}) + + return enriched_photos