Split SalmonWorker into smaller parts, move profile updating into another job
parent
afc48578a4
commit
cd68e54a7d
|
@ -45,13 +45,13 @@ class FollowRemoteAccountService < BaseService
|
||||||
account.suspended = true if domain_block && domain_block.suspend?
|
account.suspended = true if domain_block && domain_block.suspend?
|
||||||
account.silenced = true if domain_block && domain_block.silence?
|
account.silenced = true if domain_block && domain_block.silence?
|
||||||
|
|
||||||
xml = get_feed(account.remote_url)
|
body, xml = get_feed(account.remote_url)
|
||||||
hubs = get_hubs(xml)
|
hubs = get_hubs(xml)
|
||||||
|
|
||||||
account.uri = get_account_uri(xml)
|
account.uri = get_account_uri(xml)
|
||||||
account.hub_url = hubs.first.attribute('href').value
|
account.hub_url = hubs.first.attribute('href').value
|
||||||
|
|
||||||
get_profile(xml, account)
|
get_profile(body, account)
|
||||||
account.save!
|
account.save!
|
||||||
|
|
||||||
account
|
account
|
||||||
|
@ -61,7 +61,7 @@ class FollowRemoteAccountService < BaseService
|
||||||
|
|
||||||
def get_feed(url)
|
def get_feed(url)
|
||||||
response = http_client.get(Addressable::URI.parse(url))
|
response = http_client.get(Addressable::URI.parse(url))
|
||||||
Nokogiri::XML(response)
|
[response.to_s, Nokogiri::XML(response)]
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_hubs(xml)
|
def get_hubs(xml)
|
||||||
|
@ -82,12 +82,8 @@ class FollowRemoteAccountService < BaseService
|
||||||
author_uri.content
|
author_uri.content
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_profile(xml, account)
|
def get_profile(body, account)
|
||||||
update_remote_profile_service.call(xml.at_xpath('/xmlns:feed'), account)
|
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), false)
|
||||||
end
|
|
||||||
|
|
||||||
def update_remote_profile_service
|
|
||||||
@update_remote_profile_service ||= UpdateRemoteProfileService.new
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def http_client
|
def http_client
|
||||||
|
|
|
@ -5,15 +5,15 @@ class ProcessFeedService < BaseService
|
||||||
xml = Nokogiri::XML(body)
|
xml = Nokogiri::XML(body)
|
||||||
xml.encoding = 'utf-8'
|
xml.encoding = 'utf-8'
|
||||||
|
|
||||||
update_author(xml, account)
|
update_author(body, xml, account)
|
||||||
process_entries(xml, account)
|
process_entries(xml, account)
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def update_author(xml, account)
|
def update_author(body, xml, account)
|
||||||
return if xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS).nil?
|
return if xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS).nil?
|
||||||
UpdateRemoteProfileService.new.call(xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS), account, true)
|
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true)
|
||||||
end
|
end
|
||||||
|
|
||||||
def process_entries(xml, account)
|
def process_entries(xml, account)
|
||||||
|
|
|
@ -24,7 +24,7 @@ class ProcessInteractionService < BaseService
|
||||||
return if account.suspended?
|
return if account.suspended?
|
||||||
|
|
||||||
if salmon.verify(envelope, account.keypair)
|
if salmon.verify(envelope, account.keypair)
|
||||||
update_remote_profile_service.call(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS), account, true)
|
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true)
|
||||||
|
|
||||||
case verb(xml)
|
case verb(xml)
|
||||||
when :follow
|
when :follow
|
||||||
|
@ -114,7 +114,7 @@ class ProcessInteractionService < BaseService
|
||||||
|
|
||||||
return if status.nil?
|
return if status.nil?
|
||||||
|
|
||||||
remove_status_service.call(status) if account.id == status.account_id
|
RemovalWorker.perform_async(status.id) if account.id == status.account_id
|
||||||
end
|
end
|
||||||
|
|
||||||
def favourite!(xml, from_account)
|
def favourite!(xml, from_account)
|
||||||
|
@ -130,7 +130,7 @@ class ProcessInteractionService < BaseService
|
||||||
end
|
end
|
||||||
|
|
||||||
def add_post!(body, account)
|
def add_post!(body, account)
|
||||||
process_feed_service.call(body, account)
|
ProcessingWorker.perform_async(account.id, body.force_encoding('UTF-8'))
|
||||||
end
|
end
|
||||||
|
|
||||||
def status(xml)
|
def status(xml)
|
||||||
|
@ -153,10 +153,6 @@ class ProcessInteractionService < BaseService
|
||||||
@process_feed_service ||= ProcessFeedService.new
|
@process_feed_service ||= ProcessFeedService.new
|
||||||
end
|
end
|
||||||
|
|
||||||
def update_remote_profile_service
|
|
||||||
@update_remote_profile_service ||= UpdateRemoteProfileService.new
|
|
||||||
end
|
|
||||||
|
|
||||||
def remove_status_service
|
def remove_status_service
|
||||||
@remove_status_service ||= RemoveStatusService.new
|
@remove_status_service ||= RemoveStatusService.new
|
||||||
end
|
end
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
class Admin::SuspensionWorker
|
class Admin::SuspensionWorker
|
||||||
include Sidekiq::Worker
|
include Sidekiq::Worker
|
||||||
|
|
||||||
|
sidekiq_options queue: 'pull'
|
||||||
|
|
||||||
def perform(account_id)
|
def perform(account_id)
|
||||||
SuspendAccountService.new.call(Account.find(account_id))
|
SuspendAccountService.new.call(Account.find(account_id))
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
class ApplicationWorker
|
class ApplicationWorker
|
||||||
def info(message)
|
def info(message)
|
||||||
Rails.logger.info("#{self.class.name} - #{message}")
|
Rails.logger.info("#{self.class.name} - #{message}")
|
||||||
|
|
|
@ -4,10 +4,7 @@ class DistributionWorker < ApplicationWorker
|
||||||
include Sidekiq::Worker
|
include Sidekiq::Worker
|
||||||
|
|
||||||
def perform(status_id)
|
def perform(status_id)
|
||||||
status = Status.find(status_id)
|
FanOutOnWriteService.new.call(Status.find(status_id))
|
||||||
|
|
||||||
FanOutOnWriteService.new.call(status)
|
|
||||||
WarmCacheService.new.call(status)
|
|
||||||
rescue ActiveRecord::RecordNotFound
|
rescue ActiveRecord::RecordNotFound
|
||||||
info("Couldn't find the status")
|
info("Couldn't find the status")
|
||||||
end
|
end
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class RemoteProfileUpdateWorker
|
||||||
|
include Sidekiq::Worker
|
||||||
|
|
||||||
|
sidekiq_options queue: 'pull'
|
||||||
|
|
||||||
|
def perform(account_id, body, resubscribe)
|
||||||
|
account = Account.find(account_id)
|
||||||
|
|
||||||
|
xml = Nokogiri::XML(body)
|
||||||
|
xml.encoding = 'utf-8'
|
||||||
|
|
||||||
|
author_container = xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS) || xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS)
|
||||||
|
|
||||||
|
UpdateRemoteProfileService.new.call(author_container, account, resubscribe)
|
||||||
|
rescue ActiveRecord::RecordNotFound
|
||||||
|
true
|
||||||
|
end
|
||||||
|
end
|
|
@ -7,7 +7,7 @@ class SalmonWorker
|
||||||
|
|
||||||
def perform(account_id, body)
|
def perform(account_id, body)
|
||||||
ProcessInteractionService.new.call(body, Account.find(account_id))
|
ProcessInteractionService.new.call(body, Account.find(account_id))
|
||||||
rescue ActiveRecord::RecordNotFound
|
rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -16,6 +16,7 @@ RSpec.describe ProcessFeedService do
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'updates remote user\'s account information' do
|
it 'updates remote user\'s account information' do
|
||||||
|
account.reload
|
||||||
expect(account.display_name).to eq '::1'
|
expect(account.display_name).to eq '::1'
|
||||||
expect(account).to have_attached_file(:avatar)
|
expect(account).to have_attached_file(:avatar)
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue