Change algorithm of `tootctl search deploy` to improve performance (#18463)

lolsob-rspec
Eugen Rochko 2022-05-22 22:16:43 +02:00 committed by GitHub
parent 4be9216ccd
commit eda9c41ed8
9 changed files with 294 additions and 103 deletions

View File

@ -23,7 +23,7 @@ class AccountsIndex < Chewy::Index
}, },
} }
index_scope ::Account.searchable.includes(:account_stat), delete_if: ->(account) { account.destroyed? || !account.searchable? } index_scope ::Account.searchable.includes(:account_stat)
root date_detection: false do root date_detection: false do
field :id, type: 'long' field :id, type: 'long'
@ -36,8 +36,8 @@ class AccountsIndex < Chewy::Index
field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content' field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content'
end end
field :following_count, type: 'long', value: ->(account) { account.following.local.count } field :following_count, type: 'long', value: ->(account) { account.following_count }
field :followers_count, type: 'long', value: ->(account) { account.followers.local.count } field :followers_count, type: 'long', value: ->(account) { account.followers_count }
field :last_status_at, type: 'date', value: ->(account) { account.last_status_at || account.created_at } field :last_status_at, type: 'date', value: ->(account) { account.last_status_at || account.created_at }
end end
end end

View File

@ -33,6 +33,8 @@ class StatusesIndex < Chewy::Index
}, },
} }
# We do not use delete_if option here because it would call a method that we
# expect to be called with crutches without crutches, causing n+1 queries
index_scope ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll) index_scope ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll)
crutch :mentions do |collection| crutch :mentions do |collection|

View File

@ -23,7 +23,11 @@ class TagsIndex < Chewy::Index
}, },
} }
index_scope ::Tag.listable, delete_if: ->(tag) { tag.destroyed? || !tag.listable? } index_scope ::Tag.listable
crutch :time_period do
7.days.ago.to_date..0.days.ago.to_date
end
root date_detection: false do root date_detection: false do
field :name, type: 'text', analyzer: 'content' do field :name, type: 'text', analyzer: 'content' do
@ -31,7 +35,7 @@ class TagsIndex < Chewy::Index
end end
field :reviewed, type: 'boolean', value: ->(tag) { tag.reviewed? } field :reviewed, type: 'boolean', value: ->(tag) { tag.reviewed? }
field :usage, type: 'long', value: ->(tag) { tag.history.reduce(0) { |total, day| total + day.accounts } } field :usage, type: 'long', value: ->(tag, crutches) { tag.history.aggregate(crutches.time_period).accounts }
field :last_status_at, type: 'date', value: ->(tag) { tag.last_status_at || tag.created_at } field :last_status_at, type: 'date', value: ->(tag) { tag.last_status_at || tag.created_at }
end end
end end

View File

@ -0,0 +1,30 @@
# frozen_string_literal: true
class Importer::AccountsIndexImporter < Importer::BaseImporter
def import!
scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp|
in_work_unit(tmp) do |accounts|
bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body
indexed = bulk.select { |entry| entry[:index] }.size
deleted = bulk.select { |entry| entry[:delete] }.size
Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
[indexed, deleted]
end
end
wait!
end
private
def index
AccountsIndex
end
def scope
Account.searchable
end
end

View File

@ -0,0 +1,87 @@
# frozen_string_literal: true
class Importer::BaseImporter
# @param [Integer] batch_size
# @param [Concurrent::ThreadPoolExecutor] executor
def initialize(batch_size:, executor:)
@batch_size = batch_size
@executor = executor
@wait_for = Concurrent::Set.new
end
# Callback to run when a concurrent work unit completes
# @param [Proc]
def on_progress(&block)
@on_progress = block
end
# Callback to run when a concurrent work unit fails
# @param [Proc]
def on_failure(&block)
@on_failure = block
end
# Reduce resource usage during and improve speed of indexing
def optimize_for_import!
Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } }
end
# Restore original index settings
def optimize_for_search!
Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: index.settings_hash[:settings][:index][:refresh_interval] } }
end
# Estimate the amount of documents that would be indexed. Not exact!
# @returns [Integer]
def estimate!
ActiveRecord::Base.connection_pool.with_connection { |connection| connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = '#{index.adapter.target.table_name}'")['estimate'].to_i }
end
# Import data from the database into the index
def import!
raise NotImplementedError
end
# Remove documents from the index that no longer exist in the database
def clean_up!
index.scroll_batches do |documents|
ids = documents.map { |doc| doc['_id'] }
existence_map = index.adapter.target.where(id: ids).pluck(:id).each_with_object({}) { |id, map| map[id.to_s] = true }
tmp = ids.reject { |id| existence_map[id] }
next if tmp.empty?
in_work_unit(tmp) do |deleted_ids|
bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body
Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
[0, bulk.size]
end
end
wait!
end
protected
def in_work_unit(*args, &block)
work_unit = Concurrent::Promises.future_on(@executor, *args, &block)
work_unit.on_fulfillment!(&@on_progress)
work_unit.on_rejection!(&@on_failure)
work_unit.on_resolution! { @wait_for.delete(work_unit) }
@wait_for << work_unit
rescue Concurrent::RejectedExecutionError
sleep(0.1) && retry # Backpressure
end
def wait!
Concurrent::Promises.zip(*@wait_for).wait
end
def index
raise NotImplementedError
end
end

View File

@ -0,0 +1,89 @@
# frozen_string_literal: true
class Importer::StatusesIndexImporter < Importer::BaseImporter
def import!
# The idea is that instead of iterating over all statuses in the database
# and calculating the searchable_by for each of them (majority of which
# would be empty), we approach the index from the other end
scopes.each do |scope|
# We could be tempted to keep track of status IDs we have already processed
# from a different scope to avoid indexing them multiple times, but that
# could end up being a very large array
scope.find_in_batches(batch_size: @batch_size) do |tmp|
in_work_unit(tmp.map(&:status_id)) do |status_ids|
bulk = ActiveRecord::Base.connection_pool.with_connection do
Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll).where(id: status_ids)).bulk_body
end
indexed = 0
deleted = 0
# We can't use the delete_if proc to do the filtering because delete_if
# is called before rendering the data and we need to filter based
# on the results of the filter, so this filtering happens here instead
bulk.map! do |entry|
new_entry = begin
if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
{ delete: entry[:index].except(:data) }
else
entry
end
end
if new_entry[:index]
indexed += 1
else
deleted += 1
end
new_entry
end
Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
[indexed, deleted]
end
end
end
wait!
end
private
def index
StatusesIndex
end
def scopes
[
local_statuses_scope,
local_mentions_scope,
local_favourites_scope,
local_votes_scope,
local_bookmarks_scope,
]
end
def local_mentions_scope
Mention.where(account: Account.local, silent: false).select(:id, :status_id)
end
def local_favourites_scope
Favourite.where(account: Account.local).select(:id, :status_id)
end
def local_bookmarks_scope
Bookmark.select(:id, :status_id)
end
def local_votes_scope
Poll.joins(:votes).where(votes: { account: Account.local }).select('polls.id, polls.status_id')
end
def local_statuses_scope
Status.local.select('id, coalesce(reblog_of_id, id) as status_id')
end
end

View File

@ -0,0 +1,26 @@
# frozen_string_literal: true
class Importer::TagsIndexImporter < Importer::BaseImporter
def import!
index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp|
in_work_unit(tmp) do |tags|
bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body
indexed = bulk.select { |entry| entry[:index] }.size
deleted = bulk.select { |entry| entry[:delete] }.size
Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
[indexed, deleted]
end
end
wait!
end
private
def index
TagsIndex
end
end

View File

@ -11,11 +11,11 @@ class Trends::History
end end
def uses def uses
redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum with_redis { |redis| redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum }
end end
def accounts def accounts
redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) with_redis { |redis| redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) }
end end
end end
@ -33,19 +33,21 @@ class Trends::History
attr_reader :day attr_reader :day
def accounts def accounts
redis.pfcount(key_for(:accounts)) with_redis { |redis| redis.pfcount(key_for(:accounts)) }
end end
def uses def uses
redis.get(key_for(:uses))&.to_i || 0 with_redis { |redis| redis.get(key_for(:uses))&.to_i || 0 }
end end
def add(account_id) def add(account_id)
redis.pipelined do with_redis do |redis|
redis.incrby(key_for(:uses), 1) redis.pipelined do |pipeline|
redis.pfadd(key_for(:accounts), account_id) pipeline.incrby(key_for(:uses), 1)
redis.expire(key_for(:uses), EXPIRE_AFTER) pipeline.pfadd(key_for(:accounts), account_id)
redis.expire(key_for(:accounts), EXPIRE_AFTER) pipeline.expire(key_for(:uses), EXPIRE_AFTER)
pipeline.expire(key_for(:accounts), EXPIRE_AFTER)
end
end end
end end

View File

@ -16,19 +16,21 @@ module Mastodon
StatusesIndex, StatusesIndex,
].freeze ].freeze
option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads' option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads'
option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch' option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch'
option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices' option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
option :import, type: :boolean, default: true, desc: 'Import data from the database to the index'
option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index'
desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them' desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
long_desc <<~LONG_DESC long_desc <<~LONG_DESC
If Elasticsearch is empty, this command will create the necessary indices If Elasticsearch is empty, this command will create the necessary indices
and then import data from the database into those indices. and then import data from the database into those indices.
This command will also upgrade indices if the underlying schema has been This command will also upgrade indices if the underlying schema has been
changed since the last run. changed since the last run. Index upgrades erase index data.
Even if creating or upgrading indices is not necessary, data from the Even if creating or upgrading indices is not necessary, data from the
database will be imported into the indices. database will be imported into the indices, unless overriden with --no-import.
LONG_DESC LONG_DESC
def deploy def deploy
if options[:concurrency] < 1 if options[:concurrency] < 1
@ -49,6 +51,8 @@ module Mastodon
end end
end end
pool = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10)
importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) }
progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false) progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
# First, ensure all indices are created and have the correct # First, ensure all indices are created and have the correct
@ -59,99 +63,46 @@ module Mastodon
index.specification.lock! index.specification.lock!
end end
progress.title = 'Estimating workload '
progress.total = indices.sum { |index| importers[index].estimate! }
reset_connection_pools! reset_connection_pools!
pool = Concurrent::FixedThreadPool.new(options[:concurrency]) added = 0
added = Concurrent::AtomicFixnum.new(0) removed = 0
removed = Concurrent::AtomicFixnum.new(0)
progress.title = 'Estimating workload '
# Estimate the amount of data that has to be imported first
progress.total = indices.sum { |index| index.adapter.default_scope.count }
# Now import all the actual data. Mind that unlike chewy:sync, we don't
# fetch and compare all record IDs from the database and the index to
# find out which to add and which to remove from the index. Because with
# potentially millions of rows, the memory footprint of such a calculation
# is uneconomical. So we only ever add.
indices.each do |index| indices.each do |index|
importer = importers[index]
importer.optimize_for_import!
importer.on_progress do |(indexed, deleted)|
progress.total = nil if progress.progress + indexed + deleted > progress.total
progress.progress += indexed + deleted
added += indexed
removed += deleted
end
importer.on_failure do |reason|
progress.log(pastel.red("Error while importing #{index}: #{reason}"))
end
if options[:import]
progress.title = "Importing #{index} " progress.title = "Importing #{index} "
batch_size = options[:batch_size] importer.import!
slice_size = (batch_size / options[:concurrency]).ceil
index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
futures = []
batch.each_slice(slice_size) do |records|
futures << Concurrent::Future.execute(executor: pool) do
begin
if !progress.total.nil? && progress.progress + records.size > progress.total
# The number of items has changed between start and now,
# since there is no good way to predict the final count from
# here, just change the progress bar to an indeterminate one
progress.total = nil
end end
grouped_records = nil if options[:clean]
bulk_body = nil progress.title = "Cleaning #{index} "
index_count = 0 importer.clean_up!
delete_count = 0
ActiveRecord::Base.connection_pool.with_connection do
grouped_records = records.to_a.group_by do |record|
index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
end end
bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
end
index_count = grouped_records[:to_index].size if grouped_records.key?(:to_index)
delete_count = grouped_records[:delete].size if grouped_records.key?(:delete)
# The following is an optimization for statuses specifically, since
# we want to de-index statuses that cannot be searched by anybody,
# but can't use Chewy's delete_if logic because it doesn't use
# crutches and our searchable_by logic depends on them
if index == StatusesIndex
bulk_body.map! do |entry|
if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
index_count -= 1
delete_count += 1
{ delete: entry[:to_index].except(:data) }
else
entry
end
end
end
Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)
progress.progress += records.size
added.increment(index_count)
removed.increment(delete_count)
sleep 1
rescue => e
progress.log pastel.red("Error importing #{index}: #{e}")
ensure ensure
RedisConfiguration.pool.checkin if Thread.current[:redis] importer.optimize_for_search!
Thread.current[:redis] = nil
end
end
end end
futures.map(&:value) progress.title = 'Done! '
end progress.finish
end
progress.title = '' say("Indexed #{added} records, de-indexed #{removed}", :green, true)
progress.stop
say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
end end
end end
end end