diff --git a/data_processors/const.py b/data_processors/const.py
index 166aeed2..30bfbc33 100644
--- a/data_processors/const.py
+++ b/data_processors/const.py
@@ -26,8 +26,15 @@ class ReportHelper(ABC):
         "dragen_tso_ctdna",
     ]
 
-    # Operational limit for decompressed report json data size ~10MB
-    MAX_DECOMPRESSED_REPORT_SIZE_IN_BYTES = 11000000
+    # FIXME Report table growth rate is increasing quite rapidly; this is observed over a year's worth of runs
+    # since we started the report data ingesting pipeline. See a quick db stats snapshot at
+    # https://github.com/umccr/data-portal-apis/issues/143
+    # Initially, we started ingesting report data sized < 150MB; then quickly reduced the limit to 10MB, and now to 1MB.
+    # So I propose that this merits its own Report data warehousing ETL pipeline, with a solution better fitted to the need.
+    # -victor, on 20220421
+    #
+    # Operational limit for decompressed report json data size ~1MB
+    MAX_DECOMPRESSED_REPORT_SIZE_IN_BYTES = 1100000
 
     SQS_REPORT_EVENT_QUEUE_ARN = "/data_portal/backend/sqs_report_event_queue_arn"
 
diff --git a/data_processors/s3/lambdas/s3_event.py b/data_processors/s3/lambdas/s3_event.py
index 096cfc0d..50d5bca1 100644
--- a/data_processors/s3/lambdas/s3_event.py
+++ b/data_processors/s3/lambdas/s3_event.py
@@ -20,7 +20,7 @@
 from data_processors.s3 import services
 from data_processors.const import ReportHelper, S3EventRecord
 
-from aws_xray_sdk.core import xray_recorder
+# from aws_xray_sdk.core import xray_recorder
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -38,25 +38,27 @@ def handler(event: dict, context) -> Union[bool, Dict[str, int]]:
     logger.info("Start processing S3 event")
     logger.info(libjson.dumps(event))
 
-    with xray_recorder.in_subsegment("S3_EVENT_RECORDS_TRACE") as subsegment:
-        messages = event['Records']
+    # subsegment = xray_recorder.begin_subsegment("S3_EVENT_RECORDS_TRACE")
 
-        event_records_dict = parse_raw_s3_event_records(messages)
+    messages = event['Records']
 
-        s3_event_records = event_records_dict['s3_event_records']
-        report_event_records = event_records_dict['report_event_records']
+    event_records_dict = parse_raw_s3_event_records(messages)
 
-        subsegment.put_metadata('total', len(s3_event_records), 's3_event_records')
-        subsegment.put_metadata('records', s3_event_records, 's3_event_records')
+    s3_event_records = event_records_dict['s3_event_records']
+    report_event_records = event_records_dict['report_event_records']
 
-        subsegment.put_metadata('total', len(report_event_records), 'report_event_records')
-        subsegment.put_metadata('records', report_event_records, 'report_event_records')
+    # subsegment.put_metadata('total', len(s3_event_records), 's3_event_records')
+    # subsegment.put_metadata('records', s3_event_records, 's3_event_records')
+    # subsegment.put_metadata('total', len(report_event_records), 'report_event_records')
+    # subsegment.put_metadata('records', report_event_records, 'report_event_records')
 
-        results = services.sync_s3_event_records(s3_event_records)
+    results = services.sync_s3_event_records(s3_event_records)
 
-        if report_event_records:
-            queue_arn = libssm.get_ssm_param(ReportHelper.SQS_REPORT_EVENT_QUEUE_ARN)
-            libsqs.dispatch_jobs(queue_arn=queue_arn, job_list=report_event_records, fifo=False)
+    if report_event_records:
+        queue_arn = libssm.get_ssm_param(ReportHelper.SQS_REPORT_EVENT_QUEUE_ARN)
+        libsqs.dispatch_jobs(queue_arn=queue_arn, job_list=report_event_records, fifo=False)
+
+    # xray_recorder.end_subsegment()
 
     logger.info("S3 event processing complete")
 
diff --git a/data_processors/s3/services.py b/data_processors/s3/services.py
index 9aa4a3f4..7edc620e 100644
--- a/data_processors/s3/services.py
+++ b/data_processors/s3/services.py
@@ -14,10 +14,10 @@
 
 from django.core.exceptions import ObjectDoesNotExist
 from django.db import transaction
-from django.db.models import ExpressionWrapper, Value, CharField, Q, F
+# from django.db.models import ExpressionWrapper, Value, CharField, Q, F  FIXME to be removed when refactoring #343
 from libumccr.aws import libs3
 
-from data_portal.models.limsrow import LIMSRow, S3LIMS
+# from data_portal.models.limsrow import LIMSRow, S3LIMS  FIXME to be removed when refactoring #343
 from data_portal.models.s3object import S3Object
 from data_processors.const import S3EventRecord
 
@@ -109,35 +109,40 @@ def persist_s3_object(bucket: str, key: str, last_modified_date: datetime, size:
         return 0, 0
 
     # TODO remove association logic and drop S3LIMS table, related with global search overhaul
+    # see https://github.com/umccr/data-portal-apis/issues/343
 
     # Number of s3-lims association records we have created in this run
     new_association_count = 0
 
-    # Find all related LIMS rows and associate them
-    # Credit: https://stackoverflow.com/questions/49622088/django-filtering-queryset-by-parameter-has-part-of-fields-value
-    # If the linking columns have changed, we need to modify
-    key_param = ExpressionWrapper(Value(key), output_field=CharField())
-
-    # For each attr (values), s3 object key should contain it
-    attr_filter = Q()
-    # AND all filters
-    for attr in LIMSRow.S3_LINK_ATTRS:
-        attr_filter &= Q(param__contains=F(attr))
-
-    lims_rows = LIMSRow.objects.annotate(param=key_param).filter(attr_filter)
-    lims_row: LIMSRow
-    for lims_row in lims_rows:
-        # Create association if not exist
-        if not S3LIMS.objects.filter(s3_object=s3_object, lims_row=lims_row).exists():
-            logger.info(f"Linking the S3Object ({str(s3_object)}) with LIMSRow ({str(lims_row)})")
-
-            association = S3LIMS(s3_object=s3_object, lims_row=lims_row)
-            association.save()
-
-            new_association_count += 1
-
-    # Check if we do find any association at all or not
-    if len(lims_rows) == 0:
-        logger.debug(f"No association to any LIMS row is found for the S3Object (bucket={bucket}, key={key})")
+    # FIXME quick patch fix; permanently remove these when refactoring #343 in the next iteration
+    # commented out the following association linking due to a performance issue upon S3 object Update events
+    # see https://github.com/umccr/data-portal-apis/issues/143
+    #
+    # # Find all related LIMS rows and associate them
+    # # Credit: https://stackoverflow.com/questions/49622088/django-filtering-queryset-by-parameter-has-part-of-fields-value
+    # # If the linking columns have changed, we need to modify
+    # key_param = ExpressionWrapper(Value(key), output_field=CharField())
+    #
+    # # For each attr (values), s3 object key should contain it
+    # attr_filter = Q()
+    # # AND all filters
+    # for attr in LIMSRow.S3_LINK_ATTRS:
+    #     attr_filter &= Q(param__contains=F(attr))
+    #
+    # lims_rows = LIMSRow.objects.annotate(param=key_param).filter(attr_filter)
+    # lims_row: LIMSRow
+    # for lims_row in lims_rows:
+    #     # Create association if not exist
+    #     if not S3LIMS.objects.filter(s3_object=s3_object, lims_row=lims_row).exists():
+    #         logger.info(f"Linking the S3Object ({str(s3_object)}) with LIMSRow ({str(lims_row)})")
+    #
+    #         association = S3LIMS(s3_object=s3_object, lims_row=lims_row)
+    #         association.save()
+    #
+    #         new_association_count += 1
+    #
+    # # Check if we do find any association at all or not
+    # if len(lims_rows) == 0:
+    #     logger.debug(f"No association to any LIMS row is found for the S3Object (bucket={bucket}, key={key})")
 
     return 1, new_association_count
@@ -152,9 +157,16 @@ def delete_s3_object(bucket_name: str, key: str) -> Tuple[int, int]:
     """
     try:
         s3_object: S3Object = S3Object.objects.get(bucket=bucket_name, key=key)
-        s3_lims_records = S3LIMS.objects.filter(s3_object=s3_object)
-        s3_lims_count = s3_lims_records.count()
-        s3_lims_records.delete()
+
+        # FIXME quick patch fix; permanently remove these when refactoring #343 in the next iteration
+        # commented out the following de-association linking due to a performance issue upon S3 object Delete events
+        # see https://github.com/umccr/data-portal-apis/issues/143
+        #
+        # s3_lims_records = S3LIMS.objects.filter(s3_object=s3_object)
+        # s3_lims_count = s3_lims_records.count()
+        # s3_lims_records.delete()
+        s3_lims_count = 0
+
         s3_object.delete()
         logger.info(f"Deleted S3Object: s3://{bucket_name}/{key}")
         return 1, s3_lims_count
diff --git a/data_processors/s3/tests/test_s3_event.py b/data_processors/s3/tests/test_s3_event.py
index 785c6c64..e5af65b0 100644
--- a/data_processors/s3/tests/test_s3_event.py
+++ b/data_processors/s3/tests/test_s3_event.py
@@ -148,11 +148,11 @@ def test_handler(self) -> None:
         self.assertEqual(results['removed_count'], 1)
         # We should expect the existing association removed as well
-        self.assertEqual(results['s3_lims_removed_count'], 1)
+        # self.assertEqual(results['s3_lims_removed_count'], 1)  FIXME to be removed when refactoring #343
 
         self.assertEqual(results['created_count'], 1)
         # We should expect the new association created as well
-        self.assertEqual(results['s3_lims_created_count'], 1)
+        # self.assertEqual(results['s3_lims_created_count'], 1)  FIXME to be removed when refactoring #343
 
         self.assertEqual(results['unsupported_count'], 0)
 
     def test_delete_non_existent_s3_object(self):