First commit of a script to transform ISA-tab-like metadata to catalog #2

Open · wants to merge 9 commits into main
16 changes: 15 additions & 1 deletion README.md
@@ -1 +1,15 @@
# catalog-utilities


How to install and run:

```
git clone https://github.com/psychoinformatics-de/catalog-utilities.git
cd catalog-utilities

chmod -R u+rwx code/*

python code/create_catalog_metadata.py -m data/dataset_metadata.tsv -t dataset
```

Output in: `data/dataset_metadata.jsonl`
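
The transformer added in this PR, `bin/tubby2catalog`, can also be invoked directly. A sketch of such an invocation, derived from the script's own command-line interface (the exact paths and the format string below are assumptions):

```
# read TSV metadata from one or more paths (or from STDIN) and print a catalog JSON line
bin/tubby2catalog -t dataset data/dataset_metadata.tsv > data/dataset_metadata.jsonl

# optionally control how the dataset ID is derived (hypothetical format string)
bin/tubby2catalog -t dataset -c dataset_id_fmt='{name}' data/dataset_metadata.tsv
```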
376 changes: 376 additions & 0 deletions bin/tubby2catalog
@@ -0,0 +1,376 @@
#!/usr/bin/env python3
"""

"""
from datetime import datetime
import fileinput
import json
import logging
import subprocess
import sys
from typing import List
import uuid

lgr = logging.getLogger('catalog-utilities')

# The dataset schema defines recognized fields contained in
# incoming dataset metadata, and their properties:
# - type: could be a single text field, or a list of text fields
# - required: whether the field is required for the purpose of
# creating a catalog entry
# - case: could be 'single' or 'multiple', indicating whether the
# field may be supplied only once or multiple times in the metadata file
# - columns: the headings of all columns, in order, for the fields
# where type == 'list'
# Currently, the 'type' and 'case' properties are not used
dataset_schema = {
'identifier': {
'type': 'text',
'required': True,
'case': 'single',
},
'version': {
'type': 'text',
'required': False,
'case': 'single',
},
'name': {
'type': 'text',
'required': True,
'case': 'single',
},
'description': {
'type': 'text',
'required': False,
'case': 'single',
},
'author': {
'type': 'list',
'required': False,
'case': 'multiple',
'columns': ['full_name', 'orcid', 'email', 'affiliations'],
},
'publication': {
'type': 'list',
'required': False,
'case': 'multiple',
'columns': ['doi', 'citation'],
},
'keywords': {
'type': 'list',
'required': False,
'case': 'single',
# TODO: how to define the case where all columns of the list share the same definition
},
'property': {
'type': 'list',
'required': False,
'case': 'multiple',
'columns': ['name', 'value'],
},
}
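
# A hypothetical example of incoming TSV content matching this schema
# (values borrowed from the demo output in data/dataset_metadata.jsonl;
# the exact row layout is an assumption, <TAB> marks a tab character):
#
#   identifier<TAB>1234
#   name<TAB>Demo
#   author<TAB>Dr Stephan Heunis<TAB>0000-1234-5678
#   keywords<TAB>minimal<TAB>example<TAB>catalog
#   property<TAB>Storage<TAB>7PB
#   sfb1451<TAB>Species<TAB>Human
#
# 'sfb1451' is not a recognized key: it falls back to the 'property'
# schema and ends up as an 'additional_display' category in the catalog.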

# This defines mapping of field names from incoming metadata
# to corresponding catalog fields
dataset_catalog_mapping = {
'identifier': 'dataset_id',
'version': 'dataset_version',
'name': 'name',
'description': 'description',
'author': 'authors',
'publication': 'publications',
'keywords': 'keywords',
'property': 'top_display',
}
# and the inverse
catalog_dataset_mapping = {v: k for k, v in dataset_catalog_mapping.items()}

file_schema = {
}


def main(metadata_paths: List, metadata_type: str, config: dict):
"""Main function called with command line arguments

Calls the relevant metadata transformation
method based on the supplied type (dataset | file)"""
# call the appropriate method to transform metadata
if metadata_type == 'dataset':
transform_dataset_metadata(metadata_paths, config)
else:
transform_file_metadata(metadata_paths, config)


def transform_dataset_metadata(metadata_paths: List, config: dict):
"""Reads and transforms dataset metadata from TSV format to JSON"""
errored = False
metadata = {}
for i, line in enumerate(fileinput.input(metadata_paths)):
fields = line.rstrip().split('\t')
try:
row_key = fields[0]
# get item schema and handle non-recognized keys
item_schema = dataset_schema.get(
row_key, dataset_schema['property'])
catalog_key, value = parse_dataset_columns(
row_key, fields[1:], item_schema)
add_metadata_item(catalog_key, row_key, value, metadata)
except Exception as e:
lgr.error(msg=f'Error encountered on line {i + 1}', exc_info=e)
errored = True
# Now we have the metadata in an almost catalog-valid format, but still to do:
# - Some fields still require some wrangling to be catalog-valid
# - Some non-content-specific fields still need to be added
metadata = map_to_catalog(metadata, config)
print(json.dumps(metadata))
if errored:
sys.exit(1)


def parse_dataset_columns(key: str, value: list, item_schema: dict):
# Here the type of metadata field and associated columns are handled
# based on the amount of columns, i.e. based on supplied data which
# could be wrong. We might consider rather handling it based on the
# definition encoded in the dataset_schema dict.
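# Illustrative example (values from the demo metadata): a row like
#   publication<TAB>https://doi.org/666<TAB>Heunis et al,2022,The best paper ever
# arrives here as key='publication', value=['https://doi.org/666', 'Heunis et al,...'],
# and is returned as ('publications', {'doi': ..., 'citation': ...}).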
if len(value) > 1:
# handle the case where the field has values in multiple columns
catalog_key = dataset_catalog_mapping.get(key, 'additional_display')
columns = item_schema.get('columns', None)
if columns is None:
# this is interpreted as all columns having the same
# definition e.g. keywords. We just return the same list.
return catalog_key, value
else:
# Map elements of the list onto column names from the schema
# but first make sure that list lengths are equal
if len(columns) > len(value):
columns = columns[:len(value)]
new_value = {k: v for k, v in zip(columns, value)}
return catalog_key, new_value
else:
# Handle the simple case: direct mapping
return dataset_catalog_mapping[key], value[0]


def add_metadata_item(catalog_key, row_key, value, metadata):
""""""
# If the field has already been supplied, the default
# is to assume that it is intentionally supplied multiple
# times, i.e. that it will eventually be an element in a list
# If this is undesirable, the 'case' property from the schema
# could be incorporated
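# For example (values from the demo metadata): two 'author' rows become a
# two-element 'authors' list, while repeated rows under the unrecognized
# key 'sfb1451' are merged into 'additional_display' as
#   {'sfb1451': {'Species': 'Human', 'Limb': 'Leg', 'project': 'Z03'}}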
existing = metadata.get(catalog_key)
if catalog_key == 'additional_display':
# special handling, because we need to capture the property category
# from the row_key
if existing is None:
value = {row_key: {value['name']: value['value']}}
else:
v = existing.get(row_key, {})
v[value['name']] = value['value']
existing[row_key] = v
else:
if existing is not None:
if not isinstance(existing, list):
# make sure that the existing value is a list
existing = [existing]
existing.append(value)

if existing:
metadata[catalog_key] = existing
else:
metadata[catalog_key] = value


def map_to_catalog(metadata, config):
""""""
# Get basic valid metadata item
meta_item = new_dataset_meta_item(
ds_id=get_dataset_id(metadata, config),
ds_version=get_dataset_version(metadata),
ds_name=metadata.get('name', ''),
ds_description=metadata.get('description', ''),
)
# map and add remaining fields to meta_item
for key in metadata.keys():
if key in meta_item.keys():
continue
# some fields require wrangling:
# - authors
# - additional_display
# - publications
# other fields are mapped directly from their current value
if key == 'authors':
# a single author row is stored as a dict, multiple rows as a list,
# so normalize to a list before mapping
authors = metadata[key] if isinstance(metadata[key], list) else [metadata[key]]
meta_item[key] = []
for author in authors:
meta_item[key].append(
get_author(author)
)
elif key == 'additional_display':
meta_item[key] = get_additional_display(metadata[key])
elif key == 'publications':
# normalize in the same way before mapping
publications = metadata[key] if isinstance(metadata[key], list) else [metadata[key]]
meta_item[key] = []
for pub in publications:
meta_item[key].append(
get_publication(pub)
)
else:
meta_item[key] = metadata[key]

return meta_item


def transform_file_metadata(metadata_paths: List, config: dict):
"""Reads and transforms file metadata from TSV format to JSON"""
raise NotImplementedError


def get_dataset_id(input, config):
""""""
# consult config for custom ID selection,
# otherwise take plain standard field
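# (a hypothetical config of dataset_id_fmt='{name}-{dataset_id}' would
# build the raw ID from both of those fields before it is hashed below)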
fmt = config.get('dataset_id_fmt', '{dataset_id}')
# instantiate raw ID string
raw_id = fmt.format(**input)
# now turn into UUID deterministically
return str(uuid.uuid5(
uuid.uuid5(uuid.NAMESPACE_DNS, 'datalad.org'),
raw_id,
))


def get_dataset_version(input):
""""""
# Version is required for catalog, but not for incoming metadata
# TODO: what to do here?
# For now, just return 'latest' if not provided
return str(input.get('dataset_version', 'latest'))


def get_author(author):
full_name = author.get('full_name', None)
email = author.get('email', None)
orcid = author.get('orcid', None)
identifiers = [{
'type': 'ORCID',
'identifier': orcid
}] if orcid is not None else []
# TODO: where to put 'affiliations', which is part of incoming metadata
return {
'name': full_name or '',
'givenName': '',
'familyName': '',
'email': email or '',
'honorificSuffix': '',
'identifiers': identifiers
}


def get_additional_display(display):
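# Convert the accumulated {category: {name: value}} mapping into the
# catalog's list form, e.g. (values from the demo metadata):
#   {'sfb1451': {'Species': 'Human'}} -> [{'name': 'sfb1451', 'content': {'Species': 'Human'}}]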
return [
{'name': category, 'content': mappings}
for category, mappings in display.items()
]


def get_publication(publication):
# catalog publications expect: title, doi, authors
# incoming metadata provides: doi, citation
return {
'type': '',
'title': publication.get('citation', ''),
'doi': publication.get('doi', ''),
'datePublished': '',
'publicationOutlet': '',
'authors': []
}


def get_gitconfig(conf_name):
"""Get current user's git config to append to metadata item for catalog"""
result = (
subprocess.run(['git', 'config', conf_name], capture_output=True)
.stdout.decode()
.rstrip()
)
return result


def get_metadata_source():
"""Create metadata_sources dict required by catalog schema"""
source = {
'key_source_map': {},
'sources': [
{
'source_name': 'manual_to_automated_addition',
'source_version': '0.1.0',
'source_time': datetime.now().timestamp(),
'agent_email': get_gitconfig('user.email'),
'agent_name': get_gitconfig('user.name'),
}
],
}
return source


def new_dataset_meta_item(ds_id, ds_version, ds_name='', ds_description=''):
"""Create a minimal valid dataset metadata blob in catalog schema"""
meta_item = {
'type': 'dataset',
'dataset_id': ds_id,
'dataset_version': ds_version,
'name': ds_name,
'description': ds_description,
'metadata_sources': get_metadata_source(),
}
return meta_item


def new_file_meta_item(ds_id, ds_version, filepath, content_bytesize=None, url=None):
"""Create a minimal valid file metadata blob in catalog schema"""
meta_item = {
'type': 'file',
'dataset_id': ds_id,
'dataset_version': ds_version,
'path': filepath,
'metadata_sources': get_metadata_source(),
}
return meta_item


# -----

if __name__ == '__main__':
import argparse
p = argparse.ArgumentParser(description=__doc__)
p.add_argument(
'metadata', metavar='PATH', nargs='*',
help="paths to files to read metadata from, "
"if metadata should not be read from STDIN."
)
p.add_argument(
'-t', '--type', required=True,
choices=['dataset', 'file'],
help="type of metadata supplied",
)
p.add_argument(
'-c', '--config',
metavar='KEY=VALUE',
action='append',
help="configuration items, can be given more than once. "
"Recognized: `dataset_id_fmt`",
)
args = p.parse_args()

config = dict(
c.split('=', maxsplit=1)
for c in args.config or []
)
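# e.g. (hypothetical values) -c dataset_id_fmt='{name}' -c foo=bar
# yields {'dataset_id_fmt': '{name}', 'foo': 'bar'}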

main(
metadata_paths=args.metadata if len(args.metadata) else ['-'],
metadata_type=args.type,
config=config,
)
1 change: 1 addition & 0 deletions data/dataset_metadata.jsonl
@@ -0,0 +1 @@
{"type": "dataset", "dataset_id": "1234", "dataset_version": "latest", "name": "Demo", "description": "This is a dataset description", "metadata_sources": {"key_source_map": {}, "sources": [{"source_name": "manual_to_automated_addition", "source_version": "0.1.0", "source_time": 1686089360.497007, "agent_email": "Stephan Heunis", "agent_name": "[email protected]"}]}, "authors": [{"name": "Dr Stephan Heunis", "givenName": "", "familyName": "", "email": "", "honorificSuffix": "", "identifiers": [{"type": "ORCID", "identifier": "0000-1234-5678"}]}, {"name": "Prof Michael Hanke", "givenName": "", "familyName": "", "email": "", "honorificSuffix": "", "identifiers": [{"type": "ORCID", "identifier": "9999-1224-3378"}]}], "keywords": ["minimal", "example", "catalog", "from", "metadata"], "publications": [{"type": "", "title": "Heunis et al,2022,The best paper ever", "doi": "https://doi.org/666", "datePublished": "", "publicationOutlet": "", "authors": []}, {"type": "", "title": "Doe et al,2023,The best paper ever in 2023", "doi": "https://doi.org/999", "datePublished": "", "publicationOutlet": "", "authors": []}], "top_display": [{"name": "Storage", "value": "7PB"}, {"name": "Source", "value": "Open"}], "additional_display": [{"name": "sfb1451", "content": {"Species": "Human", "Limb": "Leg", "project": "Z03"}}]}