diff --git a/app/lib/application_extension.rb b/app/lib/application_extension.rb index 9245e00c14..400c51a023 100644 --- a/app/lib/application_extension.rb +++ b/app/lib/application_extension.rb @@ -25,8 +25,13 @@ module ApplicationExtension def push_to_streaming_api # TODO: #28793 Combine into a single topic - access_tokens.in_batches.each do |token| - redis.publish("timeline:access_token:#{token.id}", Oj.dump(event: :kill)) + payload = Oj.dump(event: :kill) + access_tokens.in_batches do |tokens| + redis.pipelined do |pipeline| + tokens.ids.each do |id| + pipeline.publish("timeline:access_token:#{id}", payload) + end + end end end end diff --git a/app/models/user.rb b/app/models/user.rb index 64edd3557d..6c8c8e33fd 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -364,8 +364,11 @@ class User < ApplicationRecord # Revoke each access token for the Streaming API, since `update_all`` # doesn't trigger ActiveRecord Callbacks: # TODO: #28793 Combine into a single topic - batch.each do |token| - redis.publish("timeline:access_token:#{token.id}", Oj.dump(event: :kill)) + payload = Oj.dump(event: :kill) + redis.pipelined do |pipeline| + batch.ids.each do |id| + pipeline.publish("timeline:access_token:#{id}", payload) + end end end end diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index f351db3720..12ec3d3355 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -432,8 +432,10 @@ RSpec.describe User do let!(:access_token) { Fabricate(:access_token, resource_owner_id: user.id) } let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_token) } + let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } + before do - allow(redis).to receive_messages(publish: nil) + allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) user.reset_password! end @@ -450,7 +452,7 @@ RSpec.describe User do end it 'revokes streaming access for all access tokens' do - expect(redis).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once + expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once end it 'removes push subscriptions' do