From f06baf13ca0aa37e7d6aef9c851a2791861c34e1 Mon Sep 17 00:00:00 2001 From: Jacopo Notarstefano Date: Sun, 3 Sep 2017 17:51:52 +0200 Subject: [PATCH 1/2] migrator: don't index records just after commit The solution attempted in 16e9cb9 didn't work as that receiver was detached by the Python interpreter executing the CLI command, not the one executing the task code in the worker. Signed-off-by: Jacopo Notarstefano --- inspirehep/modules/migrator/cli.py | 13 ------------- inspirehep/modules/migrator/tasks.py | 7 ++++++- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/inspirehep/modules/migrator/cli.py b/inspirehep/modules/migrator/cli.py index 449ab811b9..b3b8a102f8 100644 --- a/inspirehep/modules/migrator/cli.py +++ b/inspirehep/modules/migrator/cli.py @@ -24,14 +24,10 @@ from __future__ import absolute_import, division, print_function -import logging import os import click import requests -from flask_sqlalchemy import models_committed - -from inspirehep.modules.records.receivers import receive_after_model_commit from .tasks import ( add_citation_counts, @@ -45,15 +41,6 @@ @click.group() def migrator(): """Command related to migrating INSPIRE data.""" - logging.basicConfig() - # Disable auto-indexing receiver in migration tasks - models_committed.disconnect(receive_after_model_commit) - - -@migrator.resultcallback() -def process_result(result, **kwargs): - """Callback run after migrator commands.""" - models_committed.connect(receive_after_model_commit) @migrator.command() diff --git a/inspirehep/modules/migrator/tasks.py b/inspirehep/modules/migrator/tasks.py index 50a6049839..13935e9665 100644 --- a/inspirehep/modules/migrator/tasks.py +++ b/inspirehep/modules/migrator/tasks.py @@ -27,7 +27,6 @@ import gzip import re import zlib - from collections import Counter from itertools import chain @@ -37,6 +36,7 @@ from elasticsearch.helpers import bulk as es_bulk from elasticsearch.helpers import scan as es_scan from flask import current_app, url_for +from flask_sqlalchemy import models_committed from jsonschema import ValidationError from redis import StrictRedis from redis_lock import Lock @@ -58,6 +58,7 @@ from inspirehep.modules.pidstore.minters import inspire_recid_minter from inspirehep.modules.pidstore.utils import get_pid_type_from_schema from inspirehep.modules.records.api import InspireRecord +from inspirehep.modules.records.receivers import receive_after_model_commit from .models import InspireProdRecords @@ -182,6 +183,8 @@ def create_index_op(record): @shared_task(ignore_result=False, compress='zlib', acks_late=True) def migrate_chunk(chunk): + models_committed.disconnect(receive_after_model_commit) + index_queue = [] try: @@ -202,6 +205,8 @@ def migrate_chunk(chunk): request_timeout=req_timeout, ) + models_committed.connect(receive_after_model_commit) + @shared_task() def add_citation_counts(chunk_size=500, request_timeout=120): From 45f4a6899fef2ebf9021b2cb0f9a8f5d6cf52ffc Mon Sep 17 00:00:00 2001 From: Jacopo Notarstefano Date: Sun, 3 Sep 2017 17:54:38 +0200 Subject: [PATCH 2/2] migrator: don't compute collections when migrating Fixes the performance issue caused by invenio-collections by detaching its receiver during migration. Its work, in fact, is already handled by inspire-dojson since inspirehep/inspire-dojson#62. Signed-off-by: Jacopo Notarstefano --- inspirehep/modules/migrator/tasks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/inspirehep/modules/migrator/tasks.py b/inspirehep/modules/migrator/tasks.py index 13935e9665..9f066f271f 100644 --- a/inspirehep/modules/migrator/tasks.py +++ b/inspirehep/modules/migrator/tasks.py @@ -43,6 +43,7 @@ from six import text_type from dojson.contrib.marc21.utils import create_record as marc_create_record +from invenio_collections import current_collections from invenio_db import db from invenio_indexer.api import RecordIndexer, current_record_to_index from invenio_pidstore.errors import PIDDoesNotExistError @@ -184,6 +185,7 @@ def create_index_op(record): @shared_task(ignore_result=False, compress='zlib', acks_late=True) def migrate_chunk(chunk): models_committed.disconnect(receive_after_model_commit) + current_collections.unregister_signals() index_queue = [] @@ -206,6 +208,7 @@ def migrate_chunk(chunk): ) models_committed.connect(receive_after_model_commit) + current_collections.register_signals() @shared_task()