Skip to content

Commit

Permalink
Incremental harvesting within a given time window (bloomonkey#23)
Browse files Browse the repository at this point in the history
* Factor out date parsing into add_argument type parameter

* Enable incremental harvesting

* Add an option to resume from a given token
  • Loading branch information
jgonggrijp authored and bloomonkey committed Jan 3, 2019
1 parent 0ab56f6 commit e80e59a
Showing 1 changed file with 76 additions and 14 deletions.
90 changes: 76 additions & 14 deletions oaiharvest/harvest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# encoding: utf-8
"""Harvest records from an OAI-PMH provider.
usage: %prog [-h] [--db DATABASEPATH] [-p METADATAPREFIX]
[-f YYYY-MM-DD] [-u YYYY-MM-DD] [-s SET] [-d DIR]
[--delete | --no-delete]
[-l LIMIT]
usage: %prog [-h] [--db DATABASEPATH] [-p METADATAPREFIX] [-r TOKEN]
[-f YYYY-MM-DD] [-u YYYY-MM-DD] [-s SET] [-b HH:MM HH:MM]
[-d DIR] [--delete | --no-delete] [-l LIMIT]
[--create-subdirs | --subdirs-on SUBDIRS]
provider [provider ...]
positional arguments:
Expand All @@ -21,20 +21,30 @@
-p METADATAPREFIX, --metadataPrefix METADATAPREFIX
the metadataPrefix of the format (XML Schema) in which
records should be harvested.
-r TOKEN, --resume-from TOKEN
start at the given resumption TOKEN
-f YYYY-MM-DD, --from YYYY-MM-DD
harvest only records added/modified after this date.
-u YYYY-MM-DD, --until YYYY-MM-DD
harvest only records added/modified up to this date.
-s SET, --set SET harvest only records within this set
-b HH:MM HH:MM, --between HH:MM HH:MM
harvest only between the first and the second wall
clock time (enables incremental harvesting)
-d DIR, --dir DIR where to output files for harvested records. default:
current working path
--delete respect the server's instructions regarding deletions,
i.e. delete the files locally (default)
--no-delete ignore the server's instructions regarding deletions,
i.e. DO NOT delete the files locally
-l LIMIT, --limit LIMIT
place a limit on the number of records to harvest from
each provider
limit the number of records to harvest from each
provider
--create-subdirs create target subdirs (based on / characters in
identifiers) if they don't exist. To use something
other than /, use the newer --subdirs-on option
--subdirs-on SUBDIRS create target subdirs based on occurrences of the
given character in identifiers
Copyright (c) 2013, the University of Liverpool <http://www.liv.ac.uk>.
All rights reserved.
Expand All @@ -51,7 +61,8 @@
import platform
import sys
from argparse import ArgumentParser
from datetime import datetime
from datetime import datetime, timedelta
from time import sleep

import six.moves.urllib.parse as urllib
from oaipmh.client import Client
Expand All @@ -73,11 +84,33 @@ class OAIHarvester(object):
def __init__(self, mdRegistry):
self._mdRegistry = mdRegistry

def pause(self, now, until):
    """Unconditionally pause the process from `now` to `until`.

    :param now: datetime taken as the current moment.
    :param until: datetime at which processing should continue.

    If `until` is not later than `now`, no time is spent sleeping.
    """
    logger = logging.getLogger(__name__).getChild('OAIHarvester.pause')
    logger.info('Pausing until {} (incremental harvest).'.format(until))
    # total_seconds() is available on Python 2.7 and 3 alike, whereas
    # dividing one timedelta by another only works on Python 3.
    seconds = (until - now).total_seconds()
    # time.sleep() rejects negative values, so clamp to zero.
    sleep(max(seconds, 0))

def maybe_pause_if_incremental(self, time_range):
    """Pause the process when the current time is outside `time_range`.

    :param time_range: ``None`` (never pause) or a pair of datetime
        objects; only their time-of-day parts are used, as the opening
        and closing wall clock times of the harvesting window.

    The window may wrap around midnight (closing time earlier than the
    opening time). When the current time falls outside the window,
    the process is paused until the next opening time.
    """
    if time_range is None:
        return None
    current = datetime.now()
    today = current.date()
    window_open = datetime.combine(today, time_range[0].time())
    window_close = datetime.combine(today, time_range[1].time())
    if current < window_open:
        # Before today's opening time: either we are still inside a
        # window that started yesterday and wraps past midnight, or we
        # have to wait for today's opening time.
        still_in_wrapped_window = current < window_close < window_open
        if still_in_wrapped_window:
            return None
        return self.pause(current, window_open)
    if window_open < window_close <= current:
        # Today's (non-wrapping) window has closed; wait for tomorrow's.
        return self.pause(current, window_open + timedelta(days=1))
    # Inside the window: no pause required.
    return None

def _listRecords(self, baseUrl, metadataPrefix="oai_dc", **kwargs):
# Generator to yield records from baseUrl in the given metadataPrefix
# Add metatdataPrefix to args
kwargs['metadataPrefix'] = metadataPrefix
client = Client(baseUrl, self._mdRegistry)
incremental_range = kwargs.pop('between', None)
# Check that baseUrl actually represents an OAI-PMH target
try:
client.identify()
Expand All @@ -88,13 +121,15 @@ def _listRecords(self, baseUrl, metadataPrefix="oai_dc", **kwargs):
)
# Check server timestamp granularity support
client.updateGranularity()
self.maybe_pause_if_incremental(incremental_range)
for record in client.listRecords(**kwargs):
# Unit test hotfix
header, metadata, about = record
# Fix pyoai returning a "b'...'" string for py3k
if isinstance(metadata, str) and metadata.startswith("b'"):
metadata = ast.literal_eval(metadata).decode("utf-8")
yield (header, metadata, about)
self.maybe_pause_if_incremental(incremental_range)

def harvest(self, baseUrl, metadataPrefix, **kwargs):
"Harvest records"
Expand All @@ -120,7 +155,7 @@ def __init__(self, mdRegistry, directory,

def harvest(self, baseUrl, metadataPrefix, **kwargs):
"""Harvest records, return if completed.
:rtype: bool
:returns: Were all available records fetched and stored?
Expand Down Expand Up @@ -207,11 +242,6 @@ def main(argv=None):
else:
args = argparser.parse_args(argv)
logger = logging.getLogger(__name__).getChild('main')
# Parse from and until into datetime objects
if args.from_ is not None:
args.from_ = datetime.strptime(args.from_, "%Y-%m-%d")
if args.until is not None:
args.until = datetime.strptime(args.until, "%Y-%m-%d")
# Establish connection to persistent storage
cxn = verify_database(args.databasePath)
# Make a set of providers - don't repeat for repeated arguments
Expand Down Expand Up @@ -259,7 +289,7 @@ def main(argv=None):
if args.from_ is not None:
logger.warning('Value for command line option --from'
' over-rides recorded lastHarvest timestamp')
else:
elif args.resumptionToken is None:
args.from_ = row[3]
else:
baseUrl = provider
Expand Down Expand Up @@ -299,6 +329,10 @@ def main(argv=None):

if args.set is not None:
kwargs['set'] = args.set
if args.between is not None:
kwargs['between'] = args.between
if args.resumptionToken is not None:
kwargs['resumptionToken'] = args.resumptionToken
try:
completed = harvester.harvest(baseUrl,
args.metadataPrefix,
Expand Down Expand Up @@ -332,6 +366,16 @@ def main(argv=None):
"available from the server")


def parse_date(argument):
    """Convert a ``YYYY-MM-DD`` command line value into a datetime.

    Intended for use as the ``type`` parameter of an argparse option.
    Raises ValueError when `argument` does not match the format.
    """
    date_format = "%Y-%m-%d"
    parsed = datetime.strptime(argument, date_format)
    return parsed


def parse_time(argument):
    """Convert an ``HH:MM`` command line value into a datetime.

    Intended for use as the ``type`` parameter of an argparse option.
    The date part of the result is strptime's default (1900-01-01).
    Raises ValueError when `argument` does not match the format.
    """
    time_format = "%H:%M"
    parsed = datetime.strptime(argument, time_format)
    return parsed


# Set up argument parser
docbits = __doc__.split('\n\n')

Expand Down Expand Up @@ -359,16 +403,25 @@ def main(argv=None):
dest='metadataPrefix',
help=("the metadataPrefix of the format (XML Schema) "
"in which records should be harvested."))
argparser.add_argument(
'-r',
'--resume-from',
dest='resumptionToken',
metavar='TOKEN',
help='start at the given resumption TOKEN',
)
argparser.add_argument(
"-f",
"--from",
type=parse_date,
dest="from_",
metavar="YYYY-MM-DD",
help=("harvest only records added/modified after this "
"date."))
argparser.add_argument(
"-u",
"--until",
type=parse_date,
dest="until",
metavar="YYYY-MM-DD",
help=("harvest only records added/modified up to this "
Expand All @@ -378,6 +431,15 @@ def main(argv=None):
"--set",
dest="set",
help=("harvest only records within this set"))
argparser.add_argument(
'-b',
'--between',
type=parse_time,
nargs=2,
metavar='HH:MM',
help=('harvest only between the first and the second wall clock time '
'(enables incremental harvesting)'),
)

group = argparser.add_mutually_exclusive_group()
group.add_argument(
Expand Down

0 comments on commit e80e59a

Please sign in to comment.