diff --git a/fido/fido.py b/fido/fido.py
index 9e42a5a..6e2abac 100755
--- a/fido/fido.py
+++ b/fido/fido.py
@@ -15,15 +15,16 @@
import tempfile
import zipfile
from contextlib import closing
-from time import perf_counter
from typing import Optional
-from xml.etree import cElementTree as ET
+
+from defusedxml import ElementTree as ET
from fido import CONFIG_DIR, __version__
-from fido.char_handler import escape
from fido.cli_args import parse_cli_args
from fido.package import OlePackage, ZipPackage
-from fido.versions import get_local_versions, sig_file_actions
+from fido.pronom.versions import get_local_versions, sig_file_actions
+from fido.utils.char_handler import escape
+from fido.utils.timer import PerfTimer
defaults = {
"config_dir": CONFIG_DIR,
@@ -50,22 +51,6 @@
}
-class PerfTimer:
- """Utility class that carries out simple process timings."""
-
- def __init__(self):
- """New instance with start time running."""
- self.start_time = perf_counter()
-
- def start(self):
- """Start new timer."""
- self.start_time = perf_counter()
-
- def duration(self):
- """Return the duration since instantiation or start() was last called."""
- return perf_counter() - self.start_time
-
-
class Fido:
"""Main FIDO application class."""
diff --git a/fido/prepare.py b/fido/pronom/prepare.py
similarity index 80%
rename from fido/prepare.py
rename to fido/pronom/prepare.py
index 7580bef..1409c09 100644
--- a/fido/prepare.py
+++ b/fido/pronom/prepare.py
@@ -12,10 +12,11 @@
from urllib.parse import urlparse
from urllib.request import urlopen
from xml.dom import minidom
-from xml.etree import ElementTree as ET
-from .char_handler import escape
-from .versions import get_local_versions
+from defusedxml import ElementTree as ET
+
+from fido.pronom.versions import get_local_versions
+from fido.utils.char_handler import escape
FLG_INCOMPATIBLE = "__INCOMPATIBLE_SIG__"
@@ -126,10 +127,10 @@ def load_pronom_xml(self, puid_filter=None):
"""
formats = []
try:
- zip = zipfile.ZipFile(self.pronom_files, "r")
- for item in zip.infolist():
+ pronom_collection = zipfile.ZipFile(self.pronom_files, "r")
+ for item in pronom_collection.infolist():
try:
- stream = zip.open(item)
+ stream = pronom_collection.open(item)
# Work is done here!
format_ = self.parse_pronom_xml(stream, puid_filter)
if format_ is not None:
@@ -138,12 +139,10 @@ def load_pronom_xml(self, puid_filter=None):
stream.close()
finally:
try:
- zip.close()
+ pronom_collection.close()
except Exception as e:
print(
- "An error occured loading '{0}' (exception: {1})".format(
- self.pronom_files, e
- ),
+ "An error occured loading '{0}' (exception: {1})".format(self.pronom_files, e),
file=sys.stderr,
)
sys.exit()
@@ -182,62 +181,46 @@ def parse_pronom_xml(self, source, puid_filter=None):
pronom_format = pronom_root.find(TNA("report_format_detail/FileFormat"))
fido_format = ET.Element("format")
# Get the base Format information
- for id in pronom_format.findall(TNA("FileFormatIdentifier")):
- type = get_text_tna(id, "IdentifierType")
- if type == "PUID":
- puid = get_text_tna(id, "Identifier")
+ for xml_id in pronom_format.findall(TNA("FileFormatIdentifier")):
+ xml_id_type = get_text_tna(xml_id, "IdentifierType")
+ if xml_id_type == "PUID":
+ puid = get_text_tna(xml_id, "Identifier")
ET.SubElement(fido_format, "puid").text = puid
if puid_filter and puid != puid_filter:
return None
# A bit clumsy. I want to have puid first, then mime, then container.
- for id in pronom_format.findall(TNA("FileFormatIdentifier")):
- type = get_text_tna(id, "IdentifierType")
- if type == "MIME":
- ET.SubElement(fido_format, "mime").text = get_text_tna(id, "Identifier")
- elif type == "PUID":
- puid = get_text_tna(id, "Identifier")
+ for xml_id in pronom_format.findall(TNA("FileFormatIdentifier")):
+ xml_id_type = get_text_tna(xml_id, "IdentifierType")
+ if xml_id_type == "MIME":
+ ET.SubElement(fido_format, "mime").text = get_text_tna(xml_id, "Identifier")
+ elif xml_id_type == "PUID":
+ puid = get_text_tna(xml_id, "Identifier")
if puid == "x-fmt/263":
ET.SubElement(fido_format, "container").text = "zip"
elif puid == "x-fmt/265":
ET.SubElement(fido_format, "container").text = "tar"
- ET.SubElement(fido_format, "name").text = get_text_tna(
- pronom_format, "FormatName"
- )
- ET.SubElement(fido_format, "version").text = get_text_tna(
- pronom_format, "FormatVersion"
- )
- ET.SubElement(fido_format, "alias").text = get_text_tna(
- pronom_format, "FormatAliases"
- )
- ET.SubElement(fido_format, "pronom_id").text = get_text_tna(
- pronom_format, "FormatID"
- )
+ ET.SubElement(fido_format, "name").text = get_text_tna(pronom_format, "FormatName")
+ ET.SubElement(fido_format, "version").text = get_text_tna(pronom_format, "FormatVersion")
+ ET.SubElement(fido_format, "alias").text = get_text_tna(pronom_format, "FormatAliases")
+ ET.SubElement(fido_format, "pronom_id").text = get_text_tna(pronom_format, "FormatID")
# Get the extensions from the ExternalSignature
for x in pronom_format.findall(TNA("ExternalSignature")):
ET.SubElement(fido_format, "extension").text = get_text_tna(x, "Signature")
- for id in pronom_format.findall(TNA("FileFormatIdentifier")):
- type = get_text_tna(id, "IdentifierType")
- if type == "Apple Uniform Type Identifier":
- ET.SubElement(fido_format, "apple_uti").text = get_text_tna(
- id, "Identifier"
- )
+ for xml_id in pronom_format.findall(TNA("FileFormatIdentifier")):
+ xml_id_type = get_text_tna(xml_id, "IdentifierType")
+ if xml_id_type == "Apple Uniform Type Identifier":
+ ET.SubElement(fido_format, "apple_uti").text = get_text_tna(xml_id, "Identifier")
# Handle the relationships
for x in pronom_format.findall(TNA("RelatedFormat")):
rel = get_text_tna(x, "RelationshipType")
if rel == "Has priority over":
- ET.SubElement(fido_format, "has_priority_over").text = get_text_tna(
- x, "RelatedFormatID"
- )
+ ET.SubElement(fido_format, "has_priority_over").text = get_text_tna(x, "RelatedFormatID")
# Get the InternalSignature information
for pronom_sig in pronom_format.findall(TNA("InternalSignature")):
fido_sig = ET.SubElement(fido_format, "signature")
- ET.SubElement(fido_sig, "name").text = get_text_tna(
- pronom_sig, "SignatureName"
- )
+ ET.SubElement(fido_sig, "name").text = get_text_tna(pronom_sig, "SignatureName")
# There are some funny chars in the notes, which caused me trouble and it is a unicode string,
- ET.SubElement(fido_sig, "note").text = get_text_tna(
- pronom_sig, "SignatureNote"
- )
+ ET.SubElement(fido_sig, "note").text = get_text_tna(pronom_sig, "SignatureNote")
for pronom_pat in pronom_sig.findall(TNA("ByteSequence")):
# print('Parsing ID:{}'.format(puid))
fido_pat = ET.SubElement(fido_sig, "pattern")
@@ -249,14 +232,10 @@ def parse_pronom_xml(self, source, puid_filter=None):
pass
# print "working on puid:", puid, ", position: ", pos, "with offset, maxoffset: ", offset, ",", max_offset
try:
- regex = convert_to_regex(
- byte_seq, "Little", pos, offset, max_offset
- )
+ regex = convert_to_regex(byte_seq, "Little", pos, offset, max_offset)
except ValueError as ve:
print(
- "ValueError converting PUID {} signature to regex: {}".format(
- puid, ve
- ),
+ "ValueError converting PUID {} signature to regex: {}".format(puid, ve),
file=sys.stderr,
)
regex = FLG_INCOMPATIBLE
@@ -264,9 +243,7 @@ def parse_pronom_xml(self, source, puid_filter=None):
# print "done puid", puid
if regex == FLG_INCOMPATIBLE:
print(
- "Error: incompatible PRONOM signature found for puid {} skipping...".format(
- puid
- ),
+ "Error: incompatible PRONOM signature found for puid {} skipping...".format(puid),
file=sys.stderr,
)
# remove the empty 'signature' nodes
@@ -280,80 +257,54 @@ def parse_pronom_xml(self, source, puid_filter=None):
ET.SubElement(fido_pat, "regex").text = regex
# Get the format details
fido_details = ET.SubElement(fido_format, "details")
- ET.SubElement(fido_details, "dc:description").text = get_text_tna(
- pronom_format, "FormatDescription"
- )
- ET.SubElement(fido_details, "dcterms:available").text = get_text_tna(
- pronom_format, "ReleaseDate"
- )
- ET.SubElement(fido_details, "dc:creator").text = get_text_tna(
- pronom_format, "Developers/DeveloperCompoundName"
- )
+ ET.SubElement(fido_details, "dc:description").text = get_text_tna(pronom_format, "FormatDescription")
+ ET.SubElement(fido_details, "dcterms:available").text = get_text_tna(pronom_format, "ReleaseDate")
+ ET.SubElement(fido_details, "dc:creator").text = get_text_tna(pronom_format, "Developers/DeveloperCompoundName")
ET.SubElement(fido_details, "dcterms:publisher").text = get_text_tna(
pronom_format, "Developers/OrganisationName"
)
for x in pronom_format.findall(TNA("RelatedFormat")):
rel = get_text_tna(x, "RelationshipType")
if rel == "Is supertype of":
- ET.SubElement(fido_details, "is_supertype_of").text = get_text_tna(
- x, "RelatedFormatID"
- )
+ ET.SubElement(fido_details, "is_supertype_of").text = get_text_tna(x, "RelatedFormatID")
for x in pronom_format.findall(TNA("RelatedFormat")):
rel = get_text_tna(x, "RelationshipType")
if rel == "Is subtype of":
- ET.SubElement(fido_details, "is_subtype_of").text = get_text_tna(
- x, "RelatedFormatID"
- )
- ET.SubElement(fido_details, "content_type").text = get_text_tna(
- pronom_format, "FormatTypes"
- )
+ ET.SubElement(fido_details, "is_subtype_of").text = get_text_tna(x, "RelatedFormatID")
+ ET.SubElement(fido_details, "content_type").text = get_text_tna(pronom_format, "FormatTypes")
# References
for x in pronom_format.findall(TNA("Document")):
r = ET.SubElement(fido_details, "reference")
ET.SubElement(r, "dc:title").text = get_text_tna(x, "TitleText")
- ET.SubElement(r, "dc:creator").text = get_text_tna(
- x, "Author/AuthorCompoundName"
- )
- ET.SubElement(r, "dc:publisher").text = get_text_tna(
- x, "Publisher/PublisherCompoundName"
- )
- ET.SubElement(r, "dcterms:available").text = get_text_tna(
- x, "PublicationDate"
- )
- for id in x.findall(TNA("DocumentIdentifier")):
- type = get_text_tna(id, "IdentifierType")
- if type == "URL":
- ET.SubElement(r, "dc:identifier").text = "http://" + get_text_tna(
- id, "Identifier"
- )
+ ET.SubElement(r, "dc:creator").text = get_text_tna(x, "Author/AuthorCompoundName")
+ ET.SubElement(r, "dc:publisher").text = get_text_tna(x, "Publisher/PublisherCompoundName")
+ ET.SubElement(r, "dcterms:available").text = get_text_tna(x, "PublicationDate")
+ for xml_id in x.findall(TNA("DocumentIdentifier")):
+ xml_id_type = get_text_tna(xml_id, "IdentifierType")
+ if xml_id_type == "URL":
+ ET.SubElement(r, "dc:identifier").text = "http://" + get_text_tna(xml_id, "Identifier")
else:
ET.SubElement(r, "dc:identifier").text = (
- get_text_tna(id, "IdentifierType")
- + ":"
- + get_text_tna(id, "Identifier")
+ get_text_tna(xml_id, "IdentifierType") + ":" + get_text_tna(xml_id, "Identifier")
)
ET.SubElement(r, "dc:description").text = get_text_tna(x, "DocumentNote")
ET.SubElement(r, "dc:type").text = get_text_tna(x, "DocumentType")
ET.SubElement(r, "dcterms:license").text = (
- get_text_tna(x, "AvailabilityDescription")
- + " "
- + get_text_tna(x, "AvailabilityNote")
+ get_text_tna(x, "AvailabilityDescription") + " " + get_text_tna(x, "AvailabilityNote")
)
ET.SubElement(r, "dc:rights").text = get_text_tna(x, "DocumentIPR")
# Examples
for x in pronom_format.findall(TNA("ReferenceFile")):
rf = ET.SubElement(fido_details, "example_file")
ET.SubElement(rf, "dc:title").text = get_text_tna(x, "ReferenceFileName")
- ET.SubElement(rf, "dc:description").text = get_text_tna(
- x, "ReferenceFileDescription"
- )
+ ET.SubElement(rf, "dc:description").text = get_text_tna(x, "ReferenceFileDescription")
checksum = ""
- for id in x.findall(TNA("ReferenceFileIdentifier")):
- type = get_text_tna(id, "IdentifierType")
- if type == "URL":
+ for xml_id in x.findall(TNA("ReferenceFileIdentifier")):
+ xml_id_type = get_text_tna(xml_id, "IdentifierType")
+ if xml_id_type == "URL":
# Starting with PRONOM 89, some URLs contain http://
# and others do not.
- url = get_text_tna(id, "Identifier")
+ url = get_text_tna(xml_id, "Identifier")
if not urlparse(url).scheme:
url = "http://" + url
ET.SubElement(rf, "dc:identifier").text = url
@@ -364,20 +315,14 @@ def parse_pronom_xml(self, source, puid_filter=None):
m.update(sock.read())
sock.close()
except HTTPError as http_excep:
- sys.stderr.write(
- "HTTP {} error loading resource {}\n".format(
- http_excep.code, url
- )
- )
+ sys.stderr.write("HTTP {} error loading resource {}\n".format(http_excep.code, url))
if http_excep.code == 404:
continue
checksum = m.hexdigest()
else:
ET.SubElement(rf, "dc:identifier").text = (
- get_text_tna(id, "IdentifierType")
- + ":"
- + get_text_tna(id, "Identifier")
+ get_text_tna(xml_id, "IdentifierType") + ":" + get_text_tna(xml_id, "Identifier")
)
ET.SubElement(rf, "dcterms:license").text = ""
ET.SubElement(rf, "dc:rights").text = get_text_tna(x, "ReferenceFileIPR")
@@ -387,18 +332,10 @@ def parse_pronom_xml(self, source, puid_filter=None):
# Record Metadata
md = ET.SubElement(fido_details, "record_metadata")
ET.SubElement(md, "status").text = "unknown"
- ET.SubElement(md, "dc:creator").text = get_text_tna(
- pronom_format, "ProvenanceName"
- )
- ET.SubElement(md, "dcterms:created").text = get_text_tna(
- pronom_format, "ProvenanceSourceDate"
- )
- ET.SubElement(md, "dcterms:modified").text = get_text_tna(
- pronom_format, "LastUpdatedDate"
- )
- ET.SubElement(md, "dc:description").text = get_text_tna(
- pronom_format, "ProvenanceDescription"
- )
+ ET.SubElement(md, "dc:creator").text = get_text_tna(pronom_format, "ProvenanceName")
+ ET.SubElement(md, "dcterms:created").text = get_text_tna(pronom_format, "ProvenanceSourceDate")
+ ET.SubElement(md, "dcterms:modified").text = get_text_tna(pronom_format, "LastUpdatedDate")
+ ET.SubElement(md, "dc:description").text = get_text_tna(pronom_format, "ProvenanceDescription")
return fido_format
# FIXME: I don't think that this quite works yet!
@@ -485,9 +422,7 @@ def do_byte(chars, i, littleendian, esc=True):
c2 = "0123456789ABCDEF".find(chars[i + 1].upper())
buf = StringIO()
if c1 < 0 or c2 < 0:
- raise Exception(
- _convert_err_msg("bad byte sequence", chars[i : i + 2], i, chars, buf)
- )
+ raise Exception(_convert_err_msg("bad byte sequence", chars[i : i + 2], i, chars, buf))
if littleendian:
val = chr(16 * c1 + c2)
else:
@@ -553,16 +488,12 @@ def calculate_repetition(char, pos, offset, maxoffset):
def do_all_bitmasks(chars, i, littleendian):
"""(byte & bitmask) == bitmask."""
- return do_any_all_bitmasks(
- chars, i, lambda byt, bitmask: ((byt & bitmask) == bitmask), littleendian
- )
+ return do_any_all_bitmasks(chars, i, lambda byt, bitmask: ((byt & bitmask) == bitmask), littleendian)
def do_any_bitmasks(chars, i, littleendian):
"""(byte & bitmask) != 0."""
- return do_any_all_bitmasks(
- chars, i, lambda byt, bitmask: ((byt & bitmask) != 0), littleendian
- )
+ return do_any_all_bitmasks(chars, i, lambda byt, bitmask: ((byt & bitmask) != 0), littleendian)
def do_any_all_bitmasks(chars, i, predicate, littleendian):
@@ -581,13 +512,7 @@ def do_any_all_bitmasks(chars, i, predicate, littleendian):
byt, inc = do_byte(chars, i + 1, littleendian, esc=False)
bitmask = ord(byt)
regex = "({})".format(
- "|".join(
- [
- "\\x" + hex(byte)[2:].zfill(2)
- for byte in range(0x100)
- if predicate(byte, bitmask)
- ]
- )
+ "|".join(["\\x" + hex(byte)[2:].zfill(2) for byte in range(0x100) if predicate(byte, bitmask)])
)
return regex, inc + 1
@@ -645,11 +570,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""):
elif chars[i] in "*+?":
state = "specials"
else:
- raise ValueError(
- _convert_err_msg(
- "Illegal character in start", chars[i], i, chars, buf
- )
- )
+ raise ValueError(_convert_err_msg("Illegal character in start", chars[i], i, chars, buf))
elif state == "bytes":
(byt, inc) = do_byte(chars, i, littleendian)
buf.write(byt)
@@ -684,11 +605,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""):
elif chars[i] == "]":
break
else:
- raise Exception(
- _convert_err_msg(
- "Illegal character in non-match", chars[i], i, chars, buf
- )
- )
+ raise Exception(_convert_err_msg("Illegal character in non-match", chars[i], i, chars, buf))
buf.write(")")
i += 1
state = "start"
@@ -714,11 +631,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""):
buf.write("]")
i += 1
except Exception:
- print(
- _convert_err_msg(
- "Illegal character in bracket", chars[i], i, chars, buf
- )
- )
+ print(_convert_err_msg("Illegal character in bracket", chars[i], i, chars, buf))
raise
if i < len(chars) and chars[i] == "{":
state = "curly-after-bracket"
@@ -761,9 +674,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""):
else:
raise Exception(
_convert_err_msg(
- (
- "Current state = '{0}' : Illegal character in paren"
- ).format(state),
+ ("Current state = '{0}' : Illegal character in paren").format(state),
chars[i],
i,
chars,
@@ -796,11 +707,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""):
elif chars[i] == "}":
break
else:
- raise Exception(
- _convert_err_msg(
- "Illegal character in curly", chars[i], i, chars, buf
- )
- )
+ raise Exception(_convert_err_msg("Illegal character in curly", chars[i], i, chars, buf))
buf.write("}")
i += 1 # skip the )
state = "start"
@@ -813,11 +720,7 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""):
i += 1
elif chars[i] == "?":
if chars[i + 1] != "?":
- raise Exception(
- _convert_err_msg(
- "Illegal character after ?", chars[i + 1], i + 1, chars, buf
- )
- )
+ raise Exception(_convert_err_msg("Illegal character after ?", chars[i + 1], i + 1, chars, buf))
buf.write(".?")
i += 2
state = "start"
@@ -833,18 +736,18 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""):
return val
-def run(input=None, output=None, puid=None):
+def run(input_file=None, output_file=None, puid=None):
"""Convert PRONOM formats into FIDO signatures."""
versions = get_local_versions()
- if input is None:
- input = versions.get_zip_file()
- if output is None:
- output = versions.get_signature_file()
+ if input_file is None:
+ input_file = versions.get_zip_file()
+ if output_file is None:
+ output_file = versions.get_signature_file()
- info = FormatInfo(input)
+ info = FormatInfo(input_file)
info.load_pronom_xml(puid)
- info.save(output)
+ info.save(output_file)
print(
"Converted {0} PRONOM formats to FIDO signatures".format(len(info.formats)),
file=sys.stderr,
@@ -856,19 +759,13 @@ def main(args=None):
if args is None:
args = sys.argv[1:]
- parser = ArgumentParser(
- description="Produce the FIDO format XML that is loaded at run-time"
- )
- parser.add_argument(
- "-input", default=None, help="Input file, a Zip containing PRONOM XML files"
- )
+ parser = ArgumentParser(description="Produce the FIDO format XML that is loaded at run-time")
+ parser.add_argument("-input", default=None, help="Input file, a Zip containing PRONOM XML files")
parser.add_argument("-output", default=None, help="Output file")
- parser.add_argument(
- "-puid", default=None, help="A particular PUID record to extract"
- )
+ parser.add_argument("-puid", default=None, help="A particular PUID record to extract")
args = parser.parse_args(args)
- run(input=args.input, output=args.output, puid=args.puid)
+ run(input_file=args.input, output_file=args.output, puid=args.puid)
if __name__ == "__main__":
diff --git a/fido/pronom/soap.py b/fido/pronom/soap.py
index 67d2a73..49ac707 100644
--- a/fido/pronom/soap.py
+++ b/fido/pronom/soap.py
@@ -19,10 +19,13 @@
PRONOM format signatures SOAP calls.
"""
+
import sys
import urllib
-import xml.etree.ElementTree as ET
from urllib.error import HTTPError, URLError
+from xml.etree import ElementTree as ET
+
+from defusedxml.ElementTree import fromstring
from fido import __version__
@@ -50,9 +53,7 @@
def get_sig_xml_for_puid(puid):
"""Return the full PRONOM signature XML for the passed PUID."""
- req = urllib.request.Request(
- "http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid)
- )
+ req = urllib.request.Request("http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid))
response = urllib.request.urlopen(req)
xml = response.read()
return xml
@@ -82,16 +83,12 @@ def get_droid_signatures(version):
format_count = False
try:
with urllib.request.urlopen(
- "https://www.nationalarchives.gov.uk/documents/DROID_SignatureFile_V{}.xml".format(
- version
- )
+ "https://www.nationalarchives.gov.uk/documents/DROID_SignatureFile_V{}.xml".format(version)
) as f:
xml = f.read().decode("utf-8")
- root_ele = ET.fromstring(xml)
+ root_ele = fromstring(xml)
format_count = len(
- root_ele.findall(
- ".//{http://www.nationalarchives.gov.uk/pronom/SignatureFile}FileFormat"
- )
+ root_ele.findall(".//{http://www.nationalarchives.gov.uk/pronom/SignatureFile}FileFormat")
)
except HTTPError as httpe:
sys.stderr.write(
@@ -105,27 +102,19 @@ def get_droid_signatures(version):
def _get_soap_ele_tree(soap_action):
soap_string = '{}<{} xmlns="{}" />'.format(
XML_PROC, NS.get("xsi"), NS.get("xsd"), NS.get("soap"), soap_action, PRONOM_NS
- ).encode(
- ENCODING
- )
+ ).encode(ENCODING)
soap_action = '"{}:{}In"'.format(PRONOM_NS, soap_action)
xml = _get_soap_response(soap_action, soap_string)
for prefix, uri in NS.items():
ET.register_namespace(prefix, uri)
- return ET.fromstring(xml)
+ return fromstring(xml)
def _get_soap_response(soap_action, soap_string):
try:
- req = urllib.request.Request(
- "http://{}/pronom/service.asmx".format(PRONOM_HOST), data=soap_string
- )
+ req = urllib.request.Request("http://{}/pronom/service.asmx".format(PRONOM_HOST), data=soap_string)
except URLError:
- print(
- "There was a problem contacting the PRONOM service at http://{}/pronom/service.asmx.".format(
- PRONOM_HOST
- )
- )
+ print("There was a problem contacting the PRONOM service at http://{}/pronom/service.asmx.".format(PRONOM_HOST))
print("Please check your network connection and try again.")
sys.exit(1)
for key, value in HEADERS.items():
diff --git a/fido/update_signatures.py b/fido/pronom/update_signatures.py
similarity index 92%
rename from fido/update_signatures.py
rename to fido/pronom/update_signatures.py
index 919dfad..a99a49a 100644
--- a/fido/update_signatures.py
+++ b/fido/pronom/update_signatures.py
@@ -19,17 +19,18 @@
import zipfile
from argparse import ArgumentParser
from shutil import rmtree
-from xml.etree import ElementTree as CET
-from . import CONFIG_DIR, __version__
-from .prepare import run as prepare_pronom_to_fido
-from .pronom.soap import (
+from defusedxml import ElementTree as CET
+
+from fido import CONFIG_DIR, __version__
+from fido.pronom.prepare import run as prepare_pronom_to_fido
+from fido.pronom.soap import (
NS,
get_droid_signatures,
get_pronom_sig_version,
get_sig_xml_for_puid,
)
-from .versions import get_local_versions
+from fido.pronom.versions import get_local_versions
ABORT_MSG = "Aborting update..."
@@ -117,9 +118,7 @@ def sig_version_check(version="latest"):
print("Getting latest version number from PRONOM...")
version = get_pronom_sig_version()
if not version:
- sys.exit(
- "Failed to obtain PRONOM signature file version number, please try again."
- )
+ sys.exit("Failed to obtain PRONOM signature file version number, please try again.")
print("Querying PRONOM for signaturefile version {}.".format(version))
sig_file_name = _sig_file_name(version)
@@ -159,9 +158,7 @@ def init_sig_download(defaults):
resume = False
if os.path.isdir(tmpdir):
print("Found previously created temporary folder for download:", tmpdir)
- resume = query_yes_no(
- "Do you want to resume download (yes) or start over (no)?"
- )
+ resume = query_yes_no("Do you want to resume download (yes) or start over (no)?")
if resume:
print("Resuming download...")
else:
@@ -171,9 +168,7 @@ def init_sig_download(defaults):
except OSError:
pass
if not os.path.isdir(tmpdir):
- sys.stderr.write(
- "Failed to create temporary folder for PUID's, using: " + tmpdir
- )
+ sys.stderr.write("Failed to create temporary folder for PUID's, using: " + tmpdir)
return tmpdir, resume
@@ -187,9 +182,7 @@ def download_signatures(defaults, format_eles, resume, tmpdir):
download_sig(format_ele, tmpdir, resume, defaults)
numfiles += 1
print(
- r"Downloaded {}/{} files [{}%]".format(
- numfiles, puid_count, int(float(numfiles) / one_percent)
- ),
+ r"Downloaded {}/{} files [{}%]".format(numfiles, puid_count, int(float(numfiles) / one_percent)),
end="\r",
)
print("100%")
@@ -258,9 +251,7 @@ def update_versions_xml(version):
def main():
"""Main CLI entrypoint."""
- parser = ArgumentParser(
- description="Download and convert the latest PRONOM signatures"
- )
+ parser = ArgumentParser(description="Download and convert the latest PRONOM signatures")
parser.add_argument(
"-tmpdir",
default=OPTIONS["tmp_dir"],
diff --git a/fido/versions.py b/fido/pronom/versions.py
similarity index 83%
rename from fido/versions.py
rename to fido/pronom/versions.py
index 55fa220..25843d2 100644
--- a/fido/versions.py
+++ b/fido/pronom/versions.py
@@ -17,15 +17,14 @@
PRONOM is available from http://www.nationalarchives.gov.uk/pronom/
"""
-
import importlib.resources
import os
import re
import sys
-from xml.etree import ElementTree as ET
-from xml.etree.ElementTree import ParseError, parse
import requests
+from defusedxml import ElementTree as ET
+from defusedxml.ElementTree import ParseError, parse
from fido import CONFIG_DIR
@@ -87,9 +86,7 @@ def __setattr__(self, name, value):
def get_zip_file(self):
"""Obtain location to the PRONOM XML Zip file based on the current PRONOM version."""
- return os.path.join(
- self.conf_dir, "pronom-xml-v{}.zip".format(self.pronom_version)
- )
+ return os.path.join(self.conf_dir, "pronom-xml-v{}.zip".format(self.pronom_version))
def get_signature_file(self):
"""Obtain location to the current PRONOM signature file."""
@@ -101,9 +98,7 @@ def write(self):
for key, value in self.PROPS_MAPPING.items():
if self.root.find(value) is None:
raise ValueError("Field {} has not been defined!".format(key))
- self.tree.write(
- self.versions_file, xml_declaration=True, method="xml", encoding="utf-8"
- )
+ self.tree.write(self.versions_file, xml_declaration=True, method="xml", encoding="utf-8")
def get_local_versions(config_dir=CONFIG_DIR):
@@ -147,19 +142,11 @@ def _list_available_versions(update_url):
def _check_update_signatures(sig_vers, update_url, versions, is_update=False):
is_new, latest = _version_check(sig_vers, update_url)
if is_new:
- sys.stdout.write(
- "Updated signatures v{} are available, current version is v{}\n".format(
- latest, sig_vers
- )
- )
+ sys.stdout.write("Updated signatures v{} are available, current version is v{}\n".format(latest, sig_vers))
if is_update:
_output_details(latest, update_url, versions)
else:
- sys.stdout.write(
- "Your signature files are up to date, current version is v{}\n".format(
- sig_vers
- )
- )
+ sys.stdout.write("Your signature files are up to date, current version is v{}\n".format(sig_vers))
sys.exit(0)
@@ -169,23 +156,15 @@ def _download_sig_version(sig_act, update_url, versions):
if not match:
sys.exit(
- '{} is not a valid version number, to download a sig file try "-sig v104" or "-sig 104".'.format(
- sig_act
- )
+ '{} is not a valid version number, to download a sig file try "-sig v104" or "-sig 104".'.format(sig_act)
)
ver = sig_act
if not ver.startswith("v"):
ver = "v" + sig_act
resp = requests.get(update_url + "format/" + ver + "/")
if resp.status_code != 200:
- sys.exit(
- "No signature files found for {}, REST status {}".format(
- sig_act, resp.status_code
- )
- )
- _output_details(
- re.search(r"\d+|$", ver).group(), update_url, versions
- ) # noqa: W605
+ sys.exit("No signature files found for {}, REST status {}".format(sig_act, resp.status_code))
+ _output_details(re.search(r"\d+|$", ver).group(), update_url, versions) # noqa: W605
def _get_version(ver_string):
@@ -193,9 +172,7 @@ def _get_version(ver_string):
match = re.search(r"^v?(\d+)$", ver_string, re.IGNORECASE)
if not match:
sys.exit(
- '{} is not a valid version number, to download a sig file try "-sig v104" or "-sig 104".'.format(
- ver_string
- )
+ '{} is not a valid version number, to download a sig file try "-sig v104" or "-sig 104".'.format(ver_string)
)
ver = ver_string
return ver_string if not ver.startswith("v") else ver_string[1:]
@@ -214,18 +191,14 @@ def _output_details(version, update_url, versions):
def _version_check(sig_ver, update_url):
resp = requests.get(update_url + "format/latest/")
if resp.status_code != 200:
- sys.exit(
- "Error getting latest version info: HTTP Status {}".format(resp.status_code)
- )
+ sys.exit("Error getting latest version info: HTTP Status {}".format(resp.status_code))
root_ele = ET.fromstring(resp.text)
latest = _get_version(root_ele.get("version"))
return int(latest) > int(sig_ver), latest
def _write_sigs(latest, update_url, type, name_template):
- sig_out = str(
- importlib.resources.files("fido").joinpath("conf", name_template.format(latest))
- )
+ sig_out = str(importlib.resources.files("fido").joinpath("conf", name_template.format(latest)))
if os.path.exists(sig_out):
return
resp = requests.get(update_url + "format/{0}/{1}/".format(latest, type))
diff --git a/fido/toxml.py b/fido/toxml.py
index ca1905a..9e240da 100644
--- a/fido/toxml.py
+++ b/fido/toxml.py
@@ -22,8 +22,9 @@
import csv
import sys
+from fido.pronom.versions import get_local_versions
+
from . import __version__
-from .versions import get_local_versions
def main():
@@ -34,9 +35,7 @@ def main():
{0}
{1}
- """.format(
- __version__, get_local_versions().pronom_version
- )
+ """.format(__version__, get_local_versions().pronom_version)
)
reader = csv.reader(sys.stdin)
@@ -54,9 +53,7 @@ def main():
{6}
{7}
{8}
- """.format(
- row[6], row[0], row[8], row[1], row[2], row[7], row[3], row[4], row[5]
- )
+ """.format(row[6], row[0], row[8], row[1], row[2], row[7], row[3], row[4], row[5])
)
sys.stdout.write("\n\n")
diff --git a/fido/utils/__init__.py b/fido/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/fido/char_handler.py b/fido/utils/char_handler.py
similarity index 100%
rename from fido/char_handler.py
rename to fido/utils/char_handler.py
diff --git a/fido/utils/timer.py b/fido/utils/timer.py
new file mode 100644
index 0000000..af1cbb1
--- /dev/null
+++ b/fido/utils/timer.py
@@ -0,0 +1,17 @@
+from time import perf_counter
+
+
+class PerfTimer:
+ """Utility class that carries out simple process timings."""
+
+ def __init__(self):
+ """New instance with start time running."""
+ self.start_time = perf_counter()
+
+ def start(self):
+ """Start new timer."""
+ self.start_time = perf_counter()
+
+ def duration(self):
+ """Return the duration since instantiation or start() was last called."""
+ return perf_counter() - self.start_time
diff --git a/pyproject.toml b/pyproject.toml
index 07ef2c8..dc306e9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,7 +30,8 @@ classifiers = [
dependencies = [
"olefile >= 0.46, < 1",
- "requests",
+ "requests >= 2",
+ "defusedxml >= 0.7"
]
[project.urls]
@@ -45,9 +46,9 @@ testing = [
[project.scripts]
fido = "fido.fido:main"
-fido-prepare = "fido.prepare:main"
+fido-prepare = "fido.pronom.prepare:main"
fido-toxml = "fido.toxml:main"
-
+fido-update-signatures = "fido.pronom.update_signatures:main"
[tool.setuptools.package-data]
"fido" = ["*.*", "conf/*.*", "pronom/*.*"]
diff --git a/tests/test_fido.py b/tests/test_fido.py
index e0c256e..420dbd5 100644
--- a/tests/test_fido.py
+++ b/tests/test_fido.py
@@ -7,7 +7,8 @@
import pytest
-from fido.fido import Fido, PerfTimer
+from fido.fido import Fido
+from fido.utils.timer import PerfTimer
def test_perf_timer():
diff --git a/tests/test_prepare.py b/tests/test_prepare.py
index 752fcd3..10cbbad 100644
--- a/tests/test_prepare.py
+++ b/tests/test_prepare.py
@@ -2,7 +2,7 @@
import pytest
-from fido.prepare import convert_to_regex
+from fido.pronom.prepare import convert_to_regex
def binrep_convert(byt):
@@ -64,17 +64,17 @@ def test_bitmasks(pronom_bytesequence, matches_predicate):
("pronom_bytesequence", "input_", "matches_bool"),
(
# These are good:
- ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x02\x11\xFF", True),
- ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x03\x11\xFF", True),
- ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x02\xFE\xFF", True),
+ ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x02\x11\xff", True),
+ ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x03\x11\xff", True),
+ ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x02\xfe\xff", True),
# Bad because missing three anythings between AB and CD
- ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xCD\x02\x11\xFF", False),
+ ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xcd\x02\x11\xff", False),
# Bad because not at start of string
- ("ab{3}cd(01|02|03)~07ff", "\xDA\xAB\xDD\xDD\xDD\xCD\x02\x11\xFF", False),
+ ("ab{3}cd(01|02|03)~07ff", "\xda\xab\xdd\xdd\xdd\xcd\x02\x11\xff", False),
# Bad because 04 is not in (01|02|03)
- ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x04\x11\xFF", False),
+ ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x04\x11\xff", False),
# Bad because 18 is not in ~07
- ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x02\x18\xFF", False),
+ ("ab{3}cd(01|02|03)~07ff", "\xab\xdd\xdd\xdd\xcd\x02\x18\xff", False),
),
)
def test_heterogenous_sequences(pronom_bytesequence, input_, matches_bool):