From 2303f4e84d86dc5ad106e61c1f44d1a24bd7ad79 Mon Sep 17 00:00:00 2001 From: Claire Date: Mon, 12 Feb 2024 13:29:36 +0100 Subject: [PATCH] Improve performance of deleting OAuth tokens --- app/lib/application_extension.rb | 9 +++++++-- app/models/user.rb | 7 +++++-- spec/models/user_spec.rb | 6 ++++-- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/app/lib/application_extension.rb b/app/lib/application_extension.rb index 9245e00c14..400c51a023 100644 --- a/app/lib/application_extension.rb +++ b/app/lib/application_extension.rb @@ -25,8 +25,13 @@ module ApplicationExtension def push_to_streaming_api # TODO: #28793 Combine into a single topic - access_tokens.in_batches.each do |token| - redis.publish("timeline:access_token:#{token.id}", Oj.dump(event: :kill)) + payload = Oj.dump(event: :kill) + access_tokens.in_batches do |tokens| + redis.pipelined do |pipeline| + tokens.ids.each do |id| + pipeline.publish("timeline:access_token:#{id}", payload) + end + end end end end diff --git a/app/models/user.rb b/app/models/user.rb index f9dc2be667..289ee1219a 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -363,8 +363,11 @@ class User < ApplicationRecord # Revoke each access token for the Streaming API, since `update_all`` # doesn't trigger ActiveRecord Callbacks: # TODO: #28793 Combine into a single topic - batch.each do |token| - redis.publish("timeline:access_token:#{token.id}", Oj.dump(event: :kill)) + payload = Oj.dump(event: :kill) + redis.pipelined do |pipeline| + batch.ids.each do |id| + pipeline.publish("timeline:access_token:#{id}", payload) + end end end end diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index f17f5985d6..fd0246f1e0 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -438,8 +438,10 @@ RSpec.describe User do let!(:access_token) { Fabricate(:access_token, resource_owner_id: user.id) } let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_token) } + let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } + before do - allow(redis).to receive_messages(publish: nil) + allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) user.reset_password! end @@ -456,7 +458,7 @@ RSpec.describe User do end it 'revokes streaming access for all access tokens' do - expect(redis).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once + expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once end it 'removes push subscriptions' do