From 966b8163829c367bf301fb1391a00dd75a539992 Mon Sep 17 00:00:00 2001 From: Claire Date: Wed, 12 Mar 2025 12:52:38 +0100 Subject: [PATCH] Refactor `ActivityPub::FetchRepliesService` and `ActivityPub::FetchAllRepliesService` (#34149) --- app/lib/activitypub/activity/create.rb | 2 +- .../activitypub/fetch_all_replies_service.rb | 22 +----- .../activitypub/fetch_replies_service.rb | 60 +++++++-------- .../activitypub/fetch_all_replies_worker.rb | 2 +- .../activitypub/fetch_replies_worker.rb | 2 +- .../fetch_all_replies_service_spec.rb | 74 ++++++++++++++----- .../activitypub/fetch_replies_service_spec.rb | 14 ++-- 7 files changed, 95 insertions(+), 81 deletions(-) diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index a756912592..f54e64ad7e 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -338,7 +338,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity collection = @object['replies'] return if collection.blank? - replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id]) + replies = ActivityPub::FetchRepliesService.new.call(status.account.uri, collection, allow_synchronous_requests: false, request_id: @options[:request_id]) return unless replies.nil? uri = value_or_id(collection) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 9f92b7efee..765e5c8ae8 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -6,25 +6,15 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService # Limit of replies to fetch per status MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i - def call(collection_or_uri, status_uri, max_pages = nil, request_id: nil) - @allow_synchronous_requests = true - @collection_or_uri = collection_or_uri + def call(status_uri, collection_or_uri, max_pages: 1, request_id: nil) @status_uri = status_uri - @items, n_pages = collection_items(collection_or_uri, max_pages) - @items = filtered_replies - return if @items.nil? - - FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } - - [@items, n_pages] + super end private - def filtered_replies - return if @items.nil? - + def filter_replies(items) # Find all statuses that we *shouldn't* update the replies for, and use that as a filter. # We don't assume that we have the statuses before they're created, # hence the negative filter - @@ -34,7 +24,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService # # Typically we assume the number of replies we *shouldn't* fetch is smaller than the # replies we *should* fetch, so we also minimize the number of uris we should load here. - uris = @items.map { |item| value_or_id(item) } + uris = items.map { |item| value_or_id(item) } # Expand collection to get replies in the DB that were # - not included in the collection, @@ -61,8 +51,4 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" } uris end - - def filter_by_host? - false - end end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 72b9c0f5a6..f2e4f45104 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -6,53 +6,56 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies MAX_REPLIES = 5 - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil) - @account = parent_status.account + def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, request_id: nil) + @reference_uri = reference_uri @allow_synchronous_requests = allow_synchronous_requests - @items, = collection_items(collection_or_uri) + @items, n_pages = collection_items(collection_or_uri, max_pages: max_pages) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + @items = filter_replies(@items) + FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } - @items + [@items, n_pages] end private - def collection_items(collection_or_uri, max_pages = nil) + def collection_items(collection_or_uri, max_pages: 1) collection = fetch_collection(collection_or_uri) return unless collection.is_a?(Hash) collection = fetch_collection(collection['first']) if collection['first'].present? return unless collection.is_a?(Hash) - all_items = [] + items = [] n_pages = 1 while collection.is_a?(Hash) - items = case collection['type'] - when 'Collection', 'CollectionPage' - collection['items'] - when 'OrderedCollection', 'OrderedCollectionPage' - collection['orderedItems'] - end + items.concat(as_array(collection_page_items(collection))) - all_items.concat(as_array(items)) - - break if all_items.size >= MAX_REPLIES - break if !max_pages.nil? && n_pages >= max_pages + break if items.size >= MAX_REPLIES + break if n_pages >= max_pages collection = collection['next'].present? ? fetch_collection(collection['next']) : nil n_pages += 1 end - [all_items, n_pages] + [items, n_pages] + end + + def collection_page_items(collection) + case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end end def fetch_collection(collection_or_uri) return collection_or_uri if collection_or_uri.is_a?(Hash) return unless @allow_synchronous_requests - return if filter_by_host? && non_matching_uri_hosts?(@account.uri, collection_or_uri) + return if non_matching_uri_hosts?(@reference_uri, collection_or_uri) # NOTE: For backward compatibility reasons, Mastodon signs outgoing # queries incorrectly by default. @@ -70,20 +73,11 @@ class ActivityPub::FetchRepliesService < BaseService end end - def filtered_replies - if filter_by_host? - # Only fetch replies to the same server as the original status to avoid - # amplification attacks. + def filter_replies(items) + # Only fetch replies to the same server as the original status to avoid + # amplification attacks. - # Also limit to 5 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES) - else - @items.map { |item| value_or_id(item) }.take(MAX_REPLIES) - end - end - - # Whether replies with a different domain than the replied_to post should be rejected - def filter_by_host? - true + # Also limit to 5 fetched replies to limit potential for DoS. + items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@reference_uri, uri) }.take(MAX_REPLIES) end end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index 87eac321fa..e31ff17c23 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -51,7 +51,7 @@ class ActivityPub::FetchAllRepliesWorker replies_collection_or_uri = get_replies_uri(status_uri) return if replies_collection_or_uri.nil? - ActivityPub::FetchAllRepliesService.new.call(replies_collection_or_uri, status_uri, max_pages, **options.deep_symbolize_keys) + ActivityPub::FetchAllRepliesService.new.call(status_uri, replies_collection_or_uri, max_pages: max_pages, **options.deep_symbolize_keys) end def get_replies_uri(parent_status_uri) diff --git a/app/workers/activitypub/fetch_replies_worker.rb b/app/workers/activitypub/fetch_replies_worker.rb index d72bad7452..f9b3dbf171 100644 --- a/app/workers/activitypub/fetch_replies_worker.rb +++ b/app/workers/activitypub/fetch_replies_worker.rb @@ -7,7 +7,7 @@ class ActivityPub::FetchRepliesWorker sidekiq_options queue: 'pull', retry: 3 def perform(parent_status_id, replies_uri, options = {}) - ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys) + ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id).account.uri, replies_uri, **options.deep_symbolize_keys) rescue ActiveRecord::RecordNotFound true end diff --git a/spec/services/activitypub/fetch_all_replies_service_spec.rb b/spec/services/activitypub/fetch_all_replies_service_spec.rb index eadd5b10fa..241c1a8464 100644 --- a/spec/services/activitypub/fetch_all_replies_service_spec.rb +++ b/spec/services/activitypub/fetch_all_replies_service_spec.rb @@ -10,17 +10,17 @@ RSpec.describe ActivityPub::FetchAllRepliesService do let(:collection_uri) { 'http://example.com/replies/1' } let(:items) do - [ - 'http://example.com/self-reply-1', - 'http://example.com/self-reply-2', - 'http://example.com/self-reply-3', - 'http://other.com/other-reply-1', - 'http://other.com/other-reply-2', - 'http://other.com/other-reply-3', - 'http://example.com/self-reply-4', - 'http://example.com/self-reply-5', - 'http://example.com/self-reply-6', - ] + %w( + http://example.com/self-reply-1 + http://example.com/self-reply-2 + http://example.com/self-reply-3 + http://other.com/other-reply-1 + http://other.com/other-reply-2 + http://other.com/other-reply-3 + http://example.com/self-reply-4 + http://example.com/self-reply-5 + http://example.com/self-reply-6 + ) end let(:payload) do @@ -36,10 +36,21 @@ RSpec.describe ActivityPub::FetchAllRepliesService do it 'fetches more than the default maximum and from multiple domains' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(payload, status.uri) + subject.call(status.uri, payload) - expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-2 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 - http://example.com/self-reply-5 http://example.com/self-reply-6)) + expect(FetchReplyWorker).to have_received(:push_bulk).with( + %w( + http://example.com/self-reply-1 + http://example.com/self-reply-2 + http://example.com/self-reply-3 + http://other.com/other-reply-1 + http://other.com/other-reply-2 + http://other.com/other-reply-3 + http://example.com/self-reply-4 + http://example.com/self-reply-5 + http://example.com/self-reply-6 + ) + ) end context 'with a recent status' do @@ -50,9 +61,20 @@ RSpec.describe ActivityPub::FetchAllRepliesService do it 'skips statuses that have been updated recently' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(payload, status.uri) + subject.call(status.uri, payload) - expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 http://example.com/self-reply-5 http://example.com/self-reply-6)) + expect(FetchReplyWorker).to have_received(:push_bulk).with( + %w( + http://example.com/self-reply-1 + http://example.com/self-reply-3 + http://other.com/other-reply-1 + http://other.com/other-reply-2 + http://other.com/other-reply-3 + http://example.com/self-reply-4 + http://example.com/self-reply-5 + http://example.com/self-reply-6 + ) + ) end end @@ -64,7 +86,7 @@ RSpec.describe ActivityPub::FetchAllRepliesService do it 'updates the time that fetched statuses were last fetched' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(payload, status.uri) + subject.call(status.uri, payload) expect(Status.find_by(uri: 'http://other.com/other-reply-1').fetched_replies_at).to be >= 1.minute.ago end @@ -80,10 +102,22 @@ RSpec.describe ActivityPub::FetchAllRepliesService do it 'updates the unsubscribed replies' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(payload, status.uri) + subject.call(status.uri, payload) - expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-2 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 - http://example.com/self-reply-5 http://example.com/self-reply-6 http://other.com/account/unsubscribed)) + expect(FetchReplyWorker).to have_received(:push_bulk).with( + %w( + http://example.com/self-reply-1 + http://example.com/self-reply-2 + http://example.com/self-reply-3 + http://other.com/other-reply-1 + http://other.com/other-reply-2 + http://other.com/other-reply-3 + http://example.com/self-reply-4 + http://example.com/self-reply-5 + http://example.com/self-reply-6 + http://other.com/account/unsubscribed + ) + ) end end end diff --git a/spec/services/activitypub/fetch_replies_service_spec.rb b/spec/services/activitypub/fetch_replies_service_spec.rb index e7d8d3528a..36159309f1 100644 --- a/spec/services/activitypub/fetch_replies_service_spec.rb +++ b/spec/services/activitypub/fetch_replies_service_spec.rb @@ -40,7 +40,7 @@ RSpec.describe ActivityPub::FetchRepliesService do it 'queues the expected worker' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(status, payload) + subject.call(status.account.uri, payload) expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1']) end @@ -50,7 +50,7 @@ RSpec.describe ActivityPub::FetchRepliesService do it 'spawns workers for up to 5 replies on the same server' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(status, payload) + subject.call(status.account.uri, payload) expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5']) end @@ -64,7 +64,7 @@ RSpec.describe ActivityPub::FetchRepliesService do it 'spawns workers for up to 5 replies on the same server' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(status, collection_uri) + subject.call(status.account.uri, collection_uri) expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5']) end @@ -85,7 +85,7 @@ RSpec.describe ActivityPub::FetchRepliesService do it 'spawns workers for up to 5 replies on the same server' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(status, payload) + subject.call(status.account.uri, payload) expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5']) end @@ -99,7 +99,7 @@ RSpec.describe ActivityPub::FetchRepliesService do it 'spawns workers for up to 5 replies on the same server' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(status, collection_uri) + subject.call(status.account.uri, collection_uri) expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5']) end @@ -124,7 +124,7 @@ RSpec.describe ActivityPub::FetchRepliesService do it 'spawns workers for up to 5 replies on the same server' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(status, payload) + subject.call(status.account.uri, payload) expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5']) end @@ -138,7 +138,7 @@ RSpec.describe ActivityPub::FetchRepliesService do it 'spawns workers for up to 5 replies on the same server' do allow(FetchReplyWorker).to receive(:push_bulk) - subject.call(status, collection_uri) + subject.call(status.account.uri, collection_uri) expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5']) end