From 46e13dd81c84ca952fb3f01e1ce389f473160fc5 Mon Sep 17 00:00:00 2001 From: Jonny Saunders Date: Wed, 12 Mar 2025 02:03:01 -0700 Subject: [PATCH] Add Fetch All Replies Part 1: Backend (#32615) Signed-off-by: sneakers-the-rat Co-authored-by: jonny Co-authored-by: Claire Co-authored-by: Kouhai <66407198+kouhaidev@users.noreply.github.com> --- .env.production.sample | 21 ++ app/controllers/api/v1/statuses_controller.rb | 2 + app/helpers/json_ld_helper.rb | 35 ++- .../concerns/status/fetch_replies_concern.rb | 53 ++++ app/models/status.rb | 2 + .../activitypub/fetch_all_replies_service.rb | 68 +++++ .../fetch_featured_collection_service.rb | 2 +- .../fetch_featured_tags_collection_service.rb | 2 +- .../fetch_remote_status_service.rb | 18 +- .../activitypub/fetch_replies_service.rb | 54 +++- .../synchronize_followers_service.rb | 2 +- .../activitypub/fetch_all_replies_worker.rb | 77 +++++ ...233930_add_fetched_replies_at_to_status.rb | 7 + db/schema.rb | 1 + .../status/fetch_replies_concern_spec.rb | 132 +++++++++ .../fetch_all_replies_service_spec.rb | 90 ++++++ .../fetch_remote_status_service_spec.rb | 53 +++- .../fetch_all_replies_worker_spec.rb | 280 ++++++++++++++++++ 18 files changed, 874 insertions(+), 25 deletions(-) create mode 100644 app/models/concerns/status/fetch_replies_concern.rb create mode 100644 app/services/activitypub/fetch_all_replies_service.rb create mode 100644 app/workers/activitypub/fetch_all_replies_worker.rb create mode 100644 db/migrate/20240918233930_add_fetched_replies_at_to_status.rb create mode 100644 spec/models/concerns/status/fetch_replies_concern_spec.rb create mode 100644 spec/services/activitypub/fetch_all_replies_service_spec.rb create mode 100644 spec/workers/activitypub/fetch_all_replies_worker_spec.rb diff --git a/.env.production.sample b/.env.production.sample index 1faaf5b57c..61bad7609c 100644 --- a/.env.production.sample +++ b/.env.production.sample @@ -86,3 +86,24 @@ S3_ALIAS_HOST=files.example.com # ----------------------- IP_RETENTION_PERIOD=31556952 SESSION_RETENTION_PERIOD=31556952 + +# Fetch All Replies Behavior +# -------------------------- +# When a user expands a post (DetailedStatus view), fetch all of its replies +# (default: true if unset, set explicitly to ``false`` to disable) +FETCH_REPLIES_ENABLED=true + +# Period to wait between fetching replies (in minutes) +FETCH_REPLIES_COOLDOWN_MINUTES=15 + +# Period to wait after a post is first created before fetching its replies (in minutes) +FETCH_REPLIES_INITIAL_WAIT_MINUTES=5 + +# Max number of replies to fetch - total, recursively through a whole reply tree +FETCH_REPLIES_MAX_GLOBAL=1000 + +# Max number of replies to fetch - for a single post +FETCH_REPLIES_MAX_SINGLE=500 + +# Max number of replies Collection pages to fetch - total +FETCH_REPLIES_MAX_PAGES=500 diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 2027bc6016..d3b0e89e97 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -58,6 +58,8 @@ class Api::V1::StatusesController < Api::BaseController statuses = [@status] + @context.ancestors + @context.descendants render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) + + ActivityPub::FetchAllRepliesWorker.perform_async(@status.id) if !current_account.nil? && @status.should_fetch_replies? end def create diff --git a/app/helpers/json_ld_helper.rb b/app/helpers/json_ld_helper.rb index ba096427cf..ccdefb35d8 100644 --- a/app/helpers/json_ld_helper.rb +++ b/app/helpers/json_ld_helper.rb @@ -155,24 +155,49 @@ module JsonLdHelper end end - def fetch_resource(uri, id_is_known, on_behalf_of = nil, request_options: {}) + # Fetch the resource given by uri. + # @param uri [String] + # @param id_is_known [Boolean] + # @param on_behalf_of [nil, Account] + # @param raise_on_error [Boolean, Symbol<:all, :temporary>] See {#fetch_resource_without_id_validation} for possible values + def fetch_resource(uri, id_is_known, on_behalf_of = nil, raise_on_error: false, request_options: {}) unless id_is_known - json = fetch_resource_without_id_validation(uri, on_behalf_of) + json = fetch_resource_without_id_validation(uri, on_behalf_of, raise_on_error: raise_on_error) return if !json.is_a?(Hash) || unsupported_uri_scheme?(json['id']) uri = json['id'] end - json = fetch_resource_without_id_validation(uri, on_behalf_of, request_options: request_options) + json = fetch_resource_without_id_validation(uri, on_behalf_of, raise_on_error: raise_on_error, request_options: request_options) json.present? && json['id'] == uri ? json : nil end - def fetch_resource_without_id_validation(uri, on_behalf_of = nil, raise_on_temporary_error = false, request_options: {}) + # Fetch the resource given by uri + # + # If an error is raised, it contains the response and can be captured for handling like + # + # begin + # fetch_resource_without_id_validation(uri, nil, true) + # rescue Mastodon::UnexpectedResponseError => e + # e.response + # end + # + # @param uri [String] + # @param on_behalf_of [nil, Account] + # @param raise_on_error [Boolean, Symbol<:all, :temporary>] + # - +true+, +:all+ - raise if response code is not in the 2** range + # - +:temporary+ - raise if the response code is not an "unsalvageable error" like a 404 + # (see {#response_error_unsalvageable} ) + # - +false+ - do not raise, return +nil+ + def fetch_resource_without_id_validation(uri, on_behalf_of = nil, raise_on_error: false, request_options: {}) on_behalf_of ||= Account.representative build_request(uri, on_behalf_of, options: request_options).perform do |response| - raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response) || !raise_on_temporary_error + raise Mastodon::UnexpectedResponseError, response if !response_successful?(response) && ( + [true, :all].include?(raise_on_error) || + (!response_error_unsalvageable?(response) && raise_on_error == :temporary) + ) body_to_json(response.body_with_limit) if response.code == 200 && valid_activitypub_content_type?(response) end diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb new file mode 100644 index 0000000000..f34bce59b4 --- /dev/null +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +module Status::FetchRepliesConcern + extend ActiveSupport::Concern + + # enable/disable fetching all replies + FETCH_REPLIES_ENABLED = ENV.key?('FETCH_REPLIES_ENABLED') ? ENV['FETCH_REPLIES_ENABLED'] == 'true' : true + + # debounce fetching all replies to minimize DoS + FETCH_REPLIES_COOLDOWN_MINUTES = (ENV['FETCH_REPLIES_COOLDOWN_MINUTES'] || 15).to_i.minutes + FETCH_REPLIES_INITIAL_WAIT_MINUTES = (ENV['FETCH_REPLIES_INITIAL_WAIT_MINUTES'] || 5).to_i.minutes + + included do + scope :created_recently, -> { where(created_at: FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago..) } + scope :not_created_recently, -> { where(created_at: ..FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago) } + scope :fetched_recently, -> { where(fetched_replies_at: FETCH_REPLIES_COOLDOWN_MINUTES.ago..) } + scope :not_fetched_recently, -> { where(fetched_replies_at: [nil, ..FETCH_REPLIES_COOLDOWN_MINUTES.ago]) } + + scope :should_not_fetch_replies, -> { local.or(created_recently.or(fetched_recently)) } + scope :should_fetch_replies, -> { remote.not_created_recently.not_fetched_recently } + + # statuses for which we won't receive update or deletion actions, + # and should update when fetching replies + # Status from an account which either + # a) has only remote followers + # b) has local follows that were created after the last update time, or + # c) has no known followers + scope :unsubscribed, lambda { + remote.merge( + Status.left_outer_joins(account: :followers).where.not(followers_accounts: { domain: nil }) + .or(where.not('follows.created_at < statuses.updated_at')) + .or(where(follows: { id: nil })) + ) + } + end + + def should_fetch_replies? + # we aren't brand new, and we haven't fetched replies since the debounce window + FETCH_REPLIES_ENABLED && !local? && created_at <= FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago && ( + fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_COOLDOWN_MINUTES.ago + ) + end + + def unsubscribed? + return false if local? + + !Follow.joins(:account).exists?( + target_account: account.id, + account: { domain: nil }, + created_at: ..updated_at + ) + end +end diff --git a/app/models/status.rb b/app/models/status.rb index c012b1ddfa..746fcde395 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -27,6 +27,7 @@ # edited_at :datetime # trendable :boolean # ordered_media_attachment_ids :bigint(8) is an Array +# fetched_replies_at :datetime # class Status < ApplicationRecord @@ -34,6 +35,7 @@ class Status < ApplicationRecord include Discard::Model include Paginable include RateLimitable + include Status::FetchRepliesConcern include Status::SafeReblogInsert include Status::SearchConcern include Status::SnapshotConcern diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb new file mode 100644 index 0000000000..9f92b7efee --- /dev/null +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService + include JsonLdHelper + + # Limit of replies to fetch per status + MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i + + def call(collection_or_uri, status_uri, max_pages = nil, request_id: nil) + @allow_synchronous_requests = true + @collection_or_uri = collection_or_uri + @status_uri = status_uri + + @items, n_pages = collection_items(collection_or_uri, max_pages) + @items = filtered_replies + return if @items.nil? + + FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + + [@items, n_pages] + end + + private + + def filtered_replies + return if @items.nil? + + # Find all statuses that we *shouldn't* update the replies for, and use that as a filter. + # We don't assume that we have the statuses before they're created, + # hence the negative filter - + # "keep all these uris except the ones we already have" + # instead of + # "keep all these uris that match some conditions on existing Status objects" + # + # Typically we assume the number of replies we *shouldn't* fetch is smaller than the + # replies we *should* fetch, so we also minimize the number of uris we should load here. + uris = @items.map { |item| value_or_id(item) } + + # Expand collection to get replies in the DB that were + # - not included in the collection, + # - that we have locally + # - but we have no local followers and thus don't get updates/deletes for + parent_id = Status.where(uri: @status_uri).pick(:id) + unless parent_id.nil? + unsubscribed_replies = Status + .where.not(uri: uris) + .where(in_reply_to_id: parent_id) + .unsubscribed + .pluck(:uri) + uris.concat(unsubscribed_replies) + end + + dont_update = Status.where(uri: uris).should_not_fetch_replies.pluck(:uri) + + # touch all statuses that already exist and that we're about to update + Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at) + + # Reject all statuses that we already have in the db + uris = (uris - dont_update).take(MAX_REPLIES) + + Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" } + uris + end + + def filter_by_host? + false + end +end diff --git a/app/services/activitypub/fetch_featured_collection_service.rb b/app/services/activitypub/fetch_featured_collection_service.rb index 89c3a1b6c0..25c62f3be6 100644 --- a/app/services/activitypub/fetch_featured_collection_service.rb +++ b/app/services/activitypub/fetch_featured_collection_service.rb @@ -33,7 +33,7 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService return collection_or_uri if collection_or_uri.is_a?(Hash) return if non_matching_uri_hosts?(@account.uri, collection_or_uri) - fetch_resource_without_id_validation(collection_or_uri, local_follower, true) + fetch_resource_without_id_validation(collection_or_uri, local_follower, raise_on_error: :temporary) end def process_items(items) diff --git a/app/services/activitypub/fetch_featured_tags_collection_service.rb b/app/services/activitypub/fetch_featured_tags_collection_service.rb index a0b3c6036b..ec2422a075 100644 --- a/app/services/activitypub/fetch_featured_tags_collection_service.rb +++ b/app/services/activitypub/fetch_featured_tags_collection_service.rb @@ -45,7 +45,7 @@ class ActivityPub::FetchFeaturedTagsCollectionService < BaseService return collection_or_uri if collection_or_uri.is_a?(Hash) return if non_matching_uri_hosts?(@account.uri, collection_or_uri) - fetch_resource_without_id_validation(collection_or_uri, local_follower, true) + fetch_resource_without_id_validation(collection_or_uri, local_follower, raise_on_error: :temporary) end def process_items(items) diff --git a/app/services/activitypub/fetch_remote_status_service.rb b/app/services/activitypub/fetch_remote_status_service.rb index 6f8882378f..dc74de32f1 100644 --- a/app/services/activitypub/fetch_remote_status_service.rb +++ b/app/services/activitypub/fetch_remote_status_service.rb @@ -13,7 +13,7 @@ class ActivityPub::FetchRemoteStatusService < BaseService @request_id = request_id || "#{Time.now.utc.to_i}-status-#{uri}" @json = if prefetched_body.nil? - fetch_resource(uri, true, on_behalf_of) + fetch_status(uri, true, on_behalf_of) else body_to_json(prefetched_body, compare_id: uri) end @@ -80,4 +80,20 @@ class ActivityPub::FetchRemoteStatusService < BaseService def expected_object_type? equals_or_includes_any?(@json['type'], ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES) end + + def fetch_status(uri, id_is_known, on_behalf_of = nil) + begin + fetch_resource(uri, id_is_known, on_behalf_of, raise_on_error: true) + rescue Mastodon::UnexpectedResponseError => e + return unless e.response.code == 404 + + # If this is a 404 from a status from an account that has no local followers, delete it + existing_status = Status.find_by(uri: uri) + if !existing_status.nil? && existing_status.unsubscribed? && existing_status.distributable? + Rails.logger.debug { "FetchRemoteStatusService - Got 404 for orphaned status with URI #{uri}, deleting" } + Tombstone.find_or_create_by(uri: uri, account: existing_status.account) + RemoveStatusService.new.call(existing_status, redraft: false) + end + end + end end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 46cab6caf9..72b9c0f5a6 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -3,11 +3,14 @@ class ActivityPub::FetchRepliesService < BaseService include JsonLdHelper + # Limit of fetched replies + MAX_REPLIES = 5 + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil) @account = parent_status.account @allow_synchronous_requests = allow_synchronous_requests - @items = collection_items(collection_or_uri) + @items, = collection_items(collection_or_uri) return if @items.nil? FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } @@ -17,25 +20,39 @@ class ActivityPub::FetchRepliesService < BaseService private - def collection_items(collection_or_uri) + def collection_items(collection_or_uri, max_pages = nil) collection = fetch_collection(collection_or_uri) return unless collection.is_a?(Hash) collection = fetch_collection(collection['first']) if collection['first'].present? return unless collection.is_a?(Hash) - case collection['type'] - when 'Collection', 'CollectionPage' - as_array(collection['items']) - when 'OrderedCollection', 'OrderedCollectionPage' - as_array(collection['orderedItems']) + all_items = [] + n_pages = 1 + while collection.is_a?(Hash) + items = case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end + + all_items.concat(as_array(items)) + + break if all_items.size >= MAX_REPLIES + break if !max_pages.nil? && n_pages >= max_pages + + collection = collection['next'].present? ? fetch_collection(collection['next']) : nil + n_pages += 1 end + + [all_items, n_pages] end def fetch_collection(collection_or_uri) return collection_or_uri if collection_or_uri.is_a?(Hash) return unless @allow_synchronous_requests - return if non_matching_uri_hosts?(@account.uri, collection_or_uri) + return if filter_by_host? && non_matching_uri_hosts?(@account.uri, collection_or_uri) # NOTE: For backward compatibility reasons, Mastodon signs outgoing # queries incorrectly by default. @@ -45,19 +62,28 @@ class ActivityPub::FetchRepliesService < BaseService # # Therefore, retry with correct signatures if this fails. begin - fetch_resource_without_id_validation(collection_or_uri, nil, true) + fetch_resource_without_id_validation(collection_or_uri, nil, raise_on_error: :temporary) rescue Mastodon::UnexpectedResponseError => e raise unless e.response && e.response.code == 401 && Addressable::URI.parse(collection_or_uri).query.present? - fetch_resource_without_id_validation(collection_or_uri, nil, true, request_options: { omit_query_string: false }) + fetch_resource_without_id_validation(collection_or_uri, nil, raise_on_error: :temporary, request_options: { omit_query_string: false }) end end def filtered_replies - # Only fetch replies to the same server as the original status to avoid - # amplification attacks. + if filter_by_host? + # Only fetch replies to the same server as the original status to avoid + # amplification attacks. - # Also limit to 5 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(5) + # Also limit to 5 fetched replies to limit potential for DoS. + @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES) + else + @items.map { |item| value_or_id(item) }.take(MAX_REPLIES) + end + end + + # Whether replies with a different domain than the replied_to post should be rejected + def filter_by_host? + true end end diff --git a/app/services/activitypub/synchronize_followers_service.rb b/app/services/activitypub/synchronize_followers_service.rb index f51d671a00..b01974dcc6 100644 --- a/app/services/activitypub/synchronize_followers_service.rb +++ b/app/services/activitypub/synchronize_followers_service.rb @@ -69,6 +69,6 @@ class ActivityPub::SynchronizeFollowersService < BaseService return collection_or_uri if collection_or_uri.is_a?(Hash) return if non_matching_uri_hosts?(@account.uri, collection_or_uri) - fetch_resource_without_id_validation(collection_or_uri, nil, true) + fetch_resource_without_id_validation(collection_or_uri, nil, raise_on_error: :temporary) end end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb new file mode 100644 index 0000000000..87eac321fa --- /dev/null +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -0,0 +1,77 @@ +# frozen_string_literal: true + +# Fetch all replies to a status, querying recursively through +# ActivityPub replies collections, fetching any statuses that +# we either don't already have or we haven't checked for new replies +# in the Status::FETCH_REPLIES_COOLDOWN_MINUTES interval +class ActivityPub::FetchAllRepliesWorker + include Sidekiq::Worker + include ExponentialBackoff + include JsonLdHelper + + sidekiq_options queue: 'pull', retry: 3 + + # Global max replies to fetch per request (all replies, recursively) + MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_GLOBAL'] || 1000).to_i + MAX_PAGES = (ENV['FETCH_REPLIES_MAX_PAGES'] || 500).to_i + + def perform(parent_status_id, options = {}) + @parent_status = Status.find(parent_status_id) + return unless @parent_status.should_fetch_replies? + + @parent_status.touch(:fetched_replies_at) + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" } + + uris_to_fetch, n_pages = get_replies(@parent_status.uri, MAX_PAGES, options) + return if uris_to_fetch.nil? + + fetched_uris = uris_to_fetch.clone.to_set + + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES || n_pages >= MAX_PAGES + next_reply = uris_to_fetch.pop + next if next_reply.nil? + + new_reply_uris, new_n_pages = get_replies(next_reply, MAX_PAGES - n_pages, options) + next if new_reply_uris.nil? + + new_reply_uris = new_reply_uris.reject { |uri| fetched_uris.include?(uri) } + + uris_to_fetch.concat(new_reply_uris) + fetched_uris = fetched_uris.merge(new_reply_uris) + n_pages += new_n_pages + end + + Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" } + fetched_uris + end + + private + + def get_replies(status_uri, max_pages, options = {}) + replies_collection_or_uri = get_replies_uri(status_uri) + return if replies_collection_or_uri.nil? + + ActivityPub::FetchAllRepliesService.new.call(replies_collection_or_uri, status_uri, max_pages, **options.deep_symbolize_keys) + end + + def get_replies_uri(parent_status_uri) + begin + json_status = fetch_resource(parent_status_uri, true) + if json_status.nil? + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Could not get replies URI for #{parent_status_uri}, returned nil" } + nil + elsif !json_status.key?('replies') + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: No replies collection found in ActivityPub object: #{json_status}" } + nil + else + json_status['replies'] + end + rescue => e + Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status.uri}: Caught exception while resolving replies URI #{parent_status_uri}: #{e} - #{e.message}" } + # Raise if we can't get the collection for top-level status to trigger retry + raise e if parent_status_uri == @parent_status.uri + + nil + end + end +end diff --git a/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb new file mode 100644 index 0000000000..229e43d978 --- /dev/null +++ b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1] + def change + add_column :statuses, :fetched_replies_at, :datetime, null: true + end +end diff --git a/db/schema.rb b/db/schema.rb index ce7e358f4f..66c63e53f5 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -1053,6 +1053,7 @@ ActiveRecord::Schema[8.0].define(version: 2025_03_05_074104) do t.datetime "edited_at", precision: nil t.boolean "trendable" t.bigint "ordered_media_attachment_ids", array: true + t.datetime "fetched_replies_at" t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)" t.index ["account_id"], name: "index_statuses_on_account_id" t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)" diff --git a/spec/models/concerns/status/fetch_replies_concern_spec.rb b/spec/models/concerns/status/fetch_replies_concern_spec.rb new file mode 100644 index 0000000000..f152cf234a --- /dev/null +++ b/spec/models/concerns/status/fetch_replies_concern_spec.rb @@ -0,0 +1,132 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Status::FetchRepliesConcern do + ActiveRecord.verbose_query_logs = true + + let!(:alice) { Fabricate(:account, username: 'alice') } + let!(:bob) { Fabricate(:account, username: 'bob', domain: 'other.com') } + + let!(:account) { alice } + let!(:status_old) { Fabricate(:status, account: account, fetched_replies_at: 1.year.ago, created_at: 1.year.ago) } + let!(:status_fetched_recently) { Fabricate(:status, account: account, fetched_replies_at: 1.second.ago, created_at: 1.year.ago) } + let!(:status_created_recently) { Fabricate(:status, account: account, created_at: 1.second.ago) } + let!(:status_never_fetched) { Fabricate(:status, account: account, created_at: 1.year.ago) } + + describe 'should_fetch_replies' do + let!(:statuses) { Status.should_fetch_replies.all } + + context 'with a local status' do + it 'never fetches local replies' do + expect(statuses).to eq([]) + end + end + + context 'with a remote status' do + let(:account) { bob } + + it 'fetches old statuses' do + expect(statuses).to include(status_old) + end + + it 'fetches statuses that have never been fetched and weren\'t created recently' do + expect(statuses).to include(status_never_fetched) + end + + it 'does not fetch statuses that were fetched recently' do + expect(statuses).to_not include(status_fetched_recently) + end + + it 'does not fetch statuses that were created recently' do + expect(statuses).to_not include(status_created_recently) + end + end + end + + describe 'should_not_fetch_replies' do + let!(:statuses) { Status.should_not_fetch_replies.all } + + context 'with a local status' do + it 'does not fetch local statuses' do + expect(statuses).to include(status_old, status_never_fetched, status_fetched_recently, status_never_fetched) + end + end + + context 'with a remote status' do + let(:account) { bob } + + it 'fetches old statuses' do + expect(statuses).to_not include(status_old) + end + + it 'fetches statuses that have never been fetched and weren\'t created recently' do + expect(statuses).to_not include(status_never_fetched) + end + + it 'does not fetch statuses that were fetched recently' do + expect(statuses).to include(status_fetched_recently) + end + + it 'does not fetch statuses that were created recently' do + expect(statuses).to include(status_created_recently) + end + end + end + + describe 'unsubscribed' do + let!(:spike) { Fabricate(:account, username: 'spike', domain: 'other.com') } + let!(:status) { Fabricate(:status, account: bob, updated_at: 1.day.ago) } + + context 'when the status is from an account with only remote followers after last update' do + before do + Fabricate(:follow, account: spike, target_account: bob) + end + + it 'shows the status as unsubscribed' do + expect(Status.unsubscribed).to eq([status]) + expect(status.unsubscribed?).to be(true) + end + end + + context 'when the status is from an account with only remote followers before last update' do + before do + Fabricate(:follow, account: spike, target_account: bob, created_at: 2.days.ago) + end + + it 'shows the status as unsubscribed' do + expect(Status.unsubscribed).to eq([status]) + expect(status.unsubscribed?).to be(true) + end + end + + context 'when status is from account with local followers after last update' do + before do + Fabricate(:follow, account: alice, target_account: bob) + end + + it 'shows the status as unsubscribed' do + expect(Status.unsubscribed).to eq([status]) + expect(status.unsubscribed?).to be(true) + end + end + + context 'when status is from account with local followers before last update' do + before do + Fabricate(:follow, account: alice, target_account: bob, created_at: 2.days.ago) + end + + it 'does not show the status as unsubscribed' do + expect(Status.unsubscribed).to eq([]) + expect(status.unsubscribed?).to be(false) + end + end + + context 'when the status has no followers' do + it 'shows the status as unsubscribed' do + expect(Status.unsubscribed).to eq([status]) + expect(status.unsubscribed?).to be(true) + end + end + end +end diff --git a/spec/services/activitypub/fetch_all_replies_service_spec.rb b/spec/services/activitypub/fetch_all_replies_service_spec.rb new file mode 100644 index 0000000000..eadd5b10fa --- /dev/null +++ b/spec/services/activitypub/fetch_all_replies_service_spec.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe ActivityPub::FetchAllRepliesService do + subject { described_class.new } + + let(:actor) { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account') } + let(:status) { Fabricate(:status, account: actor) } + let(:collection_uri) { 'http://example.com/replies/1' } + + let(:items) do + [ + 'http://example.com/self-reply-1', + 'http://example.com/self-reply-2', + 'http://example.com/self-reply-3', + 'http://other.com/other-reply-1', + 'http://other.com/other-reply-2', + 'http://other.com/other-reply-3', + 'http://example.com/self-reply-4', + 'http://example.com/self-reply-5', + 'http://example.com/self-reply-6', + ] + end + + let(:payload) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + type: 'Collection', + id: collection_uri, + items: items, + }.with_indifferent_access + end + + describe '#call' do + it 'fetches more than the default maximum and from multiple domains' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload, status.uri) + + expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-2 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 + http://example.com/self-reply-5 http://example.com/self-reply-6)) + end + + context 'with a recent status' do + before do + Fabricate(:status, uri: 'http://example.com/self-reply-2', fetched_replies_at: 1.second.ago, local: false) + end + + it 'skips statuses that have been updated recently' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload, status.uri) + + expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 http://example.com/self-reply-5 http://example.com/self-reply-6)) + end + end + + context 'with an old status' do + before do + Fabricate(:status, uri: 'http://other.com/other-reply-1', fetched_replies_at: 1.year.ago, created_at: 1.year.ago, account: actor) + end + + it 'updates the time that fetched statuses were last fetched' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload, status.uri) + + expect(Status.find_by(uri: 'http://other.com/other-reply-1').fetched_replies_at).to be >= 1.minute.ago + end + end + + context 'with unsubscribed replies' do + before do + remote_actor = Fabricate(:account, domain: 'other.com', uri: 'http://other.com/account') + # reply not in the collection from the remote instance, but we know about anyway without anyone following the account + Fabricate(:status, account: remote_actor, in_reply_to_id: status.id, uri: 'http://other.com/account/unsubscribed', fetched_replies_at: 1.year.ago, created_at: 1.year.ago) + end + + it 'updates the unsubscribed replies' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload, status.uri) + + expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-2 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 + http://example.com/self-reply-5 http://example.com/self-reply-6 http://other.com/account/unsubscribed)) + end + end + end +end diff --git a/spec/services/activitypub/fetch_remote_status_service_spec.rb b/spec/services/activitypub/fetch_remote_status_service_spec.rb index 9d8c6e0e0a..847affd307 100644 --- a/spec/services/activitypub/fetch_remote_status_service_spec.rb +++ b/spec/services/activitypub/fetch_remote_status_service_spec.rb @@ -9,6 +9,9 @@ RSpec.describe ActivityPub::FetchRemoteStatusService do let!(:sender) { Fabricate(:account, domain: 'foo.bar', uri: 'https://foo.bar') } + let(:follower) { Fabricate(:account, username: 'alice') } + let(:follow) { nil } + let(:response) { { body: Oj.dump(object), headers: { 'content-type': 'application/activity+json' } } } let(:existing_status) { nil } let(:note) do @@ -23,13 +26,14 @@ RSpec.describe ActivityPub::FetchRemoteStatusService do before do stub_request(:get, 'https://foo.bar/watch?v=12345').to_return(status: 404, body: '') - stub_request(:get, object[:id]).to_return(body: Oj.dump(object)) + stub_request(:get, object[:id]).to_return(**response) end describe '#call' do before do + follow existing_status - subject.call(object[:id], prefetched_body: Oj.dump(object)) + subject.call(object[:id]) end context 'with Note object' do @@ -254,6 +258,51 @@ RSpec.describe ActivityPub::FetchRemoteStatusService do expect(existing_status.text).to eq 'Lorem ipsum' expect(existing_status.edits).to_not be_empty end + + context 'when the status appears to have been deleted at source' do + let(:response) { { status: 404, body: '' } } + + shared_examples 'no delete' do + it 'does not delete the status' do + existing_status.reload + expect(existing_status.text).to eq 'Foo' + expect(existing_status.edits).to be_empty + end + end + + context 'when the status is orphaned/unsubscribed' do + it 'deletes the orphaned status' do + expect { existing_status.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + end + + context 'when the status is from an account with only remote followers' do + let(:follower) { Fabricate(:account, username: 'alice', domain: 'foo.bar') } + let(:follow) { Fabricate(:follow, account: follower, target_account: sender, created_at: 2.days.ago) } + + it 'deletes the orphaned status' do + expect { existing_status.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + + context 'when the status is private' do + let(:existing_status) { Fabricate(:status, account: sender, text: 'Foo', uri: note[:id], visibility: :private) } + + it_behaves_like 'no delete' + end + + context 'when the status is direct' do + let(:existing_status) { Fabricate(:status, account: sender, text: 'Foo', uri: note[:id], visibility: :direct) } + + it_behaves_like 'no delete' + end + end + + context 'when the status is from an account with local followers' do + let(:follow) { Fabricate(:follow, account: follower, target_account: sender, created_at: 2.days.ago) } + + it_behaves_like 'no delete' + end + end end context 'with a Create activity' do diff --git a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb new file mode 100644 index 0000000000..2b291e9624 --- /dev/null +++ b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb @@ -0,0 +1,280 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe ActivityPub::FetchAllRepliesWorker do + subject { described_class.new } + + let(:top_items) do + [ + 'http://example.com/self-reply-1', + 'http://other.com/other-reply-2', + 'http://example.com/self-reply-3', + ] + end + + let(:top_items_paged) do + [ + 'http://example.com/self-reply-4', + 'http://other.com/other-reply-5', + 'http://example.com/self-reply-6', + ] + end + + let(:nested_items) do + [ + 'http://example.com/nested-self-reply-1', + 'http://other.com/nested-other-reply-2', + 'http://example.com/nested-self-reply-3', + ] + end + + let(:nested_items_paged) do + [ + 'http://example.com/nested-self-reply-4', + 'http://other.com/nested-other-reply-5', + 'http://example.com/nested-self-reply-6', + ] + end + + let(:all_items) do + top_items + top_items_paged + nested_items + nested_items_paged + end + + let(:top_note_uri) do + 'http://example.com/top-post' + end + + let(:top_collection_uri) do + 'http://example.com/top-post/replies' + end + + # The reply uri that has the nested replies under it + let(:reply_note_uri) do + 'http://other.com/other-reply-2' + end + + # The collection uri of nested replies + let(:reply_collection_uri) do + 'http://other.com/other-reply-2/replies' + end + + let(:replies_top) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_collection_uri, + type: 'Collection', + items: top_items + top_items_paged, + } + end + + let(:replies_nested) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_collection_uri, + type: 'Collection', + items: nested_items + nested_items_paged, + } + end + + # The status resource for the top post + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: replies_top, + attributedTo: 'https://example.com', + } + end + + # The status resource that has the uri to the replies collection + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: replies_nested, + attributedTo: 'https://other.com', + } + end + + let(:empty_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://example.com/empty', + type: 'Note', + content: 'Lorem ipsum', + replies: [], + attributedTo: 'https://example.com', + } + end + + let(:account) { Fabricate(:account, domain: 'example.com') } + let(:status) do + Fabricate( + :status, + account: account, + uri: top_note_uri, + created_at: 1.day.ago - Status::FetchRepliesConcern::FETCH_REPLIES_INITIAL_WAIT_MINUTES + ) + end + + before do + allow(FetchReplyWorker).to receive(:push_bulk) + all_items.each do |item| + next if [top_note_uri, reply_note_uri].include? item + + stub_request(:get, item).to_return(status: 200, body: Oj.dump(empty_object), headers: { 'Content-Type': 'application/activity+json' }) + end + + stub_request(:get, top_note_uri).to_return(status: 200, body: Oj.dump(top_object), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_note_uri).to_return(status: 200, body: Oj.dump(reply_object), headers: { 'Content-Type': 'application/activity+json' }) + end + + shared_examples 'fetches all replies' do + it 'fetches statuses recursively' do + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(all_items) + end + + it 'respects the maximum limits set by not recursing after the max is reached' do + stub_const('ActivityPub::FetchAllRepliesWorker::MAX_REPLIES', 5) + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(top_items + top_items_paged) + end + end + + describe 'perform' do + context 'when the payload is a Note with replies as a Collection of inlined replies' do + it_behaves_like 'fetches all replies' + end + + context 'when the payload is a Note with replies as a URI to a Collection' do + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: top_collection_uri, + attributedTo: 'https://example.com', + } + end + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: reply_collection_uri, + attributedTo: 'https://other.com', + } + end + + before do + stub_request(:get, top_collection_uri).to_return(status: 200, body: Oj.dump(replies_top), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_collection_uri).to_return(status: 200, body: Oj.dump(replies_nested), headers: { 'Content-Type': 'application/activity+json' }) + end + + it_behaves_like 'fetches all replies' + end + + context 'when the payload is a Note with replies as a paginated collection' do + let(:top_page_2_uri) do + "#{top_collection_uri}/2" + end + + let(:reply_page_2_uri) do + "#{reply_collection_uri}/2" + end + + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: { + type: 'Collection', + id: top_collection_uri, + first: { + type: 'CollectionPage', + partOf: top_collection_uri, + items: top_items, + next: top_page_2_uri, + }, + }, + attributedTo: 'https://example.com', + } + end + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: { + type: 'Collection', + id: reply_collection_uri, + first: { + type: 'CollectionPage', + partOf: reply_collection_uri, + items: nested_items, + next: reply_page_2_uri, + }, + }, + attributedTo: 'https://other.com', + } + end + + let(:top_page_two) do + { + type: 'CollectionPage', + id: top_page_2_uri, + partOf: top_collection_uri, + items: top_items_paged, + } + end + + let(:reply_page_two) do + { + type: 'CollectionPage', + id: reply_page_2_uri, + partOf: reply_collection_uri, + items: nested_items_paged, + } + end + + before do + stub_request(:get, top_page_2_uri).to_return(status: 200, body: Oj.dump(top_page_two), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_page_2_uri).to_return(status: 200, body: Oj.dump(reply_page_two), headers: { 'Content-Type': 'application/activity+json' }) + end + + it_behaves_like 'fetches all replies' + + it 'limits by max pages' do + stub_const('ActivityPub::FetchAllRepliesWorker::MAX_PAGES', 3) + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(top_items + top_items_paged + nested_items) + end + end + + context 'when replies should not be fetched' do + # ensure that we should not fetch by setting the status to be created in the debounce window + let(:status) { Fabricate(:status, account: account, uri: top_note_uri, created_at: DateTime.now) } + + before do + stub_const('Status::FetchRepliesConcern::FETCH_REPLIES_INITIAL_WAIT_MINUTES', 1.week) + end + + it 'returns nil without fetching' do + got_uris = subject.perform(status.id) + expect(got_uris).to be_nil + assert_not_requested :get, top_note_uri + end + end + end +end