feat: better import
* enable removing items in live updates and imports
* fix a potential bug in get_processed_since
* add documentation
alexgarel committed Aug 6, 2024
1 parent c495b10 commit 1053f49
Showing 10 changed files with 476 additions and 114 deletions.
2 changes: 1 addition & 1 deletion .flake8
@@ -2,4 +2,4 @@
ignore = E203, E501, W503
max-line-length = 88
exclude = .git,__pycache__,build,dist,*_pb2.py,.venv
max-doc-length = 79
max-doc-length = 88
183 changes: 130 additions & 53 deletions app/_import.py
@@ -1,4 +1,5 @@
import abc
import math
from datetime import datetime
from multiprocessing import Pool
from pathlib import Path
@@ -10,7 +11,7 @@
from elasticsearch_dsl import Index, Search
from redis import Redis

from app._types import JSONType
from app._types import FetcherResult, FetcherStatus, JSONType
from app.config import Config, IndexConfig, TaxonomyConfig
from app.indexing import (
DocumentProcessor,
@@ -29,7 +30,7 @@ def __init__(self, config: IndexConfig) -> None:
self.config = config

@abc.abstractmethod
def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None:
def fetch_document(self, stream_name: str, item: JSONType) -> FetcherResult:
"""Fetch a document using elements coming from a Redis stream.
The Redis stream contains information about documents that were
@@ -38,6 +39,7 @@ def fetch_document(self, stream_name: str, item: JSONType) -> JSONType | None:
:param stream_name: the name of the Redis stream
:param item: the item from the Redis stream
:return: the fetched document and a status that determines the action to take
"""
pass

@@ -59,9 +61,9 @@ def get_processed_since(
id_field_name: str,
document_fetcher: BaseDocumentFetcher,
batch_size: int = 100,
) -> Iterator[tuple[int, JSONType]]:
) -> Iterator[tuple[int, FetcherResult]]:
"""Fetches all the documents that have been processed since the given
timestamp.
timestamp (using the Redis event stream).
:param redis_client: the Redis client
:param redis_stream_name: the name of the Redis stream to read from
@@ -71,7 +73,8 @@
:param batch_size: the size of the batch to fetch, defaults to 100
:yield: a tuple containing the timestamp (in milliseconds) and the document
"""
fetched_ids = set()
# store fetched ids with a timestamp
fetched_ids: dict[str, int] = {}
# We start from the given timestamp
min_id = f"{start_timestamp_ms}-0"

@@ -96,15 +99,32 @@
# Get the timestamp from the ID
timestamp = int(timestamp_id.split("-")[0])
# Avoid fetching the same ID repeatedly
if id_ in fetched_ids:
if id_ in fetched_ids and fetched_ids[id_] > timestamp:
logger.debug(f"Skipping ID {id_} because it was already fetched")
continue
fetched_ids.add(id_)
document = document_fetcher.fetch_document(redis_stream_name, item)
if document is None:
logger.debug(f"Skipping ID {id_} because it was not found")
continue
yield timestamp, document
# use the current timestamp
# as a pessimistic timestamp of the last index update
# * 1000 because Python timestamps are in seconds,
# while Redis timestamps are in milliseconds
fetched_ids[id_] = math.floor(datetime.now().timestamp() * 1000)
result = document_fetcher.fetch_document(redis_stream_name, item)
if result.status == FetcherStatus.SKIP:
logger.debug(f"Skipping ID {id_} because fetches stated to do so")
elif result.status == FetcherStatus.RETRY:
logger.warn(
f"Should retry ID {id_} due to status RETRY, but it's not yet implemented !"
)
elif result.status == FetcherStatus.REMOVED:
yield timestamp, result
elif result.status == FetcherStatus.FOUND:
if result.document is None:
logger.error(
f"Document is None for ID {id_}, while status is FOUND !"
)
else:
yield timestamp, result
else:
logger.debug(f"Skipping ID {id_} due to status {result.status.name}")


def get_new_updates(
@@ -113,7 +133,7 @@
id_field_names: dict[str, str],
document_fetchers: dict[str, BaseDocumentFetcher],
batch_size: int = 100,
) -> Iterator[tuple[str, int, JSONType]]:
) -> Iterator[tuple[str, int, FetcherResult]]:
"""Reads new updates from Redis Stream, starting from the moment this
function is called.
@@ -152,27 +172,52 @@ def get_new_updates(
logger.debug("Fetched ID: %s", id_)
# Get the timestamp from the ID
timestamp = int(timestamp_id.split("-")[0])
document = document_fetcher.fetch_document(stream_name, item)
if document is None:
result = document_fetcher.fetch_document(stream_name, item)
if result.status == FetcherStatus.SKIP:
logger.debug(
f"Skipping ID {id_} in {stream_name} because fetches stated to do so"
)
elif result.status == FetcherStatus.RETRY:
logger.warn(
f"Should retry ID {id_} in {stream_name} due to status RETRY, "
"but it's not yet implemented !"
)
elif result.status == FetcherStatus.REMOVED:
yield stream_name, timestamp, result
elif result.status == FetcherStatus.FOUND:
if result.document is None:
logger.error(
f"Document is None for ID {id_} in {stream_name}, while status is FOUND !"
)
else:
yield stream_name, timestamp, result
else:
logger.debug(
"Stream %s: Skipping ID %s because it was not found",
stream_name,
id_,
f"Skipping ID {id_} in {stream_name} due to status {result.status.name}"
)
continue
yield stream_name, timestamp, document


def get_document_dict(
processor: DocumentProcessor, row: JSONType, next_index: str
processor: DocumentProcessor, result: FetcherResult, index_name: str
) -> JSONType | None:
"""Return the document dict suitable for a bulk insert operation."""
document = processor.from_dict(row)
if result.document is None:
return None
result = processor.from_result(result)
document = result.document
if not document:
return None

_id = document.pop("_id")
return {"_source": document, "_index": next_index, "_id": _id}
elif result.status == FetcherStatus.FOUND:
_id = document.pop("_id")
return {"_source": document, "_index": index_name, "_id": _id}
elif result.status == FetcherStatus.REMOVED:
return {
"_op_type": "delete",
"_index": index_name,
"_id": document["_id"],
}
else:
return None
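For context, the two shapes returned above follow the action format of the elasticsearch.helpers bulk API: an action carrying _source gets indexed, while an action with _op_type set to "delete" removes the document. Illustrative values only, not taken from this commit:

# action produced for a FOUND result: the document is (re)indexed
{"_source": {"product_name": "Example"}, "_index": "my-index-2024-08-06", "_id": "0001"}

# action produced for a REMOVED result: the document is deleted from the index
{"_op_type": "delete", "_index": "my-index-2024-08-06", "_id": "0001"}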


def gen_documents(
@@ -194,7 +239,11 @@
if i % num_processes != process_id:
continue

document_dict = get_document_dict(processor, row, next_index)
document_dict = get_document_dict(
processor,
FetcherResult(status=FetcherStatus.FOUND, document=row),
next_index,
)
if not document_dict:
continue

@@ -293,6 +342,7 @@ def import_parallel(
)
if not success:
logger.error("Encountered errors: %s", errors)
return success, errors


def import_taxonomies(config: IndexConfig, next_index: str):
@@ -342,29 +392,30 @@ def get_redis_products(
logger.info("Processing redis updates since %s", last_updated_timestamp_ms)
redis_client = connection.get_redis_client()
processed = 0
for _, row in get_processed_since(
for _, result in get_processed_since(
redis_client,
stream_name,
last_updated_timestamp_ms,
id_field_name,
document_fetcher=fetcher,
):
yield get_document_dict(processor, row, index)
document_dict = get_document_dict(processor, result, index)
if document_dict:
yield document_dict
processed += 1
logger.info("Processed %d updates from Redis", processed)


def get_redis_updates(
es_client: Elasticsearch, index: str, config: IndexConfig
) -> None:
def get_redis_updates(es_client: Elasticsearch, index: str, config: IndexConfig) -> int:
"""Fetch updates from Redis and index them.
:param index: the index to write to
:param config: the index configuration to use
:return: the number of errors encountered
"""
if config.redis_stream_name is None:
logger.info(f"Redis updates are disabled for index {index}")
return
return 0

processor = DocumentProcessor(config)
fetcher = load_document_fetcher(config)
@@ -385,6 +436,7 @@ def get_redis_updates(
last_updated_timestamp_ms = int(last_updated_timestamp * 1000)
id_field_name = config.index.id_field_name
# Since this is only done by a single process, we can use parallel_bulk
num_errors = 0
for success, info in parallel_bulk(
es_client,
get_redis_products(
@@ -398,14 +450,17 @@
):
if not success:
logger.warning("A document failed: %s", info)
num_errors += 1
return num_errors


def run_full_import(
def run_items_import(
file_path: Path,
num_processes: int,
config: IndexConfig,
num_items: int | None = None,
skip_updates: bool = False,
partial: bool = False,
):
"""Run a full data import from a JSONL.
@@ -420,16 +475,23 @@
import
:param config: the index configuration to use
:param num_items: the number of items to import, defaults to None (all)
:param skip_updates: if True, skip the updates from Redis
:param partial: (exclusive with `skip_updates`),
if True, consider this is not a full import
and directly update items in the current index.
"""
es_client = connection.get_es_client()
# we create a temporary index to import to
# at the end we will change alias to point to it
index_date = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
next_index = f"{config.index.name}-{index_date}"

index = generate_index_object(next_index, config)
# create the index
index.save()
if not partial:
# we create a temporary index to import to
# at the end we will change alias to point to it
index_date = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
next_index = f"{config.index.name}-{index_date}"
index = generate_index_object(next_index, config)
# create the index
index.save()
else:
# use current index
next_index = config.index.name

# split the work between processes
args = []
@@ -445,13 +507,18 @@
)
)
# run in parallel
num_errors = 0
with Pool(num_processes) as pool:
pool.starmap(import_parallel, args)
for success, errors in pool.starmap(import_parallel, args):
if not success:
num_errors += len(errors)
# update with last index updates (hopefully since the jsonl)
if not skip_updates:
get_redis_updates(es_client, next_index, config)
# make alias point to new index
update_alias(es_client, next_index, config.index.name)
num_errors += get_redis_updates(es_client, next_index, config)
if not partial:
# make alias point to new index
update_alias(es_client, next_index, config.index.name)
return num_errors
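A hypothetical call pattern for the renamed entry point, assuming an IndexConfig named index_config and example file paths (neither appears in this commit): a full run builds a temporary index, catches up on Redis updates, then swaps the alias, while partial=True updates documents in place in the current index.

from pathlib import Path

# full import: temporary index, Redis catch-up, then alias swap
errors = run_items_import(Path("data/products.jsonl"), num_processes=4, config=index_config)

# partial import: write directly into the live index, no alias swap
errors = run_items_import(
    Path("data/delta.jsonl"),
    num_processes=1,
    config=index_config,
    partial=True,
)
if errors:
    print(f"{errors} documents failed to import")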


def perform_taxonomy_import(config: IndexConfig) -> None:
@@ -510,20 +577,30 @@ def run_update_daemon(config: Config) -> None:
id_field_names[stream_name] = index_config.index.id_field_name
stream_name_to_index_id[stream_name] = index_id

for stream_name, _, document in get_new_updates(
for stream_name, _, result in get_new_updates(
redis_client,
list(id_field_names.keys()),
id_field_names=id_field_names,
document_fetchers=document_fetchers,
):
processed_document = processors[stream_name].from_dict(document)
if not processed_document:
processed_result = processors[stream_name].from_result(result)
processed_document = processed_result.document
if (
processed_result.status not in (FetcherStatus.FOUND, FetcherStatus.REMOVED)
or processed_document is None
):
continue
_id = processed_document.pop("_id")
index_id = stream_name_to_index_id[stream_name]
logger.debug("Document:\n%s", processed_document)
es_client.index(
index=config.indices[index_id].index.name,
body=processed_document,
id=_id,
)
if processed_result.status == FetcherStatus.REMOVED:
es_client.delete(
index=config.indices[index_id].index.name,
id=_id,
)
else:
es_client.index(
index=config.indices[index_id].index.name,
body=processed_document,
id=_id,
)
25 changes: 25 additions & 0 deletions app/_types.py
@@ -1,4 +1,5 @@
import textwrap
from enum import Enum
from functools import cached_property
from typing import Annotated, Any, Literal, Optional, Tuple, Union, cast, get_type_hints

@@ -432,3 +433,27 @@ class GetSearchParamsTypes:
facets = _annotation_new_type(str, SEARCH_PARAMS_ANN["facets"])
charts = _annotation_new_type(str, SEARCH_PARAMS_ANN["charts"])
index_id = SEARCH_PARAMS_ANN["index_id"]


class FetcherStatus(Enum):
"""Status of a fetcher
* FOUND - document was found, index it
* REMOVED - document was removed, remove it
* SKIP - skip this document / update
* RETRY - retry this document / update later
* OTHER - unknown error
"""

FOUND = 1
REMOVED = -1
SKIP = 0
RETRY = 2
OTHER = 3


class FetcherResult(BaseModel):
"""Result for a document fecher"""

status: FetcherStatus
document: JSONType | None
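
To show how these types are meant to be consumed, here is a sketch of a custom fetcher (the "deleted" flag and the lookup helper are assumptions for illustration; they are not part of this commit):

from app._import import BaseDocumentFetcher
from app._types import FetcherResult, FetcherStatus, JSONType


class ExampleDocumentFetcher(BaseDocumentFetcher):
    """Hypothetical fetcher illustrating the new statuses."""

    def fetch_document(self, stream_name: str, item: JSONType) -> FetcherResult:
        if item.get("deleted"):
            # ask the importer to delete this document from the index
            return FetcherResult(status=FetcherStatus.REMOVED, document=item)
        document = self.lookup(item)  # hypothetical helper returning the full document
        if document is None:
            # nothing usable for this event: skip it
            return FetcherResult(status=FetcherStatus.SKIP, document=None)
        return FetcherResult(status=FetcherStatus.FOUND, document=document)

    def lookup(self, item: JSONType) -> JSONType | None:
        # placeholder: fetch the full document from your data source
        return item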
2 changes: 1 addition & 1 deletion app/charts.py
@@ -151,7 +151,7 @@ def build_scatter_chart(
def _get(v, path):
return reduce(lambda c, k: c.get(k, {}), path.split("."), v)

chart = empty_chart(f"{chart_option.x} x { chart_option.y }")
chart = empty_chart(f"{chart_option.x} x {chart_option.y}")

# nutriments.xxx is broken in vega.
# I think it searches for nutriments[xxx]