Simplify helpers code
sultaniman committed Feb 22, 2024
1 parent c39a26f commit dcc16b1
Showing 2 changed files with 47 additions and 181 deletions.
72 changes: 0 additions & 72 deletions sources/scraping/__init__.py
@@ -1,72 +0,0 @@
"""Scraping source using scrapy"""
from typing import Any, Iterator, List, TypeVar
from queue import Empty

import dlt

from dlt.common import logger
from dlt.common.configuration.inject import with_config

from .helpers import ScrapingConfig
from .queue import BaseQueue, QueueClosedError


T = TypeVar("T")


@dlt.source
def scrapy_source(queue: BaseQueue[T]) -> Iterator[Any]:
yield scrapy_resource(queue)


@with_config(sections=("sources", "scraping"), spec=ScrapingConfig)
def scrapy_resource(
queue: BaseQueue[T],
batch_size: int = dlt.config.value,
queue_result_timeout: int = dlt.config.value,
) -> Iterator[Any]:
"""Scrapy resource to retrieve scraped items from the queue
Args:
queue(BaseQueue[T]): Queue instance
queue_result_timeout(int): timeout to wait for items in the queue
Returns:
Iterator[Any]: yields scraped items one by one
"""
batch: List[T] = []
num_batches = 0
while True:
if len(batch) >= batch_size:
num_batches += 1
yield batch
batch = []

try:
if queue.is_closed:
raise QueueClosedError("Queue is closed")

result = queue.get(timeout=queue_result_timeout)
batch.append(result)

# Mark task as completed
queue.task_done()
except Empty:
logger.info(f"Queue has been empty for {queue_result_timeout}s...")

# Return the current batch
if batch:
num_batches += 1
yield batch
batch = []
except QueueClosedError:
logger.info("Queue is closed, stopping...")

# Return the last batch before exiting
if batch:
num_batches += 1
yield batch

logger.info(f"Loaded {num_batches} batches")

break
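For context, a minimal usage sketch of the source removed above, as it would have been wired up before this commit; the queue size, pipeline name, destination, and table name are illustrative assumptions, not part of the diff:

```
# Illustrative sketch of the pre-commit API: a Scrapy crawler running
# concurrently pushes items into the queue while dlt drains it in batches
# through the removed `scrapy_source`. Names and settings here are assumptions.
import dlt

from sources.scraping import scrapy_source      # deleted by this commit
from sources.scraping.queue import BaseQueue    # old queue class per the removed imports

queue = BaseQueue(maxsize=1000)

pipeline = dlt.pipeline(
    pipeline_name="scraping_example",
    destination="duckdb",
    dataset_name="scraped_data",
)

# In the old helpers this call ran on a background thread (PipelineRunner)
# while Scrapy crawled in the foreground; conceptually the pipeline simply
# consumes batches until the queue is closed.
pipeline.run(scrapy_source(queue), table_name="scraped_items")
```

After this commit, that wiring lives behind `ScrapingHost`, `PipelineRunner`, and `ScrapyRunner` in `runner.py`, as the `helpers.py` changes below show.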
156 changes: 47 additions & 109 deletions sources/scraping/helpers.py
@@ -1,32 +1,21 @@
import threading
import typing as t

import dlt
import scrapy # type: ignore

from dlt.common import logger
from dlt.common.configuration.inject import with_config
from dlt.common.configuration.specs.base_configuration import (
configspec,
BaseConfiguration,
)

from scrapy import signals, Item # type: ignore
from scrapy.exceptions import CloseSpider # type: ignore
from scrapy.crawler import CrawlerProcess # type: ignore


from .types import AnyDict
from .queue import BaseQueue
from .settings import SOURCE_SCRAPY_SETTINGS, SOURCE_SCRAPY_QUEUE_SIZE


T = t.TypeVar("T")

from scrapy import Item, Spider # type: ignore
from scrapy.exceptions import CloseSpider # type: ignore

class Runnable(t.Protocol):
    def run(self, *args: t.Any, **kwargs: AnyDict) -> t.Any:
pass
from .settings import SOURCE_SCRAPY_QUEUE_SIZE, SOURCE_SCRAPY_SETTINGS
from .queue import ScrapingQueue
from .runner import ScrapingHost, PipelineRunner, ScrapyRunner
from .types import StartUrls, P


@configspec
@@ -41,95 +30,29 @@ class ScrapingConfig(BaseConfiguration):
queue_result_timeout: t.Optional[int] = 5

# List of start urls
start_urls: t.Optional[t.List[str]] = None


class ScrapyRunner(Runnable):
def __init__(
self,
queue: BaseQueue[T],
spider: t.Type[scrapy.Spider],
start_urls: t.List[str],
settings: AnyDict,
on_item_scraped: t.Callable[[Item], None],
on_engine_stopped: t.Callable[[], None],
) -> None:
self.queue = queue
self.spider = spider
self.settings = settings
self.start_urls = start_urls
self.on_item_scraped = on_item_scraped
        self.on_engine_stopped = on_engine_stopped

def run(self, *args: t.Any, **kwargs: AnyDict) -> None:
crawler = CrawlerProcess(settings=self.settings)
crawler.crawl(
self.spider,
queue=self.queue,
name="scraping_spider",
start_urls=self.start_urls,
**kwargs,
)
crawler.signals.connect(self.on_item_scraped, signals.item_scraped)
        crawler.signals.connect(self.on_engine_stopped, signals.engine_stopped)
crawler.start()

class PipelineRunner(Runnable):
def __init__(
self,
pipeline: dlt.Pipeline,
queue: BaseQueue[T],
) -> None:
self.pipeline = pipeline
self.queue = queue

def run( # type: ignore[override]
self,
data: t.Any,
*args: t.Any,
**kwargs: t.Any,
) -> None:
"""You can use all regular dlt.pipeline.run() arguments
```
destination: TDestinationReferenceArg = None,
staging: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
table_name: str = None,
write_disposition: TWriteDisposition = None,
columns: TAnySchemaColumns = None,
primary_key: TColumnNames = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None
```
"""
start_urls: StartUrls = None

def run() -> None:
try:
self.pipeline.run(data, **kwargs)
except Exception:
logger.error("Error during pipeline.run call, closing the queue")
self.queue.close()
raise

self.thread_runner = threading.Thread(target=run)
self.thread_runner.start()
def resolve_start_urls(path: StartUrls) -> t.List[str]:
if isinstance(path, str):
with open(path) as fp:
return fp.readlines()

def join(self) -> None:
self.thread_runner.join()
return path or []


@with_config(sections=("sources", "scraping"), spec=ScrapingConfig)
def create_pipeline_runner(
pipeline: dlt.Pipeline,
spider: t.Type[Spider],
queue_size: int = dlt.config.value,
spider: t.Type[scrapy.Spider] = None,
start_urls: t.List[str] = dlt.config.value,
) -> t.Tuple[PipelineRunner, ScrapyRunner, t.Callable[[], None]]:
queue = BaseQueue(maxsize=queue_size)

settings = {**SOURCE_SCRAPY_SETTINGS}
queue_result_timeout: int = dlt.config.value,
start_urls: StartUrls = dlt.config.value,
) -> ScrapingHost:
queue = ScrapingQueue(
maxsize=queue_size,
read_timeout=queue_result_timeout,
)

def on_item_scraped(item: Item) -> None:
if not queue.is_closed:
@@ -140,29 +63,44 @@ def on_item_scraped(item: Item) -> None:

def on_engine_stopped() -> None:
queue.join()
if not queue.is_closed:
queue.close()
queue.close()

scrapy_runner = ScrapyRunner(
queue=queue,
spider=spider,
start_urls=start_urls,
settings=settings,
start_urls=resolve_start_urls(start_urls),
settings=SOURCE_SCRAPY_SETTINGS,
on_item_scraped=on_item_scraped,
on_engine_stopped=on_engine_stopped,
)

pipeline_runner = PipelineRunner(pipeline=pipeline)

def wait_for_results() -> None:
pipeline_runner.join()

return pipeline_runner, scrapy_runner, wait_for_results
scraping_host = ScrapingHost(scrapy_runner, pipeline_runner)
return scraping_host


def run_pipeline(
pipeline: dlt.Pipeline,
spider: t.Type[scrapy.Spider],
urls: t.List[str],
spider: t.Type[Spider],
*args: P.args,
**kwargs: P.kwargs,
) -> None:
create_pipeline_runner(pipeline=pipeline, spider=spider, )
"""Simple runner for the scraping pipeline
You can pass all parameters via kwargs to `dlt.pipeline.run(....)`
```
destination: TDestinationReferenceArg = None,
staging: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
table_name: str = None,
write_disposition: TWriteDisposition = None,
columns: TAnySchemaColumns = None,
primary_key: TColumnNames = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None
```
"""
scraping_host = create_pipeline_runner(pipeline, spider)
scraping_host.run(*args, **kwargs)
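
A usage sketch of the simplified entry point defined above; the spider, pipeline name, destination, and table name are illustrative assumptions rather than part of this commit:

```
# Illustrative sketch of calling the new `run_pipeline` helper. The spider and
# pipeline settings below are assumptions; extra keyword arguments are forwarded
# to dlt.pipeline.run(...) as described in the docstring above.
import dlt
import scrapy

from sources.scraping.helpers import run_pipeline


class QuotesSpider(scrapy.Spider):
    name = "quotes_spider"

    def parse(self, response):
        # Emit one record per quote block; scraped items are routed into the
        # internal queue and loaded by the pipeline in batches.
        for quote in response.css("div.quote"):
            yield {
                "quote": quote.css("span.text::text").get(),
                "author": quote.css("small.author::text").get(),
            }


if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="scraping_quotes",
        destination="duckdb",
        dataset_name="quotes",
    )
    run_pipeline(pipeline, QuotesSpider, table_name="quotes")
```

Queue size, result timeout, and start URLs are not passed in the call; they are resolved through `ScrapingConfig` from the `sources.scraping` configuration section (for example via `config.toml` or environment variables).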
