diff --git a/.gitignore b/.gitignore index eec4b5e9..c4bcffc9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,13 +1,128 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class -# Folder -bin -lib64 -*__pycache__ -pyvenv.cfg -.idea - -# Project specific -videos/ -tmp/ -debug.log -run.exe \ No newline at end of file +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# Other +Video \ No newline at end of file diff --git a/LICENSE b/LICENSE.txt similarity index 100% rename from LICENSE rename to LICENSE.txt diff --git a/README.md b/README.md index e20384ba..e82f0224 100644 --- a/README.md +++ b/README.md @@ -1,94 +1,153 @@ -

- video working +

+

-## Overview. +# Overview. + This repository provide a simple script designed to facilitate the downloading of films and series from a popular streaming community platform. The script allows users to download individual films, entire series, or specific episodes, providing a seamless experience for content consumers. ## Join us -You can chat, help improve this repo, or just hang around for some fun in the **Git_StreamingCommunity** Discord [Server](https://discord.gg/c3JSUM5Hqw) - +You can chat, help improve this repo, or just hang around for some fun in the **Git_StreamingCommunity** Discord [Server](https://discord.gg/by8UsqhPWx) # Table of Contents + * [INSTALLATION](#installation) - * [Requirement](#requirement) - * [Usage](#usage) - * [Update](#update) -* [FEATURES](#features) + * [Requirement](#requirement) + * [Usage](#usage) + * [Update](#update) * [USAGE AND OPTIONS](#options) * [TUTORIAL](#tutorial) ## Requirement + Make sure you have the following prerequisites installed on your system: + * python > [3.11](https://www.python.org/downloads/) * ffmpeg [win](https://www.gyan.dev/ffmpeg/builds/) - ## Installation + Install the required Python libraries using the following command: + ``` pip install -r requirements.txt ``` ## Usage + Run the script with the following command: #### On Windows: + ```powershell python run.py ``` #### On Linux/MacOS: + ```bash python3 run.py ``` - ## Update + Keep your script up to date with the latest features by running: #### On Windows: + ```powershell python update.py ``` #### On Linux/MacOS: + ```bash python3 update.py ``` - -## Features -- Download Single Film: Easily download individual movies with a simple command. -- Download Specific Episodes or Entire Series: Seamlessly retrieve specific episodes or entire series using intuitive commands. Specify a range of episodes with square brackets notation, e.g., [5-7], or download all episodes with an asterisk (*). -- Download Subtitles: Automatically fetch subtitles if available for downloaded content. (Note: To disable this feature, see [Configuration](#configuration)) -- Sync Audio and Video: Ensure perfect synchronization between audio and video during the download process for an enhanced viewing experience. - ## Configuration + You can change some behaviors by tweaking the configuration file. ```json { - "root_path": "videos", - "movies_folder_name": "Movies", - "series_folder_name": "Series", - "download_subtitles": true, - "download_default_language": true, - "selected_language": "English", - "max_worker": 20 + "DEFAULT": { + "debug": false, + "get_info": false, + "show_message": true, + "clean_console": true, + "get_moment_title": false, + "root_path": "videos", + "movies_folder_name": "Movies", + "series_folder_name": "Series", + "anime_folder_name": "Anime", + "not_close": false, + "swith_anime": false + }, + "SITE": { + "streaming_site_name": "streamingcommunity", + "streaming_domain": "forum", + "anime_site_name": "animeunity", + "anime_domain": "to" + }, + "M3U8": { + "tdqm_workers": 20, + "tqdm_progress_timeout": 10, + "minium_ts_files_in_folder": 15, + "donwload_percentage": 1, + "requests_timeout": 5, + "enable_time_quit": false, + "tqdm_show_progress": false, + "cleanup_tmp_folder": true + }, + "M3U8_OPTIONS": { + "download_audio": true, + "download_subtitles": true, + "specific_list_audio": [ + "ita" + ], + "specific_list_subtitles": [ + "eng" + ], + } } - ``` + #### Options -| Key | Default Value | Description | Value Example | -|---------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------|--------------------------| -| root_path | videos | Path where the script will add movies and tv series folders (see [Path Examples](#Path-examples)). Do not put trailing slash. | media/streamingcommunity | -| movies_folder_name | Movies | The folder name where all the movies will be placed. Do not put trailing slash. | downloaded-movies | -| series_folder_name | Series | The folder name where all the TV Series will be placed. Do not put trailing slash. | mytvseries | -| download_subtitles | true | Whether or not you want all the found subtitles to be downloaded. | false | -| download_default_language | true | Whether or not you want to download only the default Italian audio language. | false | -| selected_language | English | If `"download_default_language"` is `False` the script will download this language. | French | -| max_worker | 20 | How many workers will cooperate to download .ts file. **High value may slow down your pc**. | 30 | + +| Key | Default Value | Description | Value Example | +| -------------------------- | ------------- | --------------------------------------------------------------------------------------------------------------------------- | ------------------------ | +| DEFAULT | | Contains default configuration options for users. | | +| debug | false | Whether debugging information should be displayed or not. | true | +| get_info | false | Whether additional information should be fetched or not with debug enable. | true | +| show_message | true | Whether messages should be displayed to the user or not. | false | +| clean_console | true | Whether the console should be cleared before displaying new information or not. | false | +| get_moment_title | false | Whether to fetch the title of the moment or not. | true | +| root_path | videos | Path where the script will add movies and TV series folders (see[Path Examples](#Path-examples)). | media/streamingcommunity | +| movies_folder_name | Movies | The folder name where all the movies will be placed. Do not put a trailing slash. | downloaded-movies | +| series_folder_name | Series | The folder name where all the TV series will be placed. Do not put a trailing slash. | mytvseries | +| anime_folder_name | Anime | The folder name where all the anime will be placed. Do not put a trailing slash. | myanime | +| not_close | false | Whether to keep the application running after completion or not. | true | +| -------------------------- | ------------- | --------------------------------------------------------------------------------------------------------------------------- | ------------------------ | +| SITE | | Contains site-specific configuration options. | | +| streaming_domain | forum | The domain of the streaming site. | express | +| anime_domain | to | The domain of the anime site. | estate | +| -------------------------- | ------------- | --------------------------------------------------------------------------------------------------------------------------- | ------------------------ | +| M3U8 | | Contains options specific to M3U8. | | +| tdqm_workers | 20 | The number of workers that will cooperate to download .ts files.**A high value may slow down your PC** | 40 | +| tqdm_progress_timeout | 10 | The timeout duration for progress display updates in seconds after quit download. | 5 | +| minium_ts_files_in_folder | 15 | The minimum number of .ts files expected in a folder. | 10 | +| donwload_percentage | 1 | The percentage of download completion required to consider the download complete. | 0.95 | +| requests_timeout | 5 | The timeout duration for HTTP requests in seconds. | 10 | +| enable_time_quit | false | Whether to enable quitting the download after a certain time period. | true | +| tqdm_show_progress | false | Whether to show progress during downloads or not.**May slow down your PC** | true | +| cleanup_tmp_folder | true | Whether to clean up temporary folders after processing or not. | false | +| -------------------------- | ------------- | --------------------------------------------------------------------------------------------------------------------------- | ------------------------ | +| M3U8_OPTIONS | | Contains options specific to M3U8 file format. | | +| download_audio | true | Indicates whether audio files should be downloaded or not. | false | +| download_subtitles | true | Indicates whether subtitles should be downloaded or not. | false | +| specific_list_audio | ["ita"] | A list of specific audio languages to download. | ["eng", "fra"] | +| specific_list_subtitles | ["eng"] | A list of specific subtitle languages to download. | ["spa", "por"] | > [!IMPORTANT] > If you're on **Windows** you'll need to use double black slashes. On Linux/MacOS, one slash is fine. diff --git a/Src/Api/Class/EpisodeType.py b/Src/Api/Class/EpisodeType.py new file mode 100644 index 00000000..2b69a3ab --- /dev/null +++ b/Src/Api/Class/EpisodeType.py @@ -0,0 +1,60 @@ +# 03.03.24 + +from typing import Dict, Any, List + +class Episode: + def __init__(self, data: Dict[str, Any]): + """ + Initialize an Episode object. + + Args: + data (Dict[str, Any]): A dictionary containing data for the episode. + """ + self.id: int = data.get('id', '') + self.number: int = data.get('number', '') + self.name: str = data.get('name', '') + self.plot: str = data.get('plot', '') + self.duration: int = data.get('duration', '') + self.scws_id: int = data.get('scws_id', '') + self.season_id: int = data.get('season_id', '') + self.created_by: str = data.get('created_by', '') + self.created_at: str = data.get('created_at', '') + self.updated_at: str = data.get('updated_at', '') + +class EpisodeManager: + def __init__(self): + """ + Initialize an EpisodeManager object. + """ + self.episodes: List[Episode] = [] + + def add_episode(self, episode_data: Dict[str, Any]): + """ + Add a new episode to the manager. + + Args: + episode_data (Dict[str, Any]): A dictionary containing data for the new episode. + """ + episode = Episode(episode_data) + self.episodes.append(episode) + + def get_episode_by_index(self, index: int) -> Episode: + """ + Get an episode by its index. + + Args: + index (int): Index of the episode to retrieve. + + Returns: + Episode: The episode object. + """ + return self.episodes[index] + + def get_length(self) -> int: + """ + Get the number of episodes in the manager. + + Returns: + int: Number of episodes. + """ + return len(self.episodes) diff --git a/Src/Api/Class/SearchType.py b/Src/Api/Class/SearchType.py new file mode 100644 index 00000000..8aaecc86 --- /dev/null +++ b/Src/Api/Class/SearchType.py @@ -0,0 +1,80 @@ +# 03.03.24 + +# Import +from typing import List + +class Image: + def __init__(self, data: dict): + """ + Initialize an Image object. + + Args: + data (dict): Data for initializing the Image. + """ + self.imageable_id: int = data.get('imageable_id') + self.imageable_type: str = data.get('imageable_type') + self.filename: str = data.get('filename') + self.type: str = data.get('type') + self.original_url_field: str = data.get('original_url_field') + + +class MediaItem: + def __init__(self, data: dict): + """ + Initialize a MediaItem object. + + Args: + data (dict): Data for initializing the MediaItem. + """ + self.id: int = data.get('id') + self.slug: str = data.get('slug') + self.name: str = data.get('name') + self.type: str = data.get('type') + self.score: str = data.get('score') + self.sub_ita: int = data.get('sub_ita') + self.last_air_date: str = data.get('last_air_date') + self.seasons_count: int = data.get('seasons_count') + + # Create Image objects for each image in the data + self.images: List[Image] = [Image(image_data) for image_data in data.get('images', [])] + self.comment: str = "" # Initialize comment as an empty string + + +class MediaManager: + def __init__(self): + """ + Initialize a MediaManager object. + """ + self.media_list: List[MediaItem] = [] + + def add_media(self, data: dict) -> None: + """ + Add media to the list. + + Args: + data (dict): Media data to add. + """ + self.media_list.append(MediaItem(data)) + + def get(self, index: int) -> MediaItem: + """ + Get a media item from the list by index. + + Args: + index (int): The index of the media item to retrieve. + + Returns: + MediaItem: The media item at the specified index. + """ + return self.media_list[index] + + def get_length(self) -> int: + """ + Get the number of media find with research + + Returns: + int: Number of episodes. + """ + return len(self.media_list) + + diff --git a/Src/Api/Class/SeriesType.py b/Src/Api/Class/SeriesType.py new file mode 100644 index 00000000..c242e6f6 --- /dev/null +++ b/Src/Api/Class/SeriesType.py @@ -0,0 +1,62 @@ +# 03.03.24 + +from typing import List, Dict, Union + +class Title: + def __init__(self, title_data: Dict[str, Union[int, str, None]]): + """ + Initialize a Title object. + + Args: + title_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the title. + """ + self.id: int = title_data.get('id') + self.number: int = title_data.get('number') + self.name: str = title_data.get('name') + self.plot: str = title_data.get('plot') + self.release_date: str = title_data.get('release_date') + self.title_id: int = title_data.get('title_id') + self.created_at: str = title_data.get('created_at') + self.updated_at: str = title_data.get('updated_at') + self.episodes_count: int = title_data.get('episodes_count') + +class TitleManager: + def __init__(self): + """ + Initialize a TitleManager object. + + Args: + titles (List[Title]): A list of Title objects. Defaults to an empty list. + """ + self.titles: List[Title] = [] + + def add_title(self, title_data: Dict[str, Union[int, str, None]]): + """ + Add a new title to the manager. + + Args: + title_data (Dict[str, Union[int, str, None]]): A dictionary containing data for the new title. + """ + title = Title(title_data) + self.titles.append(title) + + def get_title_by_index(self, index: int) -> Title: + """ + Get a title by its index. + + Args: + index (int): Index of the title to retrieve. + + Returns: + Title: The title object. + """ + return self.titles[index] + + def get_length(self) -> int: + """ + Get the number of titles in the manager. + + Returns: + int: Number of titles. + """ + return len(self.titles) \ No newline at end of file diff --git a/Src/Api/Class/Video.py b/Src/Api/Class/Video.py new file mode 100644 index 00000000..22c58c02 --- /dev/null +++ b/Src/Api/Class/Video.py @@ -0,0 +1,326 @@ +# 01.03.24 + +# Class import +from Src.Util.headers import get_headers +from .SeriesType import TitleManager +from .EpisodeType import EpisodeManager +from .WindowType import WindowVideo, WindowParameter + +# Import +import requests +import re +import json +import binascii +import logging +import sys +from bs4 import BeautifulSoup +from urllib.parse import urljoin, urlencode + + +class VideoSource: + def __init__(self): + """ + Initialize a VideoSource object. + """ + self.headers: dict[str, str] = { + 'user-agent': get_headers() + } + self.is_series: bool = False + + def set_version(self, version: str) -> None: + """ + Set the version. + + Args: + version (str): The version to set. + """ + self.version = version + + def set_domain(self, domain: str) -> None: + """ + Set the domain. + + Args: + domain (str): The domain to set. + """ + self.domain = domain + + def set_url_base_name(self, base_name: str) -> None: + """ + Set the base url of the site. + + Args: + domain (str): The url of the site to set. + """ + self.base_name = base_name + + def set_media_id(self, media_id: str) -> None: + """ + Set the media ID. + + Args: + media_id (str): The media ID to set. + """ + self.media_id = media_id + + def set_series_name(self, series_name: str) -> None: + """ + Set the series name. + + Args: + series_name (str): The series name to set. + """ + self.is_series: bool = True + self.series_name: str = series_name + self.obj_title_manager: TitleManager = TitleManager() + self.obj_episode_manager: EpisodeManager = EpisodeManager() + + def collect_info_seasons(self) -> None: + """ + Collect information about seasons. + """ + self.headers = { + 'user-agent': get_headers(), + 'x-inertia': 'true', + 'x-inertia-version': self.version, + } + + try: + + # Make a request to collect information about seasons + response = requests.get(f"https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}", headers=self.headers) + response.raise_for_status() # Raise exception for non-200 status codes + + if response.ok: + + # Extract JSON response if available + json_response = response.json().get('props', {}).get('title', {}).get('seasons', []) + + # Iterate over JSON data and add titles to the manager + for dict_season in json_response: + self.obj_title_manager.add_title(dict_season) + + except Exception as e: + logging.error(f"Error collecting season info: {e}") + sys.exit(0) + + def collect_title_season(self, number_season: int) -> None: + """ + Collect information about a specific season. + + Args: + number_season (int): The season number. + """ + try: + + # Make a request to collect information about a specific season + response = requests.get(f'https://{self.base_name}.{self.domain}/titles/{self.media_id}-{self.series_name}/stagione-{number_season}', headers=self.headers) + response.raise_for_status() # Raise exception for non-200 status codes + + if response.ok: + + # Extract JSON response if available + json_response = response.json().get('props', {}).get('loadedSeason', {}).get('episodes', []) + + # Iterate over JSON data and add episodes to the manager + for dict_episode in json_response: + self.obj_episode_manager.add_episode(dict_episode) + + except Exception as e: + logging.error(f"Error collecting title season info: {e}") + sys.exit(0) + + def get_iframe(self, episode_id: str = None) -> None: + """ + Get iframe source. + + Args: + episode_id (str): The episode ID, present only for series + """ + + params = {} + + if self.is_series: + params = { + 'episode_id': episode_id, + 'next_episode': '1' + } + + try: + + # Make a request to get iframe source + response = requests.get(f"https://{self.base_name}.{self.domain}/iframe/{self.media_id}", params=params) + response.raise_for_status() # Raise exception for non-200 status codes + + if response.ok: + + # Parse response with BeautifulSoup to get iframe source + soup = BeautifulSoup(response.text, "html.parser") + self.iframe_src: str = soup.find("iframe").get("src") + + except Exception as e: + logging.error(f"Error getting iframe source: {e}") + sys.exit(0) + + def parse_script(self, script_text: str) -> None: + """ + Parse script text. + + Args: + script_text (str): The script text to parse. + """ + try: + + # Extract window video and parameter information from script text + str_window_video = re.search(r"window.video = {.*}", str(script_text)).group() + str_window_parameter = re.search(r"params: {[\s\S]*}", str(script_text)).group() + + # Fix windos and video parameter + str_window_video = str_window_video.split(" = ")[1] + str_window_parameter = str(str_window_parameter.replace("\n", "").replace(" ", "").split(",},")[0] + "}").split("params: ")[1] + + # Create window video and parameter objects + self.window_video = WindowVideo(data = json.loads(str_window_video.replace("'", '"'))) + self.window_parameter = WindowParameter(data = json.loads(str_window_parameter.replace("'", '"'))) + + except Exception as e: + logging.error(f"Error parsing script: {e}") + sys.exit(0) + + def get_content(self) -> None: + """ + Get content. + """ + try: + + # Check if iframe source is available + if self.iframe_src is not None: + + # Make a request to get content + response = requests.get(self.iframe_src, headers=self.headers) + response.raise_for_status() # Raise exception for non-200 status codes + + if response.ok: + + # Parse response with BeautifulSoup to get content + soup = BeautifulSoup(response.text, "html.parser") + script = soup.find("body").find("script").text + + # Parse script to get video information + self.parse_script(script_text=script) + + except Exception as e: + logging.error(f"Error getting content: {e}") + sys.exit(0) + + def get_playlist(self) -> str: + """ + Get playlist. + + Returns: + str: The playlist URL, or None if there's an error. + """ + try: + + # Generate playlist URL + query = urlencode(list(self.window_parameter.data.items())) + base_url = f'https://vixcloud.co/playlist/{self.window_video.id}' + + full_url = urljoin(base_url, '?' + query) + + return full_url + + except AttributeError as e: + logging.error(f"Error getting playlist: {e}") + sys.exit(0) + + def get_key(self) -> str: + """ + Get key. + + Returns: + str: The key content, or None if there's an error. + """ + try: + + # Set referer header for the request + self.headers['referer'] = f'https://vixcloud.co/embed/{self.window_video.id}?token={self.window_parameter.token}&title={self.window_video.name.replace(" ", "+")}&referer=1&expires={self.window_parameter.expires}&canPlayFHD=1' + + # Make a request to get key content + response = requests.get('https://vixcloud.co/storage/enc.key', headers=self.headers) + response.raise_for_status() # Raise exception for non-200 status codes + + if response.ok: + + # Convert key content to hexadecimal format + hex_content = binascii.hexlify(response.content).decode('utf-8') + return hex_content + + except Exception as e: + logging.error(f"Error getting key: {e}") + sys.exit(0) + +class VideoSourceAnime(VideoSource): + """ + A class representing a video source for anime content. + Inherits from VideoSource class. + """ + + def __init__(self) -> None: + super().__init__() + # MEDIA ID IS THE INDEX OF EPISODE + + def collect_episode_info(self) -> None: + """ + Collects information about the episode. + """ + try: + if self.media_id is None: + raise ValueError("Media ID is not set.") + + params = { + 'start_range': self.media_id, + 'end_range': self.media_id + 1 + } + + # series_name is the index of season in this case, index is the index of episode + response = requests.get(f'https://www.{self.base_name}.{self.domain}/info_api/{self.series_name}/{self.media_id}', params=params) + if not response.ok: + return None + + response.raise_for_status() + + # Get last episode in json request + json_response = response.json()['episodes'][-1] + + # Add in array of episode ( only one is store ) + self.obj_episode_manager.add_episode(json_response) + + except Exception as e: + logging.error(f"An error occurred while collecting episode info: {e}") + raise + + def get_embed(self) ->str: + """ + Retrieves the embed URL for the episode. + """ + try: + if not self.obj_episode_manager.episodes: + raise ValueError("No episodes available.") + + # Make request to get vixcloud embed url + embed_url_response = requests.get(f'https:///www.{self.base_name}.{self.domain}/embed-url/{self.obj_episode_manager.episodes[0].id}') + if not embed_url_response.ok: + return None + + embed_url_response.raise_for_status() + + # Make request to embed url to get video paramter text + embed_url = requests.get(embed_url_response.text).text + + # Parse script to get video information + self.parse_script(script_text=embed_url) + + except Exception as e: + logging.error(f"An error occurred while getting embed URL: {e}") + raise \ No newline at end of file diff --git a/Src/Api/Class/WindowType.py b/Src/Api/Class/WindowType.py new file mode 100644 index 00000000..61cd496d --- /dev/null +++ b/Src/Api/Class/WindowType.py @@ -0,0 +1,42 @@ +# 03.03.24 + +from typing import Dict, Any + +class WindowVideo: + def __init__(self, data: Dict[str, Any]): + """ + Initialize a WindowVideo object. + + Args: + data (dict): A dictionary containing data for the video. + """ + self.data = data + self.id: int = data.get('id', '') + self.name: str = data.get('name', '') + self.filename: str = data.get('filename', '') + self.size: str = data.get('size', '') + self.quality: str = data.get('quality', '') + self.duration: str = data.get('duration', '') + self.views: int = data.get('views', '') + self.is_viewable: bool = data.get('is_viewable', '') + self.status: str = data.get('status', '') + self.fps: float = data.get('fps', '') + self.legacy: bool = data.get('legacy', '') + self.folder_id: int = data.get('folder_id', '') + self.created_at_diff: str = data.get('created_at_diff', '') + +class WindowParameter: + def __init__(self, data: Dict[str, Any]): + """ + Initialize a WindowParameter object. + + Args: + data (dict): A dictionary containing parameters for the window. + """ + self.data = data + self.token: str = data.get('token', '') + self.token360p: str = data.get('token360p', '') + self.token480p: str = data.get('token480p', '') + self.token720p: str = data.get('token720p', '') + self.token1080p: str = data.get('token1080p', '') + self.expires: str = data.get('expires', '') diff --git a/Src/Api/Class/__init__.py b/Src/Api/Class/__init__.py new file mode 100644 index 00000000..9a33dc7a --- /dev/null +++ b/Src/Api/Class/__init__.py @@ -0,0 +1,4 @@ +# 03.03.24 + +from .Video import VideoSource +from .SearchType import MediaManager, MediaItem \ No newline at end of file diff --git a/Src/Api/__init__.py b/Src/Api/__init__.py new file mode 100644 index 00000000..769d1a29 --- /dev/null +++ b/Src/Api/__init__.py @@ -0,0 +1,6 @@ +# 03.03.24 + +from .film import download_film +from .series import download_series +from .site import get_version_and_domain, search, anime_search, get_select_title +from .anime import anime_download_film, anime_download_series \ No newline at end of file diff --git a/Src/Api/anime.py b/Src/Api/anime.py new file mode 100644 index 00000000..3ab2611c --- /dev/null +++ b/Src/Api/anime.py @@ -0,0 +1,244 @@ +# 11.03.24 + +# Class import +from Src.Util.console import console, msg +from Src.Util.config import config_manager +from Src.Lib.FFmpeg.my_m3u8 import Downloader +from Src.Util.message import start_message +from .Class import VideoSource + +# General import +import os +import logging +import requests + +# Config +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +ANIME_FOLDER = config_manager.get('DEFAULT', 'anime_folder_name') +MOVIE_FOLDER = config_manager.get('DEFAULT', 'movies_folder_name') +SERIES_FOLDER = config_manager.get('DEFAULT', 'series_folder_name') +URL_SITE_NAME = config_manager.get('SITE', 'anime_site_name') +SITE_DOMAIN = config_manager.get('SITE', 'anime_domain') + +# Variable +video_source = VideoSource() + + +class EpisodeDownloader: + def __init__(self, tv_id: int, tv_name: str, is_series: bool = True): + """ + Initialize EpisodeDownloader class. + + Args: + tv_id (int): The ID of the TV show. + tv_name (str): The name of series or for film the name of film + """ + self.tv_id = tv_id + self.tv_name = tv_name + self.is_series = is_series + + @staticmethod + def manage_selection(cmd_insert: str, max_count: int): + + list_season_select = [] + + # For a single number (e.g., '5') + if cmd_insert.isnumeric(): + list_season_select.append(int(cmd_insert)) + + # For a range (e.g., '[5-12]') + elif "[" in cmd_insert: + start, end = map(int, cmd_insert[1:-1].split("-")) + list_season_select = list(range(start, end + 1)) + + # For all seasons + elif cmd_insert == "*": + list_season_select = list(range(1, max_count + 1)) + + # Return list of selected seasons + return list_season_select + + def get_count_episodes(self): + + try: + + # Send a GET request to fetch information about the TV show + response = requests.get( + f"https://www.{URL_SITE_NAME}.{SITE_DOMAIN}/info_api/{self.tv_id}/" + ) + + # Raise an exception for bad status codes + response.raise_for_status() + + # Extract the count of episodes from the JSON response + return response.json()["episodes_count"] + + except Exception as e: + logging.error(f"(EpisodeDownloader) Error fetching episode count: {e}") + return 0 + + def get_info_episode(self, index_ep: int): + + try: + + # Send a GET request to fetch information about the specific episode + params = {"start_range": index_ep, "end_range": index_ep + 1} + + response = requests.get( + f"https://www.{URL_SITE_NAME}.{SITE_DOMAIN}/info_api/{self.tv_id}/{index_ep}", + params=params, + ) + + # Raise an exception for bad status codes + response.raise_for_status() + + # Extract episode information from the JSON response + return response.json()["episodes"][-1] + + except Exception as e: + logging.error( + f"(EpisodeDownloader) Error fetching episode information: {e}" + ) + return None + + def get_embed(self, episode_id: int): + + try: + # Send a GET request to fetch the embed URL for the episode + response = requests.get( + f"https://www.{URL_SITE_NAME}.{SITE_DOMAIN}/embed-url/{episode_id}" + ) + + # Raise an exception for bad status codes + response.raise_for_status() + + # Extract the embed URL from the response + embed_url = response.text.strip() + + # Validate the embed URL + if not embed_url.startswith("http"): + raise ValueError("Invalid embed URL") + + # Fetch the actual video URL using the embed URL + video_response = requests.get(embed_url) + + # Raise an exception for bad status codes + video_response.raise_for_status() + + # Return the video URL + return video_response.text + + except Exception as e: + logging.error(f"(EpisodeDownloader) Error fetching embed URL: {e}") + return None + + def download_episode(self, index_select): + + # Get information about the selected episode + info_ep_select = self.get_info_episode(index_select) + + if not info_ep_select: + logging.error("(EpisodeDownloader) Error getting episode information.") + return + + # Extract the ID of the selected episode + episode_id = info_ep_select.get("id") + + start_message() + console.print(f"[yellow]Download: [red]{episode_id} \n") + + # Get the embed URL for the episode + embed_url = self.get_embed(episode_id) + + if not embed_url: + logging.error("Error getting embed URL.") + return + + # Parse parameter in embed text + video_source.parse_script(embed_url) + + # Download the film using the m3u8 playlist, key, and output filename + try: + + if self.is_series: + + obj_download = Downloader( + m3u8_playlist=video_source.get_playlist(), + key=video_source.get_key(), + output_filename=os.path.join( + ROOT_PATH, + ANIME_FOLDER, + SERIES_FOLDER, + self.tv_name, + f"{index_select}.mp4", + ), + ) + + else: + + obj_download = Downloader( + m3u8_playlist=video_source.get_playlist(), + key=video_source.get_key(), + output_filename=os.path.join( + ROOT_PATH, + ANIME_FOLDER, + MOVIE_FOLDER, + f"{self.tv_name}.mp4" + ), + ) + + obj_download.download_m3u8() + + except Exception as e: + logging.error(f"(EpisodeDownloader) Error downloading film: {e}") + + + +# ONLY SERIES +def anime_download_series(tv_id: int, tv_name: str): + """ + Function to download episodes of a TV series. + + Args: + - tv_id (int): The ID of the TV series. + - tv_name (str): The name of the TV series. + """ + + # Get the count of episodes for the TV series + episodes_downloader = EpisodeDownloader(tv_id, tv_name) + episoded_count = episodes_downloader.get_count_episodes() + + console.log(f"[cyan]Episodes find: [red]{episoded_count}") + + # Prompt user to select an episode index + last_command = msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media") + + # Manage user selection + list_episode_select = EpisodeDownloader.manage_selection(last_command, episoded_count) + + # Download selected episodes + if len(list_episode_select) == 1 and last_command != "*": + episodes_downloader.download_episode(list_episode_select[0]) + + # Download all other episodes selecter + else: + for i_episode in list_episode_select: + episodes_downloader.download_episode(i_episode) + + +# ONLY FILM +def anime_download_film(id_film: int, title_name: str): + """ + Function to download a film. + + Args: + - id_film (int): The ID of the film. + - title_name (str): The title of the film. + """ + + # Placeholder function for downloading a film + episodes_downloader = EpisodeDownloader(id_film, title_name, is_series=False) + + # Extract the ID of the selected episode and download + episodes_downloader.download_episode(0) + diff --git a/Src/Api/film.py b/Src/Api/film.py index bc6fa8fc..2361ff71 100644 --- a/Src/Api/film.py +++ b/Src/Api/film.py @@ -1,134 +1,62 @@ # 3.12.23 -> 10.12.23 -# General import -import binascii -import json -import os -import re -import requests -import sys -from bs4 import BeautifulSoup - - # Class import -from Src.Lib.FFmpeg.my_m3u8 import download_m3u8 -from Src.Lib.FFmpeg.util import audio_extractor_m3u8 -from Src.Util.config import config from Src.Util.console import console -from Src.Util.headers import get_headers - - - -def get_iframe(id_title, domain): - req = requests.get(url=f"https://streamingcommunity.{domain}/iframe/{id_title}", headers={ - "User-agent": get_headers() - }) - - if req.ok: - url_embed = BeautifulSoup(req.text, "lxml").find("iframe").get("src") - req_embed = requests.get(url_embed, headers={"User-agent": get_headers()}).text - - try: - return BeautifulSoup(req_embed, "lxml").find("body").find("script").text - except Exception: - console.log("[red]Couldn't play this video file (video not available)") - sys.exit(0) - - else: - console.log(f"[red]Error: {req.status_code}") - sys.exit(0) - - -def select_quality(json_win_param): - if json_win_param['token1080p']: - return "1080p" - elif json_win_param['token720p']: - return "720p" - elif json_win_param['token480p']: - return "480p" - else: - return "360p" - - -def parse_content(embed_content): - # Parse parameter from req embed content - win_video = re.search(r"window.video = {.*}", str(embed_content)).group() - win_param = re.search(r"params: {[\s\S]*}", str(embed_content)).group() - - # Parse parameter to make read for json - json_win_video = "{" + win_video.split("{")[1].split("}")[0] + "}" - json_win_param = ("{" + win_param.split("{")[1] - .split("}")[0] - .replace("\n", "") - .replace(" ", "") + "}") - json_win_param = json_win_param.replace(",}", "}").replace("'", '"') - return json.loads(json_win_video), json.loads(json_win_param), select_quality(json.loads(json_win_param)) +from Src.Util.config import config_manager +from Src.Lib.FFmpeg.my_m3u8 import Downloader +from Src.Util.message import start_message +from .Class import VideoSource - -def get_m3u8_url(json_win_video, json_win_param, render_quality): - token_render = f"token{render_quality}" - return f"https://vixcloud.co/playlist/{json_win_video['id']}?type=video&rendition={render_quality}&token={json_win_param[token_render]}&expires={json_win_param['expires']}" - -def get_playlist(json_win_video, json_win_param, render_quality): - token_render = f"token{render_quality}" - return f"https://vixcloud.co/playlist/{json_win_video['id']}?token={json_win_param['token']}&{token_render}={json_win_param[token_render]}&expires={json_win_param['expires']}" - -def get_m3u8_key(): - response = requests.get('https://vixcloud.co/storage/enc.key') - - if response.ok: - return binascii.hexlify(response.content).decode('utf-8') - else: - console.log(f"[red]Error: {response.status_code}") - sys.exit(0) - - -def get_m3u8_audio(json_win_video, json_win_param, title_name, token_render): - req = requests.get( - f'https://vixcloud.co/playlist/{json_win_video["id"]}', - params={'token': json_win_param['token'], - 'expires': json_win_param["expires"]}, - headers={ - 'referer': - f'https://vixcloud.co/embed/{json_win_video["id"]}?token={json_win_param[token_render]}&title={title_name}&referer=1&expires={json_win_param["expires"]}' - }) - - if req.ok: - result = audio_extractor_m3u8(req) - return result - else: - console.log(f"[red]Error: {req.status_code}") - sys.exit(0) - - -# [func \ main] -def main_dw_film(id_film, title_name, domain): - embed_content = get_iframe(id_film, domain) - json_win_video, json_win_param, render_quality = parse_content(embed_content) - - token_render = f"token{render_quality}" - console.print(f"[blue]Selected quality => [red]{render_quality}") - - m3u8_url = get_m3u8_url(json_win_video, json_win_param, render_quality) - m3u8_key = get_m3u8_key() - m3u8_playlist = get_playlist(json_win_video, json_win_param, render_quality) - - mp4_name = title_name.replace("+", " ").replace(",", "").replace("-", "_") +# General import +import os +import logging + +# Config +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +MOVIE_FOLDER = config_manager.get('DEFAULT', 'movies_folder_name') +STREAM_SITE_NAME = config_manager.get('SITE', 'streaming_site_name') + +# Variable +video_source = VideoSource() +video_source.set_url_base_name(STREAM_SITE_NAME) + + +def download_film(id_film: str, title_name: str, domain: str): + """ + Downloads a film using the provided film ID, title name, and domain. + + Args: + id_film (str): The ID of the film. + title_name (str): The name of the film title. + domain (str): The domain of the site + """ + + # Start message and display film information + start_message() + console.print(f"[yellow]Download: [red]{title_name} \n") + + # Set domain and media ID for the video source + video_source.set_domain(domain) + video_source.set_media_id(id_film) + + # Retrieve the iframe and content for the video source + video_source.get_iframe() + video_source.get_content() + + # Define the filename and path for the downloaded film + mp4_name = title_name.replace("-", "_") mp4_format = mp4_name + ".mp4" - mp4_path = os.path.join(config['root_path'], config['movies_folder_name'], mp4_name, mp4_format) - m3u8_url_audio = get_m3u8_audio(json_win_video, json_win_param, title_name, token_render) + # Download the film using the m3u8 playlist, key, and output filename + try: + obj_download = Downloader( + m3u8_playlist = video_source.get_playlist(), + key = video_source.get_key(), + output_filename = os.path.join(ROOT_PATH, MOVIE_FOLDER, title_name, mp4_format) + ) - if m3u8_url_audio is not None: - console.print("[blue]Using m3u8 audio => [red]True") - subtitle_path = os.path.join(config['root_path'], config['movies_folder_name'], mp4_name) + obj_download.download_m3u8() - download_m3u8( - m3u8_index = m3u8_url, - m3u8_audio = m3u8_url_audio, - m3u8_subtitle = m3u8_playlist, - key = m3u8_key, - output_filename = mp4_path, - subtitle_folder = subtitle_path, - content_name = mp4_name - ) + except Exception as e: + logging.error(f"(download_film) Error downloading film: {e}") + pass diff --git a/Src/Api/page.py b/Src/Api/page.py deleted file mode 100644 index 09b29d50..00000000 --- a/Src/Api/page.py +++ /dev/null @@ -1,60 +0,0 @@ -# 10.12.23 - -# Class import -from Src.Util.headers import get_headers -from Src.Util.console import console -from Src.Util.config import config, config_manager - -# General import -import requests, sys, json -from bs4 import BeautifulSoup - - -def domain_version(): - - site_url = f"https://streamingcommunity.{config['domain']}" - domain = None - - try: - requests.get(site_url, headers={'user-agent': get_headers()}) - except: - - domain_req = requests.get("https://api.telegra.ph/getPage/Link-Aggiornato-StreamingCommunity-01-17") - domain = domain_req.json()['result']['description'].split(".")[1] - console.print("[green]Getting rules...") - - console.print(f"[blue]Test domain [white]=> [red]{domain}") - config_manager.update_config('domain', domain) - - if domain != None: - site_url = f"https://streamingcommunity.{domain}" - console.print(f"[blue]Use domain [white]=> [red]{domain}") - else: - domain = config['domain'] - - try: - site_request = requests.get(site_url, headers={'user-agent': get_headers()}) - soup = BeautifulSoup(site_request.text, "lxml") - version = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['version'] - console.print(f"[blue]Rules [white]=> [red].{domain}") - - return domain, version - - except Exception as e: - console.log("[red]Couldn't get the version, there's a problem with the domain. Try again." , e) - sys.exit(0) - -def search(title_search, domain): - req = requests.get(f"https://streamingcommunity.{domain}/api/search?q={title_search}", headers={'user-agent': get_headers()}) - - if req.ok: - return [{'name': title['name'], 'type': title['type'], 'id': title['id'], 'slug': title['slug']} for title in - req.json()['data']][0:21] - else: - console.log(f"[red]Error: {req.status_code}") - sys.exit(0) - - -def display_search_results(db_title): - for i, title in enumerate(db_title): - console.print(f"[yellow]{i} [white]-> [green]{title['name']} [white]- [cyan]{title['type']}") diff --git a/Src/Api/series.py b/Src/Api/series.py new file mode 100644 index 00000000..eec18a9e --- /dev/null +++ b/Src/Api/series.py @@ -0,0 +1,209 @@ +# 3.12.23 -> 10.12.23 + +# Class import +from Src.Util.console import console, msg +from Src.Util.config import config_manager +from Src.Util.table import TVShowManager +from Src.Util.message import start_message +from Src.Lib.FFmpeg.my_m3u8 import Downloader +from .Class import VideoSource + +# General import +import os +import logging +import sys + +# Config +ROOT_PATH = config_manager.get('DEFAULT', 'root_path') +SERIES_FOLDER = config_manager.get('DEFAULT', 'series_folder_name') +STREAM_SITE_NAME = config_manager.get('SITE', 'streaming_site_name') + +# Variable +video_source = VideoSource() +video_source.set_url_base_name(STREAM_SITE_NAME) +table_show_manager = TVShowManager() + + +def manage_selection(cmd_insert: str, max_count: int) -> list[int]: + """ + Manage user selection for seasons to download. + + Args: + cmd_insert (str): User input for season selection. + max_count (int): Maximum count of seasons available. + + Returns: + list_season_select (List[int]): List of selected seasons. + """ + + list_season_select = [] + + # For a single number (e.g., '5') + if cmd_insert.isnumeric(): + list_season_select.append(int(cmd_insert)) + + # For a range (e.g., '[5-12]') + elif "[" in cmd_insert: + start, end = map(int, cmd_insert[1:-1].split('-')) + list_season_select = list(range(start, end + 1)) + + # For all seasons + elif cmd_insert == "*": + list_season_select = list(range(1, max_count+1)) + + # Return list of selected seasons + return list_season_select + +def display_episodes_list() -> str: + """ + Display episodes list and handle user input. + + Returns: + last_command (str): Last command entered by the user. + """ + + # Set up table for displaying episodes + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + "Duration": {'color': 'green'} + } + table_show_manager.add_column(column_info) + + # Populate the table with episodes information + for i, media in enumerate(video_source.obj_episode_manager.episodes): + table_show_manager.add_tv_show({ + 'Index': str(media.number), + 'Name': media.name, + 'Duration': str(media.duration) + }) + + # Run the table and handle user input + last_command = table_show_manager.run() + + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + return last_command + +def donwload_video(tv_name: str, index_season_selected: int, index_episode_selected: int) -> None: + """ + Download a single episode video. + + Args: + tv_name (str): Name of the TV series. + index_season_selected (int): Index of the selected season. + index_episode_selected (int): Index of the selected episode. + """ + + # Start message and display episode information + start_message() + console.print(f"[yellow]Download: [red]{video_source.obj_episode_manager.episodes[index_episode_selected - 1].name} \n") + episode_id = video_source.obj_episode_manager.episodes[index_episode_selected - 1].id + + # Define filename and path for the downloaded video + mp4_name = f"{index_episode_selected}.mp4" + mp4_path = os.path.join(ROOT_PATH, SERIES_FOLDER, tv_name, f"S{index_season_selected}", f"E{index_episode_selected}") + os.makedirs(mp4_path, exist_ok=True) + + # Get iframe and content for the episode + video_source.get_iframe(episode_id) + video_source.get_content() + video_source.set_url_base_name(STREAM_SITE_NAME) + + # Download the episode + try: + obj_download = Downloader( + m3u8_playlist = video_source.get_playlist(), + key = video_source.get_key(), + output_filename = os.path.join(mp4_path, mp4_name) + ) + + obj_download.download_m3u8() + + except Exception as e: + logging.error(f"(donwload_video) Error downloading film: {e}") + pass + +def donwload_episode(tv_name: str, index_season_selected: int, donwload_all: bool = False) -> None: + """ + Download all episodes of a season. + + Args: + tv_name (str): Name of the TV series. + index_season_selected (int): Index of the selected season. + donwload_all (bool): Donwload all seasons episodes + """ + + # Start message and collect information about episodes + start_message() + video_source.collect_title_season(index_season_selected) + episodes_count = video_source.obj_episode_manager.get_length() + + # Download all episodes wihtout ask + if donwload_all: + for i_episode in range(0, episodes_count): + donwload_video(tv_name, index_season_selected, i_episode) + + # Exit + console.print("\n[red]Done") + sys.exit(0) + + # Display episodes list and manage user selection + last_command = display_episodes_list() + list_episode_select = manage_selection(last_command, episodes_count) + + # Download selected episodes + if len(list_episode_select) == 1 and last_command != "*": + donwload_video(tv_name, index_season_selected, list_episode_select[0]) + + # Download all other episodes selecter + else: + for i_episode in list_episode_select: + donwload_video(tv_name, index_season_selected, i_episode) + +def download_series(tv_id: str, tv_name: str, version: str, domain: str) -> None: + """ + Download all episodes of a TV series. + + Args: + tv_id (str): ID of the TV series. + tv_name (str): Name of the TV series. + version (str): Version of the TV series. + domain (str): Domain from which to download. + """ + + # Start message and set up video source + start_message() + video_source.set_version(version) + video_source.set_domain(domain) + video_source.set_series_name(tv_name) + video_source.set_media_id(tv_id) + + # Collect information about seasons + video_source.collect_info_seasons() + seasons_count = video_source.obj_title_manager.get_length() + + # Prompt user for season selection and download episodes + console.print(f"\n[green]Season find: [red]{seasons_count}") + index_season_selected = str(msg.ask("\n[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media")) + list_season_select = manage_selection(index_season_selected, seasons_count) + + # Download selected episodes + if len(list_season_select) == 1 and index_season_selected != "*": + if 1 <= int(index_season_selected) <= seasons_count: + donwload_episode(tv_name, list_season_select[0]) + + # Dowload all seasons and episodes + elif index_season_selected == "*": + for i_season in list_season_select: + donwload_episode(tv_name, i_season, True) + + # Download all other season selecter + else: + for i_season in list_season_select: + donwload_episode(tv_name, i_season) diff --git a/Src/Api/site.py b/Src/Api/site.py new file mode 100644 index 00000000..201d6e28 --- /dev/null +++ b/Src/Api/site.py @@ -0,0 +1,369 @@ +# 10.12.23 + +# Class import +from Src.Util.table import TVShowManager +from Src.Util.headers import get_headers +from Src.Util.console import console +from Src.Util.config import config_manager +from .Class import MediaManager, MediaItem + +# General import +import sys +import json +import logging +import requests +from bs4 import BeautifulSoup + +# Config +GET_TITLES_OF_MOMENT = config_manager.get_bool('DEFAULT', 'get_moment_title') + +# Variable +media_search_manager = MediaManager() +table_show_manager = TVShowManager() + +def get_token(site_name: str, domain: str) -> dict: + """ + Function to retrieve session tokens from a specified website. + + Args: + - site_name (str): The name of the site. + - domain (str): The domain of the site. + + Returns: + - dict: A dictionary containing session tokens. + The keys are 'XSRF_TOKEN', 'animeunity_session', and 'csrf_token'. + """ + + # Create a session object to handle the HTTP request and response + session = requests.Session() + + # Send a GET request to the specified URL composed of the site name and domain + response = session.get(f"https://www.{site_name}.{domain}") + + # Initialize variables to store CSRF token + find_csrf_token = None + + # Parse the HTML response using BeautifulSoup + soup = BeautifulSoup(response.text, "lxml") + + # Loop through all meta tags in the HTML response + for html_meta in soup.find_all("meta"): + + # Check if the meta tag has a 'name' attribute equal to "csrf-token" + if html_meta.get('name') == "csrf-token": + + # If found, retrieve the content of the meta tag, which is the CSRF token + find_csrf_token = html_meta.get('content') + + return { + 'XSRF_TOKEN': session.cookies['XSRF-TOKEN'], + 'animeunity_session': session.cookies['animeunity_session'], + 'csrf_token': find_csrf_token + } + +def get_moment_titles(domain: str, version: str, prefix: str): + """ + Retrieves the title name from a specified domain using the provided version and prefix. + + Args: + - domain (str): The domain of the site + - version (str): The version of the site + - prefix (str): The prefix used for retrieval. [film or serie-tv or "" for site] + + Returns: + - str or None: The title name if retrieval is successful, otherwise None. + """ + try: + + headers = { + 'x-inertia': 'true', + 'x-inertia-version': version, + 'user-agent': get_headers() + } + + response = requests.get(f'https://streamingcommunity.{domain}/{prefix}', headers=headers) + + + if response.ok: + + # Extract title name + title_name = response.json()['props']['title']['name'] + + # Return + return title_name + + else: + logging.error("Failed to retrieve data. Status code: %d", response.status_code) + return None + + except Exception as e: + logging.error("Error occurred: %s", str(e)) + return None + +def get_domain() -> str: + """ + Fetches the domain from a Telegra.ph API response. + + Returns: + str: The domain extracted from the API response. + """ + console.print("[cyan]Make request api [white]...") + + try: + response = requests.get("https://api.telegra.ph/getPage/Link-Aggiornato-StreamingCommunity-01-17", headers={'user-agent': get_headers()}) + console.print(f"[green]Request response [white]=> [red]{response.status_code} \n") + response.raise_for_status() # Raise an error if request fails + + if response.ok: + + domain = response.json()['result']['description'].split(".")[1] + return domain + + except Exception as e: + logging.error(f"Error fetching domain: {e}") + sys.exit(0) + +def test_site(domain: str) -> str: + """ + Tests the availability of a website. + + Args: + domain (str): The domain to test. + + Returns: + str: The response text if successful, otherwise None. + """ + + console.print("[cyan]Make request site [white]...") + site_url = f"https://streamingcommunity.{domain}" + + try: + response = requests.get(site_url, headers={'user-agent': get_headers()}) + console.print(f"[green]Request response [white]=> [red]{response.status_code} \n") + response.raise_for_status() # Raise an error if request fails + + if response.ok: + return response.text + else: + return None + + except Exception as e: + logging.error(f"Error testing site: {e}") + return None + +def get_version(text: str) -> str: + """ + Extracts the version from the HTML text of a webpage. + + Args: + text (str): The HTML text of the webpage. + + Returns: + str: The version extracted from the webpage. + """ + console.print("[cyan]Make request to get version [white]...") + + try: + soup = BeautifulSoup(text, "html.parser") + version = json.loads(soup.find("div", {"id": "app"}).get("data-page"))['version'] + console.print(f"[green]Request response [white]=> [red]200 \n") + + return version + + except Exception as e: + logging.error(f"Error extracting version: {e}") + sys.exit(0) + +def get_version_and_domain() -> tuple[str, str]: + """ + Retrieves the version and domain of a website. + + Returns: + tuple[str, str]: A tuple containing the version and domain. + """ + + try: + config_domain = config_manager.get('SITE', 'streaming_domain') + + response_test_site = test_site(config_domain) + + if response_test_site is None: + config_domain = get_domain() + response_test_site = test_site(config_domain) + + if response_test_site: + version = get_version(response_test_site) + + # Update domain config file + config_manager.set_key('SITE', 'streaming_domain', str(config_domain)) + config_manager.write_config() + + # Get titles in the moment + if GET_TITLES_OF_MOMENT: + console.print("[cyan]Scrape information [white]...") + console.print(f"[green]Title of the moments: [purple]{get_moment_titles(config_domain, version, '')}") + console.print(f"[green]Film of the moments: [purple]{get_moment_titles(config_domain, version, 'film')}") + console.print(f"[green]Serie of the moments: [purple]{get_moment_titles(config_domain, version, 'serie-tv')}") + + return version, config_domain + + else: + print("Can't connect to site") + sys.exit(0) + + except Exception as e: + logging.error(f"Error getting version and domain: {e}") + sys.exit(0) + +def search(title_search: str, domain: str) -> int: + """ + Search for titles based on a search query. + + Args: + title_search (str): The title to search for. + domain (str): The domain to search on. + + Returns: + int: The number of titles found. + """ + + # Send request to search for titles + req = requests.get(f"https://streamingcommunity.{domain}/api/search?q={title_search}", headers={'user-agent': get_headers()}) + + # Add found titles to media search manager + for dict_title in req.json()['data']: + media_search_manager.add_media(dict_title) + + # Return the number of titles found + return media_search_manager.get_length() + +def update_domain_anime(): + """ + Update the domain for anime streaming site. + """ + + # Read actual config + url_site_name = config_manager.get('SITE', 'anime_site_name') + url_domain = config_manager.get('SITE', 'anime_domain') + + # Test actual site + test_response = requests.get(f"https://www.{url_site_name}.{url_domain}") + console.print(f"[green]Request test response [white]=> [red]{test_response.status_code} \n") + + if not test_response.ok: + + # Update streaming domain + version, domain = get_version_and_domain() + + # Extract url + response = requests.get(f"https://streamingcommunity.{domain}/") + soup = BeautifulSoup(response.text, "html.parser") + + # Found the anime site link + new_site_url = None + for html_a in soup.find_all("a"): + if config_manager.get('SITE', 'anime_site_name') in str(html_a.get("href")): + new_site_url = html_a.get("href") + + # Extract the domain from the URL and update the config + config_manager.set_key('SITE', 'anime_domain', new_site_url.split(".")[-1]) + +def anime_search(title_search: str) -> int: + """ + Function to perform an anime search using a provided title. + + Args: + - title_search (str): The title to search for. + + Returns: + - int: A number containing the length of media search manager. + """ + + # Update domain + update_domain_anime() + + # Get token and session value from configuration + url_site_name = config_manager.get('SITE', 'anime_site_name') + url_domain = config_manager.get('SITE', 'anime_domain') + data = get_token(url_site_name, url_domain) + + # Prepare cookies to be used in the request + cookies = { + 'animeunity_session': data.get('animeunity_session') # Use the session token retrieved from data + } + + # Prepare headers for the request + headers = { + 'accept': 'application/json, text/plain, */*', + 'accept-language': 'it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7', + 'content-type': 'application/json;charset=UTF-8', + 'x-csrf-token': data.get('csrf_token') # Use the CSRF token retrieved from data + } + + # Prepare JSON data to be sent in the request + json_data = { + 'title': title_search # Use the provided title for the search + } + + # Send a POST request to the API endpoint for live search + response = requests.post(f'https://www.{url_site_name}.{url_domain}/livesearch', cookies=cookies, headers=headers, json=json_data) + + # Process each record returned in the response + for record in response.json()['records']: + + # Rename keys for consistency + record['name'] = record.pop('title') + record['last_air_date'] = record.pop('date') + + # Add the record to media search manager if the name is not None + if record['name'] is not None: + media_search_manager.add_media(record) + + # Return the length of media search manager + return media_search_manager.get_length() + +def get_select_title() -> MediaItem: + """ + Display a selection of titles and prompt the user to choose one. + + Returns: + MediaItem: The selected media item. + """ + + # Set up table for displaying titles + table_show_manager.set_slice_end(10) + + # Add columns to the table + column_info = { + "Index": {'color': 'red'}, + "Name": {'color': 'magenta'}, + "Type": {'color': 'yellow'}, + "Score": {'color': 'cyan'}, + "Date": {'color': 'green'} + } + table_show_manager.add_column(column_info) + + # Populate the table with title information + for i, media in enumerate(media_search_manager.media_list): + table_show_manager.add_tv_show({ + 'Index': str(i), + 'Name': media.name, + 'Type': media.type, + 'Score': media.score, + 'Date': media.last_air_date + }) + + # Run the table and handle user input + last_command = table_show_manager.run(force_int_input=True, max_int_input=len(media_search_manager.media_list)) + + # Handle user's quit command + if last_command == "q": + console.print("\n[red]Quit [white]...") + sys.exit(0) + + # Check if the selected index is within range + if 0 <= int(last_command) <= len(media_search_manager.media_list): + return media_search_manager.get(int(last_command)) + else: + console.print("\n[red]Wrong index") + sys.exit(0) diff --git a/Src/Api/tv.py b/Src/Api/tv.py deleted file mode 100644 index eda68b22..00000000 --- a/Src/Api/tv.py +++ /dev/null @@ -1,226 +0,0 @@ -# 3.12.23 -> 10.12.23 - -# General import -import binascii -import json -import os -import re -import requests -import sys -import urllib -from bs4 import BeautifulSoup - -# Class import -from Src.Lib.FFmpeg.my_m3u8 import download_m3u8 -from Src.Lib.FFmpeg.util import audio_extractor_m3u8 -from Src.Util.config import config -from Src.Util.console import console, msg -from Src.Util.headers import get_headers - - -def get_info_tv(id_film, title_name, site_version, domain): - req = requests.get(f"https://streamingcommunity.{domain}/titles/{id_film}-{title_name}", headers={ - 'user-agent': get_headers(), - 'X-Inertia': 'true', - 'X-Inertia-Version': site_version, - }) - - if req.ok: - return req.json()['props']['title']['seasons_count'] - else: - console.log(f"[red]Error: {req.status_code}") - sys.exit(0) - - -def get_info_season(tv_id, tv_name, domain, version, n_season): - req = requests.get( - f'https://streamingcommunity.{domain}/titles/{tv_id}-{tv_name}/stagione-{n_season}', - headers={ - 'user-agent': get_headers(), - 'x-inertia': 'true', - 'x-inertia-version': version, - }) - - if req.ok: - return [{'id': ep['id'], 'n': ep['number'], 'name': ep['name'] if ep['name'] is not None else ""} for ep in - req.json()['props']['loadedSeason']['episodes']] - else: - console.log(f"[red]Error: {req.status_code}") - sys.exit(0) - - -def get_iframe(tv_id, ep_id, domain): - req = requests.get(f'https://streamingcommunity.{domain}/iframe/{tv_id}', - params={'episode_id': ep_id, 'next_episode': '1'}, - headers={'user-agent': get_headers()} - ) - - if req.ok: - try: - url_embed = BeautifulSoup(req.text, "lxml").find("iframe").get("src") - req_embed = requests.get(url_embed, headers={"User-agent": get_headers()}).text - return BeautifulSoup(req_embed, "lxml").find("body").find("script").text - except: - console.log("[red]Error with episode. Skipping...") - return None - else: - console.log(f"[red]Error: {req.status_code}") - sys.exit(0) - - -def select_quality(json_win_param): - if json_win_param['token1080p']: - return "1080p" - elif json_win_param['token720p']: - return "720p" - elif json_win_param['token480p']: - return "480p" - else: - return "360p" - - -def parse_content(embed_content): - # Parse parameter from req embed content - win_video = re.search(r"window.video = {.*}", str(embed_content)).group() - win_param = re.search(r"params: {[\s\S]*}", str(embed_content)).group() - - # Parse parameter to make read for json - json_win_video = "{" + win_video.split("{")[1].split("}")[0] + "}" - json_win_param = "{" + win_param.split("{")[1].split("}")[0].replace("\n", "").replace(" ", "") + "}" - json_win_param = json_win_param.replace(",}", "}").replace("'", '"') - return json.loads(json_win_video), json.loads(json_win_param), select_quality(json.loads(json_win_param)) - - -def get_playlist(json_win_video, json_win_param, render_quality): - token_render = f"token{render_quality}" - return f"https://vixcloud.co/playlist/{json_win_video['id']}?token={json_win_param['token']}&{token_render}={json_win_param[token_render]}&expires={json_win_param['expires']}" - - -def get_m3u8_url(json_win_video, json_win_param, render_quality): - token_render = f"token{render_quality}" - return f"https://vixcloud.co/playlist/{json_win_video['id']}?type=video&rendition={render_quality}&token={json_win_param[token_render]}&expires={json_win_param['expires']}" - - -def get_m3u8_key_ep(): - response = requests.get('https://vixcloud.co/storage/enc.key') - - if response.ok: - return binascii.hexlify(response.content).decode('utf-8') - else: - console.log(f"[red]Error: {response.status_code}") - sys.exit(0) - - -def get_m3u8_audio(json_win_video, json_win_param, tv_name, n_season, n_ep, ep_title, token_render): - req = requests.get(f'https://vixcloud.co/playlist/{json_win_video["id"]}', - params={'token': json_win_param['token'], - 'expires': json_win_param["expires"]}, - headers={ - 'referer': f'https://vixcloud.co/embed/{json_win_video["id"]}?token={json_win_param[token_render]}&title={tv_name}&referer=1&expires={json_win_param["expires"]}&description=S{n_season}%3AE{n_ep}+{ep_title}&nextEpisode=1' - }) - - if req.ok: - result = audio_extractor_m3u8(req) - return result - else: - console.log(f"[red]Error: {req.status_code}") - sys.exit(0) - - -# [func \ main] -def dw_single_ep(tv_id, eps, index_ep_select, domain, tv_name, season_select): - encoded_name = urllib.parse.quote(eps[index_ep_select]['name']) - - console.print( - f"[green]Downloading episode: [blue]{eps[index_ep_select]['n']} [green]=> [purple]{eps[index_ep_select]['name']}") - embed_content = get_iframe(tv_id, eps[index_ep_select]['id'], domain) - if embed_content is None: - return - json_win_video, json_win_param, render_quality = parse_content(embed_content) - - token_render = f"token{render_quality}" - console.print(f"[blue]Selected quality => [red]{render_quality}") - - m3u8_playlist = get_playlist(json_win_video, json_win_param, render_quality) - m3u8_url = get_m3u8_url(json_win_video, json_win_param, render_quality) - m3u8_key = get_m3u8_key_ep() - - mp4_name = f"S{str(season_select).zfill(2)}E{str(index_ep_select + 1).zfill(2)}" - mp4_format = f"{mp4_name}.mp4" - season = mp4_name.rsplit("E", 1)[0] - mp4_path = os.path.join(config['root_path'], config['series_folder_name'], mp4_format) - - m3u8_url_audio = get_m3u8_audio(json_win_video, json_win_param, tv_name, season_select, index_ep_select + 1, - encoded_name, token_render) - - if m3u8_url_audio is not None: - console.print("[blue]Using m3u8 audio => [red]True") - - subtitle_path = os.path.join(config['root_path'], config['series_folder_name'], tv_name, season) - download_m3u8( - m3u8_index = m3u8_url, - m3u8_audio = m3u8_url_audio, - m3u8_subtitle = m3u8_playlist, - key = m3u8_key, - output_filename = mp4_path, - subtitle_folder = subtitle_path, - content_name = mp4_name - ) - - -def main_dw_tv(tv_id, tv_name, version, domain): - - num_season_find = get_info_tv(tv_id, tv_name, version, domain) - console.print( - "\n[green]Insert season [red]number[green], or [red](*) [green]to download all seasons, or [red][1-2] [green]for a range of seasons") - console.print(f"\n[blue]Season(s) found: [red]{num_season_find}") - season_select = str(msg.ask("\n[green]Insert which season(s) number you'd like to download")) - if "[" in season_select: - start, end = map(int, season_select[1:-1].split('-')) - result = list(range(start, end + 1)) - for n_season in result: - eps = get_info_season(tv_id, tv_name, domain, version, n_season) - for ep in eps: - dw_single_ep(tv_id, eps, int(ep['n']) - 1, domain, tv_name, n_season) - print("\n") - elif season_select != "*": - season_select = int(season_select) - if 1 <= season_select <= num_season_find: - eps = get_info_season(tv_id, tv_name, domain, version, season_select) - - for ep in eps: - console.print(f"[green]Episode: [blue]{ep['n']} [green]=> [purple]{ep['name']}") - index_ep_select = str(msg.ask( - "\n[green]Insert episode [blue]number[green], or [red](*) [green]to download all episodes, or [red][1-2] [green]for a range of episodes")) - - # Download range [] - if "[" in index_ep_select: - start, end = map(int, index_ep_select[1:-1].split('-')) - result = list(range(start, end + 1)) - - for n_range_ep in result: - # index_ep_select = int(n_range_ep) # Unused - dw_single_ep(tv_id, eps, n_range_ep - 1, domain, tv_name, season_select) - - # Download single ep - elif index_ep_select != "*": - if 1 <= int(index_ep_select) <= len(eps): - index_ep_select = int(index_ep_select) - 1 - dw_single_ep(tv_id, eps, index_ep_select, domain, tv_name, season_select) - else: - console.print("[red]Wrong [yellow]INDEX [red]for the selected Episode") - - # Download all - else: - for ep in eps: - dw_single_ep(tv_id, eps, int(ep['n']) - 1, domain, tv_name, season_select) - print("\n") - - else: - console.print("[red]Wrong [yellow]INDEX for the selected Season") - else: - for n_season in range(1, num_season_find + 1): - eps = get_info_season(tv_id, tv_name, domain, version, n_season) - for ep in eps: - dw_single_ep(tv_id, eps, int(ep['n']) - 1, domain, tv_name, n_season) - print("\n") diff --git a/Src/Lib/FFmpeg/installer.py b/Src/Lib/FFmpeg/installer.py deleted file mode 100644 index 9e2820cb..00000000 --- a/Src/Lib/FFmpeg/installer.py +++ /dev/null @@ -1,65 +0,0 @@ -# 24.01.2023 - -# Class -from Src.Util.console import console - -# Import -import subprocess, os, requests, zipfile, sys, ctypes, os, sys - -# Variable - - -# [ func ] -def isAdmin(): - try: - is_admin = (os.getuid() == 0) - except AttributeError: - is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 - return is_admin - -def download_ffmpeg(): - - # Specify the URL for the FFmpeg binary zip file for Windows - ffmpeg_url = "https://www.gyan.dev/ffmpeg/builds/ffmpeg-git-full.7z" - - # Name of the directory where FFmpeg will be extracted - ffmpeg_dir = "ffmpeg" - console.print("[yellow]Downloading FFmpeg...[/yellow]") - - # Download the FFmpeg zip file - response = requests.get(ffmpeg_url) - os.makedirs(ffmpeg_dir, exist_ok=True) - - # Save the zip file - zip_file_path = os.path.join(ffmpeg_dir, "ffmpeg.zip") - with open(zip_file_path, "wb") as zip_file: - zip_file.write(response.content) - - with zipfile.ZipFile(zip_file_path, "r") as zip_ref: - zip_ref.extractall(ffmpeg_dir) - - # Add the FFmpeg directory to the system PATH - ffmpeg_bin_dir = os.path.join(os.getcwd(), ffmpeg_dir, "bin") - os.environ["PATH"] += os.pathsep + ffmpeg_bin_dir - os.remove(zip_file_path) - -def check_ffmpeg(): - - console.print("[green]Checking ffmpeg ...") - - try: - subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) - console.print("[blue]FFmpeg [white]=> [red]Find") - except: - try: - console.print("[cyan]FFmpeg is not in the PATH. Downloading and adding to the PATH...[/cyan]") - - if not isAdmin(): - console.log("[red]You need admin privileges to proceed!") - sys.exit(0) - - download_ffmpeg() - sys.exit(0) - except: - console.print("[red]Unable to download or add FFmpeg to the PATH.[/red]") - sys.exit(0) diff --git a/Src/Lib/FFmpeg/my_m3u8.py b/Src/Lib/FFmpeg/my_m3u8.py index f8636578..28eb3ae3 100644 --- a/Src/Lib/FFmpeg/my_m3u8.py +++ b/Src/Lib/FFmpeg/my_m3u8.py @@ -1,445 +1,878 @@ -# 5.01.24 -> 7.01.24 -> 17.02.24 +# 5.01.24 -> 7.01.24 -> 20.02.24 -> 29.03.24 -# Class import -from Src.Util.console import console -from Src.Util.headers import get_headers -from Src.Util.config import config -from Src.Lib.FFmpeg.util import print_duration_table +# Importing modules +import os +import sys +import time +import threading +import logging +import warnings -# Import -from m3u8 import M3U8 as M3U8_Lib -from tqdm.rich import tqdm -import requests, os, ffmpeg, sys, warnings, shutil, time, threading -from concurrent.futures import ThreadPoolExecutor, as_completed -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from cryptography.hazmat.backends import default_backend - -# Disable warning +# Disable specific warnings from tqdm import TqdmExperimentalWarning warnings.filterwarnings("ignore", category=TqdmExperimentalWarning) warnings.filterwarnings("ignore", category=UserWarning, module="cryptography") +# External libraries +import requests +from tqdm.rich import tqdm +from concurrent.futures import ThreadPoolExecutor, as_completed + +# Internal utilities +from Src.Util.console import console +from Src.Util.headers import get_headers +from Src.Util.config import config_manager +from Src.Util.os import ( + remove_folder, + remove_file, + format_size, + compute_sha1_hash, + convert_to_hex +) +from Src.Lib.FFmpeg.util.helper import ( + print_duration_table, + transcode_with_subtitles, + join_audios, + concatenate_and_save +) + +# Logic class +from .util.math_calc import TSFileSizeCalculator +from .util.url_fix import M3U8_UrlFix +from .util.decryption import M3U8_Decryption +from .util.parser import M3U8_Parser + +# Config +Download_audio = config_manager.get_bool('M3U8_OPTIONS', 'download_audio') +Donwload_subtitles = config_manager.get_bool('M3U8_OPTIONS', 'download_subtitles') +DOWNLOAD_SPECIFIC_AUDIO = config_manager.get_list('M3U8_OPTIONS', 'specific_list_audio') +DOWNLOAD_SPECIFIC_SUBTITLE = config_manager.get_list('M3U8_OPTIONS', 'specific_list_subtitles') +TQDM_MAX_WORKER = config_manager.get_int('M3U8', 'tdqm_workers') +TQDM_PROGRESS_TIMEOUT = config_manager.get_int('M3U8', 'tqdm_progress_timeout') +COMPLETED_PERCENTAGE = config_manager.get_float('M3U8', 'donwload_percentage') +REQUESTS_TIMEOUT = config_manager.get_int('M3U8', 'requests_timeout') +ENABLE_TIME_TIMEOUT = config_manager.get_bool('M3U8', 'enable_time_quit') +TQDM_SHOW_PROGRESS = config_manager.get_bool('M3U8', 'tqdm_show_progress') +MIN_TS_FILES_IN_FOLDER = config_manager.get_int('M3U8', 'minium_ts_files_in_folder') +REMOVE_SEGMENTS_FOLDER = config_manager.get_bool('M3U8', 'cleanup_tmp_folder') + # Variable -MAX_WORKER = config['max_worker'] -DOWNLOAD_PATH = config['root_path'] -DOWNLOAD_SUB = config['download_subtitles'] -DOWNLOAD_DEFAULT_LANGUAGE = config['download_default_language'] -SELECTED_LANGUAGE = config['selected_language'] +config_headers = config_manager.get_dict('M3U8_OPTIONS', 'request') failed_segments = [] - +class_urlFix = M3U8_UrlFix() # [ main class ] -class Decryption(): - def __init__(self, key): - self.iv = None + +class M3U8_Segments: + def __init__(self, url, folder, key=None): + """ + Initializes M3U8_Segments with the provided URL and optional decryption key. + + Args: + - url (str): The URL of the M3U8 file. + - key (str, optional): The decryption key. Defaults to None. + """ + + self.url = url self.key = key - def parse_key(self, raw_iv): - self.iv = bytes.fromhex(raw_iv.replace("0x", "")) + # Init M3U8_Decryption class if key is present + if self.key is not None: + self.decryption = M3U8_Decryption(key) - def decrypt_ts(self, encrypted_data): - cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend()) - decryptor = cipher.decryptor() - decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize() + # Generate temp base folder based on hash of url + self.downloaded_size = 0 + self.temp_folder = folder + os.makedirs(self.temp_folder, exist_ok=True) - return decrypted_data + # Config + self.enable_timer = ENABLE_TIME_TIMEOUT + self.progress_timeout = TQDM_PROGRESS_TIMEOUT + self.class_ts_files_size = TSFileSizeCalculator() -class M3U8_Parser: - def __init__(self): - self.segments = [] - self.video_playlist = [] - self.keys = [] - self.subtitle_playlist = [] # No vvt ma url a vvt - self.subtitle = [] # Url a vvt - self.audio_ts = [] + def parse_data(self, m3u8_content: str) -> None: + """ + Parses the M3U8 content to extract segment information. - def parse_data(self, m3u8_content): + Args: + m3u8_content (str): The content of the M3U8 file. + """ try: - m3u8_obj = M3U8_Lib(m3u8_content) + # Parse index m3u8 content from request(m3u8).text + m3u8_parser = M3U8_Parser(DOWNLOAD_SPECIFIC_SUBTITLE) + m3u8_parser.parse_data(m3u8_content) - for playlist in m3u8_obj.playlists: - self.video_playlist.append({ - "uri": playlist.uri, - "width": playlist.stream_info.resolution, - "codecs": playlist.stream_info.codecs - }) + # Add decryption iv if key has the same byte string + if self.key is not None and m3u8_parser.keys.get('iv') is not None: + + iv = m3u8_parser.keys.get('iv') + method = m3u8_parser.keys.get('method') - for key in m3u8_obj.keys: - if key is not None: - self.keys = ({ - "method": key.method, - "uri": key.uri, - "iv": key.iv - }) + # Add iv for decryption to M3U8_Decryption + logging.info(f"[M3U8_Segments] Parameter iv => {iv}") + self.decryption.parse_key(iv) - for media in m3u8_obj.media: - if media.type == "SUBTITLES": - self.subtitle_playlist.append({ - "type": media.type, - "name": media.name, - "default": media.default, - "language": media.language, - "uri": media.uri - }) + # Add method for decryption to M3U8_Decryption + logging.info(f"[M3U8_Segments] Set method => {method}") + self.decryption.set_method(method) - if media.type == "AUDIO": - self.audio_ts.append({ - "type": media.type, - "name": media.name, - "default": media.default, - "language": media.language, - "uri": media.uri - }) + # Store segments + self.segments = m3u8_parser.segments + logging.info("[M3U8_Segments] Segments extracted successfully.") + + except Exception as e: + logging.error(f"[M3U8_Segments] Error parsing M3U8 content: {e}") - for segment in m3u8_obj.segments: - if "vtt" not in segment.uri: - self.segments.append(segment.uri) - else: - self.subtitle.append(segment.uri) + def get_info(self) -> None: + """ + Makes a request to the index m3u8 file to get information about segments. + """ + + try: + # Add random user agent to config headers + config_headers['index']['user-agent'] = get_headers() + + # Send a GET request to retrieve the index m3u8 file + response = requests.get(self.url, headers=config_headers['index']) + response.raise_for_status() # Raise HTTPError for non-2xx status codes + + # Parse text from request to m3u8 index + self.parse_data(response.text) + logging.info(f"[M3U8_Segments] Ts segments found: {len(self.segments)}") + + except requests.exceptions.RequestException as req_err: + logging.error(f"[M3U8_Segments] Error occurred during request: {req_err}") + sys.exit(1) # Exit with non-zero status to indicate an error except Exception as e: - print(f"Error parsing M3U8 content: {e}") + logging.error(f"[M3U8_Segments] Error occurred: {e}") + + def is_valid_ts_url(self, ts_url: str) -> bool: + """ + Check if the given ts URL is valid. + + Args: + ts_url (str): The URL of the ts file. + failed_segments (list): List of failed segment URLs. + + Returns: + bool: True if the URL is valid, False otherwise. + """ + # Check if the URL exists in the list of segments and is not in the list of failed segments + for failed_seg in failed_segments: + if str(failed_seg) in ts_url: + return False + + return True - def get_best_quality(self): + def make_reqests_stream(self, ts_url: str) -> bytes: + """ + Make a single request to a ts file to get content. - if self.video_playlist: - return self.video_playlist[0].get('uri') - else: - print("No video playlist found") - return None - - def download_subtitle(self, subtitle_path, content_name): + Args: + ts_url (str): The URL of the ts file. - path = subtitle_path + Returns: + bytes or None: The content of the requested ts file if successful, otherwise None. + """ - if self.subtitle_playlist: - for sub_info in self.subtitle_playlist: - name_language = sub_info.get("language") + try: - os.makedirs(path, exist_ok=True) - console.log(f"[green]Downloading subtitle: [red]{name_language}") - req_sub_content = requests.get(sub_info.get("uri")) + # Fix URL if it is incomplete (missing 'http') + if "http" not in ts_url: + ts_url = class_urlFix.generate_full_url(ts_url) + logging.info(f"Generated new URL: {ts_url}") - sub_parse = M3U8_Parser() - sub_parse.parse_data(req_sub_content.text) - url_subtitle = sub_parse.subtitle[0] + # Check if the ts_url is valid + is_valid_url = self.is_valid_ts_url(ts_url) - if "forced" in name_language.lower(): - name_language = name_language.lower().replace("forced", "").strip() - name_language = name_language.lower().replace("-", "").strip() - subtitle_name = f"{content_name}.{name_language}.forced.vtt" - else: - subtitle_name = f"{content_name}.{name_language}.vtt" + if is_valid_url: + # Generate random user agent for segments request + headers = config_headers.get('segments') + headers['user-agent'] = get_headers() - open(os.path.join(path, subtitle_name), "wb" - ).write(requests.get(url_subtitle).content) + # Make GET request to ts audio or video file with a random user agent + response = requests.get(ts_url, headers=headers, timeout=REQUESTS_TIMEOUT) - else: - console.log("[red]No subtitle found") + # If the response status code is not 200, mark the URL as failed + if response.status_code != 200: + logging.error(f"Failed to fetch content from {ts_url}. Status code: {response.status_code}") + return None - def get_track_audio(self, language_name): + # Return the content if the request is successful + return response.content - if self.audio_ts: - console.log(f"[cyan]Found {len(self.audio_ts)}, playlist with audio") + else: + logging.info(f"Skipping invalid URL: {ts_url}") + return None - if language_name is not None: - for obj_audio in self.audio_ts: - if obj_audio.get("name") == language_name: - return obj_audio.get("uri") + except requests.exceptions.RequestException as req_err: + logging.error(f"Error occurred during request to {ts_url}: {req_err}") + return None + except Exception as e: + logging.error(f"An unexpected error occurred: {e}") return None - - else: - console.log("[red]Couldn't find any playlist with audio") -class M3U8_Segments: - def __init__(self, url, key=None): - self.url = url - self.key = key - if key is not None: - self.decription = Decryption(key) + def save_stream(self, index: int, progress_counter: tqdm, stop_event: threading.Event) -> None: + """ + Save ts file and decrypt if there is an iv present in the decryption class. - self.temp_folder = os.path.join("tmp", "segments") - os.makedirs(self.temp_folder, exist_ok=True) + Parameters: + - index (int): The index of the ts file in the segments list. + - progress_counter (tqdm): The progress counter object. + - stop_event (threading.Event): The event to signal when to quit. + """ + # Break if stop event is true + if stop_event.is_set(): + return - self.progress_timeout = 10 - self.max_retry = 3 + try: + # Get ts url and create a filename based on index + ts_url = self.segments[index] + ts_filename = os.path.join(self.temp_folder, f"{index}.ts") + logging.info(f"Requesting: {ts_url}, saving to: {ts_filename}") - def parse_data(self, m3u8_content): - m3u8_parser = M3U8_Parser() - m3u8_parser.parse_data(m3u8_content) - - # Add decryption iv if present - if self.key is not None and m3u8_parser.keys: - self.decription.parse_key(m3u8_parser.keys.get("iv")) + # If file already exists, skip download + if os.path.exists(ts_filename): + logging.info(f"Skipping download. File already exists: {ts_filename}") + return - # Add all segments - self.segments = m3u8_parser.segments + # Get bytes of ts data + ts_content = self.make_reqests_stream(ts_url) - def get_info(self): - response = requests.get(self.url, headers={'user-agent': get_headers()}) + # If data is retrieved + if ts_content is not None: + # Create a file to save data + with open(ts_filename, "wb") as ts_file: + # Decrypt if there is an IV in the main M3U8 index + if self.key and self.decryption.iv: + decrypted_data = self.decryption.decrypt(ts_content) + ts_file.write(decrypted_data) + else: + ts_file.write(ts_content) - if response.ok: - self.parse_data(response.text) + # Update downloaded size + if TQDM_SHOW_PROGRESS: + self.downloaded_size += len(ts_content) + self.class_ts_files_size.add_ts_file_size(len(ts_content) * len(self.segments)) - if len(self.segments) == 0: - console.log("[red]Couldn't find any segments to download, retry") - sys.exit(0) + except Exception as e: + logging.error(f"Error saving TS file: {e}") - else: - console.log(f"[red]Wrong m3u8, error: {response.status_code}") - sys.exit(0) + finally: + # Update progress counter + progress_counter.update(1) - def get_req_ts(self, ts_url): - url_number = self.segments.index(ts_url) + if TQDM_SHOW_PROGRESS: + downloaded_size_str = format_size(self.downloaded_size) + estimate_total_size = self.class_ts_files_size.calculate_total_size() + progress_counter.set_description(f"[yellow]Download [red][{index}] - [{downloaded_size_str} / {estimate_total_size}]") + else: + progress_counter.set_description(f"[yellow]Download") - is_valid = True - for failde_seg in failed_segments: - if str(failde_seg) in ts_url: - is_valid = False - break + # Refresh progress bar + progress_counter.refresh() - if is_valid: + def donwload_streams(self): + """ + Downloads TS segments in parallel using ThreadPoolExecutor. - try: - response = requests.get(ts_url, headers={'user-agent': get_headers()}, timeout=5) + """ + try: + # Initialize progress bar + progress_counter = tqdm(total=len(self.segments), unit=" segment", desc="[yellow]Download") - if response.status_code == 200: - return response.content - else: - failed_segments.append(str(url_number)) - return None - - except Exception as e: - failed_segments.append(str(url_number)) - return None - - else: - return None + # Event to signal stop condition for progress monitoring + stop_event = threading.Event() - def save_ts(self, index, progress_counter, stop_event): + # Start progress monitor thread + progress_thread = threading.Thread(target=self.timer, args=(progress_counter, stop_event)) + progress_thread.start() - if stop_event.is_set(): - return + # Create ThreadPoolExecutor for parallel downloading + with ThreadPoolExecutor(max_workers=TQDM_MAX_WORKER) as executor: + futures = [] - ts_url = self.segments[index] - ts_filename = os.path.join(self.temp_folder, f"{index}.ts") + # Submit tasks for downloading segments + for index in range(len(self.segments)): + future = executor.submit(self.save_stream, index, progress_counter, stop_event) + futures.append(future) - if not os.path.exists(ts_filename): - ts_content = self.get_req_ts(ts_url) + try: + # Wait for tasks to complete + for future in as_completed(futures): + future.result() - if ts_content is not None: - with open(ts_filename, "wb") as ts_file: - if self.key and self.decription.iv: - decrypted_data = self.decription.decrypt_ts(ts_content) - ts_file.write(decrypted_data) - else: - ts_file.write(ts_content) + # Check if progress reached 99% + if progress_counter.n >= len(self.segments) * COMPLETED_PERCENTAGE: + #console.log(f"[yellow]Progress reached {COMPLETED_PERCENTAGE*100}%. Stopping.") + progress_counter.refresh() + break + + except KeyboardInterrupt: + console.log("[red]Ctrl+C detected. Exiting gracefully [white]...") + stop_event.set() + + except KeyboardInterrupt: + logging.info("Ctrl+C detected. Exiting gracefully...") + + except Exception as e: + logging.error(f"An unexpected error occurred: {e}") + + finally: + # Signal stop event to end progress monitor thread + stop_event.set() + + # Wait for progress monitor thread to finish + progress_thread.join() + + def timer(self, progress_counter: tqdm, quit_event: threading.Event): + """ + Function to monitor progress and quit if no progress is made within a certain time - progress_counter.update(1) - - def download_ts(self): - progress_counter = tqdm(total=len(self.segments), unit="bytes", desc="[yellow]Download") - stop_event = threading.Event() - progress_thread = threading.Thread(target=self.timer, args=(progress_counter, stop_event)) - progress_thread.start() - - with ThreadPoolExecutor(max_workers=MAX_WORKER) as executor: - futures = [] - - # Submit tasks for downloading segments - for index in range(len(self.segments)): - future = executor.submit(self.save_ts, index, progress_counter, stop_event) - futures.append(future) - - try: - for future in as_completed(futures): - future.result() - if progress_counter.n >= len(self.segments) * 0.995: - console.log(f"[yellow]Progress reached {0.995*100}%. Stopping.") - break - - except KeyboardInterrupt: - console.log("[red]Ctrl+C detected. Exiting gracefully [white]...") - stop_event.set() - - progress_thread.join() - - def timer(self, progress_counter, quit_event): + Parameters: + - progress_counter (tqdm): The progress counter object. + - quit_event (threading.Event): The event to signal when to quit. + """ + + # If timer is disabled, return immediately without starting it, to reduce cpu use + if not self.enable_timer: + return + start_time = time.time() last_count = 0 + # Loop until quit event is set while not quit_event.is_set(): current_count = progress_counter.n + # Update start time when progress is made if current_count != last_count: start_time = time.time() last_count = current_count + # Calculate elapsed time elapsed_time = time.time() - start_time + # Check if elapsed time exceeds progress timeout if elapsed_time > self.progress_timeout: - console.log(f"[red]No progress for {self.progress_timeout} seconds. Stopping.") + console.log(f"[red]No progress for {self.progress_timeout} seconds. Stopping.") + + # Set quit event to break the loop quit_event.set() break - time.sleep(1) + + # Calculate remaining time until timeout + remaining_time = max(0, self.progress_timeout - elapsed_time) + # Determine sleep interval dynamically based on remaining time + sleep_interval = min(1, remaining_time) + + # Wait for the calculated sleep interval + time.sleep(sleep_interval) + + # Refresh progress bar progress_counter.refresh() - def join(self, output_filename): - """Join all segments file to a mp4 file name""" - file_list_path = os.path.join('file_list.txt') - ts_files = [f for f in os.listdir(self.temp_folder) if f.endswith(".ts")] +class Downloader(): + def __init__(self, output_filename: str = None, m3u8_playlist:str = None, m3u8_index:str = None, key: str = None): + + """ + Initialize the Downloader object. + + Parameters: + - output_filename (str): Output filename for the downloaded content. + - m3u8_playlist (str, optional): URL to the main M3U8 playlist. + - key (str, optional): Hexadecimal representation of the encryption key. + """ + + + self.m3u8_playlist = m3u8_playlist + self.m3u8_index = m3u8_index + self.key = bytes.fromhex(key) if key is not None else key + self.output_filename = output_filename + + # Auto generate out file name if not present + if output_filename == None: + if m3u8_playlist != None: + self.output_filename = os.path.join("missing", compute_sha1_hash(m3u8_playlist)) + else: + self.output_filename = os.path.join("missing", compute_sha1_hash(m3u8_index)) + + if self.key != None: + hex_data = convert_to_hex(self.key) + console.log(f"[cyan]Key use [white]=> [red]{hex_data}") + + # Initialize temp base path + self.base_path = os.path.join(str(self.output_filename).replace(".mp4", "")) + self.video_segments_path = os.path.join(self.base_path, "tmp", "video") + self.audio_segments_path = os.path.join(self.base_path, "tmp", "audio") + self.subtitle_segments_path = os.path.join(self.base_path, "tmp", "subtitle") + + # Create temp folder + logging.info("Create temp folder") + os.makedirs(self.video_segments_path, exist_ok=True) + os.makedirs(self.audio_segments_path, exist_ok=True) + os.makedirs(self.subtitle_segments_path, exist_ok=True) + + # Track subtitle, audio donwload + self.downloaded_audio = [] + self.downloaded_subtitle = [] + self.downloaded_video = [] + + # Default decoding + self.video_decoding = "avc1.640028" + self.audio_decoding = "mp4a.40.2" + + def __df_make_req__(self, url: str) -> str: + """ + Make a request to get text from the provided URL. + + Parameters: + - url (str): The URL to make the request to. + + Returns: + - str: The text content of the response. + """ + try: + # Send a GET request to the provided URL + config_headers.get('index')['user-agent'] = get_headers() + response = requests.get(url, headers=config_headers.get('index')) + + if response.ok: + return response.text + else: + logging.error(f"[df_make_req] Request to {url} failed with status code: {response.status_code}") + return None + + except requests.RequestException as req_err: + logging.error(f"[df_make_req] Error occurred during request: {req_err}") + return None + + except Exception as e: + logging.error(f"[df_make_req] An unexpected error occurred: {e}") + return None + + def manage_playlist(self, m3u8_playlist_text): + """ + Parses the M3U8 playlist to extract information about keys, playlists, subtitles, etc. - def extract_number(file_name): - return int(''.join(filter(str.isdigit, file_name))) - ts_files.sort(key=extract_number) + Args: + m3u8_playlist_text (str): The text content of the M3U8 playlist. + """ - if len(ts_files) == 0: - console.log("[red]Couldn't find any segments to join, retry") - sys.exit(0) + global Download_audio, Donwload_subtitles - with open(file_list_path, 'w') as f: - for ts_file in ts_files: - relative_path = os.path.relpath(os.path.join(self.temp_folder, ts_file)) - f.write(f"file '{relative_path}'\n") + # Create an instance of the M3U8_Parser class + parse_class_m3u8 = M3U8_Parser(DOWNLOAD_SPECIFIC_SUBTITLE) - try: - ffmpeg.input(file_list_path, format='concat', safe=0).output(output_filename, map_metadata='-1', c='copy', loglevel='error').run() - except ffmpeg.Error as e: - console.log(f"[red]Error saving MP4: {e.stdout}") - - os.remove(file_list_path) - shutil.rmtree("tmp", ignore_errors=True) + # Extract information about the M3U8 playlist + parse_class_m3u8.parse_data(m3u8_playlist_text) -class M3U8_Downloader: - def __init__(self, m3u8_url, m3u8_audio = None, key=None, output_filename="output.mp4"): - self.m3u8_url = m3u8_url - self.m3u8_audio = m3u8_audio - self.key = key - self.video_path = output_filename - self.audio_path = os.path.join(DOWNLOAD_PATH, "audio.mp4") + # Collect available audio tracks and default audio track + self.list_available_audio = parse_class_m3u8.get_track_audios() + self.default_audio = parse_class_m3u8.get_default_track_audio() + + # Check if there is some audios, else disable download + if self.list_available_audio != None: + console.log(f"[cyan]Find audios language: [red]{[obj_audio.get('language') for obj_audio in self.list_available_audio]}") + else: + console.log("[red]Cant find a list of audios") + Download_audio = False + + # Collect available subtitles and default subtitle + self.list_available_subtitles = parse_class_m3u8.get_subtitles() + self.default_subtitle = parse_class_m3u8.get_default_subtitle() + + # Check if there is some subtitles, else disable download + if self.list_available_subtitles != None: + console.log(f"[cyan]Find subtitles language: [red]{[obj_sub.get('language') for obj_sub in self.list_available_subtitles]}") + else: + console.log("[red]Cant find a list of audios") + Donwload_subtitles = False + + # Collect best quality video + m3u8_index_obj = parse_class_m3u8.get_best_quality() + + # Get URI of the best quality and codecs parameters + console.log(f"[cyan]Select resolution: [red]{m3u8_index_obj.get('width')}") + m3u8_index = m3u8_index_obj.get('uri') + m3u8_index_decoding = m3u8_index_obj.get('codecs') + + # Fix URL if it is not complete with http:\\site_name.domain\... + if "http" not in m3u8_index: + + # Generate full URL + m3u8_index = class_urlFix.generate_full_url(m3u8_index) + + # Check if a valid HTTPS URL is obtained + if m3u8_index is not None and "https" in m3u8_index: + console.log(f"[cyan]Found m3u8 index [white]=> [red]{m3u8_index}") + else: + logging.warning("[download_m3u8] Can't find a valid m3u8 index") + sys.exit(0) + + # Collect best index, video decoding, and audio decoding + self.m3u8_index = m3u8_index + + # if is present in playlist + if m3u8_index_decoding != None: + self.video_decoding = m3u8_index_decoding.split(",")[0] + self.audio_decoding = m3u8_index_decoding.split(",")[1] + + def manage_subtitle(self): + """ + Downloads and manages subtitles. + + This method iterates over available subtitles, downloads them if necessary, and updates + the list of downloaded subtitles. + """ + + # Iterate over each available subtitle + for obj_subtitle in self.list_available_subtitles: + logging.info(f"(manage_subtitle) Find => {obj_subtitle}") + + # Check if there is custom subtitles to download + if len(DOWNLOAD_SPECIFIC_SUBTITLE) > 0: + + # Check if language in list + if obj_subtitle.get('language') not in DOWNLOAD_SPECIFIC_SUBTITLE: + continue + + # Construct full path for the subtitle file + sub_full_path = os.path.join(self.subtitle_segments_path, obj_subtitle.get('language') + ".vtt") + + # Check if the subtitle file already exists + if not os.path.exists(sub_full_path): + console.log(f"[cyan]Download subtitle [white]=> [red]{obj_subtitle.get('language')}.") + + # Add the subtitle to the list of downloaded subtitles + self.downloaded_subtitle.append({ + 'name': obj_subtitle.get('name').split(" ")[0], + 'language': obj_subtitle.get('language').upper(), + 'path': os.path.abspath(sub_full_path) + }) + + + # If the subtitle file doesn't exist, download it + response = requests.get(obj_subtitle.get('uri')) + open(sub_full_path, "wb").write(response.content) + + def manage_audio(self): + """ + Downloads and manages audio segments. + + This method iterates over available audio tracks, downloads them if necessary, and updates + the list of downloaded audio tracks. + """ + + # Iterate over each available audio track + for obj_audio in self.list_available_audio: + logging.info(f"(manage_audio) Find => {obj_audio}") + + # Check if there is custom subtitles to download + if len(DOWNLOAD_SPECIFIC_AUDIO) > 0: + + # Check if language in list + if obj_audio.get('language') not in DOWNLOAD_SPECIFIC_AUDIO: + continue + + # Construct full path for the audio segment directory + full_path_audio = os.path.join(self.audio_segments_path, obj_audio.get('language')) - def start(self): - video_m3u8 = M3U8_Segments(self.m3u8_url, self.key) - console.log("[purple]Downloading video ts") + self.downloaded_audio.append({ + 'language': obj_audio.get('language'), + 'path': full_path_audio + }) + + # Check if the audio segment directory already exists + if not os.path.exists(full_path_audio): + + # If the audio segment directory doesn't exist, download audio segments + audio_m3u8 = M3U8_Segments(obj_audio.get('uri'), full_path_audio, self.key) + console.log(f"[purple]Download audio segments [white]=> [red]{obj_audio.get('language')}.") + + # Get information about the audio segments + audio_m3u8.get_info() + + # Download the audio segments + audio_m3u8.donwload_streams() + + def manage_video(self): + """ + Downloads and manages video segments. + + This method downloads video segments if necessary and updates + the list of downloaded video segments. + """ + + # Construct full path for the video segment directory + full_path_video = self.video_segments_path + + # Create an instance of M3U8_Segments to handle video segments + video_m3u8 = M3U8_Segments(self.m3u8_index, full_path_video, self.key) + console.log("[purple]Download video segments.") + + # Add the video segment directory to the list of downloaded video segments + self.downloaded_video.append({ + 'path': full_path_video + }) + + # Get information about the video segments video_m3u8.get_info() - video_m3u8.download_ts() - video_m3u8.join(self.video_path) - print_duration_table(self.video_path) - - if self.m3u8_audio is not None: - audio_m3u8 = M3U8_Segments(self.m3u8_audio, self.key) - console.log("[purple]Downloading audio ts") - audio_m3u8.get_info() - audio_m3u8.download_ts() - audio_m3u8.join(self.audio_path) - print_duration_table(self.audio_path) - - self.join_audio() - - if os.path.exists(f"{self.video_path}.mp4"): - os.renames(f"{self.video_path}.mp4", self.video_path) - def join_audio(self): - console.log("[purple]Join audio and video") + # Download the video segments + video_m3u8.donwload_streams() - try: - video_stream = ffmpeg.input(self.video_path) - audio_stream = ffmpeg.input(self.audio_path) - - process = ( - ffmpeg.output( - video_stream, - audio_stream, - self.video_path + ".mp4", - vcodec="copy", - acodec="copy", - loglevel='error' - ) - .global_args( - '-map', '0:v:0', - '-map', '1:a:0', - '-shortest', '-strict', - 'experimental') - .run() + @staticmethod + def extract_number(file_name): + return int(''.join(filter(str.isdigit, file_name))) + + def join_ts_files(self, full_path: str, out_file_name: str): + """ + Joins the individual .ts files into a single video file. + + Args: + full_path (str): The full path to the directory containing the .ts files. + out_file_name (str): The name of the output video file. + + Returns: + str: The path to the output video file. + """ + + # Get the current directory and create a file_list with the path of all .ts files + file_list_path = os.path.join('file_list.txt') + + # Sort files (1.ts, 2.ts, ...) based on their numbers + ts_files = [f for f in os.listdir(full_path) if f.endswith(".ts")] + ts_files.sort(key=Downloader.extract_number) + + # Check if there are enough .ts files to join (at least 10) + if len(ts_files) < 10: + logging.error(f"No .ts file to join in folder: {full_path}") + + else: + + # Save files sorted in a txt file with absolute path to fix problem with ( C:\\path (win)) + with open(file_list_path, 'w') as file_list: + for ts_file in ts_files: + #absolute_path = os.path.abspath(os.path.join(full_path, ts_file)) + relative_path = os.path.relpath(os.path.join(full_path, ts_file)) + file_list.write(f"file '{relative_path}'\n") + + # Concatenate and save the files and return the path to the output filename + return concatenate_and_save( + file_list_path=file_list_path, + output_filename=out_file_name, + video_decoding=self.video_decoding, + audio_decoding=self.audio_decoding + ) + + def download_audios(self): + """ + Downloads audio files and stores their paths. + """ + + # Initialize an empty list to store audio tracks paths + self.audio_tracks_path = [] + + # Check if there are any downloaded audio objects + if len(self.downloaded_audio) > 0: + + # Iterate over each downloaded audio object + for obj_downloaded_audio in self.downloaded_audio: + + # Create the expected path for the audio file based on its language + obj_audio_path = os.path.join(self.base_path, obj_downloaded_audio.get('language') + ".mp4") + + # Check if the audio file already exists + if not os.path.exists(obj_audio_path): + + # If the audio file doesn't exist, join the .ts files and save as .mp4 + new_audio_path = self.join_ts_files( + obj_downloaded_audio.get('path'), + obj_audio_path + ) + + console.log(f"[cyan]Join segments: [red]{obj_downloaded_audio.get('language')}") + + # Add the joined audio file path to the list + self.audio_tracks_path.append({ + 'path': new_audio_path + }) + + def download_videos(self): + """ + Downloads video files and stores their path. + """ + + # Construct the expected path for the video file + video_track_path = os.path.join(self.base_path, "video.mp4") + console.log(f"[cyan]Join segments: [red]video") + + # Check if the video file already exists + if not os.path.exists(video_track_path): + + # If the video file doesn't exist, join the .ts files and save as .mp4 + video_track_path = self.join_ts_files( + self.downloaded_video[0].get('path'), + video_track_path + ) + + # Get info video + print_duration_table(video_track_path) + + self.video_track_path = video_track_path + + def add_subtitles_audios(self): + """Add subtitles and audio tracks to the video. + + This function checks if there are any audio tracks and adds them to the video if available. + It also adds subtitles to the video if there are any downloaded. If no audio tracks are + available, it uses the original video path. The resulting video with added subtitles is + saved as 'out.mkv' in the base path and rename to .mp4. + """ + + # Initialize variables + path_video_and_audio = None + path_join_subtitles = None + + # Check if there are any audio tracks + if len(self.audio_tracks_path) > 0: + # Log adding audio tracks + console.log(f"[cyan]Add audios.") + + # Join audio tracks with the video + path_video_and_audio = join_audios( + video_path=self.video_track_path, + audio_tracks=self.audio_tracks_path ) - console.print("[green]Merge completed successfully.") + # Check if there are any downloaded subtitles + if len(self.downloaded_subtitle) > 0: + # Log adding subtitles + console.log(f"[cyan]Add subtitles.") + + # If no audio tracks were joined, use the original video path + if path_video_and_audio is None: + path_video_and_audio = self.video_track_path + + # Transcode video with subtitles + path_join_subtitles = transcode_with_subtitles( + path_video_and_audio, + self.downloaded_subtitle, + os.path.join(self.base_path, "out.mkv") + ) + + self.path_video_and_audio = path_video_and_audio + self.path_join_subtitles = path_join_subtitles + + def cleanup_tmp(self, is_index = False): + """Cleanup temporary files. + + This function removes temporary audio join files, the starting video file if necessary, + and the temporary folder. It also renames the output file to the desired output filename. + + Args: + full_path (str): The full path to the directory containing the .ts files. + is_index (bool): To bypass audio tracks and subtitles tracks + """ - except ffmpeg.Error as e: - print("ffmpeg error:", e) + join_output_file = None + console.log("[cyan]Cleanup [white]...") + + # Remove audio join files + if not is_index: + for clean_audio_path in self.audio_tracks_path: + remove_file(clean_audio_path.get('path')) + + # Determine the output file + if not is_index: + + # Determine the output file + if self.path_join_subtitles is not None: + join_output_file = self.path_join_subtitles + remove_file(self.path_video_and_audio) + else: + join_output_file = self.path_video_and_audio + + # Remove the starting video if necessary + if self.path_join_subtitles is not None or self.path_video_and_audio is not None: + remove_file(self.video_track_path) + + # If no join or video and audio files exist, the final output is the original video + if self.path_join_subtitles is None and self.path_video_and_audio is None: + join_output_file = self.video_track_path + + # Rename output file + os.rename(join_output_file, self.output_filename) + + # Remove the temporary folder + if not is_index: + remove_folder(self.base_path) + else: + remove_folder(os.path.join(self.base_path, "tmp")) - os.remove(self.video_path) - os.remove(self.audio_path) - - -# [ main function ] -def df_make_req(url): - response = requests.get(url) - - if response.ok: - return response.text - else: - console.log(f"[red]Wrong url, error: {response.status_code}") - sys.exit(0) - -def download_subtitle(url, name_language): - path = os.path.join(DOWNLOAD_PATH, "subtitle") - os.makedirs(path, exist_ok=True) - - console.log(f"[green]Downloading subtitle: [red]{name_language}") - open(os.path.join(path, name_language + ".vtt"), "wb").write(requests.get(url).content) - -def download_m3u8( - m3u8_playlist=None, - m3u8_index = None, - m3u8_audio=None, - m3u8_subtitle=None, - key=None, - output_filename=os.path.join(DOWNLOAD_PATH, "output.mp4"), - log=False, - subtitle_folder="subtitles", - content_name="" - ): - - m3u8_audio_url=None - # m3u8_playlist never use in this version - - key = bytes.fromhex(key) if key is not None else key - - if m3u8_audio is not None: - m3u8_audio_obj = None - if DOWNLOAD_DEFAULT_LANGUAGE: - m3u8_audio_obj = next((audioobj for audioobj in m3u8_audio if audioobj.get("default", False)), None) or m3u8_audio[0] - elif SELECTED_LANGUAGE: - m3u8_audio_obj = next((audioobj for audioobj in m3u8_audio if audioobj["lang"] == SELECTED_LANGUAGE), None) - if m3u8_audio_obj is None: - console.log("[red]Cant find a valid m3u8 audio") - sys.exit(0) - m3u8_audio_url = m3u8_audio_obj["url"] - console.log(f"[green]Select language => [purple]{m3u8_audio_obj['lang']}") - - if m3u8_subtitle != None: - - parse_class_m3u8_sub = M3U8_Parser() - - # Parse directly m3u8 content pass if present - if "#EXTM3U" not in m3u8_subtitle: - parse_class_m3u8_sub.parse_data(df_make_req(m3u8_subtitle)) - else: - parse_class_m3u8_sub.parse_data(m3u8_subtitle) - - # Download subtitle if present ( normally in m3u8 playlist ) - if DOWNLOAD_SUB: - parse_class_m3u8_sub.download_subtitle(subtitle_path=subtitle_folder, content_name=content_name) - - # Download m3u8 index, with segments - path = os.path.dirname(output_filename) - os.makedirs(path, exist_ok=True) - - if log: - console.log(f"[green]Download m3u8 from index [white]=> [purple]{m3u8_index}") - M3U8_Downloader(m3u8_index, m3u8_audio_url, key=key, output_filename=output_filename).start() + def download_m3u8(self): + """ + Download content from M3U8 sources including video, audio, and subtitles. + """ + + # Check if the M3U8 playlist is valid + if self.m3u8_playlist is not None: + logging.info(f"Download m3u8 from playlist.") + + # Fetch the M3U8 playlist content + m3u8_playlist_text = self.__df_make_req__(self.m3u8_playlist) + + # Add full URL of the M3U8 playlist to fix next .ts without https if necessary + class_urlFix.set_playlist(self.m3u8_playlist) + + # Collect information about the playlist + self.manage_playlist(m3u8_playlist_text) + + # Download subtitles + if Donwload_subtitles: + logging.info("Download subtitles ...") + self.manage_subtitle() + + # Download segmenets of audio tracks + if Download_audio: + logging.info("Download audios ...") + self.manage_audio() + + # Download segements of video segments + logging.info("Download videos ...") + self.manage_video() + + # Convert audios segments to mp4 + self.download_audios() + + # Convert video segments to mp4 + self.download_videos() + + # Add subtitles and audio to video mp4 if present + self.add_subtitles_audios() + + # Clean up folder of all tmp folder and tmp with .ts segments folder + if REMOVE_SEGMENTS_FOLDER: + self.cleanup_tmp() + + else: + logging.info(f"Download m3u8 from index.") + + # Add full URL of the M3U8 playlist to fix next .ts without https if necessary + class_urlFix.set_playlist(self.m3u8_index) + + logging.info("Download videos ...") + self.manage_video() + + # Convert video segments to mp4 + self.download_videos() + + # Clean up folder of all tmp folder and tmp with .ts segments folder + if REMOVE_SEGMENTS_FOLDER: + self.cleanup_tmp(is_index = True) diff --git a/Src/Lib/FFmpeg/util.py b/Src/Lib/FFmpeg/util.py deleted file mode 100644 index a4c47fa9..00000000 --- a/Src/Lib/FFmpeg/util.py +++ /dev/null @@ -1,56 +0,0 @@ -# 31.01.24 - -# Class -from Src.Util.console import console - -# Import -import ffmpeg - - -# Variable - - -# [ func ] -def get_video_duration(file_path): - try: - probe = ffmpeg.probe(file_path) - duration = float(probe['format']['duration']) - return duration - except ffmpeg.Error as e: - print(f"Error: {e.stderr}") - return None - - -def format_duration(seconds): - hours, remainder = divmod(seconds, 3600) - minutes, seconds = divmod(remainder, 60) - return int(hours), int(minutes), int(seconds) - - -def print_duration_table(file_path): - video_duration = get_video_duration(file_path) - - if video_duration is not None: - hours, minutes, seconds = format_duration(video_duration) - console.log( - f"[cyan]Info [green]'{file_path}': [purple]{int(hours)}[red]h [purple]{int(minutes)}[red]m [purple]{int(seconds)}[red]s") - - -def audio_extractor_m3u8(req): - m3u8_cont = req.text.split() - m3u8_cont_arr = [] - for row in m3u8_cont: - if "audio" in str(row): - lang = None - default = False - for field in row.split(","): - if "NAME" in field: - lang = field.split('"')[-2] - if "DEFAULT" in field: - default_str = field.split('=')[1] - default = default_str.strip() == "YES" - audioobj = {"url": row.split(",")[-1].split('"')[-2], "lang": lang, "default": default} - if audioobj['lang'] is None: - continue - m3u8_cont_arr.append(audioobj) - return m3u8_cont_arr or None diff --git a/Src/Upload/__init__.py b/Src/Upload/__init__.py new file mode 100644 index 00000000..feed9cfe --- /dev/null +++ b/Src/Upload/__init__.py @@ -0,0 +1,3 @@ +# 01.03.24 + +from .update import update \ No newline at end of file diff --git a/Src/Upload/__version__.py b/Src/Upload/__version__.py index 3e4e8d15..7078ca17 100644 --- a/Src/Upload/__version__.py +++ b/Src/Upload/__version__.py @@ -1,5 +1,5 @@ __title__ = 'Streaming_community' -__version__ = 'v0.9.2' +__version__ = 'v1.0.0' __author__ = 'Ghost6446' __description__ = 'A command-line program to download film' __license__ = 'MIT License' diff --git a/Src/Upload/update.py b/Src/Upload/update.py index 815e8379..052da5a8 100644 --- a/Src/Upload/update.py +++ b/Src/Upload/update.py @@ -1,47 +1,73 @@ -# 13.09.2023 +# 01.03.2023 # Class import from Src.Util.console import console # General import -import os, requests, time +import os +import requests +import time # Variable repo_name = "StreamingCommunity_api" repo_user = "ghost6446" main = os.path.abspath(os.path.dirname(__file__)) + def get_install_version(): + """ + Get the installed version from the '__version__.py' file. + """ + about = {} - with open(os.path.join(main, '__version__.py'), 'r', encoding='utf-8') as f: + + version_file_path = os.path.join(main, '__version__.py') + + with open(version_file_path, 'r', encoding='utf-8') as f: exec(f.read(), about) + return about['__version__'] -def main_update(): - console.print("[green]Checking GitHub version ...") +def update(): + """ + Check for updates on GitHub and display relevant information. + """ + + console.print("[green]Checking GitHub version [white]...") - json = requests.get(f"https://api.github.com/repos/{repo_user}/{repo_name}/releases").json()[0] - stargazers_count = requests.get(f"https://api.github.com/repos/{repo_user}/{repo_name}").json()['stargazers_count'] + # Make the GitHub API requests and handle potential errors + try: + repo_info = requests.get(f"https://api.github.com/repos/{repo_user}/{repo_name}").json() + release_info = requests.get(f"https://api.github.com/repos/{repo_user}/{repo_name}/releases").json()[0] + except requests.RequestException as e: + console.print(f"[red]Error accessing GitHub API: {e}") + return - last_version = json['name'] - down_count = json['assets'][0]['download_count'] + # Get start of the reposity + stargazers_count = repo_info['stargazers_count'] - if down_count > 0 and stargazers_count > 0: + # Find info about latest versione deploy and the donwload count + last_version = release_info['name'] + down_count = release_info['assets'][0]['download_count'] + + # Calculate percentual of start base on download count + if down_count > 0 and stargazers_count > 0: percentual_stars = round(stargazers_count / down_count * 100, 2) - else: + else: percentual_stars = 0 - if get_install_version() != last_version: - console.print(f"[red]=> A new version is available: [green]{json['zipball_url']}") - console.print(f"[red]=> New Version: [yellow]{json['name']}") - - else: - console.print(f"[red]=> Everything is up to date") - console.print(f"[red]=> You're on Version: [yellow]{json['name']}") + installed_version = get_install_version() + + # Check installed version + if installed_version != last_version: + console.print(f"[red]Version: [yellow]{last_version}") + else: + console.print(f"[red]Everything up to date") - print("\n") - console.print(f"[red]{repo_name} was downloaded [yellow]{down_count} [red]times, but only [yellow]{percentual_stars}% [red]of You(!!) have starred it. \n\ + console.print(f"[red]{repo_name} was downloaded [yellow]{down_count} [red]times, but only [yellow]{percentual_stars}% [red]of You(!!) have starred it.\n\ [cyan]Help the repository grow today, by leaving a [yellow]star [cyan]and [yellow]sharing [cyan]it to others online!") + time.sleep(3) - print("\n") \ No newline at end of file + print("\n") + diff --git a/Src/Util/config.py b/Src/Util/config.py index c92292ea..78ab8f53 100644 --- a/Src/Util/config.py +++ b/Src/Util/config.py @@ -1,22 +1,171 @@ -import json, os +# 29.01.24 + +import json +import os +from typing import Any, List class ConfigManager: - def __init__(self, file_path): + def __init__(self, file_path: str = 'config.json') -> None: + """Initialize the ConfigManager. + + Args: + file_path (str, optional): The path to the configuration file. Default is 'config.json'. + """ self.file_path = file_path + self.config = {} + self.cache = {} + + def read_config(self) -> None: + """Read the configuration file.""" + try: + if os.path.exists(self.file_path): + with open(self.file_path, 'r') as f: + self.config = json.load(f) + except Exception as e: + print(f"Error reading configuration file: {e}") + + def read_key(self, section: str, key: str, data_type: type = str) -> Any: + """Read a key from the configuration file. + + Args: + section (str): The section in the configuration file. + key (str): The key to be read. + data_type (type, optional): The expected data type of the key's value. Default is str. + + Returns: + The value of the key converted to the specified data type. + """ + cache_key = f"{section}.{key}" + if cache_key in self.cache: + return self.cache[cache_key] + if section in self.config and key in self.config[section]: + value = self.config[section][key] + else: + raise ValueError(f"Key '{key}' not found in section '{section}'") + value = self._convert_to_data_type(value, data_type) + self.cache[cache_key] = value + return value + + def _convert_to_data_type(self, value: str, data_type: type) -> Any: + """Convert the value to the specified data type. + + Args: + value (str): The value to be converted. + data_type (type): The expected data type. + + Returns: + The value converted to the specified data type. + """ + if data_type == int: + return int(value) + elif data_type == bool: + return bool(value) + elif data_type == list: + return value if isinstance(value, list) else [item.strip() for item in value.split(',')] + elif data_type == type(None): + return None + else: + return value + + def get(self, section: str, key: str) -> Any: + """Read a value from the configuration file. + + Args: + section (str): The section in the configuration file. + key (str): The key to be read. + + Returns: + The value associated with the key. + """ + return self.read_key(section, key) + + def get_int(self, section: str, key: str) -> int: + """Read an integer value from the configuration file. + + Args: + section (str): The section in the configuration file. + key (str): The key to be read. - def load_config(self): - with open(self.file_path, 'r') as file: - config_file = json.load(file) - return config_file + Returns: + int: The integer value. + """ + return self.read_key(section, key, int) + + def get_float(self, section: str, key: str) -> int: + """Read an float value from the configuration file. - def update_config(self, key, new_value): - config = self.load_config() - config[key] = new_value - with open(self.file_path, 'w') as file: - json.dump(config, file, indent=4) + Args: + section (str): The section in the configuration file. + key (str): The key to be read. + Returns: + float: The float value. + """ + return self.read_key(section, key, float) + + def get_bool(self, section: str, key: str) -> bool: + """Read a boolean value from the configuration file. + + Args: + section (str): The section in the configuration file. + key (str): The key to be read. + + Returns: + bool: The boolean value. + """ + return self.read_key(section, key, bool) + + def get_list(self, section: str, key: str) -> List[str]: + """Read a list value from the configuration file. + + Args: + section (str): The section in the configuration file. + key (str): The key to be read. + + Returns: + list: The list value. + """ + return self.read_key(section, key, list) + + def get_dict(self, section: str, key: str) -> dict: + """Read a dictionary value from the configuration file. + + Args: + section (str): The section in the configuration file. + key (str): The key to be read. + + Returns: + dict: The dictionary value. + """ + return self.read_key(section, key, dict) + + def set_key(self, section: str, key: str, value: Any) -> None: + """Set a key in the configuration file. + + Args: + section (str): The section in the configuration file. + key (str): The key to be set. + value (Any): The value to be associated with the key. + """ + try: + if section not in self.config: + self.config[section] = {} + self.config[section][key] = value + cache_key = f"{section}.{key}" + self.cache[cache_key] = value + self.write_config() + except Exception as e: + print(f"Error setting key '{key}' in section '{section}': {e}") + + def write_config(self) -> None: + """Write the configuration to the file.""" + try: + with open(self.file_path, 'w') as f: + json.dump(self.config, f, indent=4) + except Exception as e: + print(f"Error writing configuration file: {e}") -# Example usage: -config_path = os.path.join('config.json') -config_manager = ConfigManager(config_path) -config = config_manager.load_config() + +# Initialize +config_manager = ConfigManager() +config_manager.read_config() diff --git a/Src/Util/console.py b/Src/Util/console.py index 474364d5..eb5f0181 100644 --- a/Src/Util/console.py +++ b/Src/Util/console.py @@ -1,10 +1,9 @@ -# 17.09.2023 -> 3.12.23 +# 24.02.24 # Import from rich.console import Console from rich.prompt import Prompt -import logging # Variable msg = Prompt() -console = Console() +console = Console() \ No newline at end of file diff --git a/Src/Util/headers.py b/Src/Util/headers.py index 248d3e13..f5d18a39 100644 --- a/Src/Util/headers.py +++ b/Src/Util/headers.py @@ -1,4 +1,4 @@ -# 3.12.23 -> 10.12.23 +# 3.12.23 -> 10.12.23 -> 20.03.24 # Import import fake_useragent @@ -6,5 +6,13 @@ # Variable useragent = fake_useragent.UserAgent(use_external_data=True) -def get_headers(): +def get_headers() -> str: + """ + Generate a random user agent to use in HTTP requests. + + Returns: + - str: A random user agent string. + """ + + # Get a random user agent string from the user agent rotator return useragent.firefox \ No newline at end of file diff --git a/Src/Util/logger.py b/Src/Util/logger.py new file mode 100644 index 00000000..1788bcf7 --- /dev/null +++ b/Src/Util/logger.py @@ -0,0 +1,52 @@ +# 26.03.24 + +# Class import +from Src.Util.config import config_manager + +# Import +import logging +from logging.handlers import RotatingFileHandler + +class Logger: + def __init__(self): + """ + Initialize the Logger class. + """ + + # Fetching configuration values + self.DEBUG_MODE = config_manager.get_bool("DEFAULT", "debug") + self.log_to_file = config_manager.get_bool("DEFAULT", "log_to_file") + self.log_file = config_manager.get("DEFAULT", "log_file") if self.log_to_file else None + + # Setting logging level based on DEBUG_MODE + if self.DEBUG_MODE: + self.level = logging.DEBUG + + # Configure file logging if debug mode and logging to file are both enabled + if self.log_to_file: + self.configure_file_logging() + else: + + # If DEBUG_MODE is False, set logging level to ERROR + self.level = logging.ERROR + + # Configure console logging + self.configure_logging() + + def configure_logging(self): + """ + Configure console logging. + """ + logging.basicConfig(level=self.level, format='[%(filename)s:%(lineno)s - %(funcName)20s() ] %(asctime)s - %(levelname)s - %(message)s') + + def configure_file_logging(self): + """ + Configure file logging if enabled. + """ + + file_handler = RotatingFileHandler(self.log_file, maxBytes=10*1024*1024, backupCount=5) + + file_handler.setLevel(logging.DEBUG) + formatter = logging.Formatter('[%(filename)s:%(lineno)s - %(funcName)20s() ] %(asctime)s - %(levelname)s - %(message)s') + file_handler.setFormatter(formatter) + logging.getLogger('').addHandler(file_handler) diff --git a/Src/Util/message.py b/Src/Util/message.py index 137fc697..1ea0754b 100644 --- a/Src/Util/message.py +++ b/Src/Util/message.py @@ -1,9 +1,30 @@ -# 3.12.23 +# 3.12.23 -> 19.07.24 -# Import +# Class import +from .config import config_manager from Src.Util.console import console -def msg_start(): +# Import +import os +import platform + +# Variable +CLEAN = config_manager.get_bool('DEFAULT', 'clean_console') +SHOW = config_manager.get_bool('DEFAULT', 'show_message') + +def get_os_system(): + """ + This function returns the name of the operating system. + """ + os_system = platform.system() + return os_system + +def start_message(): + """ + Display a start message. + + This function prints a formatted start message, including a title and creator information. + """ msg = """ @@ -18,4 +39,15 @@ def msg_start(): """ - console.print(f"[purple]{msg}") \ No newline at end of file + if CLEAN: + if get_os_system() == 'Windows': + os.system("cls") + else: + os.system("clear") + + if SHOW: + console.print(f"[bold yellow]{msg}") + console.print(f"[magenta]Created by: Ghost6446\n") + + row = "-" * console.width + console.print(f"[yellow]{row} \n") \ No newline at end of file diff --git a/Src/Util/os.py b/Src/Util/os.py index ddf87c38..9442a81a 100644 --- a/Src/Util/os.py +++ b/Src/Util/os.py @@ -1,17 +1,35 @@ # 24.01.24 # Import -import shutil, os, time +import shutil +import os +import time +import json +import hashlib +import logging + +def remove_folder(folder_path: str) -> None: + """ + Remove a folder if it exists. + + Parameters: + - folder_path (str): The path to the folder to be removed. + """ -def remove_folder(folder_path): if os.path.exists(folder_path): try: shutil.rmtree(folder_path) except OSError as e: print(f"Error removing folder '{folder_path}': {e}") +def remove_file(file_path: str) -> None: + """ + Remove a file if it exists -def remove_file(file_path): + Parameters: + - file_path (str): The path to the file to be removed. + """ + if os.path.exists(file_path): time.sleep(1) @@ -19,5 +37,160 @@ def remove_file(file_path): os.remove(file_path) except OSError as e: print(f"Error removing file '{file_path}': {e}") - else: - print(f"File '{file_path}' does not exist.") \ No newline at end of file + #else: + # print(f"File '{file_path}' does not exist.") + +def move_file_one_folder_up(file_path): + """ + Move a file one folder up from its current location. + + Args: + file_path (str): Path to the file to be moved. + + """ + + # Get the directory of the file + file_directory = os.path.dirname(file_path) + + # Get the parent directory + parent_directory = os.path.dirname(file_directory) + + # Get the filename + filename = os.path.basename(file_path) + + # New path for the file one folder up + new_path = os.path.join(parent_directory, filename) + + # Move the file + os.rename(file_path, new_path) + +def read_json(path: str): + """Reads JSON file and returns its content. + + Args: + path (str): The file path of the JSON file to read. + + Returns: + variable: The content of the JSON file as a dictionary. + """ + + with open(path, "r") as file: + config = json.load(file) + + return config + +def save_json(json_obj, path: str) -> (None): + """Saves JSON object to the specified file path. + + Args: + json_obj (Dict[str, Any]): The JSON object to be saved. + path (str): The file path where the JSON object will be saved. + """ + + with open(path, 'w') as file: + json.dump(json_obj, file, indent=4) # Adjust the indentation as needed + +def clean_json(path: str) -> (None): + """Reads JSON data from the file, cleans it, and saves it back. + + Args: + path (str): The file path of the JSON file to clean. + """ + + data = read_json(path) + + # Recursively replace all values with an empty string + def recursive_empty_string(obj): + if isinstance(obj, dict): + return {key: recursive_empty_string(value) for key, value in obj.items()} + elif isinstance(obj, list): + return [recursive_empty_string(item) for item in obj] + else: + return "" + + modified_data = recursive_empty_string(data) + + # Save the modified JSON data back to the file + save_json(modified_data, path) + +def format_size(size_bytes: float): + """ + Format the size in bytes into a human-readable format. + + Args: + size_bytes (float): The size in bytes to be formatted. + + Returns: + str: The formatted size. + """ + + units = ['B', 'KB', 'MB', 'GB', 'TB'] + unit_index = 0 + + # Convert bytes to appropriate unit + while size_bytes >= 1024 and unit_index < len(units) - 1: + size_bytes /= 1024 + unit_index += 1 + + # Round the size to two decimal places and return with the appropriate unit + return f"{size_bytes:.2f} {units[unit_index]}" + +def compute_sha1_hash(input_string: str) -> (str): + """ + Computes the SHA-1 hash of the input string. + + Args: + input_string (str): The string to be hashed. + + Returns: + str: The SHA-1 hash of the input string. + """ + # Compute the SHA-1 hash + hashed_string = hashlib.sha1(input_string.encode()).hexdigest() + + # Return the hashed string + return hashed_string + +def decode_bytes(bytes_data: bytes, encodings_to_try: list[str] = None) -> (str): + """ + Decode a byte sequence using a list of encodings and return the decoded string. + + Args: + bytes_data (bytes): The byte sequence to decode. + encodings_to_try (List[str], optional): A list of encoding names to try for decoding. + If None, defaults to ['utf-8', 'latin-1', 'ascii']. + + Returns: + str or None: The decoded string if successful, None if decoding fails. + """ + if encodings_to_try is None: + encodings_to_try = ['utf-8', 'latin-1', 'ascii'] + + for encoding in encodings_to_try: + try: + # Attempt decoding with the current encoding + string_data = bytes_data.decode(encoding) + logging.info("Decoded successfully with encoding: %s", encoding) + logging.info("Decoded string: %s", string_data) + return string_data + except UnicodeDecodeError: + continue # Try the next encoding if decoding fails + + # If none of the encodings work, treat it as raw bytes + logging.warning("Unable to decode the data as text. Treating it as raw bytes.") + logging.info("Raw byte data: %s", bytes_data) + return None + +def convert_to_hex(bytes_data: bytes) -> str: + """ + Convert a byte sequence to its hexadecimal representation. + + Args: + bytes_data (bytes): The byte sequence to convert. + + Returns: + str: The hexadecimal representation of the byte sequence. + """ + hex_data = ''.join(['{:02x}'.format(char) for char in bytes_data]) + logging.info("Hexadecimal representation of the data: %s", hex_data) + return hex_data \ No newline at end of file diff --git a/Src/Util/table.py b/Src/Util/table.py new file mode 100644 index 00000000..52995d06 --- /dev/null +++ b/Src/Util/table.py @@ -0,0 +1,149 @@ +# 03.03.24 + +# Class import +from .message import start_message + +# Import +from rich.console import Console +from rich.table import Table +from rich.text import Text +from rich.prompt import Prompt +from rich.style import Style +from typing import Dict, List, Any + +class TVShowManager: + def __init__(self): + """ + Initialize TVShowManager with provided column information. + + Args: + column_info (Dict[str, Dict[str, str]]): Dictionary containing column names, their colors, and justification. + """ + self.console = Console() + self.tv_shows: List[Dict[str, Any]] = [] # List to store TV show data as dictionaries + self.slice_start: int = 0 + self.slice_end: int = 5 + self.step: int = self.slice_end + self.column_info = [] + + def set_slice_end(self, new_slice: int) -> None: + """ + Set the end of the slice for displaying TV shows. + + Args: + new_slice (int): The new value for the slice end. + """ + self.slice_end = new_slice + self.step = new_slice + + def add_column(self, column_info: Dict[str, Dict[str, str]]) -> None: + """ + Add column information. + + Args: + column_info (Dict[str, Dict[str, str]]): Dictionary containing column names, their colors, and justification. + """ + self.column_info = column_info + + def add_tv_show(self, tv_show: Dict[str, Any]): + """ + Add a TV show to the list of TV shows. + + Args: + tv_show (Dict[str, Any]): Dictionary containing TV show details. + """ + self.tv_shows.append(tv_show) + + def display_data(self, data_slice: List[Dict[str, Any]]): + """ + Display TV show data in a tabular format. + + Args: + data_slice (List[Dict[str, Any]]): List of dictionaries containing TV show details to display. + """ + table = Table(title=Text("Show Details", justify="center", style="bold magenta"), border_style="white") + + # Add columns dynamically based on provided column information + for col_name, col_style in self.column_info.items(): + color = col_style.get("color", None) + if color: + style = Style(color=color) + else: + style = None + table.add_column(col_name, style=style, justify='center') + + # Add rows dynamically based on available TV show data + for entry in data_slice: + row_data = [entry[col_name] for col_name in self.column_info.keys()] + table.add_row(*row_data) + + self.console.print(table) # Use self.console.print instead of print + + def run(self, force_int_input: bool = False, max_int_input: int = 0) -> str: + """ + Run the TV show manager application. + + Args: + - force_int_input(bool): If True, only accept integer inputs from 0 to max_int_input + - max_int_input (int): + + Returns: + - str: Last command executed before breaking out of the loop. + """ + total_items = len(self.tv_shows) + last_command = "" # Variable to store the last command executed + + while True: + start_message() + + # Display table + self.display_data(self.tv_shows[self.slice_start:self.slice_end]) + + # Handling user input for loading more items or quitting + if self.slice_end < total_items: + self.console.print(f"\n\n[yellow][INFO] [green]Press [red]Enter [green]to restart, or [red]'q' [green]to quit.") + + if not force_int_input: + key = Prompt.ask("[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media") + else: + choices = [str(i) for i in range(0, max_int_input)] + choices.append("") + + key = Prompt.ask("[cyan]Insert media [red]index", choices=choices, show_choices=False) + last_command = key + + if key.lower() == "q": + break + + elif key == "": + self.slice_start += self.slice_end + self.slice_end += self.slice_end + if self.slice_end > total_items: + self.slice_end = total_items + + else: + break + + else: + self.console.print(f"\n\n[yellow][INFO] [red]You've reached the end. [green]Press [red]Enter [green]to restart, or [red]'q' [green]to quit.") + + if not force_int_input: + key = Prompt.ask("[cyan]Insert media [red]index [yellow]or [red](*) [cyan]to download all media [yellow]or [red][1-2] [cyan]for a range of media") + else: + choices = [str(i) for i in range(0, max_int_input)] + choices.append("") + + key = Prompt.ask("[cyan]Insert media [red]index", choices=choices, show_choices=False) + last_command = key + + if key.lower() == "q": + break + + elif key == "": + self.slice_start = 0 + self.slice_end = self.step + + else: + break + + return last_command diff --git a/config.json b/config.json index 6e93a615..fcf21404 100644 --- a/config.json +++ b/config.json @@ -1,10 +1,55 @@ { - "root_path": "videos", - "movies_folder_name": "Movies", - "series_folder_name": "Series", - "download_subtitles": true, - "download_default_language": true, - "selected_language": "English", - "max_worker": 20, - "domain": "forum" + "DEFAULT": { + "debug": false, + "log_file": "debug.txt", + "log_to_file": true, + "get_info": false, + "show_message": true, + "clean_console": true, + "bypass_ffmpeg": true, + "bypass_github": true, + "get_moment_title": false, + "root_path": "Video", + "movies_folder_name": "Movies", + "series_folder_name": "Series", + "anime_folder_name": "Anime", + "not_close": false, + "swith_anime": false + }, + "SITE": { + "streaming_site_name": "streamingcommunity", + "streaming_domain": "forum", + "anime_site_name": "animeunity", + "anime_domain": "to" + }, + "M3U8": { + "tdqm_workers": 20, + "tqdm_progress_timeout": 10, + "minium_ts_files_in_folder": 15, + "donwload_percentage": 1, + "requests_timeout": 5, + "enable_time_quit": false, + "tqdm_show_progress": false, + "cleanup_tmp_folder": true + }, + "M3U8_OPTIONS": { + "download_audio": true, + "download_subtitles": true, + "specific_list_audio": [ + "ita" + ], + "specific_list_subtitles": [ + "eng" + ], + "request": { + "index": { + "authority": "vixcloud.co", + "user-agent": "" + }, + "segments": { + "Origin": "https://vixcloud.co", + "user-agent": "" + } + } + } } \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index ecde4356..78557503 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/run.py b/run.py index ba5b444f..bd33eae5 100644 --- a/run.py +++ b/run.py @@ -1,131 +1,166 @@ -# 10.12.23 -> 1.02.24 +# 10.12.23 -> 31.01.24 # Class -import Src.Api.page as Page -from Src.Api.film import main_dw_film as download_film -from Src.Api.tv import main_dw_tv as download_tv -from Src.Util.message import msg_start +from Src.Api import ( + get_version_and_domain, + download_series, + download_film, + search, + anime_search, + anime_download_series, + anime_download_film, + get_select_title +) +from Src.Util.message import start_message from Src.Util.console import console, msg -from Src.Util.os import remove_folder -from Src.Upload.update import main_update -from Src.Lib.FFmpeg.installer import check_ffmpeg +from Src.Util.config import config_manager +from Src.Util.os import remove_folder, remove_file +from Src.Upload.update import update as git_update +from Src.Lib.FFmpeg import check_ffmpeg +from Src.Util.logger import Logger # Import -import sys, platform +import sys +import logging +import platform +# Variable +DEBUG_MODE = config_manager.get_bool("DEFAULT", "debug") +DEBUG_GET_ALL_INFO = config_manager.get_bool('DEFAULT', 'get_info') +SWITCH_TO = config_manager.get_bool('DEFAULT', 'swith_anime') +CLOSE_CONSOLE = config_manager.get_bool('DEFAULT', 'not_close') + + +# [ main ] def initialize(): """ - Initializes the application by performing necessary setup tasks. + Initialize the application. + Checks Python version, removes temporary folder, and displays start message. """ # Get system where script is run run_system = platform.system() - # Checking Python version + # Enable debug with info + if DEBUG_MODE: + logging.basicConfig(level=logging.DEBUG) + logging.getLogger('root').setLevel(logging.INFO) + else: + logging.basicConfig(level=logging.ERROR) + logging.getLogger('root').setLevel(logging.ERROR) + + if sys.version_info < (3, 11): console.log("Install python version > 3.11") sys.exit(0) # Removing temporary folder remove_folder("tmp") - msg_start() + remove_file("debug.log") + start_message() + # Attempting GitHub update try: - # Updating application - main_update() + git_update() except Exception as e: - console.print(f"[blue]Request GitHub [white]=> [red]Failed: {e}") + console.print(f"[blue]Req github [white]=> [red]Failed: {e}") - # Checking FFmpeg installation - if run_system != 'Windows': + # Checking ffmpeg availability ( only win ) + if run_system == 'Windows': check_ffmpeg() + +def main(): + """ + Main function of the application. + """ + + # Get site domain and version + initialize() + site_version, domain = get_version_and_domain() + + # Make request to site to get content that corrsisponde to that string + film_search = msg.ask("\n[cyan]Insert word to search in all site: ").strip() + len_database = search(film_search, domain) + + if len_database != 0: + + # Select title from list + select_title = get_select_title() + + # For series + if select_title.type == 'tv': + download_series( + tv_id=select_title.id, + tv_name=select_title.slug, + version=site_version, + domain=domain + ) + + # For film + else: + download_film( + id_film=select_title.id, + title_name=select_title.slug, + domain=domain + ) - print("\n") + # If no media find + else: + console.print("[red]Cant find a single element") + # End + console.print("\n[red]Done") -def main(): +def main_switch(): """ - Main function to execute the application logic. + Main function for anime unity """ - # Initializing the application + # Get site domain and version initialize() - # Retrieving domain and site version - domain, site_version = Page.domain_version() - - # Searching for movie or TV series title - film_search = msg.ask("\n[blue]Search for any Movie or TV Series title").strip() - db_title = Page.search(film_search, domain) - Page.display_search_results(db_title) - - if db_title: - - # Displaying total results - console.print(f"\n[blue]Total result: {len(db_title)}") - - # Asking user to select title(s) to download - console.print( - "\n[green]Insert [yellow]INDEX [red]number[green], or [red][1-2] [green]for a range of movies/tv series, or [red][1,3,5] [green]to select discontinued movie/tv series" - ) - console.print("\n[red]In case of a TV Series you will also choose seasons and episodes to download") - index_select = str(msg.ask("\n[blue]Select [yellow]INDEX [blue]to download")).strip() - - # For only number ( to fix ) - if index_select.isnumeric(): - index_select = int(index_select) - if 0 <= index_select <= len(db_title) - 1: - selected_title = db_title[index_select] - - if selected_title['type'] == "movie": - console.print(f"[green]\nSelected Movie: {selected_title['name']}") - download_film(selected_title['id'], selected_title['slug'], domain) - else: - console.print(f"[green]\nSelected TV Series: {selected_title['name']}") - download_tv(selected_title['id'], selected_title['slug'], site_version, domain) - else: - console.print("[red]Wrong INDEX for selection") - - # For range like [5-15] ( to fix ) - elif "[" in index_select: - if "-" in index_select: - start, end = map(int, index_select[1:-1].split('-')) - result = list(range(start, end + 1)) - for n in result: - selected_title = db_title[n] - if selected_title['type'] == "movie": - console.print(f"[green]\nSelected Movie: {selected_title['name']}") - download_film(selected_title['id'], selected_title['slug'], domain) - else: - console.print(f"[green]\nSelected TV Series: {selected_title['name']}") - download_tv(selected_title['id'], selected_title['slug'], site_version, domain) - - # For a list of specific ( to fix ) - elif "," in index_select: - result = list(map(int, index_select[1:-1].split(','))) - for n in result: - selected_title = db_title[n] - if selected_title['type'] == "movie": - console.print(f"[green]\nSelected Movie: {selected_title['name']}") - download_film(selected_title['id'], selected_title['slug'], domain) - else: - console.print(f"[green]\nSelected TV Series: {selected_title['name']}") - download_tv(selected_title['id'], selected_title['slug'], site_version, domain) - else: - console.print("[red]Wrong INDEX for selection") + # Make request to site to get content that corrsisponde to that string + film_search = msg.ask("\n[cyan]Insert word to search in all site: ").strip() + len_database = anime_search(film_search) + + if len_database != 0: + + # Select title from list + select_title = get_select_title() + + # For series + if select_title.type == 'TV': + anime_download_series( + tv_id=select_title.id, + tv_name=select_title.slug + ) + + # For film + else: + anime_download_film( + id_film=select_title.id, + title_name=select_title.slug + ) + + # If no media find else: - console.print("[red]Couldn't find any entries for the selected title") + console.print("[red]Cant find a single element") - console.print("[red]Done!") if __name__ == '__main__': - main() + logger = Logger() - while 1: - cmd_insert = str(msg.ask("[red]Quit the script ? [red][[yellow]yes[red] / [yellow]no[red]]")) + if not SWITCH_TO: + if not CLOSE_CONSOLE: + main() + else: + while 1: + main() - if cmd_insert in ['y', 'yes', 'ye']: - break + else: + if not CLOSE_CONSOLE: + main_switch() else: - main() + while 1: + main_switch() diff --git a/update.py b/update.py index 24c2a31c..3409cc50 100644 --- a/update.py +++ b/update.py @@ -1,6 +1,9 @@ # 10.12.24 -import requests, os, shutil +# General imports +import requests +import os +import shutil from zipfile import ZipFile from io import BytesIO from rich.console import Console @@ -9,86 +12,70 @@ console = Console() local_path = os.path.join(".") -def move_content(source: str, destination: str) -> None: +def move_content(source: str, destination: str) : """ - Recursively moves content from source directory to destination directory. + Move all content from the source folder to the destination folder. Args: - source (str): Path to the source directory. - destination (str): Path to the destination directory. - - Returns: - None + source (str): The path to the source folder. + destination (str): The path to the destination folder. """ + os.makedirs(destination, exist_ok=True) + + # Iterate through all elements in the source folder for element in os.listdir(source): source_path = os.path.join(source, element) destination_path = os.path.join(destination, element) + + # If it's a directory, recursively call the function if os.path.isdir(source_path): move_content(source_path, destination_path) + + # Otherwise, move the file, replacing if it already exists else: shutil.move(source_path, destination_path) -def delete_files_folders(main_directory_path: str, folders_to_exclude: list = [], files_to_exclude: list = []) -> None: +def keep_specific_items(directory: str, keep_folder: str, keep_file: str): """ - Deletes files and folders from the specified directory except those specified. + Delete all items in the directory except for the specified folder and file. Args: - main_directory_path (str): Path to the main directory. - folders_to_exclude (list): List of folder names to exclude from deletion. - files_to_exclude (list): List of file names to exclude from deletion. - - Returns: - None - """ - for root, dirs, files in os.walk(main_directory_path, topdown=False): - for name in files: - file_path = os.path.join(root, name) - if name not in files_to_exclude: - try: - os.remove(file_path) - except: - pass - for name in dirs: - dir_path = os.path.join(root, name) - if name not in folders_to_exclude: - try: - os.rmdir(dir_path) - except: - pass - -def list_files_and_folders(directory: str, files_to_remove: list = []) -> None: + directory (str): The path to the directory. + keep_folder (str): The name of the folder to keep. + keep_file (str): The name of the file to keep. """ - Lists files and folders in the specified directory and removes those specified. - Args: - directory (str): Path to the directory to list files and folders. - files_to_remove (list): List of file names to remove. - - Returns: - None - """ try: - for root, dirs, files in os.walk(directory): - for file_name in files: - file_path = os.path.join(root, file_name) - if file_name in files_to_remove: - os.remove(file_path) + if not os.path.exists(directory) or not os.path.isdir(directory): + raise ValueError(f"Error: '{directory}' is not a valid directory.") + + # Iterate through items in the directory + for item in os.listdir(directory): + item_path = os.path.join(directory, item) + + # Check if the item is the specified folder or file + if os.path.isdir(item_path) and item != keep_folder: + shutil.rmtree(item_path) + elif os.path.isfile(item_path) and item != keep_file: + os.remove(item_path) + + except PermissionError as pe: + print(f"PermissionError: {pe}. Check permissions and try running the script with admin privileges.") + except Exception as e: - print(f"Error occurred: {e}") + print(f"Error: {e}") -def download_and_extract_latest_commit(author: str, repo_name: str, exclude_files: list) -> None: +def download_and_extract_latest_commit(author: str, repo_name: str): """ - Downloads and extracts the latest commit from a GitHub repository. + Download and extract the latest commit from a GitHub repository. Args: - author (str): The username of the repository owner. - repo_name (str): The name of the repository. - exclude_files (list): List of file names to exclude from extraction. - - Returns: - None + author (str): The owner of the GitHub repository. + repo_name (str): The name of the GitHub repository. """ + + # Get the latest commit information using GitHub API api_url = f'https://api.github.com/repos/{author}/{repo_name}/commits?per_page=1' response = requests.get(api_url) console.log("[green]Making a request to GitHub repository...") @@ -98,48 +85,54 @@ def download_and_extract_latest_commit(author: str, repo_name: str, exclude_file commit_sha = commit_info['sha'] zipball_url = f'https://github.com/{author}/{repo_name}/archive/{commit_sha}.zip' console.log("[green]Getting zip file from repository...") + + # Download the zipball response = requests.get(zipball_url) + # Extract the content of the zipball into a temporary folder temp_path = os.path.join(os.path.dirname(os.getcwd()), 'temp_extracted') with ZipFile(BytesIO(response.content)) as zip_ref: zip_ref.extractall(temp_path) console.log("[green]Extracting file ...") - list_files_and_folders(temp_path, exclude_files) - + # Move files from the temporary folder to the current folder for item in os.listdir(temp_path): item_path = os.path.join(temp_path, item) destination_path = os.path.join(local_path, item) shutil.move(item_path, destination_path) + # Remove the temporary folder shutil.rmtree(temp_path) + + # Move all folder to main folder new_folder_name = f"{repo_name}-{commit_sha}" move_content(new_folder_name, ".") + + # Remove old temp folder shutil.rmtree(new_folder_name) + console.log(f"[cyan]Latest commit downloaded and extracted successfully.") else: console.log(f"[red]Failed to fetch commit information. Status code: {response.status_code}") -def main_upload() -> None: +def main_upload(): """ - Main function to upload the latest changes from a GitHub repository. - - Returns: - None + Main function to upload the latest commit of a GitHub repository. """ + repository_owner = 'Ghost6446' repository_name = 'StreamingCommunity_api' cmd_insert = input("Are you sure you want to delete all files? (Only videos folder will remain) [yes/no]: ") - if cmd_insert.lower() == "yes" or cmd_insert.lower() == "y": - delete_files_folders( - main_directory_path=".", - folders_to_exclude=["videos"], - files_to_exclude=["upload.py", "config.json"] - ) - download_and_extract_latest_commit(repository_owner, repository_name, ["config.json"]) + if cmd_insert == "yes": + + # Remove all old file + keep_specific_items(".", "videos", "upload.py") + + download_and_extract_latest_commit(repository_owner, repository_name) main_upload() +# win # pyinstaller --onefile --add-data "./Src/upload/__version__.py;Src/upload" run.py