-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #84 from CybercentreCanada/AL-2766
apply formatting and silence eml conversions
- Loading branch information
Showing
1 changed file
with
50 additions
and
43 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
|
||
try: | ||
from PIL import Image | ||
|
||
Image.MAX_IMAGE_PIXELS = 2147483647 | ||
except: | ||
print('[ERROR] pillow module not installed ("pip install pillow")') | ||
|
@@ -40,8 +41,8 @@ | |
__email__ = "[email protected]" | ||
__name__ = "EMLRender" | ||
|
||
textTypes = ['text/plain', 'text/html'] | ||
imageTypes = ['image/gif', 'image/jpeg', 'image/png'] | ||
textTypes = ["text/plain", "text/html"] | ||
imageTypes = ["image/gif", "image/jpeg", "image/png"] | ||
|
||
|
||
def appendImages(images): | ||
|
@@ -50,7 +51,7 @@ def appendImages(images): | |
|
||
new_width = max(widths) | ||
new_height = sum(heights) | ||
new_im = Image.new('RGB', (new_width, new_height), color=bgColor) | ||
new_im = Image.new("RGB", (new_width, new_height), color=bgColor) | ||
offset = 0 | ||
for im in images: | ||
# x = int((new_width - im.size[0])/2) | ||
|
@@ -61,58 +62,58 @@ def appendImages(images): | |
|
||
|
||
def processEml(data, output_dir, logger, load_ext_images=False, load_images=False): | ||
''' | ||
""" | ||
Process the email (bytes), extract MIME parts and useful headers. | ||
Generate a PNG picture of the mail | ||
''' | ||
""" | ||
msg = email.message_from_bytes(data) | ||
try: | ||
decode = email.header.decode_header(msg['Date'])[0] | ||
decode = email.header.decode_header(msg["Date"])[0] | ||
dateField = str(decode[0]) | ||
except: | ||
dateField = '<Unknown>' | ||
logger.info('Date: %s' % dateField) | ||
dateField = "<Unknown>" | ||
logger.info("Date: %s" % dateField) | ||
|
||
try: | ||
decode = email.header.decode_header(msg['From'])[0] | ||
decode = email.header.decode_header(msg["From"])[0] | ||
fromField = str(decode[0]) | ||
except: | ||
fromField = '<Unknown>' | ||
logger.info('From: %s' % fromField) | ||
fromField = fromField.replace('<', '<').replace('>', '>') | ||
fromField = "<Unknown>" | ||
logger.info("From: %s" % fromField) | ||
fromField = fromField.replace("<", "<").replace(">", ">") | ||
|
||
try: | ||
decode = email.header.decode_header(msg['To'])[0] | ||
decode = email.header.decode_header(msg["To"])[0] | ||
toField = str(decode[0]) | ||
except: | ||
toField = '<Unknown>' | ||
logger.info('To: %s' % toField) | ||
toField = toField.replace('<', '<').replace('>', '>') | ||
toField = "<Unknown>" | ||
logger.info("To: %s" % toField) | ||
toField = toField.replace("<", "<").replace(">", ">") | ||
|
||
try: | ||
decode = email.header.decode_header(msg['Subject'])[0] | ||
decode = email.header.decode_header(msg["Subject"])[0] | ||
subjectField = str(decode[0]) | ||
except: | ||
subjectField = '<Unknown>' | ||
logger.info('Subject: %s' % subjectField) | ||
subjectField = subjectField.replace('<', '<').replace('>', '>') | ||
subjectField = "<Unknown>" | ||
logger.info("Subject: %s" % subjectField) | ||
subjectField = subjectField.replace("<", "<").replace(">", ">") | ||
|
||
try: | ||
decode = email.header.decode_header(msg['Message-Id'])[0] | ||
decode = email.header.decode_header(msg["Message-Id"])[0] | ||
idField = str(decode[0]) | ||
except: | ||
idField = '<Unknown>' | ||
logger.info('Message-Id: %s' % idField) | ||
idField = idField.replace('<', '<').replace('>', '>') | ||
idField = "<Unknown>" | ||
logger.info("Message-Id: %s" % idField) | ||
idField = idField.replace("<", "<").replace(">", ">") | ||
|
||
imgkitOptions = {'load-error-handling': 'skip'} | ||
imgkitOptions = {"load-error-handling": "skip", "quiet": None} | ||
if not load_ext_images: | ||
imgkitOptions.update({'no-images': None, 'disable-javascript': None}) | ||
imgkitOptions.update({"no-images": None, "disable-javascript": None}) | ||
# imgkitOptions.update({ 'quiet': None }) | ||
imagesList = [] | ||
|
||
# Build a first image with basic mail details | ||
headers = ''' | ||
headers = """ | ||
<table width="100%%"> | ||
<tr><td align="right"><b>Date:</b></td><td>%s</td></tr> | ||
<tr><td align="right"><b>From:</b></td><td>%s</td></tr> | ||
|
@@ -121,33 +122,39 @@ def processEml(data, output_dir, logger, load_ext_images=False, load_images=Fals | |
<tr><td align="right"><b>Message-Id:</b></td><td>%s</td></tr> | ||
</table> | ||
<hr></p> | ||
''' % (dateField, fromField, toField, subjectField, idField) | ||
""" % ( | ||
dateField, | ||
fromField, | ||
toField, | ||
subjectField, | ||
idField, | ||
) | ||
try: | ||
header_path = NamedTemporaryFile(suffix=".png").name | ||
imgkit.from_string(headers, header_path, options=imgkitOptions) | ||
logger.info('Created headers %s' % header_path) | ||
logger.info("Created headers %s" % header_path) | ||
imagesList.append(header_path) | ||
except Exception as e: | ||
logger.warning(f'Creation of headers failed: {e}') | ||
logger.warning(f"Creation of headers failed: {e}") | ||
|
||
# | ||
# Main loop - process the MIME parts | ||
# | ||
for part in msg.walk(): | ||
mimeType = part.get_content_type() | ||
if part.is_multipart(): | ||
logger.info('Multipart found, continue') | ||
logger.info("Multipart found, continue") | ||
continue | ||
|
||
logger.info('Found MIME part: %s' % mimeType) | ||
logger.info("Found MIME part: %s" % mimeType) | ||
if mimeType in textTypes: | ||
try: | ||
# Fix formatting | ||
payload = part.get_payload(decode=True) | ||
payload = regex.sub(rb"(\r\n){1,}", b"\r\n", payload) | ||
payload = payload.replace(b"\r\n", b'<br>') | ||
payload = regex.sub(rb"(<br> ){2,}", b'<br><br>', payload) | ||
payload = quopri.decodestring(payload).decode('utf-8', errors="ignore") | ||
payload = payload.replace(b"\r\n", b"<br>") | ||
payload = regex.sub(rb"(<br> ){2,}", b"<br><br>", payload) | ||
payload = quopri.decodestring(payload).decode("utf-8", errors="ignore") | ||
except Exception as e: | ||
payload = str(quopri.decodestring(part.get_payload(decode=True)))[2:-1] | ||
|
||
|
@@ -159,31 +166,31 @@ def processEml(data, output_dir, logger, load_ext_images=False, load_images=Fals | |
try: | ||
payload_path = NamedTemporaryFile(suffix=".png").name | ||
imgkit.from_string(payload, payload_path, options=imgkitOptions) | ||
logger.info('Decoded %s' % payload_path) | ||
logger.info("Decoded %s" % payload_path) | ||
imagesList.append(payload_path) | ||
except Exception as e: | ||
logger.warning(f'Decoding this MIME part returned error: {e}') | ||
logger.warning(f"Decoding this MIME part returned error: {e}") | ||
|
||
elif mimeType in imageTypes and load_images: | ||
payload = part.get_payload(decode=False) | ||
payload_path = NamedTemporaryFile(suffix=".png").name | ||
imgdata = base64.b64decode(payload) | ||
try: | ||
with open(payload_path, 'wb') as f: | ||
with open(payload_path, "wb") as f: | ||
f.write(imgdata) | ||
logger.info('Decoded %s' % payload_path) | ||
logger.info("Decoded %s" % payload_path) | ||
imagesList.append(payload_path) | ||
except Exception as e: | ||
logger.warning(f'Decoding this MIME part returned error: {e}') | ||
logger.warning(f"Decoding this MIME part returned error: {e}") | ||
|
||
resultImage = os.path.join(output_dir, 'output.png') | ||
resultImage = os.path.join(output_dir, "output.png") | ||
if len(imagesList) > 0: | ||
images = list(map(Image.open, imagesList)) | ||
combo = appendImages(images) | ||
combo.save(resultImage) | ||
# Clean up temporary images | ||
for i in imagesList: | ||
os.remove(i) | ||
return(resultImage) | ||
return resultImage | ||
else: | ||
return(False) | ||
return False |