From 61c0c584ab994e088197f0103a5a0895cf2f0c47 Mon Sep 17 00:00:00 2001 From: t3tra-dev Date: Fri, 3 Jan 2025 20:50:21 +0900 Subject: [PATCH] refactor: split _markitdown.py into modular components --- src/markitdown/__init__.py | 3 +- src/markitdown/__main__.py | 3 +- src/markitdown/_markitdown.py | 1515 ------------------------- src/markitdown/converters/__init__.py | 3 + src/markitdown/converters/archive.py | 135 +++ src/markitdown/converters/base.py | 18 + src/markitdown/converters/document.py | 195 ++++ src/markitdown/converters/media.py | 246 ++++ src/markitdown/converters/text.py | 234 ++++ src/markitdown/converters/web.py | 314 +++++ src/markitdown/core.py | 409 +++++++ src/markitdown/exceptions.py | 6 + 12 files changed, 1564 insertions(+), 1517 deletions(-) delete mode 100644 src/markitdown/_markitdown.py create mode 100644 src/markitdown/converters/__init__.py create mode 100644 src/markitdown/converters/archive.py create mode 100644 src/markitdown/converters/base.py create mode 100644 src/markitdown/converters/document.py create mode 100644 src/markitdown/converters/media.py create mode 100644 src/markitdown/converters/text.py create mode 100644 src/markitdown/converters/web.py create mode 100644 src/markitdown/core.py create mode 100644 src/markitdown/exceptions.py diff --git a/src/markitdown/__init__.py b/src/markitdown/__init__.py index 482f428..e446072 100644 --- a/src/markitdown/__init__.py +++ b/src/markitdown/__init__.py @@ -2,7 +2,8 @@ # # SPDX-License-Identifier: MIT -from ._markitdown import MarkItDown, FileConversionException, UnsupportedFormatException +from .core import (FileConversionException, MarkItDown, + UnsupportedFormatException) __all__ = [ "MarkItDown", diff --git a/src/markitdown/__main__.py b/src/markitdown/__main__.py index b6cf963..7667ff8 100644 --- a/src/markitdown/__main__.py +++ b/src/markitdown/__main__.py @@ -4,8 +4,9 @@ import argparse import sys from textwrap import dedent + from .__about__ import __version__ -from ._markitdown import MarkItDown, DocumentConverterResult +from .core import DocumentConverterResult, MarkItDown def main(): diff --git a/src/markitdown/_markitdown.py b/src/markitdown/_markitdown.py deleted file mode 100644 index 789c1e5..0000000 --- a/src/markitdown/_markitdown.py +++ /dev/null @@ -1,1515 +0,0 @@ -# type: ignore -import base64 -import binascii -import copy -import html -import json -import mimetypes -import os -import re -import shutil -import subprocess -import sys -import tempfile -import traceback -import zipfile -from xml.dom import minidom -from typing import Any, Dict, List, Optional, Union -from pathlib import Path -from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse -from warnings import warn, resetwarnings, catch_warnings - -import mammoth -import markdownify -import pandas as pd -import pdfminer -import pdfminer.high_level -import pptx - -# File-format detection -import puremagic -import requests -from bs4 import BeautifulSoup -from charset_normalizer import from_path - -# Optional Transcription support -try: - # Using warnings' catch_warnings to catch - # pydub's warning of ffmpeg or avconv missing - with catch_warnings(record=True) as w: - import pydub - - if w: - raise ModuleNotFoundError - import speech_recognition as sr - - IS_AUDIO_TRANSCRIPTION_CAPABLE = True -except ModuleNotFoundError: - pass -finally: - resetwarnings() - -# Optional YouTube transcription support -try: - from youtube_transcript_api import YouTubeTranscriptApi - - IS_YOUTUBE_TRANSCRIPT_CAPABLE = True -except 
ModuleNotFoundError: - pass - - -class _CustomMarkdownify(markdownify.MarkdownConverter): - """ - A custom version of markdownify's MarkdownConverter. Changes include: - - - Altering the default heading style to use '#', '##', etc. - - Removing javascript hyperlinks. - - Truncating images with large data:uri sources. - - Ensuring URIs are properly escaped, and do not conflict with Markdown syntax - """ - - def __init__(self, **options: Any): - options["heading_style"] = options.get("heading_style", markdownify.ATX) - # Explicitly cast options to the expected type if necessary - super().__init__(**options) - - def convert_hn(self, n: int, el: Any, text: str, convert_as_inline: bool) -> str: - """Same as usual, but be sure to start with a new line""" - if not convert_as_inline: - if not re.search(r"^\n", text): - return "\n" + super().convert_hn(n, el, text, convert_as_inline) # type: ignore - - return super().convert_hn(n, el, text, convert_as_inline) # type: ignore - - def convert_a(self, el: Any, text: str, convert_as_inline: bool): - """Same as usual converter, but removes Javascript links and escapes URIs.""" - prefix, suffix, text = markdownify.chomp(text) # type: ignore - if not text: - return "" - href = el.get("href") - title = el.get("title") - - # Escape URIs and skip non-http or file schemes - if href: - try: - parsed_url = urlparse(href) # type: ignore - if parsed_url.scheme and parsed_url.scheme.lower() not in ["http", "https", "file"]: # type: ignore - return "%s%s%s" % (prefix, text, suffix) - href = urlunparse(parsed_url._replace(path=quote(unquote(parsed_url.path)))) # type: ignore - except ValueError: # It's not clear if this ever gets thrown - return "%s%s%s" % (prefix, text, suffix) - - # For the replacement see #29: text nodes underscores are escaped - if ( - self.options["autolinks"] - and text.replace(r"\_", "_") == href - and not title - and not self.options["default_title"] - ): - # Shortcut syntax - return "<%s>" % href - if self.options["default_title"] and not title: - title = href - title_part = ' "%s"' % title.replace('"', r"\"") if title else "" - return ( - "%s[%s](%s%s)%s" % (prefix, text, href, title_part, suffix) - if href - else text - ) - - def convert_img(self, el: Any, text: str, convert_as_inline: bool) -> str: - """Same as usual converter, but removes data URIs""" - - alt = el.attrs.get("alt", None) or "" - src = el.attrs.get("src", None) or "" - title = el.attrs.get("title", None) or "" - title_part = ' "%s"' % title.replace('"', r"\"") if title else "" - if ( - convert_as_inline - and el.parent.name not in self.options["keep_inline_images_in"] - ): - return alt - - # Remove dataURIs - if src.startswith("data:"): - src = src.split(",")[0] + "..." 
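-            # e.g. src="data:image/png;base64,iVBORw0KGgo..." is now truncated to
-            # "data:image/png;base64..." so large inline payloads don't bloat the Markdown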
-
-        return "![%s](%s%s)" % (alt, src, title_part)
-
-    def convert_soup(self, soup: Any) -> str:
-        return super().convert_soup(soup)  # type: ignore
-
-
-class DocumentConverterResult:
-    """The result of converting a document to text."""
-
-    def __init__(self, title: Union[str, None] = None, text_content: str = ""):
-        self.title: Union[str, None] = title
-        self.text_content: str = text_content
-
-
-class DocumentConverter:
-    """Abstract superclass of all DocumentConverters."""
-
-    def convert(
-        self, local_path: str, **kwargs: Any
-    ) -> Union[None, DocumentConverterResult]:
-        raise NotImplementedError()
-
-
-class PlainTextConverter(DocumentConverter):
-    """Anything with content type text/plain"""
-
-    def convert(
-        self, local_path: str, **kwargs: Any
-    ) -> Union[None, DocumentConverterResult]:
-        # Guess the content type from any file extension that might be around
-        content_type, _ = mimetypes.guess_type(
-            "__placeholder" + kwargs.get("file_extension", "")
-        )
-
-        # Only accept text files
-        if content_type is None:
-            return None
-        elif "text/" not in content_type.lower():
-            return None
-
-        text_content = str(from_path(local_path).best())
-        return DocumentConverterResult(
-            title=None,
-            text_content=text_content,
-        )
-
-
-class HtmlConverter(DocumentConverter):
-    """Anything with content type text/html"""
-
-    def convert(
-        self, local_path: str, **kwargs: Any
-    ) -> Union[None, DocumentConverterResult]:
-        # Bail if not html
-        extension = kwargs.get("file_extension", "")
-        if extension.lower() not in [".html", ".htm"]:
-            return None
-
-        result = None
-        with open(local_path, "rt", encoding="utf-8") as fh:
-            result = self._convert(fh.read())
-
-        return result
-
-    def _convert(self, html_content: str) -> Union[None, DocumentConverterResult]:
-        """Helper function that converts an HTML string."""
-
-        # Parse the string
-        soup = BeautifulSoup(html_content, "html.parser")
-
-        # Remove javascript and style blocks
-        for script in soup(["script", "style"]):
-            script.extract()
-
-        # Print only the main content
-        body_elm = soup.find("body")
-        webpage_text = ""
-        if body_elm:
-            webpage_text = _CustomMarkdownify().convert_soup(body_elm)
-        else:
-            webpage_text = _CustomMarkdownify().convert_soup(soup)
-
-        assert isinstance(webpage_text, str)
-
-        return DocumentConverterResult(
-            title=None if soup.title is None else soup.title.string,
-            text_content=webpage_text,
-        )
-
-
-class RSSConverter(DocumentConverter):
-    """Convert RSS / Atom type to markdown"""
-
-    def convert(
-        self, local_path: str, **kwargs
-    ) -> Union[None, DocumentConverterResult]:
-        # Bail if not RSS type
-        extension = kwargs.get("file_extension", "")
-        if extension.lower() not in [".xml", ".rss", ".atom"]:
-            return None
-        try:
-            doc = minidom.parse(local_path)
-        except BaseException as _:
-            return None
-        result = None
-        if doc.getElementsByTagName("rss"):
-            # A RSS feed must have a root element of <rss>
-            result = self._parse_rss_type(doc)
-        elif doc.getElementsByTagName("feed"):
-            root = doc.getElementsByTagName("feed")[0]
-            if root.getElementsByTagName("entry"):
-                # An Atom feed must have a root element of <feed> and at least one <entry>
-                result = self._parse_atom_type(doc)
-            else:
-                return None
-        else:
-            # not rss or atom
-            return None
-
-        return result
-
-    def _parse_atom_type(
-        self, doc: minidom.Document
-    ) -> Union[None, DocumentConverterResult]:
-        """Parse the type of an Atom feed.
-
-        Returns None if the feed type is not recognized or something goes wrong.
- """ - try: - root = doc.getElementsByTagName("feed")[0] - title = self._get_data_by_tag_name(root, "title") - subtitle = self._get_data_by_tag_name(root, "subtitle") - entries = root.getElementsByTagName("entry") - md_text = f"# {title}\n" - if subtitle: - md_text += f"{subtitle}\n" - for entry in entries: - entry_title = self._get_data_by_tag_name(entry, "title") - entry_summary = self._get_data_by_tag_name(entry, "summary") - entry_updated = self._get_data_by_tag_name(entry, "updated") - entry_content = self._get_data_by_tag_name(entry, "content") - - if entry_title: - md_text += f"\n## {entry_title}\n" - if entry_updated: - md_text += f"Updated on: {entry_updated}\n" - if entry_summary: - md_text += self._parse_content(entry_summary) - if entry_content: - md_text += self._parse_content(entry_content) - - return DocumentConverterResult( - title=title, - text_content=md_text, - ) - except BaseException as _: - return None - - def _parse_rss_type( - self, doc: minidom.Document - ) -> Union[None, DocumentConverterResult]: - """Parse the type of an RSS feed. - - Returns None if the feed type is not recognized or something goes wrong. - """ - try: - root = doc.getElementsByTagName("rss")[0] - channel = root.getElementsByTagName("channel") - if not channel: - return None - channel = channel[0] - channel_title = self._get_data_by_tag_name(channel, "title") - channel_description = self._get_data_by_tag_name(channel, "description") - items = channel.getElementsByTagName("item") - if channel_title: - md_text = f"# {channel_title}\n" - if channel_description: - md_text += f"{channel_description}\n" - if not items: - items = [] - for item in items: - title = self._get_data_by_tag_name(item, "title") - description = self._get_data_by_tag_name(item, "description") - pubDate = self._get_data_by_tag_name(item, "pubDate") - content = self._get_data_by_tag_name(item, "content:encoded") - - if title: - md_text += f"\n## {title}\n" - if pubDate: - md_text += f"Published on: {pubDate}\n" - if description: - md_text += self._parse_content(description) - if content: - md_text += self._parse_content(content) - - return DocumentConverterResult( - title=channel_title, - text_content=md_text, - ) - except BaseException as _: - print(traceback.format_exc()) - return None - - def _parse_content(self, content: str) -> str: - """Parse the content of an RSS feed item""" - try: - # using bs4 because many RSS feeds have HTML-styled content - soup = BeautifulSoup(content, "html.parser") - return _CustomMarkdownify().convert_soup(soup) - except BaseException as _: - return content - - def _get_data_by_tag_name( - self, element: minidom.Element, tag_name: str - ) -> Union[str, None]: - """Get data from first child element with the given tag name. - Returns None when no such element is found. 
- """ - nodes = element.getElementsByTagName(tag_name) - if not nodes: - return None - fc = nodes[0].firstChild - if fc: - return fc.data - return None - - -class WikipediaConverter(DocumentConverter): - """Handle Wikipedia pages separately, focusing only on the main document content.""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not Wikipedia - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".html", ".htm"]: - return None - url = kwargs.get("url", "") - if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url): - return None - - # Parse the file - soup = None - with open(local_path, "rt", encoding="utf-8") as fh: - soup = BeautifulSoup(fh.read(), "html.parser") - - # Remove javascript and style blocks - for script in soup(["script", "style"]): - script.extract() - - # Print only the main content - body_elm = soup.find("div", {"id": "mw-content-text"}) - title_elm = soup.find("span", {"class": "mw-page-title-main"}) - - webpage_text = "" - main_title = None if soup.title is None else soup.title.string - - if body_elm: - # What's the title - if title_elm and len(title_elm) > 0: - main_title = title_elm.string # type: ignore - assert isinstance(main_title, str) - - # Convert the page - webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup( - body_elm - ) - else: - webpage_text = _CustomMarkdownify().convert_soup(soup) - - return DocumentConverterResult( - title=main_title, - text_content=webpage_text, - ) - - -class YouTubeConverter(DocumentConverter): - """Handle YouTube specially, focusing on the video title, description, and transcript.""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not YouTube - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".html", ".htm"]: - return None - url = kwargs.get("url", "") - if not url.startswith("https://www.youtube.com/watch?"): - return None - - # Parse the file - soup = None - with open(local_path, "rt", encoding="utf-8") as fh: - soup = BeautifulSoup(fh.read(), "html.parser") - - # Read the meta tags - assert soup.title is not None and soup.title.string is not None - metadata: Dict[str, str] = {"title": soup.title.string} - for meta in soup(["meta"]): - for a in meta.attrs: - if a in ["itemprop", "property", "name"]: - metadata[meta[a]] = meta.get("content", "") - break - - # We can also try to read the full description. 
This is more prone to breaking, since it reaches into the page implementation - try: - for script in soup(["script"]): - content = script.text - if "ytInitialData" in content: - lines = re.split(r"\r?\n", content) - obj_start = lines[0].find("{") - obj_end = lines[0].rfind("}") - if obj_start >= 0 and obj_end >= 0: - data = json.loads(lines[0][obj_start : obj_end + 1]) - attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore - if attrdesc: - metadata["description"] = str(attrdesc["content"]) - break - except Exception: - pass - - # Start preparing the page - webpage_text = "# YouTube\n" - - title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore - assert isinstance(title, str) - - if title: - webpage_text += f"\n## {title}\n" - - stats = "" - views = self._get(metadata, ["interactionCount"]) # type: ignore - if views: - stats += f"- **Views:** {views}\n" - - keywords = self._get(metadata, ["keywords"]) # type: ignore - if keywords: - stats += f"- **Keywords:** {keywords}\n" - - runtime = self._get(metadata, ["duration"]) # type: ignore - if runtime: - stats += f"- **Runtime:** {runtime}\n" - - if len(stats) > 0: - webpage_text += f"\n### Video Metadata\n{stats}\n" - - description = self._get(metadata, ["description", "og:description"]) # type: ignore - if description: - webpage_text += f"\n### Description\n{description}\n" - - if IS_YOUTUBE_TRANSCRIPT_CAPABLE: - transcript_text = "" - parsed_url = urlparse(url) # type: ignore - params = parse_qs(parsed_url.query) # type: ignore - if "v" in params: - assert isinstance(params["v"][0], str) - video_id = str(params["v"][0]) - try: - youtube_transcript_languages = kwargs.get( - "youtube_transcript_languages", ("en",) - ) - # Must be a single transcript. - transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages) # type: ignore - transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore - # Alternative formatting: - # formatter = TextFormatter() - # formatter.format_transcript(transcript) - except Exception: - pass - if transcript_text: - webpage_text += f"\n### Transcript\n{transcript_text}\n" - - title = title if title else soup.title.string - assert isinstance(title, str) - - return DocumentConverterResult( - title=title, - text_content=webpage_text, - ) - - def _get( - self, - metadata: Dict[str, str], - keys: List[str], - default: Union[str, None] = None, - ) -> Union[str, None]: - for k in keys: - if k in metadata: - return metadata[k] - return default - - def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type - if isinstance(json, list): - for elm in json: - ret = self._findKey(elm, key) - if ret is not None: - return ret - elif isinstance(json, dict): - for k in json: - if k == key: - return json[k] - else: - ret = self._findKey(json[k], key) - if ret is not None: - return ret - return None - - -class IpynbConverter(DocumentConverter): - """Converts Jupyter Notebook (.ipynb) files to Markdown.""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not ipynb - extension = kwargs.get("file_extension", "") - if extension.lower() != ".ipynb": - return None - - # Parse and convert the notebook - result = None - with open(local_path, "rt", encoding="utf-8") as fh: - notebook_content = json.load(fh) - result = self._convert(notebook_content) - - return result - - def _convert(self, notebook_content: dict) -> Union[None, DocumentConverterResult]: - 
"""Helper function that converts notebook JSON content to Markdown.""" - try: - md_output = [] - title = None - - for cell in notebook_content.get("cells", []): - cell_type = cell.get("cell_type", "") - source_lines = cell.get("source", []) - - if cell_type == "markdown": - md_output.append("".join(source_lines)) - - # Extract the first # heading as title if not already found - if title is None: - for line in source_lines: - if line.startswith("# "): - title = line.lstrip("# ").strip() - break - - elif cell_type == "code": - # Code cells are wrapped in Markdown code blocks - md_output.append(f"```python\n{''.join(source_lines)}\n```") - elif cell_type == "raw": - md_output.append(f"```\n{''.join(source_lines)}\n```") - - md_text = "\n\n".join(md_output) - - # Check for title in notebook metadata - title = notebook_content.get("metadata", {}).get("title", title) - - return DocumentConverterResult( - title=title, - text_content=md_text, - ) - - except Exception as e: - raise FileConversionException( - f"Error converting .ipynb file: {str(e)}" - ) from e - - -class BingSerpConverter(DocumentConverter): - """ - Handle Bing results pages (only the organic search results). - NOTE: It is better to use the Bing API - """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not a Bing SERP - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".html", ".htm"]: - return None - url = kwargs.get("url", "") - if not re.search(r"^https://www\.bing\.com/search\?q=", url): - return None - - # Parse the query parameters - parsed_params = parse_qs(urlparse(url).query) - query = parsed_params.get("q", [""])[0] - - # Parse the file - soup = None - with open(local_path, "rt", encoding="utf-8") as fh: - soup = BeautifulSoup(fh.read(), "html.parser") - - # Clean up some formatting - for tptt in soup.find_all(class_="tptt"): - if hasattr(tptt, "string") and tptt.string: - tptt.string += " " - for slug in soup.find_all(class_="algoSlug_icon"): - slug.extract() - - # Parse the algorithmic results - _markdownify = _CustomMarkdownify() - results = list() - for result in soup.find_all(class_="b_algo"): - # Rewrite redirect urls - for a in result.find_all("a", href=True): - parsed_href = urlparse(a["href"]) - qs = parse_qs(parsed_href.query) - - # The destination is contained in the u parameter, - # but appears to be base64 encoded, with some prefix - if "u" in qs: - u = ( - qs["u"][0][2:].strip() + "==" - ) # Python 3 doesn't care about extra padding - - try: - # RFC 4648 / Base64URL" variant, which uses "-" and "_" - a["href"] = base64.b64decode(u, altchars="-_").decode("utf-8") - except UnicodeDecodeError: - pass - except binascii.Error: - pass - - # Convert to markdown - md_result = _markdownify.convert_soup(result).strip() - lines = [line.strip() for line in re.split(r"\n+", md_result)] - results.append("\n".join([line for line in lines if len(line) > 0])) - - webpage_text = ( - f"## A Bing search for '{query}' found the following results:\n\n" - + "\n\n".join(results) - ) - - return DocumentConverterResult( - title=None if soup.title is None else soup.title.string, - text_content=webpage_text, - ) - - -class PdfConverter(DocumentConverter): - """ - Converts PDFs to Markdown. Most style information is ignored, so the results are essentially plain-text. 
-    """
-
-    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
-        # Bail if not a PDF
-        extension = kwargs.get("file_extension", "")
-        if extension.lower() != ".pdf":
-            return None
-
-        return DocumentConverterResult(
-            title=None,
-            text_content=pdfminer.high_level.extract_text(local_path),
-        )
-
-
-class DocxConverter(HtmlConverter):
-    """
-    Converts DOCX files to Markdown. Style information (e.g., headings) and tables are preserved where possible.
-    """
-
-    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
-        # Bail if not a DOCX
-        extension = kwargs.get("file_extension", "")
-        if extension.lower() != ".docx":
-            return None
-
-        result = None
-        with open(local_path, "rb") as docx_file:
-            style_map = kwargs.get("style_map", None)
-
-            result = mammoth.convert_to_html(docx_file, style_map=style_map)
-            html_content = result.value
-            result = self._convert(html_content)
-
-        return result
-
-
-class XlsxConverter(HtmlConverter):
-    """
-    Converts XLSX files to Markdown, with each sheet presented as a separate Markdown table.
-    """
-
-    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
-        # Bail if not a XLSX
-        extension = kwargs.get("file_extension", "")
-        if extension.lower() != ".xlsx":
-            return None
-
-        sheets = pd.read_excel(local_path, sheet_name=None)
-        md_content = ""
-        for s in sheets:
-            md_content += f"## {s}\n"
-            html_content = sheets[s].to_html(index=False)
-            md_content += self._convert(html_content).text_content.strip() + "\n\n"
-
-        return DocumentConverterResult(
-            title=None,
-            text_content=md_content.strip(),
-        )
-
-
-class PptxConverter(HtmlConverter):
-    """
-    Converts PPTX files to Markdown. Supports headings, tables and images with alt text.
-    """
-
-    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
-        # Bail if not a PPTX
-        extension = kwargs.get("file_extension", "")
-        if extension.lower() != ".pptx":
-            return None
-
-        md_content = ""
-
-        presentation = pptx.Presentation(local_path)
-        slide_num = 0
-        for slide in presentation.slides:
-            slide_num += 1
-
-            md_content += f"\n\n<!-- Slide number: {slide_num} -->\n"
-
-            title = slide.shapes.title
-            for shape in slide.shapes:
-                # Pictures
-                if self._is_picture(shape):
-                    # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069
-                    alt_text = ""
-                    try:
-                        alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "")
-                    except Exception:
-                        pass
-
-                    # A placeholder name
-                    filename = re.sub(r"\W", "", shape.name) + ".jpg"
-                    md_content += (
-                        "\n!["
-                        + (alt_text if alt_text else shape.name)
-                        + "]("
-                        + filename
-                        + ")\n"
-                    )
-
-                # Tables
-                if self._is_table(shape):
-                    html_table = "<html><body><table>"
-                    first_row = True
-                    for row in shape.table.rows:
-                        html_table += "<tr>"
-                        for cell in row.cells:
-                            if first_row:
-                                html_table += "<th>" + html.escape(cell.text) + "</th>"
-                            else:
-                                html_table += "<td>" + html.escape(cell.text) + "</td>"
-                        html_table += "</tr>"
-                        first_row = False
-                    html_table += "</table></body></html>
" - md_content += ( - "\n" + self._convert(html_table).text_content.strip() + "\n" - ) - - # Charts - if shape.has_chart: - md_content += self._convert_chart_to_markdown(shape.chart) - - # Text areas - elif shape.has_text_frame: - if shape == title: - md_content += "# " + shape.text.lstrip() + "\n" - else: - md_content += shape.text + "\n" - - md_content = md_content.strip() - - if slide.has_notes_slide: - md_content += "\n\n### Notes:\n" - notes_frame = slide.notes_slide.notes_text_frame - if notes_frame is not None: - md_content += notes_frame.text - md_content = md_content.strip() - - return DocumentConverterResult( - title=None, - text_content=md_content.strip(), - ) - - def _is_picture(self, shape): - if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE: - return True - if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PLACEHOLDER: - if hasattr(shape, "image"): - return True - return False - - def _is_table(self, shape): - if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE: - return True - return False - - def _convert_chart_to_markdown(self, chart): - md = "\n\n### Chart" - if chart.has_title: - md += f": {chart.chart_title.text_frame.text}" - md += "\n\n" - data = [] - category_names = [c.label for c in chart.plots[0].categories] - series_names = [s.name for s in chart.series] - data.append(["Category"] + series_names) - - for idx, category in enumerate(category_names): - row = [category] - for series in chart.series: - row.append(series.values[idx]) - data.append(row) - - markdown_table = [] - for row in data: - markdown_table.append("| " + " | ".join(map(str, row)) + " |") - header = markdown_table[0] - separator = "|" + "|".join(["---"] * len(data[0])) + "|" - return md + "\n".join([header, separator] + markdown_table[1:]) - - -class MediaConverter(DocumentConverter): - """ - Abstract class for multi-modal media (e.g., images and audio) - """ - - def _get_metadata(self, local_path): - exiftool = shutil.which("exiftool") - if not exiftool: - return None - else: - try: - result = subprocess.run( - [exiftool, "-json", local_path], capture_output=True, text=True - ).stdout - return json.loads(result)[0] - except Exception: - return None - - -class WavConverter(MediaConverter): - """ - Converts WAV files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` is installed). - """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not a WAV - extension = kwargs.get("file_extension", "") - if extension.lower() != ".wav": - return None - - md_content = "" - - # Add metadata - metadata = self._get_metadata(local_path) - if metadata: - for f in [ - "Title", - "Artist", - "Author", - "Band", - "Album", - "Genre", - "Track", - "DateTimeOriginal", - "CreateDate", - "Duration", - ]: - if f in metadata: - md_content += f"{f}: {metadata[f]}\n" - - # Transcribe - if IS_AUDIO_TRANSCRIPTION_CAPABLE: - try: - transcript = self._transcribe_audio(local_path) - md_content += "\n\n### Audio Transcript:\n" + ( - "[No speech detected]" if transcript == "" else transcript - ) - except Exception: - md_content += ( - "\n\n### Audio Transcript:\nError. Could not transcribe this audio." 
- ) - - return DocumentConverterResult( - title=None, - text_content=md_content.strip(), - ) - - def _transcribe_audio(self, local_path) -> str: - recognizer = sr.Recognizer() - with sr.AudioFile(local_path) as source: - audio = recognizer.record(source) - return recognizer.recognize_google(audio).strip() - - -class Mp3Converter(WavConverter): - """ - Converts MP3 files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` AND `pydub` are installed). - """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not a MP3 - extension = kwargs.get("file_extension", "") - if extension.lower() != ".mp3": - return None - - md_content = "" - - # Add metadata - metadata = self._get_metadata(local_path) - if metadata: - for f in [ - "Title", - "Artist", - "Author", - "Band", - "Album", - "Genre", - "Track", - "DateTimeOriginal", - "CreateDate", - "Duration", - ]: - if f in metadata: - md_content += f"{f}: {metadata[f]}\n" - - # Transcribe - if IS_AUDIO_TRANSCRIPTION_CAPABLE: - handle, temp_path = tempfile.mkstemp(suffix=".wav") - os.close(handle) - try: - sound = pydub.AudioSegment.from_mp3(local_path) - sound.export(temp_path, format="wav") - - _args = dict() - _args.update(kwargs) - _args["file_extension"] = ".wav" - - try: - transcript = super()._transcribe_audio(temp_path).strip() - md_content += "\n\n### Audio Transcript:\n" + ( - "[No speech detected]" if transcript == "" else transcript - ) - except Exception: - md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio." - - finally: - os.unlink(temp_path) - - # Return the result - return DocumentConverterResult( - title=None, - text_content=md_content.strip(), - ) - - -class ImageConverter(MediaConverter): - """ - Converts images to markdown via extraction of metadata (if `exiftool` is installed), OCR (if `easyocr` is installed), and description via a multimodal LLM (if an llm_client is configured). - """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not an image - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".jpg", ".jpeg", ".png"]: - return None - - md_content = "" - - # Add metadata - metadata = self._get_metadata(local_path) - if metadata: - for f in [ - "ImageSize", - "Title", - "Caption", - "Description", - "Keywords", - "Artist", - "Author", - "DateTimeOriginal", - "CreateDate", - "GPSPosition", - ]: - if f in metadata: - md_content += f"{f}: {metadata[f]}\n" - - # Try describing the image with GPTV - llm_client = kwargs.get("llm_client") - llm_model = kwargs.get("llm_model") - if llm_client is not None and llm_model is not None: - md_content += ( - "\n# Description:\n" - + self._get_llm_description( - local_path, - extension, - llm_client, - llm_model, - prompt=kwargs.get("llm_prompt"), - ).strip() - + "\n" - ) - - return DocumentConverterResult( - title=None, - text_content=md_content, - ) - - def _get_llm_description(self, local_path, extension, client, model, prompt=None): - if prompt is None or prompt.strip() == "": - prompt = "Write a detailed caption for this image." 
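-        # The image is inlined below as a base64 "data:" URI (e.g. "data:image/jpeg;base64,<payload>"),
-        # so it can be passed directly in the chat message content; no separate upload is required.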
- - data_uri = "" - with open(local_path, "rb") as image_file: - content_type, encoding = mimetypes.guess_type("_dummy" + extension) - if content_type is None: - content_type = "image/jpeg" - image_base64 = base64.b64encode(image_file.read()).decode("utf-8") - data_uri = f"data:{content_type};base64,{image_base64}" - - messages = [ - { - "role": "user", - "content": [ - {"type": "text", "text": prompt}, - { - "type": "image_url", - "image_url": { - "url": data_uri, - }, - }, - ], - } - ] - - response = client.chat.completions.create(model=model, messages=messages) - return response.choices[0].message.content - - -class ZipConverter(DocumentConverter): - """Converts ZIP files to markdown by extracting and converting all contained files. - - The converter extracts the ZIP contents to a temporary directory, processes each file - using appropriate converters based on file extensions, and then combines the results - into a single markdown document. The temporary directory is cleaned up after processing. - - Example output format: - ```markdown - Content from the zip file `example.zip`: - - ## File: docs/readme.txt - - This is the content of readme.txt - Multiple lines are preserved - - ## File: images/example.jpg - - ImageSize: 1920x1080 - DateTimeOriginal: 2024-02-15 14:30:00 - Description: A beautiful landscape photo - - ## File: data/report.xlsx - - ## Sheet1 - | Column1 | Column2 | Column3 | - |---------|---------|---------| - | data1 | data2 | data3 | - | data4 | data5 | data6 | - ``` - - Key features: - - Maintains original file structure in headings - - Processes nested files recursively - - Uses appropriate converters for each file type - - Preserves formatting of converted content - - Cleans up temporary files after processing - """ - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not a ZIP - extension = kwargs.get("file_extension", "") - if extension.lower() != ".zip": - return None - - # Get parent converters list if available - parent_converters = kwargs.get("_parent_converters", []) - if not parent_converters: - return DocumentConverterResult( - title=None, - text_content=f"[ERROR] No converters available to process zip contents from: {local_path}", - ) - - extracted_zip_folder_name = ( - f"extracted_{os.path.basename(local_path).replace('.zip', '_zip')}" - ) - extraction_dir = os.path.normpath( - os.path.join(os.path.dirname(local_path), extracted_zip_folder_name) - ) - md_content = f"Content from the zip file `{os.path.basename(local_path)}`:\n\n" - - try: - # Extract the zip file safely - with zipfile.ZipFile(local_path, "r") as zipObj: - # Safeguard against path traversal - for member in zipObj.namelist(): - member_path = os.path.normpath(os.path.join(extraction_dir, member)) - if ( - not os.path.commonprefix([extraction_dir, member_path]) - == extraction_dir - ): - raise ValueError( - f"Path traversal detected in zip file: {member}" - ) - - # Extract all files safely - zipObj.extractall(path=extraction_dir) - - # Process each extracted file - for root, dirs, files in os.walk(extraction_dir): - for name in files: - file_path = os.path.join(root, name) - relative_path = os.path.relpath(file_path, extraction_dir) - - # Get file extension - _, file_extension = os.path.splitext(name) - - # Update kwargs for the file - file_kwargs = kwargs.copy() - file_kwargs["file_extension"] = file_extension - file_kwargs["_parent_converters"] = parent_converters - - # Try converting the file using available converters - for 
converter in parent_converters: - # Skip the zip converter to avoid infinite recursion - if isinstance(converter, ZipConverter): - continue - - result = converter.convert(file_path, **file_kwargs) - if result is not None: - md_content += f"\n## File: {relative_path}\n\n" - md_content += result.text_content + "\n\n" - break - - # Clean up extracted files if specified - if kwargs.get("cleanup_extracted", True): - shutil.rmtree(extraction_dir) - - return DocumentConverterResult(title=None, text_content=md_content.strip()) - - except zipfile.BadZipFile: - return DocumentConverterResult( - title=None, - text_content=f"[ERROR] Invalid or corrupted zip file: {local_path}", - ) - except ValueError as ve: - return DocumentConverterResult( - title=None, - text_content=f"[ERROR] Security error in zip file {local_path}: {str(ve)}", - ) - except Exception as e: - return DocumentConverterResult( - title=None, - text_content=f"[ERROR] Failed to process zip file {local_path}: {str(e)}", - ) - - -class FileConversionException(BaseException): - pass - - -class UnsupportedFormatException(BaseException): - pass - - -class MarkItDown: - """(In preview) An extremely simple text-based document reader, suitable for LLM use. - This reader will convert common file-types or webpages to Markdown.""" - - def __init__( - self, - requests_session: Optional[requests.Session] = None, - llm_client: Optional[Any] = None, - llm_model: Optional[str] = None, - style_map: Optional[str] = None, - # Deprecated - mlm_client: Optional[Any] = None, - mlm_model: Optional[str] = None, - ): - if requests_session is None: - self._requests_session = requests.Session() - else: - self._requests_session = requests_session - - # Handle deprecation notices - ############################# - if mlm_client is not None: - if llm_client is None: - warn( - "'mlm_client' is deprecated, and was renamed 'llm_client'.", - DeprecationWarning, - ) - llm_client = mlm_client - mlm_client = None - else: - raise ValueError( - "'mlm_client' is deprecated, and was renamed 'llm_client'. Do not use both at the same time. Just use 'llm_client' instead." - ) - - if mlm_model is not None: - if llm_model is None: - warn( - "'mlm_model' is deprecated, and was renamed 'llm_model'.", - DeprecationWarning, - ) - llm_model = mlm_model - mlm_model = None - else: - raise ValueError( - "'mlm_model' is deprecated, and was renamed 'llm_model'. Do not use both at the same time. Just use 'llm_model' instead." 
- ) - ############################# - - self._llm_client = llm_client - self._llm_model = llm_model - self._style_map = style_map - - self._page_converters: List[DocumentConverter] = [] - - # Register converters for successful browsing operations - # Later registrations are tried first / take higher priority than earlier registrations - # To this end, the most specific converters should appear below the most generic converters - self.register_page_converter(PlainTextConverter()) - self.register_page_converter(HtmlConverter()) - self.register_page_converter(RSSConverter()) - self.register_page_converter(WikipediaConverter()) - self.register_page_converter(YouTubeConverter()) - self.register_page_converter(BingSerpConverter()) - self.register_page_converter(DocxConverter()) - self.register_page_converter(XlsxConverter()) - self.register_page_converter(PptxConverter()) - self.register_page_converter(WavConverter()) - self.register_page_converter(Mp3Converter()) - self.register_page_converter(ImageConverter()) - self.register_page_converter(IpynbConverter()) - self.register_page_converter(PdfConverter()) - self.register_page_converter(ZipConverter()) - - def convert( - self, source: Union[str, requests.Response, Path], **kwargs: Any - ) -> DocumentConverterResult: # TODO: deal with kwargs - """ - Args: - - source: can be a string representing a path either as string pathlib path object or url, or a requests.response object - - extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.) - """ - - # Local path or url - if isinstance(source, str): - if ( - source.startswith("http://") - or source.startswith("https://") - or source.startswith("file://") - ): - return self.convert_url(source, **kwargs) - else: - return self.convert_local(source, **kwargs) - # Request response - elif isinstance(source, requests.Response): - return self.convert_response(source, **kwargs) - elif isinstance(source, Path): - return self.convert_local(source, **kwargs) - - def convert_local( - self, path: Union[str, Path], **kwargs: Any - ) -> DocumentConverterResult: # TODO: deal with kwargs - if isinstance(path, Path): - path = str(path) - # Prepare a list of extensions to try (in order of priority) - ext = kwargs.get("file_extension") - extensions = [ext] if ext is not None else [] - - # Get extension alternatives from the path and puremagic - base, ext = os.path.splitext(path) - self._append_ext(extensions, ext) - - for g in self._guess_ext_magic(path): - self._append_ext(extensions, g) - - # Convert - return self._convert(path, extensions, **kwargs) - - # TODO what should stream's type be? - def convert_stream( - self, stream: Any, **kwargs: Any - ) -> DocumentConverterResult: # TODO: deal with kwargs - # Prepare a list of extensions to try (in order of priority) - ext = kwargs.get("file_extension") - extensions = [ext] if ext is not None else [] - - # Save the file locally to a temporary file. 
It will be deleted before this method exits - handle, temp_path = tempfile.mkstemp() - fh = os.fdopen(handle, "wb") - result = None - try: - # Write to the temporary file - content = stream.read() - if isinstance(content, str): - fh.write(content.encode("utf-8")) - else: - fh.write(content) - fh.close() - - # Use puremagic to check for more extension options - for g in self._guess_ext_magic(temp_path): - self._append_ext(extensions, g) - - # Convert - result = self._convert(temp_path, extensions, **kwargs) - # Clean up - finally: - try: - fh.close() - except Exception: - pass - os.unlink(temp_path) - - return result - - def convert_url( - self, url: str, **kwargs: Any - ) -> DocumentConverterResult: # TODO: fix kwargs type - # Send a HTTP request to the URL - response = self._requests_session.get(url, stream=True) - response.raise_for_status() - return self.convert_response(response, **kwargs) - - def convert_response( - self, response: requests.Response, **kwargs: Any - ) -> DocumentConverterResult: # TODO fix kwargs type - # Prepare a list of extensions to try (in order of priority) - ext = kwargs.get("file_extension") - extensions = [ext] if ext is not None else [] - - # Guess from the mimetype - content_type = response.headers.get("content-type", "").split(";")[0] - self._append_ext(extensions, mimetypes.guess_extension(content_type)) - - # Read the content disposition if there is one - content_disposition = response.headers.get("content-disposition", "") - m = re.search(r"filename=([^;]+)", content_disposition) - if m: - base, ext = os.path.splitext(m.group(1).strip("\"'")) - self._append_ext(extensions, ext) - - # Read from the extension from the path - base, ext = os.path.splitext(urlparse(response.url).path) - self._append_ext(extensions, ext) - - # Save the file locally to a temporary file. 
It will be deleted before this method exits - handle, temp_path = tempfile.mkstemp() - fh = os.fdopen(handle, "wb") - result = None - try: - # Download the file - for chunk in response.iter_content(chunk_size=512): - fh.write(chunk) - fh.close() - - # Use puremagic to check for more extension options - for g in self._guess_ext_magic(temp_path): - self._append_ext(extensions, g) - - # Convert - result = self._convert(temp_path, extensions, url=response.url, **kwargs) - # Clean up - finally: - try: - fh.close() - except Exception: - pass - os.unlink(temp_path) - - return result - - def _convert( - self, local_path: str, extensions: List[Union[str, None]], **kwargs - ) -> DocumentConverterResult: - error_trace = "" - for ext in extensions + [None]: # Try last with no extension - for converter in self._page_converters: - _kwargs = copy.deepcopy(kwargs) - - # Overwrite file_extension appropriately - if ext is None: - if "file_extension" in _kwargs: - del _kwargs["file_extension"] - else: - _kwargs.update({"file_extension": ext}) - - # Copy any additional global options - if "llm_client" not in _kwargs and self._llm_client is not None: - _kwargs["llm_client"] = self._llm_client - - if "llm_model" not in _kwargs and self._llm_model is not None: - _kwargs["llm_model"] = self._llm_model - - # Add the list of converters for nested processing - _kwargs["_parent_converters"] = self._page_converters - - if "style_map" not in _kwargs and self._style_map is not None: - _kwargs["style_map"] = self._style_map - - # If we hit an error log it and keep trying - try: - res = converter.convert(local_path, **_kwargs) - except Exception: - error_trace = ("\n\n" + traceback.format_exc()).strip() - - if res is not None: - # Normalize the content - res.text_content = "\n".join( - [line.rstrip() for line in re.split(r"\r?\n", res.text_content)] - ) - res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content) - - # Todo - return res - - # If we got this far without success, report any exceptions - if len(error_trace) > 0: - raise FileConversionException( - f"Could not convert '{local_path}' to Markdown. File type was recognized as {extensions}. While converting the file, the following error was encountered:\n\n{error_trace}" - ) - - # Nothing can handle it! - raise UnsupportedFormatException( - f"Could not convert '{local_path}' to Markdown. The formats {extensions} are not supported." - ) - - def _append_ext(self, extensions, ext): - """Append a unique non-None, non-empty extension to a list of extensions.""" - if ext is None: - return - ext = ext.strip() - if ext == "": - return - # if ext not in extensions: - extensions.append(ext) - - def _guess_ext_magic(self, path): - """Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes.""" - # Use puremagic to guess - try: - guesses = puremagic.magic_file(path) - extensions = list() - for g in guesses: - ext = g.extension.strip() - if len(ext) > 0: - if not ext.startswith("."): - ext = "." 
+ ext - if ext not in extensions: - extensions.append(ext) - return extensions - except FileNotFoundError: - pass - except IsADirectoryError: - pass - except PermissionError: - pass - return [] - - def register_page_converter(self, converter: DocumentConverter) -> None: - """Register a page text converter.""" - self._page_converters.insert(0, converter) diff --git a/src/markitdown/converters/__init__.py b/src/markitdown/converters/__init__.py new file mode 100644 index 0000000..bd61cf6 --- /dev/null +++ b/src/markitdown/converters/__init__.py @@ -0,0 +1,3 @@ +from . import archive, base, document, media, text, web + +__all__ = ["archive", "base", "document", "media", "text", "web"] diff --git a/src/markitdown/converters/archive.py b/src/markitdown/converters/archive.py new file mode 100644 index 0000000..f0d382a --- /dev/null +++ b/src/markitdown/converters/archive.py @@ -0,0 +1,135 @@ +import os +import shutil +import zipfile +from typing import Any, Union + +from .base import DocumentConverter, DocumentConverterResult + + +class ZipConverter(DocumentConverter): + """Converts ZIP files to markdown by extracting and converting all contained files. + + The converter extracts the ZIP contents to a temporary directory, processes each file + using appropriate converters based on file extensions, and then combines the results + into a single markdown document. The temporary directory is cleaned up after processing. + + Example output format: + ```markdown + Content from the zip file `example.zip`: + + ## File: docs/readme.txt + + This is the content of readme.txt + Multiple lines are preserved + + ## File: images/example.jpg + + ImageSize: 1920x1080 + DateTimeOriginal: 2024-02-15 14:30:00 + Description: A beautiful landscape photo + + ## File: data/report.xlsx + + ## Sheet1 + | Column1 | Column2 | Column3 | + |---------|---------|---------| + | data1 | data2 | data3 | + | data4 | data5 | data6 | + ``` + + Key features: + - Maintains original file structure in headings + - Processes nested files recursively + - Uses appropriate converters for each file type + - Preserves formatting of converted content + - Cleans up temporary files after processing + """ + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not a ZIP + extension = kwargs.get("file_extension", "") + if extension.lower() != ".zip": + return None + + # Get parent converters list if available + parent_converters = kwargs.get("_parent_converters", []) + if not parent_converters: + return DocumentConverterResult( + title=None, + text_content=f"[ERROR] No converters available to process zip contents from: {local_path}", + ) + + extracted_zip_folder_name = ( + f"extracted_{os.path.basename(local_path).replace('.zip', '_zip')}" + ) + extraction_dir = os.path.normpath( + os.path.join(os.path.dirname(local_path), extracted_zip_folder_name) + ) + md_content = f"Content from the zip file `{os.path.basename(local_path)}`:\n\n" + + try: + # Extract the zip file safely + with zipfile.ZipFile(local_path, "r") as zipObj: + # Safeguard against path traversal + for member in zipObj.namelist(): + member_path = os.path.normpath(os.path.join(extraction_dir, member)) + if ( + not os.path.commonprefix([extraction_dir, member_path]) + == extraction_dir + ): + raise ValueError( + f"Path traversal detected in zip file: {member}" + ) + + # Extract all files safely + zipObj.extractall(path=extraction_dir) + + # Process each extracted file + for root, dirs, files in os.walk(extraction_dir): + for name 
in files:
+                    file_path = os.path.join(root, name)
+                    relative_path = os.path.relpath(file_path, extraction_dir)
+
+                    # Get file extension
+                    _, file_extension = os.path.splitext(name)
+
+                    # Update kwargs for the file
+                    file_kwargs = kwargs.copy()
+                    file_kwargs["file_extension"] = file_extension
+                    file_kwargs["_parent_converters"] = parent_converters
+
+                    # Try converting the file using available converters
+                    for converter in parent_converters:
+                        # Skip the zip converter to avoid infinite recursion
+                        if isinstance(converter, ZipConverter):
+                            continue
+
+                        result = converter.convert(file_path, **file_kwargs)
+                        if result is not None:
+                            md_content += f"\n## File: {relative_path}\n\n"
+                            md_content += result.text_content + "\n\n"
+                            break
+
+            # Clean up extracted files if specified
+            if kwargs.get("cleanup_extracted", True):
+                shutil.rmtree(extraction_dir)
+
+            return DocumentConverterResult(title=None, text_content=md_content.strip())
+
+        except zipfile.BadZipFile:
+            return DocumentConverterResult(
+                title=None,
+                text_content=f"[ERROR] Invalid or corrupted zip file: {local_path}",
+            )
+        except ValueError as ve:
+            return DocumentConverterResult(
+                title=None,
+                text_content=f"[ERROR] Security error in zip file {local_path}: {str(ve)}",
+            )
+        except Exception as e:
+            return DocumentConverterResult(
+                title=None,
+                text_content=f"[ERROR] Failed to process zip file {local_path}: {str(e)}",
+            )
diff --git a/src/markitdown/converters/base.py b/src/markitdown/converters/base.py
new file mode 100644
index 0000000..810a723
--- /dev/null
+++ b/src/markitdown/converters/base.py
@@ -0,0 +1,18 @@
+from typing import Any, Union
+
+
+class DocumentConverterResult:
+    """The result of converting a document to text."""
+
+    def __init__(self, title: Union[str, None] = None, text_content: str = ""):
+        self.title: Union[str, None] = title
+        self.text_content: str = text_content
+
+
+class DocumentConverter:
+    """Abstract superclass of all DocumentConverters."""
+
+    def convert(
+        self, local_path: str, **kwargs: Any
+    ) -> Union[None, DocumentConverterResult]:
+        raise NotImplementedError()
diff --git a/src/markitdown/converters/document.py b/src/markitdown/converters/document.py
new file mode 100644
index 0000000..8e06f01
--- /dev/null
+++ b/src/markitdown/converters/document.py
@@ -0,0 +1,196 @@
+import html
+import re
+from typing import Union
+
+import mammoth
+import pandas as pd
+import pdfminer
+import pdfminer.high_level
+import pptx
+
+from .base import DocumentConverter, DocumentConverterResult
+from .web import HtmlConverter
+
+
+class PdfConverter(DocumentConverter):
+    """
+    Converts PDFs to Markdown. Most style information is ignored, so the results are essentially plain-text.
+    """
+
+    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
+        # Bail if not a PDF
+        extension = kwargs.get("file_extension", "")
+        if extension.lower() != ".pdf":
+            return None
+
+        return DocumentConverterResult(
+            title=None,
+            text_content=pdfminer.high_level.extract_text(local_path),
+        )
+
+
+class DocxConverter(HtmlConverter):
+    """
+    Converts DOCX files to Markdown. Style information (e.g., headings) and tables are preserved where possible.
+    """
+
+    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
+        # Bail if not a DOCX
+        extension = kwargs.get("file_extension", "")
+        if extension.lower() != ".docx":
+            return None
+
+        result = None
+        with open(local_path, "rb") as docx_file:
+            style_map = kwargs.get("style_map", None)
+
+            result = mammoth.convert_to_html(docx_file, style_map=style_map)
+            html_content = result.value
+            result = self._convert(html_content)
+
+        return result
+
+
+class XlsxConverter(HtmlConverter):
+    """
+    Converts XLSX files to Markdown, with each sheet presented as a separate Markdown table.
+    """
+
+    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
+        # Bail if not a XLSX
+        extension = kwargs.get("file_extension", "")
+        if extension.lower() != ".xlsx":
+            return None
+
+        sheets = pd.read_excel(local_path, sheet_name=None)
+        md_content = ""
+        for s in sheets:
+            md_content += f"## {s}\n"
+            html_content = sheets[s].to_html(index=False)
+            md_content += self._convert(html_content).text_content.strip() + "\n\n"
+
+        return DocumentConverterResult(
+            title=None,
+            text_content=md_content.strip(),
+        )
+
+
+class PptxConverter(HtmlConverter):
+    """
+    Converts PPTX files to Markdown. Supports headings, tables and images with alt text.
+    """
+
+    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
+        # Bail if not a PPTX
+        extension = kwargs.get("file_extension", "")
+        if extension.lower() != ".pptx":
+            return None
+
+        md_content = ""
+
+        presentation = pptx.Presentation(local_path)
+        slide_num = 0
+        for slide in presentation.slides:
+            slide_num += 1
+
+            md_content += f"\n\n<!-- Slide number: {slide_num} -->\n"
+
+            title = slide.shapes.title
+            for shape in slide.shapes:
+                # Pictures
+                if self._is_picture(shape):
+                    # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069
+                    alt_text = ""
+                    try:
+                        alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "")
+                    except Exception:
+                        pass
+
+                    # A placeholder name
+                    filename = re.sub(r"\W", "", shape.name) + ".jpg"
+                    md_content += (
+                        "\n!["
+                        + (alt_text if alt_text else shape.name)
+                        + "]("
+                        + filename
+                        + ")\n"
+                    )
+
+                # Tables
+                if self._is_table(shape):
+                    html_table = "<html><body><table>"
+                    first_row = True
+                    for row in shape.table.rows:
+                        html_table += "<tr>"
+                        for cell in row.cells:
+                            if first_row:
+                                html_table += "<th>" + html.escape(cell.text) + "</th>"
+                            else:
+                                html_table += "<td>" + html.escape(cell.text) + "</td>"
+                        html_table += "</tr>"
+                        first_row = False
+                    html_table += "</table></body></html>
" + md_content += ( + "\n" + self._convert(html_table).text_content.strip() + "\n" + ) + + # Charts + if shape.has_chart: + md_content += self._convert_chart_to_markdown(shape.chart) + + # Text areas + elif shape.has_text_frame: + if shape == title: + md_content += "# " + shape.text.lstrip() + "\n" + else: + md_content += shape.text + "\n" + + md_content = md_content.strip() + + if slide.has_notes_slide: + md_content += "\n\n### Notes:\n" + notes_frame = slide.notes_slide.notes_text_frame + if notes_frame is not None: + md_content += notes_frame.text + md_content = md_content.strip() + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + def _is_picture(self, shape): + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE: + return True + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PLACEHOLDER: + if hasattr(shape, "image"): + return True + return False + + def _is_table(self, shape): + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE: + return True + return False + + def _convert_chart_to_markdown(self, chart): + md = "\n\n### Chart" + if chart.has_title: + md += f": {chart.chart_title.text_frame.text}" + md += "\n\n" + data = [] + category_names = [c.label for c in chart.plots[0].categories] + series_names = [s.name for s in chart.series] + data.append(["Category"] + series_names) + + for idx, category in enumerate(category_names): + row = [category] + for series in chart.series: + row.append(series.values[idx]) + data.append(row) + + markdown_table = [] + for row in data: + markdown_table.append("| " + " | ".join(map(str, row)) + " |") + header = markdown_table[0] + separator = "|" + "|".join(["---"] * len(data[0])) + "|" + return md + "\n".join([header, separator] + markdown_table[1:]) diff --git a/src/markitdown/converters/media.py b/src/markitdown/converters/media.py new file mode 100644 index 0000000..d6f40f0 --- /dev/null +++ b/src/markitdown/converters/media.py @@ -0,0 +1,246 @@ +import base64 +import json +import mimetypes +import os +import shutil +import subprocess +import tempfile +from typing import Union +from warnings import catch_warnings, resetwarnings + +from .base import DocumentConverter, DocumentConverterResult + +# Optional Transcription support +try: + # Using warnings' catch_warnings to catch + # pydub's warning of ffmpeg or avconv missing + with catch_warnings(record=True) as w: + import pydub + + if w: + raise ModuleNotFoundError + import speech_recognition as sr + + IS_AUDIO_TRANSCRIPTION_CAPABLE = True +except ModuleNotFoundError: + pass +finally: + resetwarnings() + + +class MediaConverter(DocumentConverter): + """ + Abstract class for multi-modal media (e.g., images and audio) + """ + + def _get_metadata(self, local_path): + exiftool = shutil.which("exiftool") + if not exiftool: + return None + else: + try: + result = subprocess.run( + [exiftool, "-json", local_path], capture_output=True, text=True + ).stdout + return json.loads(result)[0] + except Exception: + return None + + +class WavConverter(MediaConverter): + """ + Converts WAV files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` is installed). 
+ """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a WAV + extension = kwargs.get("file_extension", "") + if extension.lower() != ".wav": + return None + + md_content = "" + + # Add metadata + metadata = self._get_metadata(local_path) + if metadata: + for f in [ + "Title", + "Artist", + "Author", + "Band", + "Album", + "Genre", + "Track", + "DateTimeOriginal", + "CreateDate", + "Duration", + ]: + if f in metadata: + md_content += f"{f}: {metadata[f]}\n" + + # Transcribe + if IS_AUDIO_TRANSCRIPTION_CAPABLE: + try: + transcript = self._transcribe_audio(local_path) + md_content += "\n\n### Audio Transcript:\n" + ( + "[No speech detected]" if transcript == "" else transcript + ) + except Exception: + md_content += ( + "\n\n### Audio Transcript:\nError. Could not transcribe this audio." + ) + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + def _transcribe_audio(self, local_path) -> str: + recognizer = sr.Recognizer() + with sr.AudioFile(local_path) as source: + audio = recognizer.record(source) + return recognizer.recognize_google(audio).strip() + + +class Mp3Converter(WavConverter): + """ + Converts MP3 files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` AND `pydub` are installed). + """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a MP3 + extension = kwargs.get("file_extension", "") + if extension.lower() != ".mp3": + return None + + md_content = "" + + # Add metadata + metadata = self._get_metadata(local_path) + if metadata: + for f in [ + "Title", + "Artist", + "Author", + "Band", + "Album", + "Genre", + "Track", + "DateTimeOriginal", + "CreateDate", + "Duration", + ]: + if f in metadata: + md_content += f"{f}: {metadata[f]}\n" + + # Transcribe + if IS_AUDIO_TRANSCRIPTION_CAPABLE: + handle, temp_path = tempfile.mkstemp(suffix=".wav") + os.close(handle) + try: + sound = pydub.AudioSegment.from_mp3(local_path) + sound.export(temp_path, format="wav") + + _args = dict() + _args.update(kwargs) + _args["file_extension"] = ".wav" + + try: + transcript = super()._transcribe_audio(temp_path).strip() + md_content += "\n\n### Audio Transcript:\n" + ( + "[No speech detected]" if transcript == "" else transcript + ) + except Exception: + md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio." + + finally: + os.unlink(temp_path) + + # Return the result + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + +class ImageConverter(MediaConverter): + """ + Converts images to markdown via extraction of metadata (if `exiftool` is installed), OCR (if `easyocr` is installed), and description via a multimodal LLM (if an llm_client is configured). 
+ """ + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not an image + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".jpg", ".jpeg", ".png"]: + return None + + md_content = "" + + # Add metadata + metadata = self._get_metadata(local_path) + if metadata: + for f in [ + "ImageSize", + "Title", + "Caption", + "Description", + "Keywords", + "Artist", + "Author", + "DateTimeOriginal", + "CreateDate", + "GPSPosition", + ]: + if f in metadata: + md_content += f"{f}: {metadata[f]}\n" + + # Try describing the image with GPTV + llm_client = kwargs.get("llm_client") + llm_model = kwargs.get("llm_model") + if llm_client is not None and llm_model is not None: + md_content += ( + "\n# Description:\n" + + self._get_llm_description( + local_path, + extension, + llm_client, + llm_model, + prompt=kwargs.get("llm_prompt"), + ).strip() + + "\n" + ) + + return DocumentConverterResult( + title=None, + text_content=md_content, + ) + + def _get_llm_description(self, local_path, extension, client, model, prompt=None): + if prompt is None or prompt.strip() == "": + prompt = "Write a detailed caption for this image." + + data_uri = "" + with open(local_path, "rb") as image_file: + content_type, encoding = mimetypes.guess_type("_dummy" + extension) + if content_type is None: + content_type = "image/jpeg" + image_base64 = base64.b64encode(image_file.read()).decode("utf-8") + data_uri = f"data:{content_type};base64,{image_base64}" + + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": data_uri, + }, + }, + ], + } + ] + + response = client.chat.completions.create(model=model, messages=messages) + return response.choices[0].message.content diff --git a/src/markitdown/converters/text.py b/src/markitdown/converters/text.py new file mode 100644 index 0000000..0790d2c --- /dev/null +++ b/src/markitdown/converters/text.py @@ -0,0 +1,234 @@ +import json +import mimetypes +import traceback +from typing import Any, Union +from xml.dom import minidom + +from bs4 import BeautifulSoup +from charset_normalizer import from_path + +from ..core import _CustomMarkdownify +from ..exceptions import FileConversionException +from .base import DocumentConverter, DocumentConverterResult + + +class PlainTextConverter(DocumentConverter): + """Anything with content type text/plain""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Guess the content type from any file extension that might be around + content_type, _ = mimetypes.guess_type( + "__placeholder" + kwargs.get("file_extension", "") + ) + + # Only accept text files + if content_type is None: + return None + elif "text/" not in content_type.lower(): + return None + + text_content = str(from_path(local_path).best()) + return DocumentConverterResult( + title=None, + text_content=text_content, + ) + + +class RSSConverter(DocumentConverter): + """Convert RSS / Atom type to markdown""" + + def convert( + self, local_path: str, **kwargs + ) -> Union[None, DocumentConverterResult]: + # Bail if not RSS type + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".xml", ".rss", ".atom"]: + return None + try: + doc = minidom.parse(local_path) + except BaseException as _: + return None + result = None + if doc.getElementsByTagName("rss"): + # A RSS feed must have a root element of + result = self._parse_rss_type(doc) + elif doc.getElementsByTagName("feed"): 
+    """
+
+    def convert(
+        self, local_path: str, **kwargs: Any
+    ) -> Union[None, DocumentConverterResult]:
+        # Guess the content type from any file extension that might be around
+        content_type, _ = mimetypes.guess_type(
+            "__placeholder" + kwargs.get("file_extension", "")
+        )
+
+        # Only accept text files
+        if content_type is None:
+            return None
+        elif "text/" not in content_type.lower():
+            return None
+
+        text_content = str(from_path(local_path).best())
+        return DocumentConverterResult(
+            title=None,
+            text_content=text_content,
+        )
+
+
+class RSSConverter(DocumentConverter):
+    """Convert RSS / Atom type to markdown"""
+
+    def convert(
+        self, local_path: str, **kwargs
+    ) -> Union[None, DocumentConverterResult]:
+        # Bail if not RSS type
+        extension = kwargs.get("file_extension", "")
+        if extension.lower() not in [".xml", ".rss", ".atom"]:
+            return None
+        try:
+            doc = minidom.parse(local_path)
+        except BaseException as _:
+            return None
+        result = None
+        if doc.getElementsByTagName("rss"):
+            # An RSS feed must have a root element of <rss>
+            result = self._parse_rss_type(doc)
+        elif doc.getElementsByTagName("feed"):
+            root = doc.getElementsByTagName("feed")[0]
+            if root.getElementsByTagName("entry"):
+                # An Atom feed must have a root element of <feed> and at least one <entry>
+                result = self._parse_atom_type(doc)
+            else:
+                return None
+        else:
+            # not rss or atom
+            return None
+
+        return result
+
+    def _parse_atom_type(
+        self, doc: minidom.Document
+    ) -> Union[None, DocumentConverterResult]:
+        """Parse the type of an Atom feed.
+
+        Returns None if the feed type is not recognized or something goes wrong.
+        """
+        try:
+            root = doc.getElementsByTagName("feed")[0]
+            title = self._get_data_by_tag_name(root, "title")
+            subtitle = self._get_data_by_tag_name(root, "subtitle")
+            entries = root.getElementsByTagName("entry")
+            md_text = f"# {title}\n"
+            if subtitle:
+                md_text += f"{subtitle}\n"
+            for entry in entries:
+                entry_title = self._get_data_by_tag_name(entry, "title")
+                entry_summary = self._get_data_by_tag_name(entry, "summary")
+                entry_updated = self._get_data_by_tag_name(entry, "updated")
+                entry_content = self._get_data_by_tag_name(entry, "content")
+
+                if entry_title:
+                    md_text += f"\n## {entry_title}\n"
+                if entry_updated:
+                    md_text += f"Updated on: {entry_updated}\n"
+                if entry_summary:
+                    md_text += self._parse_content(entry_summary)
+                if entry_content:
+                    md_text += self._parse_content(entry_content)
+
+            return DocumentConverterResult(
+                title=title,
+                text_content=md_text,
+            )
+        except BaseException as _:
+            return None
+
+    def _parse_rss_type(
+        self, doc: minidom.Document
+    ) -> Union[None, DocumentConverterResult]:
+        """Parse the type of an RSS feed.
+
+        Returns None if the feed type is not recognized or something goes wrong.
+        """
+        try:
+            root = doc.getElementsByTagName("rss")[0]
+            channel = root.getElementsByTagName("channel")
+            if not channel:
+                return None
+            channel = channel[0]
+            channel_title = self._get_data_by_tag_name(channel, "title")
+            channel_description = self._get_data_by_tag_name(channel, "description")
+            items = channel.getElementsByTagName("item")
+            md_text = ""
+            if channel_title:
+                md_text += f"# {channel_title}\n"
+            if channel_description:
+                md_text += f"{channel_description}\n"
+            if not items:
+                items = []
+            for item in items:
+                title = self._get_data_by_tag_name(item, "title")
+                description = self._get_data_by_tag_name(item, "description")
+                pubDate = self._get_data_by_tag_name(item, "pubDate")
+                content = self._get_data_by_tag_name(item, "content:encoded")
+
+                if title:
+                    md_text += f"\n## {title}\n"
+                if pubDate:
+                    md_text += f"Published on: {pubDate}\n"
+                if description:
+                    md_text += self._parse_content(description)
+                if content:
+                    md_text += self._parse_content(content)
+
+            return DocumentConverterResult(
+                title=channel_title,
+                text_content=md_text,
+            )
+        except BaseException as _:
+            print(traceback.format_exc())
+            return None
+
+    def _parse_content(self, content: str) -> str:
+        """Parse the content of an RSS feed item"""
+        try:
+            # using bs4 because many RSS feeds have HTML-styled content
+            soup = BeautifulSoup(content, "html.parser")
+            return _CustomMarkdownify().convert_soup(soup)
+        except BaseException as _:
+            return content
+
+    def _get_data_by_tag_name(
+        self, element: minidom.Element, tag_name: str
+    ) -> Union[str, None]:
+        """Get data from first child element with the given tag name.
+        Returns None when no such element is found.
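+
+        For example, for a <channel><title>My Feed</title></channel> element
+        bound to `channel`, _get_data_by_tag_name(channel, "title") returns "My Feed".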
+ """ + nodes = element.getElementsByTagName(tag_name) + if not nodes: + return None + fc = nodes[0].firstChild + if fc: + return fc.data + return None + + +class IpynbConverter(DocumentConverter): + """Converts Jupyter Notebook (.ipynb) files to Markdown.""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not ipynb + extension = kwargs.get("file_extension", "") + if extension.lower() != ".ipynb": + return None + + # Parse and convert the notebook + result = None + with open(local_path, "rt", encoding="utf-8") as fh: + notebook_content = json.load(fh) + result = self._convert(notebook_content) + + return result + + def _convert(self, notebook_content: dict) -> Union[None, DocumentConverterResult]: + """Helper function that converts notebook JSON content to Markdown.""" + try: + md_output = [] + title = None + + for cell in notebook_content.get("cells", []): + cell_type = cell.get("cell_type", "") + source_lines = cell.get("source", []) + + if cell_type == "markdown": + md_output.append("".join(source_lines)) + + # Extract the first # heading as title if not already found + if title is None: + for line in source_lines: + if line.startswith("# "): + title = line.lstrip("# ").strip() + break + + elif cell_type == "code": + # Code cells are wrapped in Markdown code blocks + md_output.append(f"```python\n{''.join(source_lines)}\n```") + elif cell_type == "raw": + md_output.append(f"```\n{''.join(source_lines)}\n```") + + md_text = "\n\n".join(md_output) + + # Check for title in notebook metadata + title = notebook_content.get("metadata", {}).get("title", title) + + return DocumentConverterResult( + title=title, + text_content=md_text, + ) + + except Exception as e: + raise FileConversionException( + f"Error converting .ipynb file: {str(e)}" + ) from e diff --git a/src/markitdown/converters/web.py b/src/markitdown/converters/web.py new file mode 100644 index 0000000..4ee075d --- /dev/null +++ b/src/markitdown/converters/web.py @@ -0,0 +1,314 @@ +import base64 +import binascii +import json +import re +from typing import Any, Dict, List, Union +from urllib.parse import parse_qs, urlparse + +from bs4 import BeautifulSoup + +from ..core import _CustomMarkdownify +from .base import DocumentConverter, DocumentConverterResult + +# Optional YouTube transcription support +try: + from youtube_transcript_api import YouTubeTranscriptApi + + IS_YOUTUBE_TRANSCRIPT_CAPABLE = True +except ModuleNotFoundError: + pass + + +class HtmlConverter(DocumentConverter): + """Anything with content type text/html""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not html + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + + result = None + with open(local_path, "rt", encoding="utf-8") as fh: + result = self._convert(fh.read()) + + return result + + def _convert(self, html_content: str) -> Union[None, DocumentConverterResult]: + """Helper function that converts and HTML string.""" + + # Parse the string + soup = BeautifulSoup(html_content, "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Print only the main content + body_elm = soup.find("body") + webpage_text = "" + if body_elm: + webpage_text = _CustomMarkdownify().convert_soup(body_elm) + else: + webpage_text = _CustomMarkdownify().convert_soup(soup) + + assert isinstance(webpage_text, str) + + return 
+        """
+
+        # Parse the string
+        soup = BeautifulSoup(html_content, "html.parser")
+
+        # Remove javascript and style blocks
+        for script in soup(["script", "style"]):
+            script.extract()
+
+        # Print only the main content
+        body_elm = soup.find("body")
+        webpage_text = ""
+        if body_elm:
+            webpage_text = _CustomMarkdownify().convert_soup(body_elm)
+        else:
+            webpage_text = _CustomMarkdownify().convert_soup(soup)
+
+        assert isinstance(webpage_text, str)
+
+        return DocumentConverterResult(
+            title=None if soup.title is None else soup.title.string,
+            text_content=webpage_text,
+        )
+
+
+class WikipediaConverter(DocumentConverter):
+    """Handle Wikipedia pages separately, focusing only on the main document content."""
+
+    def convert(
+        self, local_path: str, **kwargs: Any
+    ) -> Union[None, DocumentConverterResult]:
+        # Bail if not Wikipedia
+        extension = kwargs.get("file_extension", "")
+        if extension.lower() not in [".html", ".htm"]:
+            return None
+        url = kwargs.get("url", "")
+        if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia\.org\/", url):
+            return None
+
+        # Parse the file
+        soup = None
+        with open(local_path, "rt", encoding="utf-8") as fh:
+            soup = BeautifulSoup(fh.read(), "html.parser")
+
+        # Remove javascript and style blocks
+        for script in soup(["script", "style"]):
+            script.extract()
+
+        # Print only the main content
+        body_elm = soup.find("div", {"id": "mw-content-text"})
+        title_elm = soup.find("span", {"class": "mw-page-title-main"})
+
+        webpage_text = ""
+        main_title = None if soup.title is None else soup.title.string
+
+        if body_elm:
+            # What's the title
+            if title_elm and len(title_elm) > 0:
+                main_title = title_elm.string  # type: ignore
+                assert isinstance(main_title, str)
+
+            # Convert the page
+            webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup(
+                body_elm
+            )
+        else:
+            webpage_text = _CustomMarkdownify().convert_soup(soup)
+
+        return DocumentConverterResult(
+            title=main_title,
+            text_content=webpage_text,
+        )
+
+
+class YouTubeConverter(DocumentConverter):
+    """Handle YouTube specially, focusing on the video title, description, and transcript."""
+
+    def convert(
+        self, local_path: str, **kwargs: Any
+    ) -> Union[None, DocumentConverterResult]:
+        # Bail if not YouTube
+        extension = kwargs.get("file_extension", "")
+        if extension.lower() not in [".html", ".htm"]:
+            return None
+        url = kwargs.get("url", "")
+        if not url.startswith("https://www.youtube.com/watch?"):
+            return None
+
+        # Parse the file
+        soup = None
+        with open(local_path, "rt", encoding="utf-8") as fh:
+            soup = BeautifulSoup(fh.read(), "html.parser")
+
+        # Read the meta tags
+        assert soup.title is not None and soup.title.string is not None
+        metadata: Dict[str, str] = {"title": soup.title.string}
+        for meta in soup(["meta"]):
+            for a in meta.attrs:
+                if a in ["itemprop", "property", "name"]:
+                    metadata[meta[a]] = meta.get("content", "")
+                    break
+
+        # We can also try to read the full description. 
This is more prone to breaking, since it reaches into the page implementation + try: + for script in soup(["script"]): + content = script.text + if "ytInitialData" in content: + lines = re.split(r"\r?\n", content) + obj_start = lines[0].find("{") + obj_end = lines[0].rfind("}") + if obj_start >= 0 and obj_end >= 0: + data = json.loads(lines[0][obj_start : obj_end + 1]) + attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore + if attrdesc: + metadata["description"] = str(attrdesc["content"]) + break + except Exception: + pass + + # Start preparing the page + webpage_text = "# YouTube\n" + + title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore + assert isinstance(title, str) + + if title: + webpage_text += f"\n## {title}\n" + + stats = "" + views = self._get(metadata, ["interactionCount"]) # type: ignore + if views: + stats += f"- **Views:** {views}\n" + + keywords = self._get(metadata, ["keywords"]) # type: ignore + if keywords: + stats += f"- **Keywords:** {keywords}\n" + + runtime = self._get(metadata, ["duration"]) # type: ignore + if runtime: + stats += f"- **Runtime:** {runtime}\n" + + if len(stats) > 0: + webpage_text += f"\n### Video Metadata\n{stats}\n" + + description = self._get(metadata, ["description", "og:description"]) # type: ignore + if description: + webpage_text += f"\n### Description\n{description}\n" + + if IS_YOUTUBE_TRANSCRIPT_CAPABLE: + transcript_text = "" + parsed_url = urlparse(url) # type: ignore + params = parse_qs(parsed_url.query) # type: ignore + if "v" in params: + assert isinstance(params["v"][0], str) + video_id = str(params["v"][0]) + try: + youtube_transcript_languages = kwargs.get( + "youtube_transcript_languages", ("en",) + ) + # Must be a single transcript. + transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages) # type: ignore + transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore + # Alternative formatting: + # formatter = TextFormatter() + # formatter.format_transcript(transcript) + except Exception: + pass + if transcript_text: + webpage_text += f"\n### Transcript\n{transcript_text}\n" + + title = title if title else soup.title.string + assert isinstance(title, str) + + return DocumentConverterResult( + title=title, + text_content=webpage_text, + ) + + def _get( + self, + metadata: Dict[str, str], + keys: List[str], + default: Union[str, None] = None, + ) -> Union[str, None]: + for k in keys: + if k in metadata: + return metadata[k] + return default + + def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type + if isinstance(json, list): + for elm in json: + ret = self._findKey(elm, key) + if ret is not None: + return ret + elif isinstance(json, dict): + for k in json: + if k == key: + return json[k] + else: + ret = self._findKey(json[k], key) + if ret is not None: + return ret + return None + + +class BingSerpConverter(DocumentConverter): + """ + Handle Bing results pages (only the organic search results). 
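+
+    Organic results carry their destination in a "u" query parameter, Base64URL
+    encoded with a two-character prefix; a sketch of the decoding applied below
+    (the variable names mirror the convert method):
+
+        u = qs["u"][0][2:].strip() + "=="
+        href = base64.b64decode(u, altchars="-_").decode("utf-8")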
+    NOTE: It is better to use the Bing API
+    """
+
+    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
+        # Bail if not a Bing SERP
+        extension = kwargs.get("file_extension", "")
+        if extension.lower() not in [".html", ".htm"]:
+            return None
+        url = kwargs.get("url", "")
+        if not re.search(r"^https://www\.bing\.com/search\?q=", url):
+            return None
+
+        # Parse the query parameters
+        parsed_params = parse_qs(urlparse(url).query)
+        query = parsed_params.get("q", [""])[0]
+
+        # Parse the file
+        soup = None
+        with open(local_path, "rt", encoding="utf-8") as fh:
+            soup = BeautifulSoup(fh.read(), "html.parser")
+
+        # Clean up some formatting
+        for tptt in soup.find_all(class_="tptt"):
+            if hasattr(tptt, "string") and tptt.string:
+                tptt.string += " "
+        for slug in soup.find_all(class_="algoSlug_icon"):
+            slug.extract()
+
+        # Parse the algorithmic results
+        _markdownify = _CustomMarkdownify()
+        results = list()
+        for result in soup.find_all(class_="b_algo"):
+            # Rewrite redirect urls
+            for a in result.find_all("a", href=True):
+                parsed_href = urlparse(a["href"])
+                qs = parse_qs(parsed_href.query)
+
+                # The destination is contained in the u parameter,
+                # but appears to be base64 encoded, with some prefix
+                if "u" in qs:
+                    u = (
+                        qs["u"][0][2:].strip() + "=="
+                    )  # Python 3 doesn't care about extra padding
+
+                    try:
+                        # RFC 4648 / Base64URL variant, which uses "-" and "_"
+                        a["href"] = base64.b64decode(u, altchars="-_").decode("utf-8")
+                    except UnicodeDecodeError:
+                        pass
+                    except binascii.Error:
+                        pass
+
+            # Convert to markdown
+            md_result = _markdownify.convert_soup(result).strip()
+            lines = [line.strip() for line in re.split(r"\n+", md_result)]
+            results.append("\n".join([line for line in lines if len(line) > 0]))
+
+        webpage_text = (
+            f"## A Bing search for '{query}' found the following results:\n\n"
+            + "\n\n".join(results)
+        )
+
+        return DocumentConverterResult(
+            title=None if soup.title is None else soup.title.string,
+            text_content=webpage_text,
+        )
diff --git a/src/markitdown/core.py b/src/markitdown/core.py
new file mode 100644
index 0000000..f0a3063
--- /dev/null
+++ b/src/markitdown/core.py
@@ -0,0 +1,409 @@
+# type: ignore
+import copy
+import mimetypes
+import os
+import re
+import tempfile
+import traceback
+from pathlib import Path
+from typing import Any, List, Optional, Union
+from urllib.parse import quote, unquote, urlparse, urlunparse
+from warnings import warn
+
+import markdownify
+
+# File-format detection
+import puremagic
+import requests
+
+from .converters.archive import ZipConverter
+from .converters.base import DocumentConverter, DocumentConverterResult
+from .converters.document import DocxConverter, PdfConverter, PptxConverter, XlsxConverter
+from .converters.media import ImageConverter, Mp3Converter, WavConverter
+from .converters.text import IpynbConverter, PlainTextConverter, RSSConverter
+from .converters.web import BingSerpConverter, HtmlConverter, WikipediaConverter, YouTubeConverter
+from .exceptions import FileConversionException, UnsupportedFormatException
+
+
+class _CustomMarkdownify(markdownify.MarkdownConverter):
+    """
+    A custom version of markdownify's MarkdownConverter. Changes include:
+
+    - Altering the default heading style to use '#', '##', etc.
+    - Removing javascript hyperlinks.
+    - Truncating images with large data:uri sources.
+ - Ensuring URIs are properly escaped, and do not conflict with Markdown syntax + """ + + def __init__(self, **options: Any): + options["heading_style"] = options.get("heading_style", markdownify.ATX) + # Explicitly cast options to the expected type if necessary + super().__init__(**options) + + def convert_hn(self, n: int, el: Any, text: str, convert_as_inline: bool) -> str: + """Same as usual, but be sure to start with a new line""" + if not convert_as_inline: + if not re.search(r"^\n", text): + return "\n" + super().convert_hn(n, el, text, convert_as_inline) # type: ignore + + return super().convert_hn(n, el, text, convert_as_inline) # type: ignore + + def convert_a(self, el: Any, text: str, convert_as_inline: bool): + """Same as usual converter, but removes Javascript links and escapes URIs.""" + prefix, suffix, text = markdownify.chomp(text) # type: ignore + if not text: + return "" + href = el.get("href") + title = el.get("title") + + # Escape URIs and skip non-http or file schemes + if href: + try: + parsed_url = urlparse(href) # type: ignore + if parsed_url.scheme and parsed_url.scheme.lower() not in ["http", "https", "file"]: # type: ignore + return "%s%s%s" % (prefix, text, suffix) + href = urlunparse(parsed_url._replace(path=quote(unquote(parsed_url.path)))) # type: ignore + except ValueError: # It's not clear if this ever gets thrown + return "%s%s%s" % (prefix, text, suffix) + + # For the replacement see #29: text nodes underscores are escaped + if ( + self.options["autolinks"] + and text.replace(r"\_", "_") == href + and not title + and not self.options["default_title"] + ): + # Shortcut syntax + return "<%s>" % href + if self.options["default_title"] and not title: + title = href + title_part = ' "%s"' % title.replace('"', r"\"") if title else "" + return ( + "%s[%s](%s%s)%s" % (prefix, text, href, title_part, suffix) + if href + else text + ) + + def convert_img(self, el: Any, text: str, convert_as_inline: bool) -> str: + """Same as usual converter, but removes data URIs""" + + alt = el.attrs.get("alt", None) or "" + src = el.attrs.get("src", None) or "" + title = el.attrs.get("title", None) or "" + title_part = ' "%s"' % title.replace('"', r"\"") if title else "" + if ( + convert_as_inline + and el.parent.name not in self.options["keep_inline_images_in"] + ): + return alt + + # Remove dataURIs + if src.startswith("data:"): + src = src.split(",")[0] + "..." + + return "![%s](%s%s)" % (alt, src, title_part) + + def convert_soup(self, soup: Any) -> str: + return super().convert_soup(soup) # type: ignore + + +class MarkItDown: + """(In preview) An extremely simple text-based document reader, suitable for LLM use. + This reader will convert common file-types or webpages to Markdown.""" + + def __init__( + self, + requests_session: Optional[requests.Session] = None, + llm_client: Optional[Any] = None, + llm_model: Optional[str] = None, + style_map: Optional[str] = None, + # Deprecated + mlm_client: Optional[Any] = None, + mlm_model: Optional[str] = None, + ): + if requests_session is None: + self._requests_session = requests.Session() + else: + self._requests_session = requests_session + + # Handle deprecation notices + ############################# + if mlm_client is not None: + if llm_client is None: + warn( + "'mlm_client' is deprecated, and was renamed 'llm_client'.", + DeprecationWarning, + ) + llm_client = mlm_client + mlm_client = None + else: + raise ValueError( + "'mlm_client' is deprecated, and was renamed 'llm_client'. Do not use both at the same time. 
Just use 'llm_client' instead." + ) + + if mlm_model is not None: + if llm_model is None: + warn( + "'mlm_model' is deprecated, and was renamed 'llm_model'.", + DeprecationWarning, + ) + llm_model = mlm_model + mlm_model = None + else: + raise ValueError( + "'mlm_model' is deprecated, and was renamed 'llm_model'. Do not use both at the same time. Just use 'llm_model' instead." + ) + ############################# + + self._llm_client = llm_client + self._llm_model = llm_model + self._style_map = style_map + + self._page_converters: List[DocumentConverter] = [] + + # Register converters for successful browsing operations + # Later registrations are tried first / take higher priority than earlier registrations + # To this end, the most specific converters should appear below the most generic converters + self.register_page_converter(PlainTextConverter()) + self.register_page_converter(HtmlConverter()) + self.register_page_converter(RSSConverter()) + self.register_page_converter(WikipediaConverter()) + self.register_page_converter(YouTubeConverter()) + self.register_page_converter(BingSerpConverter()) + self.register_page_converter(DocxConverter()) + self.register_page_converter(XlsxConverter()) + self.register_page_converter(PptxConverter()) + self.register_page_converter(WavConverter()) + self.register_page_converter(Mp3Converter()) + self.register_page_converter(ImageConverter()) + self.register_page_converter(IpynbConverter()) + self.register_page_converter(PdfConverter()) + self.register_page_converter(ZipConverter()) + + def convert( + self, source: Union[str, requests.Response, Path], **kwargs: Any + ) -> DocumentConverterResult: # TODO: deal with kwargs + """ + Args: + - source: can be a string representing a path either as string pathlib path object or url, or a requests.response object + - extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.) + """ + + # Local path or url + if isinstance(source, str): + if ( + source.startswith("http://") + or source.startswith("https://") + or source.startswith("file://") + ): + return self.convert_url(source, **kwargs) + else: + return self.convert_local(source, **kwargs) + # Request response + elif isinstance(source, requests.Response): + return self.convert_response(source, **kwargs) + elif isinstance(source, Path): + return self.convert_local(source, **kwargs) + + def convert_local( + self, path: Union[str, Path], **kwargs: Any + ) -> DocumentConverterResult: # TODO: deal with kwargs + if isinstance(path, Path): + path = str(path) + # Prepare a list of extensions to try (in order of priority) + ext = kwargs.get("file_extension") + extensions = [ext] if ext is not None else [] + + # Get extension alternatives from the path and puremagic + base, ext = os.path.splitext(path) + self._append_ext(extensions, ext) + + for g in self._guess_ext_magic(path): + self._append_ext(extensions, g) + + # Convert + return self._convert(path, extensions, **kwargs) + + # TODO what should stream's type be? + def convert_stream( + self, stream: Any, **kwargs: Any + ) -> DocumentConverterResult: # TODO: deal with kwargs + # Prepare a list of extensions to try (in order of priority) + ext = kwargs.get("file_extension") + extensions = [ext] if ext is not None else [] + + # Save the file locally to a temporary file. 
+        """
+
+        # Local path or url
+        if isinstance(source, str):
+            if (
+                source.startswith("http://")
+                or source.startswith("https://")
+                or source.startswith("file://")
+            ):
+                return self.convert_url(source, **kwargs)
+            else:
+                return self.convert_local(source, **kwargs)
+        # Request response
+        elif isinstance(source, requests.Response):
+            return self.convert_response(source, **kwargs)
+        elif isinstance(source, Path):
+            return self.convert_local(source, **kwargs)
+
+    def convert_local(
+        self, path: Union[str, Path], **kwargs: Any
+    ) -> DocumentConverterResult:  # TODO: deal with kwargs
+        if isinstance(path, Path):
+            path = str(path)
+        # Prepare a list of extensions to try (in order of priority)
+        ext = kwargs.get("file_extension")
+        extensions = [ext] if ext is not None else []
+
+        # Get extension alternatives from the path and puremagic
+        base, ext = os.path.splitext(path)
+        self._append_ext(extensions, ext)
+
+        for g in self._guess_ext_magic(path):
+            self._append_ext(extensions, g)
+
+        # Convert
+        return self._convert(path, extensions, **kwargs)
+
+    # TODO what should stream's type be?
+    def convert_stream(
+        self, stream: Any, **kwargs: Any
+    ) -> DocumentConverterResult:  # TODO: deal with kwargs
+        # Prepare a list of extensions to try (in order of priority)
+        ext = kwargs.get("file_extension")
+        extensions = [ext] if ext is not None else []
+
+        # Save the file locally to a temporary file. It will be deleted before this method exits
+        handle, temp_path = tempfile.mkstemp()
+        fh = os.fdopen(handle, "wb")
+        result = None
+        try:
+            # Write to the temporary file
+            content = stream.read()
+            if isinstance(content, str):
+                fh.write(content.encode("utf-8"))
+            else:
+                fh.write(content)
+            fh.close()
+
+            # Use puremagic to check for more extension options
+            for g in self._guess_ext_magic(temp_path):
+                self._append_ext(extensions, g)
+
+            # Convert
+            result = self._convert(temp_path, extensions, **kwargs)
+        # Clean up
+        finally:
+            try:
+                fh.close()
+            except Exception:
+                pass
+            os.unlink(temp_path)
+
+        return result
+
+    def convert_url(
+        self, url: str, **kwargs: Any
+    ) -> DocumentConverterResult:  # TODO: fix kwargs type
+        # Send an HTTP request to the URL
+        response = self._requests_session.get(url, stream=True)
+        response.raise_for_status()
+        return self.convert_response(response, **kwargs)
+
+    def convert_response(
+        self, response: requests.Response, **kwargs: Any
+    ) -> DocumentConverterResult:  # TODO fix kwargs type
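+        """Convert a requests.Response to Markdown.
+
+        Candidate extensions are collected in priority order: an explicit
+        file_extension kwarg, the Content-Type header, the Content-Disposition
+        filename, the URL path, and finally puremagic content sniffing.
+        """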
+        # Prepare a list of extensions to try (in order of priority)
+        ext = kwargs.get("file_extension")
+        extensions = [ext] if ext is not None else []
+
+        # Guess from the mimetype
+        content_type = response.headers.get("content-type", "").split(";")[0]
+        self._append_ext(extensions, mimetypes.guess_extension(content_type))
+
+        # Read the content disposition if there is one
+        content_disposition = response.headers.get("content-disposition", "")
+        m = re.search(r"filename=([^;]+)", content_disposition)
+        if m:
+            base, ext = os.path.splitext(m.group(1).strip("\"'"))
+            self._append_ext(extensions, ext)
+
+        # Read the extension from the URL path
+        base, ext = os.path.splitext(urlparse(response.url).path)
+        self._append_ext(extensions, ext)
+
+        # Save the file locally to a temporary file. It will be deleted before this method exits
+        handle, temp_path = tempfile.mkstemp()
+        fh = os.fdopen(handle, "wb")
+        result = None
+        try:
+            # Download the file
+            for chunk in response.iter_content(chunk_size=512):
+                fh.write(chunk)
+            fh.close()
+
+            # Use puremagic to check for more extension options
+            for g in self._guess_ext_magic(temp_path):
+                self._append_ext(extensions, g)
+
+            # Convert
+            result = self._convert(temp_path, extensions, url=response.url, **kwargs)
+        # Clean up
+        finally:
+            try:
+                fh.close()
+            except Exception:
+                pass
+            os.unlink(temp_path)
+
+        return result
+
+    def _convert(
+        self, local_path: str, extensions: List[Union[str, None]], **kwargs
+    ) -> DocumentConverterResult:
+        error_trace = ""
+        for ext in extensions + [None]:  # Try last with no extension
+            for converter in self._page_converters:
+                _kwargs = copy.deepcopy(kwargs)
+
+                # Overwrite file_extension appropriately
+                if ext is None:
+                    if "file_extension" in _kwargs:
+                        del _kwargs["file_extension"]
+                else:
+                    _kwargs.update({"file_extension": ext})
+
+                # Copy any additional global options
+                if "llm_client" not in _kwargs and self._llm_client is not None:
+                    _kwargs["llm_client"] = self._llm_client
+
+                if "llm_model" not in _kwargs and self._llm_model is not None:
+                    _kwargs["llm_model"] = self._llm_model
+
+                # Add the list of converters for nested processing
+                _kwargs["_parent_converters"] = self._page_converters
+
+                if "style_map" not in _kwargs and self._style_map is not None:
+                    _kwargs["style_map"] = self._style_map
+
+                # If we hit an error, log it and keep trying
+                res = None
+                try:
+                    res = converter.convert(local_path, **_kwargs)
+                except Exception:
+                    error_trace = ("\n\n" + traceback.format_exc()).strip()
+
+                if res is not None:
+                    # Normalize the content
+                    res.text_content = "\n".join(
+                        [line.rstrip() for line in re.split(r"\r?\n", res.text_content)]
+                    )
+                    res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content)
+
+                    # Todo
+                    return res
+
+        # If we got this far without success, report any exceptions
+        if len(error_trace) > 0:
+            raise FileConversionException(
+                f"Could not convert '{local_path}' to Markdown. File type was recognized as {extensions}. While converting the file, the following error was encountered:\n\n{error_trace}"
+            )
+
+        # Nothing can handle it!
+        raise UnsupportedFormatException(
+            f"Could not convert '{local_path}' to Markdown. The formats {extensions} are not supported."
+        )
+
+    def _append_ext(self, extensions, ext):
+        """Append a unique non-None, non-empty extension to a list of extensions."""
+        if ext is None:
+            return
+        ext = ext.strip()
+        if ext == "":
+            return
+        # if ext not in extensions:
+        extensions.append(ext)
+
+    def _guess_ext_magic(self, path):
+        """Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes."""
+        # Use puremagic to guess
+        try:
+            guesses = puremagic.magic_file(path)
+            extensions = list()
+            for g in guesses:
+                ext = g.extension.strip()
+                if len(ext) > 0:
+                    if not ext.startswith("."):
+                        ext = "." + ext
+                    if ext not in extensions:
+                        extensions.append(ext)
+            return extensions
+        except FileNotFoundError:
+            pass
+        except IsADirectoryError:
+            pass
+        except PermissionError:
+            pass
+        return []
+
+    def register_page_converter(self, converter: DocumentConverter) -> None:
+        """Register a page text converter."""
+        self._page_converters.insert(0, converter)
diff --git a/src/markitdown/exceptions.py b/src/markitdown/exceptions.py
new file mode 100644
index 0000000..17b817b
--- /dev/null
+++ b/src/markitdown/exceptions.py
@@ -0,0 +1,6 @@
+class FileConversionException(BaseException):
+    pass
+
+
+class UnsupportedFormatException(BaseException):
+    pass
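+
+
+# A minimal sketch of how callers can distinguish these (illustrative):
+#
+#     try:
+#         result = MarkItDown().convert("file.xyz")
+#     except UnsupportedFormatException:
+#         pass  # no converter recognized the format
+#     except FileConversionException:
+#         pass  # a converter matched, but failed while converting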