From 89ff4070bd7745a2050e8fdc60a33ad295101a25 Mon Sep 17 00:00:00 2001 From: Mustafa Kemal Gilor Date: Fri, 2 Jun 2023 13:23:04 +0300 Subject: [PATCH] searchkit/constraints: rewrite of binary search algorithm Implemented a new binary search algorithm that no longer needs filemarkers or knowing the lines beforehand, which reduces the time spent applying a SearchConstraintSearchSince to a file, especially if the file is large in size. Removed the following classes which are no longer necessary: - SkipRange - SkipRangeOverlapException - BinarySearchState - FileMarkers (and respective unit tests) - SeekInfo Removed `test_logs_since_junk_not_allow_unverifiable` test case since we're no longer parsing all lines in the file. Removed following functions from BinarySeekSearchBase: - _seek_and_validate - _check_line - _seek_next Introduced the following new classes: - LogFileDateSinceOffsetSeeker (the main binary search class) - DateSearchFailedAtOffset (exception type) - NoLogsFoundSince (exception type) - NoDateFoundInLogs (exception type) Signed-off-by: Mustafa Kemal Gilor --- searchkit/constraints.py | 1173 ++++++++++++++----------- tests/unit/test_search.py | 22 - tests/unit/test_search_constraints.py | 662 +++++++++++--- 3 files changed, 1198 insertions(+), 659 deletions(-) diff --git a/searchkit/constraints.py b/searchkit/constraints.py index 4fe3123..4a0fa0f 100644 --- a/searchkit/constraints.py +++ b/searchkit/constraints.py @@ -1,14 +1,12 @@ import abc -import hashlib -import os import re -import tempfile import uuid +import bisect +from enum import Enum from datetime import datetime, timedelta from functools import cached_property -from searchkit.utils import MPCacheSharded from searchkit.log import log @@ -109,578 +107,693 @@ def __repr__(self): """ provide string repr of this object. """ -class SkipRangeOverlapException(Exception): - def __init__(self, ln): - msg = ("the current and previous skip ranges overlap which " - "suggests that we have re-entered a range by skipping " - "line {}.".format(ln)) - super().__init__(msg) +class BinarySeekSearchBase(ConstraintBase): + """ + Provides a way to seek to a point in a file using a binary search and a + given condition. + """ + def __init__(self, allow_constraints_for_unverifiable_logs=True): + self.allow_unverifiable_logs = allow_constraints_for_unverifiable_logs -class SkipRange(object): - # skip directions - SKIP_BWD = 0 - SKIP_FWD = 1 + @abc.abstractmethod + def extracted_datetime(self, line): + """ + Extract timestamp from start of line. - def __init__(self): - self.direction = self.SKIP_BWD - self.current = set() - self.prev = set() + @param line: text line to extract a datetime from. + @return: datetime.datetime object or None + """ - def re_entered(self): - if self.prev.intersection(self.current): - return True + @abc.abstractproperty + def _since_date(self): + """ A datetime.datetime object representing the "since" date/time """ - return False + def _line_date_is_valid(self, extracted_datetime): + """ + Validate if the given line falls within the provided constraint. In + this case that's whether it has a datetime that is >= to the "since" + date. 
+ """ + ts = extracted_datetime + if ts is None: + # log.info("s:%s: failed to extract datetime from " + # "using expressions %s - assuming line is not valid", + # unique_search_id, ', '.join(self.exprs)) + return False - def add(self, ln): - self.current.add(ln) - if self.prev.intersection(self.current): - raise SkipRangeOverlapException(ln) + if ts < self._since_date: + # log.debug("%s < %s at (%s) i.e. False", ts, self._since_date, + # line[-3:].strip()) + return False - def __len__(self): - return len(self.current) + # log.debug("%s >= %s at (%s) i.e. True", ts, self._since_date, + # line[-3:].strip()) - def save_and_reset(self): - if self.current: - self.prev = self.current - self.current = set() - self.direction = self.SKIP_BWD + return True - def __repr__(self): - if self.current: - _r = sorted(list(self.current)) - if self.direction == self.SKIP_BWD: - _dir = '<-' - else: - _dir = '->' - return "+skip({}{}{})".format(_r[0], _dir, _r[-1]) +class ValidLinesNotFound(Exception): + """Raised when a log file contains proper timestamps but + no log lines after the since date.""" - return "" +class ValidFormattedDateNotFound(Exception): + """Raised when a log file does not contain any line with + date suitable to specified date format""" -class BinarySearchState(object): - # max contiguous skip lines before bailing on file search - SKIP_MAX = 500 - RC_FOUND_GOOD = 0 - RC_FOUND_BAD = 1 - RC_SKIPPING = 2 - RC_ERROR = 3 +class DateNotFoundInLine(Exception): + """Raised when searcher has encountered a line with no date + and performed forward-backward searches, but still yet, could + not found a line with date.""" - def __init__(self, fd_info, cur_pos): - self.fd_info = fd_info - self.rc = self.RC_FOUND_GOOD - self.cur_ln = 0 - self.cur_pos = cur_pos - self.next_pos = 0 - def start(self): - """ Must be called before starting searches beyond the first line of - a file. +class InvalidSearchState(Exception): + """Raised when a variable dependent on another variable (e.g. + the variable x only has value when y is True) is accessed without + checking the prerequisite variable.""" - This is not done in __init__ since it will load file markers, a - potentially expensive operation that should only be done if necessary. 
- """ - self.search_range_start = 0 - self.search_range_end = len(self.fd_info.markers) - 1 - self.update_pos_pointers() - self.invalid_range = SkipRange() - self.last_known_good_ln = None - - def save_last_known_good(self): - self.last_known_good_ln = self.cur_ln - - def skip_current_line(self): - if len(self.invalid_range) == self.SKIP_MAX - 1: - self.rc = self.RC_ERROR - log.warning("reached max contiguous skips (%d) - skipping " - "constraint for file %s", self.SKIP_MAX, - self.fd_info.fd.name) - return - self.rc = self.RC_SKIPPING - try: - self.invalid_range.add(self.cur_ln) - if (self.invalid_range.direction == SkipRange.SKIP_BWD and - self.cur_ln > self.search_range_start): - self.cur_ln -= 1 - elif self.cur_ln < self.search_range_end: - if self.invalid_range.direction == SkipRange.SKIP_BWD: - log.debug("changing skip direction to fwd") - - self.invalid_range.direction = SkipRange.SKIP_FWD - self.cur_ln += 1 - - self.update_pos_pointers() - except SkipRangeOverlapException: - if self.last_known_good_ln is not None: - self.rc = self.RC_FOUND_GOOD - self.cur_ln = self.last_known_good_ln - self.update_pos_pointers() - log.debug("re-entered skip range so good line is %s", - self.cur_ln) - self.fd_info.fd.seek(self.cur_pos) - else: - self.rc = self.RC_ERROR - log.error("last known good not set so not sure where to " - "go after skip range overlap.") - - def update_pos_pointers(self): - if len(self.fd_info.markers) == 0: - log.debug("file %s has no markers - skipping update pos pointers", - self.fd_info.fd.name) - return +class FindTokenStatus(Enum): + FOUND = 1, + REACHED_EOF = 2, + FAILED = 3 - ln = self.cur_ln - self.cur_pos = self.fd_info.markers[ln] - if len(self.fd_info.markers) > ln + 1: - self.next_pos = self.fd_info.markers[ln + 1] - else: - self.next_pos = self.fd_info.eof_pos - def get_next_midpoint(self): +class SearchState(object): + def __init__(self, status: FindTokenStatus, offset=0): """ - Given two line numbers in a file, find the mid point. + @param status: current status of search + @param offset: current position in file from which next search will be + started. """ - start = self.search_range_start - end = self.search_range_end - if start == end: - return start, self.fd_info.markers[start] + self._status = status + self._offset = offset - range = end - start - mid = start + int(range / 2) + (range % 2) - log.debug("midpoint: start=%s, mid=%s, end=%s", start, mid, end) - self.cur_ln = mid - - def __repr__(self): - return ("start={}{}, end={}, cur_pos={}, cur_ln={}, rc={}".format( - self.search_range_start, - self.invalid_range, - self.search_range_end, - self.cur_pos, - self.cur_ln, - self.rc)) - - -class FileMarkers(object): - INDEX_FSIZE_LIMIT = (1024 ** 3) * 10 - SYNC_LIMIT = 100000 - # NOTE: chunk much contain a newline - BLOCK_SIZE_BASE = 4096 - CHUNK_SIZE = BLOCK_SIZE_BASE * 64 - - def __init__(self, fd, eof_pos, cache_path=None): - """ - Index start position for every line in file. Starts at current - position and is non-destructive i.e. original position is - restored. + @property + def status(self): + return self._status - This is an expensive operation and we only want to do it once per - file/path so the results are cached on disk and loaded when needed. + @property + def offset(self): + if self.status == FindTokenStatus.FAILED: + raise InvalidSearchState() - NOTE: this is only safe to use if the file does not change between - calls. 
- """ - if cache_path is None: - self.cache_path = tempfile.mkdtemp() - log.debug("cache path not provided to filemarkers so using a " - "custom one (%s)", self.cache_path) - else: - self.cache_path = cache_path - - self.fd = fd - self.file_path = self.fd.name - hash = self._fname_hash(self.file_path) - mtimesize = self._fmtime_size(self.file_path) - self.cache_id = 'file_markers_{}_{}'.format(hash, mtimesize) - self.cache_type = 'search_constraints' - self.orig_pos = self.fd.tell() - self.eof_pos = eof_pos - self._primed = False - self._num_lines = None - self.chunk_size = self.CHUNK_SIZE - - def _do_chunk(self, cache, chunk, current_position, marker): - if (self.fd.tell() < self.eof_pos) and '\n' not in chunk: - log.warning("no newline found in chunk len=%s starting at " - "%s - increasing chunksize by %s bytes and " - "trying again", self.chunk_size, current_position, - self.BLOCK_SIZE_BASE) - self.chunk_size += self.BLOCK_SIZE_BASE - self.fd.seek(current_position) - return -1, marker - - markers = {} - chunkpos = 0 - while True: - n = chunk[chunkpos:].find('\n') - if n == -1: - break + return self._offset - chunkpos += n + 1 - if current_position + chunkpos == self.eof_pos: - # don't save eof - break - marker += 1 - markers[marker] = current_position + chunkpos - if marker % self.SYNC_LIMIT == 0: - log.debug("indexed {0:.2f}% of file".format( - (100 / self._num_lines) * marker)) - - cache.bulk_set(markers) - return chunkpos, marker - - def _prime(self): - self.fd.seek(self.orig_pos) - self._num_lines = sum(1 for i in self.fd) - self.fd.seek(self.orig_pos) - log.debug("priming index cache for file %s in %s (numlines=%s)", - self.file_path, - self.cache_path, self._num_lines) - with MPCacheSharded(self.cache_id, self.cache_type, - self.cache_path) as cache: - if cache.get(self._num_lines - 1) is not None: - log.debug("using existing file index from cache.") - self._primed = True - return - - main_marker = 0 - main_pos = self.orig_pos - cache.set(main_marker, self.orig_pos) - remainder = '' - for chunk in self._readchunk(self.fd): - if remainder != '': - chunk = remainder + chunk - - chunk_pos, main_marker = self._do_chunk(cache, chunk, main_pos, - main_marker) - if chunk_pos >= 0: - remainder = chunk[chunk_pos:] - main_pos += chunk_pos - else: - # this implies we are going around again - remainder = '' - - self.fd.seek(self.orig_pos) - log.debug("finished creating index. (lines=%s)", self._num_lines) - self._primed = True - - def _fname_hash(self, path): - hash = hashlib.sha256() - hash.update(path.encode('utf-8')) - return hash.hexdigest() - - def _fmtime_size(self, path): - """ - Criteria used to determine if file contents changed since markers were - last generated. - """ - if not os.path.exists(path): - return 0 +class NonDestructiveFileRead(object): + """ Context manager class that saves current position at start and restores + once finished. """ + def __init__(self, file): + self.file = file + self.original_position = file.tell() - mtime = os.path.getmtime(path) - size = os.path.getsize(path) - return "{}+{}".format(mtime, size) + def __enter__(self): + return self.file - def _readchunk(self, fd): - while True: - data = fd.read(self.chunk_size).decode('unicode_escape') - if data == '': - break + def __exit__(self, exc_type, exc_value, exc_traceback): + self.file.seek(self.original_position) - yield data - def __getitem__(self, key): - if not self._primed: - self._prime() +class LogLine(object): + """Class representing a line in a log file. 
- with MPCacheSharded(self.cache_id, self.cache_type, - self.cache_path) as cache: - return cache.get(key) + Keeps the start/end offsets of the line. Line content is lazy-loaded on + demand by calling the `text` method. + """ - def __iter__(self): - if not self._primed: - self._prime() + MAX_TEXT_READ_BYTES = 64 - with MPCacheSharded(self.cache_id, self.cache_type, - self.cache_path) as cache: - for value in sorted(cache): - yield value + def __init__(self, file, constraint, line_start_lf, line_end_lf): + assert line_start_lf + assert line_end_lf + assert line_start_lf.offset is not None + assert line_end_lf.offset is not None + self._file = file + self._constraint = constraint + self._line_start_lf = line_start_lf + self._line_end_lf = line_end_lf def __len__(self): - if not self._primed: - self._prime() + return (self.end_offset - self.start_offset) + 1 - return self._num_lines + @property + def start_offset(self): + """Offset of the log line's first character (excluding \n)""" + # Depending on whether we found the start line feed + # or not, we discard one character at the beginning + # (being the \n) + if self.start_lf.status == FindTokenStatus.FOUND: + return self.start_lf.offset + 1 + return self.start_lf.offset -class SeekInfo(object): + @property + def end_offset(self): + """Offset of the log line's last character (excluding \n)""" + # Depending on whether we found the end line feed + # or not, we discard one character at the end (being + # the \n) + if self.end_lf.status == FindTokenStatus.FOUND: + return self.end_lf.offset - 1 - def __init__(self, fd, cache_path=None): - """ - @param cache_path: provide a path to save cache info if you want it to - persist. - """ - self.fd = fd - self.iterations = 0 - self._orig_pos = self.fd.tell() - self.markers = FileMarkers(fd, self.eof_pos, cache_path) + return self.end_lf.offset - @cached_property - def eof_pos(self): - """ - Returns file EOF position. - """ - orig = self.fd.tell() - eof = self.fd.seek(0, 2) - self.fd.seek(orig) - return eof + @property + def start_lf(self): + """Offset of the log line's starting line feed.""" + return self._line_start_lf - @cached_property - def orig_pos(self): + @property + def end_lf(self): + """Offset of the log line's ending line feed.""" + return self._line_end_lf + + @property + def date(self): + """Extract the date from the log line, if any. + + The function will use extracted_datetime function to parse + the date/time. + + @return: datetime: if `text` contains a valid datetime otherwise None. """ - The original position of the file descriptor. + return self._constraint.extracted_datetime(self.text) + + @property + def text(self): + """Retrieve the line text. + + This function seeks to the line start and reads the line content + on demand. The function will revert the file offset back after reading + to where it was before. - NOTE: cannot be called when iterating over an fd. Must be called before - any destructive operations take place. + @return: the line text string """ - return self._orig_pos + with NonDestructiveFileRead(self._file) as f: + f.seek(self.start_offset) + line_text = f.read(min(self.MAX_TEXT_READ_BYTES, len(self))) + return line_text - def reset(self): - log.debug("restoring file position to start (%s)", - self.orig_pos) - self.fd.seek(self.orig_pos) +class LogFileDateSinceSeeker(object): + r"""Performs "since" date lookups with file offsets. This is + useful for performing line-based binary date searches on a log file. 
-class BinarySeekSearchBase(ConstraintBase): - """ - Provides a way to seek to a point in a file using a binary search and a - given condition. - """ + Implements __len__ and __getitem__ methods in order to behave like a list. + When __getitem__ is called with an offset the algorithm locates the + rightmost and leftmost line feed '\n' to form a line. For example with the + following file contents: - def __init__(self, allow_constraints_for_unverifiable_logs=True): - self.fd_info = None - self.allow_unverifiable_logs = allow_constraints_for_unverifiable_logs + 13:15 AAAAAA\n13:16 BBBBBBB\n13:17 CCCCCC - @abc.abstractmethod - def extracted_datetime(self, line): - """ - Extract timestamp from start of line. + and assuming __getitem__ is called with offset 19 i.e. - @param line: text line to extract a datetime from. - @return: datetime.datetime object or None - """ + 13:15 AAAAAA\n13:16 BBBBBBB\n13:17 CCCCCC + ^19 - @abc.abstractproperty - def _since_date(self): - """ A datetime.datetime object representing the "since" date/time """ + The algorithm will first read SEEK_HORIZON bytes forward, starting + from offset `19`, and then try to find the first line feed: - def _line_date_is_valid(self, extracted_datetime): - """ - Validate if the given line falls within the provided constraint. In - this case that's whether it has a datetime that is >= to the "since" - date. - """ - ts = extracted_datetime - if ts is None: - # log.info("s:%s: failed to extract datetime from " - # "using expressions %s - assuming line is not valid", - # unique_search_id, ', '.join(self.exprs)) - return False + 13:15 AAAAAA\n13:16 BBBBBBB\n13:17 CCCCCC + ^19 ^r-lf - if ts < self._since_date: - # log.debug("%s < %s at (%s) i.e. False", ts, self._since_date, - # line[-3:].strip()) - return False + Then the algorithm will seek SEEK_HORIZON bytes backward, starting from + offset 19, read SEEK_HORIZON bytes and then try to find the first line feed + scanning in reverse: - # log.debug("%s >= %s at (%s) i.e. True", ts, self._since_date, - # line[-3:].strip()) + 13:15 AAAAAA\n13:16 BBBBBBB\n13:17 CCCCCC + ^l-lf ^19 ^r-lf - return True + Then, the algorithm will extract the characters between l-lf and r-lf + to form a line. The line will be checked against the date matcher + to extract the date. If the date matcher yields a valid date, __getitem__ + will return that date. Otherwise, the search will be extended to other + nearby lines, prioritising the lines prior to the current, until either of + the following is true: - def _seek_and_validate(self, datetime_obj): - """ - Seek to position and validate. If the line at pos is valid the new - position is returned otherwise None. + - a line with a timestamp is found + - MAX_*_FALLBACK_LINES has been reached + """ - NOTE: this operation is destructive and will always point to the next - line after being called. + # Number of characters to read while searching + SEEK_HORIZON = 256 + + # How many times we can expand the search horizon while trying to find a + # line feed. This means the search will read SEEK_HORIZON times + # MAX_SEEK_HORIZON_EXPAND bytes in total when a line feed character is not + # found. + MAX_SEEK_HORIZON_EXPAND = 4096 + + # Number of lines to search forwards when the algorithm encounters lines + # with no date. 
+ MAX_TRY_FIND_WITH_DATE_ATTEMPTS = 500 + + LINE_FEED_TOKEN = b'\n' + + def __init__(self, fd, c): + self.file = fd + self.constraint = c + self.line_info = None + self.found_any_date = False + self.lookup_times = 0 + with NonDestructiveFileRead(self.file) as f: + self.length = f.seek(0, 2) + + def find_token_reverse(self, start_offset, horizon, + attempts=MAX_SEEK_HORIZON_EXPAND): + r"""Find `token` in `file` starting from `start_offset` and backing off + `horizon`bytes on each iteration for maximum of `max_iterations` times. + + @param start_offset (int): start offset of search + @param horizon (int): Amount of bytes to be processed on each step. + @param attempts (int, optional): Maximum amount of search iterations. + Defaults to MAX_SEEK_HORIZON_EXPAND. + + @return: + FindDelimiterResult(FindTokenStatus.FOUND, ...) if token is found + FindDelimiterResult(FindTokenStatus.REACHED_EOF, ...) if token is + not found because the scan reached the EOF + FindDelimiterResult(FindTokenStatus.FAILED, ...) if token is + not found because scan exhausted `max_iterations` + """ + + current_offset = -horizon + while True: + attempts -= 1 + read_offset = start_offset + current_offset + read_offset = read_offset if read_offset > 0 else 0 + read_size = horizon + if start_offset + current_offset <= 0: + read_size = read_size + (start_offset + current_offset) + + self.file.seek(read_offset) + chunk = self.file.read(read_size) + if not chunk or len(chunk) == 0: + # We've reached the start of the file and could not find the + # token. + return SearchState(status=FindTokenStatus.REACHED_EOF, + offset=0) + + chunk_offset = chunk.rfind(self.LINE_FEED_TOKEN) + + if chunk_offset != -1: + return SearchState(status=FindTokenStatus.FOUND, + offset=read_offset + chunk_offset) + + if attempts <= 0: + break - @param pos: position in a file. - """ - if self._line_date_is_valid(datetime_obj): - return self.fd_info.fd.tell() + current_offset = current_offset - len(chunk) + if (start_offset + current_offset) < 0: + return SearchState(status=FindTokenStatus.REACHED_EOF, + offset=0) + + return SearchState(FindTokenStatus.FAILED) + + def find_token(self, start_offset, horizon, + attempts=MAX_SEEK_HORIZON_EXPAND): + r"""Find `token` in `file` starting from `start_offset` and moving + forward `horizon` bytes on each iteration for maximum of + `max_iterations` times. - def _check_line(self, search_state): - """ - Attempt to read and validate datetime from line. + @param start_offset (int): start offset of search + @param horizon (int): Amount of bytes to be processed on each step. + @param attempts (int, optional): Maximum amount of search iterations. + Defaults to MAX_SEEK_HORIZON_EXPAND. - @return new position or -1 if we were not able to validate the line. - """ - self.fd_info.fd.seek(search_state.cur_pos) - # don't read the whole line since we only need the date at the start. - # hopefully 64 bytes is enough for any date+time format. 
- datetime_obj = self.extracted_datetime(self.fd_info.fd.read(64)) - self.fd_info.fd.seek(search_state.next_pos) - if datetime_obj is None: - return -1 - - return self._seek_and_validate(datetime_obj) - - def _seek_next(self, state): - log.debug("seek %s", state) - newpos = self._check_line(state) - if newpos == -1: - if not self.allow_unverifiable_logs: - log.info("file contains unverifiable lines and " - "allow_constraints_for_unverifiable_logs is False " - "- aborting constraints for this file.") - state.rc = state.RC_ERROR - return state - - # until we get out of a skip range we want to leave the pos at the - # start but we rely on the caller to enforce this so that we don't - # have to seek(0) after every skip. - state.skip_current_line() - return state - - if newpos is None: - state.rc = state.RC_FOUND_BAD - if state.cur_ln == 0: - log.debug("first line is not valid, checking last line") - state.cur_ln = state.search_range_end - state.update_pos_pointers() - elif (state.search_range_end - state.search_range_start) >= 1: - # _start_ln = state.search_range_start - state.search_range_start = state.cur_ln - # log.debug("going forwards (%s->%s:%s)", _start_ln, - # state.search_range_start, state.search_range_end) - state.invalid_range.save_and_reset() - state.get_next_midpoint() - state.update_pos_pointers() - else: - state.save_last_known_good() - state.rc = state.RC_FOUND_GOOD - if state.cur_ln == 0: - log.debug("first line is valid so assuming same for rest of " - "file") - self.fd_info.reset() - elif state.search_range_end - state.search_range_start <= 1: - log.debug("found final good ln=%s", state.cur_ln) - self.fd_info.fd.seek(state.cur_pos) - elif (len(state.invalid_range) > 0 and - state.invalid_range.direction == SkipRange.SKIP_FWD): - log.debug("found good after skip range") - self.fd_info.fd.seek(state.cur_pos) - else: - # set rc to bad since we are going to a new range - state.rc = state.RC_FOUND_BAD - # _end_ln = state.search_range_end - state.search_range_end = state.cur_ln - # log.debug("going backwards (%s:%s->%s)", - # state.search_range_start, _end_ln, - # state.search_range_end) - self.fd_info.fd.seek(state.cur_pos) - state.invalid_range.save_and_reset() - state.get_next_midpoint() - state.update_pos_pointers() - - return state - - def _seek_to_first_valid(self, destructive=True): + @return: + FindDelimiterResult(FindTokenStatus.FOUND, ...) if token is found + FindDelimiterResult(FindTokenStatus.REACHED_EOF, ...) if token is + not found because the scan reached the EOF + FindDelimiterResult(FindTokenStatus.FAILED, ...) if token is + not found because scan exhausted `max_iterations` """ - Find first valid line in file using binary search. By default this is a - destructive and will actually seek to the line. If no line is found the - descriptor will be at EOF. - Returns offset at which valid line was found. + current_offset = 0 + # Seek to the initial starting position + self.file.seek(start_offset) + while attempts > 0: + # Read `horizon` bytes from the file. + chunk = self.file.read(horizon) - @param destructive: by default this seek operation is destructive i.e. - it will find the least valid point and stay there. - If that is not desired this can be set to False. - """ - search_state = BinarySearchState(self.fd_info, self.fd_info.fd.tell()) - offset = 0 - - # check first line before going ahead with full search which requires - # generating file markers that is expensive for large files. 
- if self._check_line(search_state) == search_state.next_pos: - self.fd_info.reset() - log.debug("first line is valid so assuming same for rest of " - "file") - log.debug("seek %s finished (skipped %d lines) current_pos=%s, " - "offset=%s iterations=%s", - self.fd_info.fd.name, offset, - self.fd_info.fd.tell(), offset, self.fd_info.iterations) - - return offset - - log.debug("first line is not valid - checking rest of file") - self.fd_info.reset() - search_state.start() - num_lines = len(self.fd_info.markers) - if num_lines > 0: - while True: - if search_state.cur_ln >= num_lines: - log.debug("reached eof - no more lines to check") - break - - self.fd_info.iterations += 1 - search_state = self._seek_next(search_state) - if search_state.rc == search_state.RC_ERROR: - offset = 0 - self.fd_info.reset() - break - - if (search_state.search_range_end - - search_state.search_range_start) < 1: - # we've reached the end of all ranges but the result in - # undetermined. - if search_state.rc != search_state.RC_FOUND_BAD: - self.fd_info.reset() - offset = 0 - else: - offset = search_state.cur_ln - - break - - # log.debug(search_state) - if search_state.rc == search_state.RC_FOUND_GOOD: - # log.debug("seek ended at offset=%s", search_state.cur_ln) - offset = search_state.cur_ln - break - - if search_state.rc == search_state.RC_SKIPPING: - if ((search_state.cur_ln >= search_state.search_range_end) - and (len(search_state.invalid_range) == - search_state.search_range_end)): - # offset and pos should still be SOF so we - # make this the same - search_state.cur_ln = 0 - self.fd_info.reset() - break - - if self.fd_info.iterations >= len(self.fd_info.markers): - log.warning("exiting seek loop since limit reached " - "(eof=%s)", self.fd_info.eof_pos) - offset = 0 - self.fd_info.reset() - break - else: - log.debug("file %s is empty", self.fd_info.fd.name) + if not chunk or len(chunk) == 0: + # Reached end of file + return SearchState(status=FindTokenStatus.REACHED_EOF, + offset=len(self)) - if not destructive: - self.fd_info.fd.reset() + chunk_offset = chunk.find(self.LINE_FEED_TOKEN) + if chunk_offset != -1: + # We've found the token in the chunk. + # As the chunk_offset is a relative offset to the chunk + # translate it to file offset while returning. + return SearchState(status=FindTokenStatus.FOUND, + offset=(start_offset + current_offset + + chunk_offset)) + # We failed to find the token in the chunk. + # Progress the current offset forward by + # chunk's length. + current_offset = current_offset + len(chunk) + attempts -= 1 + + # Reached max_iterations and found nothing. + return SearchState(FindTokenStatus.FAILED) + + def try_find_line(self, epicenter, slf_off=None, elf_off=None): + r"""Try to find a line at `epicenter`. This function allows extracting + the corresponding line from a file offset. "Line" is a string between + two line feed characters i.e.; + + - \nThis is a line\n + + ... except when the line starts at the start of the file or ends at the + end of the file, where SOF/EOF are also accepted as line start/end: + + - This is line at SOF\n + - \nThis is a line at EOF + + Assume that we have the following file content: + + 11.01.2023 fine\n11.01.2023 a line\n11.01.2023 another line + + We have a file with three lines in above, and the offsets for these + lines would be: + + ----------------------------------------------------- + Line | Line | Line | + Nr. 
| Text | Offsets | + ----------------------------------------------------- + #0: | "11.01.2023 fine" | [0,14] | + #1: | "11.01.2023 a line" | [16,33] | + #2: | "11.01.2023 another line" | [35,58] | + + The function `try_find_line_w_date` would return the following for + the calls: + + (0) -> (11.01.2023,0-14) + (7) -> (11.01.2023,0-14) + (18) -> (11.01.2023,16-33) + (47) -> (11.01.2023,35-58) + + This function will try to locate first line feed characters from both + left and right side of the position `epicenter`. Assume the file + content we have above, and we want to extract the line at offset `18`, + which corresponds to the first dot `.` of date of the line #1: + + 11.01.2023 fine\n11.01.2023 a line\n11.01.2023 another line + ^epicenter + + The function will try to form a line by first searching for the first + line feed character in the left (slf) (if slf_off is None): + + 11.01.2023 fine\n11.01.2023 a line\n11.01.2023 another line + ^slf^epicenter + + and then the same for the right (elf) (if elf_off is None): + + 11.01.2023 fine\n11.01.2023 a line\n11.01.2023 another line + ^slf^epicenter ^elf + + The function will then extract the string between the `slf` and `elf`, + which yields the string "11.01.2023 a line". + + The function will either return a valid LogLine object, or raise an + exception. + + @param epicenter: Search start offset + @param slf_off: Optional starting line feed offset, if known. Defaults + to None. + @param elf_off: Optional ending line feed offset, if known. Defaults to + None. + + @raise ValueError: when ending line feed offset could not be found or + when starting line feed offset could not be found. + + @return: found logline + """ + log.debug(" > EPICENTER: %d", epicenter) + + # Find the first LF token from the right of the epicenter + # e.g. \nThis is a line\n + # ^epicenter ^line end lf + line_end_lf = self.find_token( + epicenter, LogFileDateSinceSeeker.SEEK_HORIZON + ) if elf_off is None else SearchState( + FindTokenStatus.FOUND, elf_off) + + if line_end_lf.status == FindTokenStatus.FAILED: + raise ValueError("Could not find ending line feed " + f"offset at epicenter {epicenter}") + + # Find the first LF token from the left of the epicenter + # e.g. \nThis is a line\n + # line start lf ^ ^epicenter + line_start_lf = self.find_token_reverse( + epicenter, LogFileDateSinceSeeker.SEEK_HORIZON + ) if slf_off is None else SearchState( + FindTokenStatus.FOUND, slf_off) + + if line_start_lf.status == FindTokenStatus.FAILED: + raise ValueError("Could not find start line feed " + f"offset at epicenter {epicenter}") + + # Ensure that found offsets are in file range + assert line_start_lf.offset <= len(self) + assert line_start_lf.offset >= 0 + assert line_end_lf.offset <= len(self) + assert line_end_lf.offset >= 0 + # Ensure that end lf offset is >= start lf offset + assert line_end_lf.offset >= line_start_lf.offset + + return LogLine(file=self.file, constraint=self.constraint, + line_start_lf=line_start_lf, line_end_lf=line_end_lf) + + def try_find_line_with_date(self, start_offset, line_feed_offset=None, + forwards=True, + attempts=MAX_TRY_FIND_WITH_DATE_ATTEMPTS): + r"""Try to fetch a line with date, starting from `start_offset`. + + The algorithm will try to fetch a new line searching for a valid date + for a maximum of `attempts` times. The lines will be fetched from + either prior or after `start_offset`, depending on the value of the + `forwards` parameter. 
+ + If `prev_offset` parameter is used, the value will be used as either + fwd_line_feed or rwd_line_feed position depending on the value of the + `forwards` parameter. + + @param start_offset: Where to begin searching + @param line_feed_offset: Offset of the fwd_line_feed, or rwd_line_feed + if known. Defaults to None. + @param forwards: Search forwards, or backwards. Defaults to True + (forwards). + @param attempts (int): How many lines will be checked at most. + + @return: line if found otherwise None. + """ + offset = start_offset + log_line = None + while attempts > 0: + log_line = self.try_find_line( + offset, + (None, line_feed_offset)[forwards], + (None, line_feed_offset)[not forwards] + ) + + log.debug( + " > TRY_FETCH, REMAINING_ATTEMPTS:%d, START_LF_OFFSET: %d, " + "END_LF_OFFSET: %d >: on line -> %s", + attempts, + log_line.start_lf.offset, + log_line.end_lf.offset, + log_line.text, + ) + + # If the line has a valid date, return it. + if log_line.date: + return log_line + + # Set offset of the found line feed + line_feed_offset = (log_line.start_lf, + log_line.end_lf)[forwards].offset + # Set the next search starting point + offset = line_feed_offset + (-1, +1)[forwards] + if offset < 0 or offset > len(self): + log.debug(" > TRY_FETCH EXIT EOF/SOF") + break - log.debug("seek %s finished (skipped %d lines) current_pos=%s, " - "offset=%s iterations=%s", - self.fd_info.fd.name, offset, - self.fd_info.fd.tell(), offset, self.fd_info.iterations) + attempts -= 1 + return None + + def __len__(self): + return self.length + + def __getitem__(self, offset): + r"""Find the nearest line with a date at `offset` and return its date. + + To illustrate how this function works, let's assume that we have a file + with the contents as follows: + + line\n01.01.1970 line\nthisisaline\nthisisline2\n01.01.1970 thisisline3 + + ----------------------------------------------------------------------- + For a lookup at offset `13`: + + line\n01.01.1970 line\nthisisaline\nthisisline2\n01.01.1970 thisisline3 + ^ offset(13) + |--------------| + ^try_find_line_with_date(bwd, itr #1) + + The function will first call ^try_find_line_with_date, which will yield + `01.01.1970 line` in turn. As the line contains a date, the return + value will be datetime("01.01.1970") + ----------------------------------------------------------------------- + For a lookup at offset `25`: + + line\n01.01.1970 line\nthisisaline\nthisisline2\n01.01.1970 thisisline3 + ^ offset(25) + |---------| + ^try_find_line_with_date(bwd, itr #1) + |--------------| + ^try_find_line_with_date(bwd, itr #2) + + The function will first call try_find_line, which will yield + `thisisaline` in turn. As the line does not contain a valid date, the + function will perform a backwards search to find first line that + contains a date by caling `try_find_line_with_date`, which will yield + the line + "01.01.1970 line", which contains a valid date, and the return value + will be datetime("01.01.1970"). + ----------------------------------------------------------------------- + For a lookup at offset `35`: + + line\n01.01.1970 line\nthisisaline\nthisisline2\n01.01.1970 thisisline3 + ^ offset(35) + |---------| + ^try_find_line_with_date + (bwd,itr #1) + |---------| + ^try_find_line_with_date(bwd, itr #2) + |--------------| + ^try_find_line_with_date(bwd, itr #3) + + The function will first call try_find_line, which will yield + `thisisaline2` in turn. 
As the line does not contain a valid + date, the function will perform a backwards search to find first + line that contains a date by caling `try_find_line_with_date`, + which will yield the line "01.01.1970 line", (notice that it'll + skip line `thisisaline`) which contains a valid date, and the return + value will be datetime("01.01.1970"). + ----------------------------------------------------------------------- + For a lookup at offset `3`: + + line\n01.01.1970 line\nthisisaline\nthisisline2\n01.01.1970 thisisline3 + ^ offset(3) + |--| + ^try_find_line_with_date(bwd, itr #1) + |-------------| + ^try_find_line_with_date(fwd, itr #1) + + The function will first call try_find_line_with_date, which will yield + `line` in turn. As the line does not contain a valid date, the + function will perform a backwards search to find first line that + contains a date by caling `try_find_line_with_date`, which will + fail since there'is no more lines before the line. The function + will then perform a forwards search to find first line with a date + by calling `try_find_line_with_date` in forwards search mode, which + will yield `01.01.1970 line` in turn, which contains a valid date, + and the return value will be datetime("01.01.1970"). + + This function is intended to be used with bisect.bisect* + functions, so therefore it only returns the `date` for + the comparison. + + @param offset: integer lookup offset + @raise DateNotFoundInLine: When a line with a date could not + be found. + @return: Date of the line at `offset` + """ + + self.lookup_times += 1 + log.debug("-------------------------------------------") + log.debug("-------------------------------------------") + log.debug("-------------------------------------------") + log.debug("-------------------------------------------") + log.debug("LOOKUP (#%d) AT OFFSET: %d", self.lookup_times, offset) + result = None + + # Try to search backwards first. + # First call will effectively be a regular forward search given + # that we're not passing a line feed offset to the function. + # Any subsequent search attempt will be made backwards. + log.debug("######### BACKWARDS SEARCH START #########") + result = self.try_find_line_with_date( + offset, + None, + False, + ) + log.debug("######### BACKWARDS SEARCH END #########") + + # ... then, forwards. + if not result or result.date is None: + log.debug("######### FORWARDS SEARCH START #########") + result = self.try_find_line_with_date(offset + 1, offset, True) + + log.debug("######### FORWARDS SEARCH END #########") + + if not result or result.date is None: + raise DateNotFoundInLine( + f"Date search failed at offset `{offset}`") + + # This is mostly for diagnostics. If we could not find + # any valid date in given file, we throw a specific exception + # to indicate that. + + self.found_any_date = True + if result.date >= self.constraint._since_date: + # Keep the matching line so we can access it + # after the bisect without having to perform another + # lookup. + self.line_info = result + + log.debug( + " > EXTRACTED_DATE: `%s` >= SINCE DATE: `%s` == %s", + result.date, + self.constraint._since_date, + (result.date >= self.constraint._since_date) + if result.date else False, + ) + return result.date + + def run(self): + # bisect_left will give us the first occurenct of the date + # that satisfies the constraint. + # Similarly, bisect_right would allow the last occurence of + # a date that satisfies the criteria. 
+ + try: + bisect.bisect_left(self, self.constraint._since_date) + except DateNotFoundInLine as exc: + if not self.found_any_date: + raise ValidFormattedDateNotFound from exc - return offset + raise + + if not self.line_info: + raise ValidLinesNotFound + + log.debug( + "RUN END, FOUND LINE(START:%d, END:%d, CONTENT:%s)" + " IN %d LOOKUP(S)", + self.line_info.start_offset, + self.line_info.end_offset, + self.line_info.text, + self.lookup_times + ) + + return (self.line_info.start_offset, self.line_info.end_offset) class SearchConstraintSearchSince(BinarySeekSearchBase): @@ -730,7 +843,7 @@ def __init__(self, current_date, cache_path, exprs=None, def extracted_datetime(self, line): if type(line) == bytes: # need this for e.g. gzipped files - line = line.decode("utf-8") + line = line.decode("utf-8", errors='backslashreplace') if self.ts_matcher_cls: timestamp = self.ts_matcher_cls(line) @@ -802,35 +915,47 @@ def apply_to_line(self, line): return ret def apply_to_file(self, fd, destructive=True): - self.fd_info = SeekInfo(fd, cache_path=self.cache_path) if not self._is_valid: log.warning("c:%s unable to apply constraint to %s", self.id, fd.name) return - # indexing files larger than this takes too long and is too resource - # intensive so best to fall back to per-line check. - if os.path.getsize(fd.name) >= FileMarkers.INDEX_FSIZE_LIMIT: - log.debug("s:%s: file %s too big to perform binary search - " - "skipping", self.id, fd.name) - return - if fd.name in self._results: + log.debug("ret cached") return self._results[fd.name] log.debug("c:%s: starting binary seek search to %s in file %s " "(destructive=True)", self.id, self._since_date, fd.name) - self._results[fd.name] = self._seek_to_first_valid(destructive) - log.debug("c:%s: finished binary seek search in file %s", self.id, - fd.name) + try: + seeker = LogFileDateSinceSeeker(fd, self) + result = seeker.run() + fd.seek(result[0] if result and destructive else 0) + + if not result or result[0] == len(seeker): + self._results[fd.name] = None + else: + self._results[fd.name] = result[0] + except ValidFormattedDateNotFound: + log.debug("c:%s No timestamp found in file", self.id) + fd.seek(0) + return fd.tell() + except ValidLinesNotFound: + log.debug("c:%s No date after found in file", self.id) + fd.seek(0, 2) + return fd.tell() + except DateNotFoundInLine as ed: + log.debug("c:%s Expanded date search failed for a line: %s", + self.id, ed) + fd.seek(0) + return fd.tell() + + log.debug("c:%s: finished binary seek search in file %s, offset %d", + self.id, fd.name, self._results[fd.name]) return self._results[fd.name] def stats(self): _stats = {'line': {'pass': self._line_pass, 'fail': self._line_fail}} - if self.fd_info: - _stats['file'] = {'name': self.fd_info.fd.name, - 'iterations': self.fd_info.iterations} return _stats def __repr__(self): diff --git a/tests/unit/test_search.py b/tests/unit/test_search.py index 0d653f2..90c8c18 100644 --- a/tests/unit/test_search.py +++ b/tests/unit/test_search.py @@ -828,28 +828,6 @@ def test_logs_since_junk_at_end_of_file_and_start_invalid(self): results = results.find_by_tag('mysd') self.assertEqual([r.get(2) for r in results], ['L2', 'L3', 'L4']) - @utils.create_files({'atestfile': LOGS_W_TS + "\n"}) - def test_logs_since_junk_not_allow_unverifiable(self): - """ - Test scenario: file has invalid start and unverifiable end so with - allow_constraints_for_unverifiable_logs=False the - constraint will be aborted. 
- """ - self.current_date = self.get_date('Tue Jan 03 00:00:00 UTC 2022') - c = SearchConstraintSearchSince( - current_date=self.current_date, - cache_path=self.constraints_cache_path, - ts_matcher_cls=TimestampSimple, days=1, - allow_constraints_for_unverifiable_logs=False) - s = FileSearcher(constraint=c) - sd = SearchDef(r"{}\S+ (.+)".format(self.datetime_expr), tag='mysd') - fname = os.path.join(self.data_root, 'atestfile') - s.add(sd, path=fname) - results = s.run() - results = results.find_by_tag('mysd') - self.assertEqual([r.get(2) for r in results], - ['L0', 'L1', 'L2', 'L3', 'L4']) - @utils.create_files({'atestfile': LOGS_W_TS_AND_UNMATCABLE_LINES}) def test_logs_since_file_valid_with_unmatchable_lines(self): """ diff --git a/tests/unit/test_search_constraints.py b/tests/unit/test_search_constraints.py index fc43432..9f42c31 100644 --- a/tests/unit/test_search_constraints.py +++ b/tests/unit/test_search_constraints.py @@ -3,15 +3,22 @@ import tempfile import shutil import subprocess - +from datetime import datetime from unittest import mock +from io import BytesIO from . import utils from searchkit.constraints import ( - BinarySearchState, TimestampMatcherBase, - FileMarkers, SearchConstraintSearchSince, + FindTokenStatus, + SearchState, + InvalidSearchState, + ValidFormattedDateNotFound, + ValidLinesNotFound, + NonDestructiveFileRead, + LogLine, + LogFileDateSinceSeeker, ) @@ -19,7 +26,7 @@ class TimestampSimple(TimestampMatcherBase): @property def patterns(self): - return [r'^(?P\d{4})-(?P\d{2})-(?P\d{2})+\s+' + return [r'^(?P\d{4})-(?P\d{2})-(?P\d{2})[\sT]+' r'(?P\d{2}):(?P\d{2}):(?P\d+)'] @@ -46,6 +53,77 @@ def patterns(self): blah 9 """ +# NOTE: a non-unicode char \xe2 has been inserted into the following logs +MYSQL_LOGS = b"""2019-04-04T14:47:33.199550Z mysqld_safe Logging to '/var/log/mysql/error.log'. +2019-04-04T14:47:33.243881Z mysqld_safe Starting mysqld daemon with databases from /var/lib/percona-xtradb-cluster +2019-04-04T14:47:33.259636Z mysqld_safe Skipping wsrep-recover for f29f7843-56e6-11e9-b1f5-7e15c4ed7ddc:2 pair +2019-04-04T14:47:33.261913Z mysqld_safe Assigning f29f7843-56e6-11e9-b1f5-7e15c4ed7ddc:2 to wsrep_start_position +2019-04-04T14:47:33.279756Z 0 [Warning] Changed limits: max_open_files: 5000 (requested 10005) +2019-04-04T14:47:33.279845Z 0 [Warning] Changed limits: table_open_cache: 1495 (requested 2048) +2019-04-04T14:47:33.478375Z 0 [Warning] TIMESTAMP with implicit DEFAULT value is deprecated. Please use --explicit_defaults_for_timestamp server option (see documentation for more details). +2019-04-04T14:47:33.480000Z 0 [Note] /usr/sbin/mysqld (mysqld 5.7.20-18-18-log) starting as process 126025 ... +2019-04-04T14:47:33.482384Z 0 [Note] WSREP: Setting wsrep_ready to false +2019-04-04T14:47:33.482398Z 0 [Note] WSREP: No pre-stored wsrep-start position found. Skipping position initialization. +2019-04-04T14:47:33.482402Z 0 [Note] WSREP: wsrep_load(): loading provider library '/usr/lib/galera3/libgalera_smm.so' +2019-04-04T14:47:33.484752Z 0 [Note] WSREP: wsrep_load(): Galera 3.24(r) by Codership Oy loaded successfully. +2019-04-04T14:47:33.484800Z 0 [Note] WSREP: CRC-32C: using hardware acceleration. 
+2019-04-04T14:47:33.485042Z 0 [Note] WSREP: Found saved state: f29f7843-56e6-11e9-b1f5-7e15c4ed7ddc:2, safe_to_bootstrap: 1 +2019-04-04T14:47:33.486876Z 0 [Note] WSREP: Passing config to GCS: base_dir = /var/lib/percona-xtradb-cluster/; base_host = 10.160.0.153; base_port = 4567; cert.log_conflicts = no; debug = no; evs.auto_evict = 0; evs.delay_margin = PT1S; evs.delayed_keep_period = PT30S; evs.inactive_check_period = PT0.5S; evs.inactive_timeout = PT15S; evs.join_retrans_period = PT1S; evs.max_install_timeouts = 3; evs.send_window = 10; evs.stats_report_period = PT1M; evs.suspect_timeout = PT5S; evs.user_send_window = 4; evs.view_forget_timeout = PT24H; gcache.dir = /var/lib/percona-xtradb-cluster/; gcache.freeze_purge_at_seqno = -1; gcache.keep_pages_count = 0; gcache.keep_pages_size = 0; gcache.mem_size = 0; gcache.name = /var/lib/percona-xtradb-cluster//galera.cache; gcache.page_size = 128M; gcache.recover = no; gcache.size = 128M; gcomm.thread_prio = ; gcs.fc_debug = 0; gcs.fc_factor = 1; gcs.fc_limit = 100; gcs.fc_master_slave = no; gcs.max_packet_size = 64500; gcs.max_throttle = 0.25; gcs.recv_q_hard_limit = 9223372036854775807; gcs.recv_q_soft_limit = 0.25; gcs.sync_donor = no; gmcast.segment = 0; gmcast.version = 0; pc.announce_timeout = PT3S; pc.checksum = false; pc.ignore_quorum = false; pc.ignore_sb = false; pc.npvo = false; pc.recovery = 1; pc.version = 0; pc.wait_prim = true; pc.wait_prim_timeout = PT30S; pc.weight = 1; protonet.backend = asio; protonet.version = 0; repl.causal_read_timeout = PT30S; repl.commit_order = 3; repl.key_format = FLAT8; repl.max_ws_size = 2147483647; repl.proto_max = 7; socket.checksum = 2; socket.recv_buf_size = 212992; +2019-04-04T14:47:33.517506Z 0 [Note] WSREP: GCache history reset: f29f7843-56e6-11e9-b1f5-7e15c4ed7ddc:0 -> f29f7843-56e6-11e9-b1f5-7e15c4ed7ddc:2 +2019-04-04T14:47:33.517838Z 0 [Note] WSREP: Assign initial position for certification: 2, protocol version: -1 +2019-04-04T14:47:33.517876Z 0 [Note] WSREP: Preparing to initiate SST/IST +2019-04-04T14:47:33.517887Z 0 [Note] WSREP: Starting replication +2019-04-04T14:47:33.517906Z 0 [Note] WSREP: Setting initial position to f29f7843-56e6-11e9-b1f5-7e15c4ed7ddc:2 +2019-04-04T14:47:33.518229Z 0 [Note] WSREP: Using CRC-32C for message checksums. +2019-04-04T14:47:33.518374Z 0 [Note] WSREP: gcomm thread scheduling priority set to other:0 +2019-04-04T14:47:33.518535Z 0 [Warning] WSREP: Fail to access the file (/var/lib/percona-xtradb-cluster//gvwstate.dat) error (No such file or directory). It is possible if node is booting for first time or re-booting after a graceful shutdown +2019-04-04T14:47:33.518549Z 0 [Note] WSREP: Restoring primary-component from disk failed. 
Either node is booting for first time or re-booting after a graceful shutdown +2019-04-04T14:47:33.518960Z 0 [Note] WSREP: GMCast version 0 +2019-04-04T14:47:33.519303Z 0 [Note] WSREP: (947dd30b, 'tcp://0.0.0.0:4567') listening at tcp://0.0.0.0:4567 +2019-04-04T14:47:33.519324Z 0 [Note] WSREP: (947dd30b, 'tcp://0.0.0.0:4567') multicast: , ttl: 1 +2019-04-04T14:47:33.520034Z 0 [Note] WSREP: EVS version 0 +2019-04-04T14:47:33.520195Z 0 [Note] WSREP: gcomm: connecting to group 'juju_cluster', peer '10.160.0.148:' +2019-04-04T14:47:34.532443Z 0 [Note] WSREP: (947dd30b, 'tcp://0.0.0.0:4567') connection established to 0256ddd2 tcp://10.160.0.148:4567 +2019-04-04T14:47:34.532579Z 0 [Note] WSREP: (947dd30b, 'tcp://0.0.0.0:4567') turning message relay requesting on, nonlive peers: +2019-04-04T14:47:35.022049Z 0 [Note] WSREP: declaring 0256ddd2 at tcp://10.160.0.148:4567 stable +2019-04-04T14:47:35.022303Z 0 [Note] WSREP: Node 0256ddd2 state primary +2019-04-04T14:47:35.023075Z 0 [Note] WSREP: Current view of cluster as seen by this node +view (view_id(PRIM,0256ddd2,2) +memb { + 0256ddd2,0 + 947dd30b,0 + } +joined { + } +left { + } +partitioned { + } +) +2019-04-04T14:47:35.023099Z 0 [Note] WSREP: Save the discovered primary-component to disk +2019-04-04T14:47:35.521301Z 0 [Note] WSREP: gcomm: connected \xe2 +2019-04-04T14:47:35.521465Z 0 [Note] WSREP: Shifting CLOSED -> OPEN (TO: 0) +2019-04-04T14:47:35.521724Z 0 [Note] WSREP: New COMPONENT: primary = yes, bootstrap = no, my_idx = 1, memb_num = 2 +2019-04-04T14:47:35.521778Z 0 [Note] WSREP: STATE EXCHANGE: Waiting for state UUID. +2019-04-04T14:47:35.521831Z 0 [Note] WSREP: STATE EXCHANGE: sent state msg: 95636247-56e8-11e9-baaa-cb92a57ee4d8 +2019-04-04T14:47:35.521848Z 0 [Note] WSREP: STATE EXCHANGE: got state msg: 95636247-56e8-11e9-baaa-cb92a57ee4d8 from 0 (juju-0aa49a-7-lxd-7) +2019-04-04T14:47:35.521926Z 0 [Note] WSREP: Waiting for SST/IST to complete. +2019-04-04T14:47:35.522184Z 0 [Warning] WSREP: last inactive check more than PT1.5S (3*evs.inactive_check_period) ago (PT2.00221S), skipping check +2019-04-04T14:47:35.522333Z 0 [Note] WSREP: STATE EXCHANGE: got state msg: 95636247-56e8-11e9-baaa-cb92a57ee4d8 from 1 (juju-0aa49a-8-lxd-7) +2019-04-04T14:47:35.522352Z 0 [Note] WSREP: Quorum results: + version = 4, + component = PRIMARY, + conf_id = 1, + members = 1/2 (primary/total), + act_id = 2, + last_appl. 
= -1, + protocols = 0/7/3 (gcs/repl/appl), + group UUID = f020c044-56e4-11e9-9652-aad4a917a89c +2019-04-04T14:47:35.522366Z 0 [Note] WSREP: Flow-control interval: [141, 141] +2019-04-04T14:47:35.522376Z 0 [Note] WSREP: Trying to continue unpaused monitor +2019-04-04T14:47:35.522386Z 0 [Note] WSREP: Shifting OPEN -> PRIMARY (TO: 2) +2019-04-04T14:47:35.522479Z 2 [Note] WSREP: State transfer required: """ # noqa, pylint: disable=all + class TestSearchKitBase(utils.BaseTestCase): @@ -69,112 +147,6 @@ def tearDown(self): class TestSearchConstraints(TestSearchKitBase): - def test_constraints_file_markers_w_trailing_newline(self): - with tempfile.TemporaryDirectory() as dtmp: - cache_path = os.path.join(dtmp, 'cache') - fpath = os.path.join(dtmp, 'f1') - with open(fpath, 'w') as fd: - fd.write('this is\n') - fd.write('some\n') - fd.write('file content\n') - - with open(fpath, 'rb') as fd: - orig_pos = fd.tell() - eof_pos = fd.seek(0, 2) - fd.seek(orig_pos) - markers = FileMarkers(fd, eof_pos, cache_path) - self.assertEqual(len(markers), 3) - self.assertEqual(list(markers), [0, 8, 13]) - full_cache_path = os.path.join(markers.cache_path, 'caches', - 'search_constraints') - self.assertEqual(len(os.listdir(full_cache_path)), 1) - cache_file = os.path.join(full_cache_path, - os.listdir(full_cache_path)[0]) - mtime1 = os.path.getmtime(cache_file) - # re-run and this time should use existing cache - markers = FileMarkers(fd, eof_pos, cache_path) - self.assertEqual(len(markers), 3) - self.assertEqual(list(markers), [0, 8, 13]) - mtime2 = os.path.getmtime(cache_file) - full_cache_path = os.path.join(markers.cache_path, 'caches', - 'search_constraints') - self.assertEqual(len(os.listdir(full_cache_path)), 1) - self.assertEqual(mtime1, mtime2) - self.assertEqual(orig_pos, fd.tell()) - - self.assertEqual(markers[0], 0) - self.assertEqual(markers[1], 8) - self.assertEqual(markers[2], 13) - self.assertEqual(markers[3], None) - - def test_constraints_file_markers_no_trailing_newline(self): - with tempfile.TemporaryDirectory() as dtmp: - cache_path = os.path.join(dtmp, 'cache') - fpath = os.path.join(dtmp, 'f1') - with open(fpath, 'w') as fd: - fd.write('this is\n') - fd.write('some\n') - fd.write('file content') - - with open(fpath, 'rb') as fd: - orig_pos = fd.tell() - eof_pos = fd.seek(0, 2) - fd.seek(orig_pos) - markers = FileMarkers(fd, eof_pos, cache_path) - self.assertEqual(len(markers), 3) - self.assertEqual(list(markers), [0, 8, 13]) - - self.assertEqual(markers[0], 0) - self.assertEqual(markers[1], 8) - self.assertEqual(markers[2], 13) - self.assertEqual(markers[3], None) - - @mock.patch.object(FileMarkers, 'CHUNK_SIZE', 14) - def test_constraints_file_markers_gt_block(self): - with tempfile.TemporaryDirectory() as dtmp: - cache_path = os.path.join(dtmp, 'cache') - fpath = os.path.join(dtmp, 'f1') - with open(fpath, 'w') as fd: - fd.write('this is\n') - fd.write('some\n') - fd.write('file content\n') - - with open(fpath, 'rb') as fd: - orig_pos = fd.tell() - eof_pos = fd.seek(0, 2) - fd.seek(orig_pos) - markers = FileMarkers(fd, eof_pos, cache_path) - self.assertEqual(len(markers), 3) - self.assertEqual(list(markers), [0, 8, 13]) - self.assertEqual(markers[0], 0) - self.assertEqual(markers[1], 8) - self.assertEqual(markers[2], 13) - self.assertEqual(markers[3], None) - - @mock.patch.object(FileMarkers, 'CHUNK_SIZE', 7) - @mock.patch.object(FileMarkers, 'BLOCK_SIZE_BASE', 1) - def test_constraints_file_markers_chunk_too_small(self): - with tempfile.TemporaryDirectory() as dtmp: - cache_path = 
os.path.join(dtmp, 'cache') - fpath = os.path.join(dtmp, 'f1') - fpath = os.path.join(dtmp, 'f1') - with open(fpath, 'w') as fd: - fd.write('this is\n') - fd.write('some\n') - fd.write('file content') - - with open(fpath, 'rb') as fd: - orig_pos = fd.tell() - eof_pos = fd.seek(0, 2) - fd.seek(orig_pos) - markers = FileMarkers(fd, eof_pos, cache_path) - self.assertEqual(len(markers), 3) - self.assertEqual(list(markers), [0, 8, 13]) - self.assertEqual(markers[0], 0) - self.assertEqual(markers[1], 8) - self.assertEqual(markers[2], 13) - self.assertEqual(markers[3], None) - @utils.create_files({'f1': LOGS_W_TS}) def test_binary_search(self): self.current_date = self.get_date('Tue Jan 03 00:00:01 UTC 2022') @@ -192,7 +164,7 @@ def test_binary_search(self): fd.write('somejunk\n' + LOGS_W_TS) with open(_file, 'rb') as fd: - self.assertEqual(c.apply_to_file(fd), 1) + self.assertEqual(c.apply_to_file(fd), 9) c = SearchConstraintSearchSince(current_date=self.current_date, cache_path=self.constraints_cache_path, @@ -202,14 +174,478 @@ def test_binary_search(self): with open(_file, 'rb') as fd: offset = c.apply_to_file(fd) - self.assertEqual(offset, BinarySearchState.SKIP_MAX - 1) + self.assertEqual(offset, 4491) c = SearchConstraintSearchSince(current_date=self.current_date, cache_path=self.constraints_cache_path, ts_matcher_cls=TimestampSimple, days=7) with open(_file, 'w') as fd: - fd.write('somejunk\n' * 500 + LOGS_W_TS) + fd.write('somejunk\n' * 501 + LOGS_W_TS) with open(_file, 'rb') as fd: offset = c.apply_to_file(fd) self.assertEqual(offset, 0) + + +class TestSearchState(TestSearchKitBase): + + def test_construct_found(self): + uut = SearchState(FindTokenStatus.FOUND, 15) + self.assertEqual(uut.status, FindTokenStatus.FOUND) + self.assertEqual(uut.offset, 15) + + def test_construct_failed(self): + uut = SearchState(FindTokenStatus.FAILED, 15) + self.assertEqual(uut.status, FindTokenStatus.FAILED) + + with self.assertRaises(InvalidSearchState): + uut.offset + + def test_construct_reached_eof(self): + uut = SearchState(FindTokenStatus.REACHED_EOF, 15) + self.assertEqual(uut.status, FindTokenStatus.REACHED_EOF) + self.assertEqual(uut.offset, 15) + + +class TestNonDestructiveFileRead(TestSearchKitBase): + + def test_peek(self): + mock_file = mock.MagicMock() + mock_file.tell.return_value = 0xBADC0DE + with NonDestructiveFileRead(mock_file): + pass + mock_file.seek.assert_called_once_with(0xBADC0DE) + + +class TestLogLine(TestSearchKitBase): + + def test_construct(self): + mock_file = mock.MagicMock() + mock_file.read.return_value = "01.01.1970 thisisaline" + mock_constraint = mock.MagicMock() + mock_constraint.extracted_datetime.return_value = datetime(1970, 1, 1) + mock_lslf = mock.MagicMock() + mock_lslf.offset = 5 + mock_lslf.status = FindTokenStatus.FOUND + mock_lelf = mock.MagicMock() + mock_lelf.offset = 10 + mock_lelf.status = FindTokenStatus.FOUND + uut = LogLine(mock_file, mock_constraint, mock_lslf, mock_lelf) + + self.assertEqual(uut.start_lf.offset, 5) + self.assertEqual(uut.end_lf.offset, 10) + self.assertEqual(uut.start_offset, 6) + self.assertEqual(uut.end_offset, 9) + self.assertEqual(uut.text, "01.01.1970 thisisaline") + self.assertEqual(uut.date, datetime(1970, 1, 1)) + + mock_lslf.status = FindTokenStatus.REACHED_EOF + mock_lelf.status = FindTokenStatus.REACHED_EOF + + self.assertEqual(uut.start_offset, 5) + self.assertEqual(uut.end_offset, 10) + + +class TestLogFileDateSinceSeeker(TestSearchKitBase): + + def setUp(self): + super().setUp() + self.bio = BytesIO(MYSQL_LOGS) + 
self.mock_file = mock.MagicMock()
+        self.mock_file.read.side_effect = lambda amount: self.bio.read(amount)
+        self.mock_file.seek.side_effect = lambda off, wh=0: self.bio.seek(
+            off, wh)
+        self.mock_file.tell.side_effect = lambda: self.bio.tell()
+        self.constraint = SearchConstraintSearchSince(
+            current_date=self.get_date('Tue Apr 04 14:40:01 UTC 2019'),
+            cache_path=self.constraints_cache_path,
+            ts_matcher_cls=TimestampSimple, days=7)
+        self.mock_constraint = mock.MagicMock()
+        self.mock_constraint.extracted_datetime.return_value = datetime(
+            2019, 4, 4, 14, 47, 33)
+        self.max_line_length = LogFileDateSinceSeeker.MAX_SEEK_HORIZON_EXPAND
+        self.max_line_length *= LogFileDateSinceSeeker.SEEK_HORIZON
+
+    def test_construct(self):
+        uut = LogFileDateSinceSeeker(
+            self.mock_file,
+            self.mock_constraint)
+        self.assertEqual(uut.file, self.mock_file)
+        self.assertEqual(uut.constraint, self.mock_constraint)
+        self.assertEqual(uut.line_info, None)
+        self.assertEqual(uut.found_any_date, False)
+
+    def test_find_token_reverse(self):
+        uut = LogFileDateSinceSeeker(
+            self.mock_file,
+            self.mock_constraint)
+        # Expectation: there is one LF between [0,100], status should be FOUND
+        # and offset should be `78`
+        result = uut.find_token_reverse(100, 256)
+        self.assertEqual(result.status, FindTokenStatus.FOUND)
+        self.assertEqual(result.offset, 78)
+
+    def test_find_token_reverse_eof(self):
+        uut = LogFileDateSinceSeeker(
+            self.mock_file,
+            self.mock_constraint)
+        # Expectation: no LF between [0,77], status should be REACHED_EOF
+        result = uut.find_token_reverse(77, 256)
+        self.assertEqual(result.status, FindTokenStatus.REACHED_EOF)
+        self.assertEqual(result.offset, 0)
+
+    def test_find_token_reverse_fail(self):
+        self.mock_file.read.side_effect = lambda n: bytes(('A' * n).encode())
+        uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint)
+        # Expectation: find_token_reverse should give up the search and
+        # status should be FAILED
+        result = uut.find_token_reverse(self.max_line_length + 257, 256)
+        self.assertEqual(result.status, FindTokenStatus.FAILED)
+
+    def test_find_token(self):
+        uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint)
+        # Expectation: there is one LF between [0,100], status should be FOUND
+        # and offset should be `78`
+        result = uut.find_token(78, 256)
+        self.assertEqual(result.status, FindTokenStatus.FOUND)
+        self.assertEqual(result.offset, 78)
+
+    def test_find_token_eof(self):
+        uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint)
+        # Expectation: there is no LF after offset 6896, status should be
+        # REACHED_EOF and offset should be 6907 (end of file)
+        result = uut.find_token(6896, 256)
+        self.assertEqual(result.status, FindTokenStatus.REACHED_EOF)
+        self.assertEqual(result.offset, 6907)
+
+    def test_find_token_fail(self):
+        self.mock_file.read.side_effect = lambda n: bytes(('A' * n).encode())
+        uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint)
+        # Expectation: find_token should give up the search and
+        # status should be FAILED
+        result = uut.find_token(100000, 256)
+        self.assertEqual(result.status, FindTokenStatus.FAILED)
+
+    def test_try_find_line_slf_is_eof(self):
+        uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint)
+        result = uut.try_find_line(74)
+        self.assertEqual(result.start_lf.status, FindTokenStatus.REACHED_EOF)
+        self.assertEqual(result.start_lf.offset, 0)
+        self.assertEqual(result.end_lf.status, FindTokenStatus.FOUND)
+        self.assertEqual(result.end_lf.offset, 78)
+        
self.assertEqual(result.start_offset, 0) + self.assertEqual(result.end_offset, 77) + self.assertEqual(result.text, + b"2019-04-04T14:47:33.199550Z mysqld_safe" + b" Logging to '/var/log/mysql/error.log'."[ + :LogLine.MAX_TEST_READ_BYTES]) + self.assertEqual(result.date, datetime(2019, 4, 4, 14, 47, 33)) + + def test_try_find_line_elf_is_eof(self): + uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint) + result = uut.try_find_line(6896) + self.assertEqual(result.start_lf.status, FindTokenStatus.FOUND) + self.assertEqual(result.start_lf.offset, 6837) + self.assertEqual(result.end_lf.status, FindTokenStatus.REACHED_EOF) + self.assertEqual(result.end_lf.offset, 6907) + self.assertEqual(result.start_offset, 6838) + self.assertEqual(result.end_offset, 6907) + self.assertEqual(result.text, + b"2019-04-04T14:47:35.522479Z 2 [Note] WSREP:" + b" State transfer required: ") + self.assertEqual(result.date, datetime(2019, 4, 4, 14, 47, 33)) + + def test_try_find_line_both_slf_elf_eof(self): + self.bio = BytesIO(b"thisisaline") + uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint) + result = uut.try_find_line(4) + self.assertEqual(result.start_lf.status, FindTokenStatus.REACHED_EOF) + self.assertEqual(result.start_lf.offset, 0) + self.assertEqual(result.end_lf.status, FindTokenStatus.REACHED_EOF) + self.assertEqual(result.end_lf.offset, 11) + self.assertEqual(result.start_offset, 0) + self.assertEqual(result.end_offset, 11) + self.assertEqual(result.text, b"thisisaline") + self.assertEqual(result.date, datetime(2019, 4, 4, 14, 47, 33)) + + def test_try_find_line_slf_elf_exists(self): + uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint) + result = uut.try_find_line(83) + self.assertEqual(result.start_lf.status, FindTokenStatus.FOUND) + self.assertEqual(result.start_lf.offset, 78) + self.assertEqual(result.end_lf.status, FindTokenStatus.FOUND) + self.assertEqual(result.end_lf.offset, 193) + self.assertEqual(result.start_offset, 79) + self.assertEqual(result.end_offset, 192) + self.assertEqual(result.text, + b"2019-04-04T14:47:33.243881Z mysqld_safe" + b" Starting mysqld daemon with databases from" + b" /var/lib/percona-xtradb-cluster"[ + :LogLine.MAX_TEST_READ_BYTES]) + self.assertEqual(result.date, datetime(2019, 4, 4, 14, 47, 33)) + + def test_try_find_line_elf_failed(self): + self.mock_file.read.side_effect = lambda n: bytes(('A' * n).encode()) + uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint) + with self.assertRaises(ValueError) as rexc: + uut.try_find_line(83) + self.assertEqual(str(rexc.exception), "Could not find ending line" + " feed offset at epicenter 83") + + def test_try_find_line_slf_failed(self): + contents = ('A' * ((self.max_line_length * 2) - 1)) + '\n' + self.bio = BytesIO(bytes(contents.encode())) + uut = LogFileDateSinceSeeker(self.mock_file, self.mock_constraint) + with self.assertRaises(ValueError) as rexc: + uut.try_find_line(self.max_line_length) + self.assertEqual(str(rexc.exception), "Could not find start line feed " + "offset at epicenter 1048576") + + def test_try_find_line_w_constraint(self): + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + result = uut.try_find_line(83) + # 2019-04-04T14:47:33.199550Z + self.assertEqual(result.date, datetime(2019, 4, 4, 14, 47, 33)) + + result = uut.try_find_line(5000) + self.assertEqual(result.date, datetime(2019, 4, 4, 14, 47, 35)) + + def test_try_find_line_verify_offsets(self): + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + seen_lines = 
set() + expected_offsets = set( + [ + (0, 77), (79, 192), (194, 303), (305, 416), + (418, 511), (513, 607), (609, 797), (799, 908), + (910, 981), (983, 1101), (1103, 1220), (1222, 1351), + (1353, 1433), (1435, 1557), (1559, 3125), (3127, 3272), + (3274, 3383), (3385, 3457), (3459, 3522), (3524, 3633), + (3635, 3714), (3716, 3807), (3809, 4050), (4052, 4218), + (4220, 4279), (4281, 4388), (4390, 4485), (4487, 4543), + (4545, 4651), (4653, 4787), (4789, 4917), (4919, 5014), + (5016, 5086), (5088, 5175), (5177, 5206), (5208, 5213), + (5215, 5225), (5227, 5237), (5239, 5240), (5242, 5249), + (5251, 5252), (5254, 5259), (5261, 5262), (5264, 5276), + (5278, 5279), (5281, 5281), (5283, 5371), (5373, 5432), + (5434, 5508), (5510, 5623), (5625, 5707), (5709, 5820), + (5822, 5961), (5963, 6038), (6040, 6185), (6187, 6326), + (6328, 6386), (6388, 6403), (6405, 6426), (6428, 6443), + (6445, 6478), (6480, 6495), (6497, 6513), (6515, 6550), + (6552, 6601), (6603, 6679), (6681, 6759), (6761, 6836), + (6838, 6907) + ] + ) + + for i in range(0, len(MYSQL_LOGS)): + result = uut.try_find_line(i) + self.assertNotEqual(result, None) + self.assertNotEqual(result.text, None) + self.assertGreater(len(result.text), 0) + self.assertTrue(result.date or "2019" not in result.text.decode()) + self.assertGreaterEqual(result.start_offset, 0) + self.assertLessEqual(result.start_offset, len(MYSQL_LOGS)) + self.assertGreaterEqual(result.end_offset, 0) + self.assertLessEqual(result.end_offset, len(MYSQL_LOGS)) + self.assertGreaterEqual(result.start_offset, + result.start_lf.offset) + self.assertLessEqual(result.start_offset, + result.start_lf.offset + 1) + self.assertGreaterEqual(result.end_offset, + result.end_lf.offset - 1) + self.assertLessEqual(result.end_offset, + result.end_lf.offset) + seen_lines.add((result.start_offset, result.end_offset)) + + self.assertEqual(len(seen_lines), 69) + self.assertEqual(len(seen_lines), len(expected_offsets)) + self.assertEqual(seen_lines, expected_offsets) + + def test_try_find_line_with_date(self): + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + seen_lines = set() + expected_offsets = set( + [ + (0, 77), (79, 192), (194, 303), (305, 416), + (418, 511), (513, 607), (609, 797), (799, 908), + (910, 981), (983, 1101), (1103, 1220), (1222, 1351), + (1353, 1433), (1435, 1557), (1559, 3125), (3127, 3272), + (3274, 3383), (3385, 3457), (3459, 3522), (3524, 3633), + (3635, 3714), (3716, 3807), (3809, 4050), (4052, 4218), + (4220, 4279), (4281, 4388), (4390, 4485), (4487, 4543), + (4545, 4651), (4653, 4787), (4789, 4917), (4919, 5014), + (5016, 5086), (5088, 5175), (5283, 5371), (5373, 5432), + (5434, 5508), (5510, 5623), (5625, 5707), (5709, 5820), + (5822, 5961), (5963, 6038), (6040, 6185), (6187, 6326), + (6328, 6386), (6603, 6679), (6681, 6759), (6761, 6836), + (6838, 6907) + ] + ) + for i in range(0, len(MYSQL_LOGS)): + result = uut.try_find_line_with_date(i) + self.assertNotEqual(result, None) + self.assertNotEqual(result.text, None) + self.assertGreater(len(result.text), 0) + self.assertNotEqual(result.date, None) + self.assertGreaterEqual(result.start_offset, 0) + self.assertLessEqual(result.start_offset, len(MYSQL_LOGS)) + self.assertGreaterEqual(result.end_offset, 0) + self.assertLessEqual(result.end_offset, len(MYSQL_LOGS)) + self.assertGreaterEqual(result.start_offset, + result.start_lf.offset) + self.assertLessEqual(result.start_offset, + result.start_lf.offset + 1) + self.assertGreaterEqual(result.end_offset, + result.end_lf.offset - 1) + 
self.assertLessEqual(result.end_offset,
+                                 result.end_lf.offset)
+            seen_lines.add((result.start_offset, result.end_offset))
+
+        # We expect to see 49 distinct lines here because the log file
+        # contains log lines without a date. In that case the algorithm
+        # should retrieve the nearest log line that has a date.
+        self.assertEqual(len(seen_lines), 49)
+        self.assertEqual(len(seen_lines), len(expected_offsets))
+        self.assertEqual(seen_lines, expected_offsets)
+
+    def test_try_find_line_with_date_fallback_backwards(self):
+        uut = LogFileDateSinceSeeker(self.mock_file, self.constraint)
+        for offset in [5181, 5212, 5248, 5258, 5275]:
+            result = uut.try_find_line_with_date(offset, forwards=False)
+            self.assertNotEqual(result, None)
+            self.assertNotEqual(result.date, None)
+            self.assertEqual(result.date, datetime(2019, 4, 4, 14, 47, 35))
+            self.assertEqual(result.start_offset, 5088)
+            self.assertEqual(result.end_offset, 5175)
+
+    def test_try_find_line_with_date_fallback_forwards(self):
+        uut = LogFileDateSinceSeeker(self.mock_file, self.constraint)
+        for offset in [5181, 5212, 5248, 5258, 5275]:
+            result = uut.try_find_line_with_date(offset, forwards=True)
+            self.assertNotEqual(result, None)
+            self.assertNotEqual(result.date, None)
+            self.assertEqual(result.date, datetime(2019, 4, 4, 14, 47, 35))
+            self.assertEqual(result.start_offset, 5283)
+            self.assertEqual(result.end_offset, 5371)
+
+    def test_getitem(self):
+        uut = LogFileDateSinceSeeker(self.mock_file, self.constraint)
+        for offset in [5181, 5212, 5248, 5258, 5275]:
+            result = uut[offset]
+            self.assertNotEqual(uut.line_info, None)
+            self.assertNotEqual(uut.line_info.date, None)
+            self.assertEqual(uut.line_info.date, result)
+            self.assertEqual(result, datetime(2019, 4, 4, 14, 47, 35))
+            self.assertEqual(uut.line_info.start_offset, 5088)
+            self.assertEqual(uut.line_info.end_offset, 5175)
+
+    def test_getitem_forwards(self):
+        # Cut MYSQL_LOGS in such a way that there are no backwards logs,
+        # so the algorithm has to fall back to a forwards search.
+        self.bio = BytesIO(MYSQL_LOGS[5177:])
+        uut = LogFileDateSinceSeeker(self.mock_file, self.constraint)
+        for offset in [1, 31, 65, 77, 87]:
+            result = uut[offset]
+            self.assertNotEqual(uut.line_info, None)
+            self.assertNotEqual(uut.line_info.date, None)
+            self.assertEqual(uut.line_info.date, result)
+            self.assertEqual(result, datetime(2019, 4, 4, 14, 47, 35))
+            self.assertEqual(uut.line_info.start_offset, 106)
+            self.assertEqual(uut.line_info.end_offset, 194)
+
+    def test_getitem_backwards(self):
+        # Cut MYSQL_LOGS in such a way that there are no forwards logs.
+ self.bio = BytesIO(MYSQL_LOGS[5016:5282]) + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + for offset in [192, 226]: + result = uut[offset] + self.assertNotEqual(uut.line_info, None) + self.assertNotEqual(uut.line_info.date, None) + self.assertEqual(uut.line_info.date, result) + self.assertEqual(result, datetime(2019, 4, 4, 14, 47, 35)) + self.assertEqual(uut.line_info.start_offset, 72) + self.assertEqual(uut.line_info.end_offset, 159) + + def test_getitem_all(self): + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + for i in range(0, len(MYSQL_LOGS)): + result = uut.try_find_line_with_date(i) + self.assertNotEqual(result, None) + self.assertNotEqual(result.text, None) + self.assertGreater(len(result.text), 0) + self.assertNotEqual(result.date, None) + self.assertGreaterEqual(result.start_offset, 0) + self.assertLessEqual(result.start_offset, len(MYSQL_LOGS)) + self.assertGreaterEqual(result.end_offset, 0) + self.assertLessEqual(result.end_offset, len(MYSQL_LOGS)) + self.assertGreaterEqual(result.start_offset, + result.start_lf.offset) + self.assertLessEqual(result.start_offset, + result.start_lf.offset + 1) + self.assertGreaterEqual(result.end_offset, + result.end_lf.offset - 1) + self.assertLessEqual(result.end_offset, + result.end_lf.offset) + + def test_run_1(self): + self.constraint = SearchConstraintSearchSince( + current_date=self.get_date('Tue Apr 11 14:47:33 UTC 2019'), + cache_path=self.constraints_cache_path, + ts_matcher_cls=TimestampSimple, days=7) + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + result = uut.run() + self.assertEqual(result, (0, 77)) + + def test_run_2(self): + self.constraint = SearchConstraintSearchSince( + current_date=self.get_date('Tue Apr 11 14:47:34 UTC 2019'), + cache_path=self.constraints_cache_path, + ts_matcher_cls=TimestampSimple, days=7) + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + result = uut.run() + self.assertEqual(result, (4653, 4787)) + + def test_run_3(self): + self.constraint = SearchConstraintSearchSince( + current_date=self.get_date('Tue Apr 11 14:47:35 UTC 2019'), + cache_path=self.constraints_cache_path, + ts_matcher_cls=TimestampSimple, days=7) + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + result = uut.run() + self.assertEqual(result, (4919, 5014)) + + def test_run_4(self): + self.constraint = SearchConstraintSearchSince( + current_date=self.get_date('Tue Apr 11 14:47:35 UTC 2019'), + cache_path=self.constraints_cache_path, + ts_matcher_cls=TimestampSimple, days=7) + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + result = uut.run() + self.assertEqual(result, (4919, 5014)) + + def test_run_before(self): + self.constraint = SearchConstraintSearchSince( + current_date=self.get_date('Tue Apr 11 14:47:32 UTC 2019'), + cache_path=self.constraints_cache_path, + ts_matcher_cls=TimestampSimple, days=7) + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + result = uut.run() + self.assertEqual(result, (0, 77)) + + def test_run_no_such_date(self): + self.constraint = SearchConstraintSearchSince( + current_date=self.get_date('Tue Apr 11 14:47:36 UTC 2019'), + cache_path=self.constraints_cache_path, + ts_matcher_cls=TimestampSimple, days=7) + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + with self.assertRaises(ValidLinesNotFound): + uut.run() + + def test_run_no_date_found(self): + self.bio = BytesIO(b"nodatewhatsoever") + self.constraint = SearchConstraintSearchSince( + current_date=self.get_date('Tue Apr 11 
14:47:36 UTC 2019'), + cache_path=self.constraints_cache_path, + ts_matcher_cls=TimestampSimple, days=7) + uut = LogFileDateSinceSeeker(self.mock_file, self.constraint) + with self.assertRaises(ValidFormattedDateNotFound): + uut.run()