diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 09143127d5464f..50be4c9e8b4a58 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -9,6 +9,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) @allow_synchronous_requests = allow_synchronous_requests @filter_by_host = false + @collection_or_uri = collection_or_uri @items = collection_items(collection_or_uri) @items = filtered_replies @@ -40,6 +41,9 @@ def filtered_replies Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at) # Reject all statuses that we already have in the db - uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + uris = uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + + Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" } + uris end end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index b641ef62bcb4b7..0f4fd83caf6a78 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -16,12 +16,15 @@ class ActivityPub::FetchAllRepliesWorker def perform(parent_status_id, options = {}) @parent_status = Status.find(parent_status_id) - Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" } uris_to_fetch = get_replies(@parent_status.uri, options) - fetched_uris = uris_to_fetch.clone return if uris_to_fetch.nil? + @parent_status.touch(:fetched_replies_at) + + fetched_uris = uris_to_fetch.clone.to_set + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES next_reply = uris_to_fetch.pop next if next_reply.nil? @@ -29,8 +32,10 @@ def perform(parent_status_id, options = {}) new_reply_uris = get_replies(next_reply, options) next if new_reply_uris.nil? + new_reply_uris = new_reply_uris.reject { |uri| fetched_uris.include?(uri) } + uris_to_fetch.concat(new_reply_uris) - fetched_uris.concat(new_reply_uris) + fetched_uris = fetched_uris.merge(new_reply_uris) end Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" } @@ -49,11 +54,20 @@ def get_replies(status_uri, options = {}) def get_replies_uri(parent_status_uri) begin json_status = fetch_resource(parent_status_uri, true) - replies_collection_or_uri = json_status['replies'] - Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_collection_or_uri.nil? - replies_collection_or_uri + if json_status.nil? + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: error getting replies URI for #{parent_status_uri}, returned nil" } + nil + elsif !json_status.key?('replies') + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: no replies collection found in ActivityPub object: #{json_status}" } + nil + else + json_status['replies'] + end rescue => e - Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status_id}: Got exception while resolving replies URI: #{e} - #{e.message}" } + Rails.logger.warn { "FetchAllRepliesWorker - #{@parent_status.uri}: caught exception fetching replies URI: #{e}" } + # Raise if we can't get the collection for top-level status to trigger retry + raise e if parent_status_uri == @parent_status.uri + nil end end