Merge pull request #2433 from DOAJ/feature/4016_anon_import_aliases
Feature/4016 anon import aliases
Steven-Eardley authored Nov 28, 2024
2 parents 7cb9797 + 3802a5b commit 920a70d
Showing 3 changed files with 120 additions and 27 deletions.
40 changes: 31 additions & 9 deletions portality/dao.py
@@ -1,16 +1,18 @@
-import time
+from __future__ import annotations
+
+import json
 import re
 import os
 import sys
-import uuid
-import json
-import elasticsearch
+import time
 import urllib.parse
 
+import uuid
 from collections import UserDict
 from copy import deepcopy
 from datetime import timedelta
-from typing import List
+from typing import List, Iterable, Tuple
+
+import elasticsearch
 
 from portality.core import app, es_connection as ES
 from portality.lib import dates
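
Note that the added `from __future__ import annotations` makes all annotations in the module lazily evaluated, which is what allows the PEP 585 style `list[str]` return annotation introduced further down to work on Python versions older than 3.9.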
@@ -724,15 +726,16 @@ def dump(cls, q=None, page_size=1000, limit=None, out=None, out_template=None, o
         return filenames
 
     @classmethod
-    def bulk_load_from_file(cls, source_file, limit=None, max_content_length=100000000):
+    def bulk_load_from_file(cls, source_file, index=None, limit=None, max_content_length=100000000):
         """ ported from esprit.tasks - bulk load to index from file """
+        index = index or cls.index_name()
 
         source_size = os.path.getsize(source_file)
         with open(source_file, "r") as f:
             if limit is None and source_size < max_content_length:
                 # if we aren't selecting a portion of the file, and the file is below the max content length, then
                 # we can just serve it directly
-                ES.bulk(body=f.read(), index=cls.index_name(), doc_type=cls.doc_type(), request_timeout=120)
+                ES.bulk(body=f.read(), index=index, doc_type=cls.doc_type(), request_timeout=120)
                 return -1
             else:
                 count = 0
@@ -755,7 +758,7 @@ def bulk_load_from_file(cls, source_file, limit=None, max_content_length=1000000
                 else:
                     count += records
 
-                ES.bulk(body=chunk, index=cls.index_name(), doc_type=cls.doc_type(), request_timeout=120)
+                ES.bulk(body=chunk, index=index, doc_type=cls.doc_type(), request_timeout=120)
                 if finished:
                     break
                 if limit is not None:
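
The `index` parameter defaults to the model's own index, so existing callers are unaffected; new callers can point a load at a concrete (e.g. dated) index instead. A minimal sketch of both call styles (the file path and dated index name are illustrative):

    from portality import models

    # As before: load into the index/alias the model resolves itself
    models.Article.bulk_load_from_file("/tmp/articles.bulk")

    # New: load into an explicit instance index, e.g. during an anonymised
    # import where the alias is only attached after the data is in place
    models.Article.bulk_load_from_file("/tmp/articles.bulk", index="doaj-article-20241128")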
@@ -1065,6 +1068,25 @@ def refresh():
     return ES.indices.refresh()
 
 
+def find_indexes_by_prefix(index_prefix) -> list[str]:
+    data = ES.indices.get(f'{index_prefix}*')
+    return list(data.keys())
+
+
+def find_index_aliases(alias_prefixes=None) -> Iterable[Tuple[str, str]]:
+    def _yield_index_alias():
+        data = ES.indices.get_alias()
+        for index, d in data.items():
+            for alias in d['aliases'].keys():
+                yield index, alias
+
+    index_aliases = _yield_index_alias()
+    if alias_prefixes:
+        index_aliases = ((index, alias) for index, alias in index_aliases
+                         if any(alias.startswith(p) for p in alias_prefixes))
+    return index_aliases
+
+
 class BlockTimeOutException(Exception):
     pass
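
Together these helpers let callers enumerate concrete indexes and their alias mappings by prefix, which the import script below relies on. A rough usage sketch, with hypothetical index names:

    from portality import dao

    # Concrete indexes matching a prefix,
    # e.g. ["doaj-journal-20241015", "doaj-journal-20241128"]
    indexes = dao.find_indexes_by_prefix("doaj-journal")

    # Lazily produced (index, alias) pairs, optionally filtered by alias prefix,
    # e.g. ("doaj-journal-20241128", "doaj-journal")
    for index, alias in dao.find_index_aliases(alias_prefixes=["doaj-journal"]):
        print(index, "->", alias)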
4 changes: 2 additions & 2 deletions portality/lib/dates.py
@@ -92,8 +92,8 @@ def now_str_with_microseconds() -> str:
     return format(now(), format=FMT_DATETIME_MS_STD)
 
 
-def today() -> str:
-    return format(now(), format=FMT_DATE_STD)
+def today(str_format=FMT_DATE_STD) -> str:
+    return format(now(), format=str_format)
 
 
 def random_date(fro: datetime = None, to: datetime = None) -> str:
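
The signature change is backwards compatible: with no argument, `today()` still returns the standard date format. The import script uses the new parameter to build dated index names; assuming `FMT_DATE_STD` is an ISO-style pattern and `FMT_DATE_SHORT` a compact `YYYYMMDD` one (the exact patterns live in portality/lib/dates.py):

    from portality.lib import dates

    dates.today()                      # e.g. "2024-11-28"
    dates.today(dates.FMT_DATE_SHORT)  # e.g. "20241128", used below for index names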
103 changes: 87 additions & 16 deletions portality/scripts/anon_import.py
@@ -13,16 +13,40 @@
     DOAJENV=test python portality/scripts/anon_import.py data_import_settings/test_server.json
 """
 
-import json, gzip, shutil, elasticsearch
-from portality.core import app, es_connection, initialise_index
-from portality.store import StoreFactory
-from portality.dao import DomainObject
-from portality import models
+from __future__ import annotations
+
+import gzip
+import itertools
+import json
+import re
+import shutil
+from dataclasses import dataclass
+from time import sleep
+
+import portality.dao
+from doajtest.helpers import patch_config
+from portality import models
+from portality.core import app, es_connection
+from portality.dao import DomainObject
+from portality.lib import dates, es_data_mapping
+from portality.store import StoreFactory
+from portality.util import ipt_prefix
 
 
-def do_import(config):
+@dataclass
+class IndexDetail:
+    index_type: str
+    instance_name: str
+    alias_name: str
+
+
+def find_toberemoved_indexes(prefix):
+    for index in portality.dao.find_indexes_by_prefix(prefix):
+        if index == prefix or re.match(rf"{prefix}-\d+", index):
+            yield index
+
+
+def do_import(config):
     # filter for the types we are going to work with
     import_types = {}
     for t, s in config.get("types", {}).items():
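
`find_toberemoved_indexes` yields only the bare prefix itself and dated instances of it (`<prefix>-<digits>`), so an unrelated index that merely starts with the prefix is left alone. A sketch of the intended matching, with hypothetical index names:

    # Given indexes: doaj-journal, doaj-journal-20241128, doaj-journal-backup
    list(find_toberemoved_indexes("doaj-journal"))
    # -> ["doaj-journal", "doaj-journal-20241128"]; "doaj-journal-backup" survives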
@@ -35,22 +59,58 @@ def do_import(config):
             print(("{x} from {y}".format(x=count, y=import_type)))
     print("\n")
 
+    toberemoved_index_prefixes = [ipt_prefix(import_type) for import_type in import_types.keys()]
+    toberemoved_indexes = list(itertools.chain.from_iterable(
+        find_toberemoved_indexes(p) for p in toberemoved_index_prefixes
+    ))
+    toberemoved_index_aliases = list(portality.dao.find_index_aliases(toberemoved_index_prefixes))
+
+    if toberemoved_indexes:
+        print("==Removing the following indexes==")
+        print('    {}'.format(', '.join(toberemoved_indexes)))
+        print()
+    if toberemoved_index_aliases:
+        print("==Removing the following aliases==")
+        print('    {}'.format(', '.join(alias for _, alias in toberemoved_index_aliases)))
+        print()
 
     if config.get("confirm", True):
         text = input("Continue? [y/N] ")
         if text.lower() != "y":
             exit()
 
     # remove all the types that we are going to import
-    for import_type in list(import_types.keys()):
-        try:
-            if es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type):
-                es_connection.indices.delete(app.config['ELASTIC_SEARCH_DB_PREFIX'] + import_type)
-        except elasticsearch.exceptions.NotFoundError:
-            pass
+    for index in toberemoved_indexes:
+        if es_connection.indices.exists(index):
+            print("Deleting index: {}".format(index))
+            es_connection.indices.delete(index, ignore=[404])
+
+    for index, alias in toberemoved_index_aliases:
+        if es_connection.indices.exists_alias(alias, index=index):
+            print("Deleting alias: {} -> {}".format(index, alias))
+            es_connection.indices.delete_alias(index, alias, ignore=[404])
+
+    index_details = {}
+    for import_type in import_types.keys():
+        alias_name = ipt_prefix(import_type)
+        index_details[import_type] = IndexDetail(
+            index_type=import_type,
+            instance_name=alias_name + '-{}'.format(dates.today(dates.FMT_DATE_SHORT)),
+            alias_name=alias_name
+        )
 
     # re-initialise the index (sorting out mappings, etc)
-    print("==Initialising Index for Mappings==")
-    initialise_index(app, es_connection)
+    print("==Initialising Index Mappings and Aliases==")
+    mappings = es_data_mapping.get_mappings(app)
+    for index_detail in index_details.values():
+        print("Initialising index: {}".format(index_detail.instance_name))
+        es_connection.indices.create(index=index_detail.instance_name,
+                                     body=mappings[index_detail.index_type],
+                                     request_timeout=app.config.get("ES_SOCKET_TIMEOUT", None))
+
+        print("Creating alias: {:<25} -> {}".format(index_detail.instance_name, index_detail.alias_name))
+        blocking_if_indices_exist(index_detail.alias_name)
+        es_connection.indices.put_alias(index=index_detail.instance_name, name=index_detail.alias_name)
 
     mainStore = StoreFactory.get("anon_data")
     tempStore = StoreFactory.tmp()
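
The net effect of this block: each type gets a freshly created, dated instance index carrying the correct mappings, published to the application under its stable alias. With the usual prefix configuration the result would look roughly like this (names illustrative):

    doaj-journal-20241128  ->  doaj-journal  (alias)
    doaj-article-20241128  ->  doaj-article  (alias)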
@@ -85,10 +145,12 @@ def do_import(config):
                 shutil.copyfileobj(f_in, f_out)
             tempStore.delete_file(container, filename + ".gz")
 
-            print(("Importing from {x}".format(x=filename)))
+            instance_index_name = index_details[import_type].instance_name
+            print("Importing from {x} to index[{index}]".format(x=filename, index=instance_index_name))
 
             imported_count = dao.bulk_load_from_file(uncompressed_file,
-                                                     limit=limit, max_content_length=config.get("max_content_length", 100000000))
+                                                     index=instance_index_name, limit=limit,
+                                                     max_content_length=config.get("max_content_length", 100000000))
             tempStore.delete_file(container, filename)
 
             if limit is not None and imported_count != -1:
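
For reference, the config this function consumes is the JSON settings file passed on the command line. Inferred purely from the keys read in this script, a file might look roughly like the following; the per-type settings are elided because the code that reads them is outside this diff:

    {
        "types": {
            "journal": {},
            "article": {}
        },
        "confirm": false,
        "max_content_length": 100000000
    }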
Expand All @@ -105,9 +167,18 @@ def do_import(config):
tempStore.delete_container(container)


def blocking_if_indices_exist(index_name):
for retry in range(5):
if not es_connection.indices.exists(index_name):
break
print(f"Old alias exists, waiting for it to be removed, alias[{index_name}] retry[{retry}]...")
sleep(5)


if __name__ == '__main__':

import argparse

parser = argparse.ArgumentParser()

parser.add_argument("config", help="Config file for import run, e.g dev_basics.json")
Expand Down
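
Invocation is unchanged, per the docstring at the top of the file:

    DOAJENV=test python portality/scripts/anon_import.py data_import_settings/test_server.json

Note that `blocking_if_indices_exist` waits at most five 5-second intervals (about 25 seconds) and then proceeds regardless; if a concrete index still occupied the alias name at that point, the subsequent `put_alias` call would be expected to fail.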
