Skip to content

Commit

Permalink
Merge pull request #44 from NeuromatchAcademy/fetch-all-replies-service
Browse files Browse the repository at this point in the history
Fetch All Replies v2 - Service Edition
  • Loading branch information
sneakers-the-rat authored Oct 20, 2024
2 parents 0621d39 + 8048524 commit 00d9cb9
Show file tree
Hide file tree
Showing 11 changed files with 558 additions and 23 deletions.
18 changes: 18 additions & 0 deletions .env.production.sample
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,24 @@ MAX_POLL_OPTION_CHARS=100
# New registrations will automatically follow these accounts (separated by commas)
AUTOFOLLOW=

# -- Fetch all replies settings --
# When a user expands a post (DetailedStatus view), fetch all of its replies
# (default: true if unset, set explicitly to ``false`` to disable)
FETCH_REPLIES_ENABLED=true

# Period to wait between fetching replies (in minutes)
FETCH_REPLIES_DEBOUNCE=15

# Period to wait after a post is first created before fetching its replies (in minutes)
FETCH_REPLIES_CREATED_RECENTLY=5

# Max number of replies to fetch - total, recursively through a whole reply tree
FETCH_REPLIES_MAX_GLOBAL=1000

# Max number of replies to fetch - for a single post
FETCH_REPLIES_MAX_SINGLE=500


# IP and session retention
# -----------------------
# Make sure to modify the scheduling of ip_cleanup_scheduler in config/sidekiq.yml
Expand Down
9 changes: 9 additions & 0 deletions app/controllers/api/v1/statuses_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ def context
statuses = [@status] + @context.ancestors + @context.descendants

render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id)

if !current_account.nil? && @status.should_fetch_replies?
ActivityPub::FetchAllRepliesWorker.perform_async(
@status.id,
{
allow_synchronous_requests: true,
}
)
end
end

def create
Expand Down
29 changes: 29 additions & 0 deletions app/models/concerns/status/fetch_replies_concern.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module Status::FetchRepliesConcern
extend ActiveSupport::Concern

# enable/disable fetching all replies
FETCH_REPLIES_ENABLED = ENV.key?('FETCH_REPLIES_ENABLED') ? ENV['FETCH_REPLIES_ENABLED'] == 'true' : true

# debounce fetching all replies to minimize DoS
FETCH_REPLIES_DEBOUNCE = (ENV['FETCH_REPLIES_DEBOUNCE'] || 15).to_i.minutes
CREATED_RECENTLY_DEBOUNCE = (ENV['FETCH_REPLIES_CREATED_RECENTLY'] || 5).to_i.minutes

included do
scope :created_recently, -> { where(created_at: CREATED_RECENTLY_DEBOUNCE.ago..) }
scope :not_created_recently, -> { where(created_at: ..CREATED_RECENTLY_DEBOUNCE.ago) }
scope :fetched_recently, -> { where(fetched_replies_at: FETCH_REPLIES_DEBOUNCE.ago..) }
scope :not_fetched_recently, -> { where(fetched_replies_at: ..FETCH_REPLIES_DEBOUNCE.ago).or(where(fetched_replies_at: nil)) }

scope :shouldnt_fetch_replies, -> { local.merge(created_recently).merge(fetched_recently) }
scope :should_fetch_replies, -> { local.invert_where.merge(not_created_recently).merge(not_fetched_recently) }
end

def should_fetch_replies?
# we aren't brand new, and we haven't fetched replies since the debounce window
FETCH_REPLIES_ENABLED && !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && (
fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago
)
end
end
2 changes: 2 additions & 0 deletions app/models/status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
# edited_at :datetime
# trendable :boolean
# ordered_media_attachment_ids :bigint(8) is an Array
# fetched_replies_at :datetime
#

class Status < ApplicationRecord
include Cacheable
include Discard::Model
include Paginable
include RateLimitable
include Status::FetchRepliesConcern
include Status::SafeReblogInsert
include Status::SearchConcern
include Status::SnapshotConcern
Expand Down
49 changes: 49 additions & 0 deletions app/services/activitypub/fetch_all_replies_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# frozen_string_literal: true

class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService
include JsonLdHelper

# Limit of replies to fetch per status
MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i

def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil)
@allow_synchronous_requests = allow_synchronous_requests
@filter_by_host = false
@collection_or_uri = collection_or_uri

@items = collection_items(collection_or_uri)
@items = filtered_replies
return if @items.nil?

FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] }

@items
end

private

def filtered_replies
return if @items.nil?

# Find all statuses that we *shouldn't* update the replies for, and use that as a filter.
# We don't assume that we have the statuses before they're created,
# hence the negative filter -
# "keep all these uris except the ones we already have"
# instead of
# "keep all these uris that match some conditions on existing Status objects"
#
# Typically we assume the number of replies we *shouldn't* fetch is smaller than the
# replies we *should* fetch, so we also minimize the number of uris we should load here.
uris = @items.map { |item| value_or_id(item) }
dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri)

# touch all statuses that already exist and that we're about to update
Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at)

# Reject all statuses that we already have in the db
uris = uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES)

Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" }
uris
end
end
41 changes: 30 additions & 11 deletions app/services/activitypub/fetch_replies_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
class ActivityPub::FetchRepliesService < BaseService
include JsonLdHelper

def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil)
# Limit of fetched replies
MAX_REPLIES = 5

def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true)
@account = parent_status.account
@allow_synchronous_requests = allow_synchronous_requests
@filter_by_host = filter_by_host

@items = collection_items(collection_or_uri)
return if @items.nil?
Expand All @@ -24,18 +28,29 @@ def collection_items(collection_or_uri)
collection = fetch_collection(collection['first']) if collection['first'].present?
return unless collection.is_a?(Hash)

case collection['type']
when 'Collection', 'CollectionPage'
as_array(collection['items'])
when 'OrderedCollection', 'OrderedCollectionPage'
as_array(collection['orderedItems'])
all_items = []
while collection.is_a?(Hash)
items = case collection['type']
when 'Collection', 'CollectionPage'
collection['items']
when 'OrderedCollection', 'OrderedCollectionPage'
collection['orderedItems']
end

all_items.concat(as_array(items))

break if all_items.size > MAX_REPLIES

collection = collection['next'].present? ? fetch_collection(collection['next']) : nil
end

all_items
end

def fetch_collection(collection_or_uri)
return collection_or_uri if collection_or_uri.is_a?(Hash)
return unless @allow_synchronous_requests
return if non_matching_uri_hosts?(@account.uri, collection_or_uri)
return if @filter_by_host && non_matching_uri_hosts?(@account.uri, collection_or_uri)

# NOTE: For backward compatibility reasons, Mastodon signs outgoing
# queries incorrectly by default.
Expand All @@ -54,10 +69,14 @@ def fetch_collection(collection_or_uri)
end

def filtered_replies
# Only fetch replies to the same server as the original status to avoid
# amplification attacks.
if @filter_by_host
# Only fetch replies to the same server as the original status to avoid
# amplification attacks.

# Also limit to 5 fetched replies to limit potential for DoS.
@items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(5)
# Also limit to 5 fetched replies to limit potential for DoS.
@items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES)
else
@items.map { |item| value_or_id(item) }.take(MAX_REPLIES)
end
end
end
74 changes: 74 additions & 0 deletions app/workers/activitypub/fetch_all_replies_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# frozen_string_literal: true

# Fetch all replies to a status, querying recursively through
# ActivityPub replies collections, fetching any statuses that
# we either don't already have or we haven't checked for new replies
# in the Status::FETCH_REPLIES_DEBOUNCE interval
class ActivityPub::FetchAllRepliesWorker
include Sidekiq::Worker
include ExponentialBackoff
include JsonLdHelper

sidekiq_options queue: 'pull', retry: 3

# Global max replies to fetch per request (all replies, recursively)
MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_GLOBAL'] || 1000).to_i

def perform(parent_status_id, options = {})
@parent_status = Status.find(parent_status_id)
Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" }

uris_to_fetch = get_replies(@parent_status.uri, options)
return if uris_to_fetch.nil?

@parent_status.touch(:fetched_replies_at)

fetched_uris = uris_to_fetch.clone.to_set

until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES
next_reply = uris_to_fetch.pop
next if next_reply.nil?

new_reply_uris = get_replies(next_reply, options)
next if new_reply_uris.nil?

new_reply_uris = new_reply_uris.reject { |uri| fetched_uris.include?(uri) }

uris_to_fetch.concat(new_reply_uris)
fetched_uris = fetched_uris.merge(new_reply_uris)
end

Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" }
fetched_uris
end

private

def get_replies(status_uri, options = {})
replies_collection_or_uri = get_replies_uri(status_uri)
return if replies_collection_or_uri.nil?

ActivityPub::FetchAllRepliesService.new.call(replies_collection_or_uri, **options.deep_symbolize_keys)
end

def get_replies_uri(parent_status_uri)
begin
json_status = fetch_resource(parent_status_uri, true)
if json_status.nil?
Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: error getting replies URI for #{parent_status_uri}, returned nil" }
nil
elsif !json_status.key?('replies')
Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: no replies collection found in ActivityPub object: #{json_status}" }
nil
else
json_status['replies']
end
rescue => e
Rails.logger.warn { "FetchAllRepliesWorker - #{@parent_status.uri}: caught exception fetching replies URI: #{e}" }
# Raise if we can't get the collection for top-level status to trigger retry
raise e if parent_status_uri == @parent_status.uri

nil
end
end
end
7 changes: 7 additions & 0 deletions db/migrate/20240918233930_add_fetched_replies_at_to_status.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1]
def change
add_column :statuses, :fetched_replies_at, :datetime, null: true
end
end
25 changes: 13 additions & 12 deletions db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -553,12 +553,12 @@
end

create_table "ip_blocks", force: :cascade do |t|
t.datetime "created_at", precision: nil, null: false
t.datetime "updated_at", precision: nil, null: false
t.datetime "expires_at", precision: nil
t.inet "ip", default: "0.0.0.0", null: false
t.integer "severity", default: 0, null: false
t.datetime "expires_at", precision: nil
t.text "comment", default: "", null: false
t.datetime "created_at", precision: nil, null: false
t.datetime "updated_at", precision: nil, null: false
t.index ["ip"], name: "index_ip_blocks_on_ip", unique: true
end

Expand Down Expand Up @@ -1052,6 +1052,7 @@
t.datetime "edited_at", precision: nil
t.boolean "trendable"
t.bigint "ordered_media_attachment_ids", array: true
t.datetime "fetched_replies_at"
t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)"
t.index ["account_id"], name: "index_statuses_on_account_id"
t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)"
Expand Down Expand Up @@ -1380,9 +1381,9 @@
add_index "instances", ["domain"], name: "index_instances_on_domain", unique: true

create_view "user_ips", sql_definition: <<-SQL
SELECT user_id,
ip,
max(used_at) AS used_at
SELECT t0.user_id,
t0.ip,
max(t0.used_at) AS used_at
FROM ( SELECT users.id AS user_id,
users.sign_up_ip AS ip,
users.created_at AS used_at
Expand All @@ -1399,7 +1400,7 @@
login_activities.created_at
FROM login_activities
WHERE (login_activities.success = true)) t0
GROUP BY user_id, ip;
GROUP BY t0.user_id, t0.ip;
SQL
create_view "account_summaries", materialized: true, sql_definition: <<-SQL
SELECT accounts.id AS account_id,
Expand All @@ -1420,9 +1421,9 @@
add_index "account_summaries", ["account_id"], name: "index_account_summaries_on_account_id", unique: true

create_view "global_follow_recommendations", materialized: true, sql_definition: <<-SQL
SELECT account_id,
sum(rank) AS rank,
array_agg(reason) AS reason
SELECT t0.account_id,
sum(t0.rank) AS rank,
array_agg(t0.reason) AS reason
FROM ( SELECT account_summaries.account_id,
((count(follows.id))::numeric / (1.0 + (count(follows.id))::numeric)) AS rank,
'most_followed'::text AS reason
Expand All @@ -1446,8 +1447,8 @@
WHERE (follow_recommendation_suppressions.account_id = statuses.account_id)))))
GROUP BY account_summaries.account_id
HAVING (sum((status_stats.reblogs_count + status_stats.favourites_count)) >= (5)::numeric)) t0
GROUP BY account_id
ORDER BY (sum(rank)) DESC;
GROUP BY t0.account_id
ORDER BY (sum(t0.rank)) DESC;
SQL
add_index "global_follow_recommendations", ["account_id"], name: "index_global_follow_recommendations_on_account_id", unique: true

Expand Down
Loading

0 comments on commit 00d9cb9

Please sign in to comment.