Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrator: detach receivers during migration #2711

Merged
merged 2 commits into from
Sep 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions inspirehep/modules/migrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@

from __future__ import absolute_import, division, print_function

import logging
import os

import click
import requests
from flask_sqlalchemy import models_committed

from inspirehep.modules.records.receivers import receive_after_model_commit

from .tasks import (
add_citation_counts,
Expand All @@ -45,15 +41,6 @@
@click.group()
def migrator():
"""Command related to migrating INSPIRE data."""
logging.basicConfig()
# Disable auto-indexing receiver in migration tasks
models_committed.disconnect(receive_after_model_commit)


@migrator.resultcallback()
def process_result(result, **kwargs):
"""Callback run after migrator commands."""
models_committed.connect(receive_after_model_commit)


@migrator.command()
Expand Down
10 changes: 9 additions & 1 deletion inspirehep/modules/migrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import gzip
import re
import zlib

from collections import Counter
from itertools import chain

Expand All @@ -37,12 +36,14 @@
from elasticsearch.helpers import bulk as es_bulk
from elasticsearch.helpers import scan as es_scan
from flask import current_app, url_for
from flask_sqlalchemy import models_committed
from jsonschema import ValidationError
from redis import StrictRedis
from redis_lock import Lock
from six import text_type

from dojson.contrib.marc21.utils import create_record as marc_create_record
from invenio_collections import current_collections
from invenio_db import db
from invenio_indexer.api import RecordIndexer, current_record_to_index
from invenio_pidstore.errors import PIDDoesNotExistError
Expand All @@ -58,6 +59,7 @@
from inspirehep.modules.pidstore.minters import inspire_recid_minter
from inspirehep.modules.pidstore.utils import get_pid_type_from_schema
from inspirehep.modules.records.api import InspireRecord
from inspirehep.modules.records.receivers import receive_after_model_commit

from .models import InspireProdRecords

Expand Down Expand Up @@ -182,6 +184,9 @@ def create_index_op(record):

@shared_task(ignore_result=False, compress='zlib', acks_late=True)
def migrate_chunk(chunk):
models_committed.disconnect(receive_after_model_commit)
current_collections.unregister_signals()

index_queue = []

try:
Expand All @@ -202,6 +207,9 @@ def migrate_chunk(chunk):
request_timeout=req_timeout,
)

models_committed.connect(receive_after_model_commit)
current_collections.register_signals()


@shared_task()
def add_citation_counts(chunk_size=500, request_timeout=120):
Expand Down