From 2d3e607ad1e8823f7b3f6e02e4ea8dd7a2cb6add Mon Sep 17 00:00:00 2001 From: "Ronald E. Robertson" Date: Mon, 26 Aug 2024 09:41:24 -0700 Subject: [PATCH] Merge dev: v0.4.1 (#61) * add: initial extract from left-bar layout * update: classify and parse url for images-multimedia * update: clean code * version: 0.4.1.dev0 * update: add handling for layouts circa Mar 2024 * version: 0.4.1.dev1 * update: rename text-based header classifier for clarity * update: rename header file for addtl clarity * add: pipeline for query notices component * update: filter empty divs util function * fix: initialize output dict * update: move top image parser to header section parsers * version: 0.4.1.dev2 * fix: missing cmpt_ranks due to empty ad components, filter before adding to the list * fix: broader filtering, sub_types, better title and url parser for medium * fix: handle ads and shopping ads extracted from same serp * update: handling for no subcomponents, pass error and text * clean: quotation formatting * update: readme example * version: 0.4.1.dev3 * update: add query suggestion variation, handle multiple suggestions, drop internal url * update: assert parsed list is not empty * update: reorg, clearer header extractors, handle shopping ads in ads * update: catch location query notices * update: rename query_notice to notice, includes location notices * version: 0.4.1.dev4 * fix: renaming, include more query edit notices * version: 0.4.1.dev5 * update: refactor notices parser as class * version: 0.4.1.dev6 * update: add language tip sub type * update: grab notice divs more directly * fix: wrong get_url usage for images urls * version: 0.4.1.dev7 * Bump to 0.4.1 --- README.md | 54 +++--- WebSearcher/__init__.py | 2 +- WebSearcher/classifiers/__init__.py | 3 +- WebSearcher/classifiers/header_components.py | 15 ++ .../{headers.py => header_text.py} | 41 +++-- WebSearcher/classifiers/main.py | 4 +- WebSearcher/component_parsers/__init__.py | 14 +- WebSearcher/component_parsers/ads.py | 61 ++++-- WebSearcher/component_parsers/banner.py | 97 +++++----- WebSearcher/component_parsers/images.py | 45 +++-- WebSearcher/component_parsers/notices.py | 173 ++++++++++++++++++ WebSearcher/components.py | 60 +++--- WebSearcher/extractors.py | 87 ++++++--- WebSearcher/webutils.py | 15 +- setup.py | 112 ++++++------ 15 files changed, 526 insertions(+), 257 deletions(-) create mode 100644 WebSearcher/classifiers/header_components.py rename WebSearcher/classifiers/{headers.py => header_text.py} (75%) create mode 100644 WebSearcher/component_parsers/notices.py diff --git a/README.md b/README.md index 7f55984..74c1602 100644 --- a/README.md +++ b/README.md @@ -50,54 +50,60 @@ se = ws.SearchEngine() vars(se) ``` ```python -{'url': 'https://www.google.com/search', - 'params': {}, +{'version': '0.4.1', + 'base_url': 'https://www.google.com/search', 'headers': {'Host': 'www.google.com', 'Referer': 'https://www.google.com/', 'Accept': '*/*', - 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip,deflate,br', - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0'}, + 'Accept-Language': 'en-US,en;q=0.5', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/118.0'}, + 'sesh': , 'ssh_tunnel': None, - 'sesh': , - 'log': , + 'unzip': True, + 'params': {}, + 'qry': None, + 'loc': None, + 'num_results': None, + 'url': None, + 'timestamp': None, + 'serp_id': None, + 'crawl_id': None, + 'response': None, 'html': None, 'results': [], - 'results_html': 
[]} + 'log': } ``` #### Conduct a search ```python # Conduct Search -se.search('immigration') +se.search('immigration news') ``` ``` -2019-08-14 01:25:38,267 | 2688 | INFO | WebSearcher.searchers | 200 | Searching immigration +2024-08-19 14:09:18.502 | INFO | WebSearcher.searchers | 200 | immigration news ``` ```python # Parse Results se.parse_results() ``` -``` -2019-08-14 01:25:42,208 | 2688 | INFO | WebSearcher.parsers | Parsing SERP 4d4fe27fe6b6466041e326622719b03ccc6542427c577c69740ae7fc -``` ```python se.results[0] -{'cite': 'The New York Times', +{'section': 'main', 'cmpt_rank': 0, - 'details': {'img_url': None, 'live_stamp': False, 'orient': 'h'}, - 'lang': 'en', - 'qry': 'immigration', - 'serp_id': '4d4fe27fe6b6466041e326622719b03ccc6542427c577c69740ae7fc', - 'serp_rank': 0, 'sub_rank': 0, - 'timestamp': '1 day ago', - 'title': 'Trump Policy Favors Wealthier Immigrants for Green Cards', 'type': 'top_stories', - 'url': 'https://www.nytimes.com/2019/08/12/us/politics/trump-immigration-policy.html'} + 'sub_type': None, + 'title': 'Biden citizenship program for migrant spouses in US launches', + 'url': 'https://www.newsnationnow.com/us-news/immigration/biden-citizenship-program-migrant-spouses-us-launches/', + 'text': None, + 'cite': 'NewsNation', + 'details': None, + 'error': None, + 'serp_rank': 0} ``` ### Save a Search @@ -140,9 +146,9 @@ Happy to have help! If you see a component that we aren't covering yet, please a ### Add a Parser -1. Add classifier to `component_classifier.py`, as `'cmpt_name'` -2. Add parser file in `/component_parsers` as `cmpt_name.py`, with function `parse_cmpt_name`. -3. Add import for `parse_cmpt_name` in `/component_parsers/__init__.py` +1. Add classifier to `classifiers/{main,footer,headers}.py` +2. Add parser as new file in `/component_parsers` +3. Add new parser to imports and catalogue in `/component_parsers/__init__.py` ### Testing Run tests: diff --git a/WebSearcher/__init__.py b/WebSearcher/__init__.py index af5b3dc..f4e0c30 100644 --- a/WebSearcher/__init__.py +++ b/WebSearcher/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.4.0" +__version__ = "0.4.1" from .searchers import SearchEngine from .parsers import parse_serp from .extractors import Extractor diff --git a/WebSearcher/classifiers/__init__.py b/WebSearcher/classifiers/__init__.py index f070a5b..44c05bf 100644 --- a/WebSearcher/classifiers/__init__.py +++ b/WebSearcher/classifiers/__init__.py @@ -1,3 +1,4 @@ -from .headers import ClassifyByHeader +from .header_text import ClassifyHeaderText +from .header_components import ClassifyHeaderComponent from .main import ClassifyMain from .footer import ClassifyFooter \ No newline at end of file diff --git a/WebSearcher/classifiers/header_components.py b/WebSearcher/classifiers/header_components.py new file mode 100644 index 0000000..c88218e --- /dev/null +++ b/WebSearcher/classifiers/header_components.py @@ -0,0 +1,15 @@ +from .. 
import webutils +import bs4 + + +class ClassifyHeaderComponent: + """Classify a component from the header section based on its bs4.element.Tag""" + + @staticmethod + def classify(cmpt: bs4.element.Tag) -> str: + """Classify the component type based on header text""" + + cmpt_type = "unknown" + if webutils.check_dict_value(cmpt.attrs, "id", ["taw", "topstuff"]): + cmpt_type = "notice" + return cmpt_type diff --git a/WebSearcher/classifiers/headers.py b/WebSearcher/classifiers/header_text.py similarity index 75% rename from WebSearcher/classifiers/headers.py rename to WebSearcher/classifiers/header_text.py index 8a38c32..7369c69 100644 --- a/WebSearcher/classifiers/headers.py +++ b/WebSearcher/classifiers/header_text.py @@ -1,35 +1,35 @@ import bs4 -class ClassifyByHeader: +class ClassifyHeaderText: """Classify components based on header text (e.g.
<h2>title</h2>
)""" @staticmethod def classify(cmpt: bs4.element.Tag, levels: list[int] = [2, 3]) -> str: for level in levels: - header = ClassifyByHeader._classify_header(cmpt, level) + header = ClassifyHeaderText._classify_header(cmpt, level) if header != "unknown": return header return "unknown" @staticmethod def classify_header_lvl2(cmpt: bs4.element.Tag) -> str: - return ClassifyByHeader._classify_header(cmpt, level=2) + return ClassifyHeaderText._classify_header(cmpt, level=2) @staticmethod def classify_header_lvl3(cmpt: bs4.element.Tag) -> str: - return ClassifyByHeader._classify_header(cmpt, level=3) + return ClassifyHeaderText._classify_header(cmpt, level=3) @staticmethod def _classify_header(cmpt: bs4.element.Tag, level: int) -> str: """Check text in common headers for dict matches""" - header_dict = ClassifyByHeader._get_header_level_mapping(level) + header_dict = ClassifyHeaderText._get_header_level_mapping(level) # Collect list of potential header divs header_list = [] header_list.extend(cmpt.find_all(f"h{level}", {"role":"heading"})) header_list.extend(cmpt.find_all(f"h{level}", {"class":["O3JH7", "q8U8x"]})) - header_list.extend(cmpt.find_all("div", {'aria-level':f"{level}", "role":"heading"})) - header_list.extend(cmpt.find_all("div", {'aria-level':f"{level}", "class":"XmmGVd"})) + header_list.extend(cmpt.find_all("div", {"aria-level":f"{level}", "role":"heading"})) + header_list.extend(cmpt.find_all("div", {"aria-level":f"{level}", "class":"XmmGVd"})) # Check header text for known title matches for header in filter(None, header_list): @@ -42,8 +42,8 @@ def _classify_header(cmpt: bs4.element.Tag, level: int) -> str: @staticmethod def _get_header_level_mapping(level) -> dict: """Return mapping of header level to header text""" - options = {2: ClassifyByHeader.TYPE_TO_H2_MAPPING, - 3: ClassifyByHeader.TYPE_TO_H3_MAPPING} + options = {2: ClassifyHeaderText.TYPE_TO_H2_MAPPING, + 3: ClassifyHeaderText.TYPE_TO_H3_MAPPING} return options.get(level, {}) # WS type -> header level 2 text (e.g.,
<h2>title</h2>
) @@ -54,6 +54,7 @@ def _get_header_level_mapping(level) -> dict: "Resultados de la Web", "Web Result with Site Links", "Web results"], + "images": ["Images"], "jobs": ["Jobs"], "knowledge": ["Calculator Result", "Featured snippet from the web", @@ -86,17 +87,17 @@ def _get_header_level_mapping(level) -> dict: # WS type -> header level 2 text (e.g.,
<h3>title</h3>
) TYPE_TO_H3_MAPPING = { - 'images': ['Images for'], - 'latest_from': ['Latest from'], - 'products': ['Popular products'], - 'news_quotes': ['Quotes in the news'], - 'recipes': ['Recipes'], - 'searches_related': ['Related searches'], - 'scholarly_articles': ['Scholarly articles for'], - 'top_stories': ['Top stories'], - 'videos': ['Videos'], - 'view_more_news': ['View more news'], - 'view_more_videos': ['View more videos'] + "images": ["Images for"], + "latest_from": ["Latest from"], + "products": ["Popular products"], + "news_quotes": ["Quotes in the news"], + "recipes": ["Recipes"], + "searches_related": ["Related searches"], + "scholarly_articles": ["Scholarly articles for"], + "top_stories": ["Top stories"], + "videos": ["Videos"], + "view_more_news": ["View more news"], + "view_more_videos": ["View more videos"] } # Invert from {label: [text, ...]} to [{text: label}, ...] diff --git a/WebSearcher/classifiers/main.py b/WebSearcher/classifiers/main.py index 441e190..d14963e 100644 --- a/WebSearcher/classifiers/main.py +++ b/WebSearcher/classifiers/main.py @@ -1,4 +1,4 @@ -from .headers import ClassifyByHeader +from .header_text import ClassifyHeaderText from .. import webutils import bs4 @@ -11,7 +11,7 @@ def classify(cmpt: bs4.element.Tag) -> str: # Ordered list of classifiers to try component_classifiers = [ ClassifyMain.top_stories, # Check top stories - ClassifyByHeader.classify, # Check levels 2 & 3 header text + ClassifyHeaderText.classify, # Check levels 2 & 3 header text ClassifyMain.img_cards, # Check image cards ClassifyMain.images, # Check images ClassifyMain.knowledge_panel, # Check knowledge panel diff --git a/WebSearcher/component_parsers/__init__.py b/WebSearcher/component_parsers/__init__.py index 3f16791..aaff223 100644 --- a/WebSearcher/component_parsers/__init__.py +++ b/WebSearcher/component_parsers/__init__.py @@ -1,3 +1,7 @@ + +from .notices import parse_notices +from .top_image_carousel import parse_top_image_carousel + from .ads import parse_ads from .available_on import parse_available_on from .banner import parse_banner @@ -19,7 +23,6 @@ from .scholarly_articles import parse_scholarly_articles from .searches_related import parse_searches_related from .shopping_ads import parse_shopping_ads -from .top_image_carousel import parse_top_image_carousel from .twitter_cards import parse_twitter_cards from .twitter_result import parse_twitter_result from .videos import parse_videos @@ -28,6 +31,14 @@ from .footer import Footer from .knowledge_rhs import parse_knowledge_rhs +# Header parsers +header_parsers = [ + ("notice", parse_notices, "Notices"), + ('top_image_carousel', parse_top_image_carousel, 'Top Image Carousel'), +] +header_parser_dict = {i[0]:i[1] for i in header_parsers} # Format {type: function} +header_parser_labels = {i[0]:i[2] for i in header_parsers} # Format {type: label} + # Component details dataframe columns = ['type', 'func', 'label'] main_parsers = [ @@ -49,7 +60,6 @@ ('scholarly_articles', parse_scholarly_articles, 'Scholar Articles'), ('searches_related', parse_searches_related, 'Related Searches'), ('shopping_ads', parse_shopping_ads, 'Shopping Ad'), - ('top_image_carousel', parse_top_image_carousel, 'Top Image Carousel'), ('top_stories', parse_top_stories, 'Top Stories'), ('twitter_cards', parse_twitter_cards, 'Twitter Cards'), ('twitter_result', parse_twitter_result, 'Twitter Result'), diff --git a/WebSearcher/component_parsers/ads.py b/WebSearcher/component_parsers/ads.py index 9916454..c6213da 100644 --- 
a/WebSearcher/component_parsers/ads.py +++ b/WebSearcher/component_parsers/ads.py @@ -11,25 +11,43 @@ from .. import webutils from ..models import BaseResult +from .shopping_ads import parse_shopping_ads import bs4 -def parse_ads(cmpt: bs4.element.Tag): +def parse_ads(cmpt: bs4.element.Tag) -> list: """Parse ads from ad component""" - if cmpt.find_all('li', {'class':'ads-ad'}): - # Check for legacy ad format - subs = cmpt.find_all('li', {'class':'ads-ad'}) - parser = parse_ad_legacy - elif cmpt.find_all('li', {'class':'ads-fr'}): - # Check for secondary ad format - subs = cmpt.find_all('li', {'class':'ads-fr'}) - parser = parse_ad_secondary - else: - # Check for latest ad format - subs = cmpt.find_all('div', {'class':'uEierd'}) - parser = parse_ad - - return [parser(sub, sub_rank) for sub_rank, sub in enumerate(subs)] + parsed = [] + sub_type = classify_ad_type(cmpt) + + if sub_type == 'legacy': + subs = cmpt.find_all('li', {'class': 'ads-ad'}) + parsed = [parse_ad_legacy(sub, sub_rank) for sub_rank, sub in enumerate(subs)] + elif sub_type == 'secondary': + subs = cmpt.find_all('li', {'class': 'ads-fr'}) + parsed = [parse_ad_secondary(sub, sub_rank) for sub_rank, sub in enumerate(subs)] + elif sub_type == 'standard': + subs = webutils.find_all_divs(cmpt, 'div', {'class': ['uEierd', 'commercial-unit-desktop-top']}) + for sub in subs: + sub_classes = sub.attrs.get("class", []) + if "commercial-unit-desktop-top" in sub_classes: + parsed.extend(parse_shopping_ads(sub)) + elif "uEierd" in sub_classes: + parsed.append(parse_ad(sub)) + return [BaseResult(**parsed_item).model_dump() for parsed_item in parsed] + + +def classify_ad_type(cmpt: bs4.element.Tag) -> str: + """Classify the type of ad component""" + label_divs = { + "legacy": webutils.find_all_divs(cmpt, 'div', {'class': 'ad_cclk'}), + "secondary": webutils.find_all_divs(cmpt, 'div', {'class': 'd5oMvf'}), + "standard": webutils.find_all_divs(cmpt, 'div', {'class': ['uEierd', 'commercial-unit-desktop-top']}) + } + for label, divs in label_divs.items(): + if divs: + return label + return 'unknown' def parse_ad(sub: bs4.element.Tag, sub_rank: int = 0) -> dict: @@ -56,8 +74,7 @@ def parse_ad(sub: bs4.element.Tag, sub_rank: int = 0) -> dict: parsed['sub_type'] = 'submenu' parsed['details'] = submenu - validated = BaseResult(**parsed) - return validated.model_dump() + return parsed def parse_ad_menu(sub: bs4.element.Tag) -> list: @@ -81,7 +98,9 @@ def parse_ad_menu(sub: bs4.element.Tag) -> list: def parse_ad_secondary(sub: bs4.element.Tag, sub_rank: int = 0) -> dict: """Parse details of a single ad subcomponent, similar to general""" - parsed = {'type':'ad', 'sub_rank':sub_rank} + parsed = {"type": "ad", + "sub_type": "secondary", + "sub_rank": sub_rank} parsed['title'] = sub.find('div', {'role':'heading'}).text parsed['url'] = sub.find('div', {'class':'d5oMvf'}).find('a')['href'] parsed['cite'] = sub.find('span', {'class':'gBIQub'}).text @@ -103,10 +122,12 @@ def parse_ad_secondary(sub: bs4.element.Tag, sub_rank: int = 0) -> dict: return parsed -def parse_ad_secondary(sub: bs4.element.Tag, sub_rank: int = 0) -> dict: +def parse_ad_legacy(sub: bs4.element.Tag, sub_rank: int = 0) -> dict: """[legacy] Parse details of a single ad subcomponent, similar to general""" - parsed = {'type':'ad', 'sub_rank':sub_rank} + parsed = {"type": "ad", + "sub_type": "legacy", + "sub_rank": sub_rank} header = sub.find('div', {'class':'ad_cclk'}) parsed['title'] = header.find('h3').text parsed['url'] = header.find('cite').text diff --git 
a/WebSearcher/component_parsers/banner.py b/WebSearcher/component_parsers/banner.py index b525669..e45bdf2 100644 --- a/WebSearcher/component_parsers/banner.py +++ b/WebSearcher/component_parsers/banner.py @@ -1,55 +1,42 @@ -from ..models import BaseResult - - -def parse_banner(cmpt): - """Parse a search suggestion component - - Args: - cmpt (bs4 object): A search suggestion component - - Returns: - list: List of BannerResult objects, with the main component and its subcomponents - """ - banner_results = [] - - # Header subcomponent - banner_result_header = BaseResult( - type='banner', - sub_type='header', - sub_rank=0, - title=get_result_text(cmpt, '.v3jTId'), - text=get_result_text(cmpt, '.Cy9gW'), - ) - banner_results.append(banner_result_header) - - # Suggestion subcomponents - for i, suggestion in enumerate(cmpt.select('.TjBpC')): - banner_result_suggestion = BaseResult( - type='banner', - sub_type='suggestion', - sub_rank=i + 1, - title=get_result_text(suggestion, '.AbPV3'), - url=suggestion.get('href') - ) - banner_results.append(banner_result_suggestion) - - return [banner.model_dump() for banner in banner_results] - -def get_result_text(cmpt, selector): - if cmpt.select_one(selector): - return cmpt.select_one(selector).get_text(strip=True) - else: - return "" - - -# ------------------------------------------------------------------------------ - -# with open("examples/banner-topicality.html", "r") as f: -# html = f.read() - -# wraprint(html, width=150) -# # print(ws.make_soup(html).prettify()) - -# l = parse_banner(ws.make_soup(html)) -# df = pd.DataFrame([i.model_dump() for i in l]) -# df \ No newline at end of file +from ..models import BaseResult + + +def parse_banner(cmpt): + """Parse a warning banner component + + Args: + cmpt (bs4 object): A search suggestion component + + Returns: + list: List of BannerResult objects, with the main component and its subcomponents + """ + banner_results = [] + + # Header subcomponent + banner_result_header = BaseResult( + type='banner', + sub_type='header', + sub_rank=0, + title=get_result_text(cmpt, '.v3jTId'), + text=get_result_text(cmpt, '.Cy9gW'), + ) + banner_results.append(banner_result_header) + + # Suggestion subcomponents + for i, suggestion in enumerate(cmpt.select('.TjBpC')): + banner_result_suggestion = BaseResult( + type='banner', + sub_type='suggestion', + sub_rank=i + 1, + title=get_result_text(suggestion, '.AbPV3'), + url=suggestion.get('href') + ) + banner_results.append(banner_result_suggestion) + + return [banner.model_dump() for banner in banner_results] + +def get_result_text(cmpt, selector): + if cmpt.select_one(selector): + return cmpt.select_one(selector).get_text(strip=True) + else: + return "" diff --git a/WebSearcher/component_parsers/images.py b/WebSearcher/component_parsers/images.py index 3d2d172..61b1a67 100644 --- a/WebSearcher/component_parsers/images.py +++ b/WebSearcher/component_parsers/images.py @@ -1,6 +1,5 @@ from ..models import BaseResult -from ..webutils import get_text, get_link - +from ..webutils import get_text, get_link, get_div def parse_images(cmpt): """Parse an image component @@ -16,28 +15,31 @@ def parse_images(cmpt): # Small images: thumbnails with text labels if cmpt.find('g-expandable-container'): - subs_small = cmpt.find_all('a', {'class': 'dgdd6c'}) - parsed_small = [parse_image_small(div, sub_rank) for sub_rank, div in enumerate(subs_small)] - parsed.extend(parsed_small) + subs = cmpt.find_all('a', {'class': 'dgdd6c'}) + sub_type = 'small' + parsed_subs = [parse_image_small(div, 
sub_rank) for sub_rank, div in enumerate(subs)] + parsed.extend(parsed_subs) if cmpt.find('g-scrolling-carousel'): # Medium images or video previews, no text labels subs = cmpt.find_all('div', {'class':'eA0Zlc'}) - _parsed = [parse_image_multimedia(sub, sub_rank + len(parsed)) for sub_rank, sub in enumerate(subs)] - parsed.extend(_parsed) + sub_type = 'multimedia' + parsed_subs = [parse_image_multimedia(sub, sub_rank + len(parsed)) for sub_rank, sub in enumerate(subs)] + parsed.extend(parsed_subs) else: # Medium images with titles and urls subs = cmpt.find_all('div', {'class':'eA0Zlc'}) - _parsed = [parse_image_medium(sub, sub_rank + len(parsed)) for sub_rank, sub in enumerate(subs)] - parsed.extend(_parsed) + sub_type = 'medium' + parsed_subs = [parse_image_medium(sub, sub_rank + len(parsed)) for sub_rank, sub in enumerate(subs)] + parsed.extend(parsed_subs) # Filter empty results - parsed = [p for p in parsed if p['title']] - + parsed = [p for p in parsed if any([p['title'], p['url'], p['text']])] + return parsed -def parse_image_multimedia(sub, sub_rank=0): +def parse_image_multimedia(sub, sub_rank=0) -> dict: """Parse an image subcomponent Args: @@ -52,7 +54,7 @@ def parse_image_multimedia(sub, sub_rank=0): sub_type="multimedia", sub_rank=sub_rank, title=get_img_alt(sub), - # url=get_img_url(sub), # dynamic load, no source url via requests + url=get_img_url(sub), ) return parsed.model_dump() @@ -66,14 +68,17 @@ def parse_image_medium(sub, sub_rank=0): Returns: dict : parsed subresult """ - + title_div = get_div(sub, 'a', {'class':'EZAeBe'}) + title = get_text(title_div) if title_div else get_img_alt(sub) + url = get_link(sub) if title_div else get_img_url(sub) + parsed = BaseResult( type="images", sub_type="medium", sub_rank=sub_rank, - title=get_text(sub, 'a', {'class':'EZAeBe'}), - url=get_link(sub, {'class':'EZAeBe'}), + title=title, + url=url, cite=get_text(sub, 'div', {'class':'ptes9b'}) ) return parsed.model_dump() @@ -98,17 +103,17 @@ def parse_image_small(sub, sub_rank=0): return parsed.model_dump() -def get_img_url(soup): +def get_img_url(sub): """Get image source""" try: - return soup.find('img').attrs['src'] + return sub.attrs['data-lpage'] except Exception: return None -def get_img_alt(soup): +def get_img_alt(sub): """Get image alt text""" try: - return f"alt-text: {soup.find('img').attrs['alt']}" + return f"alt-text: {sub.find('img').attrs['alt']}" except Exception: return None \ No newline at end of file diff --git a/WebSearcher/component_parsers/notices.py b/WebSearcher/component_parsers/notices.py new file mode 100644 index 0000000..1f662b3 --- /dev/null +++ b/WebSearcher/component_parsers/notices.py @@ -0,0 +1,173 @@ +import re +import copy +from ..models import BaseResult +from ..webutils import get_text, get_link + + +def parse_notices(cmpt) -> list: + notice_parser = NoticeParser() + return notice_parser.parse_notices(cmpt) + + +class NoticeParser: + def __init__(self): + self.parsed = {} + self.sub_type = "unknown" + self.sub_type_text = { + "query_edit": {"Showing results for", "Including results for"}, + "query_edit_no_results": {"No results found for"}, + "query_suggestion": { + "Did you mean:", + "Are you looking for:", + "Search for this instead?", + "Did you mean to search for:", + "Search instead for:" + }, + "location_choose_area": {"Results for", "Choose area"}, + "location_use_precise_location": {"Results for", "Use precise location"}, + "language_tip": {"Tip:", "Learn more about filtering by language"} + } + self.parser_dict = { + 'query_edit': 
self._parse_query_edit, + 'query_edit_no_results': self._parse_no_results_replacement, + 'query_suggestion': self._parse_query_suggestion, + 'location_choose_area': self._parse_location_choose_area, + 'location_use_precise_location': self._parse_location_use_precise_location, + 'language_tip': self._parse_language_tip + } + + def parse_notices(self, cmpt) -> list: + """Parse a query notices component""" + + self._classify_sub_type(cmpt) + self._parse_sub_type(cmpt) + self._validate_parsed() + return self.parsed + + def _classify_sub_type(self, cmpt) -> str: + """Classify the sub-type of a query notice component""" + cmpt_text = cmpt.text.strip() + cmpt_text = re.sub(r'\s+', ' ', cmpt_text) + + for sub_type, text_list in self.sub_type_text.items(): + if sub_type.startswith("location_"): + if all(text in cmpt_text for text in text_list): + self.sub_type = sub_type + break + elif sub_type.startswith("query_"): + if any(text in cmpt_text for text in text_list): + self.sub_type = sub_type + break + elif sub_type.startswith("language_"): + if all(text in cmpt_text for text in text_list): + self.sub_type = sub_type + break + + + def _parse_sub_type(self, cmpt): + sub_parser = self.parser_dict.get(self.sub_type, None) + if sub_parser: + self.parsed = sub_parser(cmpt) + + def _validate_parsed(self): + result = BaseResult( + type='notice', + sub_type=self.sub_type, + sub_rank=0, + title=self.parsed.get('title', None), + text=self.parsed.get('text', None), + ) + self.parsed = [result.model_dump()] + + def _parse_no_results_replacement(self, cmpt): + output = {"title": None, "text": None} + + cmpt = copy.copy(cmpt) + div_title = cmpt.find('div', {'role':'heading', 'aria-level': '2'}) + if div_title: + output['title'] = div_title.text.strip() + div_title.extract() + + div_text = cmpt.find("div", {"class": "card-section"}) + if div_text: + output['text'] = div_text.text.strip() + + return output + + + def _parse_query_edit(self, cmpt): + output = {"title": None, "text": None} + showing_results_span = cmpt.find('span', class_='gL9Hy') + if showing_results_span: + output['title'] = showing_results_span.text.strip() + + modified_query_link = cmpt.find('a', id='fprsl') + if modified_query_link: + modified_query = modified_query_link.text.strip() + output['title'] += f" {modified_query}" + + search_instead_span = cmpt.find('span', class_='spell_orig') + if search_instead_span: + output['text'] = search_instead_span.text.strip() + + original_query_link = cmpt.find('a', class_='spell_orig') + if original_query_link: + original_query = original_query_link.text.strip() + output['text'] += f" {original_query}" + return output + + def _parse_query_suggestion(self, cmpt): + output = {"title": None, "text": None} + + # check in div and span with same class + cmpt_checks = { + cmpt.find('span', class_='gL9Hy'), + cmpt.find('div', class_='gL9Hy') + } + for cmpt_check in cmpt_checks: + if cmpt_check: + output['title'] = cmpt_check.text.strip() + break + + suggestion_links = cmpt.find_all('a', class_='gL9Hy') + suggested_queries = [get_text(suggestion_link) for suggestion_link in suggestion_links if suggestion_link] + output['text'] = '<|>'.join(suggested_queries) + + return output + + def _parse_location_choose_area(self, cmpt): + output = {"title": None, "text": None} + + # Extract the main heading + heading = cmpt.find('div', class_='eKPi4') + if heading: + results_for_span = heading.find('span', class_='gm7Ysb') + location_span = heading.find('span', class_='BBwThe') + + if results_for_span and location_span: + 
output['title'] = f"{results_for_span.text.strip()} {location_span.text.strip()}" + + return output + + def _parse_location_use_precise_location(self, cmpt): + output = {"title": None, "text": None} + + # Extract the main heading + heading = cmpt.find('div', class_='eKPi4') + if heading: + results_for_span = heading.find('span', class_='gm7Ysb') + location_span = heading.find('span', class_='BBwThe') + + if results_for_span and location_span: + output['title'] = f"{results_for_span.text.strip()} {location_span.text.strip()}" + + return output + + def _parse_language_tip(self, cmpt): + output = {"title": None, "text": None} + title_div = cmpt.find('div', class_='Ww4FFb') + if title_div: + output['title'] = re.sub(r'\s+', ' ', title_div.text) + + return output + diff --git a/WebSearcher/components.py b/WebSearcher/components.py index 55413d9..a71ecf1 100644 --- a/WebSearcher/components.py +++ b/WebSearcher/components.py @@ -1,6 +1,7 @@ from .models import BaseResult -from .classifiers import ClassifyMain, ClassifyFooter -from .component_parsers import main_parser_dict, footer_parser_dict, parse_unknown, parse_not_implemented +from .classifiers import ClassifyMain, ClassifyFooter, ClassifyHeaderComponent +from .component_parsers import main_parser_dict, footer_parser_dict, header_parser_dict +from .component_parsers import parse_unknown, parse_not_implemented from .logger import Logger log = Logger().start(__name__) @@ -9,7 +10,7 @@ from typing import Dict class Component: - def __init__(self, elem: bs4.element.Tag, section="unknown", type='unknown', cmpt_rank=None): + def __init__(self, elem: bs4.element.Tag, section="unknown", type="unknown", cmpt_rank=None): self.elem: bs4.element.Tag = elem self.section: str = section self.type = type @@ -32,6 +33,9 @@ def classify_component(self, classify_type_func: callable = None): self.type = classify_type_func(self.elem) else: if self.type == "unknown": + if self.section == "header": + self.type = ClassifyHeaderComponent.classify(self.elem) + log.debug(f"header classification: {self.type}") if self.section == "main": self.type = ClassifyMain.classify(self.elem) elif self.section == "footer": @@ -40,52 +44,66 @@ def classify_component(self, classify_type_func: callable = None): def parse_component(self, parser_type_func: callable = None): log.debug(f"parsing: {self.cmpt_rank} | {self.section} | {self.type}") - assert self.type, 'Null component type' + assert self.type, "Null component type" if not parser_type_func: - # Assign parser function and run on component try: - if self.type == 'unknown': + if self.type == "unknown": parsed_list = parse_unknown(self) + if self.section == "header": + header_parser = header_parser_dict.get(self.type, None) + parsed_list = header_parser(self.elem) + elif self.type not in main_parser_dict and self.type not in footer_parser_dict: parsed_list = parse_not_implemented(self) - elif self.section == 'footer': + elif self.section == "footer": footer_parser = footer_parser_dict.get(self.type, None) parsed_list = footer_parser(self.elem) - elif self.section in {'main', 'header', 'rhs'}: + elif self.section in {"main", "header", "rhs"}: # TODO: Update component_parsers/* to accept a Component object, currently expects a bs4 element main_parser = main_parser_dict.get(self.type, None) parsed_list = main_parser(self.elem) except Exception: - log.exception(f'Parsing Exception | {self.cmpt_rank} | {self.type}') - parsed_list = [{'type': self.type, - 'cmpt_rank': self.cmpt_rank, - 'text': self.elem.get_text("<|>", strip=True), - 
'error': traceback.format_exc()}] + log.exception(f"Parsing Exception | {self.cmpt_rank} | {self.type}") + parsed_list = [{"type": self.type, + "cmpt_rank": self.cmpt_rank, + "text": self.elem.get_text("<|>", strip=True), + "error": traceback.format_exc()}] else: # Run provided parser function on component try: parser_type_func(self) except Exception: - log.exception(f'Parsing Exception | {self.cmpt_rank} | {self.type}') - parsed_list = [{'type': self.type, - 'cmpt_rank': self.cmpt_rank, - 'text': self.elem.get_text("<|>", strip=True), - 'error': traceback.format_exc()}] + log.exception(f"Parsing Exception | {self.cmpt_rank} | {self.type}") + parsed_list = [{"type": self.type, + "cmpt_rank": self.cmpt_rank, + "text": self.elem.get_text("<|>", strip=True), + "error": traceback.format_exc()}] + + + # Check for empty results list + if len(parsed_list) == 0: + log.debug(f"No subcomponents parsed for {self.cmpt_rank} | {self.type}") + parsed_list = [{"type": self.type, + "cmpt_rank": self.cmpt_rank, + "text": self.elem.get_text("<|>", strip=True), + "error": "No results parsed"}] # Track parsed results - assert type(parsed_list) in [list, dict], f'parser output must be list or dict: {type(parsed_list)}' + assert type(parsed_list) in [list, dict], f"parser output must be list or dict: {type(parsed_list)}" + assert len(parsed_list) > 0, f"Empty parsed list: {parsed_list}" parsed_list = parsed_list if isinstance(parsed_list, list) else [parsed_list] self.add_parsed_result_list(parsed_list) def add_parsed_result_list(self, parsed_result_list): """Add a list of parsed results with BaseResult validation to results_list""" + assert len(parsed_result_list) > 0, "Empty parsed result list" for parsed_result in parsed_result_list: self.add_parsed_result(parsed_result) @@ -116,7 +134,7 @@ def __iter__(self): for component in self.components: yield component - def add_component(self, elem:bs4.element.Tag, section="unknown", type='unknown', cmpt_rank=None): + def add_component(self, elem:bs4.element.Tag, section="unknown", type="unknown", cmpt_rank=None): """Add a component to the list of components""" cmpt_rank = self.cmpt_rank_counter if not cmpt_rank else cmpt_rank component = Component(elem, section, type, cmpt_rank) @@ -129,7 +147,7 @@ def export_component_results(self): results = [] for cmpt in self.components: for result in cmpt.export_results(): - result['serp_rank'] = self.serp_rank_counter + result["serp_rank"] = self.serp_rank_counter results.append(result) self.serp_rank_counter += 1 return results diff --git a/WebSearcher/extractors.py b/WebSearcher/extractors.py index 55a1a22..6815061 100644 --- a/WebSearcher/extractors.py +++ b/WebSearcher/extractors.py @@ -1,4 +1,5 @@ from .components import Component, ComponentList +from . import utils from . import webutils from . 
import logger log = logger.Logger().start(__name__) @@ -65,53 +66,92 @@ def extract_rhs(self): def append_rhs(self): """Append the RHS Knowledge Panel to the components list at the end""" if self.rhs: + log.debug(f"appending rhs") self.components.add_component(**self.rhs) self.rhs = None + # -------------------------------------------------------------------------- # Header Components # -------------------------------------------------------------------------- def extract_header(self): """Extract the header section, often a carousel of images or other suggestions.""" + self.extract_top_bar() + self.extract_notices() + + + def extract_top_bar(self): + """Extract the top bar section, often a carousel of images or other suggestions.""" top_bar = self.soup.find('div', {'id':'appbar'}) if top_bar: has_img = top_bar.find(lambda tag: tag.has_attr('src') and not tag.has_attr('data-src')) if top_bar.find('g-scrolling-carousel') and has_img: self.components.add_component(top_bar, section='header', type='top_image_carousel') + + def extract_notices(self): + """Append notices to the components list at the end""" + notices = webutils.find_all_divs(self.soup, "div", {"id": "oFNiHe"}) + notices = webutils.filter_empty_divs(notices) + + log.debug(f"notices: {len(notices)}") + for notice in notices: + self.components.add_component(notice, section="header", type="notice") + # -------------------------------------------------------------------------- # Main Components # -------------------------------------------------------------------------- def extract_main(self): """Extract the main results sections of the SERP""" - self.extract_main_shopping_ads() + # self.extract_main_shopping_ads() self.extract_main_ads_top() self.extract_main_components() self.extract_main_ads_bottom() - - def extract_main_shopping_ads(self): - """Extract the main shopping ads section of the SERP""" - shopping_ads = self.soup.find('div', {'class': 'commercial-unit-desktop-top'}) - if shopping_ads: - self.components.add_component(shopping_ads, section='main', type='shopping_ads') + + # def extract_main_shopping_ads(self): + # """Extract the main shopping ads section of the SERP""" + # shopping_ads = self.soup.find('div', {'class': 'commercial-unit-desktop-top'}) + # if shopping_ads: + # self.components.add_component(shopping_ads, section='main', type='shopping_ads') def extract_main_ads_top(self): """Extract the main ads section of the SERP""" ads = self.soup.find('div', {'id':'tads'}) - if ads: + if ads and webutils.get_text(ads): + # Filter if already extracted as shopping ads + # if not ads.find('div', {'class': 'commercial-unit-desktop-top'}): self.components.add_component(ads, section='main', type='ad') + def extract_main_components(self, drop_tags: set={'script', 'style', None}): + """Extract main components based on SERP layout""" + log.debug("Extracting main column components") + self.check_layout_main() + try: + layout_extractor = self.layout_extractors[self.layout_label] + column = layout_extractor(drop_tags) + for component in column: + if Extractor.is_valid_main_component(component): + self.components.add_component(component, section='main') + except KeyError: + raise ValueError(f"no extractor for layout_label: {self.layout_label}") + log.debug(f"Extracted main components: {self.components.cmpt_rank_counter:,}") + + def extract_main_ads_bottom(self): """Extract the main ads section of the SERP""" ads = self.soup.find('div', {'id':'tadsb'}) - if ads: + if ads and webutils.get_text(ads): self.components.add_component(ads, 
section='main', type='ad') + # -------------------------------------------------------------------------- + # Layout Specifics + # -------------------------------------------------------------------------- + def check_layout_main(self): """Divide and label the page layout""" @@ -134,27 +174,21 @@ def check_layout_main(self): first_match = label_matches[0] if label_matches else None self.layout_label = first_match log.debug(f"layout: {self.layout_label}") - - - def extract_main_components(self, drop_tags: set={'script', 'style', None}): - """Extract main components based on SERP layout""" - log.debug("Extracting main column components") - - self.check_layout_main() - try: - layout_extractor = self.layout_extractors[self.layout_label] - column = layout_extractor(drop_tags) - for component in column: - if Extractor.is_valid_main_component(component): - self.components.add_component(component, section='main') - except KeyError: - raise ValueError(f"no extractor for layout_label: {self.layout_label}") + def extract_from_standard(self, drop_tags: set = {}) -> list: + + if self.layout_divs['rso'].find('div', {'id':'kp-wp-tab-overview'}): + log.debug("layout update: standard-alt-1") + self.layout_label = 'standard-alt' + column = self.layout_divs['rso'].find_all('div', {'class':'TzHB6b'}) + return column + column = Extractor.extract_children(self.layout_divs['rso'], drop_tags) column = [c for c in column if Extractor.is_valid_main_component(c)] + if len(column) == 0: - log.debug("layout update: standard-alt") + log.debug("layout update: standard-alt-0") self.layout_label = 'standard-alt' divs = self.layout_divs['rso'].find_all('div', {'id':'kp-wp-tab-overview'}) column = sum([div.find_all('div', {'class':'TzHB6b'}) for div in divs], []) @@ -180,8 +214,7 @@ def extract_from_top_bar(self, drop_tags: set = {}) -> list: def extract_from_left_bar(self, drop_tags: set = {}) -> list: """Extract components from left-bar layout""" - log.debug("not implemented | may appear in pre-2022 data") - column = [] + column = self.soup.find_all('div', {'class':'TzHB6b'}) return column diff --git a/WebSearcher/webutils.py b/WebSearcher/webutils.py index 4c2bb3a..5eb88ef 100644 --- a/WebSearcher/webutils.py +++ b/WebSearcher/webutils.py @@ -111,8 +111,6 @@ def get_text(soup: BeautifulSoup, name: str=None, attrs: dict={}, separator:str= text = div.get_text(separator=separator) return text.strip() if strip else text - - def get_link(soup: BeautifulSoup, attrs: dict = {}, key: str = 'href') -> str: """Utility for `soup.find('a')['href']` with null key handling""" link = get_div(soup, 'a', attrs) @@ -125,18 +123,19 @@ def get_link_list(soup: BeautifulSoup, attrs: dict = {}, key: str = 'href', filt def find_all_divs(soup: BeautifulSoup, name: str, attrs: dict = {}, filter_empty: bool = True) -> list: divs = soup.find_all(name, attrs) if attrs else soup.find_all(name) - if filter_empty: - divs = [c for c in divs if c] - divs = [c for c in divs if c.text != ''] + divs = filter_empty_divs(divs) if filter_empty else divs + return divs + +def filter_empty_divs(divs): + divs = [c for c in divs if c] + divs = [c for c in divs if c.text != ''] return divs def find_children(soup, name: str, attrs: dict = {}, filter_empty: bool = False): """Find all children of a div with a given name and attribute""" div = get_div(soup, name, attrs) divs = div.children if div else [] - if divs and filter_empty: - divs = [c for c in divs if c] - divs = [c for c in divs if c.text != ''] + divs = filter_empty_divs(divs) if filter_empty else divs return 
divs diff --git a/setup.py b/setup.py index 3403c02..50e158c 100644 --- a/setup.py +++ b/setup.py @@ -1,56 +1,56 @@ -# WebSearcher - Tools for conducting, collecting, and parsing web search -import setuptools -import codecs -import os - -def read(rel_path): - here = os.path.abspath(os.path.dirname(__file__)) - with codecs.open(os.path.join(here, rel_path), 'r') as fp: - return fp.read() - -def get_version(rel_path): - for line in read(rel_path).splitlines(): - if line.startswith('__version__'): - delim = '"' if '"' in line else "'" - return line.split(delim)[1] - else: - raise RuntimeError("Unable to find version string.") - -def get_readme_descriptions(fp='README.md', s='#', stop_at=2): - with open(fp, 'r') as infile: - # Extract short description (title) and long description - descriptions = {'short': '', 'long': ''} - readme = [l.strip() for l in infile.read().split('\n')] - descriptions['short'] = readme[0].replace('# ', '') - heading_idx = [idx for idx, l in enumerate(readme) if l.startswith(s)] - descriptions['long'] = ' \n'.join(readme[:heading_idx[stop_at]]) - return descriptions - -version = get_version("WebSearcher/__init__.py") -descriptions = get_readme_descriptions() - -setuptools.setup( - name='WebSearcher', - version=version, - url='http://github.com/gitronald/WebSearcher', - author='Ronald E. Robertson', - author_email='rer@acm.org', - license='BSD-3-Clause', - classifiers=[ - 'Programming Language :: Python :: 3', - 'License :: OSI Approved :: BSD License' - ], - description=descriptions['short'], - long_description=descriptions['long'], - long_description_content_type='text/markdown', - packages=setuptools.find_packages(), - install_requires=[ - 'requests', - 'lxml', - 'beautifulsoup4', - 'tldextract', - 'brotli', - 'pydantic' - ], - python_requires='>=3.6' -) +# WebSearcher - Tools for conducting, collecting, and parsing web search +import setuptools +import codecs +import os + +def read(rel_path): + here = os.path.abspath(os.path.dirname(__file__)) + with codecs.open(os.path.join(here, rel_path), 'r') as fp: + return fp.read() + +def get_version(rel_path): + for line in read(rel_path).splitlines(): + if line.startswith('__version__'): + delim = '"' if '"' in line else "'" + return line.split(delim)[1] + else: + raise RuntimeError("Unable to find version string.") + +def get_readme_descriptions(fp='README.md', s='#', stop_at=2): + with open(fp, 'r') as infile: + # Extract short description (title) and long description + descriptions = {'short': '', 'long': ''} + readme = [l.strip() for l in infile.read().split('\n')] + descriptions['short'] = readme[0].replace('# ', '') + heading_idx = [idx for idx, l in enumerate(readme) if l.startswith(s)] + descriptions['long'] = ' \n'.join(readme[:heading_idx[stop_at]]) + return descriptions + +version = get_version("WebSearcher/__init__.py") +descriptions = get_readme_descriptions() + +setuptools.setup( + name='WebSearcher', + version=version, + url='http://github.com/gitronald/WebSearcher', + author='Ronald E. Robertson', + author_email='rer@acm.org', + license='BSD-3-Clause', + classifiers=[ + 'Programming Language :: Python :: 3', + 'License :: OSI Approved :: BSD License' + ], + description=descriptions['short'], + long_description=descriptions['long'], + long_description_content_type='text/markdown', + packages=setuptools.find_packages(), + install_requires=[ + 'requests', + 'lxml', + 'beautifulsoup4', + 'tldextract', + 'brotli', + 'pydantic' + ], + python_requires='>=3.6' +)
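
---

Illustrative usage (not part of the diff): a minimal sketch exercising the header-notice pipeline this patch introduces — `ClassifyHeaderComponent.classify` keys off the container `id` (`"taw"` or `"topstuff"`), and the new `header_parser_dict` registry maps the resulting type to `parse_notices`. The HTML snippet is a hypothetical, simplified stand-in for a real SERP fragment; the `gL9Hy` class names are taken from the selectors used in `notices.py`.

```python
import bs4

from WebSearcher.classifiers import ClassifyHeaderComponent
from WebSearcher.component_parsers import header_parser_dict

# Hypothetical, minimal stand-in for a "Did you mean:" notice; real Google
# markup is far larger, but the id and class names match the selectors
# added in this patch.
html = """
<div id="taw">
  <span class="gL9Hy">Did you mean:</span>
  <a class="gL9Hy" href="/search?q=immigration+news">immigration news</a>
</div>
"""
cmpt = bs4.BeautifulSoup(html, "lxml").find("div", {"id": "taw"})

# Step 1: classify the header component by its container id -> "notice"
cmpt_type = ClassifyHeaderComponent.classify(cmpt)

# Step 2: look up the matching parser in the header registry and run it;
# NoticeParser matches "Did you mean:" against its sub_type text sets
results = header_parser_dict[cmpt_type](cmpt)
print(results[0]["sub_type"])  # expected: "query_suggestion"
```

Extending this section follows the updated README steps: add a check in `classifiers/header_components.py`, a parser module under `component_parsers/`, and an entry in the `header_parsers` catalogue in `component_parsers/__init__.py`.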