From efe28ab29d1af0a46f133e3e0c63790ddf4dafc0 Mon Sep 17 00:00:00 2001 From: David Chiu Date: Thu, 8 Aug 2024 14:02:44 +0800 Subject: [PATCH] feat: parallel crawling and refactor --- sync_crawler/crawlers/ltn_crawler.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/sync_crawler/crawlers/ltn_crawler.py b/sync_crawler/crawlers/ltn_crawler.py index b177f00..31084f3 100644 --- a/sync_crawler/crawlers/ltn_crawler.py +++ b/sync_crawler/crawlers/ltn_crawler.py @@ -1,4 +1,7 @@ +import json +import logging from collections.abc import Iterable +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import datetime from itertools import count @@ -22,19 +25,24 @@ class LtnCrawler(BaseCrawler): metadata_api = "https://news.ltn.com.tw/ajax/breakingnews/all/{}" def read(self, start_from: datetime) -> Iterable[News]: - ltn_metadatas: list[LtnNewsMetadata] = [] - for page in count(1, step=1): - metadatas = list(self._fetch_metadata(page)) - - if metadatas[-1].publish_time < start_from: - metadatas = filter(lambda x: x.publish_time >= start_from, metadatas) - ltn_metadatas.extend(metadatas) + try: + metadatas = list(self._fetch_metadata(page)) + except json.JSONDecodeError: + logging.error(f"Failed to fetch metadata from page {page}.") break - ltn_metadatas.extend(metadatas) + with ThreadPoolExecutor() as executor: + news = executor.map( + self._crawl_news, + filter(lambda x: x.publish_time >= start_from, metadatas), + ) + news = filter(lambda x: x is not None, news) - return filter(lambda x: x is not None, map(self._crawl_news, ltn_metadatas)) + yield from news + + if metadatas[-1].publish_time < start_from: + break @ignore_exception def _crawl_news(self, metadata: LtnNewsMetadata) -> News: