diff --git a/inspirehep/modules/migrator/cli.py b/inspirehep/modules/migrator/cli.py index 449ab811b9..b3b8a102f8 100644 --- a/inspirehep/modules/migrator/cli.py +++ b/inspirehep/modules/migrator/cli.py @@ -24,14 +24,10 @@ from __future__ import absolute_import, division, print_function -import logging import os import click import requests -from flask_sqlalchemy import models_committed - -from inspirehep.modules.records.receivers import receive_after_model_commit from .tasks import ( add_citation_counts, @@ -45,15 +41,6 @@ @click.group() def migrator(): """Command related to migrating INSPIRE data.""" - logging.basicConfig() - # Disable auto-indexing receiver in migration tasks - models_committed.disconnect(receive_after_model_commit) - - -@migrator.resultcallback() -def process_result(result, **kwargs): - """Callback run after migrator commands.""" - models_committed.connect(receive_after_model_commit) @migrator.command() diff --git a/inspirehep/modules/migrator/tasks.py b/inspirehep/modules/migrator/tasks.py index 50a6049839..9f066f271f 100644 --- a/inspirehep/modules/migrator/tasks.py +++ b/inspirehep/modules/migrator/tasks.py @@ -27,7 +27,6 @@ import gzip import re import zlib - from collections import Counter from itertools import chain @@ -37,12 +36,14 @@ from elasticsearch.helpers import bulk as es_bulk from elasticsearch.helpers import scan as es_scan from flask import current_app, url_for +from flask_sqlalchemy import models_committed from jsonschema import ValidationError from redis import StrictRedis from redis_lock import Lock from six import text_type from dojson.contrib.marc21.utils import create_record as marc_create_record +from invenio_collections import current_collections from invenio_db import db from invenio_indexer.api import RecordIndexer, current_record_to_index from invenio_pidstore.errors import PIDDoesNotExistError @@ -58,6 +59,7 @@ from inspirehep.modules.pidstore.minters import inspire_recid_minter from inspirehep.modules.pidstore.utils import get_pid_type_from_schema from inspirehep.modules.records.api import InspireRecord +from inspirehep.modules.records.receivers import receive_after_model_commit from .models import InspireProdRecords @@ -182,6 +184,9 @@ def create_index_op(record): @shared_task(ignore_result=False, compress='zlib', acks_late=True) def migrate_chunk(chunk): + models_committed.disconnect(receive_after_model_commit) + current_collections.unregister_signals() + index_queue = [] try: @@ -202,6 +207,9 @@ def migrate_chunk(chunk): request_timeout=req_timeout, ) + models_committed.connect(receive_after_model_commit) + current_collections.register_signals() + @shared_task() def add_citation_counts(chunk_size=500, request_timeout=120):