Commit f9eb79d: Cleanup code

sultaniman committed Feb 22, 2024
1 parent dcc16b1 commit f9eb79d
Showing 4 changed files with 23 additions and 23 deletions.
14 changes: 11 additions & 3 deletions sources/scraping/helpers.py
@@ -66,16 +66,24 @@ def on_engine_stopped() -> None:
        queue.close()

    scrapy_runner = ScrapyRunner(
        queue=queue,
        spider=spider,
        start_urls=resolve_start_urls(start_urls),
        settings=SOURCE_SCRAPY_SETTINGS,
        on_item_scraped=on_item_scraped,
        on_engine_stopped=on_engine_stopped,
    )

    pipeline_runner = PipelineRunner(pipeline=pipeline)
    scraping_host = ScrapingHost(scrapy_runner, pipeline_runner)
    pipeline_runner = PipelineRunner(
        pipeline=pipeline,
        queue=queue,
    )

    scraping_host = ScrapingHost(
        queue,
        scrapy_runner,
        pipeline_runner,
    )

    return scraping_host
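
The two callbacks handed to ScrapyRunner are what connect Scrapy to the shared queue that the pipeline side drains. A minimal sketch of their intent, assuming the queue exposes a put() method (only queue.close() is visible in this hunk, inside on_engine_stopped):

def on_item_scraped(item: Item) -> None:
    # Producer side: push every scraped item onto the shared queue.
    queue.put(item)  # put() is an assumption; it is not shown in this diff

def on_engine_stopped() -> None:
    # Tell the consumer side that the crawl has finished and the queue can drain.
    queue.close()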


10 changes: 7 additions & 3 deletions sources/scraping/runner.py
@@ -3,6 +3,7 @@

import dlt
from dlt.common import logger
from pydispatch import dispatcher

from scrapy import signals, Item, Spider # type: ignore
from scrapy.crawler import CrawlerProcess # type: ignore
@@ -39,8 +40,8 @@ def run(self, *args: P.args, **kwargs: P.kwargs) -> t.Any:
        )

        # Setup signals
        crawler.signals.connect(self.on_item_scraped, signals.item_scraped)
        crawler.signals.connect(self.on_engien_stopped, signals.engine_stopped)
        dispatcher.connect(self.on_item_scraped, signals.item_scraped)
        dispatcher.connect(self.on_engien_stopped, signals.engine_stopped)

        try:
            crawler.start()
@@ -117,7 +118,10 @@ def run(
        pipeline_worker = self.pipeline_runner.run(
            # Queue get_batches is a generator so we can
            # pass it to pipeline.run and dlt will handle the rest.
            self.queue.get_batches(),
            dlt.resource(
                self.queue.get_batches,
                name=f"{self.pipeline_runner.pipeline.pipeline_name}_results",
            ),
            *args,
            **kwargs,
        )
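
The change above wraps the queue's generator in dlt.resource so the resource gets an explicit name derived from the pipeline. The same pattern in isolation, as a self-contained sketch with a stand-in generator (names and data here are illustrative, not taken from this repository):

import dlt

def get_batches():
    # Stand-in for the queue's get_batches generator: yields batches of scraped items.
    yield [{"author": "A", "quote": "First quote"}]
    yield [{"author": "B", "quote": "Second quote"}]

pipeline = dlt.pipeline(pipeline_name="scraping_demo", destination="duckdb")
load_info = pipeline.run(
    dlt.resource(get_batches, name="scraping_demo_results"),
    table_name="quotes",
    write_disposition="replace",
)
print(load_info)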
2 changes: 1 addition & 1 deletion sources/scraping/settings.py
@@ -11,7 +11,7 @@
"TELNETCONSOLE_ENABLED": False,
# How many sub pages to scrape
# https://docs.scrapy.org/en/latest/topics/settings.html#depth-limit
"DEPTH_LIMIT": 2,
"DEPTH_LIMIT": 0,
"SPIDER_MIDDLEWARES": {
"scrapy.spidermiddlewares.depth.DepthMiddleware": 200,
"scrapy.spidermiddlewares.httperror.HttpErrorMiddleware": 300,
20 changes: 4 additions & 16 deletions sources/scraping_pipeline.py
@@ -1,12 +1,10 @@
from typing import Any

import dlt
from dlt.common import logger
from scrapy import Spider # type: ignore
from scrapy.http import Response # type: ignore

from scraping import scrapy_source
from scraping.helpers import create_pipeline_runner
from scraping.helpers import run_pipeline


class MySpider(Spider):
@@ -30,24 +28,14 @@ def scrape_quotes() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="scraping",
        destination="duckdb",
        dataset_name="quotes",
    )

    pipeline_runner, scrapy_runner, wait = create_pipeline_runner(
        pipeline, spider=MySpider
    )

    logger.info("Starting pipeline")
    pipeline_runner.run(
        scrapy_source(scrapy_runner.queue),
    run_pipeline(
        pipeline,
        MySpider,
        write_disposition="replace",
        table_name="quotes",
    )

    logger.info("Starting scrapy crawler")
    scrapy_runner.run()
    wait()


if __name__ == "__main__":
    scrape_quotes()
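
Taken together with the helpers.py hunk above, run_pipeline presumably assembles the queue, ScrapyRunner, PipelineRunner and ScrapingHost and then runs the host, so the caller no longer sequences pipeline_runner.run(), scrapy_runner.run() and wait() by hand. A rough sketch of that shape, assuming a builder like the one shown in helpers.py (create_scraping_host is a hypothetical name; the real helper may differ):

def run_pipeline(pipeline, spider, **run_kwargs):
    # Assumed wiring: build the host from the pieces shown in helpers.py,
    # then let it drive both the Scrapy crawler and the dlt pipeline.
    scraping_host = create_scraping_host(pipeline, spider)  # hypothetical builder
    scraping_host.run(**run_kwargs)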
