Skip to content

Commit

Permalink
global: support for ES scroll
Browse files Browse the repository at this point in the history
* Adds scrolling support for receiving records from ES cluster.
  (closes inveniosoftware#17)

Signed-off-by: Jiri Kuncar <[email protected]>
  • Loading branch information
jirikuncar committed Feb 12, 2016
1 parent 7bba244 commit a655686
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 66 deletions.
9 changes: 5 additions & 4 deletions examples/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,17 @@ def fixtures():


@fixtures.command()
@click.option('-n', 'number', type=click.INT, default=27)
def oaiserver(number):
@click.option('-s', 'sets', type=click.INT, default=27)
@click.option('-r', 'records', type=click.INT, default=27)
def oaiserver(sets, records):
"""Initialize OAI-PMH server."""
from invenio_db import db
from invenio_oaiserver.models import OAISet
from invenio_records.api import Record

# create OAI sets
with db.session.begin_nested():
for i in range(number):
for i in range(sets):
db.session.add(OAISet(
spec='test{0}'.format(i),
name='Test{0}'.format(i),
Expand All @@ -126,7 +127,7 @@ def oaiserver(number):
with app.app_context():
indexer = RecordIndexer()
with db.session.begin_nested():
for i in range(number):
for i in range(records):
record_id = uuid.uuid4()
data = {'title': 'Test{0}'.format(i), '$schema': schema}
recid_minter(record_id, data)
Expand Down
2 changes: 1 addition & 1 deletion invenio_oaiserver/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
'identity',
]

OAISERVER_RESUMPTION_TOKEN_EXPIRE_TIME = 60 * 60 # time in seconds
OAISERVER_RESUMPTION_TOKEN_EXPIRE_TIME = 1 * 60 # time in seconds = 1 minute

OAISERVER_SETS_MAX_LENGTH = 3

Expand Down
87 changes: 55 additions & 32 deletions invenio_oaiserver/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,53 +48,76 @@ def match(self, record):

def get_records(**kwargs):
"""Get records."""
page = kwargs.get('resumptionToken', {}).get('page', 1)
size = current_app.config['OAISERVER_PAGE_SIZE']
query = Query()[(page-1)*size:page*size]

body = {}
if 'set' in kwargs:
body['must'] = [{'match': {'_oai.sets': kwargs['set']}}]

time_range = {}
if 'from_' in kwargs:
time_range['gte'] = kwargs['from_']
if 'until' in kwargs:
time_range['lte'] = kwargs['until']
if time_range:
body['filter'] = [{'range': {'_oai.updated': time_range}}]

if body:
query.body = {'query': {'bool': body}}

response = current_search_client.search(
index=current_app.config['OAISERVER_RECORD_INDEX'],
body=query.body,
)
page_ = kwargs.get('resumptionToken', {}).get('page', 1)
size_ = current_app.config['OAISERVER_PAGE_SIZE']
scroll = current_app.config['OAISERVER_RESUMPTION_TOKEN_EXPIRE_TIME']
scroll_id = kwargs.get('resumptionToken', {}).get('scroll_id')

if scroll_id is None:
query = Query()

body = {}
if 'set' in kwargs:
body['must'] = [{'match': {'_oai.sets': kwargs['set']}}]

time_range = {}
if 'from_' in kwargs:
time_range['gte'] = kwargs['from_']
if 'until' in kwargs:
time_range['lte'] = kwargs['until']
if time_range:
body['filter'] = [{'range': {'_oai.updated': time_range}}]

if body:
query.body = {'query': {'bool': body}}

response = current_search_client.search(
index=current_app.config['OAISERVER_RECORD_INDEX'],
body=query.body,
from_=(page_-1)*size_,
size=size_,
scroll='{0}s'.format(scroll),
)
else:
response = current_search_client.scroll(
scroll_id=scroll_id,
scroll='{0}s'.format(scroll),
)
scroll_id = response.get('_scroll_id')

# clean descriptor on last page
if page * per_page > total:
response = current_search_client.clear_scroll(
scroll_id=scroll_id
)
scroll_id = None

class Pagination(object):
"""Dummy pagination class."""

@property
def total(self):
"""Return number of hits found."""
return response['hits']['total']
# custom property for scrolling
_scroll_id = scroll_id

@property
def has_next(self):
"""Return True if there are more results."""
return page*size <= self.total
page = page_
total = response['hits']['total']
per_page = size_
has_next = page * per_page <= total
next_num = page + 1 if has_next else None

@property
def items(self):
"""Return iterator."""
from datetime import datetime
for result in response['hits']['hits']:
yield {
'id': result['_id'],
'json': result['_source'],
# FIXME use ES
'updated': RecordMetadata.query.filter_by(
id=result['_id']).one().updated,
# datetime.strptime(
# result['_source']['_oai']['updated'],
# '%Y-%m-%dT%H:%M:%S.%f'
# ),
}

return Pagination()
26 changes: 11 additions & 15 deletions invenio_oaiserver/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,20 @@ def identify(**kwargs):
return e_tree


def resumption_token(parent, has_next=False, total=None, **kwargs):
def resumption_token(parent, pagination, **kwargs):
"""Attach resumption token element to a parent."""
page = kwargs.get('resumptionToken', {}).get('page', 1)
size = current_app.config['OAISERVER_PAGE_SIZE']

# Do not add resumptionToken if all results fit to the first page.
if page == 1 and not has_next:
if pagination.page == 1 and not pagination.has_next:
return

token = serialize(has_next=has_next, **kwargs)
token = serialize(pagination, **kwargs)
e_resumptionToken = SubElement(parent, etree.QName(NS_OAIPMH,
'resumptionToken'))
if total:
e_resumptionToken.set('cursor', str((page - 1) * size))
e_resumptionToken.set('completeListSize', str(total))
if pagination.total:
e_resumptionToken.set('cursor', str(
(pagination.page - 1) * pagination.per_page
))
e_resumptionToken.set('completeListSize', str(pagination.total))

if token:
e_resumptionToken.text = token
Expand Down Expand Up @@ -209,8 +208,7 @@ def listsets(**kwargs):
e_description = SubElement(e_dc, etree.QName(NS_DC, 'description'))
e_description.text = oai_set.description

resumption_token(e_listsets, has_next=oai_sets.has_next,
total=oai_sets.total, **kwargs)
resumption_token(e_listsets, oai_sets, **kwargs)
return e_tree


Expand Down Expand Up @@ -293,8 +291,7 @@ def listidentifiers(**kwargs):
sets=record['json'].get('_oai', {}).get('sets', []),
)

resumption_token(e_listidentifiers, has_next=result.has_next,
total=result.total, **kwargs)
resumption_token(e_listidentifiers, result, **kwargs)
return e_tree


Expand All @@ -318,6 +315,5 @@ def listrecords(**kwargs):
e_metadata = SubElement(e_record, etree.QName(NS_OAIPMH, 'metadata'))
e_metadata.append(record_dumper(record['json']))

resumption_token(e_listrecords, has_next=result.has_next,
total=result.total, **kwargs)
resumption_token(e_listrecords, result, **kwargs)
return e_tree
25 changes: 11 additions & 14 deletions invenio_oaiserver/resumption_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,29 @@

"""Implement funtions for managing OAI-PMH resumption token."""

import random

from flask import current_app, request
from itsdangerous import URLSafeTimedSerializer
from marshmallow import fields


def serialize(has_next=True, **kwargs):
def serialize(pagination, **kwargs):
"""Return resumtion token serializer."""
if not has_next:
if not pagination.has_next:
return

# TODO completeListSize and cursor
page = 2 # first build token for next page

if 'resumptionToken' in kwargs:
page = kwargs['resumptionToken']['page'] + 1
del kwargs['resumptionToken']

for key in ('from_', 'until'):
if key in kwargs:
kwargs[key] = request.args.get(key)

token_builder = URLSafeTimedSerializer(
current_app.config['SECRET_KEY'],
salt=kwargs['verb'],
)
return token_builder.dumps(dict(kwargs=kwargs, page=page))

data = dict(seed=random.random(), page=pagination.next_num)
scroll_id = getattr(pagination, '_scroll_id', None)
if scroll_id:
data['scroll_id'] = scroll_id

return token_builder.dumps(data)


class ResumptionToken(fields.Field):
Expand Down

0 comments on commit a655686

Please sign in to comment.