Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File stat (e.g., mtime) timestamps #38

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 166 additions & 57 deletions rotate_backups/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ def load_config_file(configuration_file=None, expand=True):
# 'timestamp-pattern' configuration file option has a value set.
if items.get('timestamp-pattern'):
options['timestamp_pattern'] = items['timestamp-pattern']
if items.get('stat-timestamp'):
options['stat_timestamp'] = items['stat-timestamp']
# Expand filename patterns?
if expand and location.have_wildcards:
logger.verbose("Expanding filename pattern %s on %s ..", location.directory, location.context)
Expand Down Expand Up @@ -260,8 +262,134 @@ def rotate_backups(directory, rotation_scheme, **options):
program.rotate_backups(directory)


class RotateBackups(PropertyManager):
class Match:
"""An interface-like class for a date match."""

def match_to_datetime(self) -> datetime.datetime:
"""Return a datetime from the Match."""
pass


class Matcher:
"""An interface-like class for a date-matching scheme."""

def search(self, location: str, entry: str) -> Match:
"""Process an entry in a location and return a Match."""
pass


class FilenameMatch(Match):
"""A match based on a filename pattern."""

match: re.Match = None

def __init__(self, match: re.Match):
"""Make a Match from a regular expression match."""
self.match = match

def match_to_datetime(self) -> datetime.datetime:
"""
Convert the regular expression match to a :class:`~datetime.datetime` value.

:returns: A :class:`~datetime.datetime` value.
:raises: :exc:`exceptions.ValueError` when a required date component is
not captured by the pattern, the captured value is an empty
string or the captured value cannot be interpreted as a
base-10 integer.

.. seealso:: :data:`SUPPORTED_DATE_COMPONENTS`
"""
kw = {}
captures = self.match.groupdict()
for component, required in SUPPORTED_DATE_COMPONENTS:
value = captures.get(component)
if value:
kw[component] = int(value, 10)
elif required:
raise ValueError("Missing required date component! (%s)" % component)
else:
kw[component] = 0
return datetime.datetime(**kw)


class FilenameMatcher(Matcher, PropertyManager):
"""A date-matching scheme based on filenames."""

def search(self, location: str, entry: str) -> FilenameMatch:
"""Apply the pattern to the entry's name, and return a Match if found."""
if match := self.timestamp_pattern.search(entry):
return FilenameMatch(match)

@mutable_property
def timestamp_pattern(self):
"""
The pattern used to extract timestamps from filenames (defaults to :data:`TIMESTAMP_PATTERN`).

The value of this property is a compiled regular expression object.
Callers can provide their own compiled regular expression which
makes it possible to customize the compilation flags (see the
:func:`re.compile()` documentation for details).

The regular expression pattern is expected to be a Python compatible
regular expression that defines the named capture groups 'year',
'month' and 'day' and optionally 'hour', 'minute' and 'second'.

String values are automatically coerced to compiled regular expressions
by calling :func:`~humanfriendly.coerce_pattern()`, in this case only
the :data:`re.VERBOSE` flag is used.

If the caller provides a custom pattern it will be validated
to confirm that the pattern contains named capture groups
corresponding to each of the required date components
defined by :data:`SUPPORTED_DATE_COMPONENTS`.
"""
return TIMESTAMP_PATTERN

@timestamp_pattern.setter
def timestamp_pattern(self, value):
"""Coerce the value of :attr:`timestamp_pattern` to a compiled regular expression."""
pattern = coerce_pattern(value, re.VERBOSE)
for component, required in SUPPORTED_DATE_COMPONENTS:
if component not in pattern.groupindex and required:
raise ValueError("Pattern is missing required capture group! (%s)" % component)
set_property(self, 'timestamp_pattern', pattern)


class FilestatMatch(Match):
"""A match based on a filename statistics."""

entry: str = None
field: str = None

def __init__(self, entry: str, timestamp='mtime'):
"""Make a Match from a field's stat attribute."""
self.entry = entry
self.field = f'st_{timestamp}'

def match_to_datetime(self) -> datetime.datetime:
"""Return a datetime from the file's stat field."""
return datetime.datetime.fromtimestamp(
getattr(os.stat(self.entry), self.field)
)


class FilestatMatcher(Matcher):
"""A date-matching scheme based on file statistics."""

timestamp: str = None

def __init__(self, timestamp: str = 'mtime'):
"""Make a Matcher based on a file's stat attribute."""
self.timestamp = timestamp

def search(self, location: str, entry: str) -> FilestatMatch:
"""Return the file's stat attribute."""
return FilestatMatch(
os.path.join(location.directory, entry),
self.timestamp)


class RotateBackups(PropertyManager):
"""Python API for the ``rotate-backups`` program."""

def __init__(self, rotation_scheme, **options):
Expand Down Expand Up @@ -418,6 +546,19 @@ def rotation_scheme(self):
in the dictionary.
"""

@mutable_property
def stat_timestamp(self):
"""Whether to use the files' mtime instead of parsing their name."""
return isinstance(self.match, FilestatMatcher)

@stat_timestamp.setter
def stat_timestamp(self, value):
if value:
logger.info("Using file mtime to determine file date")
self.matcher = FilestatMatcher()
elif isinstance(self.match, FilestatMatcher):
del self.matcher

@mutable_property
def strict(self):
"""
Expand Down Expand Up @@ -447,39 +588,31 @@ def strict(self):
"""
return True

@mutable_property
@property
def timestamp_pattern(self):
"""
The pattern used to extract timestamps from filenames (defaults to :data:`TIMESTAMP_PATTERN`).

The value of this property is a compiled regular expression object.
Callers can provide their own compiled regular expression which
makes it possible to customize the compilation flags (see the
:func:`re.compile()` documentation for details).

The regular expression pattern is expected to be a Python compatible
regular expression that defines the named capture groups 'year',
'month' and 'day' and optionally 'hour', 'minute' and 'second'.

String values are automatically coerced to compiled regular expressions
by calling :func:`~humanfriendly.coerce_pattern()`, in this case only
the :data:`re.VERBOSE` flag is used.

If the caller provides a custom pattern it will be validated
to confirm that the pattern contains named capture groups
corresponding to each of the required date components
defined by :data:`SUPPORTED_DATE_COMPONENTS`.
"""
return TIMESTAMP_PATTERN
"""Pattern to use to extract a timestamp from a filename."""
if not isinstance(self.matcher, FilenameMatcher):
raise ValueError('Current matcher is not a FilenameMatcher')
return self.matcher.timestamp_pattern

@timestamp_pattern.setter
def timestamp_pattern(self, value):
"""Coerce the value of :attr:`timestamp_pattern` to a compiled regular expression."""
pattern = coerce_pattern(value, re.VERBOSE)
for component, required in SUPPORTED_DATE_COMPONENTS:
if component not in pattern.groupindex and required:
raise ValueError("Pattern is missing required capture group! (%s)" % component)
set_property(self, 'timestamp_pattern', pattern)
"""Pattern to use to extract a timestamp from a filename."""
if not isinstance(self.matcher, FilenameMatcher):
raise ValueError('Current matcher is not a FilenameMatcher')
self.matcher.timestamp_pattern = value

@cached_property
def matcher(self):
"""Matcher to use to extract a timestamp from a file."""
return FilenameMatcher(timestamp_pattern=TIMESTAMP_PATTERN)

@matcher.setter
def matcher(self, matcher):
"""Matcher to use to extract a timestamp from a file."""
if not isinstance(matcher, Matcher):
raise ValueError(f'{matcher} is not a Matcher')
set_property(self, 'matcher', matcher)

def rotate_concurrent(self, *locations, **kw):
"""
Expand Down Expand Up @@ -642,8 +775,9 @@ def collect_backups(self, location):
location = coerce_location(location)
logger.info("Scanning %s for backups ..", location)
location.ensure_readable(self.force)

for entry in natsort(location.context.list_entries(location.directory)):
match = self.timestamp_pattern.search(entry)
match = self.matcher.search(location, entry)
if match:
if self.exclude_list and any(fnmatch.fnmatch(entry, p) for p in self.exclude_list):
logger.verbose("Excluded %s (it matched the exclude list).", entry)
Expand All @@ -653,7 +787,7 @@ def collect_backups(self, location):
try:
backups.append(Backup(
pathname=os.path.join(location.directory, entry),
timestamp=self.match_to_datetime(match),
timestamp=match.match_to_datetime(),
))
except ValueError as e:
logger.notice("Ignoring %s due to invalid date (%s).", entry, e)
Expand All @@ -663,31 +797,6 @@ def collect_backups(self, location):
logger.info("Found %i timestamped backups in %s.", len(backups), location)
return sorted(backups)

def match_to_datetime(self, match):
"""
Convert a regular expression match to a :class:`~datetime.datetime` value.

:param match: A regular expression match object.
:returns: A :class:`~datetime.datetime` value.
:raises: :exc:`exceptions.ValueError` when a required date component is
not captured by the pattern, the captured value is an empty
string or the captured value cannot be interpreted as a
base-10 integer.

.. seealso:: :data:`SUPPORTED_DATE_COMPONENTS`
"""
kw = {}
captures = match.groupdict()
for component, required in SUPPORTED_DATE_COMPONENTS:
value = captures.get(component)
if value:
kw[component] = int(value, 10)
elif required:
raise ValueError("Missing required date component! (%s)" % component)
else:
kw[component] = 0
return datetime.datetime(**kw)

def group_backups(self, backups):
"""
Group backups collected by :func:`collect_backups()` by rotation frequencies.
Expand Down
13 changes: 10 additions & 3 deletions rotate_backups/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@
readable and/or writable for the current user (or the user logged in to a
remote system over SSH).

-s, --stat-timestamp

Use mtime stat timestamps, instead of filenames, to determine the
date of each file.

-S, --syslog=CHOICE

Explicitly enable or disable system logging instead of letting the program
Expand Down Expand Up @@ -241,12 +246,12 @@ def main():
selected_locations = []
# Parse the command line arguments.
try:
options, arguments = getopt.getopt(sys.argv[1:], 'M:H:d:w:m:y:t:I:x:jpri:c:C:uS:fnvqh', [
options, arguments = getopt.getopt(sys.argv[1:], 'M:H:d:w:m:y:t:I:x:jpri:c:C:usS:fnvqh', [
'minutely=', 'hourly=', 'daily=', 'weekly=', 'monthly=', 'yearly=',
'timestamp-pattern=', 'include=', 'exclude=', 'parallel',
'prefer-recent', 'relaxed', 'ionice=', 'config=',
'removal-command=', 'use-sudo', 'syslog=', 'force',
'dry-run', 'verbose', 'quiet', 'help',
'removal-command=', 'use-sudo', 'stat-timestamp', 'syslog=',
'force', 'dry-run', 'verbose', 'quiet', 'help',
])
for option, value in options:
if option in ('-M', '--minutely'):
Expand Down Expand Up @@ -284,6 +289,8 @@ def main():
kw['removal_command'] = removal_command
elif option in ('-u', '--use-sudo'):
use_sudo = True
elif option in ('-s', '--stat-timestamp'):
kw['stat_timestamp'] = True
elif option in ('-S', '--syslog'):
use_syslog = coerce_boolean(value)
elif option in ('-f', '--force'):
Expand Down
Loading