Use a redis-cached feed for the DM timeline
parent
eead3a7c91
commit
ec1fcf1584
|
@ -27,16 +27,18 @@ class Api::V1::Timelines::DirectController < Api::BaseController
|
||||||
end
|
end
|
||||||
|
|
||||||
def direct_timeline_statuses
|
def direct_timeline_statuses
|
||||||
# this query requires built in pagination.
|
account_direct_feed.get(
|
||||||
Status.as_direct_timeline(
|
|
||||||
current_account,
|
|
||||||
limit_param(DEFAULT_STATUSES_LIMIT),
|
limit_param(DEFAULT_STATUSES_LIMIT),
|
||||||
params[:max_id],
|
params[:max_id],
|
||||||
params[:since_id],
|
params[:since_id],
|
||||||
true # returns array of cache_ids object
|
params[:min_id]
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def account_direct_feed
|
||||||
|
DirectFeed.new(current_account)
|
||||||
|
end
|
||||||
|
|
||||||
def insert_pagination_headers
|
def insert_pagination_headers
|
||||||
set_pagination_headers(next_path, prev_path)
|
set_pagination_headers(next_path, prev_path)
|
||||||
end
|
end
|
||||||
|
|
|
@ -22,6 +22,8 @@ class FeedManager
|
||||||
filter_from_home?(status, receiver_id)
|
filter_from_home?(status, receiver_id)
|
||||||
elsif timeline_type == :mentions
|
elsif timeline_type == :mentions
|
||||||
filter_from_mentions?(status, receiver_id)
|
filter_from_mentions?(status, receiver_id)
|
||||||
|
elsif timeline_type == :direct
|
||||||
|
filter_from_direct?(status, receiver_id)
|
||||||
else
|
else
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
|
@ -59,6 +61,18 @@ class FeedManager
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def push_to_direct(account, status)
|
||||||
|
return false unless add_to_feed(:direct, account.id, status)
|
||||||
|
trim(:direct, account.id)
|
||||||
|
PushUpdateWorker.perform_async(account.id, status.id, "timeline:direct:#{account.id}")
|
||||||
|
true
|
||||||
|
end
|
||||||
|
|
||||||
|
def unpush_from_direct(account, status)
|
||||||
|
return false unless remove_from_feed(:direct, account.id, status)
|
||||||
|
redis.publish("timeline:direct:#{account.id}", Oj.dump(event: :delete, payload: status.id.to_s))
|
||||||
|
end
|
||||||
|
|
||||||
def trim(type, account_id)
|
def trim(type, account_id)
|
||||||
timeline_key = key(type, account_id)
|
timeline_key = key(type, account_id)
|
||||||
reblog_key = key(type, account_id, 'reblogs')
|
reblog_key = key(type, account_id, 'reblogs')
|
||||||
|
@ -142,6 +156,27 @@ class FeedManager
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def populate_direct_feed(account)
|
||||||
|
added = 0
|
||||||
|
limit = FeedManager::MAX_ITEMS / 2
|
||||||
|
max_id = nil
|
||||||
|
|
||||||
|
loop do
|
||||||
|
statuses = Status.as_direct_timeline(account, limit, max_id)
|
||||||
|
|
||||||
|
break if statuses.empty?
|
||||||
|
|
||||||
|
statuses.each do |status|
|
||||||
|
next if filter_from_direct?(status, account)
|
||||||
|
added += 1 if add_to_feed(:direct, account.id, status)
|
||||||
|
end
|
||||||
|
|
||||||
|
break unless added.zero?
|
||||||
|
|
||||||
|
max_id = statuses.last.id
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def push_update_required?(timeline_id)
|
def push_update_required?(timeline_id)
|
||||||
|
@ -199,6 +234,11 @@ class FeedManager
|
||||||
should_filter
|
should_filter
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def filter_from_direct?(status, receiver_id)
|
||||||
|
return false if receiver_id == status.account_id
|
||||||
|
filter_from_mentions?(status, receiver_id)
|
||||||
|
end
|
||||||
|
|
||||||
def phrase_filtered?(status, receiver_id, context)
|
def phrase_filtered?(status, receiver_id, context)
|
||||||
active_filters = Rails.cache.fetch("filters:#{receiver_id}") { CustomFilter.where(account_id: receiver_id).active_irreversible.to_a }.to_a
|
active_filters = Rails.cache.fetch("filters:#{receiver_id}") { CustomFilter.where(account_id: receiver_id).active_irreversible.to_a }.to_a
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class DirectFeed < Feed
|
||||||
|
include Redisable
|
||||||
|
|
||||||
|
def initialize(account)
|
||||||
|
@type = :direct
|
||||||
|
@id = account.id
|
||||||
|
@account = account
|
||||||
|
end
|
||||||
|
|
||||||
|
def get(limit, max_id = nil, since_id = nil, min_id = nil)
|
||||||
|
unless redis.exists("account:#{@account.id}:regeneration")
|
||||||
|
statuses = super
|
||||||
|
return statuses unless statuses.empty?
|
||||||
|
end
|
||||||
|
from_database(limit, max_id, since_id, min_id)
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def from_database(limit, max_id, since_id, min_id)
|
||||||
|
loop do
|
||||||
|
statuses = Status.as_direct_timeline(@account, limit, max_id, since_id, min_id)
|
||||||
|
return statuses if statuses.empty?
|
||||||
|
max_id = statuses.last.id
|
||||||
|
statuses = statuses.reject { |status| FeedManager.instance.filter?(:direct, status, @account.id) }
|
||||||
|
return statuses unless statuses.empty?
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -107,9 +107,9 @@ class BatchedRemoveStatusService < BaseService
|
||||||
payload = @json_payloads[status.id]
|
payload = @json_payloads[status.id]
|
||||||
redis.pipelined do
|
redis.pipelined do
|
||||||
@mentions[status.id].each do |mention|
|
@mentions[status.id].each do |mention|
|
||||||
redis.publish("timeline:direct:#{mention.account.id}", payload) if mention.account.local?
|
FeedManager.instance.unpush_from_direct(mention.account, status) if mention.account.local?
|
||||||
end
|
end
|
||||||
redis.publish("timeline:direct:#{status.account.id}", payload) if status.account.local?
|
FeedManager.instance.unpush_from_direct(status.account, status) if status.account.local?
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ class FanOutOnWriteService < BaseService
|
||||||
def deliver_to_self(status)
|
def deliver_to_self(status)
|
||||||
Rails.logger.debug "Delivering status #{status.id} to author"
|
Rails.logger.debug "Delivering status #{status.id} to author"
|
||||||
FeedManager.instance.push_to_home(status.account, status)
|
FeedManager.instance.push_to_home(status.account, status)
|
||||||
|
FeedManager.instance.push_to_direct(status.account, status) if status.direct_visibility?
|
||||||
end
|
end
|
||||||
|
|
||||||
def deliver_to_followers(status)
|
def deliver_to_followers(status)
|
||||||
|
@ -98,11 +99,9 @@ class FanOutOnWriteService < BaseService
|
||||||
def deliver_to_direct_timelines(status)
|
def deliver_to_direct_timelines(status)
|
||||||
Rails.logger.debug "Delivering status #{status.id} to direct timelines"
|
Rails.logger.debug "Delivering status #{status.id} to direct timelines"
|
||||||
|
|
||||||
status.mentions.includes(:account).each do |mention|
|
FeedInsertWorker.push_bulk(status.mentions.includes(:account).map(&:account).select { |mentioned_account| mentioned_account.local? }) do |account|
|
||||||
Redis.current.publish("timeline:direct:#{mention.account.id}", @payload) if mention.account.local?
|
[status.id, account.id, :direct]
|
||||||
end
|
end
|
||||||
|
|
||||||
Redis.current.publish("timeline:direct:#{status.account.id}", @payload) if status.account.local?
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def deliver_to_own_conversation(status)
|
def deliver_to_own_conversation(status)
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
class PrecomputeFeedService < BaseService
|
class PrecomputeFeedService < BaseService
|
||||||
def call(account)
|
def call(account)
|
||||||
FeedManager.instance.populate_feed(account)
|
FeedManager.instance.populate_feed(account)
|
||||||
|
FeedManager.instance.populate_direct_feed(account)
|
||||||
ensure
|
ensure
|
||||||
Redis.current.del("account:#{account.id}:regeneration")
|
Redis.current.del("account:#{account.id}:regeneration")
|
||||||
end
|
end
|
||||||
|
|
|
@ -48,6 +48,7 @@ class RemoveStatusService < BaseService
|
||||||
|
|
||||||
def remove_from_self
|
def remove_from_self
|
||||||
FeedManager.instance.unpush_from_home(@account, @status)
|
FeedManager.instance.unpush_from_home(@account, @status)
|
||||||
|
FeedManager.instance.unpush_from_direct(@account, @status) if @status.direct_visibility?
|
||||||
end
|
end
|
||||||
|
|
||||||
def remove_from_followers
|
def remove_from_followers
|
||||||
|
@ -159,9 +160,8 @@ class RemoveStatusService < BaseService
|
||||||
|
|
||||||
def remove_from_direct
|
def remove_from_direct
|
||||||
@mentions.each do |mention|
|
@mentions.each do |mention|
|
||||||
Redis.current.publish("timeline:direct:#{mention.account.id}", @payload) if mention.account.local?
|
FeedManager.instance.unpush_from_direct(mention.account, @status) if mention.account.local?
|
||||||
end
|
end
|
||||||
Redis.current.publish("timeline:direct:#{@account.id}", @payload) if @account.local?
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def lock_options
|
def lock_options
|
||||||
|
|
|
@ -13,6 +13,8 @@ class FeedInsertWorker
|
||||||
when :list
|
when :list
|
||||||
@list = List.find(id)
|
@list = List.find(id)
|
||||||
@follower = @list.account
|
@follower = @list.account
|
||||||
|
when :direct
|
||||||
|
@account = Account.find(id)
|
||||||
end
|
end
|
||||||
|
|
||||||
check_and_insert
|
check_and_insert
|
||||||
|
@ -29,7 +31,12 @@ class FeedInsertWorker
|
||||||
def feed_filtered?
|
def feed_filtered?
|
||||||
# Note: Lists are a variation of home, so the filtering rules
|
# Note: Lists are a variation of home, so the filtering rules
|
||||||
# of home apply to both
|
# of home apply to both
|
||||||
FeedManager.instance.filter?(:home, @status, @follower.id)
|
case @type
|
||||||
|
when :home, :list
|
||||||
|
FeedManager.instance.filter?(:home, @status, @follower.id)
|
||||||
|
when :direct
|
||||||
|
FeedManager.instance.filter?(:direct, @status, @account.id)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def perform_push
|
def perform_push
|
||||||
|
@ -38,6 +45,8 @@ class FeedInsertWorker
|
||||||
FeedManager.instance.push_to_home(@follower, @status)
|
FeedManager.instance.push_to_home(@follower, @status)
|
||||||
when :list
|
when :list
|
||||||
FeedManager.instance.push_to_list(@list, @status)
|
FeedManager.instance.push_to_list(@list, @status)
|
||||||
|
when :direct
|
||||||
|
FeedManager.instance.push_to_direct(@account, @status)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,6 +9,7 @@ class Scheduler::FeedCleanupScheduler
|
||||||
def perform
|
def perform
|
||||||
clean_home_feeds!
|
clean_home_feeds!
|
||||||
clean_list_feeds!
|
clean_list_feeds!
|
||||||
|
clean_direct_feeds!
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -21,6 +22,10 @@ class Scheduler::FeedCleanupScheduler
|
||||||
clean_feeds!(inactive_list_ids, :list)
|
clean_feeds!(inactive_list_ids, :list)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def clean_direct_feeds!
|
||||||
|
clean_feeds!(inactive_account_ids, :direct)
|
||||||
|
end
|
||||||
|
|
||||||
def clean_feeds!(ids, type)
|
def clean_feeds!(ids, type)
|
||||||
reblogged_id_sets = {}
|
reblogged_id_sets = {}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue