diff --git a/fido/fido.py b/fido/fido.py index 9e42a5a..6e2abac 100755 --- a/fido/fido.py +++ b/fido/fido.py @@ -15,15 +15,16 @@ import tempfile import zipfile from contextlib import closing -from time import perf_counter from typing import Optional -from xml.etree import cElementTree as ET + +from defusedxml import ElementTree as ET from fido import CONFIG_DIR, __version__ -from fido.char_handler import escape from fido.cli_args import parse_cli_args from fido.package import OlePackage, ZipPackage -from fido.versions import get_local_versions, sig_file_actions +from fido.pronom.versions import get_local_versions, sig_file_actions +from fido.utils.char_handler import escape +from fido.utils.timer import PerfTimer defaults = { "config_dir": CONFIG_DIR, @@ -50,22 +51,6 @@ } -class PerfTimer: - """Utility class that carries out simple process timings.""" - - def __init__(self): - """New instance with start time running.""" - self.start_time = perf_counter() - - def start(self): - """Start new timer.""" - self.start_time = perf_counter() - - def duration(self): - """Return the duration since instantiation or start() was last called.""" - return perf_counter() - self.start_time - - class Fido: """Main FIDO application class.""" diff --git a/fido/prepare.py b/fido/pronom/prepare.py similarity index 80% rename from fido/prepare.py rename to fido/pronom/prepare.py index 7580bef..1409c09 100644 --- a/fido/prepare.py +++ b/fido/pronom/prepare.py @@ -12,10 +12,11 @@ from urllib.parse import urlparse from urllib.request import urlopen from xml.dom import minidom -from xml.etree import ElementTree as ET -from .char_handler import escape -from .versions import get_local_versions +from defusedxml import ElementTree as ET + +from fido.pronom.versions import get_local_versions +from fido.utils.char_handler import escape FLG_INCOMPATIBLE = "__INCOMPATIBLE_SIG__" @@ -126,10 +127,10 @@ def load_pronom_xml(self, puid_filter=None): """ formats = [] try: - zip = zipfile.ZipFile(self.pronom_files, "r") - for item in zip.infolist(): + pronom_collection = zipfile.ZipFile(self.pronom_files, "r") + for item in pronom_collection.infolist(): try: - stream = zip.open(item) + stream = pronom_collection.open(item) # Work is done here! format_ = self.parse_pronom_xml(stream, puid_filter) if format_ is not None: @@ -138,12 +139,10 @@ def load_pronom_xml(self, puid_filter=None): stream.close() finally: try: - zip.close() + pronom_collection.close() except Exception as e: print( - "An error occured loading '{0}' (exception: {1})".format( - self.pronom_files, e - ), + "An error occured loading '{0}' (exception: {1})".format(self.pronom_files, e), file=sys.stderr, ) sys.exit() @@ -182,62 +181,46 @@ def parse_pronom_xml(self, source, puid_filter=None): pronom_format = pronom_root.find(TNA("report_format_detail/FileFormat")) fido_format = ET.Element("format") # Get the base Format information - for id in pronom_format.findall(TNA("FileFormatIdentifier")): - type = get_text_tna(id, "IdentifierType") - if type == "PUID": - puid = get_text_tna(id, "Identifier") + for xml_id in pronom_format.findall(TNA("FileFormatIdentifier")): + xml_id_type = get_text_tna(xml_id, "IdentifierType") + if xml_id_type == "PUID": + puid = get_text_tna(xml_id, "Identifier") ET.SubElement(fido_format, "puid").text = puid if puid_filter and puid != puid_filter: return None # A bit clumsy. I want to have puid first, then mime, then container. - for id in pronom_format.findall(TNA("FileFormatIdentifier")): - type = get_text_tna(id, "IdentifierType") - if type == "MIME": - ET.SubElement(fido_format, "mime").text = get_text_tna(id, "Identifier") - elif type == "PUID": - puid = get_text_tna(id, "Identifier") + for xml_id in pronom_format.findall(TNA("FileFormatIdentifier")): + xml_id_type = get_text_tna(xml_id, "IdentifierType") + if xml_id_type == "MIME": + ET.SubElement(fido_format, "mime").text = get_text_tna(xml_id, "Identifier") + elif xml_id_type == "PUID": + puid = get_text_tna(xml_id, "Identifier") if puid == "x-fmt/263": ET.SubElement(fido_format, "container").text = "zip" elif puid == "x-fmt/265": ET.SubElement(fido_format, "container").text = "tar" - ET.SubElement(fido_format, "name").text = get_text_tna( - pronom_format, "FormatName" - ) - ET.SubElement(fido_format, "version").text = get_text_tna( - pronom_format, "FormatVersion" - ) - ET.SubElement(fido_format, "alias").text = get_text_tna( - pronom_format, "FormatAliases" - ) - ET.SubElement(fido_format, "pronom_id").text = get_text_tna( - pronom_format, "FormatID" - ) + ET.SubElement(fido_format, "name").text = get_text_tna(pronom_format, "FormatName") + ET.SubElement(fido_format, "version").text = get_text_tna(pronom_format, "FormatVersion") + ET.SubElement(fido_format, "alias").text = get_text_tna(pronom_format, "FormatAliases") + ET.SubElement(fido_format, "pronom_id").text = get_text_tna(pronom_format, "FormatID") # Get the extensions from the ExternalSignature for x in pronom_format.findall(TNA("ExternalSignature")): ET.SubElement(fido_format, "extension").text = get_text_tna(x, "Signature") - for id in pronom_format.findall(TNA("FileFormatIdentifier")): - type = get_text_tna(id, "IdentifierType") - if type == "Apple Uniform Type Identifier": - ET.SubElement(fido_format, "apple_uti").text = get_text_tna( - id, "Identifier" - ) + for xml_id in pronom_format.findall(TNA("FileFormatIdentifier")): + xml_id_type = get_text_tna(xml_id, "IdentifierType") + if xml_id_type == "Apple Uniform Type Identifier": + ET.SubElement(fido_format, "apple_uti").text = get_text_tna(xml_id, "Identifier") # Handle the relationships for x in pronom_format.findall(TNA("RelatedFormat")): rel = get_text_tna(x, "RelationshipType") if rel == "Has priority over": - ET.SubElement(fido_format, "has_priority_over").text = get_text_tna( - x, "RelatedFormatID" - ) + ET.SubElement(fido_format, "has_priority_over").text = get_text_tna(x, "RelatedFormatID") # Get the InternalSignature information for pronom_sig in pronom_format.findall(TNA("InternalSignature")): fido_sig = ET.SubElement(fido_format, "signature") - ET.SubElement(fido_sig, "name").text = get_text_tna( - pronom_sig, "SignatureName" - ) + ET.SubElement(fido_sig, "name").text = get_text_tna(pronom_sig, "SignatureName") # There are some funny chars in the notes, which caused me trouble and it is a unicode string, - ET.SubElement(fido_sig, "note").text = get_text_tna( - pronom_sig, "SignatureNote" - ) + ET.SubElement(fido_sig, "note").text = get_text_tna(pronom_sig, "SignatureNote") for pronom_pat in pronom_sig.findall(TNA("ByteSequence")): # print('Parsing ID:{}'.format(puid)) fido_pat = ET.SubElement(fido_sig, "pattern") @@ -249,14 +232,10 @@ def parse_pronom_xml(self, source, puid_filter=None): pass # print "working on puid:", puid, ", position: ", pos, "with offset, maxoffset: ", offset, ",", max_offset try: - regex = convert_to_regex( - byte_seq, "Little", pos, offset, max_offset - ) + regex = convert_to_regex(byte_seq, "Little", pos, offset, max_offset) except ValueError as ve: print( - "ValueError converting PUID {} signature to regex: {}".format( - puid, ve - ), + "ValueError converting PUID {} signature to regex: {}".format(puid, ve), file=sys.stderr, ) regex = FLG_INCOMPATIBLE @@ -264,9 +243,7 @@ def parse_pronom_xml(self, source, puid_filter=None): # print "done puid", puid if regex == FLG_INCOMPATIBLE: print( - "Error: incompatible PRONOM signature found for puid {} skipping...".format( - puid - ), + "Error: incompatible PRONOM signature found for puid {} skipping...".format(puid), file=sys.stderr, ) # remove the empty 'signature' nodes @@ -280,80 +257,54 @@ def parse_pronom_xml(self, source, puid_filter=None): ET.SubElement(fido_pat, "regex").text = regex # Get the format details fido_details = ET.SubElement(fido_format, "details") - ET.SubElement(fido_details, "dc:description").text = get_text_tna( - pronom_format, "FormatDescription" - ) - ET.SubElement(fido_details, "dcterms:available").text = get_text_tna( - pronom_format, "ReleaseDate" - ) - ET.SubElement(fido_details, "dc:creator").text = get_text_tna( - pronom_format, "Developers/DeveloperCompoundName" - ) + ET.SubElement(fido_details, "dc:description").text = get_text_tna(pronom_format, "FormatDescription") + ET.SubElement(fido_details, "dcterms:available").text = get_text_tna(pronom_format, "ReleaseDate") + ET.SubElement(fido_details, "dc:creator").text = get_text_tna(pronom_format, "Developers/DeveloperCompoundName") ET.SubElement(fido_details, "dcterms:publisher").text = get_text_tna( pronom_format, "Developers/OrganisationName" ) for x in pronom_format.findall(TNA("RelatedFormat")): rel = get_text_tna(x, "RelationshipType") if rel == "Is supertype of": - ET.SubElement(fido_details, "is_supertype_of").text = get_text_tna( - x, "RelatedFormatID" - ) + ET.SubElement(fido_details, "is_supertype_of").text = get_text_tna(x, "RelatedFormatID") for x in pronom_format.findall(TNA("RelatedFormat")): rel = get_text_tna(x, "RelationshipType") if rel == "Is subtype of": - ET.SubElement(fido_details, "is_subtype_of").text = get_text_tna( - x, "RelatedFormatID" - ) - ET.SubElement(fido_details, "content_type").text = get_text_tna( - pronom_format, "FormatTypes" - ) + ET.SubElement(fido_details, "is_subtype_of").text = get_text_tna(x, "RelatedFormatID") + ET.SubElement(fido_details, "content_type").text = get_text_tna(pronom_format, "FormatTypes") # References for x in pronom_format.findall(TNA("Document")): r = ET.SubElement(fido_details, "reference") ET.SubElement(r, "dc:title").text = get_text_tna(x, "TitleText") - ET.SubElement(r, "dc:creator").text = get_text_tna( - x, "Author/AuthorCompoundName" - ) - ET.SubElement(r, "dc:publisher").text = get_text_tna( - x, "Publisher/PublisherCompoundName" - ) - ET.SubElement(r, "dcterms:available").text = get_text_tna( - x, "PublicationDate" - ) - for id in x.findall(TNA("DocumentIdentifier")): - type = get_text_tna(id, "IdentifierType") - if type == "URL": - ET.SubElement(r, "dc:identifier").text = "http://" + get_text_tna( - id, "Identifier" - ) + ET.SubElement(r, "dc:creator").text = get_text_tna(x, "Author/AuthorCompoundName") + ET.SubElement(r, "dc:publisher").text = get_text_tna(x, "Publisher/PublisherCompoundName") + ET.SubElement(r, "dcterms:available").text = get_text_tna(x, "PublicationDate") + for xml_id in x.findall(TNA("DocumentIdentifier")): + xml_id_type = get_text_tna(xml_id, "IdentifierType") + if xml_id_type == "URL": + ET.SubElement(r, "dc:identifier").text = "http://" + get_text_tna(xml_id, "Identifier") else: ET.SubElement(r, "dc:identifier").text = ( - get_text_tna(id, "IdentifierType") - + ":" - + get_text_tna(id, "Identifier") + get_text_tna(xml_id, "IdentifierType") + ":" + get_text_tna(xml_id, "Identifier") ) ET.SubElement(r, "dc:description").text = get_text_tna(x, "DocumentNote") ET.SubElement(r, "dc:type").text = get_text_tna(x, "DocumentType") ET.SubElement(r, "dcterms:license").text = ( - get_text_tna(x, "AvailabilityDescription") - + " " - + get_text_tna(x, "AvailabilityNote") + get_text_tna(x, "AvailabilityDescription") + " " + get_text_tna(x, "AvailabilityNote") ) ET.SubElement(r, "dc:rights").text = get_text_tna(x, "DocumentIPR") # Examples for x in pronom_format.findall(TNA("ReferenceFile")): rf = ET.SubElement(fido_details, "example_file") ET.SubElement(rf, "dc:title").text = get_text_tna(x, "ReferenceFileName") - ET.SubElement(rf, "dc:description").text = get_text_tna( - x, "ReferenceFileDescription" - ) + ET.SubElement(rf, "dc:description").text = get_text_tna(x, "ReferenceFileDescription") checksum = "" - for id in x.findall(TNA("ReferenceFileIdentifier")): - type = get_text_tna(id, "IdentifierType") - if type == "URL": + for xml_id in x.findall(TNA("ReferenceFileIdentifier")): + xml_id_type = get_text_tna(xml_id, "IdentifierType") + if xml_id_type == "URL": # Starting with PRONOM 89, some URLs contain http:// # and others do not. - url = get_text_tna(id, "Identifier") + url = get_text_tna(xml_id, "Identifier") if not urlparse(url).scheme: url = "http://" + url ET.SubElement(rf, "dc:identifier").text = url @@ -364,20 +315,14 @@ def parse_pronom_xml(self, source, puid_filter=None): m.update(sock.read()) sock.close() except HTTPError as http_excep: - sys.stderr.write( - "HTTP {} error loading resource {}\n".format( - http_excep.code, url - ) - ) + sys.stderr.write("HTTP {} error loading resource {}\n".format(http_excep.code, url)) if http_excep.code == 404: continue checksum = m.hexdigest() else: ET.SubElement(rf, "dc:identifier").text = ( - get_text_tna(id, "IdentifierType") - + ":" - + get_text_tna(id, "Identifier") + get_text_tna(xml_id, "IdentifierType") + ":" + get_text_tna(xml_id, "Identifier") ) ET.SubElement(rf, "dcterms:license").text = "" ET.SubElement(rf, "dc:rights").text = get_text_tna(x, "ReferenceFileIPR") @@ -387,18 +332,10 @@ def parse_pronom_xml(self, source, puid_filter=None): # Record Metadata md = ET.SubElement(fido_details, "record_metadata") ET.SubElement(md, "status").text = "unknown" - ET.SubElement(md, "dc:creator").text = get_text_tna( - pronom_format, "ProvenanceName" - ) - ET.SubElement(md, "dcterms:created").text = get_text_tna( - pronom_format, "ProvenanceSourceDate" - ) - ET.SubElement(md, "dcterms:modified").text = get_text_tna( - pronom_format, "LastUpdatedDate" - ) - ET.SubElement(md, "dc:description").text = get_text_tna( - pronom_format, "ProvenanceDescription" - ) + ET.SubElement(md, "dc:creator").text = get_text_tna(pronom_format, "ProvenanceName") + ET.SubElement(md, "dcterms:created").text = get_text_tna(pronom_format, "ProvenanceSourceDate") + ET.SubElement(md, "dcterms:modified").text = get_text_tna(pronom_format, "LastUpdatedDate") + ET.SubElement(md, "dc:description").text = get_text_tna(pronom_format, "ProvenanceDescription") return fido_format # FIXME: I don't think that this quite works yet! @@ -485,9 +422,7 @@ def do_byte(chars, i, littleendian, esc=True): c2 = "0123456789ABCDEF".find(chars[i + 1].upper()) buf = StringIO() if c1 < 0 or c2 < 0: - raise Exception( - _convert_err_msg("bad byte sequence", chars[i : i + 2], i, chars, buf) - ) + raise Exception(_convert_err_msg("bad byte sequence", chars[i : i + 2], i, chars, buf)) if littleendian: val = chr(16 * c1 + c2) else: @@ -553,16 +488,12 @@ def calculate_repetition(char, pos, offset, maxoffset): def do_all_bitmasks(chars, i, littleendian): """(byte & bitmask) == bitmask.""" - return do_any_all_bitmasks( - chars, i, lambda byt, bitmask: ((byt & bitmask) == bitmask), littleendian - ) + return do_any_all_bitmasks(chars, i, lambda byt, bitmask: ((byt & bitmask) == bitmask), littleendian) def do_any_bitmasks(chars, i, littleendian): """(byte & bitmask) != 0.""" - return do_any_all_bitmasks( - chars, i, lambda byt, bitmask: ((byt & bitmask) != 0), littleendian - ) + return do_any_all_bitmasks(chars, i, lambda byt, bitmask: ((byt & bitmask) != 0), littleendian) def do_any_all_bitmasks(chars, i, predicate, littleendian): @@ -581,13 +512,7 @@ def do_any_all_bitmasks(chars, i, predicate, littleendian): byt, inc = do_byte(chars, i + 1, littleendian, esc=False) bitmask = ord(byt) regex = "({})".format( - "|".join( - [ - "\\x" + hex(byte)[2:].zfill(2) - for byte in range(0x100) - if predicate(byte, bitmask) - ] - ) + "|".join(["\\x" + hex(byte)[2:].zfill(2) for byte in range(0x100) if predicate(byte, bitmask)]) ) return regex, inc + 1 @@ -645,11 +570,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): elif chars[i] in "*+?": state = "specials" else: - raise ValueError( - _convert_err_msg( - "Illegal character in start", chars[i], i, chars, buf - ) - ) + raise ValueError(_convert_err_msg("Illegal character in start", chars[i], i, chars, buf)) elif state == "bytes": (byt, inc) = do_byte(chars, i, littleendian) buf.write(byt) @@ -684,11 +605,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): elif chars[i] == "]": break else: - raise Exception( - _convert_err_msg( - "Illegal character in non-match", chars[i], i, chars, buf - ) - ) + raise Exception(_convert_err_msg("Illegal character in non-match", chars[i], i, chars, buf)) buf.write(")") i += 1 state = "start" @@ -714,11 +631,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): buf.write("]") i += 1 except Exception: - print( - _convert_err_msg( - "Illegal character in bracket", chars[i], i, chars, buf - ) - ) + print(_convert_err_msg("Illegal character in bracket", chars[i], i, chars, buf)) raise if i < len(chars) and chars[i] == "{": state = "curly-after-bracket" @@ -761,9 +674,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): else: raise Exception( _convert_err_msg( - ( - "Current state = '{0}' : Illegal character in paren" - ).format(state), + ("Current state = '{0}' : Illegal character in paren").format(state), chars[i], i, chars, @@ -796,11 +707,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): elif chars[i] == "}": break else: - raise Exception( - _convert_err_msg( - "Illegal character in curly", chars[i], i, chars, buf - ) - ) + raise Exception(_convert_err_msg("Illegal character in curly", chars[i], i, chars, buf)) buf.write("}") i += 1 # skip the ) state = "start" @@ -813,11 +720,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): i += 1 elif chars[i] == "?": if chars[i + 1] != "?": - raise Exception( - _convert_err_msg( - "Illegal character after ?", chars[i + 1], i + 1, chars, buf - ) - ) + raise Exception(_convert_err_msg("Illegal character after ?", chars[i + 1], i + 1, chars, buf)) buf.write(".?") i += 2 state = "start" @@ -833,18 +736,18 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): return val -def run(input=None, output=None, puid=None): +def run(input_file=None, output_file=None, puid=None): """Convert PRONOM formats into FIDO signatures.""" versions = get_local_versions() - if input is None: - input = versions.get_zip_file() - if output is None: - output = versions.get_signature_file() + if input_file is None: + input_file = versions.get_zip_file() + if output_file is None: + output_file = versions.get_signature_file() - info = FormatInfo(input) + info = FormatInfo(input_file) info.load_pronom_xml(puid) - info.save(output) + info.save(output_file) print( "Converted {0} PRONOM formats to FIDO signatures".format(len(info.formats)), file=sys.stderr, @@ -856,19 +759,13 @@ def main(args=None): if args is None: args = sys.argv[1:] - parser = ArgumentParser( - description="Produce the FIDO format XML that is loaded at run-time" - ) - parser.add_argument( - "-input", default=None, help="Input file, a Zip containing PRONOM XML files" - ) + parser = ArgumentParser(description="Produce the FIDO format XML that is loaded at run-time") + parser.add_argument("-input", default=None, help="Input file, a Zip containing PRONOM XML files") parser.add_argument("-output", default=None, help="Output file") - parser.add_argument( - "-puid", default=None, help="A particular PUID record to extract" - ) + parser.add_argument("-puid", default=None, help="A particular PUID record to extract") args = parser.parse_args(args) - run(input=args.input, output=args.output, puid=args.puid) + run(input_file=args.input, output_file=args.output, puid=args.puid) if __name__ == "__main__": diff --git a/fido/pronom/soap.py b/fido/pronom/soap.py index 67d2a73..49ac707 100644 --- a/fido/pronom/soap.py +++ b/fido/pronom/soap.py @@ -19,10 +19,13 @@ PRONOM format signatures SOAP calls. """ + import sys import urllib -import xml.etree.ElementTree as ET from urllib.error import HTTPError, URLError +from xml.etree import ElementTree as ET + +from defusedxml.ElementTree import fromstring from fido import __version__ @@ -50,9 +53,7 @@ def get_sig_xml_for_puid(puid): """Return the full PRONOM signature XML for the passed PUID.""" - req = urllib.request.Request( - "http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid) - ) + req = urllib.request.Request("http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid)) response = urllib.request.urlopen(req) xml = response.read() return xml @@ -82,16 +83,12 @@ def get_droid_signatures(version): format_count = False try: with urllib.request.urlopen( - "https://www.nationalarchives.gov.uk/documents/DROID_SignatureFile_V{}.xml".format( - version - ) + "https://www.nationalarchives.gov.uk/documents/DROID_SignatureFile_V{}.xml".format(version) ) as f: xml = f.read().decode("utf-8") - root_ele = ET.fromstring(xml) + root_ele = fromstring(xml) format_count = len( - root_ele.findall( - ".//{http://www.nationalarchives.gov.uk/pronom/SignatureFile}FileFormat" - ) + root_ele.findall(".//{http://www.nationalarchives.gov.uk/pronom/SignatureFile}FileFormat") ) except HTTPError as httpe: sys.stderr.write( @@ -105,27 +102,19 @@ def get_droid_signatures(version): def _get_soap_ele_tree(soap_action): soap_string = '{}<{} xmlns="{}" />'.format( XML_PROC, NS.get("xsi"), NS.get("xsd"), NS.get("soap"), soap_action, PRONOM_NS - ).encode( - ENCODING - ) + ).encode(ENCODING) soap_action = '"{}:{}In"'.format(PRONOM_NS, soap_action) xml = _get_soap_response(soap_action, soap_string) for prefix, uri in NS.items(): ET.register_namespace(prefix, uri) - return ET.fromstring(xml) + return fromstring(xml) def _get_soap_response(soap_action, soap_string): try: - req = urllib.request.Request( - "http://{}/pronom/service.asmx".format(PRONOM_HOST), data=soap_string - ) + req = urllib.request.Request("http://{}/pronom/service.asmx".format(PRONOM_HOST), data=soap_string) except URLError: - print( - "There was a problem contacting the PRONOM service at http://{}/pronom/service.asmx.".format( - PRONOM_HOST - ) - ) + print("There was a problem contacting the PRONOM service at http://{}/pronom/service.asmx.".format(PRONOM_HOST)) print("Please check your network connection and try again.") sys.exit(1) for key, value in HEADERS.items(): diff --git a/fido/update_signatures.py b/fido/pronom/update_signatures.py similarity index 92% rename from fido/update_signatures.py rename to fido/pronom/update_signatures.py index 919dfad..a99a49a 100644 --- a/fido/update_signatures.py +++ b/fido/pronom/update_signatures.py @@ -19,17 +19,18 @@ import zipfile from argparse import ArgumentParser from shutil import rmtree -from xml.etree import ElementTree as CET -from . import CONFIG_DIR, __version__ -from .prepare import run as prepare_pronom_to_fido -from .pronom.soap import ( +from defusedxml import ElementTree as CET +from pronom.prepare import run as prepare_pronom_to_fido + +from fido import CONFIG_DIR, __version__ +from fido.pronom.soap import ( NS, get_droid_signatures, get_pronom_sig_version, get_sig_xml_for_puid, ) -from .versions import get_local_versions +from fido.pronom.versions import get_local_versions ABORT_MSG = "Aborting update..." @@ -117,9 +118,7 @@ def sig_version_check(version="latest"): print("Getting latest version number from PRONOM...") version = get_pronom_sig_version() if not version: - sys.exit( - "Failed to obtain PRONOM signature file version number, please try again." - ) + sys.exit("Failed to obtain PRONOM signature file version number, please try again.") print("Querying PRONOM for signaturefile version {}.".format(version)) sig_file_name = _sig_file_name(version) @@ -159,9 +158,7 @@ def init_sig_download(defaults): resume = False if os.path.isdir(tmpdir): print("Found previously created temporary folder for download:", tmpdir) - resume = query_yes_no( - "Do you want to resume download (yes) or start over (no)?" - ) + resume = query_yes_no("Do you want to resume download (yes) or start over (no)?") if resume: print("Resuming download...") else: @@ -171,9 +168,7 @@ def init_sig_download(defaults): except OSError: pass if not os.path.isdir(tmpdir): - sys.stderr.write( - "Failed to create temporary folder for PUID's, using: " + tmpdir - ) + sys.stderr.write("Failed to create temporary folder for PUID's, using: " + tmpdir) return tmpdir, resume @@ -187,9 +182,7 @@ def download_signatures(defaults, format_eles, resume, tmpdir): download_sig(format_ele, tmpdir, resume, defaults) numfiles += 1 print( - r"Downloaded {}/{} files [{}%]".format( - numfiles, puid_count, int(float(numfiles) / one_percent) - ), + r"Downloaded {}/{} files [{}%]".format(numfiles, puid_count, int(float(numfiles) / one_percent)), end="\r", ) print("100%") @@ -258,9 +251,7 @@ def update_versions_xml(version): def main(): """Main CLI entrypoint.""" - parser = ArgumentParser( - description="Download and convert the latest PRONOM signatures" - ) + parser = ArgumentParser(description="Download and convert the latest PRONOM signatures") parser.add_argument( "-tmpdir", default=OPTIONS["tmp_dir"], diff --git a/fido/versions.py b/fido/pronom/versions.py similarity index 83% rename from fido/versions.py rename to fido/pronom/versions.py index 55fa220..25843d2 100644 --- a/fido/versions.py +++ b/fido/pronom/versions.py @@ -17,15 +17,14 @@ PRONOM is available from http://www.nationalarchives.gov.uk/pronom/ """ - import importlib.resources import os import re import sys -from xml.etree import ElementTree as ET -from xml.etree.ElementTree import ParseError, parse import requests +from defusedxml import ElementTree as ET +from defusedxml.ElementTree import ParseError, parse from fido import CONFIG_DIR @@ -87,9 +86,7 @@ def __setattr__(self, name, value): def get_zip_file(self): """Obtain location to the PRONOM XML Zip file based on the current PRONOM version.""" - return os.path.join( - self.conf_dir, "pronom-xml-v{}.zip".format(self.pronom_version) - ) + return os.path.join(self.conf_dir, "pronom-xml-v{}.zip".format(self.pronom_version)) def get_signature_file(self): """Obtain location to the current PRONOM signature file.""" @@ -101,9 +98,7 @@ def write(self): for key, value in self.PROPS_MAPPING.items(): if self.root.find(value) is None: raise ValueError("Field {} has not been defined!".format(key)) - self.tree.write( - self.versions_file, xml_declaration=True, method="xml", encoding="utf-8" - ) + self.tree.write(self.versions_file, xml_declaration=True, method="xml", encoding="utf-8") def get_local_versions(config_dir=CONFIG_DIR): @@ -147,19 +142,11 @@ def _list_available_versions(update_url): def _check_update_signatures(sig_vers, update_url, versions, is_update=False): is_new, latest = _version_check(sig_vers, update_url) if is_new: - sys.stdout.write( - "Updated signatures v{} are available, current version is v{}\n".format( - latest, sig_vers - ) - ) + sys.stdout.write("Updated signatures v{} are available, current version is v{}\n".format(latest, sig_vers)) if is_update: _output_details(latest, update_url, versions) else: - sys.stdout.write( - "Your signature files are up to date, current version is v{}\n".format( - sig_vers - ) - ) + sys.stdout.write("Your signature files are up to date, current version is v{}\n".format(sig_vers)) sys.exit(0) @@ -169,23 +156,15 @@ def _download_sig_version(sig_act, update_url, versions): if not match: sys.exit( - '{} is not a valid version number, to download a sig file try "-sig v104" or "-sig 104".'.format( - sig_act - ) + '{} is not a valid version number, to download a sig file try "-sig v104" or "-sig 104".'.format(sig_act) ) ver = sig_act if not ver.startswith("v"): ver = "v" + sig_act resp = requests.get(update_url + "format/" + ver + "/") if resp.status_code != 200: - sys.exit( - "No signature files found for {}, REST status {}".format( - sig_act, resp.status_code - ) - ) - _output_details( - re.search(r"\d+|$", ver).group(), update_url, versions - ) # noqa: W605 + sys.exit("No signature files found for {}, REST status {}".format(sig_act, resp.status_code)) + _output_details(re.search(r"\d+|$", ver).group(), update_url, versions) # noqa: W605 def _get_version(ver_string): @@ -193,9 +172,7 @@ def _get_version(ver_string): match = re.search(r"^v?(\d+)$", ver_string, re.IGNORECASE) if not match: sys.exit( - '{} is not a valid version number, to download a sig file try "-sig v104" or "-sig 104".'.format( - ver_string - ) + '{} is not a valid version number, to download a sig file try "-sig v104" or "-sig 104".'.format(ver_string) ) ver = ver_string return ver_string if not ver.startswith("v") else ver_string[1:] @@ -214,18 +191,14 @@ def _output_details(version, update_url, versions): def _version_check(sig_ver, update_url): resp = requests.get(update_url + "format/latest/") if resp.status_code != 200: - sys.exit( - "Error getting latest version info: HTTP Status {}".format(resp.status_code) - ) + sys.exit("Error getting latest version info: HTTP Status {}".format(resp.status_code)) root_ele = ET.fromstring(resp.text) latest = _get_version(root_ele.get("version")) return int(latest) > int(sig_ver), latest def _write_sigs(latest, update_url, type, name_template): - sig_out = str( - importlib.resources.files("fido").joinpath("conf", name_template.format(latest)) - ) + sig_out = str(importlib.resources.files("fido").joinpath("conf", name_template.format(latest))) if os.path.exists(sig_out): return resp = requests.get(update_url + "format/{0}/{1}/".format(latest, type)) diff --git a/fido/toxml.py b/fido/toxml.py index ca1905a..9e240da 100644 --- a/fido/toxml.py +++ b/fido/toxml.py @@ -22,8 +22,9 @@ import csv import sys +from fido.pronom.versions import get_local_versions + from . import __version__ -from .versions import get_local_versions def main(): @@ -34,9 +35,7 @@ def main(): {0} {1} - """.format( - __version__, get_local_versions().pronom_version - ) + """.format(__version__, get_local_versions().pronom_version) ) reader = csv.reader(sys.stdin) @@ -54,9 +53,7 @@ def main(): {6} {7} {8} - """.format( - row[6], row[0], row[8], row[1], row[2], row[7], row[3], row[4], row[5] - ) + """.format(row[6], row[0], row[8], row[1], row[2], row[7], row[3], row[4], row[5]) ) sys.stdout.write("\n\n") diff --git a/fido/utils/__init__.py b/fido/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fido/char_handler.py b/fido/utils/char_handler.py similarity index 100% rename from fido/char_handler.py rename to fido/utils/char_handler.py diff --git a/fido/utils/timer.py b/fido/utils/timer.py new file mode 100644 index 0000000..af1cbb1 --- /dev/null +++ b/fido/utils/timer.py @@ -0,0 +1,17 @@ +from time import perf_counter + + +class PerfTimer: + """Utility class that carries out simple process timings.""" + + def __init__(self): + """New instance with start time running.""" + self.start_time = perf_counter() + + def start(self): + """Start new timer.""" + self.start_time = perf_counter() + + def duration(self): + """Return the duration since instantiation or start() was last called.""" + return perf_counter() - self.start_time diff --git a/pyproject.toml b/pyproject.toml index 07ef2c8..dc306e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,8 @@ classifiers = [ dependencies = [ "olefile >= 0.46, < 1", - "requests", + "requests >= 2", + "defusedxml >= 0.7" ] [project.urls] @@ -45,9 +46,9 @@ testing = [ [project.scripts] fido = "fido.fido:main" -fido-prepare = "fido.prepare:main" +fido-prepare = "fido.pronom.prepare:main" fido-toxml = "fido.toxml:main" - +fido-update-signatures = "fido.pronom.update_signatures:run" [tool.setuptools.package-data] "fido" = ["*.*", "conf/*.*", "pronom/*.*"] diff --git a/tests/test_fido.py b/tests/test_fido.py index e0c256e..420dbd5 100644 --- a/tests/test_fido.py +++ b/tests/test_fido.py @@ -7,7 +7,8 @@ import pytest -from fido.fido import Fido, PerfTimer +from fido.fido import Fido +from fido.utils.timer import PerfTimer def test_perf_timer(): diff --git a/tests/test_prepare.py b/tests/test_prepare.py index 752fcd3..10cbbad 100644 --- a/tests/test_prepare.py +++ b/tests/test_prepare.py @@ -2,7 +2,7 @@ import pytest -from fido.prepare import convert_to_regex +from fido.pronom.prepare import convert_to_regex def binrep_convert(byt): @@ -64,17 +64,17 @@ def test_bitmasks(pronom_bytesequence, matches_predicate): ("pronom_bytesequence", "input_", "matches_bool"), ( # These are good: - ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x02\x11\xFF", True), - ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x03\x11\xFF", True), - ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x02\xFE\xFF", True), + ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x02\x11\xff", True), + ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x03\x11\xff", True), + ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x02\xfe\xff", True), # Bad because missing three anythings between AB and CD - ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xCD\x02\x11\xFF", False), + ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xcd\x02\x11\xff", False), # Bad because not at start of string - ("ab{3}cd(01|02|03)~07ff", "\xDA\xAB\xDD\xDD\xDD\xCD\x02\x11\xFF", False), + ("ab{3}cd(01|02|03)~07ff", "\xda\xab\xdd\xdd\xdd\xcd\x02\x11\xff", False), # Bad because 04 is not in (01|02|03) - ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x04\x11\xFF", False), + ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x04\x11\xff", False), # Bad because 18 is not in ~07 - ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x02\x18\xFF", False), + ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x02\x18\xff", False), ), ) def test_heterogenous_sequences(pronom_bytesequence, input_, matches_bool):