From ed64b528c112fea733e2ec13e2ec9f1afea9fc5b Mon Sep 17 00:00:00 2001 From: Andrei Vasiliu Date: Sat, 15 Jan 2022 19:13:17 +0200 Subject: [PATCH 01/16] feat: Add federation backfill and event visibility Co-authored-by: Nyaaori <+@nyaaori.cat> --- src/api/server_server.rs | 106 ++++++++++++++++-- .../key_value/rooms/state_accessor.rs | 37 +++++- src/main.rs | 1 + src/service/rooms/state_accessor/data.rs | 5 +- src/service/rooms/state_accessor/mod.rs | 12 +- 5 files changed, 150 insertions(+), 11 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index fc3e2c0f..65be5a6f 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -12,6 +12,7 @@ use ruma::{ client::error::{Error as RumaError, ErrorKind}, federation::{ authorization::get_event_authorization, + backfill::get_backfill, device::get_devices::{self, v1::UserDevice}, directory::{get_public_rooms, get_public_rooms_filtered}, discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey}, @@ -43,11 +44,11 @@ use ruma::{ serde::{Base64, JsonObject, Raw}, to_device::DeviceIdOrAllDevices, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, - OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, + OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashSet}, fmt::Debug, mem, net::{IpAddr, SocketAddr}, @@ -950,6 +951,53 @@ pub async fn get_event_route( }) } +/// # `GET /_matrix/federation/v1/backfill/` +/// +/// Retrieves events from before the sender joined the room, if the room's +/// history visibility allows. +pub async fn get_backfill_route( + body: Ruma, +) -> Result { + if !services().globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + let sender_servername = body + .sender_servername + .as_ref() + .expect("server is authenticated"); + + info!("Got backfill request from: {}", sender_servername); + + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Server is not in room.", + )); + } + + let origin = services().globals.server_name().to_owned(); + let earliest_events = &[]; + + let events = get_missing_events( + sender_servername, + &body.room_id, + earliest_events, + &body.v, + body.limit, + )?; + + Ok(get_backfill::v1::Response { + origin, + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + pdus: events, + }) +} + /// # `POST /_matrix/federation/v1/get_missing_events/{roomId}` /// /// Retrieves events that the sender is missing. @@ -981,11 +1029,43 @@ pub async fn get_missing_events_route( .event_handler .acl_check(sender_servername, &body.room_id)?; - let mut queued_events = body.latest_events.clone(); + let events = get_missing_events( + sender_servername, + &body.room_id, + &body.earliest_events, + &body.latest_events, + body.limit, + )?; + + Ok(get_missing_events::v1::Response { events }) +} + +// Recursively fetch events starting from `latest_events`, going backwards +// through each event's `prev_events` until reaching the `earliest_events`. +// +// Used by the federation /backfill and /get_missing_events routes. +fn get_missing_events( + sender_servername: &ServerName, + room_id: &RoomId, + earliest_events: &[OwnedEventId], + latest_events: &Vec, + limit: UInt, +) -> Result>> { + let limit = u64::from(limit) as usize; + + let mut queued_events = latest_events.clone(); let mut events = Vec::new(); + let mut stop_at_events = HashSet::with_capacity(limit); + stop_at_events.extend(earliest_events.iter().cloned()); + let mut i = 0; - while i < queued_events.len() && events.len() < u64::from(body.limit) as usize { + while i < queued_events.len() && events.len() < limit { + if stop_at_events.contains(&queued_events[i]) { + i += 1; + continue; + } + if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? { let room_id_str = pdu .get("room_id") @@ -995,10 +1075,10 @@ pub async fn get_missing_events_route( let event_room_id = <&RoomId>::try_from(room_id_str) .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - if event_room_id != body.room_id { + if event_room_id != room_id { warn!( "Evil event detected: Event {} found while searching in room {}", - queued_events[i], body.room_id + queued_events[i], room_id ); return Err(Error::BadRequest( ErrorKind::InvalidParam, @@ -1006,10 +1086,20 @@ pub async fn get_missing_events_route( )); } - if body.earliest_events.contains(&queued_events[i]) { + let event_is_visible = services() + .rooms + .state_accessor + .server_can_see_event(sender_servername, &queued_events[i])?; + + if !event_is_visible { i += 1; continue; } + + // Don't send this event again if it comes through some other + // event's prev_events. + stop_at_events.insert(queued_events[i].clone()); + queued_events.extend_from_slice( &serde_json::from_value::>( serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| { @@ -1024,7 +1114,7 @@ pub async fn get_missing_events_route( i += 1; } - Ok(get_missing_events::v1::Response { events }) + Ok(events) } /// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}` diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 0f0c0dc7..1618c8e0 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -2,7 +2,13 @@ use std::{collections::HashMap, sync::Arc}; use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use async_trait::async_trait; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{ + events::{ + room::history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, + StateEventType, + }, + EventId, RoomId, ServerName, +}; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { @@ -138,6 +144,35 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { }) } + /// Whether a server is allowed to see an event through federation, based on + /// the room's history_visibility at that event's state. + /// + /// Note: Joined/Invited history visibility not yet implemented. + #[tracing::instrument(skip(self))] + fn server_can_see_event(&self, _server_name: &ServerName, event_id: &EventId) -> Result { + let shortstatehash = match self.pdu_shortstatehash(event_id) { + Ok(Some(shortstatehash)) => shortstatehash, + _ => return Ok(false), + }; + + let history_visibility = self + .state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")? + .map(|event| serde_json::from_str(event.content.get())) + .transpose() + .map_err(|_| Error::bad_database("Invalid room history visibility event in database."))? + .map(|content: RoomHistoryVisibilityEventContent| content.history_visibility); + + Ok(match history_visibility { + Some(HistoryVisibility::WorldReadable) => true, + Some(HistoryVisibility::Shared) => true, + // TODO: Check if any of the server's users were invited + // at this point in time. + Some(HistoryVisibility::Joined) => false, + Some(HistoryVisibility::Invited) => false, + _ => false, + }) + } + /// Returns the full room state. async fn room_state_full( &self, diff --git a/src/main.rs b/src/main.rs index da80507c..9eb879d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -391,6 +391,7 @@ fn routes() -> Router { .ruma_route(server_server::send_transaction_message_route) .ruma_route(server_server::get_event_route) .ruma_route(server_server::get_missing_events_route) + .ruma_route(server_server::get_backfill_route) .ruma_route(server_server::get_event_authorization_route) .ruma_route(server_server::get_room_state_route) .ruma_route(server_server::get_room_state_ids_route) diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index f3ae3c21..597955f6 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId, ServerName}; use crate::{PduEvent, Result}; @@ -35,6 +35,9 @@ pub trait Data: Send + Sync { /// Returns the state hash for this pdu. fn pdu_shortstatehash(&self, event_id: &EventId) -> Result>; + /// Returns true if a server has permission to see an event + fn server_can_see_event(&self, sever_name: &ServerName, event_id: &EventId) -> Result; + /// Returns the full room state. async fn room_state_full( &self, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 87d99368..bc286568 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -2,7 +2,7 @@ mod data; use std::{collections::HashMap, sync::Arc}; pub use data::Data; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId, ServerName}; use crate::{PduEvent, Result}; @@ -51,6 +51,16 @@ impl Service { self.db.pdu_shortstatehash(event_id) } + /// Returns true if a server has permission to see an event + #[tracing::instrument(skip(self))] + pub fn server_can_see_event<'a>( + &'a self, + sever_name: &ServerName, + event_id: &EventId, + ) -> Result { + self.db.server_can_see_event(sever_name, event_id) + } + /// Returns the full room state. #[tracing::instrument(skip(self))] pub async fn room_state_full( -- 2.45.2 From 711e03b79902e2b16a5f26c4dbea474fc84aa9f1 Mon Sep 17 00:00:00 2001 From: Andrei Vasiliu Date: Sat, 22 Jan 2022 18:26:28 +0200 Subject: [PATCH 02/16] feat: Implement backfill joined/invited checks for private history Co-authored-by: Nyaaori <+@nyaaori.cat> --- src/api/server_server.rs | 13 +- .../key_value/rooms/state_accessor.rs | 81 ++++++++----- src/service/mod.rs | 7 +- src/service/rooms/state_accessor/data.rs | 17 ++- src/service/rooms/state_accessor/mod.rs | 112 ++++++++++++++++-- 5 files changed, 181 insertions(+), 49 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 65be5a6f..66174626 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1048,12 +1048,12 @@ fn get_missing_events( sender_servername: &ServerName, room_id: &RoomId, earliest_events: &[OwnedEventId], - latest_events: &Vec, + latest_events: &[OwnedEventId], limit: UInt, ) -> Result>> { let limit = u64::from(limit) as usize; - let mut queued_events = latest_events.clone(); + let mut queued_events = latest_events.to_owned(); let mut events = Vec::new(); let mut stop_at_events = HashSet::with_capacity(limit); @@ -1086,10 +1086,11 @@ fn get_missing_events( )); } - let event_is_visible = services() - .rooms - .state_accessor - .server_can_see_event(sender_servername, &queued_events[i])?; + let event_is_visible = services().rooms.state_accessor.server_can_see_event( + sender_servername, + room_id, + &queued_events[i], + )?; if !event_is_visible { i += 1; diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 1618c8e0..cfc0444d 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -3,11 +3,8 @@ use std::{collections::HashMap, sync::Arc}; use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use async_trait::async_trait; use ruma::{ - events::{ - room::history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, - StateEventType, - }, - EventId, RoomId, ServerName, + events::{room::member::MembershipState, StateEventType}, + EventId, RoomId, UserId, }; #[async_trait] @@ -126,6 +123,21 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { }) } + fn state_get_content( + &self, + shortstatehash: u64, + event_type: &StateEventType, + state_key: &str, + ) -> Result> { + let content = self + .state_get(shortstatehash, event_type, state_key)? + .map(|event| serde_json::from_str(event.content.get())) + .transpose() + .map_err(|_| Error::bad_database("Invalid event in database"))?; + + Ok(content) + } + /// Returns the state hash for this pdu. fn pdu_shortstatehash(&self, event_id: &EventId) -> Result> { self.eventid_shorteventid @@ -144,33 +156,40 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { }) } - /// Whether a server is allowed to see an event through federation, based on - /// the room's history_visibility at that event's state. - /// - /// Note: Joined/Invited history visibility not yet implemented. - #[tracing::instrument(skip(self))] - fn server_can_see_event(&self, _server_name: &ServerName, event_id: &EventId) -> Result { - let shortstatehash = match self.pdu_shortstatehash(event_id) { - Ok(Some(shortstatehash)) => shortstatehash, - _ => return Ok(false), - }; + /// The user was a joined member at this state (potentially in the past) + fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> Result { + Ok(self + .state_get_content( + shortstatehash, + &StateEventType::RoomMember, + user_id.as_str(), + )? + .map(|content| match content.get("membership") { + Some(membership) => MembershipState::from(membership.as_str().unwrap_or("")), + None => MembershipState::Leave, + } == MembershipState::Join) + .unwrap_or(false)) + } - let history_visibility = self - .state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")? - .map(|event| serde_json::from_str(event.content.get())) - .transpose() - .map_err(|_| Error::bad_database("Invalid room history visibility event in database."))? - .map(|content: RoomHistoryVisibilityEventContent| content.history_visibility); - - Ok(match history_visibility { - Some(HistoryVisibility::WorldReadable) => true, - Some(HistoryVisibility::Shared) => true, - // TODO: Check if any of the server's users were invited - // at this point in time. - Some(HistoryVisibility::Joined) => false, - Some(HistoryVisibility::Invited) => false, - _ => false, - }) + /// The user was an invited or joined room member at this state (potentially + /// in the past) + fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> Result { + Ok(self + .state_get_content( + shortstatehash, + &StateEventType::RoomMember, + user_id.as_str(), + )? + .map(|content| { + let membership = match content.get("membership") { + Some(membership) => MembershipState::from(membership.as_str().unwrap_or("")), + None => MembershipState::Leave, + }; + let joined = membership == MembershipState::Join; + let invited = membership == MembershipState::Invite; + invited || joined + }) + .unwrap_or(false)) } /// Returns the full room state. diff --git a/src/service/mod.rs b/src/service/mod.rs index 385dcc69..07d80a15 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -77,7 +77,12 @@ impl Services { search: rooms::search::Service { db }, short: rooms::short::Service { db }, state: rooms::state::Service { db }, - state_accessor: rooms::state_accessor::Service { db }, + state_accessor: rooms::state_accessor::Service { + db, + server_visibility_cache: Mutex::new(LruCache::new( + (100.0 * config.conduit_cache_capacity_modifier) as usize, + )), + }, state_cache: rooms::state_cache::Service { db }, state_compressor: rooms::state_compressor::Service { db, diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 597955f6..2770a24b 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use ruma::{events::StateEventType, EventId, RoomId, ServerName}; +use ruma::{events::StateEventType, EventId, RoomId, UserId}; use crate::{PduEvent, Result}; @@ -32,11 +32,22 @@ pub trait Data: Send + Sync { state_key: &str, ) -> Result>>; + fn state_get_content( + &self, + shortstatehash: u64, + event_type: &StateEventType, + state_key: &str, + ) -> Result>; + /// Returns the state hash for this pdu. fn pdu_shortstatehash(&self, event_id: &EventId) -> Result>; - /// Returns true if a server has permission to see an event - fn server_can_see_event(&self, sever_name: &ServerName, event_id: &EventId) -> Result; + /// The user was a joined member at this state (potentially in the past) + fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> Result; + + /// The user was an invited or joined room member at this state (potentially + /// in the past) + fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> Result; /// Returns the full room state. async fn room_state_full( diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index bc286568..efe174e9 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -1,13 +1,21 @@ mod data; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; pub use data::Data; -use ruma::{events::StateEventType, EventId, RoomId, ServerName}; +use lru_cache::LruCache; +use ruma::{ + events::{room::history_visibility::HistoryVisibility, StateEventType}, + EventId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, +}; -use crate::{PduEvent, Result}; +use crate::{services, PduEvent, Result}; pub struct Service { pub db: &'static dyn Data, + pub server_visibility_cache: Mutex>, } impl Service { @@ -46,19 +54,107 @@ impl Service { self.db.state_get(shortstatehash, event_type, state_key) } + pub fn state_get_content( + &self, + shortstatehash: u64, + event_type: &StateEventType, + state_key: &str, + ) -> Result> { + self.db + .state_get_content(shortstatehash, event_type, state_key) + } + /// Returns the state hash for this pdu. pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result> { self.db.pdu_shortstatehash(event_id) } - /// Returns true if a server has permission to see an event + /// Whether a server is allowed to see an event through federation, based on + /// the room's history_visibility at that event's state. #[tracing::instrument(skip(self))] - pub fn server_can_see_event<'a>( - &'a self, - sever_name: &ServerName, + pub fn server_can_see_event( + &self, + server_name: &ServerName, + room_id: &RoomId, event_id: &EventId, ) -> Result { - self.db.server_can_see_event(sever_name, event_id) + let shortstatehash = match self.pdu_shortstatehash(event_id) { + Ok(Some(shortstatehash)) => shortstatehash, + _ => return Ok(false), + }; + + if let Some(visibility) = self + .server_visibility_cache + .lock() + .unwrap() + .get_mut(&(server_name.to_owned(), shortstatehash)) + { + return Ok(*visibility); + } + + let current_server_members: Vec = services() + .rooms + .state_cache + .room_members(room_id) + .filter(|member| { + member + .as_ref() + .map(|member| member.server_name() == server_name) + .unwrap_or(true) + }) + .collect::>()?; + + let history_visibility = self + .state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "")? + .map(|content| match content.get("history_visibility") { + Some(visibility) => HistoryVisibility::from(visibility.as_str().unwrap_or("")), + None => HistoryVisibility::Invited, + }); + + let visibility = match history_visibility { + Some(HistoryVisibility::Joined) => { + // Look at all members in the room from this server; one of them + // triggered a backfill. Was one of them a member in the past, + // at this event? + let mut visible = false; + for member in current_server_members { + if self.user_was_joined(shortstatehash, &member)? { + visible = true; + break; + } + } + visible + } + Some(HistoryVisibility::Invited) => { + let mut visible = false; + for member in current_server_members { + if self.user_was_invited(shortstatehash, &member)? { + visible = true; + break; + } + } + visible + } + _ => false, + }; + + self.server_visibility_cache + .lock() + .unwrap() + .insert((server_name.to_owned(), shortstatehash), visibility); + + Ok(visibility) + } + + /// The user was a joined member at this state (potentially in the past) + pub fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> Result { + self.db.user_was_joined(shortstatehash, user_id) + } + + /// The user was an invited or joined room member at this state (potentially + /// in the past) + pub fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> Result { + self.db.user_was_invited(shortstatehash, user_id) } /// Returns the full room state. -- 2.45.2 From c4bd0a9f2c23d6b47810d6d1fb43561fa12e9335 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Sat, 3 Sep 2022 14:16:32 +0200 Subject: [PATCH 03/16] fix: Add backfill ACL check --- src/api/server_server.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 66174626..0923263d 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -980,6 +980,11 @@ pub async fn get_backfill_route( )); } + services() + .rooms + .event_handler + .acl_check(sender_servername, &body.room_id)?; + let origin = services().globals.server_name().to_owned(); let earliest_events = &[]; -- 2.45.2 From 5d7f4602b26c986d82186f4c4715699d6356f00c Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Sat, 26 Nov 2022 13:03:19 +0100 Subject: [PATCH 04/16] fix: Proper S2S Backfill visibility handling --- src/service/rooms/state_accessor/mod.rs | 34 ++++++++++++++----------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index efe174e9..0da52779 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -112,10 +112,25 @@ impl Service { }); let visibility = match history_visibility { - Some(HistoryVisibility::Joined) => { - // Look at all members in the room from this server; one of them - // triggered a backfill. Was one of them a member in the past, - // at this event? + Some(HistoryVisibility::WorldReadable) => { + // Allow if event was sent while world readable + true + } + Some(HistoryVisibility::Invited) => { + let mut visible = false; + // Allow if any member on requesting server was invited or joined, else deny + for member in current_server_members { + if self.user_was_invited(shortstatehash, &member)? + || self.user_was_joined(shortstatehash, &member)? + { + visible = true; + break; + } + } + visible + } + _ => { + // Allow if any member on requested server was joined, else deny let mut visible = false; for member in current_server_members { if self.user_was_joined(shortstatehash, &member)? { @@ -125,17 +140,6 @@ impl Service { } visible } - Some(HistoryVisibility::Invited) => { - let mut visible = false; - for member in current_server_members { - if self.user_was_invited(shortstatehash, &member)? { - visible = true; - break; - } - } - visible - } - _ => false, }; self.server_visibility_cache -- 2.45.2 From 898e4d24d99996d70a03013440ae375726ae390d Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Sat, 26 Nov 2022 13:33:32 +0100 Subject: [PATCH 05/16] fix: Default to Shared history visibility for s2s permissions, per spec --- src/service/rooms/state_accessor/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 0da52779..4bfbf3fd 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -108,7 +108,7 @@ impl Service { .state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "")? .map(|content| match content.get("history_visibility") { Some(visibility) => HistoryVisibility::from(visibility.as_str().unwrap_or("")), - None => HistoryVisibility::Invited, + None => HistoryVisibility::Shared, }); let visibility = match history_visibility { -- 2.45.2 From f814e8e5cc80f99b957da7944f68932312e75171 Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Mon, 28 Nov 2022 01:42:24 +0200 Subject: [PATCH 06/16] refactor: Replace specialized interface for user's membership with more generic one in state accessor --- .../key_value/rooms/state_accessor.rs | 52 ++++++------------- src/service/rooms/state_accessor/data.rs | 9 ++-- src/service/rooms/state_accessor/mod.rs | 18 +++++-- 3 files changed, 33 insertions(+), 46 deletions(-) diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index cfc0444d..fe139315 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -6,6 +6,7 @@ use ruma::{ events::{room::member::MembershipState, StateEventType}, EventId, RoomId, UserId, }; +use serde_json::Value; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { @@ -128,7 +129,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { shortstatehash: u64, event_type: &StateEventType, state_key: &str, - ) -> Result> { + ) -> Result> { let content = self .state_get(shortstatehash, event_type, state_key)? .map(|event| serde_json::from_str(event.content.get())) @@ -156,40 +157,21 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { }) } - /// The user was a joined member at this state (potentially in the past) - fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> Result { - Ok(self - .state_get_content( - shortstatehash, - &StateEventType::RoomMember, - user_id.as_str(), - )? - .map(|content| match content.get("membership") { - Some(membership) => MembershipState::from(membership.as_str().unwrap_or("")), - None => MembershipState::Leave, - } == MembershipState::Join) - .unwrap_or(false)) - } - - /// The user was an invited or joined room member at this state (potentially - /// in the past) - fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> Result { - Ok(self - .state_get_content( - shortstatehash, - &StateEventType::RoomMember, - user_id.as_str(), - )? - .map(|content| { - let membership = match content.get("membership") { - Some(membership) => MembershipState::from(membership.as_str().unwrap_or("")), - None => MembershipState::Leave, - }; - let joined = membership == MembershipState::Join; - let invited = membership == MembershipState::Invite; - invited || joined - }) - .unwrap_or(false)) + /// Get membership for given user in state + fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> Result { + self.state_get_content( + shortstatehash, + &StateEventType::RoomMember, + user_id.as_str(), + )? + .map(|content| match content.get("membership") { + Some(Value::String(membership)) => Ok(MembershipState::from(membership.as_str())), + None => Ok(MembershipState::Leave), + _ => Err(Error::bad_database( + "Malformed membership, expected Value::String", + )), + }) + .unwrap_or(Ok(MembershipState::Leave)) } /// Returns the full room state. diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 2770a24b..1ac3e314 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; +use ruma::events::room::member::MembershipState; use ruma::{events::StateEventType, EventId, RoomId, UserId}; use crate::{PduEvent, Result}; @@ -42,12 +43,8 @@ pub trait Data: Send + Sync { /// Returns the state hash for this pdu. fn pdu_shortstatehash(&self, event_id: &EventId) -> Result>; - /// The user was a joined member at this state (potentially in the past) - fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> Result; - - /// The user was an invited or joined room member at this state (potentially - /// in the past) - fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> Result; + /// Get membership for given user in state + fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> Result; /// Returns the full room state. async fn room_state_full( diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 4bfbf3fd..9b119c5c 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -7,7 +7,9 @@ use std::{ pub use data::Data; use lru_cache::LruCache; use ruma::{ - events::{room::history_visibility::HistoryVisibility, StateEventType}, + events::{ + room::history_visibility::HistoryVisibility, room::member::MembershipState, StateEventType, + }, EventId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, }; @@ -151,14 +153,20 @@ impl Service { } /// The user was a joined member at this state (potentially in the past) - pub fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> Result { - self.db.user_was_joined(shortstatehash, user_id) + pub fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> bool { + self.db + .user_membership(shortstatehash, user_id) + .map(|s| s == MembershipState::Join) + .unwrap_or_default() // Return sensible default, i.e. false } /// The user was an invited or joined room member at this state (potentially /// in the past) - pub fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> Result { - self.db.user_was_invited(shortstatehash, user_id) + pub fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> bool { + self.db + .user_membership(shortstatehash, user_id) + .map(|s| s == MembershipState::Join || s == MembershipState::Invite) + .unwrap_or_default() // Return sensible default, i.e. false } /// Returns the full room state. -- 2.45.2 From e3dcb668cfb53db4e59612c4e55228e7705a3827 Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Mon, 28 Nov 2022 01:43:17 +0200 Subject: [PATCH 07/16] refactor: Replace re-serialization with plain coercion --- src/api/server_server.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 0923263d..21088f08 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1106,14 +1106,14 @@ fn get_missing_events( // event's prev_events. stop_at_events.insert(queued_events[i].clone()); + let prev_events = pdu + .get("prev_events") + .ok_or_else(|| Error::bad_database("Event in db has no prev_events field."))?; + queued_events.extend_from_slice( - &serde_json::from_value::>( - serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| { - Error::bad_database("Event in db has no prev_events field.") - })?) - .expect("canonical json is valid json value"), - ) - .map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?, + &serde_json::from_value::>(prev_events.clone().into()).map_err( + |_| Error::bad_database("Invalid prev_events content in pdu in db."), + )?, ); events.push(PduEvent::convert_to_outgoing_federation_event(pdu)); } -- 2.45.2 From 5987452cfcdd808e171142d51bac519d81ae4f42 Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Mon, 28 Nov 2022 01:44:05 +0200 Subject: [PATCH 08/16] refactor: Replace imperative style with short-circuit .any() --- src/service/rooms/state_accessor/mod.rs | 26 +++++++------------------ 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 9b119c5c..82b1c641 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -119,28 +119,16 @@ impl Service { true } Some(HistoryVisibility::Invited) => { - let mut visible = false; - // Allow if any member on requesting server was invited or joined, else deny - for member in current_server_members { - if self.user_was_invited(shortstatehash, &member)? - || self.user_was_joined(shortstatehash, &member)? - { - visible = true; - break; - } - } - visible + // Allow if any member on requesting server was AT LEAST invited, else deny + current_server_members + .into_iter() + .any(|member| self.user_was_invited(shortstatehash, &member)) } _ => { // Allow if any member on requested server was joined, else deny - let mut visible = false; - for member in current_server_members { - if self.user_was_joined(shortstatehash, &member)? { - visible = true; - break; - } - } - visible + current_server_members + .into_iter() + .any(|member| self.user_was_joined(shortstatehash, &member)) } }; -- 2.45.2 From e71ad56e686eb2ae83359b5a99cf59c7be61ba85 Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Mon, 28 Nov 2022 01:58:25 +0200 Subject: [PATCH 09/16] refactor: Do not extract members for room each time --- src/api/server_server.rs | 24 +++++++++++++++++++++--- src/service/rooms/state_accessor/mod.rs | 17 +++-------------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 21088f08..9c73e286 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1082,8 +1082,9 @@ fn get_missing_events( if event_room_id != room_id { warn!( - "Evil event detected: Event {} found while searching in room {}", - queued_events[i], room_id + ?room_id, + evil_event = ?queued_events[i], + "Evil event detected while searching in room" ); return Err(Error::BadRequest( ErrorKind::InvalidParam, @@ -1091,9 +1092,26 @@ fn get_missing_events( )); } + let (room_members, room_errors): (Vec<_>, Vec<_>) = services() + .rooms + .state_cache + .room_members(room_id) + .partition(Result::is_ok); + + // Just log errors and continue with correct users + if !room_errors.is_empty() { + warn!(?room_id, "Some errors occurred when fetching room members"); + } + + let current_server_members: Vec = room_members + .into_iter() + .map(Result::unwrap) + .filter(|member| member.server_name() == sender_servername) + .collect(); + let event_is_visible = services().rooms.state_accessor.server_can_see_event( sender_servername, - room_id, + current_server_members.as_slice(), &queued_events[i], )?; diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 82b1c641..cd5bc745 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -12,8 +12,9 @@ use ruma::{ }, EventId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, }; +use tracing::warn; -use crate::{services, PduEvent, Result}; +use crate::{PduEvent, Result}; pub struct Service { pub db: &'static dyn Data, @@ -77,7 +78,7 @@ impl Service { pub fn server_can_see_event( &self, server_name: &ServerName, - room_id: &RoomId, + current_server_members: &[OwnedUserId], event_id: &EventId, ) -> Result { let shortstatehash = match self.pdu_shortstatehash(event_id) { @@ -94,18 +95,6 @@ impl Service { return Ok(*visibility); } - let current_server_members: Vec = services() - .rooms - .state_cache - .room_members(room_id) - .filter(|member| { - member - .as_ref() - .map(|member| member.server_name() == server_name) - .unwrap_or(true) - }) - .collect::>()?; - let history_visibility = self .state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "")? .map(|content| match content.get("history_visibility") { -- 2.45.2 From 98e24722ad86f5eed957283f1bd2d68cc9dbd37f Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Mon, 28 Nov 2022 13:22:43 +0200 Subject: [PATCH 10/16] refactor: Use same order as in trait --- src/service/pdu.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/service/pdu.rs b/src/service/pdu.rs index 554f3be7..a7983b24 100644 --- a/src/service/pdu.rs +++ b/src/service/pdu.rs @@ -281,6 +281,10 @@ impl state_res::Event for PduEvent { &self.sender } + fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch { + MilliSecondsSinceUnixEpoch(self.origin_server_ts) + } + fn event_type(&self) -> &RoomEventType { &self.kind } @@ -289,10 +293,6 @@ impl state_res::Event for PduEvent { &self.content } - fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch { - MilliSecondsSinceUnixEpoch(self.origin_server_ts) - } - fn state_key(&self) -> Option<&str> { self.state_key.as_deref() } -- 2.45.2 From 2717bdcf2e5787ff9b81a564a4befb72d1427613 Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Mon, 28 Nov 2022 13:43:33 +0200 Subject: [PATCH 11/16] refactor: Take away some methods from public --- src/service/rooms/state_accessor/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index cd5bc745..7eccbd40 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -130,7 +130,7 @@ impl Service { } /// The user was a joined member at this state (potentially in the past) - pub fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> bool { + fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> bool { self.db .user_membership(shortstatehash, user_id) .map(|s| s == MembershipState::Join) @@ -139,7 +139,7 @@ impl Service { /// The user was an invited or joined room member at this state (potentially /// in the past) - pub fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> bool { + fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> bool { self.db .user_membership(shortstatehash, user_id) .map(|s| s == MembershipState::Join || s == MembershipState::Invite) -- 2.45.2 From d8ee8cd0efced6e78d26015e1ecc2031b37897a8 Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Thu, 8 Dec 2022 23:41:10 +0200 Subject: [PATCH 12/16] refactor: Pull up invariants from the loop --- src/api/server_server.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 9c73e286..58f4d82a 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1056,6 +1056,23 @@ fn get_missing_events( latest_events: &[OwnedEventId], limit: UInt, ) -> Result>> { + let (room_members, room_errors): (Vec<_>, Vec<_>) = services() + .rooms + .state_cache + .room_members(room_id) + .partition(Result::is_ok); + + // Just log errors and continue with correct users + if !room_errors.is_empty() { + warn!(?room_id, "Some errors occurred when fetching room members"); + } + + let current_server_members: Vec = room_members + .into_iter() + .map(Result::unwrap) + .filter(|member| member.server_name() == sender_servername) + .collect(); + let limit = u64::from(limit) as usize; let mut queued_events = latest_events.to_owned(); @@ -1092,23 +1109,6 @@ fn get_missing_events( )); } - let (room_members, room_errors): (Vec<_>, Vec<_>) = services() - .rooms - .state_cache - .room_members(room_id) - .partition(Result::is_ok); - - // Just log errors and continue with correct users - if !room_errors.is_empty() { - warn!(?room_id, "Some errors occurred when fetching room members"); - } - - let current_server_members: Vec = room_members - .into_iter() - .map(Result::unwrap) - .filter(|member| member.server_name() == sender_servername) - .collect(); - let event_is_visible = services().rooms.state_accessor.server_can_see_event( sender_servername, current_server_members.as_slice(), -- 2.45.2 From b9c2bb38dbaf3f7af81678b3b127ceadff1b4ff3 Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Wed, 14 Dec 2022 17:18:13 +0200 Subject: [PATCH 13/16] refactor: Rewrite backfill algorithm according to specification in more readable form --- src/api/server_server.rs | 204 +++++++++++++++++++++++++++------------ 1 file changed, 144 insertions(+), 60 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 58f4d82a..2e3cbded 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -48,7 +48,7 @@ use ruma::{ }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, HashSet, VecDeque}, fmt::Debug, mem, net::{IpAddr, SocketAddr}, @@ -1045,10 +1045,10 @@ pub async fn get_missing_events_route( Ok(get_missing_events::v1::Response { events }) } -// Recursively fetch events starting from `latest_events`, going backwards -// through each event's `prev_events` until reaching the `earliest_events`. -// -// Used by the federation /backfill and /get_missing_events routes. +/// Fetch events starting from `latest_events`, going backwards +/// through each event's `prev_events` until reaching the `earliest_events`. +/// +/// Used by the federation /backfill and /get_missing_events routes. fn get_missing_events( sender_servername: &ServerName, room_id: &RoomId, @@ -1073,72 +1073,156 @@ fn get_missing_events( .filter(|member| member.server_name() == sender_servername) .collect(); - let limit = u64::from(limit) as usize; + let event_filter = |event_id: &EventId| { + services() + .rooms + .state_accessor + .server_can_see_event( + sender_servername, + current_server_members.as_slice(), + event_id, + ) + .unwrap_or_default() + }; - let mut queued_events = latest_events.to_owned(); - let mut events = Vec::new(); + let pdu_filter = |pdu: &CanonicalJsonObject| { + let event_room_id = pdu + .get("room_id") + .and_then(|val| val.as_str()) + .and_then(|room_id_str| <&RoomId>::try_from(room_id_str).ok()); - let mut stop_at_events = HashSet::with_capacity(limit); - stop_at_events.extend(earliest_events.iter().cloned()); + match event_room_id { + Some(event_room_id) => { + let valid_event = event_room_id != room_id; + if !valid_event { + error!(?room_id, ?event_room_id, "An evil event detected"); + } + valid_event + } + None => { + error!(?pdu, "Can't extract valid `room_id` from pdu"); + false + } + } + }; - let mut i = 0; - while i < queued_events.len() && events.len() < limit { - if stop_at_events.contains(&queued_events[i]) { - i += 1; + #[inline] + fn get_pdu(event: &EventId) -> Option { + services() + .rooms + .timeline + .get_pdu_json(event) + .unwrap_or_default() + } + + let events = linearize_previous_events( + latest_events.into_iter().cloned(), + earliest_events.into_iter().cloned(), + limit, + get_pdu, + event_filter, + pdu_filter, + ); + + Ok(events) +} + +/// Unwinds previous events by doing a breadth-first walk from given roots +/// +/// # Arguments +/// +/// * `roots`: Starting point to unwind event history +/// * `excluded`: Skipped events +/// * `limit`: How many events to extract +/// * `pdu_extractor`: Closure to extract PDU for given event_id, for example, from DB. +/// * `event_filter`: Closure to filter event by it's visiblity. It may or may not hit DB. +/// * `pdu_filter`: Closure to get basic validation against malformed PDUs. +/// +/// # Returns +/// +/// The previous events for given roots, without any `excluded` events, up to the provided `limit`. +/// +/// # Note +/// +/// In matrix specification, «Server-Server API», paragraph 8 there is no mention of previous events for excluded events. +/// Therefore, algorithm below excludes **only** events itself, but allows to process their history. +fn linearize_previous_events( + roots: E, + excluded: E, + limit: L, + pdu_extractor: P, + event_filter: F, + pdu_filter: V, +) -> Vec> +where + E: IntoIterator, + F: Fn(&EventId) -> bool, + L: Into, + V: Fn(&CanonicalJsonObject) -> bool, + P: Fn(&EventId) -> Option, +{ + let limit = limit.into() as usize; + assert!(limit > 0, "Limit should be > 0"); + + #[inline] + fn get_previous_events(pdu: &CanonicalJsonObject) -> Option> { + match pdu.get("prev_events") { + None => { + error!(?pdu, "A stored event has no 'prev_events' field"); + return None; + } + Some(prev_events) => { + let val = prev_events.clone().into(); + let events = serde_json::from_value::>(val); + if let Err(error) = events { + error!(?prev_events, ?error, "Broken 'prev_events' field"); + return None; + } + Some(events.unwrap_or_default()) + } + } + } + + let mut visited: HashSet = Default::default(); + let mut history: Vec> = Default::default(); + let mut queue: VecDeque = Default::default(); + let excluded: HashSet<_> = excluded.into_iter().collect(); + + // Add all roots into processing queue + for root in roots { + queue.push_back(root); + } + + while let Some(current_event) = queue.pop_front() { + // Return all collected events if reached limit + if history.len() >= limit { + return history; + } + + // Skip an entire branch containing incorrect events + if !event_filter(¤t_event) { continue; } - if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? { - let room_id_str = pdu - .get("room_id") - .and_then(|val| val.as_str()) - .ok_or_else(|| Error::bad_database("Invalid event in database"))?; - - let event_room_id = <&RoomId>::try_from(room_id_str) - .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - - if event_room_id != room_id { - warn!( - ?room_id, - evil_event = ?queued_events[i], - "Evil event detected while searching in room" - ); - return Err(Error::BadRequest( - ErrorKind::InvalidParam, - "Evil event detected", - )); + // Process PDU from a current event if it exists and valid + if let Some(pdu) = pdu_extractor(¤t_event).filter(&pdu_filter) { + if !&excluded.contains(¤t_event) { + history.push(PduEvent::convert_to_outgoing_federation_event(pdu.clone())); } - let event_is_visible = services().rooms.state_accessor.server_can_see_event( - sender_servername, - current_server_members.as_slice(), - &queued_events[i], - )?; - - if !event_is_visible { - i += 1; - continue; + // Fetch previous events, if they exists + if let Some(previous_events) = get_previous_events(&pdu) { + for previous_event in previous_events { + if !visited.contains(&previous_event) { + visited.insert(previous_event.clone()); + queue.push_back(previous_event); + } + } } - - // Don't send this event again if it comes through some other - // event's prev_events. - stop_at_events.insert(queued_events[i].clone()); - - let prev_events = pdu - .get("prev_events") - .ok_or_else(|| Error::bad_database("Event in db has no prev_events field."))?; - - queued_events.extend_from_slice( - &serde_json::from_value::>(prev_events.clone().into()).map_err( - |_| Error::bad_database("Invalid prev_events content in pdu in db."), - )?, - ); - events.push(PduEvent::convert_to_outgoing_federation_event(pdu)); } - i += 1; } - - Ok(events) + // All done, return collected events + history } /// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}` -- 2.45.2 From 1fabc3cd690bd9afced055b6b0c8eea0c1579ab2 Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Wed, 14 Dec 2022 17:19:01 +0200 Subject: [PATCH 14/16] refactor: Added tests to backfill --- src/api/server_server.rs | 229 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 229 insertions(+) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 2e3cbded..653f07d1 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1977,7 +1977,13 @@ pub async fn claim_keys_route( #[cfg(test)] mod tests { + use super::linearize_previous_events; use super::{add_port_to_hostname, get_ip_with_port, FedDest}; + use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId}; + use serde::{Deserialize, Serialize}; + use serde_json::value::RawValue; + use serde_json::Value; + use std::collections::HashMap; #[test] fn ips_get_default_ports() { @@ -2018,4 +2024,227 @@ mod tests { FedDest::Named(String::from("example.com"), String::from(":1337")) ) } + + type PduStorage = HashMap; + + #[derive(Debug, Serialize, Deserialize)] + struct MockPDU { + content: i32, + prev_events: Vec, + } + + fn mock_event_id(id: &i32) -> OwnedEventId { + const DOMAIN: &str = "canterlot.eq"; + ::try_from(format!("${id}:{DOMAIN}")).unwrap() + } + + fn create_graph(data: Vec<(i32, Vec)>) -> PduStorage { + data.iter() + .map(|(head, tail)| { + let key = mock_event_id(head); + let pdu = MockPDU { + content: *head, + prev_events: tail.iter().map(mock_event_id).collect(), + }; + let value = serde_json::to_value(pdu).unwrap(); + let value: CanonicalJsonValue = value.try_into().unwrap(); + (key, value.as_object().unwrap().to_owned()) + }) + .collect() + } + + fn mock_full_graph() -> PduStorage { + /* + (1) + __________|___________ + / / \ \ + (2) (3) (10) (11) + / \ / \ | | + (4) (5) (6) (7) (12) (13) + | | | + (8) (9) (14) + \ / + (15) + | + (16) + */ + create_graph(vec![ + (1, vec![2, 3, 10, 11]), + (2, vec![4, 5]), + (3, vec![6, 7]), + (4, vec![]), + (5, vec![8]), + (6, vec![9]), + (7, vec![]), + (8, vec![15]), + (9, vec![15]), + (10, vec![12]), + (11, vec![13]), + (12, vec![]), + (13, vec![14]), + (14, vec![]), + (15, vec![16]), + (16, vec![16]), + ]) + } + + fn extract_events_payload(events: Vec>) -> Vec { + events + .iter() + .map(|e| serde_json::from_str(e.get()).unwrap()) + .map(|p: MockPDU| p.content) + .collect() + } + + #[test] + fn backfill_empty() { + let events = linearize_previous_events( + vec![], + vec![], + 16u64, + |_| unreachable!(), + |_| true, + |_| true, + ); + assert!(events.is_empty()); + } + #[test] + fn backfill_limit() { + /* + (5) → (4) → (3) → (2) → (1) → × + */ + let events = create_graph(vec![ + (1, vec![]), + (2, vec![1]), + (3, vec![2]), + (4, vec![3]), + (5, vec![4]), + ]); + let roots = vec![mock_event_id(&5)]; + let result = linearize_previous_events( + roots, + vec![], + 3u64, + |e| events.get(e).cloned(), + |_| true, + |_| true, + ); + + assert_eq!(extract_events_payload(result), vec![5, 4, 3]) + } + + #[test] + fn backfill_bfs() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&1)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + |_| true, + |_| true, + ); + assert_eq!( + extract_events_payload(result), + vec![1, 2, 3, 10, 11, 4, 5, 6, 7, 12, 13, 8, 9, 14, 15, 16] + ) + } + + #[test] + fn backfill_subgraph() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&3)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + |_| true, + |_| true, + ); + assert_eq!(extract_events_payload(result), vec![3, 6, 7, 9, 15, 16]) + } + + #[test] + fn backfill_two_roots() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&3), mock_event_id(&11)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + |_| true, + |_| true, + ); + assert_eq!( + extract_events_payload(result), + vec![3, 11, 6, 7, 13, 9, 14, 15, 16] + ) + } + + #[test] + fn backfill_exclude_events() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&1)]; + let excluded_events = vec![ + mock_event_id(&14), + mock_event_id(&15), + mock_event_id(&16), + mock_event_id(&3), + ]; + let result = linearize_previous_events( + roots, + excluded_events, + 100u64, + |e| events.get(e).cloned(), + |_| true, + |_| true, + ); + assert_eq!( + extract_events_payload(result), + vec![1, 2, 10, 11, 4, 5, 6, 7, 12, 13, 8, 9] + ) + } + + #[test] + fn backfill_exclude_branch_with_evil_event() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&1)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + |_| true, + |e| { + let value: Value = CanonicalJsonValue::Object(e.clone()).into(); + let pdu: MockPDU = serde_json::from_value(value).unwrap(); + pdu.content != 3 + }, + ); + assert_eq!( + extract_events_payload(result), + vec![1, 2, 10, 11, 4, 5, 12, 13, 8, 14, 15, 16] + ) + } + + #[test] + fn backfill_exclude_branch_with_inaccessible_event() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&1)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + |e| e != mock_event_id(&3), + |_| true, + ); + assert_eq!( + extract_events_payload(result), + vec![1, 2, 10, 11, 4, 5, 12, 13, 8, 14, 15, 16] + ) + } } -- 2.45.2 From 330e68eb9363e7e62f8b90750e73b9cbe5e23c6f Mon Sep 17 00:00:00 2001 From: "Andriy Kushnir (Orhideous)" Date: Wed, 14 Dec 2022 18:02:35 +0200 Subject: [PATCH 15/16] refactor: Fixed logical condition for evil events --- src/api/server_server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 653f07d1..2b7b4f50 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1093,7 +1093,7 @@ fn get_missing_events( match event_room_id { Some(event_room_id) => { - let valid_event = event_room_id != room_id; + let valid_event = event_room_id == room_id; if !valid_event { error!(?room_id, ?event_room_id, "An evil event detected"); } -- 2.45.2 From c249fd7d4d9f7cf68996663d67625724b710d6e0 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Wed, 21 Dec 2022 10:22:09 +0100 Subject: [PATCH 16/16] chore: code cleanup --- src/api/server_server.rs | 12 +++++------- src/service/rooms/state_accessor/data.rs | 6 ++++-- src/service/rooms/state_accessor/mod.rs | 11 ++++++----- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 2b7b4f50..045cf13c 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1116,8 +1116,8 @@ fn get_missing_events( } let events = linearize_previous_events( - latest_events.into_iter().cloned(), - earliest_events.into_iter().cloned(), + latest_events.iter().cloned(), + earliest_events.iter().cloned(), limit, get_pdu, event_filter, @@ -1169,7 +1169,7 @@ where match pdu.get("prev_events") { None => { error!(?pdu, "A stored event has no 'prev_events' field"); - return None; + None } Some(prev_events) => { let val = prev_events.clone().into(); @@ -1977,12 +1977,10 @@ pub async fn claim_keys_route( #[cfg(test)] mod tests { - use super::linearize_previous_events; - use super::{add_port_to_hostname, get_ip_with_port, FedDest}; + use super::{add_port_to_hostname, get_ip_with_port, linearize_previous_events, FedDest}; use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId}; use serde::{Deserialize, Serialize}; - use serde_json::value::RawValue; - use serde_json::Value; + use serde_json::{value::RawValue, Value}; use std::collections::HashMap; #[test] diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 1ac3e314..70261b09 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,8 +1,10 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use ruma::events::room::member::MembershipState; -use ruma::{events::StateEventType, EventId, RoomId, UserId}; +use ruma::{ + events::{room::member::MembershipState, StateEventType}, + EventId, RoomId, UserId, +}; use crate::{PduEvent, Result}; diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 7eccbd40..7a8e65db 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -8,7 +8,8 @@ pub use data::Data; use lru_cache::LruCache; use ruma::{ events::{ - room::history_visibility::HistoryVisibility, room::member::MembershipState, StateEventType, + room::{history_visibility::HistoryVisibility, member::MembershipState}, + StateEventType, }, EventId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, }; @@ -110,14 +111,14 @@ impl Service { Some(HistoryVisibility::Invited) => { // Allow if any member on requesting server was AT LEAST invited, else deny current_server_members - .into_iter() - .any(|member| self.user_was_invited(shortstatehash, &member)) + .iter() + .any(|member| self.user_was_invited(shortstatehash, member)) } _ => { // Allow if any member on requested server was joined, else deny current_server_members - .into_iter() - .any(|member| self.user_was_joined(shortstatehash, &member)) + .iter() + .any(|member| self.user_was_joined(shortstatehash, member)) } }; -- 2.45.2