This repository has been archived by the owner on Jun 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
test_demo.py
90 lines (77 loc) · 3.19 KB
/
test_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from __future__ import annotations
import glob
import json
import logging
from pathlib import Path
from time import perf_counter
from tungsten import SdsQueryFieldName, SigmaAldrichFieldMapper
from tungsten.parsers.supplier.sigma_aldrich.sds_parser import (
SigmaAldrichSdsParser
)
# from tungsten.parsers.supplier.sigma_aldrich.table_injector import (
# SigmaAldrichTableInjector
# )
def main():
    """Parse every PDF directly under ``msds/`` and write two JSON files each.

    Root logging is configured at INFO level and written to ``table.log``
    (overwritten on every run) using ``MultiLineFormatter``. For each
    ``msds/<name>.pdf``:

    * the object returned by ``SigmaAldrichSdsParser.parse_to_ghs_sds`` is
      dumped to ``msds/output/<name>.pdf.json``;
    * a ``{field name: value}`` dict, built by asking
      ``SigmaAldrichFieldMapper.get_field`` for each entry of ``fields``, is
      written to ``msds/mapped/<name>.pdf.json``.

    All paths are relative to the current working directory.
    """
    formatter = MultiLineFormatter(
        fmt="[%(asctime)s][%(levelname)s][%(name)s] - %(message)s",
    )
    # FileHandler owns the log file, so logging.shutdown() closes it at
    # interpreter exit; a StreamHandler pointed at a bare open() never
    # closes the underlying file object.
    handler = logging.FileHandler("table.log", mode="w", encoding="utf-8")
    handler.setFormatter(formatter)
    # logging.basicConfig(level=logging.DEBUG, handlers=[handler])
    logging.basicConfig(level=logging.INFO, handlers=[handler])

    parser = SigmaAldrichSdsParser()
    # table_parser = SigmaAldrichTableInjector()
    field_mapper = SigmaAldrichFieldMapper()

    fields = [
        SdsQueryFieldName.PRODUCT_NAME,
        SdsQueryFieldName.PRODUCT_NUMBER,
        SdsQueryFieldName.CAS_NUMBER,
        SdsQueryFieldName.PRODUCT_BRAND,
        SdsQueryFieldName.RECOMMENDED_USE_AND_RESTRICTIONS,
        SdsQueryFieldName.SUPPLIER_ADDRESS,
        SdsQueryFieldName.SUPPLIER_TELEPHONE,
        SdsQueryFieldName.SUPPLIER_FAX,
        SdsQueryFieldName.EMERGENCY_TELEPHONE,
        SdsQueryFieldName.IDENTIFICATION_OTHER,
        SdsQueryFieldName.SUBSTANCE_CLASSIFICATION,
        SdsQueryFieldName.PICTOGRAM,
        SdsQueryFieldName.SIGNAL_WORD,
        SdsQueryFieldName.STATEMENTS,
        SdsQueryFieldName.HNOC_HAZARD,
    ]

    source_dir = Path('msds').absolute()
    output_dir = Path('msds', 'output')
    mapped_dir = Path('msds', 'mapped')
    # Create the destinations up front so a missing directory cannot fail
    # the run only after a PDF has already been parsed.
    output_dir.mkdir(parents=True, exist_ok=True)
    mapped_dir.mkdir(parents=True, exist_ok=True)

    for filename in glob.glob(str(source_dir / '*.pdf')):
        file_start_time = perf_counter()
        logging.info("Processing %s", filename)

        with open(filename, "rb") as f:
            parsed = parser.parse_to_ghs_sds(f)
            # table_parser.generate_injections(f)

        # The glob pattern is absolute, so strip the directory to recover
        # the bare "<name>.pdf" before appending ".json".
        json_name = str(Path(filename).relative_to(source_dir)) + ".json"

        with open(output_dir / json_name, 'w') as fw:
            parsed.dump(fw)

        # Deserialize the parsed document once per file instead of once per
        # field. NOTE(review): assumes get_field() does not mutate the dict
        # it is given - confirm against SigmaAldrichFieldMapper.
        parsed_dict = json.loads(parsed.dumps())
        mapped = {
            field.name: field_mapper.get_field(field, parsed_dict)
            for field in fields
        }
        with open(mapped_dir / json_name, 'w') as fw:
            json.dump(mapped, fw)

        logging.info(
            "Parse complete in %s seconds.",
            perf_counter() - file_start_time,
        )
class MultiLineFormatter(logging.Formatter):
    """Formatter that prefixes every continuation line with the record header.

    The first line of the formatted output is left exactly as
    ``logging.Formatter`` produced it. Each following line (from a multi-line
    message or an appended traceback) is emitted as the header with its last
    two characters removed, a single space, and then the line itself. The
    header is what the configured format string renders for an empty message.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* formatted with the header repeated on each line.

        Args:
            record: The log record to format.

        Returns:
            The formatted text without a trailing newline; an empty string
            when the base formatter produces no output at all.
        """
        lines = super().format(record).splitlines()
        if not lines:
            # Nothing was rendered (e.g. fmt="%(message)s" with an empty
            # message); unpacking a first line here would raise ValueError.
            return ""
        if len(lines) == 1:
            # Single-line output needs no header, so skip formatting one.
            return lines[0]

        # Build the header from a copy of the original record so that fields
        # such as created/asctime, thread and funcName match the first line;
        # a freshly constructed LogRecord would be stamped with the time of
        # this call instead. Only the message-bearing attributes are blanked.
        header_record = logging.makeLogRecord({
            **record.__dict__,
            "msg": "",
            "args": (),
            "exc_info": None,
            "exc_text": None,
            "stack_info": None,
        })
        header = super().format(header_record)

        # Drop the header's last two characters (the "- " separator of the
        # "... - %(message)s" layout) before re-attaching each line.
        prefix = header[:-2]
        return "\n".join(
            [lines[0], *(f"{prefix} {line}" for line in lines[1:])]
        )
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()