Merge pull request #144 from lsst-sqre/tickets/DM-40605
DM-40605: Improve Algolia audit job's reliability
jonathansick authored Sep 5, 2023
2 parents 39b76d8 + c812542 commit b813002
Showing 11 changed files with 151 additions and 47 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,15 @@

<!-- scriv-insert-here -->

<a id='changelog-0.7.1'></a>
## 0.7.1 (2023-09-05)

### Bug fixes

- Improved logging and exception reporting around the `ook audit` command.
- Fixed the `base_url` attribute's JSON alias for the Algolia `DocumentRecord` model: it was `baseURL` and is now restored to `baseUrl`.
- Fixed typos in the record field names created for Lander content types (`source_update_time` and `source_creation_timestamp`).

<a id='changelog-0.7.0'></a>
## 0.7.0 (2023-08-31)

1 change: 1 addition & 0 deletions pyproject.toml
@@ -153,6 +153,7 @@ ignore = [
"TD004", # skip requiring TODOs to have issue links
"TID252", # if we're going to use relative imports, use them always
"TRY003", # good general advice but lint is way too aggressive
"TRY400", # We want JSON formatting of the exception messages
]
select = ["ALL"]
target-version = "py311"
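For context, Ruff's `TRY400` rule flags `logging.error(...)` calls made inside an `except` block and suggests `logging.exception(...)` instead. Ook suppresses it because, per the comment above, some handlers deliberately report the exception as a structured field so it stays part of the JSON log event. An illustrative sketch of the pattern the rule would otherwise flag (the function and field names here are made up):

```python
import structlog

logger = structlog.get_logger("ook")


def parse_count(raw: str) -> int | None:
    try:
        return int(raw)
    except ValueError as exc:
        # TRY400 would flag this .error() call and suggest .exception();
        # keeping .error() lets the event stay a compact JSON record with
        # the exception text as a field rather than a multi-line traceback.
        logger.error("Could not parse count", raw=raw, error=str(exc))
        return None
```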
11 changes: 7 additions & 4 deletions src/ook/cli.py
@@ -9,9 +9,9 @@
import structlog
from algoliasearch.search_client import SearchClient
from safir.asyncio import run_with_asyncio
from safir.logging import configure_logging

# from ook.factory import Factory # noqa: ERA001
from ook.config import Configuration
from ook.config import config
from ook.domain.algoliarecord import MinimalDocumentModel
from ook.factory import Factory
from ook.services.algoliadocindex import AlgoliaDocIndexService
@@ -29,6 +29,11 @@ def main() -> None:
Administrative command-line interface for ook.
"""
configure_logging(
profile=config.profile,
log_level=config.log_level,
name="ook",
)


@main.command()
@@ -63,7 +68,6 @@ async def upload_doc_stub(dataset: Path) -> None:
The schema for the document stub is the
`ook.domain.algoliarecord.MinimalDocumentModel` Pydantic class.
"""
config = Configuration()
logger = structlog.get_logger("ook")
if any(
_ is None
@@ -97,7 +101,6 @@ async def audit(*, reingest: bool = False) -> None:
"""Audit the Algolia document index and check if any documents are missing
based on the listing of projects registered in the LTD Keeper service.
"""
config = Configuration()
logger = structlog.get_logger("ook")
if any(
_ is None
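The shape of this change: logging is configured once in the Click group callback using the shared module-level `config` object, so individual commands no longer instantiate `Configuration()` themselves. A condensed sketch of the pattern (the `status` subcommand is made up for illustration; the real CLI defines `upload-doc-stub`, `audit`, and others):

```python
import click
import structlog
from safir.logging import configure_logging

from ook.config import config


@click.group()
def main() -> None:
    """Administrative command-line interface for ook."""
    # Configure structlog once; every subcommand inherits this setup.
    configure_logging(
        profile=config.profile,
        log_level=config.log_level,
        name="ook",
    )


@main.command()
def status() -> None:
    # Subcommands just grab the already-configured logger.
    logger = structlog.get_logger("ook")
    logger.info("ook is configured")
```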
2 changes: 1 addition & 1 deletion src/ook/domain/algoliarecord.py
@@ -92,7 +92,7 @@ class DocumentRecord(BaseModel):
"The base URL of the record (whereas ``url`` may refer to an "
"anchor link)."
),
alias="baseURL",
alias="baseUrl",
)

content: str = Field(description="The full-text content of the record.")
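For reference, the alias controls the JSON key used when records are populated and serialized by alias, so the restored `baseUrl` key again matches what the Algolia index stores. A toy Pydantic sketch of the behavior (a stand-in model, not the real `DocumentRecord`; Pydantic v1-style `.dict()` shown, v2 would use `model_dump()`):

```python
from pydantic import BaseModel, Field


class RecordSketch(BaseModel):
    """Toy stand-in for DocumentRecord, showing only the aliased field."""

    base_url: str = Field(alias="baseUrl")


# Populate by alias, as an Algolia record payload would.
record = RecordSketch(baseUrl="https://sqr-000.lsst.io/")
assert record.base_url == "https://sqr-000.lsst.io/"

# Serializing by alias restores the camelCase key expected by the index;
# with the old "baseURL" alias these keys would not have matched.
print(record.dict(by_alias=True))  # {'baseUrl': 'https://sqr-000.lsst.io/'}
```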
40 changes: 40 additions & 0 deletions src/ook/exceptions.py
@@ -0,0 +1,40 @@
"""Ook's exceptions."""

from __future__ import annotations

__all__ = ["LtdSlugClassificationError"]


class LtdSlugClassificationError(Exception):
    """An error occurred during classification and ingest queueing for an
    LTD document.
    """

    def __init__(
        self,
        message: str,
        *,
        product_slug: str,
        edition_slug: str,
        error: Exception | None = None,
    ) -> None:
        """Initialize the exception.

        Parameters
        ----------
        message
            A message describing the error.
        product_slug
            The LTD product (project) slug being classified.
        edition_slug
            The LTD edition slug being classified.
        error
            The underlying exception that triggered this error, if any.
        """
        self.product_slug = product_slug
        self.edition_slug = edition_slug
        self.error = error
        super().__init__(message)

    def __str__(self) -> str:
        message = (
            f"Unable to queue ingest for LTD slug: {self.product_slug} "
            f"({self.edition_slug}): {super().__str__()}"
        )
        if self.error is not None:
            message += f"\n\n{self.error}"
        return message
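A quick illustration of how the new exception reads when it reaches a log or a traceback (the slugs and the wrapped error below are made up):

```python
from ook.exceptions import LtdSlugClassificationError

try:
    raise LtdSlugClassificationError(
        "Failed to send Kafka ingest message",
        product_slug="sqr-000",
        edition_slug="main",
        error=TimeoutError("broker unavailable"),
    )
except LtdSlugClassificationError as exc:
    # The audit loop logs this and moves on to the next document.
    print(exc)
    # -> Unable to queue ingest for LTD slug: sqr-000 (main): Failed to send
    #    Kafka ingest message
    #
    #    broker unavailable
```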
22 changes: 15 additions & 7 deletions src/ook/handlers/kafka/handlers.py
@@ -8,6 +8,7 @@

from typing import TYPE_CHECKING, Any

from dataclasses_avroschema.avrodantic import AvroBaseModel
from structlog import get_logger
from structlog.stdlib import BoundLogger

@@ -23,13 +24,21 @@

def bind_logger_with_message_metadata(
logger: BoundLogger,
*,
message_metadata: MessageMetadata,
key: AvroBaseModel,
value: AvroBaseModel,
) -> BoundLogger:
"""Bind a logger with message metadata."""
return logger.bind(
kafka_topic=message_metadata.topic,
kafka_partition=message_metadata.partition,
kafka_offset=message_metadata.offset,
kafka_key=key.dict(),
kafka_value=value.dict(),
serialized_key_size=message_metadata.serialized_key_size,
serialized_value_size=message_metadata.serialized_value_size,
kafka_headers=message_metadata.headers,
)


@@ -43,17 +52,16 @@ async def handle_ltd_document_ingest(
"""Handle a message requesting an ingest for an LTD document."""
logger = bind_logger_with_message_metadata(
get_logger("ook"),
message_metadata,
message_metadata=message_metadata,
key=key,
value=value,
)
logger = logger.bind(
ltd_slug=value.project.slug, content_type=value.content_type.value
)
logger = logger.bind(content_type=value.content_type.value)

logger.info(
"Starting processing of LTD document ingest request.",
key=key.json(),
value=value.json(),
serialized_key_size=message_metadata.serialized_key_size,
serialized_value_size=message_metadata.serialized_value_size,
kafka_headers=message_metadata.headers,
)

factory = await Factory.create(logger=logger)
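The refactored helper binds the Kafka key, value, serialized sizes, and headers onto the logger once, so every later log event from the handler carries that context without repeating it as keyword arguments (which is why the `logger.info` call above lost those fields). A minimal structlog sketch of binding, with generic values:

```python
import structlog

logger = structlog.get_logger("example")

# Bind context once...
logger = logger.bind(kafka_topic="ook.ingest", kafka_offset=42)

# ...and every subsequent event includes it automatically.
logger.info("Starting processing of LTD document ingest request.")
logger.info("Finished processing")  # also carries kafka_topic and kafka_offset
```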
3 changes: 1 addition & 2 deletions src/ook/handlers/kafka/router.py
@@ -151,13 +151,12 @@ async def start(self) -> None:
)
try:
await self._handle_message(msg)
except Exception as e:
except Exception:
self._logger.exception(
"Error handling message",
topic=msg.topic,
partition=msg.partition,
offset=msg.offset,
exception=e,
)
self._logger.debug(
"Finished handling message",
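Dropping `except Exception as e` and the explicit `exception=e` field works because structlog's `exception()` method logs at error level with `exc_info` set, so the active exception's traceback is captured and rendered by the configured processors; binding the exception object as an extra key duplicated that information. A minimal sketch with generic topic and offset values:

```python
import structlog

logger = structlog.get_logger("example")

try:
    {}["missing"]  # stand-in for self._handle_message(msg)
except Exception:
    # .exception() records the current exception automatically; there is no
    # need to capture it with `as e` or to bind it as an extra field.
    logger.exception(
        "Error handling message",
        topic="ook.ingest",
        partition=0,
        offset=42,
    )
```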
20 changes: 17 additions & 3 deletions src/ook/services/algoliaaudit.py
@@ -111,10 +111,24 @@ async def audit_missing_documents(
)

if ingest_missing and len(missing_docs) > 0:
reingest_count = 0
for doc in missing_docs:
await self._classifier.queue_ingest_for_ltd_product_slug(
product_slug=doc.slug, edition_slug="main"
)
try:
await self._classifier.queue_ingest_for_ltd_product_slug(
product_slug=doc.slug, edition_slug="main"
)
reingest_count += 1
except Exception:
self._logger.exception(
"Failed to queue ingest for missing document",
handle=doc.slug.upper(),
published_url=doc.published_url,
)
self._logger.info(
"Queued ingest for missing documents",
queued=reingest_count,
failed=len(missing_docs) - reingest_count,
)

return missing_docs
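The reliability improvement here is that one failing document no longer aborts the whole reingest pass: each failure is logged with its handle and published URL, and a summary with `queued`/`failed` counts is emitted at the end. The same pattern reduced to a runnable toy, with a fake failure standing in for a Kafka send error:

```python
import asyncio

import structlog

logger = structlog.get_logger("ook")


async def queue_all(slugs: list[str]) -> int:
    """Queue ingest for each slug, logging failures instead of aborting."""
    queued = 0
    for slug in slugs:
        try:
            if slug == "bad-slug":  # stand-in for a failing queue/send call
                raise RuntimeError("broker unavailable")
            queued += 1
        except Exception:
            logger.exception(
                "Failed to queue ingest for missing document", handle=slug.upper()
            )
    logger.info(
        "Queued ingest for missing documents",
        queued=queued,
        failed=len(slugs) - queued,
    )
    return queued


print(asyncio.run(queue_all(["sqr-000", "bad-slug", "dmtn-001"])))  # prints 2
```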

72 changes: 47 additions & 25 deletions src/ook/services/classification.py
@@ -21,6 +21,7 @@
from ook.services.kafkaproducer import PydanticKafkaProducer
from ook.services.ltdmetadataservice import LtdMetadataService

from ..exceptions import LtdSlugClassificationError
from .githubmetadata import GitHubMetadataService

__all__ = ["ClassificationService"]
@@ -76,33 +77,54 @@ async def queue_ingest_for_ltd_product_slug(

date_rebuilt = parse_isodatetime(edition_data["date_rebuilt"])
if date_rebuilt is None:
raise RuntimeError(
f"Could not parse date_rebuilt {edition_data['date_rebuilt']}"
raise LtdSlugClassificationError(
"Could not parse date_rebuilt: "
f"{edition_data['date_rebuilt']}",
product_slug=product_slug,
edition_slug=edition_slug,
error=None,
)

kafka_key = UrlIngestKeyV1(url=published_url)
kafka_value = LtdUrlIngestV1(
url=published_url,
content_type=content_type,
request_timestamp=datetime.now(tz=UTC),
update_timestamp=date_rebuilt,
edition=LtdEditionV1(
slug=edition_slug,
published_url=edition_data["published_url"],
url=edition_data["self_url"],
build_url=edition_data["build_url"],
),
project=LtdProjectV1(
slug=product_slug,
published_url=product_data["published_url"],
url=product_data["self_url"],
),
)
await self._kafka_producer.send(
topic=config.ingest_kafka_topic,
key=kafka_key,
value=kafka_value,
)
try:
kafka_key = UrlIngestKeyV1(url=published_url)
kafka_value = LtdUrlIngestV1(
url=published_url,
content_type=content_type,
request_timestamp=datetime.now(tz=UTC),
update_timestamp=date_rebuilt,
edition=LtdEditionV1(
slug=edition_slug,
published_url=edition_data["published_url"],
url=edition_data["self_url"],
build_url=edition_data["build_url"],
),
project=LtdProjectV1(
slug=product_slug,
published_url=product_data["published_url"],
url=product_data["self_url"],
),
)
except Exception as e:
raise LtdSlugClassificationError(
"Failed to create Kafka ingest key/value",
product_slug=product_slug,
edition_slug=edition_slug,
error=e,
) from e

try:
await self._kafka_producer.send(
topic=config.ingest_kafka_topic,
key=kafka_key,
value=kafka_value,
)
except Exception as e:
raise LtdSlugClassificationError(
"Failed to send Kafka ingest message",
product_slug=product_slug,
edition_slug=edition_slug,
error=e,
) from e

async def queue_ingest_for_ltd_product_slug_pattern(
self, *, product_slug_pattern: str, edition_slug: str
10 changes: 7 additions & 3 deletions src/ook/services/landerjsonldingest.py
@@ -118,7 +118,11 @@ async def save_to_algolia(

await self._algolia_service.save_document_records(records)

self._logger.info("Finished building records")
self._logger.info(
"Finished uploading document records",
record_count=len(records),
surrogate_key=records[0].surrogate_key,
)

def _create_records(
self,
@@ -170,9 +174,9 @@ def _create_record(
record_args = {
"object_id": object_id,
"surrogate_key": surrogate_key,
"source_update_Time": format_utc_datetime(document.timestamp),
"source_update_time": format_utc_datetime(document.timestamp),
"source_update_timestamp": format_timestamp(document.timestamp),
"source_creation_Timestamp": (
"source_creation_timestamp": (
format_timestamp(creation_date) if creation_date else None
),
"record_update_time": format_utc_datetime(datetime.now(tz=UTC)),
8 changes: 6 additions & 2 deletions src/ook/services/sphinxtechnoteingest.py
@@ -139,10 +139,14 @@ async def save_to_algolia(
self._logger.exception("Failed to build records")
raise

self._logger.info("Finished building records")

await self._algolia_service.save_document_records(records)

self._logger.info(
"Finished uploading document records",
record_count=len(records),
surrogate_key=records[0].surrogate_key,
)

def _create_records(
self,
reduced_technote: ReducedLtdSphinxTechnote,
