From a3d48dedfed353304543f8e5c9927a72541cd862 Mon Sep 17 00:00:00 2001
From: philip
Date: Tue, 19 Nov 2024 13:53:12 +0000
Subject: [PATCH 1/6] Find and clean up old indexes and aliases

---
 portality/dao.py                 | 33 +++++++++++++++++++-----
 portality/scripts/anon_import.py | 44 +++++++++++++++++++++++++++-----
 2 files changed, 65 insertions(+), 12 deletions(-)

diff --git a/portality/dao.py b/portality/dao.py
index 80635d714a..b0a63f7da4 100644
--- a/portality/dao.py
+++ b/portality/dao.py
@@ -1,15 +1,17 @@
-import time
+from __future__ import annotations
+
+import json
 import re
 import sys
-import uuid
-import json
-import elasticsearch
+import time
 import urllib.parse
-
+import uuid
 from collections import UserDict
 from copy import deepcopy
 from datetime import timedelta
-from typing import List
+from typing import List, Iterable, Tuple
+
+import elasticsearch
 
 from portality.core import app, es_connection as ES
 from portality.lib import dates
@@ -964,6 +966,25 @@ def refresh():
     return ES.indices.refresh()
 
 
+def find_indexes_by_prefix(index_prefix) -> list[str]:
+    data = ES.indices.get(f'{index_prefix}*')
+    return list(data.keys())
+
+
+def find_index_aliases(alias_prefixes=None) -> Iterable[Tuple[str, str]]:
+    def _yield_index_alias():
+        data = ES.indices.get_alias()
+        for index, d in data.items():
+            for alias in d['aliases'].keys():
+                yield index, alias
+
+    index_aliases = _yield_index_alias()
+    if alias_prefixes:
+        index_aliases = ((index, alias) for index, alias in index_aliases
+                         if any(alias.startswith(p) for p in alias_prefixes))
+    return index_aliases
+
+
 class BlockTimeOutException(Exception):
     pass

diff --git a/portality/scripts/anon_import.py b/portality/scripts/anon_import.py
index 5740a9fb29..d754f8ae7d 100644
--- a/portality/scripts/anon_import.py
+++ b/portality/scripts/anon_import.py
@@ -13,11 +13,21 @@
 DOAJENV=test python portality/scripts/anon_import.py data_import_settings/test_server.json
 """
 
-import esprit, json, gzip, shutil, elasticsearch
+from __future__ import annotations
+
+import gzip
+import itertools
+import json
+import shutil
+
+import elasticsearch
+import esprit
+
+from doajtest.helpers import patch_config
+from portality import dao
 from portality.core import app, es_connection, initialise_index
 from portality.store import StoreFactory
 from portality.util import ipt_prefix
-from doajtest.helpers import patch_config
 
 
 # FIXME: monkey patch for esprit.bulk (but esprit's chunking is handy)
@@ -37,7 +47,6 @@ def es_bulk(connection, data, type=""):
 
 
 def do_import(config):
-
     # filter for the types we are going to work with
     import_types = {}
     for t, s in config.get("types", {}).items():
@@ -50,16 +59,38 @@ def do_import(config):
             print(("{x} from {y}".format(x=count, y=import_type)))
     print("\n")
 
+    toberemoved_index_prefixes = [
+        es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type)
+        for import_type in import_types.keys()
+    ]
+    toberemoved_indexes = list(itertools.chain.from_iterable(
+        dao.find_indexes_by_prefix(p) for p in toberemoved_index_prefixes
+    ))
+    toberemoved_index_aliases = list(dao.find_index_aliases(toberemoved_index_prefixes))
+
+    print("==Removing the following indexes==")
+    print('    {}'.format(', '.join(toberemoved_indexes)))
+    print("==Removing the following aliases==")
+    print('    {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases)))
     if config.get("confirm", True):
         text = input("Continue? [y/N] ")
         if text.lower() != "y":
             exit()
 
     # remove all the types that we are going to import
-    for import_type in list(import_types.keys()):
+    for index in toberemoved_indexes:
         try:
-            if es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type):
-                es_connection.indices.delete(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type)
+            if es_connection.indices.exists(index):
+                print("Deleting index: {}".format(index))
+                es_connection.indices.delete(index)
+        except elasticsearch.exceptions.NotFoundError:
+            pass
+
+    for index, alias in toberemoved_index_aliases:
+        try:
+            if es_connection.indices.exists_alias(index, alias):
+                print("Deleting alias: {} -> {}".format(index, alias))
+                es_connection.indices.delete_alias(index, alias)
         except elasticsearch.exceptions.NotFoundError:
             pass
 
@@ -117,6 +148,7 @@ def do_import(config):
 
 if __name__ == '__main__':
     import argparse
+
     parser = argparse.ArgumentParser()
     parser.add_argument("config", help="Config file for import run, e.g dev_basics.json")
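Note on the new DAO helpers: find_indexes_by_prefix() asks Elasticsearch for every index matching "<prefix>*", while find_index_aliases() walks the full alias table and keeps only aliases starting with one of the given prefixes. That filtering can be sketched offline against a stubbed _alias payload; the index and alias names below are hypothetical.

    # Stubbed shape of ES.indices.get_alias(): {index: {"aliases": {alias: {}}}}
    SAMPLE_GET_ALIAS = {
        "doaj-journal-20241119": {"aliases": {"doaj-journal": {}}},
        "doaj-article-20241119": {"aliases": {"doaj-article": {}}},
        "some-other-index": {"aliases": {"some-other-alias": {}}},
    }

    def find_index_aliases(alias_prefixes=None):
        # Same generator pipeline as the dao.py version, minus the live cluster.
        def _yield_index_alias():
            for index, d in SAMPLE_GET_ALIAS.items():
                for alias in d["aliases"].keys():
                    yield index, alias

        index_aliases = _yield_index_alias()
        if alias_prefixes:
            index_aliases = ((index, alias) for index, alias in index_aliases
                             if any(alias.startswith(p) for p in alias_prefixes))
        return index_aliases

    print(list(find_index_aliases(["doaj-"])))
    # [('doaj-journal-20241119', 'doaj-journal'), ('doaj-article-20241119', 'doaj-article')]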
[y/N] ") if text.lower() != "y": exit() # remove all the types that we are going to import - for import_type in list(import_types.keys()): + for index in toberemoved_indexes: try: - if es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type): - es_connection.indices.delete(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type) + if es_connection.indices.exists(index): + print("Deleting index: {}".format(index)) + es_connection.indices.delete(index) + except elasticsearch.exceptions.NotFoundError: + pass + + for index, alias in toberemoved_index_aliases: + try: + if es_connection.indices.exists_alias(index, alias): + print("Deleting alias: {} -> {}".format(index, alias)) + es_connection.indices.delete_alias(index, alias) except elasticsearch.exceptions.NotFoundError: pass @@ -117,6 +148,7 @@ def do_import(config): if __name__ == '__main__': import argparse + parser = argparse.ArgumentParser() parser.add_argument("config", help="Config file for import run, e.g dev_basics.json") From 41e2d74868998bac5abf90c4cb76cbad81054028 Mon Sep 17 00:00:00 2001 From: philip Date: Wed, 20 Nov 2024 09:27:35 +0000 Subject: [PATCH 2/6] support parameter str_format --- portality/lib/dates.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/portality/lib/dates.py b/portality/lib/dates.py index 52f6b0a809..775cb7d6b8 100644 --- a/portality/lib/dates.py +++ b/portality/lib/dates.py @@ -92,8 +92,8 @@ def now_str_with_microseconds() -> str: return format(now(), format=FMT_DATETIME_MS_STD) -def today() -> str: - return format(now(), format=FMT_DATE_STD) +def today(str_format=FMT_DATE_STD) -> str: + return format(now(), format=str_format) def random_date(fro: datetime = None, to: datetime = None) -> str: From 13c0d24bd751c075ffd24553a7db8ec6390b170f Mon Sep 17 00:00:00 2001 From: philip Date: Wed, 20 Nov 2024 09:29:59 +0000 Subject: [PATCH 3/6] create instance indexes and aliases --- portality/scripts/anon_import.py | 82 ++++++++++++++++++++++---------- 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/portality/scripts/anon_import.py b/portality/scripts/anon_import.py index d754f8ae7d..4924479df5 100644 --- a/portality/scripts/anon_import.py +++ b/portality/scripts/anon_import.py @@ -19,13 +19,15 @@ import itertools import json import shutil +from dataclasses import dataclass +from time import sleep -import elasticsearch import esprit from doajtest.helpers import patch_config from portality import dao -from portality.core import app, es_connection, initialise_index +from portality.core import app, es_connection +from portality.lib import dates, es_data_mapping from portality.store import StoreFactory from portality.util import ipt_prefix @@ -46,6 +48,13 @@ def es_bulk(connection, data, type=""): return Resp(status_code=500, text=str(e)) +@dataclass +class IndexDetail: + index_type: str + instance_name: str + alias_name: str + + def do_import(config): # filter for the types we are going to work with import_types = {} @@ -59,19 +68,21 @@ def do_import(config): print(("{x} from {y}".format(x=count, y=import_type))) print("\n") - toberemoved_index_prefixes = [ - es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type) - for import_type in import_types.keys() - ] + toberemoved_index_prefixes = [ipt_prefix(import_type) for import_type in import_types.keys()] toberemoved_indexes = list(itertools.chain.from_iterable( dao.find_indexes_by_prefix(p) for p in toberemoved_index_prefixes )) toberemoved_index_aliases = 
From 13c0d24bd751c075ffd24553a7db8ec6390b170f Mon Sep 17 00:00:00 2001
From: philip
Date: Wed, 20 Nov 2024 09:29:59 +0000
Subject: [PATCH 3/6] Create instance indexes and aliases

---
 portality/scripts/anon_import.py | 82 ++++++++++++++++++++++----------
 1 file changed, 57 insertions(+), 25 deletions(-)

diff --git a/portality/scripts/anon_import.py b/portality/scripts/anon_import.py
index d754f8ae7d..4924479df5 100644
--- a/portality/scripts/anon_import.py
+++ b/portality/scripts/anon_import.py
@@ -19,13 +19,15 @@
 import gzip
 import itertools
 import json
 import shutil
+from dataclasses import dataclass
+from time import sleep
 
-import elasticsearch
 import esprit
 
 from doajtest.helpers import patch_config
 from portality import dao
-from portality.core import app, es_connection, initialise_index
+from portality.core import app, es_connection
+from portality.lib import dates, es_data_mapping
 from portality.store import StoreFactory
 from portality.util import ipt_prefix
@@ -46,6 +48,13 @@ def es_bulk(connection, data, type=""):
         return Resp(status_code=500, text=str(e))
 
 
+@dataclass
+class IndexDetail:
+    index_type: str
+    instance_name: str
+    alias_name: str
+
+
 def do_import(config):
     # filter for the types we are going to work with
     import_types = {}
@@ -59,19 +68,21 @@ def do_import(config):
             print(("{x} from {y}".format(x=count, y=import_type)))
     print("\n")
 
-    toberemoved_index_prefixes = [
-        es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type)
-        for import_type in import_types.keys()
-    ]
+    toberemoved_index_prefixes = [ipt_prefix(import_type) for import_type in import_types.keys()]
     toberemoved_indexes = list(itertools.chain.from_iterable(
         dao.find_indexes_by_prefix(p) for p in toberemoved_index_prefixes
     ))
     toberemoved_index_aliases = list(dao.find_index_aliases(toberemoved_index_prefixes))
 
-    print("==Removing the following indexes==")
-    print('    {}'.format(', '.join(toberemoved_indexes)))
-    print("==Removing the following aliases==")
-    print('    {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases)))
+    if toberemoved_indexes:
+        print("==Removing the following indexes==")
+        print('    {}'.format(', '.join(toberemoved_indexes)))
+        print()
+    if toberemoved_index_aliases:
+        print("==Removing the following aliases==")
+        print('    {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases)))
+        print()
+
     if config.get("confirm", True):
         text = input("Continue? [y/N] ")
         if text.lower() != "y":
@@ -79,24 +90,32 @@ def do_import(config):
 
     # remove all the types that we are going to import
     for index in toberemoved_indexes:
-        try:
-            if es_connection.indices.exists(index):
-                print("Deleting index: {}".format(index))
-                es_connection.indices.delete(index)
-        except elasticsearch.exceptions.NotFoundError:
-            pass
+        if es_connection.indices.exists(index):
+            print("Deleting index: {}".format(index))
+            es_connection.indices.delete(index, ignore=[404])
 
     for index, alias in toberemoved_index_aliases:
-        try:
-            if es_connection.indices.exists_alias(index, alias):
-                print("Deleting alias: {} -> {}".format(index, alias))
-                es_connection.indices.delete_alias(index, alias)
-        except elasticsearch.exceptions.NotFoundError:
-            pass
+        if es_connection.indices.exists_alias(alias, index=index):
+            print("Deleting alias: {} -> {}".format(index, alias))
+            es_connection.indices.delete_alias(index, alias, ignore=[404])
+
+    index_details = {}
+    for import_type in import_types.keys():
+        alias_name = ipt_prefix(import_type)
+        index_details[import_type] = IndexDetail(
+            index_type=import_type,
+            instance_name=alias_name + '-{}'.format(dates.today(dates.FMT_DATE_SHORT)),
+            alias_name=alias_name
+        )
 
     # re-initialise the index (sorting out mappings, etc)
     print("==Initialising Index for Mappings==")
-    initialise_index(app, es_connection)
+    mappings = es_data_mapping.get_mappings(app)
+    for index_detail in index_details.values():
+        print("Initialising index: {}".format(index_detail.instance_name))
+        es_connection.indices.create(index=index_detail.instance_name,
+                                     body=mappings[index_detail.index_type],
+                                     request_timeout=app.config.get("ES_SOCKET_TIMEOUT", None))
 
     mainStore = StoreFactory.get("anon_data")
     tempStore = StoreFactory.tmp()
@@ -128,8 +147,9 @@ def do_import(config):
                     shutil.copyfileobj(f_in, f_out)
             tempStore.delete_file(container, filename + ".gz")
 
-            print(("Importing from {x}".format(x=filename)))
-            imported_count = esprit.tasks.bulk_load(es_connection, ipt_prefix(import_type), uncompressed_file,
+            instance_index_name = index_details[import_type].instance_name
+            print("Importing from {x} to index[{index}]".format(x=filename, index=instance_index_name))
+            imported_count = esprit.tasks.bulk_load(es_connection, instance_index_name, uncompressed_file,
                                                     limit=limit, max_content_length=config.get("max_content_length", 100000000))
             tempStore.delete_file(container, filename)
 
@@ -144,6 +164,18 @@ def do_import(config):
     # once we've finished importing, clean up by deleting the entire temporary container
     tempStore.delete_container(container)
 
+    # create aliases for the indexes
+    print("\n==Creating Aliases==")
+    for index_detail in index_details.values():
+        for retry in range(5):
+            if not es_connection.indices.exists(index_detail.alias_name):
+                break
+            print(f"Old alias exists, waiting for it to be removed, alias[{index_detail.alias_name}] retry[{retry}]")
+            sleep(5)
+
+        print("Creating alias: {:<30} -> {}".format(index_detail.instance_name, index_detail.alias_name))
+        es_connection.indices.put_alias(index=index_detail.instance_name, name=index_detail.alias_name)
+
 
 if __name__ == '__main__':
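The scheme introduced here separates the physical index from the name the application queries: each import type gets a dated instance index, and the bare typed name becomes an alias pointing at it. A minimal sketch of the naming, with an illustrative "doaj-" prefix standing in for whatever ipt_prefix() returns:

    from dataclasses import dataclass
    from datetime import datetime, timezone

    @dataclass
    class IndexDetail:
        index_type: str
        instance_name: str
        alias_name: str

    def build_index_details(import_types, prefix="doaj-"):
        # Dated suffix, mirroring the dates.today(dates.FMT_DATE_SHORT) call
        # above (format assumed to be %Y%m%d).
        suffix = datetime.now(timezone.utc).strftime("%Y%m%d")
        return {
            t: IndexDetail(index_type=t,
                           instance_name="{}{}-{}".format(prefix, t, suffix),
                           alias_name=prefix + t)
            for t in import_types
        }

    for detail in build_index_details(["journal", "article"]).values():
        print(detail.instance_name, "->", detail.alias_name)
    # e.g. doaj-journal-20241120 -> doaj-journal
    #      doaj-article-20241120 -> doaj-article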
From 04e9adc4234586091d214b39d79f92e1e52667d1 Mon Sep 17 00:00:00 2001
From: philip
Date: Wed, 27 Nov 2024 15:37:17 +0000
Subject: [PATCH 4/6] Create aliases immediately after index creation

---
 portality/scripts/anon_import.py | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/portality/scripts/anon_import.py b/portality/scripts/anon_import.py
index 4924479df5..1887d2992a 100644
--- a/portality/scripts/anon_import.py
+++ b/portality/scripts/anon_import.py
@@ -109,7 +109,7 @@ def do_import(config):
         )
 
     # re-initialise the index (sorting out mappings, etc)
-    print("==Initialising Index for Mappings==")
+    print("==Initialising Index Mappings and Aliases==")
     mappings = es_data_mapping.get_mappings(app)
     for index_detail in index_details.values():
         print("Initialising index: {}".format(index_detail.instance_name))
@@ -117,6 +117,10 @@ def do_import(config):
                                      body=mappings[index_detail.index_type],
                                      request_timeout=app.config.get("ES_SOCKET_TIMEOUT", None))
 
+        print("Creating alias: {:<25} -> {}".format(index_detail.instance_name, index_detail.alias_name))
+        blocking_if_indices_exist(index_detail.alias_name)
+        es_connection.indices.put_alias(index=index_detail.instance_name, name=index_detail.alias_name)
+
     mainStore = StoreFactory.get("anon_data")
     tempStore = StoreFactory.tmp()
     container = app.config.get("STORE_ANON_DATA_CONTAINER")
@@ -164,17 +168,13 @@ def do_import(config):
     # once we've finished importing, clean up by deleting the entire temporary container
     tempStore.delete_container(container)
 
-    # create aliases for the indexes
-    print("\n==Creating Aliases==")
-    for index_detail in index_details.values():
-        for retry in range(5):
-            if not es_connection.indices.exists(index_detail.alias_name):
-                break
-            print(f"Old alias exists, waiting for it to be removed, alias[{index_detail.alias_name}] retry[{retry}]")
-            sleep(5)
 
-        print("Creating alias: {:<30} -> {}".format(index_detail.instance_name, index_detail.alias_name))
-        es_connection.indices.put_alias(index=index_detail.instance_name, name=index_detail.alias_name)
+
+def blocking_if_indices_exist(index_name):
+    for retry in range(5):
+        if not es_connection.indices.exists(index_name):
+            break
+        print(f"Old alias exists, waiting for it to be removed, alias[{index_name}] retry[{retry}]...")
+        sleep(5)
 
 
 if __name__ == '__main__':

From bf7b2d92b250e2199ab6bab56718dbf240a05d3b Mon Sep 17 00:00:00 2001
From: philip
Date: Wed, 27 Nov 2024 16:31:52 +0000
Subject: [PATCH 5/6] Fix bulk_load_from_file to honour its index argument

---
 portality/dao.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/portality/dao.py b/portality/dao.py
index 74facf0a03..176a741486 100644
--- a/portality/dao.py
+++ b/portality/dao.py
@@ -728,7 +728,7 @@ def dump(cls, q=None, page_size=1000, limit=None, out=None, out_template=None, o
     @classmethod
     def bulk_load_from_file(cls, source_file, index=None, limit=None, max_content_length=100000000):
         """ ported from esprit.tasks - bulk load to index from file """
-        index = cls.index_name()
+        index = index or cls.index_name()
        source_size = os.path.getsize(source_file)
 
         with open(source_file, "r") as f:
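The helper added in PATCH 4 polls up to five times for the old name to disappear before the alias is created. Its shape, with the existence check injected as a callable so the sketch runs without a cluster (names and the stub are illustrative):

    from time import sleep

    def wait_until_gone(name, exists, retries=5, interval=5):
        # Mirrors blocking_if_indices_exist(): poll, wait, give up after `retries`.
        for retry in range(retries):
            if not exists(name):
                return True
            print(f"Still exists, waiting for removal: name[{name}] retry[{retry}]...")
            sleep(interval)
        return not exists(name)

    # Stubbed check that reports the name gone on the third poll:
    polls = {"count": 0}
    def fake_exists(_name):
        polls["count"] += 1
        return polls["count"] <= 2

    print(wait_until_gone("doaj-journal", fake_exists, interval=0))  # True

Note that blocking_if_indices_exist() itself falls through silently after the last retry; if an index with the alias's name still exists at that point, the subsequent put_alias call is likely to fail, since an alias cannot share a name with a live index.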
From 3802a5b27a3c3674172c1cf56b91edf7351daaa8 Mon Sep 17 00:00:00 2001
From: philip
Date: Thu, 28 Nov 2024 09:33:48 +0000
Subject: [PATCH 6/6] Add a guard to avoid deleting the wrong indexes

---
 portality/scripts/anon_import.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/portality/scripts/anon_import.py b/portality/scripts/anon_import.py
index edb88d798b..33bc914fc9 100644
--- a/portality/scripts/anon_import.py
+++ b/portality/scripts/anon_import.py
@@ -18,6 +18,7 @@
 import gzip
 import itertools
 import json
+import re
 import shutil
 from dataclasses import dataclass
 from time import sleep
@@ -39,6 +40,12 @@ class IndexDetail:
     alias_name: str
 
 
+def find_toberemoved_indexes(prefix):
+    for index in portality.dao.find_indexes_by_prefix(prefix):
+        if index == prefix or re.match(rf"{re.escape(prefix)}-\d+", index):
+            yield index
+
+
 def do_import(config):
     # filter for the types we are going to work with
     import_types = {}
@@ -53,9 +60,9 @@ def do_import(config):
     print("\n")
 
     toberemoved_index_prefixes = [ipt_prefix(import_type) for import_type in import_types.keys()]
-    toberemoved_indexes = list(itertools.chain.from_iterable(
-        portality.dao.find_indexes_by_prefix(p) for p in toberemoved_index_prefixes
-    ))
+    toberemoved_indexes = list(itertools.chain.from_iterable(
+        find_toberemoved_indexes(p) for p in toberemoved_index_prefixes
+    ))
     toberemoved_index_aliases = list(portality.dao.find_index_aliases(toberemoved_index_prefixes))
 
     if toberemoved_indexes:
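The guard means only the bare typed name or a dated instance of it ("<prefix>-<digits>") is ever deleted; other indexes that merely share the prefix survive. A small demonstration with example names, and the candidate list passed in rather than fetched from the cluster:

    import re

    def find_toberemoved_indexes(prefix, candidate_indexes):
        # Keep the exact prefix itself, or "<prefix>-" followed by digits.
        for index in candidate_indexes:
            if index == prefix or re.match(rf"{re.escape(prefix)}-\d+", index):
                yield index

    candidates = ["doaj-journal", "doaj-journal-20241128", "doaj-journal-backup"]
    print(list(find_toberemoved_indexes("doaj-journal", candidates)))
    # ['doaj-journal', 'doaj-journal-20241128']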