From 20dc814c809d7a0d6565cbf2c3eb269c5dcfc31a Mon Sep 17 00:00:00 2001 From: Ryan Barrett Date: Thu, 28 Nov 2024 21:39:30 -0800 Subject: [PATCH] add 2m delay to receive tasks for deletes for #1361 --- activitypub.py | 5 +++-- atproto_firehose.py | 5 ++++- protocol.py | 1 + tests/test_activitypub.py | 18 ++++++++++++++++-- tests/test_atproto_firehose.py | 9 +++++++-- 5 files changed, 31 insertions(+), 7 deletions(-) diff --git a/activitypub.py b/activitypub.py index 77ff7598..be11fecd 100644 --- a/activitypub.py +++ b/activitypub.py @@ -42,7 +42,7 @@ ) from ids import BOT_ACTOR_AP_IDS from models import fetch_objects, Follower, Object, PROTOCOLS, User -from protocol import activity_id_memcache_key, Protocol +from protocol import activity_id_memcache_key, DELETE_TASK_DELAY, Protocol import webfinger logger = logging.getLogger(__name__) @@ -1138,9 +1138,10 @@ def inbox(protocol=None, id=None): if user and not user.existing: logger.info(f'Automatically enabled AP server actor {actor_id} for {user.enabled_protocols}') + delay = DELETE_TASK_DELAY if type in ('Delete', 'Undo') else None return create_task(queue='receive', id=id, as2=activity, source_protocol=ActivityPub.LABEL, authed_as=authed_as, - received_at=util.now().isoformat()) + received_at=util.now().isoformat(), delay=delay) # protocol in subdomain diff --git a/atproto_firehose.py b/atproto_firehose.py index 81cd60cb..e5d36e56 100644 --- a/atproto_firehose.py +++ b/atproto_firehose.py @@ -37,6 +37,7 @@ USER_AGENT, ) from models import Object, reset_protocol_properties +from protocol import DELETE_TASK_DELAY from web import Web logger = logging.getLogger(__name__) @@ -330,9 +331,11 @@ def _handle(op): logger.error(f'Unknown action {action} for {op.repo} {op.path}') return + delay = DELETE_TASK_DELAY if op.action == 'delete' else None try: create_task(queue='receive', id=obj_id, source_protocol=ATProto.LABEL, - authed_as=op.repo, received_at=op.time, **record_kwarg) + authed_as=op.repo, received_at=op.time, delay=delay, + **record_kwarg) # when running locally, comment out above and uncomment this # logger.info(f'enqueuing receive task for {at_uri}') except ContextError: diff --git a/protocol.py b/protocol.py index 067fb48b..a86556ce 100644 --- a/protocol.py +++ b/protocol.py @@ -52,6 +52,7 @@ ) OBJECT_REFRESH_AGE = timedelta(days=30) +DELETE_TASK_DELAY = timedelta(minutes=2) # require a follow for users on these domains before we deliver anything from # them other than their profile diff --git a/tests/test_activitypub.py b/tests/test_activitypub.py index de21c387..a0960b16 100644 --- a/tests/test_activitypub.py +++ b/tests/test_activitypub.py @@ -36,6 +36,7 @@ from flask_app import app from models import Follower, Object, Target import protocol +from protocol import DELETE_TASK_DELAY from web import Web # have to import module, not attrs, to avoid circular import @@ -662,13 +663,26 @@ def test_inbox_actor_id_on_opted_out_web_domain(self, mock_create_task, *_): def test_inbox_create_receive_task(self, mock_create_task, *mocks): common.RUN_TASKS_INLINE = False - author = self.make_user(ACTOR['id'], cls=ActivityPub, obj_as2=ACTOR) + self.make_user(ACTOR['id'], cls=ActivityPub, obj_as2=ACTOR) resp = self.post('/ap/sharedInbox', json=NOTE) self.assert_task(mock_create_task, 'receive', id='http://mas.to/note/as2', source_protocol='activitypub', as2=NOTE, - authed_as=NOTE['actor'], + authed_as=ACTOR['id'], received_at='2022-01-02T03:04:05+00:00') + @patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task') + def test_inbox_delete_receive_task(self, mock_create_task, *mocks): + common.RUN_TASKS_INLINE = False + + self.make_user(ACTOR['id'], cls=ActivityPub, obj_as2=ACTOR) + resp = self.post('/ap/sharedInbox', json=DELETE) + delayed_eta = util.to_utc_timestamp(NOW) + DELETE_TASK_DELAY.total_seconds() + self.assert_task(mock_create_task, 'receive', id=DELETE['id'], + source_protocol='activitypub', as2=DELETE, + authed_as=ACTOR['id'], + received_at='2022-01-02T03:04:05+00:00', + eta_seconds=delayed_eta) + def test_inbox_reply_object(self, mock_head, mock_get, mock_post): self._test_inbox_reply(REPLY_OBJECT, mock_head, mock_get, mock_post) diff --git a/tests/test_atproto_firehose.py b/tests/test_atproto_firehose.py index e011ce31..e3583486 100644 --- a/tests/test_atproto_firehose.py +++ b/tests/test_atproto_firehose.py @@ -30,6 +30,7 @@ import common from models import Object, Target import protocol +from protocol import DELETE_TASK_DELAY from .testutil import TestCase from .test_atproto import DID_DOC from web import Web @@ -490,9 +491,11 @@ def test_delete_post(self, mock_create_task): 'actor': 'did:plc:user', 'object': obj_id, } + delayed_eta = util.to_utc_timestamp(NOW) + DELETE_TASK_DELAY.total_seconds() self.assert_task(mock_create_task, 'receive', id=delete_id, our_as1=expected_as1, source_protocol='atproto', - authed_as='did:plc:user', received_at='1900-02-04') + authed_as='did:plc:user', received_at='1900-02-04', + eta_seconds=delayed_eta) def test_delete_block_is_undo(self, mock_create_task): commits.put(Op(repo='did:plc:user', action='delete', seq=789, @@ -510,9 +513,11 @@ def test_delete_block_is_undo(self, mock_create_task): 'actor': 'did:plc:user', 'object': obj_id, } + delayed_eta = util.to_utc_timestamp(NOW) + DELETE_TASK_DELAY.total_seconds() self.assert_task(mock_create_task, 'receive', id=undo_id, our_as1=expected_as1, source_protocol='atproto', - authed_as='did:plc:user', received_at='1900-02-04') + authed_as='did:plc:user', received_at='1900-02-04', + eta_seconds=delayed_eta) def test_unsupported_type(self, mock_create_task): orig_objs = Object.query().count()