Pagination on the Sinequa sql.engine Api #1104

Open · wants to merge 18 commits into dev
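The change at the heart of this PR: `sql_query` no longer issues a single `engine.sql` request, but pages through results with Sinequa's `SKIP`/`COUNT` clauses (5000 rows per page) and hands each batch off for processing. A minimal sketch of that loop shape, with `run_sql` standing in as a placeholder for the actual POST to `/api/v1/engine.sql` (the helper name is illustrative, not part of the diff):

# Simplified sketch of the pagination pattern introduced in this PR.
# `run_sql` is a placeholder for posting the JSON payload to
# /api/v1/engine.sql and returning the parsed response.
def fetch_all_rows(sql: str, page_size: int = 5000):
    skip = 0
    while True:
        response = run_sql(f"{sql} SKIP {skip} COUNT {page_size}")
        rows = response.get("Rows", [])
        total = response.get("TotalRowCount", 0)
        yield from rows
        # Stop once a page comes back empty or we have paged past the total.
        if not rows or skip + page_size >= total:
            break
        skip += page_size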
111 changes: 84 additions & 27 deletions sde_collections/sinequa_api.py
@@ -4,6 +4,9 @@
import requests
import urllib3
from django.conf import settings
from django.db import transaction

from .models.delta_url import DumpUrl

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

@@ -61,15 +64,14 @@ class Api:
def __init__(self, server_name: str = None, user: str = None, password: str = None, token: str = None) -> None:
self.server_name = server_name
if server_name not in server_configs:
raise ValueError(f"Server name '{server_name}' is not in server_configs")
raise ValueError(f"Invalid server configuration: '{server_name}' is not a recognized server name")

self.config = server_configs[server_name]
self.app_name: str = self.config["app_name"]
self.query_name: str = self.config["query_name"]
self.base_url: str = self.config["base_url"]
self.dev_servers = ["xli", "lrm_dev", "lrm_qa"]

# Store provided values only
self._provided_user = user
self._provided_password = password
self._provided_token = token
@@ -113,7 +115,8 @@ def query(self, page: int, collection_config_folder: str | None = None, source:
password = self._get_password()
if not user or not password:
raise ValueError(
"User and password are required for the query endpoint on the following servers: {self.dev_servers}"
f"Authentication error: Missing credentials for dev server '{self.server_name}'. "
f"Both username and password are required for servers: {', '.join(self.dev_servers)}"
)
authentication = f"?Password={password}&User={user}"
url = f"{url}{authentication}"
@@ -135,25 +138,64 @@ def query(self, page: int, collection_config_folder: str | None = None, source:

return self.process_response(url, payload)

def sql_query(self, sql: str) -> Any:
"""Executes an SQL query on the configured server using token-based authentication."""
def sql_query(self, sql: str, collection) -> Any:
token = self._get_token()
if not token:
raise ValueError("A token is required to use the SQL endpoint")

url = f"{self.base_url}/api/v1/engine.sql"
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
raw_payload = json.dumps(
{
"method": "engine.sql",
"sql": sql,
"pretty": True,
}
)

return self.process_response(url, headers=headers, raw_data=raw_payload)

def get_full_texts(self, collection_config_folder: str, source: str = None) -> Any:
raise ValueError("Authentication error: Token is required for SQL endpoint access")

page = 0
page_size = 5000 # Number of records per page
skip_records = 0

while True:
paginated_sql = f"{sql} SKIP {skip_records} COUNT {page_size}"
url = f"{self.base_url}/api/v1/engine.sql"
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
raw_payload = json.dumps(
{
"method": "engine.sql",
"sql": paginated_sql,
"pretty": True,
}
)

response = self.process_response(url, headers=headers, raw_data=raw_payload)
batch_data = response.get("Rows", [])
total_row_count = response.get("TotalRowCount", 0)
processed_response = self._process_full_text_response(response)
self.process_and_update_data(processed_response, collection)

# Check if all rows have been fetched
if len(batch_data) == 0 or (skip_records + page_size) >= total_row_count:
break

page += 1
skip_records += page_size

return f"All {total_row_count} records have been processed and updated."

def process_and_update_data(self, batch_data, collection):
for record in batch_data:
try:
with transaction.atomic():
url = record["url"]
scraped_text = record.get("full_text", "")
scraped_title = record.get("title", "")
DumpUrl.objects.update_or_create(
url=url,
defaults={
"scraped_text": scraped_text,
"scraped_title": scraped_title,
"collection": collection,
},
)
except KeyError as e:
# TODO: reevaluate whether this should raise and halt processing instead
print(f"Missing key in data: {str(e)}")
except Exception as e:
print(f"Error processing record: {str(e)}")

def get_full_texts(self, collection_config_folder: str, source: str = None, collection=None) -> Any:
"""
Retrieves the full texts, URLs, and titles for a specified collection.

@@ -181,14 +223,29 @@ def get_full_texts(self, collection_config_folder: str, source: str = None) -> Any:
source = self._get_source_name()

if (index := self.config.get("index")) is None:
raise ValueError("Index not defined for this server")
raise ValueError(
f"Configuration error: Index not defined for server '{self.server_name}'. "
"Please update server configuration with the required index."
)

sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'"
full_text_response = self.sql_query(sql)
return self._process_full_text_response(full_text_response)
return self.sql_query(sql, collection)

@staticmethod
def _process_full_text_response(full_text_response: str):
return [
{"url": url, "full_text": full_text, "title": title} for url, full_text, title in full_text_response["Rows"]
]
def _process_full_text_response(batch_data: dict):
if "Rows" not in batch_data or not isinstance(batch_data["Rows"], list):
raise ValueError(
"Invalid response format: Expected 'Rows' key with list data in Sinequa server response. "
f"Received: {type(batch_data.get('Rows', None))}"
)

processed_data = []
for idx, row in enumerate(batch_data["Rows"]):
if len(row) != 3:
raise ValueError(
f"Invalid row format at index {idx}: Expected exactly three elements (url, full_text, title). "
f"Received {len(row)} elements."
)
url, full_text, title = row
processed_data.append({"url": url, "full_text": full_text, "title": title})
return processed_data
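For reviewers: with these changes, `get_full_texts` no longer returns the list of processed documents; it delegates to the paginated `sql_query`, which upserts `DumpUrl` rows batch by batch and returns a summary string. A rough usage sketch, assuming a configured dev server and an existing `Collection` instance (the token value and collection lookup are placeholders, not part of the diff):

from sde_collections.sinequa_api import Api

api = Api(server_name="lrm_dev", token="<api-token>")  # token is required for the engine.sql endpoint
collection = ...  # an existing Collection model instance, looked up as in tasks.py

# Drives SKIP/COUNT pagination internally and update_or_creates DumpUrl rows per batch.
message = api.get_full_texts(collection.config_folder, collection=collection)
print(message)  # e.g. "All 12345 records have been processed and updated."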
32 changes: 8 additions & 24 deletions sde_collections/tasks.py
@@ -7,7 +7,6 @@
from django.conf import settings
from django.core import management
from django.core.management.commands import loaddata
from django.db import IntegrityError

from config import celery_app

@@ -145,13 +144,11 @@ def resolve_title_pattern(title_pattern_id):
title_pattern.apply()


@celery_app.task
@celery_app.task(soft_time_limit=600)
def fetch_and_replace_full_text(collection_id, server_name):
"""
Task to fetch and replace full text and metadata for all URLs associated with a specified collection
from a given server. This task deletes all existing DumpUrl entries for the collection and creates
new entries based on the latest fetched data.

Task to initiate fetching and replacing full text and metadata for all URLs associated with a specified collection
from a given server.
Args:
collection_id (int): The identifier for the collection in the database.
server_name (str): The name of the server.
@@ -161,28 +158,15 @@ def fetch_and_replace_full_text(collection_id, server_name):
"""
collection = Collection.objects.get(id=collection_id)
api = Api(server_name)
documents = api.get_full_texts(collection.config_folder)

# Step 1: Delete all existing DumpUrl entries for the collection
deleted_count, _ = DumpUrl.objects.filter(collection=collection).delete()
print(f"Deleted {deleted_count} old records.")

# Step 2: Create new DumpUrl entries from the fetched documents
processed_count = 0
for doc in documents:
try:
DumpUrl.objects.create(
url=doc["url"],
collection=collection,
scraped_text=doc.get("full_text", ""),
scraped_title=doc.get("title", ""),
)
processed_count += 1
except IntegrityError:
# Handle duplicate URL case if needed
print(f"Duplicate URL found, skipping: {doc['url']}")
# Step 2: Fetch and process new data
result_message = api.get_full_texts(collection.config_folder, collection=collection)

# Step 3: Migrate DumpUrl to DeltaUrl
collection.migrate_dump_to_delta()

print(f"Processed {processed_count} new records.")

return f"Successfully processed {len(documents)} records and updated the database."
return result_message
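Since `fetch_and_replace_full_text` now just hands the fetch-and-update loop to `Api.get_full_texts` and then migrates dump URLs to delta URLs, invoking it is unchanged. A minimal sketch of queueing it through Celery (the collection id, server name, and result-backend usage are assumptions for illustration):

from sde_collections.tasks import fetch_and_replace_full_text

# Queues the task; it runs asynchronously under the new 600-second soft time limit.
async_result = fetch_and_replace_full_text.delay(collection_id=42, server_name="lrm_dev")

# Retrieving the summary string only works if a Celery result backend is configured.
print(async_result.get(timeout=600))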