
Commit

Fix linting issues
sultaniman committed Feb 27, 2024
1 parent e9a38e1 commit 76ddfbc
Showing 6 changed files with 20 additions and 18 deletions.
2 changes: 1 addition & 1 deletion sources/google_sheets/helpers/data_processing.py
@@ -224,7 +224,7 @@ def serial_date_to_datetime(
)
# int values are dates, float values are datetimes
if data_type == "date":
- return conv_datetime.date()
+ return conv_datetime.date()  # type: ignore

return conv_datetime

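Note: the hunk above hinges on Sheets serial numbers, where integer values are dates and floats are datetimes. Below is a minimal sketch of that conversion, assuming the usual 1899-12-30 serial epoch; the helper name and epoch constant are illustrative, not the source's implementation.

```python
from datetime import date, datetime, timedelta
from typing import Union

SHEETS_EPOCH = datetime(1899, 12, 30)  # assumed epoch for Google Sheets serial numbers

def serial_to_date_or_datetime(serial: float) -> Union[date, datetime]:
    """Whole serials are calendar dates; fractional serials carry a time of day."""
    converted = SHEETS_EPOCH + timedelta(days=serial)
    if float(serial).is_integer():
        return converted.date()  # date-only cell
    return converted  # full datetime

serial_to_date_or_datetime(45349)  # -> date(2024, 2, 27), assuming the epoch above
```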
6 changes: 3 additions & 3 deletions sources/scraping/__init__.py
@@ -10,13 +10,13 @@
from dlt.sources import DltResource
from dlt.common.source import _SOURCES, SourceInfo

- from scrapy import Spider
+ from scrapy import Spider  # type: ignore

from .helpers import ScrapingConfig, create_pipeline_runner
from .types import P, AnyDict


- def run_pipeline(
+ def run_pipeline(  # type: ignore[valid-type]
pipeline: dlt.Pipeline,
spider: t.Type[Spider],
*args: P.args,
@@ -44,7 +44,7 @@ def run_pipeline(
loader_file_format: TLoaderFileFormat = None
```
"""
- options = {}
+ options: AnyDict = {}
if scrapy_settings:
options["scrapy_settings"] = scrapy_settings

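The `options: AnyDict = {}` annotation above exists because mypy cannot infer useful key/value types for a bare `{}` literal. A small sketch of the same pattern, assuming `AnyDict` is simply an alias for `Dict[str, Any]` as the import suggests:

```python
from typing import Any, Dict, Optional

AnyDict = Dict[str, Any]  # assumed to match the alias in sources/scraping/types.py

def build_options(scrapy_settings: Optional[AnyDict] = None) -> AnyDict:
    # An unannotated `options = {}` would leave mypy guessing the value type;
    # the explicit alias keeps later assignments of arbitrary values valid.
    options: AnyDict = {}
    if scrapy_settings:
        options["scrapy_settings"] = scrapy_settings
    return options
```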
4 changes: 2 additions & 2 deletions sources/scraping/helpers.py
@@ -13,7 +13,7 @@
from .queue import ScrapingQueue
from .settings import SOURCE_SCRAPY_QUEUE_SIZE, SOURCE_SCRAPY_SETTINGS
from .runner import ScrapingHost, PipelineRunner, ScrapyRunner, Signals
- from .types import AnyDict  # type: ignore
+ from .types import AnyDict


@configspec
@@ -62,7 +62,7 @@ def create_pipeline_runner(
queue_result_timeout: float = dlt.config.value,
scrapy_settings: t.Optional[AnyDict] = None,
) -> ScrapingHost:
- queue = ScrapingQueue(
+ queue = ScrapingQueue(  # type: ignore
maxsize=queue_size,
batch_size=batch_size,
read_timeout=queue_result_timeout,
2 changes: 1 addition & 1 deletion sources/scraping/queue.py
@@ -22,7 +22,7 @@ class QueueClosedError(Exception):
pass


- class ScrapingQueue(_Queue):
+ class ScrapingQueue(_Queue[T]):
def __init__(
self,
maxsize: int = 0,
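For context on the `_Queue[T]` base above: parameterising a `queue.Queue` subclass lets type checkers track the item type that `put` and `get` handle. A minimal sketch of the pattern, assuming Python 3.9+ where `queue.Queue` supports subscription at runtime; the class and field names are illustrative:

```python
from queue import Queue
from typing import TypeVar

T = TypeVar("T")

class TypedQueue(Queue[T]):  # generic subclass: the item type is bound at the annotation site
    def __init__(self, maxsize: int = 0, batch_size: int = 10) -> None:
        super().__init__(maxsize)
        self.batch_size = batch_size

items: "TypedQueue[dict]" = TypedQueue(maxsize=100)
items.put({"quote": "hello"})  # the checker now knows this queue holds dicts
item = items.get()
```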
22 changes: 12 additions & 10 deletions sources/scraping/runner.py
@@ -3,7 +3,7 @@

import dlt
from dlt.common import logger
- from pydispatch import dispatcher
+ from pydispatch import dispatcher  # type: ignore

from scrapy import signals, Item, Spider # type: ignore
from scrapy.crawler import CrawlerProcess # type: ignore
@@ -12,23 +12,25 @@
from .types import AnyDict, Runnable, P
from .queue import ScrapingQueue

+ T = t.TypeVar("T")


class Signals:
- def __init__(self, pipeline_name: str, queue: ScrapingQueue) -> None:
+ def __init__(self, pipeline_name: str, queue: ScrapingQueue[T]) -> None:
self.queue = queue
self.pipeline_name = pipeline_name

def on_item_scraped(self, item: Item) -> None:
if not self.queue.is_closed:
- self.queue.put(item)  # type: ignore
+ self.queue.put(item)
else:
logger.info(
"Queue is closed ",
extra={"pipeline_name": self.pipeline_name},
)
raise CloseSpider("Queue is closed")

- def on_spider_opened(self):
+ def on_spider_opened(self) -> None:
if self.queue.is_closed:
raise CloseSpider("Queue is closed")

@@ -37,7 +39,7 @@ def on_engine_stopped(self) -> None:
self.queue.join()
self.queue.close()

- def __enter__(self):
+ def __enter__(self) -> None:
# There might be an edge case when Scrapy opens a new spider but
# the queue has already been closed thus rendering endless wait
dispatcher.connect(self.on_spider_opened, signals.spider_opened)
@@ -49,7 +51,7 @@ def __enter__(self):
# Once crawling engine stops we would like to know about it as well.
dispatcher.connect(self.on_engine_stopped, signals.engine_stopped)

- def __exit__(self, exc_type, exc_val, exc_tb):
+ def __exit__(self, exc_type: t.Any, exc_val: t.Any, exc_tb: t.Any) -> None:
dispatcher.disconnect(self.on_spider_opened, signals.spider_opened)
dispatcher.disconnect(self.on_item_scraped, signals.item_scraped)
dispatcher.disconnect(self.on_engine_stopped, signals.engine_stopped)
@@ -98,7 +100,7 @@ class PipelineRunner(Runnable):
def __init__(
self,
pipeline: dlt.Pipeline,
- queue: ScrapingQueue,
+ queue: ScrapingQueue[T],
) -> None:
self.pipeline = pipeline
self.queue = queue
@@ -117,7 +119,7 @@ def __init__(
name=resource_name,
)

- def run(  # type: ignore[override]
+ def run(
self,
*args: P.args,
**kwargs: P.kwargs,
@@ -140,7 +142,7 @@ def run(  # type: ignore[override]

def run() -> None:
try:
- self.pipeline.run(self.scraping_resource, **kwargs)
+ self.pipeline.run(self.scraping_resource, **kwargs)  # type: ignore[arg-type]
except Exception:
logger.error("Error during pipeline.run call, closing the queue")
raise
@@ -157,7 +159,7 @@ class ScrapingHost:

def __init__(
self,
- queue: ScrapingQueue,
+ queue: ScrapingQueue[T],
scrapy_runner: ScrapyRunner,
pipeline_runner: PipelineRunner,
) -> None:
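A hedged sketch of the connect-on-enter / disconnect-on-exit pattern that `Signals` uses, with one conventional way to spell the typed `__exit__` this commit adds (the source itself uses `t.Any`). The signal name below is made up, and only PyDispatcher is assumed:

```python
from types import TracebackType
from typing import Optional, Type

from pydispatch import dispatcher  # type: ignore

ITEM_SCRAPED = "item_scraped"  # hypothetical signal key, not Scrapy's real signal object

class ItemSubscription:
    def on_item_scraped(self, item: dict) -> None:
        print("received", item)

    def __enter__(self) -> None:
        # Subscribe only for the lifetime of the with-block.
        dispatcher.connect(self.on_item_scraped, signal=ITEM_SCRAPED)

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        dispatcher.disconnect(self.on_item_scraped, signal=ITEM_SCRAPED)

with ItemSubscription():
    dispatcher.send(signal=ITEM_SCRAPED, item={"quote": "hello"})
```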
2 changes: 1 addition & 1 deletion sources/scraping_pipeline.py
@@ -48,7 +48,7 @@ def scrape_quotes_callback_access_resource() -> None:
dataset_name="quotes",
)

- def on_before_start(res: DltResource):
+ def on_before_start(res: DltResource) -> None:
res.add_limit(2)

run_pipeline(
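The `on_before_start` callback above lets the example cap the scraped resource before the run begins. A hypothetical, dlt-free sketch of that hook contract; `Resource`, `add_limit`, and `run_with_hook` are stand-in names, not the library's API:

```python
from typing import Callable, Optional

class Resource:
    """Stand-in for a limitable resource (hypothetical, not dlt's DltResource)."""
    def __init__(self) -> None:
        self.limit: Optional[int] = None

    def add_limit(self, n: int) -> None:
        self.limit = n

def run_with_hook(res: Resource, on_before_start: Callable[[Resource], None]) -> Resource:
    on_before_start(res)  # the caller adjusts the resource before the pipeline runs
    return res

limited = run_with_hook(Resource(), lambda r: r.add_limit(2))
print(limited.limit)  # 2
```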
