From a976c9f277c170df48986b2137e05f1b8d69c650 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 06:29:02 -0600 Subject: [PATCH 01/17] Fixes #1097 --- .envs/.local/.django | 2 +- local.yml | 25 +++++++---- sde_collections/sinequa_api.py | 81 +++++++++++++++++++++++++--------- sde_collections/tasks.py | 31 ++++--------- 4 files changed, 85 insertions(+), 54 deletions(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 0978166d..9e7b56c4 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -46,4 +46,4 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- LRM_DEV_TOKEN='' -XLI_TOKEN='' +XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI' diff --git a/local.yml b/local.yml index ebdb810b..84893914 100644 --- a/local.yml +++ b/local.yml @@ -54,14 +54,23 @@ services: container_name: sde_indexing_helper_local_redis celeryworker: - <<: *django - image: sde_indexing_helper_local_celeryworker - container_name: sde_indexing_helper_local_celeryworker - depends_on: - - redis - - postgres - ports: [] - command: /start-celeryworker + <<: *django + image: sde_indexing_helper_local_celeryworker + container_name: sde_indexing_helper_local_celeryworker + depends_on: + - redis + - postgres + ports: [] + command: /start-celeryworker + deploy: + resources: + limits: + cpus: '4.0' + memory: 8G + reservations: + cpus: '2.0' + memory: 4G + # celerybeat: # <<: *django diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index 868afb77..f3e7dda3 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -1,10 +1,10 @@ import json from typing import Any - import requests import urllib3 from django.conf import settings - +from .models.delta_url import DumpUrl +from django.db import transaction urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) server_configs = { @@ -134,26 +134,64 @@ def query(self, page: int, collection_config_folder: str | None = None, source: payload["query"]["advanced"]["collection"] = f"/{source}/{collection_config_folder}/" return self.process_response(url, payload) - - def sql_query(self, sql: str) -> Any: - """Executes an SQL query on the configured server using token-based authentication.""" + def sql_query(self, sql: str, collection) -> Any: + """Executes an SQL query on the configured server using token-based authentication with pagination.""" token = self._get_token() if not token: raise ValueError("A token is required to use the SQL endpoint") - - url = f"{self.base_url}/api/v1/engine.sql" - headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} - raw_payload = json.dumps( - { + + page = 0 + page_size = 5000 # Number of records per page + skip_records = 0 + + while True: + paginated_sql = f"{sql} SKIP {skip_records} COUNT {page_size}" + url = f"{self.base_url}/api/v1/engine.sql" + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} + raw_payload = json.dumps({ "method": "engine.sql", - "sql": sql, + "sql": paginated_sql, "pretty": True, - } - ) - - return self.process_response(url, headers=headers, raw_data=raw_payload) - - def get_full_texts(self, collection_config_folder: str, source: str = None) -> Any: + }) + + response = self.process_response(url, headers=headers, raw_data=raw_payload) + batch_data = response.get('Rows', []) + total_row_count = response.get('TotalRowCount', 0) + processed_response = self._process_full_text_response(response) + self.process_and_update_data(processed_response, collection) + print(f"Batch {page + 1} is being processed and updated") + + # Check if all rows have been fetched + if len(batch_data) == 0 or (skip_records + page_size) >= total_row_count: + break + + page += 1 + skip_records += page_size + + return f"All {total_row_count} records have been processed and updated." + + def process_and_update_data(self, batch_data, collection): + for record in batch_data: + try: + with transaction.atomic(): + url = record['url'] + scraped_text = record.get('full_text', '') + scraped_title = record.get('title', '') + # Ensure the collection is included in the defaults + DumpUrl.objects.update_or_create( + url=url, + defaults={ + 'scraped_text': scraped_text, + 'scraped_title': scraped_title, + 'collection': collection + } + ) + except KeyError as e: + print(f"Missing key in data: {str(e)}") + except Exception as e: + print(f"Error processing record: {str(e)}") + + def get_full_texts(self, collection_config_folder: str, source: str = None, collection=None) -> Any: """ Retrieves the full texts, URLs, and titles for a specified collection. @@ -184,11 +222,10 @@ def get_full_texts(self, collection_config_folder: str, source: str = None) -> A raise ValueError("Index not defined for this server") sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'" - full_text_response = self.sql_query(sql) - return self._process_full_text_response(full_text_response) - + return self.sql_query(sql,collection) @staticmethod - def _process_full_text_response(full_text_response: str): + def _process_full_text_response(batch_data:str): return [ - {"url": url, "full_text": full_text, "title": title} for url, full_text, title in full_text_response["Rows"] + {"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"] ] + \ No newline at end of file diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 47c96338..572eccdc 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -145,13 +145,11 @@ def resolve_title_pattern(title_pattern_id): title_pattern.apply() -@celery_app.task +@celery_app.task(soft_time_limit=600) def fetch_and_replace_full_text(collection_id, server_name): """ - Task to fetch and replace full text and metadata for all URLs associated with a specified collection - from a given server. This task deletes all existing DumpUrl entries for the collection and creates - new entries based on the latest fetched data. - + Task to initiate fetching and replacing full text and metadata for all URLs associated with a specified collection + from a given server. Args: collection_id (int): The identifier for the collection in the database. server_name (str): The name of the server. @@ -161,28 +159,15 @@ def fetch_and_replace_full_text(collection_id, server_name): """ collection = Collection.objects.get(id=collection_id) api = Api(server_name) - documents = api.get_full_texts(collection.config_folder) # Step 1: Delete all existing DumpUrl entries for the collection deleted_count, _ = DumpUrl.objects.filter(collection=collection).delete() + print(f"Deleted {deleted_count} old records.") - # Step 2: Create new DumpUrl entries from the fetched documents - processed_count = 0 - for doc in documents: - try: - DumpUrl.objects.create( - url=doc["url"], - collection=collection, - scraped_text=doc.get("full_text", ""), - scraped_title=doc.get("title", ""), - ) - processed_count += 1 - except IntegrityError: - # Handle duplicate URL case if needed - print(f"Duplicate URL found, skipping: {doc['url']}") + # Step 2: Fetch and process new data + result_message = api.get_full_texts(collection.config_folder,collection=collection) + # Step 3: Migrate DumpUrl to DeltaUrl collection.migrate_dump_to_delta() - print(f"Processed {processed_count} new records.") - - return f"Successfully processed {len(documents)} records and updated the database." + return result_message From 663457613ac301d613c08949406f2dfa4b5ed92d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:53:46 +0000 Subject: [PATCH 02/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- sde_collections/sinequa_api.py | 52 ++++++++++++++++++---------------- sde_collections/tasks.py | 2 +- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index f3e7dda3..c16277d5 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -1,10 +1,13 @@ import json from typing import Any + import requests import urllib3 from django.conf import settings -from .models.delta_url import DumpUrl from django.db import transaction + +from .models.delta_url import DumpUrl + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) server_configs = { @@ -134,29 +137,32 @@ def query(self, page: int, collection_config_folder: str | None = None, source: payload["query"]["advanced"]["collection"] = f"/{source}/{collection_config_folder}/" return self.process_response(url, payload) + def sql_query(self, sql: str, collection) -> Any: """Executes an SQL query on the configured server using token-based authentication with pagination.""" token = self._get_token() if not token: raise ValueError("A token is required to use the SQL endpoint") - + page = 0 page_size = 5000 # Number of records per page - skip_records = 0 + skip_records = 0 while True: paginated_sql = f"{sql} SKIP {skip_records} COUNT {page_size}" url = f"{self.base_url}/api/v1/engine.sql" headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} - raw_payload = json.dumps({ - "method": "engine.sql", - "sql": paginated_sql, - "pretty": True, - }) + raw_payload = json.dumps( + { + "method": "engine.sql", + "sql": paginated_sql, + "pretty": True, + } + ) response = self.process_response(url, headers=headers, raw_data=raw_payload) - batch_data = response.get('Rows', []) - total_row_count = response.get('TotalRowCount', 0) + batch_data = response.get("Rows", []) + total_row_count = response.get("TotalRowCount", 0) processed_response = self._process_full_text_response(response) self.process_and_update_data(processed_response, collection) print(f"Batch {page + 1} is being processed and updated") @@ -174,17 +180,17 @@ def process_and_update_data(self, batch_data, collection): for record in batch_data: try: with transaction.atomic(): - url = record['url'] - scraped_text = record.get('full_text', '') - scraped_title = record.get('title', '') + url = record["url"] + scraped_text = record.get("full_text", "") + scraped_title = record.get("title", "") # Ensure the collection is included in the defaults DumpUrl.objects.update_or_create( - url=url, + url=url, defaults={ - 'scraped_text': scraped_text, - 'scraped_title': scraped_title, - 'collection': collection - } + "scraped_text": scraped_text, + "scraped_title": scraped_title, + "collection": collection, + }, ) except KeyError as e: print(f"Missing key in data: {str(e)}") @@ -222,10 +228,8 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll raise ValueError("Index not defined for this server") sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'" - return self.sql_query(sql,collection) + return self.sql_query(sql, collection) + @staticmethod - def _process_full_text_response(batch_data:str): - return [ - {"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"] - ] - \ No newline at end of file + def _process_full_text_response(batch_data: str): + return [{"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"]] diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 572eccdc..5a4bb3be 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -165,7 +165,7 @@ def fetch_and_replace_full_text(collection_id, server_name): print(f"Deleted {deleted_count} old records.") # Step 2: Fetch and process new data - result_message = api.get_full_texts(collection.config_folder,collection=collection) + result_message = api.get_full_texts(collection.config_folder, collection=collection) # Step 3: Migrate DumpUrl to DeltaUrl collection.migrate_dump_to_delta() From 05ec13bd8c6ee135963b7a98cfe6829d3c69d2a3 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 06:57:03 -0600 Subject: [PATCH 03/17] Updated_#1097 --- sde_collections/tasks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 5a4bb3be..188147ac 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -7,7 +7,6 @@ from django.conf import settings from django.core import management from django.core.management.commands import loaddata -from django.db import IntegrityError from config import celery_app From df4a1de1c0a8415a3e050a5d438d071b9d79d6ed Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 07:00:14 -0600 Subject: [PATCH 04/17] Fixes_Issue__#1097 --- .envs/.local/.django | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 9e7b56c4..54c76263 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -46,4 +46,4 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- LRM_DEV_TOKEN='' -XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI' +XLI_TOKEN='' \ No newline at end of file From 03ce0e63bfa643e717f2acba946bf7719c44824f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 13:00:42 +0000 Subject: [PATCH 05/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .envs/.local/.django | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 54c76263..0978166d 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -46,4 +46,4 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- LRM_DEV_TOKEN='' -XLI_TOKEN='' \ No newline at end of file +XLI_TOKEN='' From 89756bfc769c17583235ec998d38c452cd05b182 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 08:30:10 -0600 Subject: [PATCH 06/17] Include Api tests #1097 --- sde_collections/tests/api_tests.py | 159 +++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 sde_collections/tests/api_tests.py diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py new file mode 100644 index 00000000..7f34e45f --- /dev/null +++ b/sde_collections/tests/api_tests.py @@ -0,0 +1,159 @@ +#docker-compose -f local.yml run --rm django pytest sde_collections/tests/api_tests.py +import pytest +from unittest.mock import patch, MagicMock +from django.utils import timezone +from sde_collections.models.collection import Collection, WorkflowStatusChoices +from sde_collections.models.delta_url import DumpUrl +from sde_collections.tests.factories import CollectionFactory, UserFactory +from sde_collections.sinequa_api import Api +from sde_collections.tasks import fetch_and_replace_full_text + + +@pytest.mark.django_db +class TestApiClass: + @pytest.fixture + def collection(self): + """Fixture to create a collection object for testing.""" + user = UserFactory() + return CollectionFactory( + curated_by=user, + curation_started=timezone.now(), + config_folder="example_config", + workflow_status=WorkflowStatusChoices.RESEARCH_IN_PROGRESS + ) + + @pytest.fixture + def api_instance(self): + """Fixture to create an Api instance with mocked server configs.""" + with patch("sde_collections.sinequa_api.server_configs", { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index" + } + }): + return Api(server_name="test_server", user="test_user", password="test_pass", token="test_token") + + @patch("requests.post") + def test_process_response_success(self, mock_post, api_instance): + """Test that process_response handles successful responses.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"key": "value"} + mock_post.return_value = mock_response + + response = api_instance.process_response("http://example.com", payload={"test": "data"}) + assert response == {"key": "value"} + + @patch("requests.post") + def test_process_response_failure(self, mock_post, api_instance): + """Test that process_response raises an exception on failure.""" + mock_response = MagicMock() + mock_response.status_code = 500 + mock_post.return_value = mock_response + mock_response.raise_for_status.side_effect = Exception("Internal Server Error") + + with pytest.raises(Exception, match="Internal Server Error"): + api_instance.process_response("http://example.com", payload={"test": "data"}) + + @patch("sde_collections.sinequa_api.Api.process_response") + def test_query(self, mock_process_response, api_instance): + """Test that query sends correct payload and processes response.""" + mock_process_response.return_value = {"result": "success"} + response = api_instance.query(page=1, collection_config_folder="folder") + assert response == {"result": "success"} + + @patch("sde_collections.sinequa_api.Api.process_response") + def test_sql_query(self, mock_process_response, api_instance, collection): + """Test SQL query execution and response processing.""" + mock_process_response.return_value = { + "Rows": [{"url": "http://example.com", "full_text": "Text", "title": "Title"}], + "TotalRowCount": 1 + } + response = api_instance.sql_query("SELECT * FROM test_index", collection) + assert response == "All 1 records have been processed and updated." + + @patch("sde_collections.sinequa_api.Api.process_response") + def test_get_full_texts(self, mock_process_response, api_instance, collection): + """Test fetching full texts from the API.""" + mock_process_response.return_value = { + "Rows": [{"url": "http://example.com", "text": "Example text", "title": "Example title"}] + } + response = api_instance.get_full_texts(collection_config_folder="folder", source="source", collection=collection) + assert response == "All 0 records have been processed and updated." + + def test_process_and_update_data(self, api_instance, collection): + """Test processing and updating data in the database.""" + batch_data = [ + {"url": "http://example.com", "full_text": "Example text", "title": "Example title"} + ] + api_instance.process_and_update_data(batch_data, collection) + dump_urls = DumpUrl.objects.filter(collection=collection) + assert dump_urls.count() == 1 + assert dump_urls.first().url == "http://example.com" + + @patch("sde_collections.sinequa_api.Api.sql_query") + @patch("sde_collections.models.collection.Collection.migrate_dump_to_delta") + def test_fetch_and_replace_full_text(self, mock_migrate, mock_sql_query, collection): + """Test the fetch_and_replace_full_text Celery task.""" + with patch("sde_collections.sinequa_api.server_configs", { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index" + } + }): + mock_sql_query.return_value = "All records processed" + mock_migrate.return_value = None + + result = fetch_and_replace_full_text(collection.id, "test_server") + assert result == "All records processed" + mock_migrate.assert_called_once() + + @patch("sde_collections.sinequa_api.server_configs", { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index" + } + }) + @pytest.mark.parametrize("server_name, user, password, expected", [ + ("test_server", "user1", "pass1", True), + ("invalid_server", None, None, False) + ]) + def test_api_init(self, server_name, user, password, expected): + """Test API initialization with valid and invalid server names.""" + if expected: + api = Api(server_name=server_name, user=user, password=password) + assert api.server_name == server_name + else: + with pytest.raises(ValueError): + Api(server_name=server_name) + + @patch("requests.post") + def test_query_dev_server_authentication(self, mock_post, api_instance): + """Test query on dev servers requiring authentication.""" + api_instance.server_name = "xli" # Setting a dev server + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"result": "success"}) + + response = api_instance.query(page=1, collection_config_folder="folder") + assert response == {"result": "success"} + + # Extract URL from call_args (positional arguments) + called_url = mock_post.call_args[0][0] # URL is the first positional argument + assert "?Password=test_pass&User=test_user" in called_url + + @patch("sde_collections.sinequa_api.Api.process_response") + def test_sql_query_pagination(self, mock_process_response, api_instance, collection): + """Test SQL query with pagination.""" + mock_process_response.side_effect = [ + {"Rows": [{"url": "http://example.com/1", "full_text": "Text 1", "title": "Title 1"}], "TotalRowCount": 6}, + {"Rows": [{"url": "http://example.com/2", "full_text": "Text 2", "title": "Title 2"}], "TotalRowCount": 6}, + {"Rows": [], "TotalRowCount": 6}, + ] + + result = api_instance.sql_query("SELECT * FROM test_index", collection) + assert result == "All 6 records have been processed and updated." From 280e49134fc0e6186596442bf7cb44165ecb08ab Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 14:31:12 +0000 Subject: [PATCH 07/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- sde_collections/tests/api_tests.py | 85 +++++++++++++++++------------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py index 7f34e45f..46487d19 100644 --- a/sde_collections/tests/api_tests.py +++ b/sde_collections/tests/api_tests.py @@ -1,12 +1,14 @@ -#docker-compose -f local.yml run --rm django pytest sde_collections/tests/api_tests.py +# docker-compose -f local.yml run --rm django pytest sde_collections/tests/api_tests.py +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, MagicMock from django.utils import timezone + from sde_collections.models.collection import Collection, WorkflowStatusChoices from sde_collections.models.delta_url import DumpUrl -from sde_collections.tests.factories import CollectionFactory, UserFactory from sde_collections.sinequa_api import Api from sde_collections.tasks import fetch_and_replace_full_text +from sde_collections.tests.factories import CollectionFactory, UserFactory @pytest.mark.django_db @@ -19,20 +21,23 @@ def collection(self): curated_by=user, curation_started=timezone.now(), config_folder="example_config", - workflow_status=WorkflowStatusChoices.RESEARCH_IN_PROGRESS + workflow_status=WorkflowStatusChoices.RESEARCH_IN_PROGRESS, ) @pytest.fixture def api_instance(self): """Fixture to create an Api instance with mocked server configs.""" - with patch("sde_collections.sinequa_api.server_configs", { - "test_server": { - "app_name": "test_app", - "query_name": "test_query", - "base_url": "http://testserver.com/api", - "index": "test_index" - } - }): + with patch( + "sde_collections.sinequa_api.server_configs", + { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index", + } + }, + ): return Api(server_name="test_server", user="test_user", password="test_pass", token="test_token") @patch("requests.post") @@ -69,7 +74,7 @@ def test_sql_query(self, mock_process_response, api_instance, collection): """Test SQL query execution and response processing.""" mock_process_response.return_value = { "Rows": [{"url": "http://example.com", "full_text": "Text", "title": "Title"}], - "TotalRowCount": 1 + "TotalRowCount": 1, } response = api_instance.sql_query("SELECT * FROM test_index", collection) assert response == "All 1 records have been processed and updated." @@ -80,14 +85,14 @@ def test_get_full_texts(self, mock_process_response, api_instance, collection): mock_process_response.return_value = { "Rows": [{"url": "http://example.com", "text": "Example text", "title": "Example title"}] } - response = api_instance.get_full_texts(collection_config_folder="folder", source="source", collection=collection) + response = api_instance.get_full_texts( + collection_config_folder="folder", source="source", collection=collection + ) assert response == "All 0 records have been processed and updated." def test_process_and_update_data(self, api_instance, collection): """Test processing and updating data in the database.""" - batch_data = [ - {"url": "http://example.com", "full_text": "Example text", "title": "Example title"} - ] + batch_data = [{"url": "http://example.com", "full_text": "Example text", "title": "Example title"}] api_instance.process_and_update_data(batch_data, collection) dump_urls = DumpUrl.objects.filter(collection=collection) assert dump_urls.count() == 1 @@ -97,14 +102,17 @@ def test_process_and_update_data(self, api_instance, collection): @patch("sde_collections.models.collection.Collection.migrate_dump_to_delta") def test_fetch_and_replace_full_text(self, mock_migrate, mock_sql_query, collection): """Test the fetch_and_replace_full_text Celery task.""" - with patch("sde_collections.sinequa_api.server_configs", { - "test_server": { - "app_name": "test_app", - "query_name": "test_query", - "base_url": "http://testserver.com/api", - "index": "test_index" - } - }): + with patch( + "sde_collections.sinequa_api.server_configs", + { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index", + } + }, + ): mock_sql_query.return_value = "All records processed" mock_migrate.return_value = None @@ -112,18 +120,21 @@ def test_fetch_and_replace_full_text(self, mock_migrate, mock_sql_query, collect assert result == "All records processed" mock_migrate.assert_called_once() - @patch("sde_collections.sinequa_api.server_configs", { - "test_server": { - "app_name": "test_app", - "query_name": "test_query", - "base_url": "http://testserver.com/api", - "index": "test_index" - } - }) - @pytest.mark.parametrize("server_name, user, password, expected", [ - ("test_server", "user1", "pass1", True), - ("invalid_server", None, None, False) - ]) + @patch( + "sde_collections.sinequa_api.server_configs", + { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index", + } + }, + ) + @pytest.mark.parametrize( + "server_name, user, password, expected", + [("test_server", "user1", "pass1", True), ("invalid_server", None, None, False)], + ) def test_api_init(self, server_name, user, password, expected): """Test API initialization with valid and invalid server names.""" if expected: From 3c1114389844cf6452bd0bb9c8f11ae1603fe0ed Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 08:45:43 -0600 Subject: [PATCH 08/17] Include_Api_tests #1097 --- sde_collections/tests/api_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py index 46487d19..0a7a9245 100644 --- a/sde_collections/tests/api_tests.py +++ b/sde_collections/tests/api_tests.py @@ -4,7 +4,7 @@ import pytest from django.utils import timezone -from sde_collections.models.collection import Collection, WorkflowStatusChoices +from sde_collections.models.collection import WorkflowStatusChoices from sde_collections.models.delta_url import DumpUrl from sde_collections.sinequa_api import Api from sde_collections.tasks import fetch_and_replace_full_text From 28956276361a055b57bfb7a2d09ff02ace5ffd83 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:27:23 -0600 Subject: [PATCH 09/17] celeryworker_updates --- .envs/.local/.django | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 0978166d..172022c5 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -45,5 +45,5 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- -LRM_DEV_TOKEN='' -XLI_TOKEN='' +LRM_DEV_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI5NDgzMzU4LCJzaWQiOiIwRUM1QjI3QjU1RTQ0QjhBODA2QzM2QjY0REM0QkVCNiIsImtpbmQiOiJhY2Nlc3MiLCJzdWIiOiJzaW5lcXVhfGNvc21vc19tbF91c2VyIn0.slzYgP9vr1CE-lVRo3ZzJ7sTlh-S9bBC-bX5PUt5Ns8' +XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI' From 5f8e7e1bd882b6667ea2f9fd74e3f5c7e1237289 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:29:47 -0600 Subject: [PATCH 10/17] latest --- .envs/.local/.django | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 172022c5..54c76263 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -45,5 +45,5 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- -LRM_DEV_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI5NDgzMzU4LCJzaWQiOiIwRUM1QjI3QjU1RTQ0QjhBODA2QzM2QjY0REM0QkVCNiIsImtpbmQiOiJhY2Nlc3MiLCJzdWIiOiJzaW5lcXVhfGNvc21vc19tbF91c2VyIn0.slzYgP9vr1CE-lVRo3ZzJ7sTlh-S9bBC-bX5PUt5Ns8' -XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI' +LRM_DEV_TOKEN='' +XLI_TOKEN='' \ No newline at end of file From 4747f591fe192fd04090a3af0771d6f443ba48f1 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:30:48 -0600 Subject: [PATCH 11/17] latest --- local.yml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/local.yml b/local.yml index 84893914..49611576 100644 --- a/local.yml +++ b/local.yml @@ -62,14 +62,6 @@ services: - postgres ports: [] command: /start-celeryworker - deploy: - resources: - limits: - cpus: '4.0' - memory: 8G - reservations: - cpus: '2.0' - memory: 4G # celerybeat: From 437cd659a32c3455864a33705c17b18b58044d2c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 25 Nov 2024 21:31:25 +0000 Subject: [PATCH 12/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .envs/.local/.django | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 54c76263..0978166d 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -46,4 +46,4 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- LRM_DEV_TOKEN='' -XLI_TOKEN='' \ No newline at end of file +XLI_TOKEN='' From 41dd9e57d9cd7ff9adf14f4fb8fc540d5f064a4d Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:35:42 -0600 Subject: [PATCH 13/17] latest --- local.yml | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/local.yml b/local.yml index 49611576..7359ac75 100644 --- a/local.yml +++ b/local.yml @@ -53,16 +53,15 @@ services: image: redis:6 container_name: sde_indexing_helper_local_redis - celeryworker: - <<: *django - image: sde_indexing_helper_local_celeryworker - container_name: sde_indexing_helper_local_celeryworker - depends_on: - - redis - - postgres - ports: [] - command: /start-celeryworker - + celeryworker: + <<: *django + image: sde_indexing_helper_local_celeryworker + container_name: sde_indexing_helper_local_celeryworker + depends_on: + - redis + - postgres + ports: [] + command: /start-celeryworker # celerybeat: # <<: *django From be863894a79c304162722b2bcf4d75ef041f90b1 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:38:44 -0600 Subject: [PATCH 14/17] latest_ --- local.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/local.yml b/local.yml index 7359ac75..ebdb810b 100644 --- a/local.yml +++ b/local.yml @@ -53,7 +53,7 @@ services: image: redis:6 container_name: sde_indexing_helper_local_redis - celeryworker: + celeryworker: <<: *django image: sde_indexing_helper_local_celeryworker container_name: sde_indexing_helper_local_celeryworker From 77b1ec328b321d099bea36aa1579f8abc3da601d Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 26 Nov 2024 01:33:50 -0600 Subject: [PATCH 15/17] Fixes #1096 --- sde_collections/sinequa_api.py | 14 +++++++++++-- sde_collections/tests/api_tests.py | 33 +++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index c16277d5..d0713062 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -231,5 +231,15 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll return self.sql_query(sql, collection) @staticmethod - def _process_full_text_response(batch_data: str): - return [{"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"]] + def _process_full_text_response(batch_data: dict): + if 'Rows' not in batch_data or not isinstance(batch_data['Rows'], list): + raise ValueError("Expected 'Rows' key with a list of data.") + + processed_data = [] + for row in batch_data['Rows']: + # Ensure each row has exactly three elements (url, full_text, title) + if len(row) != 3: + raise ValueError("Each row must contain exactly three elements (url, full_text, title).") + url, full_text, title = row + processed_data.append({"url": url, "full_text": full_text, "title": title}) + return processed_data diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py index 0a7a9245..85db82a8 100644 --- a/sde_collections/tests/api_tests.py +++ b/sde_collections/tests/api_tests.py @@ -147,7 +147,7 @@ def test_api_init(self, server_name, user, password, expected): @patch("requests.post") def test_query_dev_server_authentication(self, mock_post, api_instance): """Test query on dev servers requiring authentication.""" - api_instance.server_name = "xli" # Setting a dev server + api_instance.server_name = "xli" mock_post.return_value = MagicMock(status_code=200, json=lambda: {"result": "success"}) response = api_instance.query(page=1, collection_config_folder="folder") @@ -168,3 +168,34 @@ def test_sql_query_pagination(self, mock_process_response, api_instance, collect result = api_instance.sql_query("SELECT * FROM test_index", collection) assert result == "All 6 records have been processed and updated." + + def test_process_full_text_response(self, api_instance): + """Test that _process_full_text_response correctly processes the data.""" + batch_data = {"Rows": [ + ["http://example.com", "Example text", "Example title"], + ["http://example.net", "Another text", "Another title"] + ]} + expected_output = [ + {"url": "http://example.com", "full_text": "Example text", "title": "Example title"}, + {"url": "http://example.net", "full_text": "Another text", "title": "Another title"} + ] + result = api_instance._process_full_text_response(batch_data) + assert result == expected_output + + def test_process_full_text_response_with_invalid_data(self, api_instance): + """Test that _process_full_text_response raises an error with invalid data.""" + # Test for missing 'Rows' key + invalid_data_no_rows = {} # No 'Rows' key + with pytest.raises(ValueError, match="Expected 'Rows' key with a list of data"): + api_instance._process_full_text_response(invalid_data_no_rows) + + # Test for incorrect row length + invalid_data_wrong_length = {"Rows": [["http://example.com", "Example text"]]} # Missing 'title' + with pytest.raises(ValueError, match="Each row must contain exactly three elements"): + api_instance._process_full_text_response(invalid_data_wrong_length) + + @patch("sde_collections.sinequa_api.Api._get_token", return_value=None) + def test_sql_query_missing_token(self, mock_get_token, api_instance, collection): + """Test that sql_query raises an error when no token is provided.""" + with pytest.raises(ValueError, match="A token is required to use the SQL endpoint"): + api_instance.sql_query("SELECT * FROM test_table", collection) From 60fdac1f91c9324f9664fdb0a1a41369db85b5d2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 26 Nov 2024 07:34:12 +0000 Subject: [PATCH 16/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- sde_collections/sinequa_api.py | 4 ++-- sde_collections/tests/api_tests.py | 12 +++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index d0713062..03d3a724 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -232,11 +232,11 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll @staticmethod def _process_full_text_response(batch_data: dict): - if 'Rows' not in batch_data or not isinstance(batch_data['Rows'], list): + if "Rows" not in batch_data or not isinstance(batch_data["Rows"], list): raise ValueError("Expected 'Rows' key with a list of data.") processed_data = [] - for row in batch_data['Rows']: + for row in batch_data["Rows"]: # Ensure each row has exactly three elements (url, full_text, title) if len(row) != 3: raise ValueError("Each row must contain exactly three elements (url, full_text, title).") diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py index 85db82a8..8abb5f08 100644 --- a/sde_collections/tests/api_tests.py +++ b/sde_collections/tests/api_tests.py @@ -171,13 +171,15 @@ def test_sql_query_pagination(self, mock_process_response, api_instance, collect def test_process_full_text_response(self, api_instance): """Test that _process_full_text_response correctly processes the data.""" - batch_data = {"Rows": [ - ["http://example.com", "Example text", "Example title"], - ["http://example.net", "Another text", "Another title"] - ]} + batch_data = { + "Rows": [ + ["http://example.com", "Example text", "Example title"], + ["http://example.net", "Another text", "Another title"], + ] + } expected_output = [ {"url": "http://example.com", "full_text": "Example text", "title": "Example title"}, - {"url": "http://example.net", "full_text": "Another text", "title": "Another title"} + {"url": "http://example.net", "full_text": "Another text", "title": "Another title"}, ] result = api_instance._process_full_text_response(batch_data) assert result == expected_output From c4c1bc1c8796f29693f68e080f9380559fd038bf Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Tue, 26 Nov 2024 15:11:41 -0600 Subject: [PATCH 17/17] improved doc strings and the errors --- sde_collections/sinequa_api.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index 03d3a724..aa559474 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -64,7 +64,7 @@ class Api: def __init__(self, server_name: str = None, user: str = None, password: str = None, token: str = None) -> None: self.server_name = server_name if server_name not in server_configs: - raise ValueError(f"Server name '{server_name}' is not in server_configs") + raise ValueError(f"Invalid server configuration: '{server_name}' is not a recognized server name") self.config = server_configs[server_name] self.app_name: str = self.config["app_name"] @@ -72,7 +72,6 @@ def __init__(self, server_name: str = None, user: str = None, password: str = No self.base_url: str = self.config["base_url"] self.dev_servers = ["xli", "lrm_dev", "lrm_qa"] - # Store provided values only self._provided_user = user self._provided_password = password self._provided_token = token @@ -116,7 +115,8 @@ def query(self, page: int, collection_config_folder: str | None = None, source: password = self._get_password() if not user or not password: raise ValueError( - "User and password are required for the query endpoint on the following servers: {self.dev_servers}" + f"Authentication error: Missing credentials for dev server '{self.server_name}'. " + f"Both username and password are required for servers: {', '.join(self.dev_servers)}" ) authentication = f"?Password={password}&User={user}" url = f"{url}{authentication}" @@ -139,10 +139,9 @@ def query(self, page: int, collection_config_folder: str | None = None, source: return self.process_response(url, payload) def sql_query(self, sql: str, collection) -> Any: - """Executes an SQL query on the configured server using token-based authentication with pagination.""" token = self._get_token() if not token: - raise ValueError("A token is required to use the SQL endpoint") + raise ValueError("Authentication error: Token is required for SQL endpoint access") page = 0 page_size = 5000 # Number of records per page @@ -165,7 +164,6 @@ def sql_query(self, sql: str, collection) -> Any: total_row_count = response.get("TotalRowCount", 0) processed_response = self._process_full_text_response(response) self.process_and_update_data(processed_response, collection) - print(f"Batch {page + 1} is being processed and updated") # Check if all rows have been fetched if len(batch_data) == 0 or (skip_records + page_size) >= total_row_count: @@ -183,7 +181,6 @@ def process_and_update_data(self, batch_data, collection): url = record["url"] scraped_text = record.get("full_text", "") scraped_title = record.get("title", "") - # Ensure the collection is included in the defaults DumpUrl.objects.update_or_create( url=url, defaults={ @@ -193,6 +190,7 @@ def process_and_update_data(self, batch_data, collection): }, ) except KeyError as e: + # TODO: reevaluate whether this should be a Raise and break the code print(f"Missing key in data: {str(e)}") except Exception as e: print(f"Error processing record: {str(e)}") @@ -225,7 +223,10 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll source = self._get_source_name() if (index := self.config.get("index")) is None: - raise ValueError("Index not defined for this server") + raise ValueError( + f"Configuration error: Index not defined for server '{self.server_name}'. " + "Please update server configuration with the required index." + ) sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'" return self.sql_query(sql, collection) @@ -233,13 +234,18 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll @staticmethod def _process_full_text_response(batch_data: dict): if "Rows" not in batch_data or not isinstance(batch_data["Rows"], list): - raise ValueError("Expected 'Rows' key with a list of data.") + raise ValueError( + "Invalid response format: Expected 'Rows' key with list data in Sinequa server response. " + f"Received: {type(batch_data.get('Rows', None))}" + ) processed_data = [] - for row in batch_data["Rows"]: - # Ensure each row has exactly three elements (url, full_text, title) + for idx, row in enumerate(batch_data["Rows"]): if len(row) != 3: - raise ValueError("Each row must contain exactly three elements (url, full_text, title).") + raise ValueError( + f"Invalid row format at index {idx}: Expected exactly three elements (url, full_text, title). " + f"Received {len(row)} elements." + ) url, full_text, title = row processed_data.append({"url": url, "full_text": full_text, "title": title}) return processed_data