diff --git a/src/api/server_server.rs b/src/api/server_server.rs index fc3e2c0f..65be5a6f 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -12,6 +12,7 @@ use ruma::{ client::error::{Error as RumaError, ErrorKind}, federation::{ authorization::get_event_authorization, + backfill::get_backfill, device::get_devices::{self, v1::UserDevice}, directory::{get_public_rooms, get_public_rooms_filtered}, discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey}, @@ -43,11 +44,11 @@ use ruma::{ serde::{Base64, JsonObject, Raw}, to_device::DeviceIdOrAllDevices, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, - OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, + OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashSet}, fmt::Debug, mem, net::{IpAddr, SocketAddr}, @@ -950,6 +951,53 @@ pub async fn get_event_route( }) } +/// # `GET /_matrix/federation/v1/backfill/` +/// +/// Retrieves events from before the sender joined the room, if the room's +/// history visibility allows. +pub async fn get_backfill_route( + body: Ruma, +) -> Result { + if !services().globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + let sender_servername = body + .sender_servername + .as_ref() + .expect("server is authenticated"); + + info!("Got backfill request from: {}", sender_servername); + + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Server is not in room.", + )); + } + + let origin = services().globals.server_name().to_owned(); + let earliest_events = &[]; + + let events = get_missing_events( + sender_servername, + &body.room_id, + earliest_events, + &body.v, + body.limit, + )?; + + Ok(get_backfill::v1::Response { + origin, + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + pdus: events, + }) +} + /// # `POST /_matrix/federation/v1/get_missing_events/{roomId}` /// /// Retrieves events that the sender is missing. @@ -981,11 +1029,43 @@ pub async fn get_missing_events_route( .event_handler .acl_check(sender_servername, &body.room_id)?; - let mut queued_events = body.latest_events.clone(); + let events = get_missing_events( + sender_servername, + &body.room_id, + &body.earliest_events, + &body.latest_events, + body.limit, + )?; + + Ok(get_missing_events::v1::Response { events }) +} + +// Recursively fetch events starting from `latest_events`, going backwards +// through each event's `prev_events` until reaching the `earliest_events`. +// +// Used by the federation /backfill and /get_missing_events routes. +fn get_missing_events( + sender_servername: &ServerName, + room_id: &RoomId, + earliest_events: &[OwnedEventId], + latest_events: &Vec, + limit: UInt, +) -> Result>> { + let limit = u64::from(limit) as usize; + + let mut queued_events = latest_events.clone(); let mut events = Vec::new(); + let mut stop_at_events = HashSet::with_capacity(limit); + stop_at_events.extend(earliest_events.iter().cloned()); + let mut i = 0; - while i < queued_events.len() && events.len() < u64::from(body.limit) as usize { + while i < queued_events.len() && events.len() < limit { + if stop_at_events.contains(&queued_events[i]) { + i += 1; + continue; + } + if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? { let room_id_str = pdu .get("room_id") @@ -995,10 +1075,10 @@ pub async fn get_missing_events_route( let event_room_id = <&RoomId>::try_from(room_id_str) .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - if event_room_id != body.room_id { + if event_room_id != room_id { warn!( "Evil event detected: Event {} found while searching in room {}", - queued_events[i], body.room_id + queued_events[i], room_id ); return Err(Error::BadRequest( ErrorKind::InvalidParam, @@ -1006,10 +1086,20 @@ pub async fn get_missing_events_route( )); } - if body.earliest_events.contains(&queued_events[i]) { + let event_is_visible = services() + .rooms + .state_accessor + .server_can_see_event(sender_servername, &queued_events[i])?; + + if !event_is_visible { i += 1; continue; } + + // Don't send this event again if it comes through some other + // event's prev_events. + stop_at_events.insert(queued_events[i].clone()); + queued_events.extend_from_slice( &serde_json::from_value::>( serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| { @@ -1024,7 +1114,7 @@ pub async fn get_missing_events_route( i += 1; } - Ok(get_missing_events::v1::Response { events }) + Ok(events) } /// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}` diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 0f0c0dc7..1618c8e0 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -2,7 +2,13 @@ use std::{collections::HashMap, sync::Arc}; use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use async_trait::async_trait; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{ + events::{ + room::history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, + StateEventType, + }, + EventId, RoomId, ServerName, +}; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { @@ -138,6 +144,35 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { }) } + /// Whether a server is allowed to see an event through federation, based on + /// the room's history_visibility at that event's state. + /// + /// Note: Joined/Invited history visibility not yet implemented. + #[tracing::instrument(skip(self))] + fn server_can_see_event(&self, _server_name: &ServerName, event_id: &EventId) -> Result { + let shortstatehash = match self.pdu_shortstatehash(event_id) { + Ok(Some(shortstatehash)) => shortstatehash, + _ => return Ok(false), + }; + + let history_visibility = self + .state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")? + .map(|event| serde_json::from_str(event.content.get())) + .transpose() + .map_err(|_| Error::bad_database("Invalid room history visibility event in database."))? + .map(|content: RoomHistoryVisibilityEventContent| content.history_visibility); + + Ok(match history_visibility { + Some(HistoryVisibility::WorldReadable) => true, + Some(HistoryVisibility::Shared) => true, + // TODO: Check if any of the server's users were invited + // at this point in time. + Some(HistoryVisibility::Joined) => false, + Some(HistoryVisibility::Invited) => false, + _ => false, + }) + } + /// Returns the full room state. async fn room_state_full( &self, diff --git a/src/main.rs b/src/main.rs index da80507c..9eb879d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -391,6 +391,7 @@ fn routes() -> Router { .ruma_route(server_server::send_transaction_message_route) .ruma_route(server_server::get_event_route) .ruma_route(server_server::get_missing_events_route) + .ruma_route(server_server::get_backfill_route) .ruma_route(server_server::get_event_authorization_route) .ruma_route(server_server::get_room_state_route) .ruma_route(server_server::get_room_state_ids_route) diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index f3ae3c21..597955f6 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId, ServerName}; use crate::{PduEvent, Result}; @@ -35,6 +35,9 @@ pub trait Data: Send + Sync { /// Returns the state hash for this pdu. fn pdu_shortstatehash(&self, event_id: &EventId) -> Result>; + /// Returns true if a server has permission to see an event + fn server_can_see_event(&self, sever_name: &ServerName, event_id: &EventId) -> Result; + /// Returns the full room state. async fn room_state_full( &self, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 87d99368..bc286568 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -2,7 +2,7 @@ mod data; use std::{collections::HashMap, sync::Arc}; pub use data::Data; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId, ServerName}; use crate::{PduEvent, Result}; @@ -51,6 +51,16 @@ impl Service { self.db.pdu_shortstatehash(event_id) } + /// Returns true if a server has permission to see an event + #[tracing::instrument(skip(self))] + pub fn server_can_see_event<'a>( + &'a self, + sever_name: &ServerName, + event_id: &EventId, + ) -> Result { + self.db.server_can_see_event(sever_name, event_id) + } + /// Returns the full room state. #[tracing::instrument(skip(self))] pub async fn room_state_full(