-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add asynchronous Platzi downloader
Implements an asynchronous downloader for Platzi courses using Playwright. Includes features for login, logout, course downloading, and saving web pages as MHTML.
- Loading branch information
Showing
11 changed files
with
826 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from .async_api import AsyncPlatzi | ||
from .m3u8 import m3u8_dl | ||
|
||
__all__ = ["m3u8_dl", "AsyncPlatzi"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
import functools | ||
import json | ||
from pathlib import Path | ||
|
||
from playwright.async_api import BrowserContext, Page, async_playwright | ||
|
||
from .collectors import get_chapters_urls, get_course_title, get_unit | ||
from .constants import HEADERS, LOGIN_DETAILS_URL, LOGIN_URL, SESSION_FILE | ||
from .helpers import read_json, write_json | ||
from .logger import Logger | ||
from .m3u8 import m3u8_dl | ||
from .models import TypeUnit, User | ||
from .utils import progressive_scroll, slugify | ||
|
||
|
||
def login_required(func):
    """Decorate an ``AsyncPlatzi`` coroutine method so it only runs when logged in.

    The wrapper inspects the first positional argument (the instance). If it
    is not an ``AsyncPlatzi`` or its ``loggedin`` flag is falsy, an error is
    logged and ``None`` is returned without calling ``func``.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        instance = args[0]

        # Guard against the decorator being applied to something else.
        if not isinstance(instance, AsyncPlatzi):
            Logger.error(f"{login_required.__name__} can only decorate Platzi class.")
            return None

        if instance.loggedin:
            return await func(*args, **kwargs)

        Logger.error("Login first!")
        return None

    return wrapper
|
||
|
||
def try_except_request(func):
    """Decorate an ``AsyncPlatzi`` coroutine method to log errors instead of raising.

    The wrapper awaits ``func`` and returns its result. Any ``Exception`` it
    raises is reported through ``Logger.error`` and ``None`` is returned
    instead, so callers never see the exception.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        instance = args[0]
        if not isinstance(instance, AsyncPlatzi):
            Logger.error(
                f"{try_except_request.__name__} can only decorate Platzi class."
            )
            return None

        try:
            return await func(*args, **kwargs)
        except Exception as err:
            # Some exceptions carry no message (their str() is empty); log
            # their repr so the failure is still visible instead of vanishing.
            Logger.error(err if str(err) else repr(err))
            return None

    return wrapper
|
||
|
||
class AsyncPlatzi:
    """Asynchronous Platzi client backed by a Playwright Chromium browser.

    Intended to be used as an async context manager: entering starts
    Playwright, launches the browser, creates a browser context, tries to
    restore cookies from ``SESSION_FILE`` and loads the user profile; exiting
    closes everything again.

    Public coroutine methods are wrapped by ``try_except_request``, so they
    log failures and return ``None`` rather than raising.
    """

    def __init__(self, headless=False):
        self.loggedin = False
        self.headless = headless
        self.user = None
        # Playwright handles are created in __aenter__; None means "not started".
        self._playwright = None
        self._browser = None
        self._context = None

    async def __aenter__(self):
        """Start the browser, restore the saved session and load the profile."""
        self._playwright = await async_playwright().start()
        try:
            self._browser = await self._playwright.chromium.launch(
                headless=self.headless
            )
            self._context = await self._browser.new_context(
                java_script_enabled=True,
                is_mobile=True,
            )
        except BaseException:
            # __aexit__ is not called when __aenter__ fails, so release
            # whatever was already started before propagating the error.
            await self._shutdown()
            raise

        try:
            await self._load_state()
        except Exception as err:
            # Best effort: a broken session file must not prevent start-up,
            # but the user should know why they are not logged in.
            Logger.info(f"Could not restore the saved session: {err}")

        await self._set_profile()

        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the context, the browser and Playwright."""
        await self._shutdown()

    async def _shutdown(self) -> None:
        """Release Playwright resources in reverse order of creation.

        Each step runs even if the previous one raised, so a failure while
        closing the context cannot leave the browser or driver running.
        """
        context, browser, playwright = self._context, self._browser, self._playwright
        self._context = self._browser = self._playwright = None
        try:
            if context is not None:
                await context.close()
        finally:
            try:
                if browser is not None:
                    await browser.close()
            finally:
                if playwright is not None:
                    await playwright.stop()

    @property
    async def page(self) -> Page:
        """Open and return a NEW page on every access (use ``await self.page``).

        The caller owns the returned page and is responsible for closing it.
        """
        return await self._context.new_page()

    @property
    def context(self) -> BrowserContext:
        """The browser context shared by all pages of this client."""
        return self._context

    @try_except_request
    async def _set_profile(self) -> None:
        """Fetch the login details and update ``user`` / ``loggedin``."""
        # get_json logs and returns None on failure (it is decorated too).
        data = await self.get_json(LOGIN_DETAILS_URL)
        if not data:
            return

        try:
            self.user = User(**data)
        except Exception:
            # Payload does not match the User model: treat as not logged in.
            return

        if self.user.is_authenticated:
            self.loggedin = True
            Logger.info(f"Hi, {self.user.username}!")

    @try_except_request
    async def login(self) -> None:
        """Open the login page and wait up to two minutes for a manual login.

        Login is considered successful once the avatar element appears; the
        context cookies are then written to ``SESSION_FILE``.
        """
        Logger.info("Please login, in the opened browser")
        Logger.info("You have to login manually, you have 2 minutes to do it")

        page = await self.page
        try:
            await page.goto(LOGIN_URL)
            try:
                avatar = await page.wait_for_selector(
                    ".styles-module_Menu__Avatar__FTuh-",
                    timeout=2 * 60 * 1000,
                )
                if avatar:
                    self.loggedin = True
                    await self._save_state()
                    Logger.info("Logged in successfully")
            except Exception as err:
                # Keep the original error as the cause for debugging.
                raise Exception("Login failed") from err
        finally:
            await page.close()

    @try_except_request
    async def logout(self):
        """Delete the saved session and forget the current login."""
        SESSION_FILE.unlink(missing_ok=True)
        # Keep the in-memory state consistent with the deleted session file,
        # so login-protected calls are refused after logging out.
        await self.context.clear_cookies()
        self.loggedin = False
        self.user = None
        Logger.info("Logged out successfully")

    @try_except_request
    @login_required
    async def download(self, url: str, **kwargs):
        """Download every unit of the course at ``url`` into ``Platzi/<course>``.

        The course page is saved as ``presentation.mhtml``. Each chapter gets
        a numbered directory; video units are downloaded as ``.mp4`` and
        lecture units are saved as ``.mhtml``.

        ``**kwargs`` is accepted but currently unused.
        """
        page = await self.page
        try:
            await page.goto(url)

            # course title
            course_title = await get_course_title(page)
            Logger.print(course_title, "[COURSE]")

            # download directory
            course_dir = Path("Platzi") / slugify(course_title)
            course_dir.mkdir(parents=True, exist_ok=True)

            # save page as mhtml
            await self.save_page(
                page,
                path=course_dir / "presentation.mhtml",
            )

            # iterate over chapters
            chapters_urls = await get_chapters_urls(page)
            for idx, (title, urls) in enumerate(chapters_urls, 1):
                print(f"{title}")

                chapter_dir = course_dir / f"{idx:02}_{slugify(title)}"
                chapter_dir.mkdir(parents=True, exist_ok=True)

                # iterate over units
                for jdx, unit_url in enumerate(urls, 1):
                    unit = await get_unit(self.context, unit_url)
                    name = f"{jdx:02}_{slugify(unit.title)}"

                    if unit.video:
                        dst = chapter_dir / f"{name}.mp4"
                        Logger.print(f"[{name}.mp4]", "[DOWNLOADING][VIDEO]")
                        await m3u8_dl(unit.video.url, dst.as_posix(), headers=HEADERS)

                    if unit.type == TypeUnit.LECTURE:
                        Logger.print(f"[{name}.mhtml]", "[DOWNLOADING][LECTURE]")
                        await self.save_page(
                            unit.url,
                            path=chapter_dir / f"{name}.mhtml",
                        )

                print("=" * 100)
        finally:
            # The collectors may already have closed the page on failure.
            if not page.is_closed():
                await page.close()

    @try_except_request
    async def save_page(self, src: str | Page, path: str | Path = "source.mhtml"):
        """Save a page as MHTML at ``path``.

        ``src`` is either a URL (a temporary page is opened and closed here)
        or an already open ``Page`` (left open for the caller).
        """
        owns_page = isinstance(src, str)
        page = await self.page if owns_page else src

        try:
            if owns_page:
                await page.goto(src)

            await progressive_scroll(page)

            try:
                client = await page.context.new_cdp_session(page)
                response = await client.send("Page.captureSnapshot")
                with open(path, "w", encoding="utf-8", newline="\n") as file:
                    file.write(response["data"])
            except Exception as err:
                raise Exception("Error saving page as mhtml") from err
        finally:
            # Only close pages opened by this method, even when saving failed.
            if owns_page:
                await page.close()

    @try_except_request
    async def get_json(self, url: str) -> dict:
        """Open ``url`` and parse the text of its first ``<pre>`` element as JSON.

        Empty content is parsed as ``{}``. Because of the decorator, failures
        are logged and ``None`` is returned.
        """
        page = await self.page
        try:
            await page.goto(url)
            content = await page.locator("pre").first.text_content()
        finally:
            await page.close()
        return json.loads(content or "{}")

    async def _save_state(self):
        """Write the context cookies to ``SESSION_FILE``."""
        cookies = await self.context.cookies()
        write_json(SESSION_FILE, cookies)

    async def _load_state(self):
        """Add the cookies stored in ``SESSION_FILE`` to the context, if any."""
        SESSION_FILE.touch()
        # A freshly created (empty) file simply means there is no session yet.
        if SESSION_FILE.stat().st_size == 0:
            return
        cookies = read_json(SESSION_FILE)
        await self.context.add_cookies(cookies)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import asyncio | ||
|
||
import typer | ||
from typing_extensions import Annotated | ||
|
||
from platzi import AsyncPlatzi | ||
|
||
# Root Typer application; the commands below register themselves on it.
app = typer.Typer(
    rich_markup_mode="rich",
)
|
||
|
||
@app.command()
def login():
    """
    Open a browser window to Login to Platzi.
    Usage:
        platzi login
    """
    # Typer commands are synchronous: run the coroutine to completion here.
    coroutine = _login()
    asyncio.run(coroutine)
|
||
|
||
@app.command()
def logout():
    """
    Delete the Platzi session from the local storage.
    Usage:
        platzi logout
    """
    # Typer commands are synchronous: run the coroutine to completion here.
    coroutine = _logout()
    asyncio.run(coroutine)
|
||
|
||
@app.command()
def download(
    url: Annotated[
        str,
        typer.Argument(
            help="The URL of the course to download",
            show_default=False,
        ),
    ],
):
    """
    Download a Platzi course from the given URL.
    Arguments:
        url: str - The URL of the course to download.
    Usage:
        platzi download <url>
    Example:
        platzi download https://platzi.com/cursos/fastapi-2023/
    """
    # Typer commands are synchronous: run the coroutine to completion here.
    coroutine = _download(url)
    asyncio.run(coroutine)
|
||
|
||
async def _login():
    """Start an ``AsyncPlatzi`` client and run its ``login`` coroutine."""
    client = AsyncPlatzi()
    async with client as platzi:
        await platzi.login()
|
||
|
||
async def _logout():
    """Start an ``AsyncPlatzi`` client and run its ``logout`` coroutine."""
    client = AsyncPlatzi()
    async with client as platzi:
        await platzi.logout()
|
||
|
||
async def _download(url: str):
    """Start an ``AsyncPlatzi`` client and download the course at ``url``."""
    client = AsyncPlatzi()
    async with client as platzi:
        await platzi.download(url)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
from playwright.async_api import BrowserContext, Page | ||
|
||
from .constants import PLATZI_URL | ||
from .models import TypeUnit, Unit, Video | ||
from .utils import get_m3u8_url, get_subtitles_url | ||
|
||
|
||
async def get_course_title(page: Page) -> str:
    """Return the text of the first ``.Hero-content-title`` element on ``page``.

    If the lookup fails or the text is empty, ``page`` is closed and
    ``Exception("No course title found")`` is raised.
    """
    failure = Exception("No course title found")

    try:
        text = await page.locator(".Hero-content-title").first.text_content()
    except Exception:
        await page.close()
        raise failure

    if not text:
        await page.close()
        raise failure

    return text
|
||
|
||
async def get_chapters_urls(page: Page) -> list[tuple[str, list[str]]]:
    """Collect the chapters of a course page with the URLs of their units.

    Every ``.Content-feed div.ContentBlock`` element is one chapter: its
    first ``h3`` gives the title and the ``href`` of each
    ``.ContentBlock-list a`` link, prefixed with ``PLATZI_URL``, gives a unit
    URL.

    Returns:
        A list of ``(chapter_title, unit_urls)`` tuples in page order.

    Raises:
        Exception: "No sections found" when a title or href is missing or any
            lookup fails. ``page`` is closed before raising and the original
            error is attached as the cause.
    """
    SELECTOR = ".Content-feed div.ContentBlock"
    ERROR_MESSAGE = "No sections found"

    try:
        blocks = page.locator(SELECTOR)
        items: list[tuple[str, list[str]]] = []
        for i in range(await blocks.count()):
            block = blocks.nth(i)
            title = await block.locator("h3").first.text_content()

            if not title:
                raise ValueError(f"chapter #{i + 1} has no title")

            links = block.locator(".ContentBlock-list a")

            urls: list[str] = []
            for j in range(await links.count()):
                href = await links.nth(j).get_attribute("href")

                if not href:
                    raise ValueError(f"link #{j + 1} of chapter #{i + 1} has no href")

                urls.append(PLATZI_URL + href)

            items.append((title, urls))

    except Exception as err:
        await page.close()
        # Raise a fresh exception: chaining a shared instance to itself
        # would produce a self-referential __cause__.
        raise Exception(ERROR_MESSAGE) from err

    return items
|
||
|
||
async def get_unit(context: BrowserContext, url: str) -> Unit:
    """Open ``url`` in a temporary page and describe the unit found there.

    The unit is a video when a ``.VideoPlayer`` element is present (its m3u8
    and subtitles URLs are extracted from the page HTML); otherwise it is a
    lecture without video.

    Raises:
        Exception: "Could not collect unit data" on any failure, with the
            original error attached as the cause.
    """
    TYPE_SELECTOR = ".VideoPlayer"
    TITLE_SELECTOR = ".MaterialDesktopHeading_MaterialDesktopHeading-info__title__DaYr2"
    ERROR_MESSAGE = "Could not collect unit data"

    # Initialised before the try so the finally clause never touches an
    # unbound name when new_page() itself fails.
    page = None
    try:
        page = await context.new_page()
        await page.goto(url)

        title = await page.locator(TITLE_SELECTOR).first.text_content()

        if not title:
            raise ValueError("unit title is empty")

        if await page.locator(TYPE_SELECTOR).count() == 0:
            unit_type = TypeUnit.LECTURE
            video = None

        else:
            content = await page.content()
            unit_type = TypeUnit.VIDEO
            video = Video(
                url=get_m3u8_url(content),
                subtitles_url=get_subtitles_url(content),
            )

        return Unit(
            url=url,
            title=title,
            type=unit_type,
            video=video,
        )

    except Exception as err:
        raise Exception(ERROR_MESSAGE) from err

    finally:
        if page is not None:
            await page.close()
Oops, something went wrong.