Skip to content

Commit

Permalink
Close the queue after timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 4, 2024
1 parent 65a5019 commit 7bfdc80
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
26 changes: 10 additions & 16 deletions tests/scraping/test_scraping_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@

import dlt
import pytest
from dlt.sources import DltResource
from twisted.internet import reactor

import sources.scraping.helpers
import sources.scraping.queue
import sources.scraping.runner
from twisted.internet import reactor

from sources.scraping import run_pipeline
from sources.scraping.helpers import create_pipeline_runner
from sources.scraping.queue import ScrapingQueue
from sources.scraping.runner import PipelineRunner

Expand Down Expand Up @@ -60,19 +57,16 @@ def test_pipeline_runners_handle_extended_and_simple_use_cases(
destination="duckdb",
)

spy_on_queue_put = mocker.spy(sources.scraping.queue.ScrapingQueue, "put")
spy_on_queue_close = mocker.spy(sources.scraping.queue.ScrapingQueue, "close")
spy_on_queue_put = mocker.spy(ScrapingQueue, "put")
spy_on_queue_close = mocker.spy(ScrapingQueue, "close")
spy_on_crawler_process = mocker.spy(TestCrawlerProcess, "stop")
scraping_host = create_pipeline_runner(pipeline, MySpider, batch_size=10)
scraping_host.pipeline_runner.scraping_resource.add_limit(2)

def on_before_start(res: DltResource) -> None:
res.add_limit(2)

run_pipeline(
pipeline,
MySpider,
on_before_start=on_before_start,
batch_size=10,
)
# Make sure we close the queue so the scraper can shut down
# on the testing machine
queue_closer(scraping_host.queue, close_after_seconds=30)
scraping_host.run(dataset_name="quotes", write_disposition="append")

table_expect_at_least_n_records("scraping_res_add_limit_results", 20, pipeline)
table_expect_at_least_n_records(
Expand Down
12 changes: 10 additions & 2 deletions tests/scraping/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ def queue_closer(
queue: ScrapingQueue, close_after_seconds: float = 1.0
) -> threading.Thread:
def close_queue():
    """Watchdog: close ``queue`` once ``close_after_seconds`` have elapsed.

    Polls once per second so it can bail out early (without calling
    ``close`` again) if the scraper has already closed the queue itself.

    NOTE(review): polling granularity is 1 second, so a fractional
    ``close_after_seconds`` is effectively rounded up to the next second.
    """
    slept: int = 0
    while True:
        time.sleep(1)
        slept += 1
        # Queue was closed elsewhere (e.g. the pipeline finished) —
        # nothing left for the watchdog to do.
        if queue.is_closed:
            return

        # Timeout reached: force the queue closed so the scraping
        # run can shut down.
        if slept >= close_after_seconds:
            queue.close()
            break

closer = threading.Thread(target=close_queue)
closer.start()
Expand Down

0 comments on commit 7bfdc80

Please sign in to comment.