
Commit

fix(recap): Added support for processing attachment pages in doppelgänger cases
albertisfu committed Dec 4, 2024
1 parent 32fd828 commit e532157
Showing 3 changed files with 415 additions and 42 deletions.
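For context on the change: a doppelgänger case here is one of several dockets in the same court (for example, member cases of a multi-defendant or consolidated matter) whose entries point at the same PACER document, i.e. the same pacer_doc_id under different pacer_case_ids. The snippet below is a hedged illustration of that situation, mirroring the per-case query this commit adds to cl/recap/mergers.py; the import path, court, document ID, and variable names are assumptions for the example, not part of the commit.

from cl.search.models import RECAPDocument

# Hedged illustration: list the doppelgänger cases for one shared document.
# The court and document ID below are made up.
doppelganger_rds = (
    RECAPDocument.objects.filter(
        pacer_doc_id="009033568259",            # hypothetical shared document
        docket_entry__docket__court_id="nysd",  # hypothetical court
    )
    .order_by("docket_entry__docket__pacer_case_id")
    .distinct("docket_entry__docket__pacer_case_id")
)
# One row per pacer_case_id: each row is a distinct docket that carries the
# same document and, after this commit, gets its own attachment-page
# ProcessingQueue.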
69 changes: 52 additions & 17 deletions cl/recap/mergers.py
@@ -1605,6 +1605,54 @@ async def clean_duplicate_attachment_entries(
await duplicate_rd_queryset.exclude(pk=keep_rd.pk).adelete()


async def look_for_doppelganger_rds(
court: Court, pq: ProcessingQueue, pacer_doc_id: int, text: str
) -> list[ProcessingQueue]:
"""Identify and process potential RECAPDocuments with the same pacer_doc_id
in the court that likely belong to a doppelgänger case.
Return a list of ProcessingQueue instances for processing them.
:param court: The court associated with the PACER document.
:param pq: The original processing queue object.
:param pacer_doc_id: The PACER document ID to match against.
:param text: The attachment page text.
:return: A list of ProcessingQueue objects to process.
"""
main_rds = (
RECAPDocument.objects.select_related("docket_entry__docket")
.filter(
pacer_doc_id=pacer_doc_id,
docket_entry__docket__court=court,
)
.order_by("docket_entry__docket__pacer_case_id")
.distinct("docket_entry__docket__pacer_case_id")
.only(
"pacer_doc_id",
"docket_entry__docket__pacer_case_id",
"docket_entry__docket__court_id",
)
)
pqs_to_process = [pq] # Add the original pq to the list of pqs to process
original_file_content = text.encode("utf-8")
original_file_name = pq.filepath_local.name
async for main_rd in main_rds:
main_pacer_case_id = main_rd.docket_entry.docket.pacer_case_id
if main_pacer_case_id != pq.pacer_case_id:
# Create additional pqs for each doppelgänger case found.
pq_created = await ProcessingQueue.objects.acreate(
uploader_id=pq.uploader_id,
pacer_doc_id=pacer_doc_id,
pacer_case_id=main_pacer_case_id,
court_id=court.pk,
upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
filepath_local=ContentFile(
original_file_content, name=original_file_name
),
)
pqs_to_process.append(pq_created)
return pqs_to_process


async def merge_attachment_page_data(
court: Court,
pacer_case_id: int,
@@ -1658,23 +1706,10 @@ async def merge_attachment_page_data(
.afirst()
)
else:
try:
main_rd = await RECAPDocument.objects.select_related(
"docket_entry", "docket_entry__docket"
).aget(**params)
except RECAPDocument.DoesNotExist as exc:
# In cases where we have "doppelgänger" dockets drop pacer
# case id and check if the docket exists once more.
if params.get("docket_entry__docket__pacer_case_id"):
retry_params = params.copy()
retry_params.pop(
"docket_entry__docket__pacer_case_id", None
)
main_rd = await RECAPDocument.objects.select_related(
"docket_entry", "docket_entry__docket"
).aget(**retry_params)
else:
raise exc
main_rd = await RECAPDocument.objects.select_related(
"docket_entry", "docket_entry__docket"
).aget(**params)

except RECAPDocument.MultipleObjectsReturned as exc:
if pacer_case_id:
duplicate_rd_queryset = RECAPDocument.objects.filter(**params)
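Why the retry fallback above could be dropped: previously, when the strict per-case lookup raised DoesNotExist, merge_attachment_page_data retried without the pacer_case_id filter, so the attachment metadata could land on a single matching docket rather than on every doppelgänger case. With look_for_doppelganger_rds creating one ProcessingQueue per case upstream, each pq now carries the pacer_case_id of the docket it targets, and the strict lookup is expected to match exactly one RECAPDocument. A hedged sketch of the resulting behavior follows; the exact keys and variable names in params are assembled earlier in the function and are assumed here.

# Hedged sketch; `params` is built earlier in merge_attachment_page_data and
# its contents are assumed from the removed fallback code.
params = {
    "pacer_doc_id": pacer_doc_id,                           # assumed key/value names
    "docket_entry__docket__pacer_case_id": pacer_case_id,   # now always the intended case
}
main_rd = await RECAPDocument.objects.select_related(
    "docket_entry", "docket_entry__docket"
).aget(**params)
# A DoesNotExist now propagates instead of silently retrying without the case
# id; MultipleObjectsReturned is still handled by the surrounding except block.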
81 changes: 56 additions & 25 deletions cl/recap/tasks.py
@@ -77,6 +77,7 @@
find_docket_object,
get_data_from_appellate_att_report,
get_data_from_att_report,
look_for_doppelganger_rds,
merge_attachment_page_data,
merge_pacer_docket_into_cl_docket,
process_orphan_documents,
@@ -115,7 +116,7 @@ async def process_recap_upload(pq: ProcessingQueue) -> None:
docket = await process_recap_docket(pq.pk)
await sync_to_async(add_or_update_recap_docket.delay)(docket)
elif pq.upload_type == UPLOAD_TYPE.ATTACHMENT_PAGE:
await process_recap_attachment(pq.pk)
await look_for_doppelganger_rds_and_process_recap_attachment(pq.pk)
elif pq.upload_type == UPLOAD_TYPE.PDF:
await process_recap_pdf(pq.pk)
elif pq.upload_type == UPLOAD_TYPE.DOCKET_HISTORY_REPORT:
@@ -657,14 +658,62 @@ async def process_recap_docket(pk):
}


async def get_att_data_from_pq(
pq: ProcessingQueue,
) -> tuple[ProcessingQueue | None, dict | None, str | None]:
"""Extract attachment data from a ProcessingQueue object.
:param pq: The ProcessingQueue object.
:return: A tuple of the updated pq, the parsed att_data, and the attachment
page text, or (None, None, None) if the page could not be read or parsed.
"""
try:
with pq.filepath_local.open("r") as file:
text = file.read().decode()
except IOError as exc:
msg = f"Internal processing error ({exc.errno}: {exc.strerror})."
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None, None, None

att_data = get_data_from_att_report(text, pq.court_id)
if not att_data:
msg = "Not a valid attachment page upload."
await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT)
return None, None, None

if pq.pacer_case_id in ["undefined", "null"]:
pq.pacer_case_id = att_data.get("pacer_case_id")
await pq.asave()

return pq, att_data, text


async def look_for_doppelganger_rds_and_process_recap_attachment(
pk: int,
) -> None:
"""Look for doppelgänger RECAPDocuments and process the corresponding
attachment page for each RECAPDocument.
:param pk: Primary key of the processing queue item.
:return: None
"""

pq = await ProcessingQueue.objects.aget(pk=pk)
court = await Court.objects.aget(id=pq.court_id)
pq, att_data, text = await get_att_data_from_pq(pq)
pqs_to_process = await look_for_doppelganger_rds(
court, pq, att_data["pacer_doc_id"], text
)
for pq in pqs_to_process:
await process_recap_attachment(pq.pk)


async def process_recap_attachment(
pk: int,
tag_names: Optional[List[str]] = None,
document_number: int | None = None,
) -> Optional[Tuple[int, str, list[RECAPDocument]]]:
"""Process an uploaded attachment page from the RECAP API endpoint.
:param self: The Celery task
:param pk: The primary key of the processing queue item you want to work on
:param tag_names: A list of tag names to add to all items created or
modified in this function.
@@ -678,30 +727,11 @@ async def process_recap_attachment(
await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS)
logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}")

try:
text = pq.filepath_local.read().decode()
except IOError as exc:
msg = f"Internal processing error ({exc.errno}: {exc.strerror})."
pq_status, msg = await mark_pq_status(
pq, msg, PROCESSING_STATUS.FAILED
)
return pq_status, msg, []

att_data = get_data_from_att_report(text, pq.court_id)
logger.info(f"Parsing completed for item {pq}")

if att_data == {}:
# Bad attachment page.
msg = "Not a valid attachment page upload."
pq_status, msg = await mark_pq_status(
pq, msg, PROCESSING_STATUS.INVALID_CONTENT
)
return pq_status, msg, []

if pq.pacer_case_id in ["undefined", "null"]:
# Bad data from the client. Fix it with parsed data.
pq.pacer_case_id = att_data.get("pacer_case_id")
await pq.asave()
pq, att_data, text = await get_att_data_from_pq(pq)

if document_number is None:
document_number = att_data["document_number"]
@@ -735,6 +765,7 @@ async def process_recap_attachment(
await add_tags_to_objs(tag_names, rds_affected)
await associate_related_instances(pq, d_id=de.docket_id, de_id=de.pk)
pq_status, msg = await mark_pq_successful(pq)

return pq_status, msg, rds_affected


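A hedged end-to-end sketch of the new flow in tasks.py, not code from this commit: one attachment-page upload whose pacer_doc_id also appears on a second docket in the same court should now fan out into one ProcessingQueue per case. The helper name, court, case ID, and HTML content are placeholders.

from django.core.files.base import ContentFile

from cl.recap.models import UPLOAD_TYPE, ProcessingQueue
from cl.recap.tasks import (
    look_for_doppelganger_rds_and_process_recap_attachment,
)


async def upload_attachment_page_for_doppelgangers(user, att_page_html: str) -> None:
    """Hedged sketch: fan one attachment-page upload out to doppelgänger cases."""
    pq = await ProcessingQueue.objects.acreate(
        uploader=user,
        court_id="canb",              # placeholder court
        pacer_case_id="104490",       # placeholder case the page was saved from
        upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
        filepath_local=ContentFile(att_page_html, name="attachment_page.html"),
    )
    await look_for_doppelganger_rds_and_process_recap_attachment(pq.pk)
    # If a second docket in the same court shares the parsed pacer_doc_id, a
    # second ProcessingQueue now exists and both dockets receive the
    # attachment rows.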
