From b9c2bb38dbaf3f7af81678b3b127ceadff1b4ff3 Mon Sep 17 00:00:00 2001
From: "Andriy Kushnir (Orhideous)"
Date: Wed, 14 Dec 2022 17:18:13 +0200
Subject: [PATCH] refactor: Rewrite backfill algorithm according to specification in more readable form

---
 src/api/server_server.rs | 207 +++++++++++++++++++++++++++------------
 1 file changed, 147 insertions(+), 60 deletions(-)

diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index 58f4d82a..2e3cbded 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -48,7 +48,7 @@ use ruma::{
 };
 use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
 use std::{
-    collections::{BTreeMap, HashSet},
+    collections::{BTreeMap, HashSet, VecDeque},
     fmt::Debug,
     mem,
     net::{IpAddr, SocketAddr},
@@ -1045,10 +1045,10 @@ pub async fn get_missing_events_route(
     Ok(get_missing_events::v1::Response { events })
 }
 
-// Recursively fetch events starting from `latest_events`, going backwards
-// through each event's `prev_events` until reaching the `earliest_events`.
-//
-// Used by the federation /backfill and /get_missing_events routes.
+/// Fetch events starting from `latest_events`, going backwards
+/// through each event's `prev_events` until reaching the `earliest_events`.
+///
+/// Used by the federation /backfill and /get_missing_events routes.
 fn get_missing_events(
     sender_servername: &ServerName,
     room_id: &RoomId,
@@ -1073,72 +1073,159 @@ fn get_missing_events(
         .filter(|member| member.server_name() == sender_servername)
         .collect();
 
-    let limit = u64::from(limit) as usize;
+    // Visibility check for the requesting server; a failed lookup counts as "not visible"
+    let event_filter = |event_id: &EventId| {
+        services()
+            .rooms
+            .state_accessor
+            .server_can_see_event(
+                sender_servername,
+                current_server_members.as_slice(),
+                event_id,
+            )
+            .unwrap_or_default()
+    };
 
-    let mut queued_events = latest_events.to_owned();
-    let mut events = Vec::new();
+    // A PDU is acceptable only if it carries a parseable `room_id` equal to the requested room
+    let pdu_filter = |pdu: &CanonicalJsonObject| {
+        let event_room_id = pdu
+            .get("room_id")
+            .and_then(|val| val.as_str())
+            .and_then(|room_id_str| <&RoomId>::try_from(room_id_str).ok());
 
-    let mut stop_at_events = HashSet::with_capacity(limit);
-    stop_at_events.extend(earliest_events.iter().cloned());
+        match event_room_id {
+            Some(event_room_id) => {
+                let valid_event = event_room_id == room_id;
+                if !valid_event {
+                    error!(?room_id, ?event_room_id, "An evil event detected");
+                }
+                valid_event
+            }
+            None => {
+                error!(?pdu, "Can't extract valid `room_id` from pdu");
+                false
+            }
+        }
+    };
 
-    let mut i = 0;
-    while i < queued_events.len() && events.len() < limit {
-        if stop_at_events.contains(&queued_events[i]) {
-            i += 1;
+    // Loads the stored PDU JSON; a missing event and a failed lookup both yield `None`
+    #[inline]
+    fn get_pdu(event: &EventId) -> Option<CanonicalJsonObject> {
+        services()
+            .rooms
+            .timeline
+            .get_pdu_json(event)
+            .unwrap_or_default()
+    }
+
+    let events = linearize_previous_events(
+        latest_events.iter().cloned(),
+        earliest_events.iter().cloned(),
+        limit,
+        get_pdu,
+        event_filter,
+        pdu_filter,
+    );
+
+    Ok(events)
+}
+
+/// Unwinds previous events by doing a breadth-first walk from given roots
+///
+/// # Arguments
+///
+/// * `roots`: Starting point to unwind event history
+/// * `excluded`: Events that are never returned; their `prev_events` are still walked
+/// * `limit`: Maximum number of events to return; `0` yields an empty result
+/// * `pdu_extractor`: Closure to extract PDU for given event_id, for example, from DB.
+/// * `event_filter`: Closure to filter event by its visibility. It may or may not hit DB.
+/// * `pdu_filter`: Closure for basic validation against malformed PDUs.
+///
+/// # Returns
+///
+/// The given roots and their previous events, each at most once and without any `excluded` events, up to `limit`.
+///
+/// # Note
+///
+/// In matrix specification, «Server-Server API», paragraph 8 there is no mention of previous events for excluded events.
+/// Therefore, the algorithm below excludes **only** the events themselves, but still processes their history.
+fn linearize_previous_events<E, L, F, V, P>(
+    roots: E,
+    excluded: E,
+    limit: L,
+    pdu_extractor: P,
+    event_filter: F,
+    pdu_filter: V,
+) -> Vec<Box<RawJsonValue>>
+where
+    E: IntoIterator<Item = OwnedEventId>,
+    F: Fn(&EventId) -> bool,
+    L: Into<u64>,
+    V: Fn(&CanonicalJsonObject) -> bool,
+    P: Fn(&EventId) -> Option<CanonicalJsonObject>,
+{
+    // `limit` comes from a remote request: 0 means "return nothing", oversized values saturate
+    let limit = usize::try_from(limit.into()).unwrap_or(usize::MAX);
+
+    #[inline]
+    fn get_previous_events(pdu: &CanonicalJsonObject) -> Option<Vec<OwnedEventId>> {
+        match pdu.get("prev_events") {
+            None => {
+                error!(?pdu, "A stored event has no 'prev_events' field");
+                return None;
+            }
+            Some(prev_events) => {
+                let val = prev_events.clone().into();
+                let events = serde_json::from_value::<Vec<OwnedEventId>>(val);
+                if let Err(error) = events {
+                    error!(?prev_events, ?error, "Broken 'prev_events' field");
+                    return None;
+                }
+                Some(events.unwrap_or_default())
+            }
+        }
+    }
+
+    let mut visited: HashSet<OwnedEventId> = Default::default();
+    let mut history: Vec<Box<RawJsonValue>> = Default::default();
+    let mut queue: VecDeque<OwnedEventId> = Default::default();
+    let excluded: HashSet<_> = excluded.into_iter().collect();
+
+    // Add all roots into processing queue. Roots are marked as visited up front,
+    // so a root that is also reachable from another root (or listed twice)
+    // is collected only once
+    queue.extend(roots.into_iter().filter(|root| visited.insert(root.clone())));
+
+    while let Some(current_event) = queue.pop_front() {
+        // Return all collected events if reached limit
+        if history.len() >= limit {
+            return history;
+        }
+
+        // Skip an entire branch containing incorrect events
+        if !event_filter(&current_event) {
             continue;
         }
 
-        if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? {
-            let room_id_str = pdu
-                .get("room_id")
-                .and_then(|val| val.as_str())
-                .ok_or_else(|| Error::bad_database("Invalid event in database"))?;
-
-            let event_room_id = <&RoomId>::try_from(room_id_str)
-                .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
-
-            if event_room_id != room_id {
-                warn!(
-                    ?room_id,
-                    evil_event = ?queued_events[i],
-                    "Evil event detected while searching in room"
-                );
-                return Err(Error::BadRequest(
-                    ErrorKind::InvalidParam,
-                    "Evil event detected",
-                ));
+        // Process PDU from a current event if it exists and is valid
+        if let Some(pdu) = pdu_extractor(&current_event).filter(&pdu_filter) {
+            if !excluded.contains(&current_event) {
+                history.push(PduEvent::convert_to_outgoing_federation_event(pdu.clone()));
             }
 
-            let event_is_visible = services().rooms.state_accessor.server_can_see_event(
-                sender_servername,
-                current_server_members.as_slice(),
-                &queued_events[i],
-            )?;
-
-            if !event_is_visible {
-                i += 1;
-                continue;
+            // Fetch previous events, if they exist
+            if let Some(previous_events) = get_previous_events(&pdu) {
+                for previous_event in previous_events {
+                    if !visited.contains(&previous_event) {
+                        visited.insert(previous_event.clone());
+                        queue.push_back(previous_event);
+                    }
+                }
             }
-
-            // Don't send this event again if it comes through some other
-            // event's prev_events.
-            stop_at_events.insert(queued_events[i].clone());
-
-            let prev_events = pdu
-                .get("prev_events")
-                .ok_or_else(|| Error::bad_database("Event in db has no prev_events field."))?;
-
-            queued_events.extend_from_slice(
-                &serde_json::from_value::<Vec<OwnedEventId>>(prev_events.clone().into()).map_err(
-                    |_| Error::bad_database("Invalid prev_events content in pdu in db."),
-                )?,
-            );
-            events.push(PduEvent::convert_to_outgoing_federation_event(pdu));
         }
-        i += 1;
     }
-
-    Ok(events)
+    // All done, return collected events
+    history
 }
 
 /// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`