diff --git a/final_project/crawler/__init__.py b/final_project/crawler/__init__.py
index 31e11d3..6b112c0 100644
--- a/final_project/crawler/__init__.py
+++ b/final_project/crawler/__init__.py
@@ -1,10 +1,13 @@
 # web crawler
+import logging
 import pathlib
 
 import click
 
 from crawler.crawl import crawl
 
+logger = logging.getLogger(__name__)
+
 
 @click.command("Crawler")
 @click.argument("url")
@@ -14,4 +17,6 @@ def main(url: str, storage_directory: str):
 
 
 def __main__():
-    main()
+    logging.basicConfig(level="INFO")
+    logger.info("Started crawler!")
+    main()
diff --git a/final_project/crawler/crawl.py b/final_project/crawler/crawl.py
index 3a46423..62810a7 100644
--- a/final_project/crawler/crawl.py
+++ b/final_project/crawler/crawl.py
@@ -1,7 +1,12 @@
+import logging
 import pathlib
 import time
-from queue import Queue
-from threading import Lock, Thread
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+from enum import Enum
+from queue import Empty, Queue
+from threading import Lock
+from typing import Union
 
 import bs4
 import numpy as np
@@ -10,12 +15,64 @@ from crawler.storage import WebsiteStorage
 from crawler.utils import RE_BASE, RE_HTML, RE_STRIPBASE, normalize_url
 
+NUM_WORKERS = 10
+JOB_TIMEOUT_WAIT = 10
 
-def download_webpage(url: str) -> str:
+
+logger = logging.getLogger(__name__)
+logging.basicConfig(level="INFO", format="%(levelname)s: %(message)s")
+
+job_queue = Queue()
+visited = set()
+visited_lock = Lock()
+
+
+def crawl_page(worker_id: int, url: str, storage: WebsiteStorage) -> bool:
+    with visited_lock:
+        if url in visited:
+            return False
+
+        # Mark the URL as visited while still holding the lock so no other worker claims it.
+        visited.add(url)
+
+    logger.info("Worker %d | Downloading %s...", worker_id, url)
+    if (webpage := download_webpage(worker_id, url)) is None:
+        return False
+
+    storage.insert(url, webpage)
+    links = parse_links(webpage, url)
+    for link in links:
+        with visited_lock:
+            if link not in visited:
+                job_queue.put(link)
+    return True
+
+
+def worker(worker_id: int, storage: WebsiteStorage) -> None:
+    logger.info("Worker %d | Started crawl, waiting for requests", worker_id)
+    shutdown = False
+    rng = np.random.default_rng()
+    while not shutdown:
+        try:
+            url = job_queue.get(timeout=JOB_TIMEOUT_WAIT)
+            if crawl_page(worker_id, url, storage):
+                delay = rng.uniform(10, 20)
+                logger.info("Worker %d | Sleeping for %f seconds", worker_id, delay)
+                time.sleep(delay)
+        except Empty:
+            logger.info("Worker %d | Wait for job timed out, shutting down", worker_id)
+            shutdown = True
+
+
+def download_webpage(worker_id: int, url: str) -> Union[str, None]:
     headers = {"User-Agent": "Mozilla/5.0"}
     response = requests.get(url, headers)
     if response.status_code >= 400:
-        print(f"Request failed with error code {response.status_code}, skipping")
+        logger.warning(
+            "Worker %d | Request failed with error code %d, skipping",
+            worker_id,
+            response.status_code,
+        )
         return None
     return response.text
@@ -42,30 +99,26 @@ def parse_links(content: str, prefix_url: str):
 
 
 def crawl(url: str, storage_directory: pathlib.Path):
+    structure_path = storage_directory / pathlib.Path("structure.json")
+    if structure_path.exists():
+        print(
+            f"Existing crawl data found at {str(storage_directory)}, would you like to recrawl?"
+        )
+        response = input("> [y/n]: ")
+        while response != "y" and response != "n":
+            print(f"Invalid response: {response}")
+            response = input("> [y/n]: ")
+        if response == "n":
+            print("Exiting crawler")
+            return
+
     webpath = storage_directory / pathlib.Path(normalize_url(url))
     webpath.mkdir(parents=True, exist_ok=True)
-    stack = [url]
     storage = WebsiteStorage(storage_directory)
-    visited = set()
-
-    rng = np.random.default_rng()
-
-    while len(stack) > 0:
-        delay = rng.integers(2, 15)
-        current_url = stack[-1]
-        stack.pop()
-        if current_url in visited:
-            continue
-        print(f"Downloading {current_url}...", end="")
-
-        if (webpage := download_webpage(current_url)) is None:
-            continue
-        storage.insert(current_url, webpage)
-        visited.add(current_url)
-
-        stack.extend(parse_links(webpage, current_url))
-        print(f"DONE, sleeping for {delay} seconds")
-        time.sleep(delay)
-
+    job_queue.put(url)
+    with ThreadPoolExecutor(NUM_WORKERS) as executor:
+        for index in range(NUM_WORKERS):
+            executor.submit(worker, index, storage)
+            time.sleep(1)  # stagger worker start-up so the job-queue wait timeouts are offset
     storage.write()
diff --git a/final_project/crawler/storage.py b/final_project/crawler/storage.py
index bc43b79..fdca9a2 100755
--- a/final_project/crawler/storage.py
+++ b/final_project/crawler/storage.py
@@ -7,7 +7,7 @@ from threading import Lock
 from typing import List, Union
 
-from crawler.utils import RE_BASE, RE_HTML, NodeType, normalize_url
+from crawler.utils import RE_BASE, RE_HTML, RE_TRAILING_SLASH, NodeType, normalize_url
 from crawler.website_node import WebsiteNode, WebsiteNodeDecoder, WebsiteNodeEncoder
 
@@ -150,5 +150,8 @@ def pprint(self):
 
     def insert(self, url: str, data: str):
         """Insert a webpage url into the tree structure."""
+        url = RE_TRAILING_SLASH.sub("", url)
+        if not RE_HTML.search(url):
+            url += "/index.html"
         url = RE_BASE.sub("", url)
         self.__insert(self.root, [""] + url.split("/"), 0, data)
diff --git a/final_project/crawler/worker.py b/final_project/crawler/worker.py
deleted file mode 100644
index e69de29..0000000