diff --git a/oletools/crypto.py b/oletools/crypto.py index 9079ae91c..3eb5eda54 100644 --- a/oletools/crypto.py +++ b/oletools/crypto.py @@ -328,7 +328,7 @@ def decrypt(filename, passwords=None, **temp_file_args): None :type passwords: iterable or str or None :param temp_file_args: arguments for :py:func:`tempfile.mkstemp` e.g., - `dirname` or `prefix`. `suffix` will default to + `dir` or `prefix`. `suffix` will default to suffix of input `filename`, `prefix` defaults to `oletools-decrypt-`; `text` will be ignored :returns: name of the decrypted temporary file (type str) or `None` @@ -385,6 +385,7 @@ def decrypt(filename, passwords=None, **temp_file_args): crypto_file.decrypt(write_handle) # decryption was successfull; clean up and return + log.debug('Decrypt successfull') write_handle.close() write_handle = None break @@ -399,6 +400,6 @@ def decrypt(filename, passwords=None, **temp_file_args): if decrypt_file and isfile(decrypt_file): os.unlink(decrypt_file) decrypt_file = None - # if we reach this, all passwords were tried without success - log.debug('All passwords failed') + if decrypt_file is None: + log.debug('All passwords failed') return decrypt_file diff --git a/oletools/oleobj.py b/oletools/oleobj.py index 9f67752ea..247daf3cf 100644 --- a/oletools/oleobj.py +++ b/oletools/oleobj.py @@ -72,6 +72,7 @@ PptRecordExOleVbaActiveXAtom) from oletools.ooxml import XmlParser from oletools.common.io_encoding import ensure_stdout_handles_unicode +from oletools import crypto # ----------------------------------------------------------------------------- # CHANGELOG: @@ -162,13 +163,21 @@ def get_logger(name, level=logging.CRITICAL+1): log = get_logger('oleobj') # pylint: disable=invalid-name -def enable_logging(): +def enable_logging(olefile_logging=False): """ Enable logging for this module (disabled by default). + This will set the module-specific logger level to NOTSET, which means the main application controls the actual logging level. + + Also enables logging in crypto and optionally in olefile with param + `olefile_logging`. """ log.setLevel(logging.NOTSET) + crypto.enable_logging() + # todo: enable logging in ooxml and ppt_record_parser as well + if olefile_logging: + olefile.enable_logging() # === CONSTANTS =============================================================== @@ -501,6 +510,77 @@ def parse(self, data): self.extra_data = data[index+self.data_size:] +class FakeFile(io.RawIOBase): + """ create file-like object from data without copying it + + BytesIO is what I would like to use but it copies all the data. This class + does not. On the downside: data can only be read and seeked, not written. + + Assume that given data is bytes (str in py2, bytes in py3). + + See also (and maybe can put into common file with): + ppt_record_parser.IterStream, ooxml.ZipSubFile + """ + + def __init__(self, data): + """ create FakeFile with given bytes data """ + super(FakeFile, self).__init__() + self.data = data # this does not actually copy (python is lazy) + self.pos = 0 + self.size = len(data) + + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return True + + def readinto(self, target): + """ read into pre-allocated target """ + n_data = min(len(target), self.size-self.pos) + if n_data == 0: + return 0 + target[:n_data] = self.data[self.pos:self.pos+n_data] + self.pos += n_data + return n_data + + def read(self, n_data=-1): + """ read and return data """ + if self.pos >= self.size: + return bytes() + if n_data == -1: + n_data = self.size - self.pos + result = self.data[self.pos:self.pos+n_data] + self.pos += n_data + return result + + def seek(self, pos, offset=io.SEEK_SET): + """ jump to another position in file """ + # calc target position from self.pos, pos and offset + if offset == io.SEEK_SET: + new_pos = pos + elif offset == io.SEEK_CUR: + new_pos = self.pos + pos + elif offset == io.SEEK_END: + new_pos = self.size + pos + else: + raise ValueError("invalid offset {0}, need SEEK_* constant" + .format(offset)) + if new_pos < 0: + raise IOError('Seek beyond start of file not allowed') + self.pos = new_pos + + def tell(self): + """ tell where in file we are positioned """ + return self.pos + + +# === FUNCTIONS =============================================================== + + def shorten_filename(fname, max_len): """Create filename shorter than max_len, trying to preserve suffix.""" # simple cases: @@ -525,7 +605,7 @@ def shorten_filename(fname, max_len): def sanitize_filename(filename, replacement='_', max_len=MAX_FILENAME_LENGTH): """ - Return filename that is save to work with. + Return filename that is safe to work with. Removes path components, replaces all non-whitelisted characters (so output is always a pure-ascii string), replaces '..' and ' ' and shortens to @@ -653,74 +733,6 @@ def find_ole_in_ppt(filename): ppt_file.close() -class FakeFile(io.RawIOBase): - """ create file-like object from data without copying it - - BytesIO is what I would like to use but it copies all the data. This class - does not. On the downside: data can only be read and seeked, not written. - - Assume that given data is bytes (str in py2, bytes in py3). - - See also (and maybe can put into common file with): - ppt_record_parser.IterStream, ooxml.ZipSubFile - """ - - def __init__(self, data): - """ create FakeFile with given bytes data """ - super(FakeFile, self).__init__() - self.data = data # this does not actually copy (python is lazy) - self.pos = 0 - self.size = len(data) - - def readable(self): - return True - - def writable(self): - return False - - def seekable(self): - return True - - def readinto(self, target): - """ read into pre-allocated target """ - n_data = min(len(target), self.size-self.pos) - if n_data == 0: - return 0 - target[:n_data] = self.data[self.pos:self.pos+n_data] - self.pos += n_data - return n_data - - def read(self, n_data=-1): - """ read and return data """ - if self.pos >= self.size: - return bytes() - if n_data == -1: - n_data = self.size - self.pos - result = self.data[self.pos:self.pos+n_data] - self.pos += n_data - return result - - def seek(self, pos, offset=io.SEEK_SET): - """ jump to another position in file """ - # calc target position from self.pos, pos and offset - if offset == io.SEEK_SET: - new_pos = pos - elif offset == io.SEEK_CUR: - new_pos = self.pos + pos - elif offset == io.SEEK_END: - new_pos = self.size + pos - else: - raise ValueError("invalid offset {0}, need SEEK_* constant" - .format(offset)) - if new_pos < 0: - raise IOError('Seek beyond start of file not allowed') - self.pos = new_pos - - def tell(self): - """ tell where in file we are positioned """ - return self.pos - - def find_ole(filename, data, xml_parser=None): """ try to open somehow as zip/ole/rtf/... ; yield None if fail @@ -832,32 +844,28 @@ def find_customUI(xml_parser): yield customui_onload -def process_file(filename, data, output_dir=None): +def process_file(filename, data, output_dir): """ find embedded objects in given file if data is given (from xglob for encrypted zip files), then filename is not used for reading. If not (usual case), then data is read from filename on demand. - If output_dir is given and does not exist, it is created. If it is not - given, data is saved to same directory as the input file. - """ - # sanitize filename, leave space for embedded filename part - sane_fname = sanitize_filename(filename, max_len=MAX_FILENAME_LENGTH-5) or\ - 'NONAME' - if output_dir: - if not os.path.isdir(output_dir): - log.info('creating output directory %s', output_dir) - os.mkdir(output_dir) - - fname_prefix = os.path.join(output_dir, sane_fname) - else: - base_dir = os.path.dirname(filename) - fname_prefix = os.path.join(base_dir, sane_fname) + If an error occurs, it is caught locally so analysis on the file can + proceed. Return values incidate whether there was any error. + + `output_dir` is created on demand. + :returns: err_stream, err_dumping, did_dump (all three booleans) + """ # TODO: option to extract objects to files (false by default) print('-'*79) print('File: %r' % filename) + + # sanitize filename, leave space for embedded filename part + sane_fname = sanitize_filename(filename, max_len=MAX_FILENAME_LENGTH-5) or \ + 'NONAME' + fname_prefix = os.path.join(output_dir, sane_fname) index = 1 # do not throw errors but remember them and try continue with other streams @@ -913,13 +921,19 @@ def process_file(filename, data, output_dir=None): print(u'Filename = "%s"' % opkg.filename) print(u'Source path = "%s"' % opkg.src_path) print(u'Temp path = "%s"' % opkg.temp_path) + fname = 'oleobj-extracted-file' for embedded_fname in get_sane_embedded_filenames( opkg.filename, opkg.src_path, opkg.temp_path, - MAX_FILENAME_LENGTH - len(sane_fname) - 1, index): + MAX_FILENAME_LENGTH - len(fname_prefix) - 1, index): fname = fname_prefix + '_' + embedded_fname if not os.path.isfile(fname): break + # create output dir + if not os.path.isdir(output_dir): + log.info('creating output directory %s', output_dir) + os.mkdir(output_dir) + # dump try: print('saving to file %s' % fname) @@ -948,6 +962,67 @@ def process_file(filename, data, output_dir=None): return err_stream, err_dumping, did_dump +def process_crypto_wrapper(filename, data, output_dir, passwords=None, + crypto_nesting=0): + """ + Wrapper around `process_file`, that also tries to decrypt file. + + Args `filename`, `data` and `output_dir` same as for + :py:func:`process_file`. Arg `passwords` is forwarded to + :py:func:`crypt.decrypt_file`, param `crypto_nesting` prevents too deep + recursions into decrypted files. + + Returns `(err_stream, err_dumping, did_dump)` just like `process_file`. + + Failure to decrypt an encrypted file is treated like a `err_stream`. + Successfull decryption like `did_dump`, decrypted file is named and treated + like other dumps from :py:func:`process_file`. + """ + err_stream = False + err_dumping = False + did_dump = False + try: + err_stream, err_dumping, did_dump = \ + process_file(filename, data, output_dir) + log.debug('Checking for encryption') + if not crypto.is_encrypted(filename): + log.debug('No encryption found') + return err_stream, err_dumping, did_dump + except Exception: + log.debug('Checking for encryption (after exception)') + if not crypto.is_encrypted(filename): + raise + # we reach this point only if file is encrypted + # check if this is an encrypted file in an encrypted file in an ... + if crypto_nesting >= crypto.MAX_NESTING_DEPTH: + raise crypto.MaxCryptoNestingReached(crypto_nesting, filename) + + # decrypt + if not os.path.isdir(output_dir): + log.info('creating output directory %s', output_dir) + os.mkdir(output_dir) + prefix = '{}_decrypt-{}'.format(sanitize_filename(filename), + crypto_nesting or '') # (avoid the '0') + log.debug('Checking encryption passwords {}'.format(passwords)) + decrypted_file = crypto.decrypt(filename, passwords, + dir=output_dir, prefix=prefix) + if decrypted_file is None: + log.error('Decrypt failed, run with debug output to get details') + err_stream = True + return err_stream, err_dumping, did_dump + + # might still be encrypted, so call this again recursively + log.info('Working on decrypted file') + new_err_stream, new_err_dumping, new_did_dump = \ + process_crypto_wrapper(decrypted_file, data, output_dir, passwords, + crypto_nesting+1) + err_stream |= new_err_stream + err_dumping |= new_err_dumping + did_dump = True # we treat the decrypted result like we did dump + + return err_stream, err_dumping, did_dump + + # === MAIN ==================================================================== @@ -958,20 +1033,8 @@ def existing_file(filename): return filename -def main(cmd_line_args=None): - """ main function, called when running this as script - - Per default (cmd_line_args=None) uses sys.argv. For testing, however, can - provide other arguments. - """ - # print banner with version - ensure_stdout_handles_unicode() - print('oleobj %s - http://decalage.info/oletools' % __version__) - print('THIS IS WORK IN PROGRESS - Check updates regularly!') - print('Please report any issue at ' - 'https://github.com/decalage2/oletools/issues') - print('') - +def parse_args(cmd_line_args=None): + """Parse command line arguments.""" usage = 'usage: %(prog)s [options] [filename2 ...]' parser = argparse.ArgumentParser(usage=usage) # parser.add_argument('-o', '--outfile', dest='outfile', @@ -996,6 +1059,8 @@ def main(cmd_line_args=None): default=DEFAULT_LOG_LEVEL, help='logging level debug/info/warning/error/critical ' '(default=%(default)s)') + parser.add_argument('-p', '--passwords', type=str, nargs='*', + help='Passwords to try when encryption is found.') parser.add_argument('input', nargs='*', type=existing_file, metavar='FILE', help='Office files to parse (same as -i)') @@ -1013,10 +1078,31 @@ def main(cmd_line_args=None): if options.verbose: options.loglevel = 'debug' - # Print help if no arguments are passed + # Print help and return fail status if no input files were given if not options.input: parser.print_help() - return RETURN_ERR_ARGS + sys.exit(RETURN_ERR_ARGS) + + return options + + +def main(cmd_line_args=None): + """ + Main function, called when running this as script + + Per default (cmd_line_args=None) uses sys.argv. For testing, however, can + provide other arguments. + """ + # print banner with version + ensure_stdout_handles_unicode() + print('oleobj %s - http://decalage.info/oletools' % __version__) + print('THIS IS WORK IN PROGRESS - Check updates regularly!') + print('Please report any issue at ' + 'https://github.com/decalage2/oletools/issues') + print('') + + # parse command line arguments + options = parse_args(cmd_line_args) # Setup logging to the console: # here we use stdout instead of stderr by default, so that the output @@ -1024,9 +1110,7 @@ def main(cmd_line_args=None): logging.basicConfig(level=LOG_LEVELS[options.loglevel], stream=sys.stdout, format='%(levelname)-8s %(message)s') # enable logging in the modules: - log.setLevel(logging.NOTSET) - if options.loglevel == 'debug-olefile': - olefile.enable_logging() + enable_logging(options.loglevel == 'debug-olefile') # remember if there was a problem and continue with other data any_err_stream = False @@ -1040,8 +1124,14 @@ def main(cmd_line_args=None): # ignore directory names stored in zip files: if container and filename.endswith('/'): continue + + if options.output_dir: + output_dir = options.output_dir + else: + output_dir = os.path.dirname(filename) err_stream, err_dumping, did_dump = \ - process_file(filename, data, options.output_dir) + process_crypto_wrapper(filename, data, output_dir, + options.passwords) any_err_stream |= err_stream any_err_dumping |= err_dumping any_did_dump |= did_dump diff --git a/tests/oleobj/test_basic.py b/tests/oleobj/test_basic.py index 3fdcab037..3680eab68 100644 --- a/tests/oleobj/test_basic.py +++ b/tests/oleobj/test_basic.py @@ -1,9 +1,9 @@ """ Test oleobj basic functionality """ - +import sys import unittest from tempfile import mkdtemp from shutil import rmtree -from os.path import join, isfile +from os.path import join, isfile, basename from hashlib import md5 from glob import glob @@ -11,6 +11,7 @@ from tests.test_utils import DATA_BASE_DIR, call_and_capture from oletools import oleobj from oletools.common.io_encoding import ensure_stdout_handles_unicode +from oletools.crypto import check_msoffcrypto #: provide some more info to find errors @@ -158,6 +159,38 @@ def test_non_streamed(self): return self.do_test_md5(['-d', self.temp_dir], test_fun=preread_file, only_run_every=4) + @unittest.skipIf(not check_msoffcrypto(), + 'Module msoffcrypto not installed for {}' + .format(basename(sys.executable))) + def test_decrypt(self): + """Check that decryption in oleobj works.""" + sample_name = 'embedded-jpg-standardpassword.docx' + embedded_name = 'nature-home-spring-desktop-background-images.jpg' + expect_hash = '82d6e5fee5925b2148f06343da4018d2' + args = ['-d', self.temp_dir, + join(DATA_BASE_DIR, 'oleobj', sample_name), ] + output, ret_val = call_and_capture('oleobj', args, + accept_nonzero_exit=True) + self.assertEqual(ret_val, oleobj.RETURN_DID_DUMP, + msg='Wrong return value {} for {}. Output:\n{}' + .format(ret_val, sample_name, output)) + expect_name = glob(join(self.temp_dir, + sample_name + '_decrypt-*_' + embedded_name)) + if not expect_name: + self.did_fail = True + self.fail('Sample not created from {0}. Output:\n{1}' + .format(sample_name, output)) + elif len(expect_name) > 1: + self.did_fail = True + self.fail('Too many samples ({0}) created from {1}. Output:\n{2}' + .format(len(expect_name), sample_name, output)) + expect_name = expect_name[0] + md5_hash = calc_md5(expect_name) + if md5_hash != expect_hash: + self.did_fail = True + self.fail('Wrong md5 {0} of {1} from {2}. Output:\n{3}' + .format(md5_hash, expect_name, sample_name, output)) + class TestSaneFilenameCreation(unittest.TestCase): """ Test sanitization / creation of sane filenames """ diff --git a/tests/test-data/oleobj/embedded-jpg-standardpassword.docx b/tests/test-data/oleobj/embedded-jpg-standardpassword.docx new file mode 100644 index 000000000..ebf07d837 Binary files /dev/null and b/tests/test-data/oleobj/embedded-jpg-standardpassword.docx differ