diff --git a/CHANGES.rst b/CHANGES.rst
index 3f26d7e..d2f4111 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -9,6 +9,14 @@ Changes
 =======
 
 
+Version 0.6.0 (2021-04-23)
+--------------------------
+
+- Changed the structure of the control table in dynamodb
+- Fix endpoint to get cube status (`#137 `_)
+- Fix reprocess tiles with force parameter (`#128 `_)
+
+
 Version 0.4.0 (2021-04-13)
 --------------------------
diff --git a/cube_builder_aws/cube_builder_aws/controller.py b/cube_builder_aws/cube_builder_aws/controller.py
index 9cbe180..494ff2d 100644
--- a/cube_builder_aws/cube_builder_aws/controller.py
+++ b/cube_builder_aws/cube_builder_aws/controller.py
@@ -548,15 +548,21 @@ def start_process(self, params):
         # prepare merge
         crs = cube_infos.grs.crs
         formatted_version = format_version(cube_infos.version)
-        prepare_merge(self, cube_infos.name, cube_infos_irregular.name, collections, satellite,
+        not_started = prepare_merge(self, cube_infos.name, cube_infos_irregular.name, collections, satellite,
                       bands_list, bands_ids_list, bands_ql_list, float(bands[0].resolution_x),
                       float(bands[0].resolution_y), int(bands[0].nodata), crs, quality_band, functions,
                       formatted_version, params.get('force', False), mask, bands_expressions=bands_expressions,
                       indexes_only_regular_cube=params.get('indexes_only_regular_cube'))
 
+        if len(not_started):
+            return dict(
+                message='Some scenes have not been started! If necessary, use the force parameter.',
+                scenes_not_started=not_started
+            ), 200
+
         return dict(
             message='Processing started with succesfully'
-        ), 201
+        ), 200
 
     @classmethod
     def create_grs_schema(cls, name, description, projection, meridian, degreesx, degreesy, bbox, srid=SRID_ALBERS_EQUAL_AREA):
diff --git a/cube_builder_aws/cube_builder_aws/maestro.py b/cube_builder_aws/cube_builder_aws/maestro.py
index 893b2e8..acd9d01 100644
--- a/cube_builder_aws/cube_builder_aws/maestro.py
+++ b/cube_builder_aws/cube_builder_aws/maestro.py
@@ -217,6 +217,8 @@ def prepare_merge(self, datacube, irregular_datacube, datasets, satellite, bands
 
     logger.info('prepare merge - Score {} items'.format(self.score['items']))
 
+    scenes_not_started = []
+
     # For all tiles
     for tile_name in self.score['items']:
         activity['tileid'] = tile_name
@@ -242,11 +244,20 @@ def prepare_merge(self, datacube, irregular_datacube, datasets, satellite, bands
             activity['end'] = activity['end'].strftime('%Y-%m-%d')
 
             # When force is True, we must rebuild the merge
-            if force:
+            publish_control_key = 'publish{}{}'.format(activity['datacube'], encode_key(activity, ['tileid', 'start', 'end']))
+            if not force:
+                response = services.get_activity_item({'id': publish_control_key, 'sk': 'ALLBANDS' })
+                if 'Item' in response and response['Item']['mystatus'] == 'DONE':
+                    scenes_not_started.append(f'{activity["tileid"]}_{activity["start"]}_{activity["end"]}')
+                    continue
+            else:
                 merge_control_key = encode_key(activity, ['action', 'irregular_datacube', 'tileid', 'start', 'end'])
-                blend_control_key = 'blend{}_{}'.format(activity['datacube'], encode_key(activity, ['tileid', 'start', 'end']))
+                blend_control_key = 'blend{}{}'.format(activity['datacube'], encode_key(activity, ['tileid', 'start', 'end']))
+                posblend_control_key = 'posblend{}{}'.format(activity['datacube'], encode_key(activity, ['tileid', 'start', 'end']))
                 self.services.remove_control_by_key(merge_control_key)
                 self.services.remove_control_by_key(blend_control_key)
+                self.services.remove_control_by_key(posblend_control_key)
+                self.services.remove_control_by_key(publish_control_key)
 
             self.score['items'][tile_name]['periods'][periodkey]['scenes'] = services.search_STAC(activity)
 
@@ -319,6 +330,8 @@ def prepare_merge(self, datacube, irregular_datacube, datasets, satellite, bands
                         and services.s3_file_exists(key=activity['ARDfile']):
                     next_step(services, activity)
                     continue
+                else:
+                    services.remove_activity_by_key(activity['dynamoKey'], activity['sk'])
 
                 # Re-schedule a merge-warped
                 activity['mystatus'] = 'NOTDONE'
@@ -331,6 +344,7 @@ def prepare_merge(self, datacube, irregular_datacube, datasets, satellite, bands
                 services.put_item_kinesis(activity)
                 services.send_to_sqs(activity)
+    return scenes_not_started
 
 
 def merge_warped(self, activity):
     logger.info('==> start MERGE')
@@ -549,7 +563,7 @@ def next_blend(services, mergeactivity):
     blendactivity['scenes'] = {}
     mergeactivity['band'] = mergeactivity['quality_band']
     blendactivity['band'] = mergeactivity['quality_band']
-    _ = fill_blend(services, mergeactivity, blendactivity)
+    status = fill_blend(services, mergeactivity, blendactivity)
 
     # Reset mycount in activitiesControlTable
     activitiesControlTableKey = blendactivity['dynamoKey']
@@ -557,12 +571,12 @@ def next_blend(services, mergeactivity):
                                datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
 
     # If no quality file was found for this tile/period, register it in DynamoDB and go on
-    if blendactivity['instancesToBeDone'] == 0:
+    if not status or blendactivity['instancesToBeDone'] == 0:
         blendactivity['sk'] = 'ALLBANDS'
         blendactivity['mystatus'] = 'ERROR'
         blendactivity['errors'] = dict(
             step='next_blend',
-            message='not quality file was found for this tile/period'
+            message='not all merges were found for this tile/period'
         )
         blendactivity['mystart'] = 'SSSS-SS-SS'
         blendactivity['myend'] = 'EEEE-EE-EE'
@@ -587,22 +601,23 @@ def next_blend(services, mergeactivity):
 
         # Check if we are doing it again and if we have to do it because a different number of ARDfiles is present
         response = services.get_activity_item(
-            {'id': blendactivity['dynamoKey'], 'sk': internal_band if internal_band else band })
-
-        if 'Item' in response \
-                and response['Item']['mystatus'] == 'DONE' \
-                and response['Item']['instancesToBeDone'] == blendactivity['instancesToBeDone']:
+            {'id': blendactivity['dynamoKey'], 'sk': blendactivity['sk'] })
+        if 'Item' in response:
             exists = True
             for func in blendactivity['functions']:
                 if func == 'IDT' or (func == 'MED' and internal_band == 'PROVENANCE'):
                     continue
                 if not services.s3_file_exists(bucket_name=mergeactivity['bucket_name'], key=blendactivity['{}file'.format(func)]):
                     exists = False
-            if not blendactivity.get('force') and exists:
-                blendactivity['mystatus'] = 'DONE'
-                next_step(services, blendactivity)
+            if not blendactivity.get('force') \
+                    and response['Item']['mystatus'] == 'DONE' \
+                    and response['Item']['instancesToBeDone'] == blendactivity['instancesToBeDone'] \
+                    and exists:
+                next_step(services, json.loads(response['Item']['activity']))
                 continue
+            else:
+                services.remove_activity_by_key(blendactivity['dynamoKey'], blendactivity['sk'])
 
         # Blend has not been performed, do it
         blendactivity['mystatus'] = 'NOTDONE'
@@ -687,7 +702,7 @@ def blend(self, activity):
 
     band = activity['band']
     numscenes = len(activity['scenes'])
-    
+
     nodata = int(activity.get('nodata', -9999))
     if band == activity['quality_band']:
         nodata = activity_mask['nodata']
@@ -1049,6 +1064,16 @@ def next_posblend(services, blendactivity):
         posblendactivity['cloudratio'] = '100'
         posblendactivity['mylaunch'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 
+        response = services.get_activity_item({'id': posblendactivity['dynamoKey'], 'sk': posblendactivity['sk']})
+        if 'Item' in response:
+            if not posblendactivity.get('force') \
+                    and response['Item']['mystatus'] == 'DONE' \
+                    and response['Item']['instancesToBeDone'] == blendactivity['instancesToBeDone']:
+                next_step(services, json.loads(response['Item']['activity']))
+                continue
+            else:
+                services.remove_activity_by_key(posblendactivity['dynamoKey'], posblendactivity['sk'])
+
         if i == 'IDT':
             dates_refs = activity['scenes'].keys()
             for date_ref in dates_refs:
@@ -1144,7 +1169,7 @@ def next_publish(services, posblendactivity):
     for key in ['datacube','bands','bands_ids','quicklook','tileid','start','end', \
         'dirname', 'cloudratio', 'bucket_name', 'quality_band', 'internal_bands', \
         'functions', 'indexesToBe', 'version', 'irregular_datacube', \
-        'indexes_only_regular_cube']:
+        'indexes_only_regular_cube', 'force']:
         publishactivity[key] = posblendactivity.get(key)
 
     publishactivity['action'] = 'publish'
@@ -1232,6 +1257,14 @@ def next_publish(services, posblendactivity):
     services.put_control_table(activitiesControlTableKey, 0, publishactivity['totalInstancesToBeDone'],
                                datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
 
+    response = services.get_activity_item({'id': publishactivity['dynamoKey'], 'sk': 'ALLBANDS'})
+    if 'Item' in response:
+        if not publishactivity.get('force') and response['Item']['mystatus'] == 'DONE':
+            next_step(services, json.loads(response['Item']['activity']))
+            return
+        else:
+            services.remove_activity_by_key(publishactivity['dynamoKey'], 'ALLBANDS')
+
     # Launch activity
     services.put_item_kinesis(publishactivity)
     services.send_to_sqs(publishactivity)
diff --git a/cube_builder_aws/cube_builder_aws/services.py b/cube_builder_aws/cube_builder_aws/services.py
index 1675566..4cbf725 100644
--- a/cube_builder_aws/cube_builder_aws/services.py
+++ b/cube_builder_aws/cube_builder_aws/services.py
@@ -272,6 +272,15 @@ def remove_process_by_key(self, key: str):
         except:
             return False
 
+    def remove_activity_by_key(self, key: str, sk: str):
+        try:
+            self.activitiesTable.delete_item(
+                Key=dict(id=key, sk=sk)
+            )
+            return True
+        except:
+            return False
+
     def update_control_table(self, Key, UpdateExpression, ExpressionAttributeNames, ExpressionAttributeValues, ReturnValues):
         return self.activitiesControlTable.update_item(
             Key=Key,
diff --git a/cube_builder_aws/cube_builder_aws/version.py b/cube_builder_aws/cube_builder_aws/version.py
index 60d3b03..73d7971 100644
--- a/cube_builder_aws/cube_builder_aws/version.py
+++ b/cube_builder_aws/cube_builder_aws/version.py
@@ -6,4 +6,4 @@
 # under the terms of the MIT License; see LICENSE file for more details.
 #
 
-__version__ = '0.4.0'
+__version__ = '0.6.0'
diff --git a/cube_builder_aws/cube_builder_aws/views.py b/cube_builder_aws/cube_builder_aws/views.py
index 60a3ed2..0edb147 100644
--- a/cube_builder_aws/cube_builder_aws/views.py
+++ b/cube_builder_aws/cube_builder_aws/views.py
@@ -34,7 +34,6 @@ def status():
     ), 200
 
 
-# Cube Metadata
 @bp.route("/cube-status", methods=["GET"])
 def get_status():
     """Retrieve the cube processing state, which refers to total items and total to be done."""
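
Reviewer note (not part of the patch): the maestro.py hunks above apply one skip-or-reprocess pattern in next_blend, next_posblend and next_publish: look up the step's activity item in DynamoDB, re-dispatch the stored activity when it is already DONE and force is not set, otherwise delete the stale item and re-schedule the step. Below is a minimal standalone sketch of that pattern under stated assumptions: a boto3 DynamoDB table with the composite key (id, sk) like the activities table, a hypothetical table name, and the instancesToBeDone comparison done in next_blend/next_posblend omitted for brevity.

"""Sketch only: illustrates the skip-or-reprocess check, not the project's API."""
import json

import boto3

dynamodb = boto3.resource('dynamodb')
activities_table = dynamodb.Table('cube-builder-activities')  # hypothetical table name


def get_activity_item(key: dict) -> dict:
    # Fetch one activity record by its composite key (id, sk).
    return activities_table.get_item(Key=key)


def remove_activity_by_key(key: str, sk: str) -> bool:
    # Same behaviour as the new Services.remove_activity_by_key in the diff above.
    try:
        activities_table.delete_item(Key=dict(id=key, sk=sk))
        return True
    except Exception:
        return False


def skip_or_reschedule(activity: dict, force: bool, next_step) -> bool:
    """Return True when the step can be skipped because it already finished.

    When the stored item is DONE and force is not set, re-dispatch the stored
    activity; otherwise drop the stale DynamoDB record so the step is rebuilt.
    """
    response = get_activity_item({'id': activity['dynamoKey'], 'sk': activity['sk']})
    if 'Item' in response:
        item = response['Item']
        if not force and item['mystatus'] == 'DONE':
            next_step(json.loads(item['activity']))
            return True
        remove_activity_by_key(activity['dynamoKey'], activity['sk'])
    return False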