Skip to content

Commit

Permalink
Merge pull request #139 from betonr/hotfix/reprocess
Browse files Browse the repository at this point in the history
fix reprocess steps and prepare to release 0.6.0
  • Loading branch information
betonr authored Apr 23, 2021
2 parents f5fcdfd + be58999 commit f3725a3
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 19 deletions.
8 changes: 8 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
Changes
=======

Version 0.6.0 (2021-04-23)
--------------------------

- Change the structure of the control table in DynamoDB
- Fix endpoint to get cube status (`#137 <https://github.com/brazil-data-cube/cube-builder-aws/issues/137>`_)
- Fix reprocess tiles with force parameter (`#128 <https://github.com/brazil-data-cube/cube-builder-aws/issues/128>`_)


Version 0.4.0 (2021-04-13)
--------------------------

Expand Down
10 changes: 8 additions & 2 deletions cube_builder_aws/cube_builder_aws/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,15 +548,21 @@ def start_process(self, params):
# prepare merge
crs = cube_infos.grs.crs
formatted_version = format_version(cube_infos.version)
prepare_merge(self, cube_infos.name, cube_infos_irregular.name, collections, satellite,
not_started = prepare_merge(self, cube_infos.name, cube_infos_irregular.name, collections, satellite,
bands_list, bands_ids_list, bands_ql_list, float(bands[0].resolution_x),
float(bands[0].resolution_y), int(bands[0].nodata), crs, quality_band, functions, formatted_version,
params.get('force', False), mask, bands_expressions=bands_expressions,
indexes_only_regular_cube=params.get('indexes_only_regular_cube'))

if len(not_started):
return dict(
message='Some scenes have not been started! If necessary, use the force parameter.',
scenes_not_started=not_started
), 200

return dict(
message='Processing started with succesfully'
), 201
), 200

@classmethod
def create_grs_schema(cls, name, description, projection, meridian, degreesx, degreesy, bbox, srid=SRID_ALBERS_EQUAL_AREA):
Expand Down
63 changes: 48 additions & 15 deletions cube_builder_aws/cube_builder_aws/maestro.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ def prepare_merge(self, datacube, irregular_datacube, datasets, satellite, bands

logger.info('prepare merge - Score {} items'.format(self.score['items']))

scenes_not_started = []

# For all tiles
for tile_name in self.score['items']:
activity['tileid'] = tile_name
Expand All @@ -242,11 +244,20 @@ def prepare_merge(self, datacube, irregular_datacube, datasets, satellite, bands
activity['end'] = activity['end'].strftime('%Y-%m-%d')

# When force is True, we must rebuild the merge
if force:
publish_control_key = 'publish{}{}'.format(activity['datacube'], encode_key(activity, ['tileid', 'start', 'end']))
if not force:
response = services.get_activity_item({'id': publish_control_key, 'sk': 'ALLBANDS' })
if 'Item' in response and response['Item']['mystatus'] == 'DONE':
scenes_not_started.append(f'{activity["tileid"]}_{activity["start"]}_{activity["end"]}')
continue
else:
merge_control_key = encode_key(activity, ['action', 'irregular_datacube', 'tileid', 'start', 'end'])
blend_control_key = 'blend{}_{}'.format(activity['datacube'], encode_key(activity, ['tileid', 'start', 'end']))
blend_control_key = 'blend{}{}'.format(activity['datacube'], encode_key(activity, ['tileid', 'start', 'end']))
posblend_control_key = 'posblend{}{}'.format(activity['datacube'], encode_key(activity, ['tileid', 'start', 'end']))
self.services.remove_control_by_key(merge_control_key)
self.services.remove_control_by_key(blend_control_key)
self.services.remove_control_by_key(posblend_control_key)
self.services.remove_control_by_key(publish_control_key)

self.score['items'][tile_name]['periods'][periodkey]['scenes'] = services.search_STAC(activity)

Expand Down Expand Up @@ -319,6 +330,8 @@ def prepare_merge(self, datacube, irregular_datacube, datasets, satellite, bands
and services.s3_file_exists(key=activity['ARDfile']):
next_step(services, activity)
continue
else:
services.remove_activity_by_key(activity['dynamoKey'], activity['sk'])

# Re-schedule a merge-warped
activity['mystatus'] = 'NOTDONE'
Expand All @@ -331,6 +344,7 @@ def prepare_merge(self, datacube, irregular_datacube, datasets, satellite, bands
services.put_item_kinesis(activity)
services.send_to_sqs(activity)

return scenes_not_started

def merge_warped(self, activity):
logger.info('==> start MERGE')
Expand Down Expand Up @@ -549,20 +563,20 @@ def next_blend(services, mergeactivity):
blendactivity['scenes'] = {}
mergeactivity['band'] = mergeactivity['quality_band']
blendactivity['band'] = mergeactivity['quality_band']
_ = fill_blend(services, mergeactivity, blendactivity)
status = fill_blend(services, mergeactivity, blendactivity)

# Reset mycount in activitiesControlTable
activitiesControlTableKey = blendactivity['dynamoKey']
services.put_control_table(activitiesControlTableKey, 0, blendactivity['totalInstancesToBeDone'],
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

# If no quality file was found for this tile/period, register it in DynamoDB and go on
if blendactivity['instancesToBeDone'] == 0:
if not status or blendactivity['instancesToBeDone'] == 0:
blendactivity['sk'] = 'ALLBANDS'
blendactivity['mystatus'] = 'ERROR'
blendactivity['errors'] = dict(
step='next_blend',
message='not quality file was found for this tile/period'
message='not all merges were found for this tile/period'
)
blendactivity['mystart'] = 'SSSS-SS-SS'
blendactivity['myend'] = 'EEEE-EE-EE'
Expand All @@ -587,22 +601,23 @@ def next_blend(services, mergeactivity):

# Check if we are doing it again and if we have to do it because a different number of ARDfiles is present
response = services.get_activity_item(
{'id': blendactivity['dynamoKey'], 'sk': internal_band if internal_band else band })

if 'Item' in response \
and response['Item']['mystatus'] == 'DONE' \
and response['Item']['instancesToBeDone'] == blendactivity['instancesToBeDone']:
{'id': blendactivity['dynamoKey'], 'sk': blendactivity['sk'] })

if 'Item' in response:
exists = True
for func in blendactivity['functions']:
if func == 'IDT' or (func == 'MED' and internal_band == 'PROVENANCE'): continue
if not services.s3_file_exists(bucket_name=mergeactivity['bucket_name'], key=blendactivity['{}file'.format(func)]):
exists = False

if not blendactivity.get('force') and exists:
blendactivity['mystatus'] = 'DONE'
next_step(services, blendactivity)
if not blendactivity.get('force') \
and response['Item']['mystatus'] == 'DONE' \
and response['Item']['instancesToBeDone'] == blendactivity['instancesToBeDone'] \
and exists:
next_step(services, json.loads(response['Item']['activity']))
continue
else:
services.remove_activity_by_key(blendactivity['dynamoKey'], blendactivity['sk'])

# Blend has not been performed, do it
blendactivity['mystatus'] = 'NOTDONE'
Expand Down Expand Up @@ -687,7 +702,7 @@ def blend(self, activity):

band = activity['band']
numscenes = len(activity['scenes'])

nodata = int(activity.get('nodata', -9999))
if band == activity['quality_band']:
nodata = activity_mask['nodata']
Expand Down Expand Up @@ -1049,6 +1064,16 @@ def next_posblend(services, blendactivity):
posblendactivity['cloudratio'] = '100'
posblendactivity['mylaunch'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

response = services.get_activity_item({'id': posblendactivity['dynamoKey'], 'sk': posblendactivity['sk']})
if 'Item' in response:
if not posblendactivity.get('force') \
and response['Item']['mystatus'] == 'DONE' \
and response['Item']['instancesToBeDone'] == blendactivity['instancesToBeDone']:
next_step(services, json.loads(response['Item']['activity']))
continue
else:
services.remove_activity_by_key(posblendactivity['dynamoKey'], posblendactivity['sk'])

if i == 'IDT':
dates_refs = activity['scenes'].keys()
for date_ref in dates_refs:
Expand Down Expand Up @@ -1144,7 +1169,7 @@ def next_publish(services, posblendactivity):
for key in ['datacube','bands','bands_ids','quicklook','tileid','start','end', \
'dirname', 'cloudratio', 'bucket_name', 'quality_band', 'internal_bands', \
'functions', 'indexesToBe', 'version', 'irregular_datacube', \
'indexes_only_regular_cube']:
'indexes_only_regular_cube', 'force']:
publishactivity[key] = posblendactivity.get(key)
publishactivity['action'] = 'publish'

Expand Down Expand Up @@ -1232,6 +1257,14 @@ def next_publish(services, posblendactivity):
services.put_control_table(activitiesControlTableKey, 0, publishactivity['totalInstancesToBeDone'],
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

response = services.get_activity_item({'id': publishactivity['dynamoKey'], 'sk': 'ALLBANDS'})
if 'Item' in response:
if not publishactivity.get('force') and response['Item']['mystatus'] == 'DONE':
next_step(services, json.loads(response['Item']['activity']))
return
else:
services.remove_activity_by_key(publishactivity['dynamoKey'], 'ALLBANDS')

# Launch activity
services.put_item_kinesis(publishactivity)
services.send_to_sqs(publishactivity)
Expand Down
9 changes: 9 additions & 0 deletions cube_builder_aws/cube_builder_aws/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,15 @@ def remove_process_by_key(self, key: str):
except:
return False

def remove_activity_by_key(self, key: str, sk: str) -> bool:
    """Delete a single activity item from the DynamoDB activities table.

    Parameters
    ----------
    key : str
        Partition key (``id``) of the activity item.
    sk : str
        Sort key (``sk``) of the activity item.

    Returns
    -------
    bool
        ``True`` when the delete request was issued successfully,
        ``False`` when any error occurred (best-effort contract).
    """
    try:
        self.activitiesTable.delete_item(
            Key=dict(id=key, sk=sk)
        )
        return True
    except Exception:
        # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
        # are not silently swallowed; failures still just report False.
        return False

def update_control_table(self, Key, UpdateExpression, ExpressionAttributeNames, ExpressionAttributeValues, ReturnValues):
return self.activitiesControlTable.update_item(
Key=Key,
Expand Down
2 changes: 1 addition & 1 deletion cube_builder_aws/cube_builder_aws/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
# under the terms of the MIT License; see LICENSE file for more details.
#

__version__ = '0.4.0'
__version__ = '0.6.0'
1 change: 0 additions & 1 deletion cube_builder_aws/cube_builder_aws/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def status():
), 200


# Cube Metadata
@bp.route("/cube-status", methods=["GET"])
def get_status():
"""Retrieve the cube processing state, which refers to total items and total to be done."""
Expand Down

0 comments on commit f3725a3

Please sign in to comment.