Updated to address S3 event processing performance issue (#143)
* Deactivated association links between S3 and LIMSRow,
  i.e. a graceful first pass for the #343 refactor
* Further reduced the Report data ingest size limit to 1MB
* Commented out the X-Ray recorder so that it no longer even
  pings the AWS instrumentation endpoint at init boot,
  i.e. "radio silent" mode that saves a few (sub-)milliseconds
victorskl committed Apr 21, 2022
1 parent 64aeca9 commit a17678f
Showing 4 changed files with 70 additions and 49 deletions.
11 changes: 9 additions & 2 deletions data_processors/const.py
@@ -26,8 +26,15 @@ class ReportHelper(ABC):
         "dragen_tso_ctdna",
     ]
 
-    # Operational limit for decompressed report json data size ~10MB
-    MAX_DECOMPRESSED_REPORT_SIZE_IN_BYTES = 11000000
+    # FIXME Report table growth rate is quite progressive; this observes over a year worth of run
+    # since we started report data ingesting pipeline. See a quick db stats snapshot at
+    # https://github.com/umccr/data-portal-apis/issues/143
+    # Initially, we started ingest report data size < 150MB; then quickly reduced to 10MB. And now 1MB.
+    # So I propose, this merit its own Report data warehousing ETL pipeline with a better fitted solution for the need.
+    # -victor, on 20220421
+    #
+    # Operational limit for decompressed report json data size ~1MB
+    MAX_DECOMPRESSED_REPORT_SIZE_IN_BYTES = 1100000
 
     SQS_REPORT_EVENT_QUEUE_ARN = "/data_portal/backend/sqs_report_event_queue_arn"

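The constant above only sets the cap; for context, here is a minimal sketch of how such a decompressed-size guard is typically applied before parsing report JSON. The helper name and the gzip assumption are illustrative, not taken from this commit.

import gzip
import json

# Mirrors the new operational limit in data_processors/const.py (~1MB decompressed)
MAX_DECOMPRESSED_REPORT_SIZE_IN_BYTES = 1100000


def load_report_if_within_limit(compressed_bytes: bytes):
    """Hypothetical guard: decompress the payload and parse it only if it stays under the cap."""
    decompressed = gzip.decompress(compressed_bytes)
    if len(decompressed) > MAX_DECOMPRESSED_REPORT_SIZE_IN_BYTES:
        # Oversized reports are skipped here; per the FIXME above, they arguably
        # belong in a dedicated Report data warehousing ETL pipeline instead.
        return None
    return json.loads(decompressed)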
30 changes: 16 additions & 14 deletions data_processors/s3/lambdas/s3_event.py
@@ -20,7 +20,7 @@
 from data_processors.s3 import services
 from data_processors.const import ReportHelper, S3EventRecord
 
-from aws_xray_sdk.core import xray_recorder
+# from aws_xray_sdk.core import xray_recorder
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -38,25 +38,27 @@ def handler(event: dict, context) -> Union[bool, Dict[str, int]]:
     logger.info("Start processing S3 event")
     logger.info(libjson.dumps(event))
 
-    with xray_recorder.in_subsegment("S3_EVENT_RECORDS_TRACE") as subsegment:
-        messages = event['Records']
+    # subsegment = xray_recorder.begin_subsegment("S3_EVENT_RECORDS_TRACE")
 
-        event_records_dict = parse_raw_s3_event_records(messages)
+    messages = event['Records']
 
-        s3_event_records = event_records_dict['s3_event_records']
-        report_event_records = event_records_dict['report_event_records']
+    event_records_dict = parse_raw_s3_event_records(messages)
 
-        subsegment.put_metadata('total', len(s3_event_records), 's3_event_records')
-        subsegment.put_metadata('records', s3_event_records, 's3_event_records')
+    s3_event_records = event_records_dict['s3_event_records']
+    report_event_records = event_records_dict['report_event_records']
 
-        subsegment.put_metadata('total', len(report_event_records), 'report_event_records')
-        subsegment.put_metadata('records', report_event_records, 'report_event_records')
+    # subsegment.put_metadata('total', len(s3_event_records), 's3_event_records')
+    # subsegment.put_metadata('records', s3_event_records, 's3_event_records')
+    # subsegment.put_metadata('total', len(report_event_records), 'report_event_records')
+    # subsegment.put_metadata('records', report_event_records, 'report_event_records')
 
-        results = services.sync_s3_event_records(s3_event_records)
+    results = services.sync_s3_event_records(s3_event_records)
 
-        if report_event_records:
-            queue_arn = libssm.get_ssm_param(ReportHelper.SQS_REPORT_EVENT_QUEUE_ARN)
-            libsqs.dispatch_jobs(queue_arn=queue_arn, job_list=report_event_records, fifo=False)
+    if report_event_records:
+        queue_arn = libssm.get_ssm_param(ReportHelper.SQS_REPORT_EVENT_QUEUE_ARN)
+        libsqs.dispatch_jobs(queue_arn=queue_arn, job_list=report_event_records, fifo=False)
 
+    # xray_recorder.end_subsegment()
+
     logger.info("S3 event processing complete")

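Commenting out the import and subsegment calls achieves the "radio silent" mode described in the commit message. An alternative worth noting is gating the recorder behind an environment flag so tracing can be re-enabled without another code change; the sketch below is a hedged illustration under that assumption (the PORTAL_XRAY_TRACING variable and helper are hypothetical, not part of this repo).

import contextlib
import os

# Hypothetical flag; tracing stays off (and the SDK is never imported) unless explicitly enabled
TRACING_ENABLED = os.getenv("PORTAL_XRAY_TRACING", "0") == "1"

if TRACING_ENABLED:
    from aws_xray_sdk.core import xray_recorder


def traced_subsegment(name: str):
    """Return an X-Ray subsegment context manager when tracing is enabled, else a no-op."""
    if TRACING_ENABLED:
        return xray_recorder.in_subsegment(name)
    return contextlib.nullcontext()

The handler body could then keep a single with traced_subsegment("S3_EVENT_RECORDS_TRACE"): block rather than carrying the commented-out recorder calls above.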
74 changes: 43 additions & 31 deletions data_processors/s3/services.py
@@ -14,10 +14,10 @@
 
 from django.core.exceptions import ObjectDoesNotExist
 from django.db import transaction
-from django.db.models import ExpressionWrapper, Value, CharField, Q, F
+# from django.db.models import ExpressionWrapper, Value, CharField, Q, F FIXME to be removed when refactoring #343
 from libumccr.aws import libs3
 
-from data_portal.models.limsrow import LIMSRow, S3LIMS
+# from data_portal.models.limsrow import LIMSRow, S3LIMS FIXME to be removed when refactoring #343
 from data_portal.models.s3object import S3Object
 from data_processors.const import S3EventRecord
@@ -109,35 +109,40 @@ def persist_s3_object(bucket: str, key: str, last_modified_date: datetime, size:
         return 0, 0
 
     # TODO remove association logic and drop S3LIMS table, related with global search overhaul
     # see https://github.com/umccr/data-portal-apis/issues/343
     # Number of s3-lims association records we have created in this run
     new_association_count = 0
 
-    # Find all related LIMS rows and associate them
-    # Credit: https://stackoverflow.com/questions/49622088/django-filtering-queryset-by-parameter-has-part-of-fields-value
-    # If the linking columns have changed, we need to modify
-    key_param = ExpressionWrapper(Value(key), output_field=CharField())
-
-    # For each attr (values), s3 object key should contain it
-    attr_filter = Q()
-    # AND all filters
-    for attr in LIMSRow.S3_LINK_ATTRS:
-        attr_filter &= Q(param__contains=F(attr))
-
-    lims_rows = LIMSRow.objects.annotate(param=key_param).filter(attr_filter)
-    lims_row: LIMSRow
-    for lims_row in lims_rows:
-        # Create association if not exist
-        if not S3LIMS.objects.filter(s3_object=s3_object, lims_row=lims_row).exists():
-            logger.info(f"Linking the S3Object ({str(s3_object)}) with LIMSRow ({str(lims_row)})")
-
-            association = S3LIMS(s3_object=s3_object, lims_row=lims_row)
-            association.save()
-
-            new_association_count += 1
-
-    # Check if we do find any association at all or not
-    if len(lims_rows) == 0:
-        logger.debug(f"No association to any LIMS row is found for the S3Object (bucket={bucket}, key={key})")
+    # FIXME quick patch fix, permanently remove these when refactoring #343 in next iteration
+    # commented out the following association link due to performance issue upon S3 object Update events
+    # see https://github.com/umccr/data-portal-apis/issues/143
+    #
+    # # Find all related LIMS rows and associate them
+    # # Credit: https://stackoverflow.com/questions/49622088/django-filtering-queryset-by-parameter-has-part-of-fields-value
+    # # If the linking columns have changed, we need to modify
+    # key_param = ExpressionWrapper(Value(key), output_field=CharField())
+    #
+    # # For each attr (values), s3 object key should contain it
+    # attr_filter = Q()
+    # # AND all filters
+    # for attr in LIMSRow.S3_LINK_ATTRS:
+    #     attr_filter &= Q(param__contains=F(attr))
+    #
+    # lims_rows = LIMSRow.objects.annotate(param=key_param).filter(attr_filter)
+    # lims_row: LIMSRow
+    # for lims_row in lims_rows:
+    #     # Create association if not exist
+    #     if not S3LIMS.objects.filter(s3_object=s3_object, lims_row=lims_row).exists():
+    #         logger.info(f"Linking the S3Object ({str(s3_object)}) with LIMSRow ({str(lims_row)})")
+    #
+    #         association = S3LIMS(s3_object=s3_object, lims_row=lims_row)
+    #         association.save()
+    #
+    #         new_association_count += 1
+    #
+    # # Check if we do find any association at all or not
+    # if len(lims_rows) == 0:
+    #     logger.debug(f"No association to any LIMS row is found for the S3Object (bucket={bucket}, key={key})")
 
     return 1, new_association_count
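The block commented out above is the Django ORM pattern for "the S3 key contains each link column's value": the key is annotated onto the queryset as a constant expression, and one F-based __contains condition per linking attribute is ANDed together. A standalone sketch of that pattern, kept for reference while #343 is pending (the helper function name is illustrative, not from this repo):

from django.db.models import CharField, ExpressionWrapper, F, Q, Value


def find_linkable_lims_rows(lims_row_model, key: str):
    """Hypothetical helper: return rows whose S3_LINK_ATTRS values all appear within the S3 key."""
    # Annotate the constant S3 key onto every row so it can be compared against column values
    key_param = ExpressionWrapper(Value(key), output_field=CharField())

    # AND one "key contains this column's value" condition per linking attribute
    attr_filter = Q()
    for attr in lims_row_model.S3_LINK_ATTRS:
        attr_filter &= Q(param__contains=F(attr))

    return lims_row_model.objects.annotate(param=key_param).filter(attr_filter)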

@@ -152,9 +157,16 @@ def delete_s3_object(bucket_name: str, key: str) -> Tuple[int, int]:
     """
     try:
         s3_object: S3Object = S3Object.objects.get(bucket=bucket_name, key=key)
-        s3_lims_records = S3LIMS.objects.filter(s3_object=s3_object)
-        s3_lims_count = s3_lims_records.count()
-        s3_lims_records.delete()
+
+        # FIXME quick patch fix, permanently remove these when refactoring #343 in next iteration
+        # commented out the following de-association link due to performance issue upon S3 object Delete events
+        # see https://github.com/umccr/data-portal-apis/issues/143
+        #
+        # s3_lims_records = S3LIMS.objects.filter(s3_object=s3_object)
+        # s3_lims_count = s3_lims_records.count()
+        # s3_lims_records.delete()
+        s3_lims_count = 0
+
         s3_object.delete()
         logger.info(f"Deleted S3Object: s3://{bucket_name}/{key}")
         return 1, s3_lims_count
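With the de-association block disabled, s3_lims_count is pinned to 0 and any pre-existing S3LIMS rows are left to the database layer. Whether they get cleaned up automatically depends on how the S3LIMS foreign keys are declared; the sketch below shows the on_delete=CASCADE shape that would make the explicit delete redundant. This is an assumption about the schema, not code from this commit.

from django.db import models


class S3LIMS(models.Model):
    # Assumed shape: with CASCADE on both sides, deleting an S3Object (or LIMSRow)
    # also removes the linking rows, so no explicit S3LIMS cleanup is needed.
    s3_object = models.ForeignKey('S3Object', on_delete=models.CASCADE)
    lims_row = models.ForeignKey('LIMSRow', on_delete=models.CASCADE)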
4 changes: 2 additions & 2 deletions data_processors/s3/tests/test_s3_event.py
@@ -148,11 +148,11 @@ def test_handler(self) -> None:
 
         self.assertEqual(results['removed_count'], 1)
         # We should expect the existing association removed as well
-        self.assertEqual(results['s3_lims_removed_count'], 1)
+        # self.assertEqual(results['s3_lims_removed_count'], 1) FIXME to be removed when refactoring #343
 
         self.assertEqual(results['created_count'], 1)
         # We should expect the new association created as well
-        self.assertEqual(results['s3_lims_created_count'], 1)
+        # self.assertEqual(results['s3_lims_created_count'], 1) FIXME to be removed when refactoring #343
         self.assertEqual(results['unsupported_count'], 0)
 
     def test_delete_non_existent_s3_object(self):
