From ff194a51d675cd4a003e98ddb615cfd0ef08b96a Mon Sep 17 00:00:00 2001 From: Mark Pittaway Date: Fri, 2 Feb 2024 17:43:20 +1100 Subject: [PATCH] [SDESK-7166] fix(ingest): Skip linking coverages that already have an assignment Also made the hook code more robust by catching & logging errors, allowing ingested item to succeed ingest procedure --- server/requirements.in | 2 +- server/requirements.txt | 12 ++--- server/stt/signal_hooks.py | 77 ++++++++++++++++++++-------- server/stt/stt_planning_ml.py | 96 ++++++++++++++++++++++++----------- 4 files changed, 128 insertions(+), 59 deletions(-) diff --git a/server/requirements.in b/server/requirements.in index 0a849be..1b36131 100644 --- a/server/requirements.in +++ b/server/requirements.in @@ -2,4 +2,4 @@ gunicorn honcho git+https://github.com/superdesk/superdesk-core.git@v2.7.0-rc2#egg=superdesk-core -git+https://github.com/superdesk/superdesk-planning.git@v2.7.0-rc5#egg=superdesk-planning +git+https://github.com/superdesk/superdesk-planning.git@release/2.7#egg=superdesk-planning diff --git a/server/requirements.txt b/server/requirements.txt index 6889704..21cf10b 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -26,9 +26,9 @@ blinker==1.4 # flask-mail # raven # superdesk-core -boto3==1.34.27 +boto3==1.34.33 # via superdesk-core -botocore==1.34.27 +botocore==1.34.33 # via # boto3 # s3transfer @@ -42,7 +42,7 @@ cerberus==1.3.5 # via # eve # superdesk-core -certifi==2023.11.17 +certifi==2024.2.2 # via # elastic-apm # elasticsearch @@ -73,7 +73,7 @@ click-repl==0.3.0 # via celery croniter==0.3.37 # via superdesk-core -cryptography==42.0.0 +cryptography==42.0.2 # via # authlib # jwcrypto @@ -218,7 +218,7 @@ python-magic==0.4.27 # via superdesk-core python-twitter==3.5 # via superdesk-core -pytz==2023.3.post1 +pytz==2024.1 # via # babel # celery @@ -264,7 +264,7 @@ six==1.16.0 # python-dateutil superdesk-core @ git+https://github.com/superdesk/superdesk-core.git@v2.7.0-rc2 # via -r requirements.in -superdesk-planning @ git+https://github.com/superdesk/superdesk-planning.git@v2.7.0-rc5 +superdesk-planning @ git+https://github.com/superdesk/superdesk-planning.git@release/2.7 # via -r requirements.in typing-extensions==4.9.0 # via superdesk-core diff --git a/server/stt/signal_hooks.py b/server/stt/signal_hooks.py index 48ab0b4..95cc99a 100644 --- a/server/stt/signal_hooks.py +++ b/server/stt/signal_hooks.py @@ -60,6 +60,9 @@ def link_coverages_to_content(_sender: Any, item: Dict[str, Any], original: Opti if coverage["flags"]["placeholder"] is True: # This is a placeholder coverage, and will never be attached to content continue + elif coverage["assigned_to"]["assignment_id"] is not None: + # This coverage is already linked to an Assignment, no need to continue + continue except (KeyError, TypeError): pass @@ -90,14 +93,30 @@ def link_coverages_to_content(_sender: Any, item: Dict[str, Any], original: Opti return # Update the planning item with the latest Assignment information, and link the coverages to the content - updated_item = planning_service.patch(planning_id, updates) + try: + updated_item = planning_service.patch(planning_id, updates) + except Exception as err: + logger.exception(err) + logger.error("Failed to update planning with newly linked coverages") + return + for coverage in updated_item.get("coverages") or []: - coverage_id = coverage.get("coverage_id") - assignment_id = (coverage.get("assigned_to") or {}).get("assignment_id") - if coverage_id not in updated_coverage_ids or assignment_id is None: + try: + coverage_id = coverage["coverage_id"] + assignment_id = coverage["assigned_to"]["assignment_id"] + except (KeyError, TypeError): + # Either ``coverage_id`` or ``assignment_id`` is not defined + continue + + if assignment_id is None: + # This coverage has no Assignment, no need to link to content continue - _link_assignment_and_content(assignment_id, coverage_id, coverage_id_to_content_id_map[coverage_id]) + try: + _link_assignment_and_content(assignment_id, coverage_id, coverage_id_to_content_id_map[coverage_id]) + except Exception as err: + logger.exception(err) + logger.error("Failed to link coverage assignment to content") def before_content_published(_sender: Any, item: Dict[str, Any], updates: Dict[str, Any]): @@ -132,7 +151,13 @@ def before_content_published(_sender: Any, item: Dict[str, Any], updates: Dict[s continue _update_coverage_assignment_details(coverage, item) - updated_planning = planning_service.patch(planning_id, planning_updates) + try: + updated_planning = planning_service.patch(planning_id, planning_updates) + except Exception as err: + logger.exception(err) + logger.error("Failed to update planning with newly linked coverages") + return + assignment_id = next( (coverage for coverage in updated_planning.get("coverages", []) if coverage.get("coverage_id") == coverage_id), {} @@ -141,7 +166,13 @@ def before_content_published(_sender: Any, item: Dict[str, Any], updates: Dict[s logger.warning(f"Failed to get 'assignment_id' of coverage '{coverage_id}'") return - _link_assignment_and_content(assignment_id, coverage_id, item.get("guid"), True) + try: + _link_assignment_and_content(assignment_id, coverage_id, item.get("guid"), True) + except Exception as err: + logger.exception(err) + logger.error("Failed to link coverage assignment to content") + return + item["assignment_id"] = assignment_id updates["assignment_id"] = assignment_id @@ -166,19 +197,23 @@ def _get_content_item_by_uris(uris: List[str]) -> Optional[Dict[str, Any]]: # No URIs were provided, so there return None - req = ParsedRequest() - req.args = { - "source": json.dumps({ - "query": {"bool": {"must": [{"terms": {"uri": uris}}]}}, - "sort": [{"rewrite_sequence": "asc"}], - "size": 1 - }), - "repo": "archive,published,archived", - } - cursor = get_resource_service("search").get(req=req, lookup=None) - - if cursor.count(): - return cursor[0] + try: + req = ParsedRequest() + req.args = { + "source": json.dumps({ + "query": {"bool": {"must": [{"terms": {"uri": uris}}]}}, + "sort": [{"rewrite_sequence": "asc"}], + "size": 1 + }), + "repo": "archive,published,archived", + } + cursor = get_resource_service("search").get(req=req, lookup=None) + + if cursor.count(): + return cursor[0] + except Exception as err: + logger.exception(err) + logger.error("Failed to retrieve list of content based on URIs") return None @@ -194,7 +229,7 @@ def _update_coverage_assignment_details(coverage: Dict[str, Any], content: Dict[ ASSIGNMENT_WORKFLOW_STATE.COMPLETED if content.get("pubstatus") is not None else ASSIGNMENT_WORKFLOW_STATE.IN_PROGRESS ), - "priority": content.get("priority", 2), + "priority": content.get("priority") or coverage["assigned_to"].get("priority") or 2, "user": content["task"]["user"], "assignor_desk": content["task"]["user"], "assignor_user": content["task"]["user"], diff --git a/server/stt/stt_planning_ml.py b/server/stt/stt_planning_ml.py index bc1309a..233c375 100644 --- a/server/stt/stt_planning_ml.py +++ b/server/stt/stt_planning_ml.py @@ -1,11 +1,14 @@ -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, Set from xml.etree.ElementTree import Element from eve.utils import config from superdesk import get_resource_service from superdesk.utc import local_to_utc from superdesk.io.registry import register_feed_parser + +from planning.types import Planning from planning.feed_parsers.superdesk_planning_xml import PlanningMLParser +from planning.common import get_coverage_from_planning from .common import planning_xml_contains_remove_signal, unpost_or_spike_event_or_planning, \ remove_date_portion_from_id, original_item_exists @@ -27,25 +30,19 @@ def get_item_id(self, tree: Element) -> str: item_id = super(STTPlanningMLParser, self).get_item_id(tree) return item_id if original_item_exists("planning", item_id) else remove_date_portion_from_id(item_id) - def parse(self, tree: Element, provider=None): - items = super(STTPlanningMLParser, self).parse(tree, provider) - items_to_ingest = [] - planning_service = get_resource_service("planning") - for item in items: - if planning_xml_contains_remove_signal(tree): - unpost_or_spike_event_or_planning(item) - # If the item contains the ``sttinstruct:remove`` signal, no need to ingest this one - continue - - planning_item = planning_service.find_one(req=None, _id=item["_id"]) - self.check_coverage( - item, planning_item, tree - ) if planning_item else self.set_placeholder_coverage(item, tree) + def parse_item(self, tree: Element, original: Optional[Planning]) -> Optional[Planning]: + if original is not None and planning_xml_contains_remove_signal(tree): + unpost_or_spike_event_or_planning(original) + # If the item contains the ``sttinstruct:remove`` signal, no need to ingest this one + return None - self.set_extra_fields(item, tree) - items_to_ingest.append(item) + item = super(STTPlanningMLParser, self).parse_item(tree, original) + if item is None: + return None - return items_to_ingest + self.check_coverage(item, original, tree) if original else self.set_placeholder_coverage(item, tree) + self.set_extra_fields(tree, item, original) + return item def datetime(self, value: str): """When there is no timezone info, assume it's Helsinki timezone.""" @@ -54,22 +51,20 @@ def datetime(self, value: str): return local_to_utc(TIMEZONE, parsed) return parsed - def set_extra_fields(self, item: Dict[str, Any], tree: Element): + def set_extra_fields(self, tree: Element, item: Dict[str, Any], original: Optional[Planning]): """Adds extra fields""" item.setdefault("extra", {})["stt_topics"] = item["guid"].split(":")[-1] news_coverage_set = tree.find(self.qname("newsCoverageSet")) if news_coverage_set is not None: - self._create_temp_assignment_deliveries(item, news_coverage_set) + self._create_temp_assignment_deliveries(news_coverage_set, item, original) content_meta = tree.find(self.qname("contentMeta")) if content_meta is not None: self.set_urgency(content_meta, item) - def get_coverage_details( - self, news_coverage_item: Element, item: Dict[str, Any] - ) -> Optional[Dict[str, Any]]: - event_id = self._get_linked_event_id(news_coverage_item) + def get_coverage_details(self, news_coverage_elt: Element, item: Planning, original: Optional[Planning]): + event_id = self._get_linked_event_id(news_coverage_elt) if event_id is not None: # This entry is an Event and not an actual coverage if not item.get("event_item"): @@ -80,7 +75,7 @@ def get_coverage_details( # Return ``None`` so this coverage isn't added to the Planning item return None - return super().get_coverage_details(news_coverage_item, item) + return super().get_coverage_details(news_coverage_elt, item, original) def _get_linked_event_id(self, news_coverage_item: Element) -> Optional[str]: planning = news_coverage_item.find(self.qname("planning")) @@ -94,27 +89,66 @@ def _get_linked_event_id(self, news_coverage_item: Element) -> Optional[str]: return None def _create_temp_assignment_deliveries( - self, item: Dict[str, Any], news_coverage_set: Element + self, + news_coverage_set: Element, + item: Planning, + original: Optional[Planning] ): """Create temporary delivery records for later mapping content to coverages""" delivery_service = get_resource_service("delivery") planning_id = item[config.ID_FIELD] - guids_processed = [] + content_uris_processed: Set[str] = set() deliveries = [] + existing_deliveries: Dict[str, Set[str]] = {} + if original is not None: + for entry in delivery_service.get_from_mongo(req=None, lookup={"planning_id": planning_id}): + try: + existing_deliveries.setdefault(entry["coverage_id"], set()) + existing_deliveries[entry["coverage_id"]].add(entry["item_id"]) + except (KeyError, TypeError): + # ``coverage_id`` or ``item_id`` not defined for this entry for some reason + pass + for news_coverage_item in news_coverage_set.findall(self.qname("newsCoverage")): - coverage_id = news_coverage_item.get("id") delivery = news_coverage_item.find(self.qname("delivery")) - if delivery is None: continue + coverage_id = news_coverage_item.get("id") + original_coverage = get_coverage_from_planning(original, coverage_id) if original else None + + try: + if original_coverage["assigned_to"]["assignment_id"] is not None: + # This coverage is already linked to an Assignment + # No need to create a temporary delivery record + continue + except (KeyError, TypeError): + pass + for delivery_item in delivery.findall(self.qname("deliveredItemRef")): content_guid = delivery_item.get("guidref") - if content_guid is None or content_guid in guids_processed: + + if content_guid is None: + # Skip this entry, as no ``guidref`` found continue + content_uri = remove_date_portion_from_id(content_guid) + if content_uri in content_uris_processed: + # Skip this entry, as we have already processed content with this ``uri`` + continue + content_uris_processed.add(content_uri) + + try: + if content_uri in existing_deliveries[coverage_id]: + # A delivery entry already exists for this content's ``uri`` + # No need to create another one + continue + except (KeyError, TypeError): + # No existing delivery entry for this coverage + pass + # Create temporary ``delivery`` item for this ``coverage`` (without ``assignment_id``) # This will be used later to lookup when: # * this Planning item has been created (if content already exists), or @@ -123,7 +157,7 @@ def _create_temp_assignment_deliveries( { "planning_id": planning_id, "coverage_id": coverage_id, - "item_id": remove_date_portion_from_id(content_guid), + "item_id": content_uri, } )