Migration integration #541

Merged: 35 commits, Sep 23, 2023

Commits
ebc49ff
migrator: add gh actions test data
Sep 20, 2023
92361a0
migrator: add gh hook enable/disable actions
Sep 20, 2023
9917595
migrator: add gh release reception and update actions
Sep 21, 2023
0be7144
migrator: transform dates in github actions
Sep 21, 2023
6801db7
migrator: gh release process action
Sep 21, 2023
3373e1f
migrator: load json fields in gh actions
Sep 22, 2023
3ecef8e
migrator: fix gh release process action
Sep 22, 2023
62e1cb3
migrator: oauth2server client transform bugfix
Sep 19, 2023
27055d9
migrator: oauth client linked applications, dis/connect orcid
Sep 19, 2023
33df2a5
migrator: add gh disconnect actions
Sep 20, 2023
47fda00
migrator: add support for gh connect action
Sep 20, 2023
638095d
migrator: transform dates in oauth actions
Sep 21, 2023
aa2cdc7
migrator: load json fields in oauth actions
Sep 22, 2023
e3ad9af
migrator: fix permission_flags for parent
slint Sep 20, 2023
f82a5cb
migrator: use orjson in vocabulary dump scripts
slint Sep 21, 2023
14cea53
migrator: use binary dumps for OAuthClient
slint Sep 21, 2023
1fa9cfa
migrator: fix deleted records transformation
slint Sep 21, 2023
2f2af36
migrator: fix scripts resilience
slint Sep 21, 2023
e53ab9b
extract: yield Kafka transactions in commit LSN order
slint Sep 22, 2023
b887440
migrator: add logging to Kafka extract
slint Sep 22, 2023
c680577
extract: add better offsets support
slint Sep 22, 2023
d3905a3
migrator: add ignored actions
slint Sep 22, 2023
9514268
migrator: reorganize action imports
slint Sep 22, 2023
672a809
migrator: set default communities review policy
slint Sep 22, 2023
d4a46fb
migrator: fix access conditions for parents
slint Sep 22, 2023
cabb1f0
migrator: additional Kafka extract logging
slint Sep 23, 2023
f9dc981
migrator: add missing GitHub action and fix matching
slint Sep 23, 2023
c95362a
migrator: update "partial" Entry parsing
slint Sep 23, 2023
675f00f
migrator: better transaction yielding for Kafka extract
slint Sep 23, 2023
8c485dc
migrator: adjust partial entries for parent and record
slint Sep 23, 2023
b3454a0
migrator: update ignored actions
slint Sep 23, 2023
339fa8f
migrator: less verbose Kafka logging
slint Sep 23, 2023
b562fb6
migrator: fix optional user "active" key
slint Sep 23, 2023
10eec02
migrator: update ignored actions
slint Sep 23, 2023
7e62fd7
migrator: include Kafka offset in TxState
slint Sep 23, 2023
2 changes: 1 addition & 1 deletion migrator/README.md
@@ -228,5 +228,5 @@ psql $DB_URI -f scripts/webhook_events_dump.sql > "dumps/webhook_events-$(date -I).csv"
# GitHub repositories
psql $DB_URI -f scripts/github_repositories_dump.sql > "dumps/github_repositories-$(date -I).csv"
# GitHub releases
- psql $DB_URI -f scripts/github_releases_dump.sql > "dumps/github_releases-$(date -I).jsonl"
+ psql $DB_URI -f scripts/github_releases_dump.sql | sed 's/\\\\/\\/g' > "dumps/github_releases-$(date -I).jsonl"
```
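Why the added sed filter matters: psql's text-format COPY doubles every backslash in the output, so a JSON escape like `\n` arrives in the `.jsonl` dump as `\\n` and decodes to the wrong value downstream. A minimal Python sketch of the same un-escaping (the sample value is invented):

```python
import json

# As emitted by text-format COPY: the JSON escape "\n" is doubled to "\\n".
raw = '{"notes": "line1\\\\nline2"}'
fixed = raw.replace("\\\\", "\\")  # same effect as sed 's/\\\\/\\/g'
assert json.loads(fixed)["notes"] == "line1\nline2"
```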
19 changes: 8 additions & 11 deletions migrator/migrate.sh
@@ -32,9 +32,10 @@ invenio communities custom-fields init
# Script base path
script_path=$( cd -- "$(dirname "$0")" && pwd )

- # Backup FK/PK/unique constraints
- DB_URI="postgresql://zenodo:zenodo@localhost/zenodo"
+ LEGACY_DB_URI="service=zenodo-legacy"
+ DB_URI="service=zenodo-dev"

+ # Backup FK/PK/unique constraints
psql $DB_URI --tuples-only --quiet -f scripts/gen_create_constraints.sql > scripts/create_constraints.sql
psql $DB_URI --tuples-only --quiet -f scripts/gen_delete_constraints.sql > scripts/delete_constraints.sql

@@ -51,6 +52,11 @@ python -m zenodo_rdm_migrator "streams.yaml"
# TODO: These should be fixed in the legacy/source
# Apply various consistency fixes

+ # Import OAuthclient models
+ psql $DB_URI -f scripts/oauthclient_remoteaccount_dump.sql > "dumps/oauthclient_remoteaccount.bin"
+ psql $DB_URI -f scripts/oauthclient_remotetoken_dump.sql > "dumps/oauthclient_remotetoken.bin"
+ pv dumps/oauthclient_remoteaccount.bin | psql $DB_URI -c "COPY oauthclient_remoteaccount (id, user_id, client_id, extra_data, created, updated) FROM STDIN (FORMAT binary);"
+ pv dumps/oauthclient_remotetoken.bin | psql $DB_URI -c "COPY oauthclient_remotetoken (id_remote_account, token_type, access_token, secret, created, updated) FROM STDIN (FORMAT binary);"

# Restore FK/PK/unique constraints and indices
psql $DB_URI -f scripts/create_constraints.sql
@@ -64,15 +70,6 @@ psql $DB_URI -f scripts/create_missing_indices.sql

# Fixtures
invenio rdm-records fixtures
- invenio vocabularies import -v names -f $script_path/app_data/vocabularies-future.yaml # zenodo specific names

- # TODO: Load these via regular migration streams
- # Load awards via COPY
- pv awards.csv | psql $DB_URI -c "COPY award_metadata (id, pid, json, created, updated, version_id) FROM STDIN (FORMAT csv);"

- # Truncate any previous funders (e.g. from fixtures), and load funders via copy
- psql $DB_URI -c "truncate funder_metadata"
- pv funders.csv | psql $DB_URI -c "COPY funder_metadata (id, pid, json, created, updated, version_id) FROM STDIN (FORMAT csv);"

# Reindex records and communities
invenio rdm-records rebuild-index
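The new OAuthclient import block round-trips the tables through PostgreSQL's binary COPY format, which preserves bytea and JSON columns that text/CSV dumps would mangle with escaping. A rough psycopg2 equivalent of the load step, sketched under the assumption that the DSN and dump path from migrate.sh apply:

```python
import psycopg2

conn = psycopg2.connect("service=zenodo-dev")  # assumed DSN from migrate.sh
with conn, conn.cursor() as cur, open(
    "dumps/oauthclient_remoteaccount.bin", "rb"
) as dump:
    # Stream the binary dump straight into COPY, like `pv ... | psql -c`.
    cur.copy_expert(
        "COPY oauthclient_remoteaccount "
        "(id, user_id, client_id, extra_data, created, updated) "
        "FROM STDIN (FORMAT binary)",
        dump,
    )
```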
20 changes: 10 additions & 10 deletions migrator/scripts/create_missing_indices.sql
@@ -1,10 +1,10 @@
- CREATE INDEX idx_accounts_user_displayname ON accounts_user USING btree (displayname);
- CREATE INDEX idx_communities_members_group_id ON communities_members USING btree (group_id);
- CREATE INDEX idx_communities_members_user_id ON communities_members USING btree (user_id);
- CREATE INDEX idx_communities_metadata_bucket_id ON communities_metadata USING btree (bucket_id);
- CREATE INDEX idx_files_files_last_check ON files_files USING btree (last_check);
- CREATE INDEX idx_pidstore_pid_pid_value ON pidstore_pid USING btree (pid_value);
- CREATE INDEX idx_rdm_parents_community_request_id ON rdm_parents_community USING btree (request_id);
- CREATE INDEX idx_rdm_records_files_record_id ON rdm_records_files USING btree (record_id);
- CREATE INDEX idx_rdm_records_metadata_bucket_id ON rdm_records_metadata USING btree (bucket_id);
- CREATE INDEX idx_rdm_versions_state_next_draft_id ON rdm_versions_state USING btree (next_draft_id);
+ CREATE INDEX IF NOT EXISTS idx_accounts_user_displayname ON accounts_user USING btree (displayname);
+ CREATE INDEX IF NOT EXISTS idx_communities_members_group_id ON communities_members USING btree (group_id);
+ CREATE INDEX IF NOT EXISTS idx_communities_members_user_id ON communities_members USING btree (user_id);
+ CREATE INDEX IF NOT EXISTS idx_communities_metadata_bucket_id ON communities_metadata USING btree (bucket_id);
+ CREATE INDEX IF NOT EXISTS idx_files_files_last_check ON files_files USING btree (last_check);
+ CREATE INDEX IF NOT EXISTS idx_pidstore_pid_pid_value ON pidstore_pid USING btree (pid_value);
+ CREATE INDEX IF NOT EXISTS idx_rdm_parents_community_request_id ON rdm_parents_community USING btree (request_id);
+ CREATE INDEX IF NOT EXISTS idx_rdm_records_files_record_id ON rdm_records_files USING btree (record_id);
+ CREATE INDEX IF NOT EXISTS idx_rdm_records_metadata_bucket_id ON rdm_records_metadata USING btree (bucket_id);
+ CREATE INDEX IF NOT EXISTS idx_rdm_versions_state_next_draft_id ON rdm_versions_state USING btree (next_draft_id);
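Adding IF NOT EXISTS makes the index script idempotent: a re-run after a partially failed migration skips indices that already exist instead of aborting on the first duplicate. A small sketch of driving it from Python (the DSN is an assumption):

```python
import psycopg2

conn = psycopg2.connect("service=zenodo-dev")  # assumed DSN
with open("scripts/create_missing_indices.sql") as f:
    sql = f.read()
with conn, conn.cursor() as cur:
    # Safe to execute repeatedly: IF NOT EXISTS turns duplicates into no-ops.
    cur.execute(sql)
```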
5 changes: 4 additions & 1 deletion migrator/scripts/deleted_records_dump.sql
@@ -8,13 +8,15 @@ COPY (
removal_json,
removal_date,
version_id,
+ recid,
transaction_id
) AS (
SELECT
r.id,
r.json as removal_json,
r.updated as removal_date,
r.version_id,
+ p.pid_value,
r.transaction_id
FROM
records_metadata_version as r
@@ -30,8 +32,9 @@ COPY (
r.id as id,
r.json as json,
r.created as created,
- dr.removal_date as updated,
dr.version_id as version_id,
+ dr.recid as recid,
+ dr.removal_date as updated,
dr.removal_json as removal_json,
dr.removal_date as removal_date
FROM
6 changes: 3 additions & 3 deletions migrator/scripts/dump_affiliations_db.py
@@ -6,7 +6,7 @@
import csv
import uuid

- import orjson as json
+ import orjson
from idutils import normalize_ror
from invenio_rdm_migrator.utils import ts

@@ -59,7 +59,7 @@ def load_file(datafile, outpath):
with open(outpath, "w") as fout, open(datafile, "rb") as fp:
print(f"[{ts()}] loading {datafile}")
writer = csv.writer(fout)
- entries = json.loads(fp.read())
+ entries = orjson.loads(fp.read())
for idx, data in enumerate(entries):
if idx % 1000 == 0:
print(f"[{ts()}] {idx}")
@@ -74,7 +74,7 @@ def load_file(datafile, outpath):
(
str(uuid.uuid4()), # id
affiliation_id, # pid
- json.dumps(affiliation), # json
+ orjson.dumps(affiliation).decode("utf-8"), # json
creation_ts, # created
creation_ts, # updated (same as created)
1, # version_id
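The same json-to-orjson swap recurs in the awards and funders scripts below. The one behavioral difference to keep in mind is that orjson.dumps returns bytes, hence the added .decode("utf-8") before the value reaches csv.writer. A minimal illustration (the record is invented):

```python
import orjson

funder = {"id": "01ggx4157", "name": "CERN"}  # invented sample record
buf = orjson.dumps(funder)  # orjson serializes to bytes, not str
assert isinstance(buf, bytes)
row_json = buf.decode("utf-8")  # csv.writer expects str columns
```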
6 changes: 3 additions & 3 deletions migrator/scripts/dump_awards_db.py
@@ -7,9 +7,9 @@
"""
import csv
import gzip
- import json
import uuid

+ import orjson
from invenio_rdm_migrator.utils import ts

DATA_PATHS = [
@@ -111,7 +111,7 @@ def load_files(file_paths, outpath):
if idx % 1000 == 0:
print(f"[{ts()}] {idx}")
try:
- data = json.loads(line)
+ data = orjson.loads(line)
award = transform_openaire_grant(data)
if not award:
print(f"[{ts()}] Failed to transform line {idx}:\n{data}\n")
@@ -127,7 +127,7 @@ def load_files(file_paths, outpath):
(
str(uuid.uuid4()), # id
award_id, # pid
- json.dumps(award), # json
+ orjson.dumps(award).decode("utf-8"), # json
creation_ts, # created
creation_ts, # updated (same as created)
1, # version_id
6 changes: 3 additions & 3 deletions migrator/scripts/dump_funders_db.py
@@ -9,7 +9,7 @@
import csv
import uuid

- import orjson as json
+ import orjson
from idutils import normalize_ror
from invenio_rdm_migrator.utils import ts

@@ -73,7 +73,7 @@ def load_file(datafile, outpath):
with open(outpath, "w") as fout, open(datafile, "rb") as fp:
print(f"[{ts()}] loading {datafile}")
writer = csv.writer(fout)
- entries = json.loads(fp.read())
+ entries = orjson.loads(fp.read())
for idx, data in enumerate(entries):
if idx % 1000 == 0:
print(f"[{ts()}] {idx}")
@@ -88,7 +88,7 @@ def load_file(datafile, outpath):
(
str(uuid.uuid4()), # id
funder_id, # pid
- json.dumps(funder), # json
+ orjson.dumps(funder).decode("utf-8"), # json
creation_ts, # created
creation_ts, # updated (same as created)
1, # version_id
2 changes: 1 addition & 1 deletion migrator/scripts/oauthclient_remoteaccount_dump.sql
@@ -7,4 +7,4 @@ COPY (
created,
updated
FROM oauthclient_remoteaccount
- ) TO STDOUT WITH (FORMAT csv);
+ ) TO STDOUT WITH (FORMAT binary);
2 changes: 1 addition & 1 deletion migrator/scripts/oauthclient_remotetoken_dump.sql
@@ -8,4 +8,4 @@ COPY (
updated
FROM
oauthclient_remotetoken
- ) TO STDOUT WITH (FORMAT csv);
+ ) TO STDOUT WITH (FORMAT binary);
1 change: 0 additions & 1 deletion migrator/scripts/update_sequences.sql
@@ -1,5 +1,4 @@
SELECT setval(pg_get_serial_sequence('access_actionssystemroles', 'id'), COALESCE(max(id) + 1, 1), false) FROM access_actionssystemroles;
SELECT setval(pg_get_serial_sequence('accounts_role', 'id'), COALESCE(max(id) + 1, 1), false) FROM accounts_role;
SELECT setval(pg_get_serial_sequence('accounts_user', 'id'), COALESCE(max(id) + 1, 1), false) FROM accounts_user;
SELECT setval(pg_get_serial_sequence('banners', 'id'), COALESCE(max(id) + 1, 1), false) FROM banners;
SELECT setval(pg_get_serial_sequence('files_location', 'id'), COALESCE(max(id) + 1, 1), false) FROM files_location;
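For context, update_sequences.sql exists because bulk COPY loads rows with explicit ids, leaving each serial sequence behind max(id). A hedged Python rendering of the same setval pattern (the table list and DSN are assumptions):

```python
import psycopg2

def reset_sequence(cur, table, column="id"):
    # Same pattern as update_sequences.sql: point the sequence at
    # max(id) + 1, or at 1 for an empty table.
    cur.execute(
        f"SELECT setval(pg_get_serial_sequence(%s, %s), "
        f"COALESCE(max({column}) + 1, 1), false) FROM {table}",
        (table, column),
    )

conn = psycopg2.connect("service=zenodo-dev")  # assumed DSN
with conn, conn.cursor() as cur:
    for table in ("accounts_user", "banners", "files_location"):  # sample subset
        reset_sequence(cur, table)
```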
@@ -12,7 +12,7 @@
from invenio_rdm_migrator.load.postgresql.transactions.operations import OperationType
from invenio_rdm_migrator.streams.actions import load

- from zenodo_rdm_migrator.actions.transform import (
+ from zenodo_rdm_migrator.actions.transform.communities import (
CommunityCreateAction,
CommunityDeleteAction,
CommunityUpdateAction,
64 changes: 40 additions & 24 deletions migrator/tests/actions/conftest.py
@@ -28,8 +28,14 @@
FilesInstance,
FilesObjectVersion,
)
+ from invenio_rdm_migrator.streams.models.github import Release, Repository, WebhookEvent
from invenio_rdm_migrator.streams.models.oai import OAISet
- from invenio_rdm_migrator.streams.models.oauth import ServerClient, ServerToken
+ from invenio_rdm_migrator.streams.models.oauth import (
+     RemoteAccount,
+     RemoteToken,
+     ServerClient,
+     ServerToken,
+ )
from invenio_rdm_migrator.streams.models.pids import PersistentIdentifier
from invenio_rdm_migrator.streams.models.records import (
RDMDraftFile,
@@ -41,6 +47,7 @@
LoginInformation,
SessionActivity,
User,
+ UserIdentity,
)
from invenio_rdm_migrator.streams.records.state import ParentModelValidator

Expand Down Expand Up @@ -92,8 +99,8 @@ def test_extract_cls():
class TestExtract(Extract):
"""Test extractor."""

-     def __init__(self, tx, filter_unchanged=True):
-         self.tx = tx
+     def __init__(self, txs, filter_unchanged=True):
+         self.txs = txs if isinstance(txs, list) else [txs]
self.filter_unchanged = filter_unchanged

# NOTE: Copied from KafkaExtract
@@ -111,26 +118,29 @@ def _filter_unchanged_values(self, op):

def run(self):
"""Yield one element at a time."""
-         if isinstance(self.tx, dict):
-             tx = self.tx
-         if isinstance(self.tx, (str, Path)):
-             tx_path = Path(self.tx)
-             assert tx_path.exists()
-             with jsonlines.open(tx_path) as tx_ops:
-                 tx = {
-                     "operations": [
-                         {"key": op["key"], **op["value"]} for op in tx_ops
-                     ]
-                 }
-         # convert "op" to OperationType enum
-         for op in tx["operations"]:
-             op["op"] = OperationType(op["op"].upper())
-         # extract the tx_id
-         tx["tx_id"] = tx["operations"][0]["source"]["txId"]
-         yield Tx(
-             id=tx["tx_id"],
-             operations=list(map(self._filter_unchanged_values, tx["operations"])),
-         )
+         for tx in self.txs:
+             if isinstance(tx, dict):
+                 tx = tx
+             if isinstance(tx, (str, Path)):
+                 tx_path = Path(tx)
+                 assert tx_path.exists()
+                 with jsonlines.open(tx_path) as tx_ops:
+                     tx = {
+                         "operations": [
+                             {"key": op["key"], **op["value"]} for op in tx_ops
+                         ]
+                     }
+             # convert "op" to OperationType enum
+             for op in tx["operations"]:
+                 op["op"] = OperationType(op["op"].upper())
+             # extract the tx_id
+             tx["tx_id"] = tx["operations"][0]["source"]["txId"]
+             yield Tx(
+                 id=tx["tx_id"],
+                 operations=list(
+                     map(self._filter_unchanged_values, tx["operations"])
+                 ),
+             )

return TestExtract

@@ -161,10 +171,16 @@ def database(engine):
RDMParentMetadata,
RDMVersionState,
RDMParentCommunityMetadata,
+ RemoteAccount,
+ RemoteToken,
+ Release,
+ Repository,
ServerClient,
ServerToken,
SessionActivity,
User,
+ UserIdentity,
+ WebhookEvent,
]

# create tables
@@ -181,4 +197,4 @@
@pytest.fixture(scope="function")
def pg_tx_load(db_uri, session):
"""Load instance configured with the DB session fixture."""
- return PostgreSQLTx(db_uri, _session=session)
+ return PostgreSQLTx(db_uri, _session=session, dry=False)
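Two fixture changes are worth calling out: TestExtract now replays a list of transactions (dicts or JSONL paths) in order, and pg_tx_load passes dry=False, presumably so load operations are actually executed against the test session rather than only collected. A hypothetical test using the multi-transaction form (file names are assumptions):

```python
def test_replays_multiple_tx_logs(test_extract_cls):
    # Any mix of dicts and JSONL paths is accepted; order is preserved.
    extract = test_extract_cls(
        ["testdata/draft_create.jsonl", "testdata/draft_publish.jsonl"]
    )
    txs = list(extract.run())
    assert all(tx.operations for tx in txs)
```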
2 changes: 1 addition & 1 deletion migrator/tests/actions/drafts/test_drafts_actions.py
@@ -12,7 +12,7 @@
from invenio_rdm_migrator.load.postgresql.transactions.operations import OperationType
from invenio_rdm_migrator.streams.actions import load

- from zenodo_rdm_migrator.actions.transform import (
+ from zenodo_rdm_migrator.actions.transform.drafts import (
DraftCreateAction,
DraftEditAction,
DraftPublishAction,
2 changes: 1 addition & 1 deletion migrator/tests/actions/files/test_files_actions.py
@@ -12,7 +12,7 @@
from invenio_rdm_migrator.load.postgresql.transactions.operations import OperationType
from invenio_rdm_migrator.streams.actions import load

- from zenodo_rdm_migrator.actions.transform import DraftFileUploadAction
+ from zenodo_rdm_migrator.actions.transform.files import DraftFileUploadAction


def test_matches_with_valid_data(file_upload_tx):
21 changes: 21 additions & 0 deletions migrator/tests/actions/github/conftest.py
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2023 CERN.
#
# ZenodoRDM is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Migrator community actions tests configuration."""

from pathlib import Path

import pytest


# FIXME: deduplicate from actions/communities tests
@pytest.fixture()
def tx_files():
    """Transactions file paths."""
    testdata_dir = Path(__file__).parent / "testdata"
    assert testdata_dir.exists()
    return {f.stem: f for f in testdata_dir.iterdir() if f.is_file()}
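A hypothetical lookup against this fixture: every file under testdata/ is keyed by its stem, so a test can fetch a transaction log by name (the stem and the .jsonl suffix below are assumptions):

```python
def test_tx_files_lookup(tx_files):
    path = tx_files["gh_release_process"]  # assumed testdata file stem
    assert path.suffix == ".jsonl"  # assumed: dumps are JSON Lines tx logs
```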