From 169622173ea929a7566b22e6708d5b66a2be16b0 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 18 Sep 2024 19:42:59 -0700 Subject: [PATCH] working (i think?) recursive fetch --- app/controllers/api/v1/statuses_controller.rb | 32 ++++++------- app/lib/activitypub/activity/create.rb | 2 +- app/models/status.rb | 10 ++++ .../activitypub/fetch_replies_service.rb | 46 ++++++++++++------- ...233930_add_fetched_replies_at_to_status.rb | 9 ++++ db/schema.rb | 3 +- 6 files changed, 66 insertions(+), 36 deletions(-) create mode 100644 db/migrate/20240918233930_add_fetched_replies_at_to_status.rb diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 37bdcba36a11d2..462802f3cb362b 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -49,24 +49,20 @@ def context descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT else - unless @status.local? - json_status = fetch_resource(@status.uri, true, @current_account) - - logger.warn "json status" - logger.warn json_status - # rescue this whole block on failure, don't want to fail the whole context request if we can't do this - collection = json_status['replies'] - logger.warn "replies uri" - logger.warn collection - - unless collection.nil? - ActivityPub::FetchRepliesService.new.call( - @status, - collection, - allow_synchronous_requests: true, - all_replies: true - ) - end + unless @status.local? && !@status.should_fetch_replies? + json_status = fetch_resource(@status.uri, true, @current_account) + + # rescue this whole block on failure, don't want to fail the whole context request if we can't do this + collection = json_status['replies'] + + unless collection.nil? + ActivityPub::FetchRepliesService.new.call( + @status, + collection, + allow_synchronous_requests: true, + all_replies: true + ) + end end end diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index f69b68392a5a01..1aa8576345bf73 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -338,7 +338,7 @@ def fetch_replies(status) collection = @object['replies'] return if collection.blank? - replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id]) + replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id], all_replies: status.should_fetch_replies?) return unless replies.nil? uri = value_or_id(collection) diff --git a/app/models/status.rb b/app/models/status.rb index 4f2ceb9ca95271..41505279963613 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -29,6 +29,7 @@ # edited_at :datetime # trendable :boolean # ordered_media_attachment_ids :bigint(8) is an Array +# fetched_replies_at :datetime # class Status < ApplicationRecord @@ -183,6 +184,8 @@ class Status < ApplicationRecord delegate :domain, to: :account, prefix: true REAL_TIME_WINDOW = 6.hours + # debounce fetching all replies to minimize DoS + FETCH_REPLIES_DEBOUNCE = 1.hour def cache_key "v3:#{super}" @@ -440,6 +443,13 @@ def unlink_from_conversations! end end + def should_fetch_replies? + # we aren't brand new, and we haven't fetched replies since the debounce window + created_at <= 10.minutes.ago && ( + fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago + ) + end + private def update_status_stat!(attrs) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index d9b3a09a207eff..3ab7e7e0460d11 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -5,22 +5,27 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies used when not fetching all replies MAX_REPLIES_LOW = 5 + # limit of fetched replies used when fetching all replies + MAX_REPLIES_HIGH = 500 def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) # Whether we are getting replies from more than the originating server, # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` @all_replies = all_replies + # store the status and whether we should fetch replies for it to avoid + # race conditions if something else updates us in the meantime + @status = parent_status + @should_fetch_replies = parent_status.should_fetch_replies? @account = parent_status.account @allow_synchronous_requests = allow_synchronous_requests @items = collection_items(collection_or_uri) - logger = Logger.new(STDOUT) - logger.warn 'collection items' - logger.warn @items return if @items.nil? FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + # Store last fetched all to debounce + @status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies? @items end @@ -28,26 +33,30 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req private def collection_items(collection_or_uri) - logger = Logger.new(STDOUT) collection = fetch_collection(collection_or_uri) - logger.warn 'first collection' - logger.warn collection return unless collection.is_a?(Hash) collection = fetch_collection(collection['first']) if collection['first'].present? - logger.warn 'second collection' - logger.warn collection return unless collection.is_a?(Hash) - # Need to do another "next" here. see https://neuromatch.social/users/jonny/statuses/112401738180959195/replies for example - # then we are home free (stopping for tonight tho.) + all_items = [] + while collection.is_a?(Hash) + items = case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end - case collection['type'] - when 'Collection', 'CollectionPage' - as_array(collection['items']) - when 'OrderedCollection', 'OrderedCollectionPage' - as_array(collection['orderedItems']) + all_items.concat(as_array(items)) + + # Quit early if we are not fetching all replies + break if all_items.size >= MAX_REPLIES_HIGH || !fetch_all_replies? + + collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end + + all_items end def fetch_collection(collection_or_uri) @@ -73,7 +82,8 @@ def fetch_collection(collection_or_uri) def filtered_replies if @all_replies - @items.map { |item| value_or_id(item) } + # Reject all statuses that we already have in the db + @items.map { |item| value_or_id(item) }.reject { |uri| Status.exists?(uri: uri) } else # Only fetch replies to the same server as the original status to avoid # amplification attacks. @@ -82,4 +92,8 @@ def filtered_replies @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES_LOW) end end + + def fetch_all_replies? + @all_replies && @should_fetch_replies + end end diff --git a/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb new file mode 100644 index 00000000000000..c42eff6aeb38fc --- /dev/null +++ b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1] + disable_ddl_transaction! + + def change + add_column :statuses, :fetched_replies_at, :datetime, null: true + end +end diff --git a/db/schema.rb b/db/schema.rb index a3afab78167fe8..e550fa618d250c 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2024_08_08_125420) do +ActiveRecord::Schema[7.1].define(version: 2024_09_18_233930) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -1089,6 +1089,7 @@ t.datetime "edited_at", precision: nil t.boolean "trendable" t.bigint "ordered_media_attachment_ids", array: true + t.datetime "fetched_replies_at" t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)" t.index ["account_id"], name: "index_statuses_on_account_id" t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)"