Skip to content

Commit

Permalink
Get and parse metadata also from DataCite API
Browse files Browse the repository at this point in the history
  • Loading branch information
nsoranzo committed Oct 25, 2024
1 parent 72eba54 commit bf3a509
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 65 deletions.
238 changes: 173 additions & 65 deletions rfparser/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import re
import sys
import urllib.parse
from dataclasses import dataclass
from time import sleep
from typing import (
Any,
Optional,
SupportsIndex,
TYPE_CHECKING,
Union,
)
from xml.etree import ElementTree
from xml.etree.ElementTree import indent
Expand All @@ -26,6 +27,7 @@
)

from .util import (
extend_list_to_size,
is_same_person,
str_if_not_None,
strip_tags,
Expand All @@ -41,6 +43,7 @@
REQUEST_RETRIES = 3
REQUEST_RETRIES_BACKOFF_FACTOR = 1.0
BASE_CR_URL = "https://api.crossref.org"
BASE_DC_URL = "https://api.datacite.org"
BASE_DOI_URL = "https://doi.org"
BASE_RF_URL = "https://api.researchfish.com/restapi"
BASE_UNPAYWALL_URL = "https://api.unpaywall.org"
Expand All @@ -56,6 +59,13 @@
"Methods in Molecular Biology",
}

DC_resourceTypeGeneral_TO_CR_TYPE = {
"BookChapter": "book-chapter",
"ConferencePaper": "proceedings-article",
"JournalArticle": "journal-article",
"Preprint": "preprint",
}
# Map CrossRef's "type" to the CategoryId element of the output XML file
CR_TYPE_TO_XML_CATEGORY_ID = {
"book-chapter": "2",
"journal-article": "1",
Expand All @@ -81,12 +91,75 @@
log = logging.getLogger(__name__)


@dataclass
class Person:
username: str
given_names: str
class Researcher:
def __init__(
self,
family_names: Optional[str] = None,
given_names: Optional[str] = None,
name: Optional[str] = None,
orcid_id: Optional[str] = None,
):
self.family_names = family_names
self.given_names = given_names
if not family_names:
assert name
self.name = name
self.orcid_id = sanitise_orcid_id(orcid_id)


class Author(Researcher):
@classmethod
def from_CR(cls, author_dict: dict[str, Any]) -> "Self":
orcid_id = author_dict.get("ORCID")
family_names = author_dict.get("family")
if family_names:
given_names = author_dict.get("given")
return cls(family_names=family_names, given_names=given_names, orcid_id=orcid_id)
else:
name = author_dict.get("name")
if not name:
raise Exception(f"Unrecognised author_dict format: {author_dict}")
return cls(name=name, orcid_id=orcid_id)

@classmethod
def from_DC(cls, creator_dict: dict[str, Any]) -> "Self":
nameIdentifiers = creator_dict["nameIdentifiers"]
orcid_id: Optional[str] = None
for nameIdentifier_dict in nameIdentifiers:
if nameIdentifier_dict["nameIdentifierScheme"] == "ORCID":
orcid_id = nameIdentifier_dict["nameIdentifier"]
family_names = creator_dict.get("familyName")
if family_names:
given_names = creator_dict.get("givenName")
return cls(family_names=family_names, given_names=given_names, orcid_id=orcid_id)
else:
name = creator_dict.get("name")
if not name:
raise Exception(f"Unrecognised creator_dict format: {creator_dict}")
return cls(name=name, orcid_id=orcid_id)

def to_contributor_format(self) -> str:
"""
Format the author's names to a str for the ContributorsList field of the
XML output.
"""
if self.family_names:
if self.given_names:
given_name_initials = "".join(name[0] for name in self.given_names.split())
return f"{self.family_names} {given_name_initials}"
else:
return self.family_names
else:
return self.name


class User(Researcher):
family_names: str
orcid_id: Optional[str]
given_names: str

def __init__(self, username: str, family_names: str, given_names: str, orcid_id: Optional[str] = None):
self.username = username
super().__init__(family_names=family_names, given_names=given_names, orcid_id=orcid_id)


class DOI(str):
Expand Down Expand Up @@ -115,6 +188,15 @@ def __eq__(self, __value: object) -> bool:
def __hash__(self) -> int:
return hash(self.lower())

def startswith(
self,
prefix: Union[str, tuple[str, ...]],
start: Optional[SupportsIndex] = None,
end: Optional[SupportsIndex] = None,
) -> bool:
lower_prefix = prefix.lower() if isinstance(prefix, str) else tuple(_.lower() for _ in prefix)
return self.lower().startswith(lower_prefix, start, end)


def RF_login(username: str, password: str) -> Session:
"""
Expand Down Expand Up @@ -207,6 +289,12 @@ def CR_get_pub_metadata(doi: str, headers: Optional[dict[str, str]] = None) -> d
return r_dict["message"]


def DC_get_pub_metadata(doi: str) -> dict[str, Any]:
r = get_url(f"{BASE_DC_URL}/dois/{doi}")
r_dict = r.json()
return r_dict["data"]["attributes"]


def unpaywall_get_oa_status(s: Session, doi: str, email: str) -> str:
"""
Get the Open Access status of a publication using the Unpaywall API
Expand Down Expand Up @@ -268,31 +356,29 @@ def sanitise_orcid_id(orcid_id: Optional[str]) -> Optional[str]:
return f"https://orcid.org/{number}"


def get_persons(people_data_csv_url: Optional[str]) -> list[Person]:
log.info("Started get_persons")
def get_users(people_data_csv_url: Optional[str]) -> list[User]:
log.info("Started get_users")
if not people_data_csv_url:
log.warning("people_data_csv_url option not specified")
return []
r = requests.get(people_data_csv_url)
r.raise_for_status()
reader = csv.reader(r.text.splitlines())
persons = [
Person(
username=username, given_names=given_names, family_names=family_names, orcid_id=sanitise_orcid_id(orcid_id)
)
users = [
User(username=username, family_names=family_names, given_names=given_names, orcid_id=orcid_id)
for (username, given_names, family_names, orcid_id) in reader
]
duplicated_person_indexes = []
for i, person1 in enumerate(persons):
for person2 in persons[i + 1 :]:
if person1.given_names == person2.given_names and person1.family_names == person2.family_names:
duplicated_person_indexes.append(i)
duplicated_user_indexes = []
for i, user1 in enumerate(users):
for user2 in users[i + 1 :]:
if user1.given_names == user2.given_names and user1.family_names == user2.family_names:
duplicated_user_indexes.append(i)
break
for index in reversed(duplicated_person_indexes):
log.warning("Duplicated person %s will be eliminated", persons[index])
del persons[index]
log.info("Total persons: %s", len(persons))
return persons
for index in reversed(duplicated_user_indexes):
log.warning("Duplicated user %s will be eliminated", users[index])
del users[index]
log.info("Total users: %s", len(users))
return users


def write_xml_output(
Expand All @@ -304,43 +390,24 @@ def write_xml_output(
Write the publications to an XML file for the EI website.
"""

def author_dict_to_contributor(author_dict: dict[str, Any]) -> str:
"""
Transform an author dict from CrossRef to a str for the ContributorsList
field of the XML output.
"""
family_names = author_dict.get("family")
if family_names:
given_names = author_dict.get("given")
if given_names:
given_name_initials = "".join(name[0] for name in given_names.split())
return f"{family_names} {given_name_initials}"
else:
return family_names
else:
name = author_dict.get("name")
if not name:
raise Exception(f"Unrecognised author_dict format: {author_dict}")
return name

def author_dict_to_username(author_dict: dict[str, Any]) -> Optional[str]:
def author_dict_to_username(author: Author) -> Optional[str]:
# First try to match the ORCID id
orcid_id = sanitise_orcid_id(author_dict.get("ORCID"))
orcid_id = author.orcid_id
if orcid_id:
usernames = [person.username for person in persons if person.orcid_id == orcid_id]
usernames = [user.username for user in users if user.orcid_id == orcid_id]
if usernames:
if len(usernames) > 1:
log.warning("Multiple usernames for ORCID id %s", orcid_id)
return usernames[0]
# Try to match the family and given names
family_names = author_dict.get("family")
family_names = author.family_names
if family_names:
given_names = author_dict.get("given", "")
given_names = author.given_names or ""
usernames = [
person.username
for person in persons
if not (orcid_id and person.orcid_id)
and is_same_person(person.family_names, person.given_names, family_names, given_names)
user.username
for user in users
if not (orcid_id and user.orcid_id)
and is_same_person(user.family_names, user.given_names, family_names, given_names)
]
if usernames:
if len(usernames) > 1:
Expand All @@ -355,7 +422,7 @@ def author_dict_to_username(author_dict: dict[str, Any]) -> Optional[str]:
return None

log.info("Started write_xml_output")
persons = get_persons(people_data_csv_url)
users = get_users(people_data_csv_url)
root_el = ElementTree.Element("publications")
for doi, pub in reversed(pubs_with_doi.items()):
if pub["metadata_ok"]:
Expand Down Expand Up @@ -391,7 +458,7 @@ def author_dict_to_username(author_dict: dict[str, Any]) -> Optional[str]:
raise
ElementTree.SubElement(publication_el, "ContributorIds").text = ", ".join(contributor_ids)
ElementTree.SubElement(publication_el, "ContributorList").text = ", ".join(
author_dict_to_contributor(author_dict) for author_dict in pub["authors"]
author.to_contributor_format() for author in pub["authors"]
)
ElementTree.SubElement(publication_el, "Year").text = str_if_not_None(pub["year"])
ElementTree.SubElement(publication_el, "Month").text = str_if_not_None(pub["month"])
Expand Down Expand Up @@ -521,8 +588,8 @@ def main() -> None:
pub_metadata["subtype"] == "preprint"
), f"publication is of type '{pub_type}' with unknown subtype '{pub_metadata['pub_subtype']}'"
pub_type = pub_metadata["subtype"]
pub["type"] = pub_type
assert pub_type in CR_TYPE_TO_XML_CATEGORY_ID, f"unknown publication type {pub_type}"
pub["type"] = pub_type

container_title_list = pub_metadata["container-title"]
if len(container_title_list) == 0:
Expand Down Expand Up @@ -558,31 +625,72 @@ def main() -> None:
container_title, pub["series-title"] = container_title_list
else:
raise Exception(
f"publication with DOI '{doi}' of type {pub_type} has container-title with unknown book series: {container_title_list}"
f"publication of type {pub_type} has container-title with unknown book series: {container_title_list}"
)
pub["container-title"] = container_title

pub["volume"] = pub_metadata.get("volume")
pub["pages"] = pub_metadata.get("page")

pub["authors"] = pub_metadata.get("author")
authors = pub_metadata.get("author")
# Missing authors (or any other incorrect publication metadata) need
# to be fixed by the publisher, see
# https://community.crossref.org/t/where-to-report-incorrect-metadata/3321
assert pub["authors"], f"publication of type {pub_type} cannot have empty authors"
assert authors, "missing authors"
pub["authors"] = [Author.from_CR(author_dict) for author_dict in authors]

# The "issued" field contains the earliest known publication date
# (see https://github.com/CrossRef/rest-api-doc#sorting )
issued_date = pub_metadata["issued"]["date-parts"][0]
# issued_date may not contain the day, i.e. be a list of length 2
issued_year, issued_month, issued_day = issued_date + [None] * (3 - len(issued_date))
pub["year"] = issued_year
pub["month"] = issued_month
pub["day"] = issued_day
else:
raise Exception(f"Registration agency is {pub['RA']}, not CrossRef")
issued_date_parts = pub_metadata["issued"]["date-parts"][0]
pub["year"], pub["month"], pub["day"] = extend_list_to_size(issued_date_parts, 3)

pub["oa_status"] = unpaywall_get_oa_status(unpaywall_session, doi, config["email"])
elif pub["RA"] == "DataCite":
pub_metadata = DC_get_pub_metadata(doi)
pub["title"] = pub_metadata["titles"][0]["title"]

resourceTypeGeneral = pub_metadata["types"]["resourceTypeGeneral"]
pub_type = DC_resourceTypeGeneral_TO_CR_TYPE.get(resourceTypeGeneral)
assert pub_type, f"unknown publication type {resourceTypeGeneral}"
pub["type"] = pub_type

if pub_type == "preprint":
publisher = pub_metadata["publisher"]
assert publisher, f"publication of type {pub_type} cannot have empty publisher"
assert isinstance(publisher, str)
container_title = publisher
else:
container = pub_metadata["container"]
if not container and doi.startswith("10.17863/cam."):
log.warning("Skipping publication '%s': likely a copy of the publisher version", doi)
continue
raise Exception("Unhandled situation!")
pub["container-title"] = container_title

pub["oa_status"] = unpaywall_get_oa_status(unpaywall_session, doi, config["email"])
pub["volume"] = pub_metadata.get("volume")
pages = pub_metadata.get("firstPage")
if pages and (lastPage := pub_metadata.get("lastPage")):
pages = f"{pages}-{lastPage}"
pub["pages"] = pages

creators = pub_metadata["creators"]
assert creators, "missing authors"
pub["authors"] = [Author.from_DC(creator_dict) for creator_dict in creators]

dates = pub_metadata["dates"]
issued_date = None
for date_dict in dates:
if date_dict["dateType"] == "Issued":
issued_date = date_dict["date"]
break
assert issued_date, "missing issued date"
issued_date_parts = issued_date[:10].split("-")
pub["year"], pub["month"], pub["day"] = extend_list_to_size(issued_date_parts, 3)

# https://support.unpaywall.org/support/solutions/articles/44001900286
pub["oa_status"] = "green"
else:
raise Exception(f"Registration agency is {pub['RA']}, which is not currently handled")

pub["metadata_ok"] = True
except Exception as e:
Expand Down
8 changes: 8 additions & 0 deletions rfparser/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Any,
Optional,
TypeVar,
Union,
)

NAME_SPLITTER_PATTERN = re.compile(r"[\s-]+")
Expand Down Expand Up @@ -85,3 +86,10 @@ def is_same_person(family_names1: str, given_names1: str, family_names2: str, gi
continue
return False
return True


def extend_list_to_size(t: list[T], size: int) -> list[Union[None, T]]:
"""
Extend a list with ``None``s if it is shorter than the requested size.
"""
return t + [None] * (size - len(t))
2 changes: 2 additions & 0 deletions tests/test_dois.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def test_DOI():
DOI("")
with pytest.raises(ValueError, match="malformed DOI"):
DOI("foo")
assert doi.startswith(doi_str[:12])
assert not doi.startswith("foo")


def test_get_dois_RA():
Expand Down

0 comments on commit bf3a509

Please sign in to comment.