-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add view find-broken-links that return a csv file with a list of contents with broken links in blocks * flake 8 * code cleanup
- Loading branch information
Showing
5 changed files
with
221 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
from Acquisition import aq_base | ||
from plone import api | ||
from plone.dexterity.utils import iterSchemata | ||
from plone.restapi.serializer.utils import uid_to_url | ||
from Products.Five import BrowserView | ||
from six import StringIO | ||
from zope.schema import getFieldsInOrder | ||
|
||
|
||
try: | ||
from collective.volto.blocksfield.field import BlocksField | ||
|
||
HAS_BLOCKSFIELD = True | ||
except ImportError: | ||
HAS_BLOCKSFIELD = False | ||
|
||
import csv | ||
import logging | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class View(BrowserView): | ||
def __call__(self): | ||
""" | ||
Check all contents if there are some internal links with resolveuid broken | ||
""" | ||
|
||
results = self.check_links() | ||
self.request.response.setHeader("Content-type", "application/csv") | ||
self.request.response.setHeader( | ||
"Content-disposition", "attachment; filename=incarichi.csv" | ||
) | ||
|
||
sbuf = StringIO() | ||
writer = csv.writer(sbuf, delimiter=" ", quoting=csv.QUOTE_ALL) | ||
writer.writerow(["url"]) | ||
for row in results: | ||
writer.writerow([row]) | ||
|
||
res = sbuf.getvalue() | ||
sbuf.close() | ||
return res | ||
|
||
def check_links(self): | ||
""" | ||
Check on root and all contents | ||
""" | ||
res = [] | ||
# first of all, check them on root | ||
res.extend(self.check_links_on_root()) | ||
# then check on contents | ||
res.extend(self.check_links_on_contents()) | ||
return res | ||
|
||
def check_links_on_root(self): | ||
""" | ||
Check root blocks | ||
""" | ||
logger.info("## Check broken links on Site Root ##") | ||
portal = api.portal.get() | ||
blocks = getattr(portal, "blocks", {}) | ||
|
||
if self.check_blocks_broken_links(data=blocks): | ||
return [portal.portal_url()] | ||
return [] | ||
|
||
def check_links_on_contents(self): | ||
""" | ||
Iterate over site contents | ||
""" | ||
logger.info("## Check broken links on Content-types ##") | ||
portal_catalog = api.portal.get_tool("portal_catalog") | ||
brains = portal_catalog() | ||
tot = len(brains) | ||
i = 0 | ||
res = [] | ||
for brain in brains: | ||
i += 1 | ||
if (i + 1) % 200 == 0: | ||
logger.info(f" - Progress {i}/{tot}") | ||
item = brain.getObject() | ||
aq_base_obj = aq_base(item) | ||
|
||
blocks = getattr(item, "blocks", {}) | ||
if blocks: | ||
if self.check_blocks_broken_links(data=blocks): | ||
res.append(brain.getURL()) | ||
continue | ||
if not HAS_BLOCKSFIELD: | ||
continue | ||
has_broken_links = False | ||
for schemata in iterSchemata(aq_base_obj): | ||
for name, field in getFieldsInOrder(schemata): | ||
if not isinstance(field, BlocksField): | ||
continue | ||
value = field.get(item) | ||
if not value: | ||
continue | ||
blocks = value.get("blocks", {}) | ||
if self.check_blocks_broken_links(data=blocks): | ||
has_broken_links = True | ||
break | ||
if has_broken_links: | ||
res.append(brain.getURL()) | ||
return res | ||
|
||
def check_blocks_broken_links(self, data): | ||
""" | ||
Recursive method that check if there is a broken resolveuid in a block prop | ||
""" | ||
if isinstance(data, str): | ||
if "resolveuid" not in data: | ||
return False | ||
if uid_to_url(data) == data: | ||
return True | ||
return False | ||
if isinstance(data, list): | ||
for child in data: | ||
res = self.check_blocks_broken_links(data=child) | ||
if res: | ||
return True | ||
return False | ||
if isinstance(data, dict): | ||
for child in data.values(): | ||
res = self.check_blocks_broken_links(data=child) | ||
if res: | ||
return True | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# -*- coding: utf-8 -*- | ||
from plone import api | ||
from plone.app.testing import setRoles | ||
from plone.app.testing import TEST_USER_ID | ||
from redturtle.volto.testing import REDTURTLE_VOLTO_INTEGRATION_TESTING | ||
|
||
import unittest | ||
|
||
|
||
class TestFindBrokenLinksView(unittest.TestCase): | ||
layer = REDTURTLE_VOLTO_INTEGRATION_TESTING | ||
maxDiff = None | ||
|
||
def setUp(self): | ||
self.app = self.layer["app"] | ||
self.portal = self.layer["portal"] | ||
self.request = self.layer["request"] | ||
setRoles(self.portal, TEST_USER_ID, ["Manager"]) | ||
|
||
self.view = api.content.get_view( | ||
name="find-broken-links", context=self.portal, request=self.request | ||
) | ||
|
||
def test_view_does_not_return_contents_if_have_working_links(self): | ||
page_a = api.content.create( | ||
container=self.portal, type="Document", title="Page A" | ||
) | ||
|
||
api.content.create( | ||
container=self.portal, | ||
type="Document", | ||
title="Page B", | ||
blocks={"xxx": {"foo": f"resolveuid/{page_a.UID()}"}}, | ||
) | ||
|
||
self.assertEqual([], self.view.check_links()) | ||
|
||
def test_view_return_contents_with_broken_links(self): | ||
page_a = api.content.create( | ||
container=self.portal, type="Document", title="Page A" | ||
) | ||
|
||
page_b = api.content.create( | ||
container=self.portal, | ||
type="Document", | ||
title="Page B", | ||
blocks={"xxx": {"foo": f"resolveuid/{page_a.UID()}"}}, | ||
) | ||
api.content.delete(obj=page_a, check_linkintegrity=False) | ||
|
||
res = self.view.check_links() | ||
self.assertEqual(len(res), 1) | ||
self.assertIn(page_b.absolute_url(), res) | ||
|
||
def test_view_can_check_several_patterns(self): | ||
page_a = api.content.create( | ||
container=self.portal, type="Document", title="Page A" | ||
) | ||
|
||
page_b = api.content.create( | ||
container=self.portal, | ||
type="Document", | ||
title="Page B", | ||
blocks={"xxx": {"bar": f"../resolveuid/{page_a.UID()}"}}, | ||
) | ||
|
||
page_c = api.content.create( | ||
container=self.portal, | ||
type="Document", | ||
title="Page C", | ||
blocks={"xxx": {"baz": f"../resolveuid/{page_a.UID()}/asd"}}, | ||
) | ||
api.content.delete(obj=page_a, check_linkintegrity=False) | ||
|
||
res = self.view.check_links() | ||
self.assertEqual(len(res), 2) | ||
self.assertIn(page_b.absolute_url(), res) | ||
self.assertIn(page_c.absolute_url(), res) |