Fix unbounded recursion in post discovery (#23507)

* Add a limit to how many posts can get fetched as a result of a single request

* Add tests

* Always pass `request_id` when processing `Announce` activities

---------

Co-authored-by: nametoolong <nametoolong@users.noreply.github.com>
pull/24332/head
Claire 2023-02-10 22:16:47 +01:00 committed by GitHub
parent be1caad933
commit a8a3e86216
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 126 additions and 22 deletions

View File

@ -106,7 +106,8 @@ class ActivityPub::Activity
actor_id = value_or_id(first_of_value(@object['attributedTo'])) actor_id = value_or_id(first_of_value(@object['attributedTo']))
if actor_id == @account.uri if actor_id == @account.uri
return ActivityPub::Activity.factory({ 'type' => 'Create', 'actor' => actor_id, 'object' => @object }, @account).perform virtual_object = { 'type' => 'Create', 'actor' => actor_id, 'object' => @object }
return ActivityPub::Activity.factory(virtual_object, @account, request_id: @options[:request_id]).perform
end end
end end
@ -152,9 +153,9 @@ class ActivityPub::Activity
def fetch_remote_original_status def fetch_remote_original_status
if object_uri.start_with?('http') if object_uri.start_with?('http')
return if ActivityPub::TagManager.instance.local_uri?(object_uri) return if ActivityPub::TagManager.instance.local_uri?(object_uri)
ActivityPub::FetchRemoteStatusService.new.call(object_uri, id: true, on_behalf_of: @account.followers.local.first) ActivityPub::FetchRemoteStatusService.new.call(object_uri, id: true, on_behalf_of: @account.followers.local.first, request_id: @options[:request_id])
elsif @object['url'].present? elsif @object['url'].present?
::FetchRemoteStatusService.new.call(@object['url']) ::FetchRemoteStatusService.new.call(@object['url'], request_id: @options[:request_id])
end end
end end

View File

@ -327,18 +327,18 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def resolve_thread(status) def resolve_thread(status)
return unless status.reply? && status.thread.nil? && Request.valid_url?(in_reply_to_uri) return unless status.reply? && status.thread.nil? && Request.valid_url?(in_reply_to_uri)
ThreadResolveWorker.perform_async(status.id, in_reply_to_uri) ThreadResolveWorker.perform_async(status.id, in_reply_to_uri, { 'request_id' => @options[:request_id]})
end end
def fetch_replies(status) def fetch_replies(status)
collection = @object['replies'] collection = @object['replies']
return if collection.nil? return if collection.nil?
replies = ActivityPub::FetchRepliesService.new.call(status, collection, false) replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id])
return unless replies.nil? return unless replies.nil?
uri = value_or_id(collection) uri = value_or_id(collection)
ActivityPub::FetchRepliesWorker.perform_async(status.id, uri) unless uri.nil? ActivityPub::FetchRepliesWorker.perform_async(status.id, uri, { 'request_id' => @options[:request_id]}) unless uri.nil?
end end
def conversation_from_uri(uri) def conversation_from_uri(uri)

View File

@ -2,10 +2,13 @@
class ActivityPub::FetchRemoteStatusService < BaseService class ActivityPub::FetchRemoteStatusService < BaseService
include JsonLdHelper include JsonLdHelper
include Redisable
DISCOVERIES_PER_REQUEST = 1000
# Should be called when uri has already been checked for locality # Should be called when uri has already been checked for locality
def call(uri, id: true, prefetched_body: nil, on_behalf_of: nil, expected_actor_uri: nil, request_id: nil) def call(uri, id: true, prefetched_body: nil, on_behalf_of: nil, expected_actor_uri: nil, request_id: nil)
@request_id = request_id @request_id = request_id || "#{Time.now.utc.to_i}-status-#{uri}"
@json = begin @json = begin
if prefetched_body.nil? if prefetched_body.nil?
fetch_resource(uri, id, on_behalf_of) fetch_resource(uri, id, on_behalf_of)
@ -42,7 +45,13 @@ class ActivityPub::FetchRemoteStatusService < BaseService
# activity as an update rather than create # activity as an update rather than create
activity_json['type'] = 'Update' if equals_or_includes_any?(activity_json['type'], %w(Create)) && Status.where(uri: object_uri, account_id: actor.id).exists? activity_json['type'] = 'Update' if equals_or_includes_any?(activity_json['type'], %w(Create)) && Status.where(uri: object_uri, account_id: actor.id).exists?
ActivityPub::Activity.factory(activity_json, actor, request_id: request_id).perform with_redis do |redis|
discoveries = redis.incr("status_discovery_per_request:#{@request_id}")
redis.expire("status_discovery_per_request:#{@request_id}", 5.minutes.seconds)
return nil if discoveries > DISCOVERIES_PER_REQUEST
end
ActivityPub::Activity.factory(activity_json, actor, request_id: @request_id).perform
end end
private private

View File

@ -3,14 +3,14 @@
class ActivityPub::FetchRepliesService < BaseService class ActivityPub::FetchRepliesService < BaseService
include JsonLdHelper include JsonLdHelper
def call(parent_status, collection_or_uri, allow_synchronous_requests = true) def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil)
@account = parent_status.account @account = parent_status.account
@allow_synchronous_requests = allow_synchronous_requests @allow_synchronous_requests = allow_synchronous_requests
@items = collection_items(collection_or_uri) @items = collection_items(collection_or_uri)
return if @items.nil? return if @items.nil?
FetchReplyWorker.push_bulk(filtered_replies) FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id}] }
@items @items
end end

View File

@ -1,7 +1,7 @@
# frozen_string_literal: true # frozen_string_literal: true
class FetchRemoteStatusService < BaseService class FetchRemoteStatusService < BaseService
def call(url, prefetched_body = nil) def call(url, prefetched_body: nil, request_id: nil)
if prefetched_body.nil? if prefetched_body.nil?
resource_url, resource_options = FetchResourceService.new.call(url) resource_url, resource_options = FetchResourceService.new.call(url)
else else
@ -9,6 +9,6 @@ class FetchRemoteStatusService < BaseService
resource_options = { prefetched_body: prefetched_body } resource_options = { prefetched_body: prefetched_body }
end end
ActivityPub::FetchRemoteStatusService.new.call(resource_url, **resource_options) unless resource_url.nil? ActivityPub::FetchRemoteStatusService.new.call(resource_url, **resource_options.merge(request_id: request_id)) unless resource_url.nil?
end end
end end

View File

@ -23,7 +23,7 @@ class ResolveURLService < BaseService
if equals_or_includes_any?(type, ActivityPub::FetchRemoteAccountService::SUPPORTED_TYPES) if equals_or_includes_any?(type, ActivityPub::FetchRemoteAccountService::SUPPORTED_TYPES)
ActivityPub::FetchRemoteAccountService.new.call(resource_url, prefetched_body: body) ActivityPub::FetchRemoteAccountService.new.call(resource_url, prefetched_body: body)
elsif equals_or_includes_any?(type, ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES) elsif equals_or_includes_any?(type, ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES)
status = FetchRemoteStatusService.new.call(resource_url, body) status = FetchRemoteStatusService.new.call(resource_url, prefetched_body: body)
authorize_with @on_behalf_of, status, :show? unless status.nil? authorize_with @on_behalf_of, status, :show? unless status.nil?
status status
end end

View File

@ -6,8 +6,8 @@ class ActivityPub::FetchRepliesWorker
sidekiq_options queue: 'pull', retry: 3 sidekiq_options queue: 'pull', retry: 3
def perform(parent_status_id, replies_uri) def perform(parent_status_id, replies_uri, options = {})
ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri) ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys)
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true
end end

View File

@ -6,7 +6,7 @@ class FetchReplyWorker
sidekiq_options queue: 'pull', retry: 3 sidekiq_options queue: 'pull', retry: 3
def perform(child_url) def perform(child_url, options = {})
FetchRemoteStatusService.new.call(child_url) FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys)
end end
end end

View File

@ -6,9 +6,9 @@ class ThreadResolveWorker
sidekiq_options queue: 'pull', retry: 3 sidekiq_options queue: 'pull', retry: 3
def perform(child_status_id, parent_url) def perform(child_status_id, parent_url, options = {})
child_status = Status.find(child_status_id) child_status = Status.find(child_status_id)
parent_status = FetchRemoteStatusService.new.call(parent_url) parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
return if parent_status.nil? return if parent_status.nil?

View File

@ -48,7 +48,7 @@ RSpec.describe ActivityPub::Activity::Add do
end end
it 'fetches the status and pins it' do it 'fetches the status and pins it' do
allow(service_stub).to receive(:call) do |uri, id: true, on_behalf_of: nil| allow(service_stub).to receive(:call) do |uri, id: true, on_behalf_of: nil, request_id: nil|
expect(uri).to eq 'https://example.com/unknown' expect(uri).to eq 'https://example.com/unknown'
expect(id).to eq true expect(id).to eq true
expect(on_behalf_of&.following?(sender)).to eq true expect(on_behalf_of&.following?(sender)).to eq true
@ -62,7 +62,7 @@ RSpec.describe ActivityPub::Activity::Add do
context 'when there is no local follower' do context 'when there is no local follower' do
it 'tries to fetch the status' do it 'tries to fetch the status' do
allow(service_stub).to receive(:call) do |uri, id: true, on_behalf_of: nil| allow(service_stub).to receive(:call) do |uri, id: true, on_behalf_of: nil, request_id: nil|
expect(uri).to eq 'https://example.com/unknown' expect(uri).to eq 'https://example.com/unknown'
expect(id).to eq true expect(id).to eq true
expect(on_behalf_of).to eq nil expect(on_behalf_of).to eq nil

View File

@ -223,4 +223,98 @@ RSpec.describe ActivityPub::FetchRemoteStatusService, type: :service do
end end
end end
end end
context 'statuses referencing other statuses' do
before do
stub_const 'ActivityPub::FetchRemoteStatusService::DISCOVERIES_PER_REQUEST', 5
end
context 'using inReplyTo' do
let(:object) do
{
'@context': 'https://www.w3.org/ns/activitystreams',
id: "https://foo.bar/@foo/1",
type: 'Note',
content: 'Lorem ipsum',
inReplyTo: 'https://foo.bar/@foo/2',
attributedTo: ActivityPub::TagManager.instance.uri_for(sender),
}
end
before do
8.times do |i|
status_json = {
'@context': 'https://www.w3.org/ns/activitystreams',
id: "https://foo.bar/@foo/#{i}",
type: 'Note',
content: 'Lorem ipsum',
inReplyTo: "https://foo.bar/@foo/#{i + 1}",
attributedTo: ActivityPub::TagManager.instance.uri_for(sender),
to: 'as:Public',
}.with_indifferent_access
stub_request(:get, "https://foo.bar/@foo/#{i}").to_return(status: 200, body: status_json.to_json, headers: { 'Content-Type': 'application/activity+json' })
end
end
it 'creates at least some statuses' do
expect { subject.call(object[:id], prefetched_body: Oj.dump(object)) }.to change { sender.statuses.count }.by_at_least(2)
end
it 'creates no more account than the limit allows' do
expect { subject.call(object[:id], prefetched_body: Oj.dump(object)) }.to change { sender.statuses.count }.by_at_most(5)
end
end
context 'using replies' do
let(:object) do
{
'@context': 'https://www.w3.org/ns/activitystreams',
id: "https://foo.bar/@foo/1",
type: 'Note',
content: 'Lorem ipsum',
replies: {
type: 'Collection',
id: 'https://foo.bar/@foo/1/replies',
first: {
type: 'CollectionPage',
partOf: 'https://foo.bar/@foo/1/replies',
items: ['https://foo.bar/@foo/2'],
},
},
attributedTo: ActivityPub::TagManager.instance.uri_for(sender),
}
end
before do
8.times do |i|
status_json = {
'@context': 'https://www.w3.org/ns/activitystreams',
id: "https://foo.bar/@foo/#{i}",
type: 'Note',
content: 'Lorem ipsum',
replies: {
type: 'Collection',
id: "https://foo.bar/@foo/#{i}/replies",
first: {
type: 'CollectionPage',
partOf: "https://foo.bar/@foo/#{i}/replies",
items: ["https://foo.bar/@foo/#{i+1}"],
},
},
attributedTo: ActivityPub::TagManager.instance.uri_for(sender),
to: 'as:Public',
}.with_indifferent_access
stub_request(:get, "https://foo.bar/@foo/#{i}").to_return(status: 200, body: status_json.to_json, headers: { 'Content-Type': 'application/activity+json' })
end
end
it 'creates at least some statuses' do
expect { subject.call(object[:id], prefetched_body: Oj.dump(object)) }.to change { sender.statuses.count }.by_at_least(2)
end
it 'creates no more account than the limit allows' do
expect { subject.call(object[:id], prefetched_body: Oj.dump(object)) }.to change { sender.statuses.count }.by_at_most(5)
end
end
end
end end

View File

@ -15,7 +15,7 @@ RSpec.describe FetchRemoteStatusService, type: :service do
end end
context 'protocol is :activitypub' do context 'protocol is :activitypub' do
subject { described_class.new.call(note[:id], prefetched_body) } subject { described_class.new.call(note[:id], prefetched_body: prefetched_body) }
let(:prefetched_body) { Oj.dump(note) } let(:prefetched_body) { Oj.dump(note) }
before do before do