Refactor how Redis locks are created (#18400)

* Refactor how Redis locks are created

* Fix autorelease duration on account deletion lock
pull/18408/head
Eugen Rochko 2022-05-13 00:02:35 +02:00 committed by GitHub
parent 12535568f7
commit 6cf57c6765
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 112 additions and 179 deletions

View File

@ -4,6 +4,7 @@ class MediaProxyController < ApplicationController
include RoutingHelper include RoutingHelper
include Authorization include Authorization
include Redisable include Redisable
include Lockable
skip_before_action :store_current_location skip_before_action :store_current_location
skip_before_action :require_functional! skip_before_action :require_functional!
@ -16,14 +17,10 @@ class MediaProxyController < ApplicationController
rescue_from HTTP::TimeoutError, HTTP::ConnectionError, OpenSSL::SSL::SSLError, with: :internal_server_error rescue_from HTTP::TimeoutError, HTTP::ConnectionError, OpenSSL::SSL::SSLError, with: :internal_server_error
def show def show
RedisLock.acquire(lock_options) do |lock| with_lock("media_download:#{params[:id]}") do
if lock.acquired?
@media_attachment = MediaAttachment.remote.attached.find(params[:id]) @media_attachment = MediaAttachment.remote.attached.find(params[:id])
authorize @media_attachment.status, :show? authorize @media_attachment.status, :show?
redownload! if @media_attachment.needs_redownload? && !reject_media? redownload! if @media_attachment.needs_redownload? && !reject_media?
else
raise Mastodon::RaceConditionError
end
end end
redirect_to full_asset_url(@media_attachment.file.url(version)) redirect_to full_asset_url(@media_attachment.file.url(version))
@ -45,10 +42,6 @@ class MediaProxyController < ApplicationController
end end
end end
def lock_options
{ redis: redis, key: "media_download:#{params[:id]}", autorelease: 15.minutes.seconds }
end
def reject_media? def reject_media?
DomainBlock.reject_media?(@media_attachment.account.domain) DomainBlock.reject_media?(@media_attachment.account.domain)
end end

View File

@ -3,6 +3,7 @@
class Settings::ExportsController < Settings::BaseController class Settings::ExportsController < Settings::BaseController
include Authorization include Authorization
include Redisable include Redisable
include Lockable
skip_before_action :require_functional! skip_before_action :require_functional!
@ -14,21 +15,13 @@ class Settings::ExportsController < Settings::BaseController
def create def create
backup = nil backup = nil
RedisLock.acquire(lock_options) do |lock| with_lock("backup:#{current_user.id}") do
if lock.acquired?
authorize :backup, :create? authorize :backup, :create?
backup = current_user.backups.create! backup = current_user.backups.create!
else
raise Mastodon::RaceConditionError
end
end end
BackupWorker.perform_async(backup.id) BackupWorker.perform_async(backup.id)
redirect_to settings_export_path redirect_to settings_export_path
end end
def lock_options
{ redis: redis, key: "backup:#{current_user.id}" }
end
end end

View File

@ -3,6 +3,7 @@
class ActivityPub::Activity class ActivityPub::Activity
include JsonLdHelper include JsonLdHelper
include Redisable include Redisable
include Lockable
SUPPORTED_TYPES = %w(Note Question).freeze SUPPORTED_TYPES = %w(Note Question).freeze
CONVERTED_TYPES = %w(Image Audio Video Article Page Event).freeze CONVERTED_TYPES = %w(Image Audio Video Article Page Event).freeze
@ -157,22 +158,6 @@ class ActivityPub::Activity
end end
end end
def lock_or_return(key, expire_after = 2.hours.seconds)
yield if redis.set(key, true, nx: true, ex: expire_after)
ensure
redis.del(key)
end
def lock_or_fail(key, expire_after = 15.minutes.seconds)
RedisLock.acquire({ redis: redis, key: key, autorelease: expire_after }) do |lock|
if lock.acquired?
yield
else
raise Mastodon::RaceConditionError
end
end
end
def fetch? def fetch?
!@options[:delivery] !@options[:delivery]
end end

View File

@ -4,7 +4,7 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
def perform def perform
return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity? return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity?
lock_or_fail("announce:#{@object['id']}") do with_lock("announce:#{@object['id']}") do
original_status = status_from_object original_status = status_from_object
return reject_payload! if original_status.nil? || !announceable?(original_status) return reject_payload! if original_status.nil? || !announceable?(original_status)

View File

@ -47,7 +47,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def create_status def create_status
return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity? return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity?
lock_or_fail("create:#{object_uri}") do with_lock("create:#{object_uri}") do
return if delete_arrived_first?(object_uri) || poll_vote? return if delete_arrived_first?(object_uri) || poll_vote?
@status = find_existing_status @status = find_existing_status
@ -315,7 +315,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
poll = replied_to_status.preloadable_poll poll = replied_to_status.preloadable_poll
already_voted = true already_voted = true
lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do with_lock("vote:#{replied_to_status.poll_id}:#{@account.id}") do
already_voted = poll.votes.where(account: @account).exists? already_voted = poll.votes.where(account: @account).exists?
poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri) poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
end end

View File

@ -12,7 +12,7 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
private private
def delete_person def delete_person
lock_or_return("delete_in_progress:#{@account.id}") do with_lock("delete_in_progress:#{@account.id}", autorelease: 2.hours, raise_on_failure: false) do
DeleteAccountService.new.call(@account, reserve_username: false, skip_activitypub: true) DeleteAccountService.new.call(@account, reserve_username: false, skip_activitypub: true)
end end
end end
@ -20,14 +20,14 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
def delete_note def delete_note
return if object_uri.nil? return if object_uri.nil?
lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do with_lock("delete_status_in_progress:#{object_uri}", raise_on_failure: false) do
unless invalid_origin?(object_uri) unless invalid_origin?(object_uri)
# This lock ensures a concurrent `ActivityPub::Activity::Create` either # This lock ensures a concurrent `ActivityPub::Activity::Create` either
# does not create a status at all, or has finished saving it to the # does not create a status at all, or has finished saving it to the
# database before we try to load it. # database before we try to load it.
# Without the lock, `delete_later!` could be called after `delete_arrived_first?` # Without the lock, `delete_later!` could be called after `delete_arrived_first?`
# and `Status.find` before `Status.create!` # and `Status.find` before `Status.create!`
lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) } with_lock("create:#{object_uri}") { delete_later!(object_uri) }
Tombstone.find_or_create_by(uri: object_uri, account: @account) Tombstone.find_or_create_by(uri: object_uri, account: @account)
end end

View File

@ -15,6 +15,7 @@
class AccountMigration < ApplicationRecord class AccountMigration < ApplicationRecord
include Redisable include Redisable
include Lockable
COOLDOWN_PERIOD = 30.days.freeze COOLDOWN_PERIOD = 30.days.freeze
@ -41,12 +42,8 @@ class AccountMigration < ApplicationRecord
return false unless errors.empty? return false unless errors.empty?
RedisLock.acquire(lock_options) do |lock| with_lock("account_migration:#{account.id}") do
if lock.acquired?
save save
else
raise Mastodon::RaceConditionError
end
end end
end end
@ -83,8 +80,4 @@ class AccountMigration < ApplicationRecord
def validate_migration_cooldown def validate_migration_cooldown
errors.add(:base, I18n.t('migrations.errors.on_cooldown')) if account.migrations.within_cooldown.exists? errors.add(:base, I18n.t('migrations.errors.on_cooldown')) if account.migrations.within_cooldown.exists?
end end
def lock_options
{ redis: redis, key: "account_migration:#{account.id}" }
end
end end

View File

@ -0,0 +1,19 @@
# frozen_string_literal: true
module Lockable
# @param [String] lock_name
# @param [ActiveSupport::Duration] autorelease Automatically release the lock after this time
# @param [Boolean] raise_on_failure Raise an error if a lock cannot be acquired, or fail silently
# @raise [Mastodon::RaceConditionError]
def with_lock(lock_name, autorelease: 15.minutes, raise_on_failure: true)
with_redis do |redis|
RedisLock.acquire(redis: redis, key: "lock:#{lock_name}", autorelease: autorelease.seconds) do |lock|
if lock.acquired?
yield
elsif raise_on_failure
raise Mastodon::RaceConditionError, "Could not acquire lock for #{lock_name}, try again later"
end
end
end
end
end

View File

@ -1,11 +1,11 @@
# frozen_string_literal: true # frozen_string_literal: true
module Redisable module Redisable
extend ActiveSupport::Concern
private
def redis def redis
Thread.current[:redis] ||= RedisConfiguration.pool.checkout Thread.current[:redis] ||= RedisConfiguration.pool.checkout
end end
def with_redis(&block)
RedisConfiguration.with(&block)
end
end end

View File

@ -4,6 +4,7 @@ class ActivityPub::ProcessAccountService < BaseService
include JsonLdHelper include JsonLdHelper
include DomainControlHelper include DomainControlHelper
include Redisable include Redisable
include Lockable
# Should be called with confirmed valid JSON # Should be called with confirmed valid JSON
# and WebFinger-resolved username and domain # and WebFinger-resolved username and domain
@ -17,8 +18,7 @@ class ActivityPub::ProcessAccountService < BaseService
@domain = domain @domain = domain
@collections = {} @collections = {}
RedisLock.acquire(lock_options) do |lock| with_lock("process_account:#{@uri}") do
if lock.acquired?
@account = Account.remote.find_by(uri: @uri) if @options[:only_key] @account = Account.remote.find_by(uri: @uri) if @options[:only_key]
@account ||= Account.find_remote(@username, @domain) @account ||= Account.find_remote(@username, @domain)
@old_public_key = @account&.public_key @old_public_key = @account&.public_key
@ -30,9 +30,6 @@ class ActivityPub::ProcessAccountService < BaseService
process_tags process_tags
process_duplicate_accounts! if @options[:verified_webfinger] process_duplicate_accounts! if @options[:verified_webfinger]
else
raise Mastodon::RaceConditionError
end
end end
return if @account.nil? return if @account.nil?
@ -289,10 +286,6 @@ class ActivityPub::ProcessAccountService < BaseService
!@old_protocol.nil? && @old_protocol != @account.protocol !@old_protocol.nil? && @old_protocol != @account.protocol
end end
def lock_options
{ redis: redis, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds }
end
def process_tags def process_tags
return if @json['tag'].blank? return if @json['tag'].blank?

View File

@ -3,6 +3,7 @@
class ActivityPub::ProcessStatusUpdateService < BaseService class ActivityPub::ProcessStatusUpdateService < BaseService
include JsonLdHelper include JsonLdHelper
include Redisable include Redisable
include Lockable
def call(status, json) def call(status, json)
raise ArgumentError, 'Status has unsaved changes' if status.changed? raise ArgumentError, 'Status has unsaved changes' if status.changed?
@ -33,8 +34,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
last_edit_date = @status.edited_at.presence || @status.created_at last_edit_date = @status.edited_at.presence || @status.created_at
# Only allow processing one create/update per status at a time # Only allow processing one create/update per status at a time
RedisLock.acquire(lock_options) do |lock| with_lock("create:#{@uri}") do
if lock.acquired?
Status.transaction do Status.transaction do
record_previous_edit! record_previous_edit!
update_media_attachments! update_media_attachments!
@ -50,25 +50,17 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
reset_preview_card! reset_preview_card!
broadcast_updates! broadcast_updates!
else
raise Mastodon::RaceConditionError
end
end end
forward_activity! if significant_changes? && @status_parser.edited_at > last_edit_date forward_activity! if significant_changes? && @status_parser.edited_at > last_edit_date
end end
def handle_implicit_update! def handle_implicit_update!
RedisLock.acquire(lock_options) do |lock| with_lock("create:#{@uri}") do
if lock.acquired?
update_poll!(allow_significant_changes: false) update_poll!(allow_significant_changes: false)
else
raise Mastodon::RaceConditionError
end
end
queue_poll_notifications! queue_poll_notifications!
end end
end
def update_media_attachments! def update_media_attachments!
previous_media_attachments = @status.media_attachments.to_a previous_media_attachments = @status.media_attachments.to_a
@ -241,10 +233,6 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
equals_or_includes_any?(@json['type'], %w(Note Question)) equals_or_includes_any?(@json['type'], %w(Note Question))
end end
def lock_options
{ redis: redis, key: "create:#{@uri}", autorelease: 15.minutes.seconds }
end
def record_previous_edit! def record_previous_edit!
@previous_edit = @status.build_snapshot(at_time: @status.created_at, rate_limit: false) if @status.edits.empty? @previous_edit = @status.build_snapshot(at_time: @status.created_at, rate_limit: false) if @status.edits.empty?
end end

View File

@ -2,6 +2,7 @@
class FetchLinkCardService < BaseService class FetchLinkCardService < BaseService
include Redisable include Redisable
include Lockable
URL_PATTERN = %r{ URL_PATTERN = %r{
(#{Twitter::TwitterText::Regex[:valid_url_preceding_chars]}) # $1 preceding chars (#{Twitter::TwitterText::Regex[:valid_url_preceding_chars]}) # $1 preceding chars
@ -22,13 +23,9 @@ class FetchLinkCardService < BaseService
@url = @original_url.to_s @url = @original_url.to_s
RedisLock.acquire(lock_options) do |lock| with_lock("fetch:#{@original_url}") do
if lock.acquired?
@card = PreviewCard.find_by(url: @url) @card = PreviewCard.find_by(url: @url)
process_url if @card.nil? || @card.updated_at <= 2.weeks.ago || @card.missing_image? process_url if @card.nil? || @card.updated_at <= 2.weeks.ago || @card.missing_image?
else
raise Mastodon::RaceConditionError
end
end end
attach_card if @card&.persisted? attach_card if @card&.persisted?
@ -155,8 +152,4 @@ class FetchLinkCardService < BaseService
@card.assign_attributes(link_details_extractor.to_preview_card_attributes) @card.assign_attributes(link_details_extractor.to_preview_card_attributes)
@card.save_with_optional_image! unless @card.title.blank? && @card.html.blank? @card.save_with_optional_image! unless @card.title.blank? && @card.html.blank?
end end
def lock_options
{ redis: redis, key: "fetch:#{@original_url}", autorelease: 15.minutes.seconds }
end
end end

View File

@ -3,6 +3,7 @@
class RemoveStatusService < BaseService class RemoveStatusService < BaseService
include Redisable include Redisable
include Payloadable include Payloadable
include Lockable
# Delete a status # Delete a status
# @param [Status] status # @param [Status] status
@ -17,8 +18,7 @@ class RemoveStatusService < BaseService
@account = status.account @account = status.account
@options = options @options = options
RedisLock.acquire(lock_options) do |lock| with_lock("distribute:#{@status.id}") do
if lock.acquired?
@status.discard @status.discard
remove_from_self if @account.local? remove_from_self if @account.local?
@ -45,9 +45,6 @@ class RemoveStatusService < BaseService
end end
@status.destroy! if permanently? @status.destroy! if permanently?
else
raise Mastodon::RaceConditionError
end
end end
end end
@ -144,8 +141,4 @@ class RemoveStatusService < BaseService
def permanently? def permanently?
@options[:immediate] || !(@options[:preserve] || @status.reported?) @options[:immediate] || !(@options[:preserve] || @status.reported?)
end end
def lock_options
{ redis: redis, key: "distribute:#{@status.id}", autorelease: 5.minutes.seconds }
end
end end

View File

@ -5,6 +5,7 @@ class ResolveAccountService < BaseService
include DomainControlHelper include DomainControlHelper
include WebfingerHelper include WebfingerHelper
include Redisable include Redisable
include Lockable
# Find or create an account record for a remote user. When creating, # Find or create an account record for a remote user. When creating,
# look up the user's webfinger and fetch ActivityPub data # look up the user's webfinger and fetch ActivityPub data
@ -108,12 +109,8 @@ class ResolveAccountService < BaseService
def fetch_account! def fetch_account!
return unless activitypub_ready? return unless activitypub_ready?
RedisLock.acquire(lock_options) do |lock| with_lock("resolve:#{@username}@#{@domain}") do
if lock.acquired?
@account = ActivityPub::FetchRemoteAccountService.new.call(actor_url) @account = ActivityPub::FetchRemoteAccountService.new.call(actor_url)
else
raise Mastodon::RaceConditionError
end
end end
@account @account
@ -146,8 +143,4 @@ class ResolveAccountService < BaseService
@account.suspend!(origin: :remote) @account.suspend!(origin: :remote)
AccountDeletionWorker.perform_async(@account.id, { 'reserve_username' => false, 'skip_activitypub' => true }) AccountDeletionWorker.perform_async(@account.id, { 'reserve_username' => false, 'skip_activitypub' => true })
end end
def lock_options
{ redis: redis, key: "resolve:#{@username}@#{@domain}", autorelease: 15.minutes.seconds }
end
end end

View File

@ -4,6 +4,7 @@ class VoteService < BaseService
include Authorization include Authorization
include Payloadable include Payloadable
include Redisable include Redisable
include Lockable
def call(account, poll, choices) def call(account, poll, choices)
authorize_with account, poll, :vote? authorize_with account, poll, :vote?
@ -15,8 +16,7 @@ class VoteService < BaseService
already_voted = true already_voted = true
RedisLock.acquire(lock_options) do |lock| with_lock("vote:#{@poll.id}:#{@account.id}") do
if lock.acquired?
already_voted = @poll.votes.where(account: @account).exists? already_voted = @poll.votes.where(account: @account).exists?
ApplicationRecord.transaction do ApplicationRecord.transaction do
@ -24,9 +24,6 @@ class VoteService < BaseService
@votes << @poll.votes.create!(account: @account, choice: Integer(choice)) @votes << @poll.votes.create!(account: @account, choice: Integer(choice))
end end
end end
else
raise Mastodon::RaceConditionError
end
end end
increment_voters_count! unless already_voted increment_voters_count! unless already_voted
@ -76,8 +73,4 @@ class VoteService < BaseService
@poll.reload @poll.reload
retry retry
end end
def lock_options
{ redis: redis, key: "vote:#{@poll.id}:#{@account.id}" }
end
end end

View File

@ -3,14 +3,11 @@
class DistributionWorker class DistributionWorker
include Sidekiq::Worker include Sidekiq::Worker
include Redisable include Redisable
include Lockable
def perform(status_id, options = {}) def perform(status_id, options = {})
RedisLock.acquire(redis: redis, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock| with_lock("distribute:#{status_id}") do
if lock.acquired?
FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys) FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
else
raise Mastodon::RaceConditionError
end
end end
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true