Fix opening and closing Redis connections instead of using a pool (#18171)
* Fix opening and closing Redis connections instead of using a pool * Fix Redis connections not being returned to the pool in CLI commandsmain
parent
6476f7e4da
commit
7b0fe4aef9
|
@ -2,12 +2,17 @@
|
||||||
|
|
||||||
class RedisConfiguration
|
class RedisConfiguration
|
||||||
class << self
|
class << self
|
||||||
|
def establish_pool(new_pool_size)
|
||||||
|
@pool&.shutdown(&:close)
|
||||||
|
@pool = ConnectionPool.new(size: new_pool_size) { new.connection }
|
||||||
|
end
|
||||||
|
|
||||||
def with
|
def with
|
||||||
pool.with { |redis| yield redis }
|
pool.with { |redis| yield redis }
|
||||||
end
|
end
|
||||||
|
|
||||||
def pool
|
def pool
|
||||||
@pool ||= ConnectionPool.new(size: pool_size) { new.connection }
|
@pool ||= establish_pool(pool_size)
|
||||||
end
|
end
|
||||||
|
|
||||||
def pool_size
|
def pool_size
|
||||||
|
|
|
@ -6,6 +6,6 @@ module Redisable
|
||||||
private
|
private
|
||||||
|
|
||||||
def redis
|
def redis
|
||||||
Thread.current[:redis] ||= RedisConfiguration.new.connection
|
Thread.current[:redis] ||= RedisConfiguration.pool.checkout
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,4 +1,6 @@
|
||||||
require 'stoplight'
|
require 'stoplight'
|
||||||
|
|
||||||
Stoplight::Light.default_data_store = Stoplight::DataStore::Redis.new(RedisConfiguration.new.connection)
|
Rails.application.reloader.to_prepare do
|
||||||
Stoplight::Light.default_notifiers = [Stoplight::Notifier::Logger.new(Rails.logger)]
|
Stoplight::Light.default_data_store = Stoplight::DataStore::Redis.new(RedisConfiguration.new.connection)
|
||||||
|
Stoplight::Light.default_notifiers = [Stoplight::Notifier::Logger.new(Rails.logger)]
|
||||||
|
end
|
||||||
|
|
|
@ -19,15 +19,18 @@ module Mastodon
|
||||||
ProgressBar.create(total: total, format: '%c/%u |%b%i| %e')
|
ProgressBar.create(total: total, format: '%c/%u |%b%i| %e')
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def reset_connection_pools!
|
||||||
|
ActiveRecord::Base.establish_connection(ActiveRecord::Base.configurations[Rails.env].dup.tap { |config| config['pool'] = options[:concurrency] + 1 })
|
||||||
|
RedisConfiguration.establish_pool(options[:concurrency])
|
||||||
|
end
|
||||||
|
|
||||||
def parallelize_with_progress(scope)
|
def parallelize_with_progress(scope)
|
||||||
if options[:concurrency] < 1
|
if options[:concurrency] < 1
|
||||||
say('Cannot run with this concurrency setting, must be at least 1', :red)
|
say('Cannot run with this concurrency setting, must be at least 1', :red)
|
||||||
exit(1)
|
exit(1)
|
||||||
end
|
end
|
||||||
|
|
||||||
db_config = ActiveRecord::Base.configurations[Rails.env].dup
|
reset_connection_pools!
|
||||||
db_config['pool'] = options[:concurrency] + 1
|
|
||||||
ActiveRecord::Base.establish_connection(db_config)
|
|
||||||
|
|
||||||
progress = create_progress_bar(scope.count)
|
progress = create_progress_bar(scope.count)
|
||||||
pool = Concurrent::FixedThreadPool.new(options[:concurrency])
|
pool = Concurrent::FixedThreadPool.new(options[:concurrency])
|
||||||
|
@ -52,6 +55,9 @@ module Mastodon
|
||||||
|
|
||||||
result = ActiveRecord::Base.connection_pool.with_connection do
|
result = ActiveRecord::Base.connection_pool.with_connection do
|
||||||
yield(item)
|
yield(item)
|
||||||
|
ensure
|
||||||
|
RedisConfiguration.pool.checkin if Thread.current[:redis]
|
||||||
|
Thread.current[:redis] = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
aggregate.increment(result) if result.is_a?(Integer)
|
aggregate.increment(result) if result.is_a?(Integer)
|
||||||
|
|
|
@ -19,7 +19,7 @@ class Mastodon::RackMiddleware
|
||||||
end
|
end
|
||||||
|
|
||||||
def clean_up_redis_socket!
|
def clean_up_redis_socket!
|
||||||
Thread.current[:redis]&.close
|
RedisConfiguration.pool.checkin if Thread.current[:redis]
|
||||||
Thread.current[:redis] = nil
|
Thread.current[:redis] = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -59,9 +59,7 @@ module Mastodon
|
||||||
index.specification.lock!
|
index.specification.lock!
|
||||||
end
|
end
|
||||||
|
|
||||||
db_config = ActiveRecord::Base.configurations[Rails.env].dup
|
reset_connection_pools!
|
||||||
db_config['pool'] = options[:concurrency] + 1
|
|
||||||
ActiveRecord::Base.establish_connection(db_config)
|
|
||||||
|
|
||||||
pool = Concurrent::FixedThreadPool.new(options[:concurrency])
|
pool = Concurrent::FixedThreadPool.new(options[:concurrency])
|
||||||
added = Concurrent::AtomicFixnum.new(0)
|
added = Concurrent::AtomicFixnum.new(0)
|
||||||
|
@ -139,6 +137,9 @@ module Mastodon
|
||||||
sleep 1
|
sleep 1
|
||||||
rescue => e
|
rescue => e
|
||||||
progress.log pastel.red("Error importing #{index}: #{e}")
|
progress.log pastel.red("Error importing #{index}: #{e}")
|
||||||
|
ensure
|
||||||
|
RedisConfiguration.pool.checkin if Thread.current[:redis]
|
||||||
|
Thread.current[:redis] = nil
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -26,7 +26,7 @@ class Mastodon::SidekiqMiddleware
|
||||||
end
|
end
|
||||||
|
|
||||||
def clean_up_redis_socket!
|
def clean_up_redis_socket!
|
||||||
Thread.current[:redis]&.close
|
RedisConfiguration.pool.checkin if Thread.current[:redis]
|
||||||
Thread.current[:redis] = nil
|
Thread.current[:redis] = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -220,6 +220,8 @@ RSpec.describe ResolveAccountService, type: :service do
|
||||||
return_values << described_class.new.call('foo@ap.example.com')
|
return_values << described_class.new.call('foo@ap.example.com')
|
||||||
rescue ActiveRecord::RecordNotUnique
|
rescue ActiveRecord::RecordNotUnique
|
||||||
fail_occurred = true
|
fail_occurred = true
|
||||||
|
ensure
|
||||||
|
RedisConfiguration.pool.checkin if Thread.current[:redis]
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue