Skip to content

Commit

Permalink
[SDESK-7166] fix(ingest): Skip linking coverages that already have an…
Browse files Browse the repository at this point in the history
… assignment

Also made the hook code more robust by catching & logging errors, allowing ingested item to succeed ingest procedure
  • Loading branch information
MarkLark86 committed Feb 2, 2024
1 parent c62fd02 commit ff194a5
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 59 deletions.
2 changes: 1 addition & 1 deletion server/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ gunicorn
honcho

git+https://github.com/superdesk/[email protected]#egg=superdesk-core
git+https://github.com/superdesk/superdesk-planning.git@v2.7.0-rc5#egg=superdesk-planning
git+https://github.com/superdesk/superdesk-planning.git@release/2.7#egg=superdesk-planning
12 changes: 6 additions & 6 deletions server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ blinker==1.4
# flask-mail
# raven
# superdesk-core
boto3==1.34.27
boto3==1.34.33
# via superdesk-core
botocore==1.34.27
botocore==1.34.33
# via
# boto3
# s3transfer
Expand All @@ -42,7 +42,7 @@ cerberus==1.3.5
# via
# eve
# superdesk-core
certifi==2023.11.17
certifi==2024.2.2
# via
# elastic-apm
# elasticsearch
Expand Down Expand Up @@ -73,7 +73,7 @@ click-repl==0.3.0
# via celery
croniter==0.3.37
# via superdesk-core
cryptography==42.0.0
cryptography==42.0.2
# via
# authlib
# jwcrypto
Expand Down Expand Up @@ -218,7 +218,7 @@ python-magic==0.4.27
# via superdesk-core
python-twitter==3.5
# via superdesk-core
pytz==2023.3.post1
pytz==2024.1
# via
# babel
# celery
Expand Down Expand Up @@ -264,7 +264,7 @@ six==1.16.0
# python-dateutil
superdesk-core @ git+https://github.com/superdesk/[email protected]
# via -r requirements.in
superdesk-planning @ git+https://github.com/superdesk/superdesk-planning.git@v2.7.0-rc5
superdesk-planning @ git+https://github.com/superdesk/superdesk-planning.git@release/2.7
# via -r requirements.in
typing-extensions==4.9.0
# via superdesk-core
Expand Down
77 changes: 56 additions & 21 deletions server/stt/signal_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def link_coverages_to_content(_sender: Any, item: Dict[str, Any], original: Opti
if coverage["flags"]["placeholder"] is True:
# This is a placeholder coverage, and will never be attached to content
continue
elif coverage["assigned_to"]["assignment_id"] is not None:
# This coverage is already linked to an Assignment, no need to continue
continue
except (KeyError, TypeError):
pass

Expand Down Expand Up @@ -90,14 +93,30 @@ def link_coverages_to_content(_sender: Any, item: Dict[str, Any], original: Opti
return

# Update the planning item with the latest Assignment information, and link the coverages to the content
updated_item = planning_service.patch(planning_id, updates)
try:
updated_item = planning_service.patch(planning_id, updates)
except Exception as err:
logger.exception(err)
logger.error("Failed to update planning with newly linked coverages")
return

for coverage in updated_item.get("coverages") or []:
coverage_id = coverage.get("coverage_id")
assignment_id = (coverage.get("assigned_to") or {}).get("assignment_id")
if coverage_id not in updated_coverage_ids or assignment_id is None:
try:
coverage_id = coverage["coverage_id"]
assignment_id = coverage["assigned_to"]["assignment_id"]
except (KeyError, TypeError):
# Either ``coverage_id`` or ``assignment_id`` is not defined
continue

if assignment_id is None:
# This coverage has no Assignment, no need to link to content
continue

_link_assignment_and_content(assignment_id, coverage_id, coverage_id_to_content_id_map[coverage_id])
try:
_link_assignment_and_content(assignment_id, coverage_id, coverage_id_to_content_id_map[coverage_id])
except Exception as err:
logger.exception(err)
logger.error("Failed to link coverage assignment to content")


def before_content_published(_sender: Any, item: Dict[str, Any], updates: Dict[str, Any]):
Expand Down Expand Up @@ -132,7 +151,13 @@ def before_content_published(_sender: Any, item: Dict[str, Any], updates: Dict[s
continue
_update_coverage_assignment_details(coverage, item)

updated_planning = planning_service.patch(planning_id, planning_updates)
try:
updated_planning = planning_service.patch(planning_id, planning_updates)
except Exception as err:
logger.exception(err)
logger.error("Failed to update planning with newly linked coverages")
return

assignment_id = next(
(coverage for coverage in updated_planning.get("coverages", []) if coverage.get("coverage_id") == coverage_id),
{}
Expand All @@ -141,7 +166,13 @@ def before_content_published(_sender: Any, item: Dict[str, Any], updates: Dict[s
logger.warning(f"Failed to get 'assignment_id' of coverage '{coverage_id}'")
return

_link_assignment_and_content(assignment_id, coverage_id, item.get("guid"), True)
try:
_link_assignment_and_content(assignment_id, coverage_id, item.get("guid"), True)
except Exception as err:
logger.exception(err)
logger.error("Failed to link coverage assignment to content")
return

item["assignment_id"] = assignment_id
updates["assignment_id"] = assignment_id

Expand All @@ -166,19 +197,23 @@ def _get_content_item_by_uris(uris: List[str]) -> Optional[Dict[str, Any]]:
# No URIs were provided, so there
return None

req = ParsedRequest()
req.args = {
"source": json.dumps({
"query": {"bool": {"must": [{"terms": {"uri": uris}}]}},
"sort": [{"rewrite_sequence": "asc"}],
"size": 1
}),
"repo": "archive,published,archived",
}
cursor = get_resource_service("search").get(req=req, lookup=None)

if cursor.count():
return cursor[0]
try:
req = ParsedRequest()
req.args = {
"source": json.dumps({
"query": {"bool": {"must": [{"terms": {"uri": uris}}]}},
"sort": [{"rewrite_sequence": "asc"}],
"size": 1
}),
"repo": "archive,published,archived",
}
cursor = get_resource_service("search").get(req=req, lookup=None)

if cursor.count():
return cursor[0]
except Exception as err:
logger.exception(err)
logger.error("Failed to retrieve list of content based on URIs")

return None

Expand All @@ -194,7 +229,7 @@ def _update_coverage_assignment_details(coverage: Dict[str, Any], content: Dict[
ASSIGNMENT_WORKFLOW_STATE.COMPLETED if content.get("pubstatus") is not None
else ASSIGNMENT_WORKFLOW_STATE.IN_PROGRESS
),
"priority": content.get("priority", 2),
"priority": content.get("priority") or coverage["assigned_to"].get("priority") or 2,
"user": content["task"]["user"],
"assignor_desk": content["task"]["user"],
"assignor_user": content["task"]["user"],
Expand Down
96 changes: 65 additions & 31 deletions server/stt/stt_planning_ml.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, Set
from xml.etree.ElementTree import Element
from eve.utils import config

from superdesk import get_resource_service
from superdesk.utc import local_to_utc
from superdesk.io.registry import register_feed_parser

from planning.types import Planning
from planning.feed_parsers.superdesk_planning_xml import PlanningMLParser
from planning.common import get_coverage_from_planning

from .common import planning_xml_contains_remove_signal, unpost_or_spike_event_or_planning, \
remove_date_portion_from_id, original_item_exists
Expand All @@ -27,25 +30,19 @@ def get_item_id(self, tree: Element) -> str:
item_id = super(STTPlanningMLParser, self).get_item_id(tree)
return item_id if original_item_exists("planning", item_id) else remove_date_portion_from_id(item_id)

def parse(self, tree: Element, provider=None):
items = super(STTPlanningMLParser, self).parse(tree, provider)
items_to_ingest = []
planning_service = get_resource_service("planning")
for item in items:
if planning_xml_contains_remove_signal(tree):
unpost_or_spike_event_or_planning(item)
# If the item contains the ``sttinstruct:remove`` signal, no need to ingest this one
continue

planning_item = planning_service.find_one(req=None, _id=item["_id"])
self.check_coverage(
item, planning_item, tree
) if planning_item else self.set_placeholder_coverage(item, tree)
def parse_item(self, tree: Element, original: Optional[Planning]) -> Optional[Planning]:
if original is not None and planning_xml_contains_remove_signal(tree):
unpost_or_spike_event_or_planning(original)
# If the item contains the ``sttinstruct:remove`` signal, no need to ingest this one
return None

self.set_extra_fields(item, tree)
items_to_ingest.append(item)
item = super(STTPlanningMLParser, self).parse_item(tree, original)
if item is None:
return None

return items_to_ingest
self.check_coverage(item, original, tree) if original else self.set_placeholder_coverage(item, tree)
self.set_extra_fields(tree, item, original)
return item

def datetime(self, value: str):
"""When there is no timezone info, assume it's Helsinki timezone."""
Expand All @@ -54,22 +51,20 @@ def datetime(self, value: str):
return local_to_utc(TIMEZONE, parsed)
return parsed

def set_extra_fields(self, item: Dict[str, Any], tree: Element):
def set_extra_fields(self, tree: Element, item: Dict[str, Any], original: Optional[Planning]):
"""Adds extra fields"""

item.setdefault("extra", {})["stt_topics"] = item["guid"].split(":")[-1]

news_coverage_set = tree.find(self.qname("newsCoverageSet"))
if news_coverage_set is not None:
self._create_temp_assignment_deliveries(item, news_coverage_set)
self._create_temp_assignment_deliveries(news_coverage_set, item, original)
content_meta = tree.find(self.qname("contentMeta"))
if content_meta is not None:
self.set_urgency(content_meta, item)

def get_coverage_details(
self, news_coverage_item: Element, item: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
event_id = self._get_linked_event_id(news_coverage_item)
def get_coverage_details(self, news_coverage_elt: Element, item: Planning, original: Optional[Planning]):
event_id = self._get_linked_event_id(news_coverage_elt)
if event_id is not None:
# This entry is an Event and not an actual coverage
if not item.get("event_item"):
Expand All @@ -80,7 +75,7 @@ def get_coverage_details(
# Return ``None`` so this coverage isn't added to the Planning item
return None

return super().get_coverage_details(news_coverage_item, item)
return super().get_coverage_details(news_coverage_elt, item, original)

def _get_linked_event_id(self, news_coverage_item: Element) -> Optional[str]:
planning = news_coverage_item.find(self.qname("planning"))
Expand All @@ -94,27 +89,66 @@ def _get_linked_event_id(self, news_coverage_item: Element) -> Optional[str]:
return None

def _create_temp_assignment_deliveries(
self, item: Dict[str, Any], news_coverage_set: Element
self,
news_coverage_set: Element,
item: Planning,
original: Optional[Planning]
):
"""Create temporary delivery records for later mapping content to coverages"""

delivery_service = get_resource_service("delivery")
planning_id = item[config.ID_FIELD]
guids_processed = []
content_uris_processed: Set[str] = set()
deliveries = []

existing_deliveries: Dict[str, Set[str]] = {}
if original is not None:
for entry in delivery_service.get_from_mongo(req=None, lookup={"planning_id": planning_id}):
try:
existing_deliveries.setdefault(entry["coverage_id"], set())
existing_deliveries[entry["coverage_id"]].add(entry["item_id"])
except (KeyError, TypeError):
# ``coverage_id`` or ``item_id`` not defined for this entry for some reason
pass

for news_coverage_item in news_coverage_set.findall(self.qname("newsCoverage")):
coverage_id = news_coverage_item.get("id")
delivery = news_coverage_item.find(self.qname("delivery"))

if delivery is None:
continue

coverage_id = news_coverage_item.get("id")
original_coverage = get_coverage_from_planning(original, coverage_id) if original else None

try:
if original_coverage["assigned_to"]["assignment_id"] is not None:
# This coverage is already linked to an Assignment
# No need to create a temporary delivery record
continue
except (KeyError, TypeError):
pass

for delivery_item in delivery.findall(self.qname("deliveredItemRef")):
content_guid = delivery_item.get("guidref")
if content_guid is None or content_guid in guids_processed:

if content_guid is None:
# Skip this entry, as no ``guidref`` found
continue

content_uri = remove_date_portion_from_id(content_guid)
if content_uri in content_uris_processed:
# Skip this entry, as we have already processed content with this ``uri``
continue
content_uris_processed.add(content_uri)

try:
if content_uri in existing_deliveries[coverage_id]:
# A delivery entry already exists for this content's ``uri``
# No need to create another one
continue
except (KeyError, TypeError):
# No existing delivery entry for this coverage
pass

# Create temporary ``delivery`` item for this ``coverage`` (without ``assignment_id``)
# This will be used later to lookup when:
# * this Planning item has been created (if content already exists), or
Expand All @@ -123,7 +157,7 @@ def _create_temp_assignment_deliveries(
{
"planning_id": planning_id,
"coverage_id": coverage_id,
"item_id": remove_date_portion_from_id(content_guid),
"item_id": content_uri,
}
)

Expand Down

0 comments on commit ff194a5

Please sign in to comment.