Skip to content

Commit

Permalink
[app][feat] redo ddb xslt
Browse files Browse the repository at this point in the history
  • Loading branch information
M3ssman committed Nov 22, 2023
1 parent 9326a09 commit 874689e
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 113 deletions.
31 changes: 8 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,20 @@
![Python CI](https://github.com/ulb-sachsen-anhalt/digital-flow/actions/workflows/main.yml/badge.svg)
[![PyPi version](https://badgen.net/pypi/v/digiflow/)](https://pypi.org/project/digiflow/) ![PyPI - Downloads](https://img.shields.io/pypi/dm/digiflow) ![PyPI - License](https://img.shields.io/pypi/l/digiflow) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/digiflow)

Father's little helper for internal library digitalization workflows running on Linux-Systems.
Father's little helper for internal library digitalization workflows running on Linux-Systems. Use at own risk.

## Testing
## Tests with toxic container

Testing with tox uses different Python versions.

### Local tests using Tox on Ubuntu 20.04 LTS

For running tox locally for different Python Versions, it is required to have them installed

```bash
apt update
apt-get -y install software-properties-common
add-apt-repository -y ppa:deadsnakes/ppa
apt update
apt-get -y install python python3.6 python3-pip python-setuptools

# activate project's environment
. ./venv/bin/activate
pip install tox
tox
```

### Run Tests with toxic containers
Execute `tox` in a closed environment:

```bash
setup_tox.sh your-python-tox-image
docker build --tag <your-test-image> --build-arg BASE_IMAGE=<your-python-tox-image> -f Dockerfile.tox .
docker run --rm <your-test-image> tox
```

This executes `tox` in a closed container without distributioning.
## License

This project's source code is licensed under terms of the [MIT license](https://opensource.org/licenses/MIT).

NOTE: This project depends on components that may use different license terms.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "digiflow"
version = "3.1.1"
version = "3.2.0"
description = "Father's Little Digitization Workflow Helper"
readme = "README.md"
requires-python = ">=3.8"
Expand All @@ -13,7 +13,7 @@ authors = [
]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License"
"License :: OSI Approved :: MIT License"
]
dependencies = [
"lxml",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ numpy
wheel
requests
docker
saxonche
2 changes: 1 addition & 1 deletion src/digiflow/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.1.0
3.2.0
166 changes: 110 additions & 56 deletions src/digiflow/digiflow_validate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
"""Helper to trigger DDB-Validation for digitized prints and newspaper issues/additionals
cf. https://github.com/Deutsche-Digitale-Bibliothek/ddb-metadata-schematron-validation
"""

import os
import subprocess

from typing import (
Dict,
Expand All @@ -17,29 +18,22 @@
from digiflow import (
run_command
)

SAXON_PY_ENABLED = True
SAXON_ENABLED = True
try:
from saxonpy import (
from saxonche import (
PySaxonProcessor,
PyXdmNode
PyXdmValue,
)
except ModuleNotFoundError:
SAXON_PY_ENABLED = False
SAXON_ENABLED = False

from .digiflow_metadata import (
MetsReader
MetsReader,
XMLNS,
)

XMLNS = {
'mets': 'http://www.loc.gov/METS/',
'mods': 'http://www.loc.gov/mods/v3',
'xlink': 'http://www.w3.org/1999/xlink',
'dv': 'http://dfg-viewer.de/',
'vlz': 'http://visuallibrary.net/vlz/1.0/',
'vl': 'http://visuallibrary.net/vl',
'ulb': 'https://bibliothek.uni-halle.de',
}
# add schematron validation language mapping
XMLNS['svrl'] = 'http://purl.oclc.org/dsdl/svrl'


# DDB Report information *not* to care about
Expand Down Expand Up @@ -99,6 +93,62 @@
DIGIS_MULTIVOLUME = ['Ac', 'Af', 'AF', 'Hc', 'Hf', 'HF', 'volume', 'periodical', 'periodical_volume']
DIGIS_NEWSPAPER = ['issue', 'additional']

# context text len
ASSERT_DSCR_MAX_LEN = 32
ASSERT_TEXT_MAX_LEN = 96
CRITICAL_VALIDATION_ROLES = ['critical', 'error', 'fatal']
DDB_ERROR = 'ddb_error'
DDB_WARNG = 'ddb_warning'


class FailedAssert:
"""Encapsulate individual Schematron Report
for failed assertion"""

def __init__(self, elem: ET.Element):
self._element = elem
self.id = elem.get('id')
self.role = elem.get('role')
self.xpath = elem.get('location')
self._text = '. '.join(elem.xpath('svrl:text/text()', namespaces=XMLNS))
self._description = ' '.join(elem.xpath('svrl:description/text()', namespaces=XMLNS))
self._properties = elem.xpath('svrl:property', namespaces=XMLNS)
self.context = ''

def explain(self):
"""Explain the failure"""
_msg = f'[{self.id}] '
if len(self._properties) > 0:
_props = '=>'.join(f"{_p.get('id')}:{_p.text}" for _p in self._properties)
_msg = f'{_msg} {_props}'
if len(self.context) > 0:
_msg = f'{_msg} test:{self.context}'
_add_dscr = ''
_add_text = ''
if self._description:
_dscr = self._description
if len(_dscr) > ASSERT_DSCR_MAX_LEN:
_dscr = _dscr[:ASSERT_DSCR_MAX_LEN]
_add_dscr = _dscr
if self._text:
_txt = self._text.split('\n', maxsplit=1)[0]
if len(_txt) > ASSERT_TEXT_MAX_LEN:
_txt = _txt[:(ASSERT_TEXT_MAX_LEN-3)] + '...'
_add_text = _txt
if len(_add_dscr)>0 or len(_add_text) > 0:
_msg = f'{_msg} ({_add_dscr} {_add_text})'
return _msg

def add_context(self, ctx:str):
"""Add even more context on error data"""
if ctx is not None and len(ctx) > 0:
self.context = ctx

def is_error(self):
"""Need special care?"""
return self.role in CRITICAL_VALIDATION_ROLES



def ddb_validation(path_mets, path_schematron=PATH_MEDIA_SCH, path_schematron_bin=PATH_CLI,
ignore_rules=None, aggregate_errors=True) -> Dict:
Expand Down Expand Up @@ -160,7 +210,7 @@ def _curate_ignorances(path_mets, ignore_rules) -> List[str]:
return _rules


def ddb_xslt_validation(path_mets, digi_type='Aa', ignore_rules=DDB_IGNORE_RULES_BASIC):
def ddb_xslt_validation(path_mets, digi_type='Aa', ignore_rules=None):
"""Process METS/MODS Validation which complies to Deutsche Digitale Bibliothek (DDB)
plus applying additional set of rules which shall not be taken into account
Expand All @@ -174,32 +224,39 @@ def ddb_xslt_validation(path_mets, digi_type='Aa', ignore_rules=DDB_IGNORE_RULES
ignored when evaluating validation outcome.
Defaults to DDB_IGNORE_RULES_BASIC.
"""

if ignore_rules is None:
ignore_rules = DDB_IGNORE_RULES_BASIC
_path_xslt = str(PATH_MEDIA_XSL)
if digi_type in DIGIS_NEWSPAPER:
_path_xslt = str(PATH_NEWSP_XSL)
if not isinstance(path_mets, str):
path_mets = str(path_mets)
results_map = {}
# just create saxon processor context once
# several calls seem to break stuff
with PySaxonProcessor() as proc:
try:
_path_tmp_file = _apply_and_store_result(path_mets, proc, _path_xslt)
if not os.path.isfile(_path_tmp_file):
raise RuntimeError(f"missing tmp output file {_path_tmp_file}")
results_map = _process_result_file(path_mets, proc, _path_tmp_file, ignore_rules)
# additional post-processing
# merge roles 'fatal' and 'error' both into 'error'
fatals = results_map['fatal'] if 'fatal' in results_map else []
errors = results_map['error'] if 'error' in results_map else []
total_errors = fatals + errors
if total_errors and len(fatals) > 0:
results_map['error'] = total_errors
del results_map['fatal']
_path_tmp_result_file = _validate_and_store_result(path_mets, proc, _path_xslt)
if not os.path.isfile(_path_tmp_result_file):
raise RuntimeError(f"missing tmp output file {_path_tmp_result_file}")
_failures = _get_failures(path_mets, proc, _path_tmp_result_file, ignore_rules)
return _aggregate_failures(_failures)
except Exception as _exc:
raise RuntimeError(_exc) from _exc
return results_map
return {}


def _aggregate_failures(fails:List[FailedAssert]) -> Dict:
_dict = {}
for _f in fails:
_xpln = _f.explain()
_role = DDB_ERROR if _f.is_error() else DDB_WARNG
if _role in _dict:
_prev = _dict[_role]
_prev.append(_xpln)
_dict[_role] = _prev
else:
_dict.setdefault(_role, [_xpln])
return _dict


def _matches_digi_types(path_mets, some_types):
Expand All @@ -210,52 +267,49 @@ def _matches_digi_types(path_mets, some_types):
return any(t in some_types for t in _types)


def _apply_and_store_result(path_mets, proc, path_template):
def _validate_and_store_result(path_mets, proc, path_template):
_path_tmp = None
try:
xsltproc = proc.new_xslt_processor()
_document = proc.parse_xml(xml_file_name=path_mets)
xsltproc.set_source(xdm_node=_document)
xsltproc.compile_stylesheet(stylesheet_file=path_template)
xsltproc = proc.new_xslt30_processor()
_exec = xsltproc.compile_stylesheet(stylesheet_file=path_template)
_doc_dir = os.path.dirname(path_mets)
_path_tmp = os.path.join(_doc_dir, TMP_XSL_FILE_NAME)
# write explicite temporary file
# since the *.transform_to_file doesn't work as expected
# https://www.saxonica.com/saxon-c/doc11/html/saxonc.html#PyXslt30Processor-transform_to_file
with open(_path_tmp, mode='w') as wrt:
wrt.write(xsltproc.transform_to_string())
_doc_dir = os.path.dirname(path_mets)
_path_tmp = os.path.join(_doc_dir, TMP_XSL_FILE_NAME)
_exec.transform_to_file(source_file=path_mets, stylesheet_file=path_template, output_file=_path_tmp)
except Exception as _exc:
raise RuntimeError(_exc) from _exc
return _path_tmp


def _process_result_file(path_mets, proc, path_report, ignore_rules):
def _get_failures(path_mets, proc, path_report, ignore_rules) -> List[FailedAssert]:
"""Inspect results from tmp report file
if any irregularities detected, apply XSLT2.0
expressions from report file to gather details"""

rois_map = {}
_failures = []
tmp_root = ET.parse(path_report).getroot()
reports = tmp_root.findall('{http://purl.oclc.org/dsdl/svrl}*[@role]')
reports = tmp_root.findall('svrl:*[@role]', XMLNS)
n_reports = len(reports)
if n_reports > 0:
rois = [(e.attrib['role'], e.attrib['id'], e.attrib['location'])
_fails = [FailedAssert(e)
for e in reports
if e.attrib['id'] not in ignore_rules]
try:
_mets_doc = proc.parse_xml(xml_file_name=path_mets)
_xp_proc = proc.new_xpath_processor()
_xp_proc.set_context(xdm_item=_mets_doc)
for _roi in rois:
_item = _xp_proc.evaluate(_roi[2])
_ctx_msg = _item.string_value.strip() if isinstance(_item, PyXdmNode) else ''
if _roi[0] in rois_map:
rois_map[_roi[0]].append((_roi[1], _ctx_msg))
else:
rois_map[_roi[0]] = [(_roi[1], _ctx_msg)]
for _fail in _fails:
print(f"{_fail.explain()}")
_info = _xp_proc.evaluate(_fail.xpath)
if isinstance(_info, PyXdmValue) and _info.size > 0:
_items = [_info.item_at(i) for i in range(0, _info.size)]
_ctx = ','.join(_i.string_value.strip() for _i in _items)
_fail.add_context(_ctx)
_failures.append(_fail)
except Exception as _exc:
raise RuntimeError(_exc) from _exc
return rois_map
return _failures


def _transform_report(entries, path_mets, aggregate_errors):
Expand Down Expand Up @@ -322,7 +376,7 @@ def _ddb_report_location_2_lxml_xpath(xp_in):
Returns:
str: XPath stripped any qualified namespace annotations + leading '/'
"""

return '/' + __strip_namespaces(xp_in)


Expand Down
Loading

0 comments on commit 874689e

Please sign in to comment.