Skip to content

Commit

Permalink
Merge pull request #159 from dimagi/jls/inserted_at
Browse files Browse the repository at this point in the history
Use inserted_at to determine which forms/cases need updating
  • Loading branch information
snopoke authored Jan 28, 2021
2 parents 6ce2e93 + f7db1c9 commit 9f82c71
Show file tree
Hide file tree
Showing 13 changed files with 491 additions and 223 deletions.
75 changes: 59 additions & 16 deletions commcare_export/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from commcare_export.commcare_minilinq import PaginationMode
from commcare_export.exceptions import DataExportException
from commcare_export.writers import SqlMixin

Expand All @@ -38,6 +39,16 @@ class Checkpoint(Base):
final = Column(Boolean)
data_source = Column(String)
last_doc_id = Column(String)
pagination_mode = Column(String)

def get_pagination_mode(self):
"""Get Enum from value stored in the checkpoint. Null or empty value defaults to
'date_modified' mode to support legacy checkpoints.
"""
if not self.pagination_mode:
return PaginationMode.date_modified

return PaginationMode[self.pagination_mode]

def __repr__(self):
return (
Expand All @@ -53,7 +64,8 @@ def __repr__(self):
"time_of_run={r.time_of_run}, "
"final={r.final}), "
"data_source={r.data_source}, "
"last_doc_id={r.last_doc_id}>"
"last_doc_id={r.last_doc_id}, "
"pagination_mode={r.pagination_mode}>"
).format(r=self)


Expand Down Expand Up @@ -93,18 +105,20 @@ def for_dataset(self, data_source, table_names):
engine=self.engine, table_names=table_names, data_source=data_source
)

def set_checkpoint(self, checkpoint_time, is_final=False, doc_id=None):
self._set_checkpoint(checkpoint_time, is_final, doc_id=doc_id)
def set_checkpoint(self, checkpoint_time, pagination_mode, is_final=False, doc_id=None):
self._set_checkpoint(checkpoint_time, pagination_mode, is_final, doc_id=doc_id)
if is_final:
self._cleanup()

def _set_checkpoint(self, checkpoint_time, final, time_of_run=None, doc_id=None):
def _set_checkpoint(self, checkpoint_time, pagination_mode, final, time_of_run=None, doc_id=None):
logger.info(
'Setting %s checkpoint: data_source: %s, tables %s: checkpoint: %s:%s',
'Setting %s checkpoint: data_source: %s, tables: %s, pagination_mode: %s, checkpoint: %s:%s',
'final' if final else 'batch',
self.data_source,
', '.join(self.table_names),
checkpoint_time, doc_id
pagination_mode.name,
checkpoint_time,
doc_id
)
if not checkpoint_time:
raise DataExportException('Tried to set an empty checkpoint. This is not allowed.')
Expand All @@ -130,7 +144,8 @@ def _set_checkpoint(self, checkpoint_time, final, time_of_run=None, doc_id=None)
time_of_run=time_of_run or datetime.datetime.utcnow().isoformat(),
final=final,
data_source=self.data_source,
last_doc_id=doc_id
last_doc_id=doc_id,
pagination_mode=pagination_mode.name
)
session.add(checkpoint)
created.append(checkpoint)
Expand Down Expand Up @@ -195,15 +210,19 @@ def get_legacy_checkpoints(self):
project=self.project, commcare=self.commcare, key=self.key
)
if table_run:
return self._set_checkpoint(table_run.since_param, table_run.final, table_run.time_of_run)
return self._set_checkpoint(
table_run.since_param, PaginationMode.date_modified, table_run.final, table_run.time_of_run
)

# Check for run without the args
table_run = self._get_last_checkpoint(
session, query_file_md5=self.query_md5, key=self.key,
project=None, commcare=None, table_name=None
)
if table_run:
return self._set_checkpoint(table_run.since_param, table_run.final, table_run.time_of_run)
return self._set_checkpoint(
table_run.since_param, PaginationMode.date_modified, table_run.final, table_run.time_of_run
)

def _get_last_checkpoint(self, session, **kwarg_filters):
query = session.query(Checkpoint)
Expand Down Expand Up @@ -299,14 +318,15 @@ def _validate_tables(self):
raise Exception("Not tables set in checkpoint manager")


class CheckpointManagerWithSince(object):
def __init__(self, manager, since):
class CheckpointManagerWithDetails(object):
def __init__(self, manager, since_param, pagination_mode):
self.manager = manager
self.since_param = since
self.since_param = since_param
self.pagination_mode = pagination_mode

def set_checkpoint(self, checkpoint_time, is_final=False, doc_id=None):
if self.manager:
self.manager.set_checkpoint(checkpoint_time, is_final, doc_id=doc_id)
self.manager.set_checkpoint(checkpoint_time, self.pagination_mode, is_final, doc_id=doc_id)


class CheckpointManagerProvider(object):
Expand All @@ -326,6 +346,19 @@ def get_since(self, checkpoint_manager):
since = checkpoint_manager.get_time_of_last_checkpoint()
return dateutil.parser.parse(since) if since else None

def get_pagination_mode(self, checkpoint_manager):
"""Always use the default pagination mode unless we are continuing from
a previous checkpoint in which case use the same pagination mode as before.
"""
if self.start_over or self.since or not checkpoint_manager:
return PaginationMode.date_indexed

last_checkpoint = checkpoint_manager.get_last_checkpoint()
if not last_checkpoint:
return PaginationMode.date_indexed

return last_checkpoint.get_pagination_mode()

def get_checkpoint_manager(self, data_source, table_names):
"""This get's called before each table is exported and set in the `env`. It is then
passed to the API client and used to set the checkpoints.
Expand All @@ -339,9 +372,19 @@ def get_checkpoint_manager(self, data_source, table_names):
manager = self.base_checkpoint_manager.for_dataset(data_source, table_names)

since = self.get_since(manager)
pagination_mode = self.get_pagination_mode(manager)

logger.info(
"Creating checkpoint manager for tables: %s with 'since' parameter: %s",
', '.join(table_names), since
"Creating checkpoint manager for tables: %s, since: %s, pagination_mode: %s",
', '.join(table_names), since, pagination_mode.name
)
return CheckpointManagerWithSince(manager, since)
if pagination_mode != PaginationMode.date_indexed:
logger.warning(
"\n====================================\n"
"This export is using a deprecated pagination mode which will be removed in future versions.\n"
"To switch to the new mode you must re-sync your data using `--start-over`.\n"
"For more details see: %s"
"\n====================================\n",
"https://wiki.commcarehq.org/display/commcarepublic/CommCare+Export+Tool+Release+Notes"
)
return CheckpointManagerWithDetails(manager, since, pagination_mode)
90 changes: 50 additions & 40 deletions commcare_export/commcare_minilinq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,26 @@
API directly.
"""
import json
from enum import Enum

from commcare_export.env import DictEnv, CannotBind, CannotReplace
from datetime import datetime

from commcare_export.env import CannotBind, CannotReplace, DictEnv
from commcare_export.misc import unwrap
from dateutil.parser import ParserError, parse

try:
from urllib.parse import urlparse, parse_qs
from urllib.parse import parse_qs, urlparse
except ImportError:
from urlparse import urlparse, parse_qs
from urlparse import parse_qs, urlparse


SUPPORTED_RESOURCES = {
'form', 'case', 'user', 'location', 'application', 'web-user'
}


class PaginationMode(Enum):
date_indexed = "date_indexed"
date_modified = "date_modified"


class SimpleSinceParams(object):
Expand All @@ -23,9 +33,9 @@ def __init__(self, start, end):
self.end_param = end

def __call__(self, since, until):
params = {
self.start_param: since.isoformat()
}
params = {}
if since:
params[self.start_param] = since.isoformat()
if until:
params[self.end_param] = until.isoformat()
return params
Expand Down Expand Up @@ -74,33 +84,31 @@ def __call__(self, since, until):
return {'_search': query}


resource_since_params = {
'form': FormFilterSinceParams(),
'case': SimpleSinceParams('server_date_modified_start', 'server_date_modified_end'),
'user': None,
'location': None,
'application': None,
'web-user': None,
DATE_PARAMS = {
'indexed_on': SimpleSinceParams('indexed_on_start', 'indexed_on_end'),
'server_date_modified': SimpleSinceParams('server_date_modified_start', 'server_date_modified_end')
}


def get_paginator(resource, page_size=1000):
def get_paginator(resource, page_size=1000, pagination_mode=PaginationMode.date_indexed):
return {
'form': DatePaginator('form', ['server_modified_on','received_on'], page_size),
'case': DatePaginator('case', 'server_date_modified', page_size),
'user': SimplePaginator('user', page_size),
'location': SimplePaginator('location', page_size),
'application': SimplePaginator('application', page_size),
'web-user': SimplePaginator('web-user', page_size),
}[resource]
PaginationMode.date_indexed: {
'form': DatePaginator('indexed_on', page_size),
'case': DatePaginator('indexed_on', page_size),
},
PaginationMode.date_modified: {
'form': DatePaginator(['server_modified_on', 'received_on'], page_size, params=FormFilterSinceParams()),
'case': DatePaginator('server_date_modified', page_size),
}
}[pagination_mode].get(resource, SimplePaginator(page_size))


class CommCareHqEnv(DictEnv):
"""
An environment providing primitives for pulling from the
CommCareHq API.
"""

def __init__(self, commcare_hq_client, until=None, page_size=1000):
self.commcare_hq_client = commcare_hq_client
self.until = until
Expand All @@ -111,10 +119,10 @@ def __init__(self, commcare_hq_client, until=None, page_size=1000):

@unwrap('checkpoint_manager')
def api_data(self, resource, checkpoint_manager, payload=None, include_referenced_items=None):
if resource not in resource_since_params:
raise ValueError('I do not know how to access the API resource "%s"' % resource)
if resource not in SUPPORTED_RESOURCES:
raise ValueError('Unknown API resource "%s' % resource)

paginator = get_paginator(resource, self.page_size)
paginator = get_paginator(resource, self.page_size, checkpoint_manager.pagination_mode)
paginator.init(payload, include_referenced_items, self.until)
initial_params = paginator.next_page_params_since(checkpoint_manager.since_param)
return self.commcare_hq_client.iterate(
Expand All @@ -133,9 +141,9 @@ class SimplePaginator(object):
"""
Paginate based on the 'next' URL provided in the API response.
"""
def __init__(self, resource, page_size=1000):
self.resource = resource
def __init__(self, page_size=1000, params=None):
self.page_size = page_size
self.params = params

def init(self, payload=None, include_referenced_items=None, until=None):
self.payload = dict(payload or {}) # Do not mutate passed-in dicts
Expand All @@ -146,10 +154,9 @@ def next_page_params_since(self, since=None):
params = self.payload
params['limit'] = self.page_size

resource_date_params = resource_since_params[self.resource]
if (since or self.until) and resource_date_params:
if (since or self.until) and self.params:
params.update(
resource_date_params(since, self.until)
self.params(since, self.until)
)

if self.include_referenced_items:
Expand All @@ -172,11 +179,15 @@ class DatePaginator(SimplePaginator):
This also adds an ordering parameter to ensure that the records are ordered by the date field in ascending order.
:param resource: The name of the resource being fetched: ``form`` or ``case``.
:param since_field: The name of the date field to use for pagination.
:param page_size: Number of results to request in each page
"""
def __init__(self, resource, since_field, page_size=1000):
super(DatePaginator, self).__init__(resource, page_size)

DEFAULT_PARAMS = object()

def __init__(self, since_field, page_size=1000, params=DEFAULT_PARAMS):
params = DATE_PARAMS[since_field] if params is DatePaginator.DEFAULT_PARAMS else params
super(DatePaginator, self).__init__(page_size, params)
self.since_field = since_field

def next_page_params_since(self, since=None):
Expand Down Expand Up @@ -205,8 +216,7 @@ def get_since_date(self, batch):
since = last_obj.get(self.since_field)

if since:
for fmt in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ'):
try:
return datetime.strptime(since, fmt)
except ValueError:
pass
try:
return parse(since, ignoretz=True) # ignoretz since we assume utc, and use naive datetimes everywhere
except ParserError:
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Add pagination_mode to checkpoint
Revision ID: 6f158d161ab6
Revises: a56c82a8d02e
Create Date: 2021-01-25 15:13:45.996453
"""
from alembic import op
import sqlalchemy as sa


revision = '6f158d161ab6'
down_revision = 'a56c82a8d02e'
branch_labels = None
depends_on = None



def upgrade():
url = op.get_bind().engine.url
collation = 'utf8_bin' if 'mysql' in url.drivername else None
op.add_column(
'commcare_export_runs',
sa.Column('pagination_mode', sa.Unicode(255, collation=collation))
)

def downgrade():
op.drop_column('commcare_export_runs', 'pagination_mode')
Loading

0 comments on commit 9f82c71

Please sign in to comment.