From ca17bae904783dfb1f4899a533d28a79da0c6fe9 Mon Sep 17 00:00:00 2001 From: Thibaut Girka Date: Tue, 25 Jun 2019 22:56:32 +0200 Subject: [PATCH] Use a redis-cached feed for the DM timeline --- .../api/v1/timelines/direct_controller.rb | 10 +++-- app/lib/feed_manager.rb | 40 +++++++++++++++++++ app/models/direct_feed.rb | 31 ++++++++++++++ app/services/batched_remove_status_service.rb | 4 +- app/services/fan_out_on_write_service.rb | 7 ++-- app/services/precompute_feed_service.rb | 1 + app/services/remove_status_service.rb | 4 +- app/workers/feed_insert_worker.rb | 11 ++++- .../scheduler/feed_cleanup_scheduler.rb | 5 +++ 9 files changed, 100 insertions(+), 13 deletions(-) create mode 100644 app/models/direct_feed.rb diff --git a/app/controllers/api/v1/timelines/direct_controller.rb b/app/controllers/api/v1/timelines/direct_controller.rb index d8a76d153a6..6e98e9cacb9 100644 --- a/app/controllers/api/v1/timelines/direct_controller.rb +++ b/app/controllers/api/v1/timelines/direct_controller.rb @@ -27,16 +27,18 @@ class Api::V1::Timelines::DirectController < Api::BaseController end def direct_timeline_statuses - # this query requires built in pagination. - Status.as_direct_timeline( - current_account, + account_direct_feed.get( limit_param(DEFAULT_STATUSES_LIMIT), params[:max_id], params[:since_id], - true # returns array of cache_ids object + params[:min_id] ) end + def account_direct_feed + DirectFeed.new(current_account) + end + def insert_pagination_headers set_pagination_headers(next_path, prev_path) end diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index ddcbdf6da7b..59767cdfe06 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -22,6 +22,8 @@ class FeedManager filter_from_home?(status, receiver_id) elsif timeline_type == :mentions filter_from_mentions?(status, receiver_id) + elsif timeline_type == :direct + filter_from_direct?(status, receiver_id) else false end @@ -59,6 +61,18 @@ class FeedManager true end + def push_to_direct(account, status) + return false unless add_to_feed(:direct, account.id, status) + trim(:direct, account.id) + PushUpdateWorker.perform_async(account.id, status.id, "timeline:direct:#{account.id}") + true + end + + def unpush_from_direct(account, status) + return false unless remove_from_feed(:direct, account.id, status) + redis.publish("timeline:direct:#{account.id}", Oj.dump(event: :delete, payload: status.id.to_s)) + end + def trim(type, account_id) timeline_key = key(type, account_id) reblog_key = key(type, account_id, 'reblogs') @@ -142,6 +156,27 @@ class FeedManager end end + def populate_direct_feed(account) + added = 0 + limit = FeedManager::MAX_ITEMS / 2 + max_id = nil + + loop do + statuses = Status.as_direct_timeline(account, limit, max_id) + + break if statuses.empty? + + statuses.each do |status| + next if filter_from_direct?(status, account) + added += 1 if add_to_feed(:direct, account.id, status) + end + + break unless added.zero? + + max_id = statuses.last.id + end + end + private def push_update_required?(timeline_id) @@ -199,6 +234,11 @@ class FeedManager should_filter end + def filter_from_direct?(status, receiver_id) + return false if receiver_id == status.account_id + filter_from_mentions?(status, receiver_id) + end + def phrase_filtered?(status, receiver_id, context) active_filters = Rails.cache.fetch("filters:#{receiver_id}") { CustomFilter.where(account_id: receiver_id).active_irreversible.to_a }.to_a diff --git a/app/models/direct_feed.rb b/app/models/direct_feed.rb new file mode 100644 index 00000000000..c0b8a0a35bc --- /dev/null +++ b/app/models/direct_feed.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +class DirectFeed < Feed + include Redisable + + def initialize(account) + @type = :direct + @id = account.id + @account = account + end + + def get(limit, max_id = nil, since_id = nil, min_id = nil) + unless redis.exists("account:#{@account.id}:regeneration") + statuses = super + return statuses unless statuses.empty? + end + from_database(limit, max_id, since_id, min_id) + end + + private + + def from_database(limit, max_id, since_id, min_id) + loop do + statuses = Status.as_direct_timeline(@account, limit, max_id, since_id, min_id) + return statuses if statuses.empty? + max_id = statuses.last.id + statuses = statuses.reject { |status| FeedManager.instance.filter?(:direct, status, @account.id) } + return statuses unless statuses.empty? + end + end +end diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb index 02f7076f747..2fe009c9135 100644 --- a/app/services/batched_remove_status_service.rb +++ b/app/services/batched_remove_status_service.rb @@ -107,9 +107,9 @@ class BatchedRemoveStatusService < BaseService payload = @json_payloads[status.id] redis.pipelined do @mentions[status.id].each do |mention| - redis.publish("timeline:direct:#{mention.account.id}", payload) if mention.account.local? + FeedManager.instance.unpush_from_direct(mention.account, status) if mention.account.local? end - redis.publish("timeline:direct:#{status.account.id}", payload) if status.account.local? + FeedManager.instance.unpush_from_direct(status.account, status) if status.account.local? end end diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index b66dc342e5b..cf433d8a69d 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -37,6 +37,7 @@ class FanOutOnWriteService < BaseService def deliver_to_self(status) Rails.logger.debug "Delivering status #{status.id} to author" FeedManager.instance.push_to_home(status.account, status) + FeedManager.instance.push_to_direct(status.account, status) if status.direct_visibility? end def deliver_to_followers(status) @@ -98,11 +99,9 @@ class FanOutOnWriteService < BaseService def deliver_to_direct_timelines(status) Rails.logger.debug "Delivering status #{status.id} to direct timelines" - status.mentions.includes(:account).each do |mention| - Redis.current.publish("timeline:direct:#{mention.account.id}", @payload) if mention.account.local? + FeedInsertWorker.push_bulk(status.mentions.includes(:account).map(&:account).select { |mentioned_account| mentioned_account.local? }) do |account| + [status.id, account.id, :direct] end - - Redis.current.publish("timeline:direct:#{status.account.id}", @payload) if status.account.local? end def deliver_to_own_conversation(status) diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb index 076dedacab9..029c2f6e51b 100644 --- a/app/services/precompute_feed_service.rb +++ b/app/services/precompute_feed_service.rb @@ -3,6 +3,7 @@ class PrecomputeFeedService < BaseService def call(account) FeedManager.instance.populate_feed(account) + FeedManager.instance.populate_direct_feed(account) ensure Redis.current.del("account:#{account.id}:regeneration") end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 98972fc70a6..9d5d0fc1475 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -48,6 +48,7 @@ class RemoveStatusService < BaseService def remove_from_self FeedManager.instance.unpush_from_home(@account, @status) + FeedManager.instance.unpush_from_direct(@account, @status) if @status.direct_visibility? end def remove_from_followers @@ -159,9 +160,8 @@ class RemoveStatusService < BaseService def remove_from_direct @mentions.each do |mention| - Redis.current.publish("timeline:direct:#{mention.account.id}", @payload) if mention.account.local? + FeedManager.instance.unpush_from_direct(mention.account, @status) if mention.account.local? end - Redis.current.publish("timeline:direct:#{@account.id}", @payload) if @account.local? end def lock_options diff --git a/app/workers/feed_insert_worker.rb b/app/workers/feed_insert_worker.rb index 1ae3c877b09..546f5c0c27f 100644 --- a/app/workers/feed_insert_worker.rb +++ b/app/workers/feed_insert_worker.rb @@ -13,6 +13,8 @@ class FeedInsertWorker when :list @list = List.find(id) @follower = @list.account + when :direct + @account = Account.find(id) end check_and_insert @@ -29,7 +31,12 @@ class FeedInsertWorker def feed_filtered? # Note: Lists are a variation of home, so the filtering rules # of home apply to both - FeedManager.instance.filter?(:home, @status, @follower.id) + case @type + when :home, :list + FeedManager.instance.filter?(:home, @status, @follower.id) + when :direct + FeedManager.instance.filter?(:direct, @status, @account.id) + end end def perform_push @@ -38,6 +45,8 @@ class FeedInsertWorker FeedManager.instance.push_to_home(@follower, @status) when :list FeedManager.instance.push_to_list(@list, @status) + when :direct + FeedManager.instance.push_to_direct(@account, @status) end end end diff --git a/app/workers/scheduler/feed_cleanup_scheduler.rb b/app/workers/scheduler/feed_cleanup_scheduler.rb index bf5e2075700..4933f1753b6 100644 --- a/app/workers/scheduler/feed_cleanup_scheduler.rb +++ b/app/workers/scheduler/feed_cleanup_scheduler.rb @@ -9,6 +9,7 @@ class Scheduler::FeedCleanupScheduler def perform clean_home_feeds! clean_list_feeds! + clean_direct_feeds! end private @@ -21,6 +22,10 @@ class Scheduler::FeedCleanupScheduler clean_feeds!(inactive_list_ids, :list) end + def clean_direct_feeds! + clean_feeds!(inactive_account_ids, :direct) + end + def clean_feeds!(ids, type) reblogged_id_sets = {}