diff --git a/src/service/account_data.rs b/src/database/key_value/account_data.rs similarity index 100% rename from src/service/account_data.rs rename to src/database/key_value/account_data.rs diff --git a/src/service/globals.rs b/src/database/key_value/globals.rs similarity index 100% rename from src/service/globals.rs rename to src/database/key_value/globals.rs diff --git a/src/service/key_backups.rs b/src/database/key_value/key_backups.rs similarity index 100% rename from src/service/key_backups.rs rename to src/database/key_value/key_backups.rs diff --git a/src/service/media.rs b/src/database/key_value/media.rs similarity index 100% rename from src/service/media.rs rename to src/database/key_value/media.rs diff --git a/src/service/account_data/data.rs b/src/service/account_data/data.rs new file mode 100644 index 00000000..70ad9f2a --- /dev/null +++ b/src/service/account_data/data.rs @@ -0,0 +1,145 @@ +use crate::{utils, Error, Result}; +use ruma::{ + api::client::error::ErrorKind, + events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, + serde::Raw, + RoomId, UserId, +}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{collections::HashMap, sync::Arc}; + +impl AccountData { + /// Places one event in the account data of the user and removes the previous entry. + #[tracing::instrument(skip(self, room_id, user_id, event_type, data))] + pub fn update( + &self, + room_id: Option<&RoomId>, + user_id: &UserId, + event_type: RoomAccountDataEventType, + data: &T, + ) -> Result<()> { + let mut prefix = room_id + .map(|r| r.to_string()) + .unwrap_or_default() + .as_bytes() + .to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(user_id.as_bytes()); + prefix.push(0xff); + + let mut roomuserdataid = prefix.clone(); + roomuserdataid.extend_from_slice(&services().globals.next_count()?.to_be_bytes()); + roomuserdataid.push(0xff); + roomuserdataid.extend_from_slice(event_type.to_string().as_bytes()); + + let mut key = prefix; + key.extend_from_slice(event_type.to_string().as_bytes()); + + let json = serde_json::to_value(data).expect("all types here can be serialized"); // TODO: maybe add error handling + if json.get("type").is_none() || json.get("content").is_none() { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Account data doesn't have all required fields.", + )); + } + + self.roomuserdataid_accountdata.insert( + &roomuserdataid, + &serde_json::to_vec(&json).expect("to_vec always works on json values"), + )?; + + let prev = self.roomusertype_roomuserdataid.get(&key)?; + + self.roomusertype_roomuserdataid + .insert(&key, &roomuserdataid)?; + + // Remove old entry + if let Some(prev) = prev { + self.roomuserdataid_accountdata.remove(&prev)?; + } + + Ok(()) + } + + /// Searches the account data for a specific kind. + #[tracing::instrument(skip(self, room_id, user_id, kind))] + pub fn get( + &self, + room_id: Option<&RoomId>, + user_id: &UserId, + kind: RoomAccountDataEventType, + ) -> Result> { + let mut key = room_id + .map(|r| r.to_string()) + .unwrap_or_default() + .as_bytes() + .to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(kind.to_string().as_bytes()); + + self.roomusertype_roomuserdataid + .get(&key)? + .and_then(|roomuserdataid| { + self.roomuserdataid_accountdata + .get(&roomuserdataid) + .transpose() + }) + .transpose()? + .map(|data| { + serde_json::from_slice(&data) + .map_err(|_| Error::bad_database("could not deserialize")) + }) + .transpose() + } + + /// Returns all changes to the account data that happened after `since`. + #[tracing::instrument(skip(self, room_id, user_id, since))] + pub fn changes_since( + &self, + room_id: Option<&RoomId>, + user_id: &UserId, + since: u64, + ) -> Result>> { + let mut userdata = HashMap::new(); + + let mut prefix = room_id + .map(|r| r.to_string()) + .unwrap_or_default() + .as_bytes() + .to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(user_id.as_bytes()); + prefix.push(0xff); + + // Skip the data that's exactly at since, because we sent that last time + let mut first_possible = prefix.clone(); + first_possible.extend_from_slice(&(since + 1).to_be_bytes()); + + for r in self + .roomuserdataid_accountdata + .iter_from(&first_possible, false) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(|(k, v)| { + Ok::<_, Error>(( + RoomAccountDataEventType::try_from( + utils::string_from_bytes(k.rsplit(|&b| b == 0xff).next().ok_or_else( + || Error::bad_database("RoomUserData ID in db is invalid."), + )?) + .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, + ) + .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, + serde_json::from_slice::>(&v).map_err(|_| { + Error::bad_database("Database contains invalid account data.") + })?, + )) + }) + { + let (kind, data) = r?; + userdata.insert(kind, data); + } + + Ok(userdata) + } +} diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs new file mode 100644 index 00000000..70ad9f2a --- /dev/null +++ b/src/service/account_data/mod.rs @@ -0,0 +1,145 @@ +use crate::{utils, Error, Result}; +use ruma::{ + api::client::error::ErrorKind, + events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, + serde::Raw, + RoomId, UserId, +}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{collections::HashMap, sync::Arc}; + +impl AccountData { + /// Places one event in the account data of the user and removes the previous entry. + #[tracing::instrument(skip(self, room_id, user_id, event_type, data))] + pub fn update( + &self, + room_id: Option<&RoomId>, + user_id: &UserId, + event_type: RoomAccountDataEventType, + data: &T, + ) -> Result<()> { + let mut prefix = room_id + .map(|r| r.to_string()) + .unwrap_or_default() + .as_bytes() + .to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(user_id.as_bytes()); + prefix.push(0xff); + + let mut roomuserdataid = prefix.clone(); + roomuserdataid.extend_from_slice(&services().globals.next_count()?.to_be_bytes()); + roomuserdataid.push(0xff); + roomuserdataid.extend_from_slice(event_type.to_string().as_bytes()); + + let mut key = prefix; + key.extend_from_slice(event_type.to_string().as_bytes()); + + let json = serde_json::to_value(data).expect("all types here can be serialized"); // TODO: maybe add error handling + if json.get("type").is_none() || json.get("content").is_none() { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Account data doesn't have all required fields.", + )); + } + + self.roomuserdataid_accountdata.insert( + &roomuserdataid, + &serde_json::to_vec(&json).expect("to_vec always works on json values"), + )?; + + let prev = self.roomusertype_roomuserdataid.get(&key)?; + + self.roomusertype_roomuserdataid + .insert(&key, &roomuserdataid)?; + + // Remove old entry + if let Some(prev) = prev { + self.roomuserdataid_accountdata.remove(&prev)?; + } + + Ok(()) + } + + /// Searches the account data for a specific kind. + #[tracing::instrument(skip(self, room_id, user_id, kind))] + pub fn get( + &self, + room_id: Option<&RoomId>, + user_id: &UserId, + kind: RoomAccountDataEventType, + ) -> Result> { + let mut key = room_id + .map(|r| r.to_string()) + .unwrap_or_default() + .as_bytes() + .to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(kind.to_string().as_bytes()); + + self.roomusertype_roomuserdataid + .get(&key)? + .and_then(|roomuserdataid| { + self.roomuserdataid_accountdata + .get(&roomuserdataid) + .transpose() + }) + .transpose()? + .map(|data| { + serde_json::from_slice(&data) + .map_err(|_| Error::bad_database("could not deserialize")) + }) + .transpose() + } + + /// Returns all changes to the account data that happened after `since`. + #[tracing::instrument(skip(self, room_id, user_id, since))] + pub fn changes_since( + &self, + room_id: Option<&RoomId>, + user_id: &UserId, + since: u64, + ) -> Result>> { + let mut userdata = HashMap::new(); + + let mut prefix = room_id + .map(|r| r.to_string()) + .unwrap_or_default() + .as_bytes() + .to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(user_id.as_bytes()); + prefix.push(0xff); + + // Skip the data that's exactly at since, because we sent that last time + let mut first_possible = prefix.clone(); + first_possible.extend_from_slice(&(since + 1).to_be_bytes()); + + for r in self + .roomuserdataid_accountdata + .iter_from(&first_possible, false) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .map(|(k, v)| { + Ok::<_, Error>(( + RoomAccountDataEventType::try_from( + utils::string_from_bytes(k.rsplit(|&b| b == 0xff).next().ok_or_else( + || Error::bad_database("RoomUserData ID in db is invalid."), + )?) + .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, + ) + .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, + serde_json::from_slice::>(&v).map_err(|_| { + Error::bad_database("Database contains invalid account data.") + })?, + )) + }) + { + let (kind, data) = r?; + userdata.insert(kind, data); + } + + Ok(userdata) + } +} diff --git a/src/service/admin.rs b/src/service/admin/mod.rs similarity index 100% rename from src/service/admin.rs rename to src/service/admin/mod.rs diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs new file mode 100644 index 00000000..2b47e5b1 --- /dev/null +++ b/src/service/globals/mod.rs @@ -0,0 +1,426 @@ +mod data; +pub use data::Data; + +use crate::service::*; + +use crate::{database::Config, server_server::FedDest, utils, Error, Result}; +use ruma::{ + api::{ + client::sync::sync_events, + federation::discovery::{ServerSigningKeys, VerifyKey}, + }, + DeviceId, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, + ServerSigningKeyId, UserId, +}; +use std::{ + collections::{BTreeMap, HashMap}, + fs, + future::Future, + net::{IpAddr, SocketAddr}, + path::PathBuf, + sync::{Arc, Mutex, RwLock}, + time::{Duration, Instant}, +}; +use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore}; +use tracing::error; +use trust_dns_resolver::TokioAsyncResolver; + +use super::abstraction::Tree; + +pub const COUNTER: &[u8] = b"c"; + +type WellKnownMap = HashMap, (FedDest, String)>; +type TlsNameMap = HashMap, u16)>; +type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries +type SyncHandle = ( + Option, // since + Receiver>>, // rx +); + +pub struct Service { + db: D, + + pub actual_destination_cache: Arc>, // actual_destination, host + pub tls_name_override: Arc>, + pub config: Config, + keypair: Arc, + dns_resolver: TokioAsyncResolver, + jwt_decoding_key: Option>, + federation_client: reqwest::Client, + default_client: reqwest::Client, + pub stable_room_versions: Vec, + pub unstable_room_versions: Vec, + pub bad_event_ratelimiter: Arc, RateLimitState>>>, + pub bad_signature_ratelimiter: Arc, RateLimitState>>>, + pub servername_ratelimiter: Arc, Arc>>>, + pub sync_receivers: RwLock, Box), SyncHandle>>, + pub roomid_mutex_insert: RwLock, Arc>>>, + pub roomid_mutex_state: RwLock, Arc>>>, + pub roomid_mutex_federation: RwLock, Arc>>>, // this lock will be held longer + pub roomid_federationhandletime: RwLock, (Box, Instant)>>, + pub stateres_mutex: Arc>, + pub rotate: RotationHandler, +} + +/// Handles "rotation" of long-polling requests. "Rotation" in this context is similar to "rotation" of log files and the like. +/// +/// This is utilized to have sync workers return early and release read locks on the database. +pub struct RotationHandler(broadcast::Sender<()>, broadcast::Receiver<()>); + +impl RotationHandler { + pub fn new() -> Self { + let (s, r) = broadcast::channel(1); + Self(s, r) + } + + pub fn watch(&self) -> impl Future { + let mut r = self.0.subscribe(); + + async move { + let _ = r.recv().await; + } + } + + pub fn fire(&self) { + let _ = self.0.send(()); + } +} + +impl Default for RotationHandler { + fn default() -> Self { + Self::new() + } +} + + +impl Service<_> { + pub fn load( + globals: Arc, + server_signingkeys: Arc, + config: Config, + ) -> Result { + let keypair_bytes = globals.get(b"keypair")?.map_or_else( + || { + let keypair = utils::generate_keypair(); + globals.insert(b"keypair", &keypair)?; + Ok::<_, Error>(keypair) + }, + |s| Ok(s.to_vec()), + )?; + + let mut parts = keypair_bytes.splitn(2, |&b| b == 0xff); + + let keypair = utils::string_from_bytes( + // 1. version + parts + .next() + .expect("splitn always returns at least one element"), + ) + .map_err(|_| Error::bad_database("Invalid version bytes in keypair.")) + .and_then(|version| { + // 2. key + parts + .next() + .ok_or_else(|| Error::bad_database("Invalid keypair format in database.")) + .map(|key| (version, key)) + }) + .and_then(|(version, key)| { + ruma::signatures::Ed25519KeyPair::from_der(key, version) + .map_err(|_| Error::bad_database("Private or public keys are invalid.")) + }); + + let keypair = match keypair { + Ok(k) => k, + Err(e) => { + error!("Keypair invalid. Deleting..."); + globals.remove(b"keypair")?; + return Err(e); + } + }; + + let tls_name_override = Arc::new(RwLock::new(TlsNameMap::new())); + + let jwt_decoding_key = config + .jwt_secret + .as_ref() + .map(|secret| jsonwebtoken::DecodingKey::from_secret(secret.as_bytes()).into_static()); + + let default_client = reqwest_client_builder(&config)?.build()?; + let name_override = Arc::clone(&tls_name_override); + let federation_client = reqwest_client_builder(&config)? + .resolve_fn(move |domain| { + let read_guard = name_override.read().unwrap(); + let (override_name, port) = read_guard.get(&domain)?; + let first_name = override_name.get(0)?; + Some(SocketAddr::new(*first_name, *port)) + }) + .build()?; + + // Supported and stable room versions + let stable_room_versions = vec![ + RoomVersionId::V6, + RoomVersionId::V7, + RoomVersionId::V8, + RoomVersionId::V9, + ]; + // Experimental, partially supported room versions + let unstable_room_versions = vec![RoomVersionId::V3, RoomVersionId::V4, RoomVersionId::V5]; + + let mut s = Self { + globals, + config, + keypair: Arc::new(keypair), + dns_resolver: TokioAsyncResolver::tokio_from_system_conf().map_err(|e| { + error!( + "Failed to set up trust dns resolver with system config: {}", + e + ); + Error::bad_config("Failed to set up trust dns resolver with system config.") + })?, + actual_destination_cache: Arc::new(RwLock::new(WellKnownMap::new())), + tls_name_override, + federation_client, + default_client, + server_signingkeys, + jwt_decoding_key, + stable_room_versions, + unstable_room_versions, + bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())), + bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())), + servername_ratelimiter: Arc::new(RwLock::new(HashMap::new())), + roomid_mutex_state: RwLock::new(HashMap::new()), + roomid_mutex_insert: RwLock::new(HashMap::new()), + roomid_mutex_federation: RwLock::new(HashMap::new()), + roomid_federationhandletime: RwLock::new(HashMap::new()), + stateres_mutex: Arc::new(Mutex::new(())), + sync_receivers: RwLock::new(HashMap::new()), + rotate: RotationHandler::new(), + }; + + fs::create_dir_all(s.get_media_folder())?; + + if !s + .supported_room_versions() + .contains(&s.config.default_room_version) + { + error!("Room version in config isn't supported, falling back to Version 6"); + s.config.default_room_version = RoomVersionId::V6; + }; + + Ok(s) + } + + /// Returns this server's keypair. + pub fn keypair(&self) -> &ruma::signatures::Ed25519KeyPair { + &self.keypair + } + + /// Returns a reqwest client which can be used to send requests + pub fn default_client(&self) -> reqwest::Client { + // Client is cheap to clone (Arc wrapper) and avoids lifetime issues + self.default_client.clone() + } + + /// Returns a client used for resolving .well-knowns + pub fn federation_client(&self) -> reqwest::Client { + // Client is cheap to clone (Arc wrapper) and avoids lifetime issues + self.federation_client.clone() + } + + #[tracing::instrument(skip(self))] + pub fn next_count(&self) -> Result { + utils::u64_from_bytes(&self.globals.increment(COUNTER)?) + .map_err(|_| Error::bad_database("Count has invalid bytes.")) + } + + #[tracing::instrument(skip(self))] + pub fn current_count(&self) -> Result { + self.globals.get(COUNTER)?.map_or(Ok(0_u64), |bytes| { + utils::u64_from_bytes(&bytes) + .map_err(|_| Error::bad_database("Count has invalid bytes.")) + }) + } + + pub fn server_name(&self) -> &ServerName { + self.config.server_name.as_ref() + } + + pub fn max_request_size(&self) -> u32 { + self.config.max_request_size + } + + pub fn allow_registration(&self) -> bool { + self.config.allow_registration + } + + pub fn allow_encryption(&self) -> bool { + self.config.allow_encryption + } + + pub fn allow_federation(&self) -> bool { + self.config.allow_federation + } + + pub fn allow_room_creation(&self) -> bool { + self.config.allow_room_creation + } + + pub fn allow_unstable_room_versions(&self) -> bool { + self.config.allow_unstable_room_versions + } + + pub fn default_room_version(&self) -> RoomVersionId { + self.config.default_room_version.clone() + } + + pub fn trusted_servers(&self) -> &[Box] { + &self.config.trusted_servers + } + + pub fn dns_resolver(&self) -> &TokioAsyncResolver { + &self.dns_resolver + } + + pub fn jwt_decoding_key(&self) -> Option<&jsonwebtoken::DecodingKey<'_>> { + self.jwt_decoding_key.as_ref() + } + + pub fn turn_password(&self) -> &String { + &self.config.turn_password + } + + pub fn turn_ttl(&self) -> u64 { + self.config.turn_ttl + } + + pub fn turn_uris(&self) -> &[String] { + &self.config.turn_uris + } + + pub fn turn_username(&self) -> &String { + &self.config.turn_username + } + + pub fn turn_secret(&self) -> &String { + &self.config.turn_secret + } + + pub fn emergency_password(&self) -> &Option { + &self.config.emergency_password + } + + pub fn supported_room_versions(&self) -> Vec { + let mut room_versions: Vec = vec![]; + room_versions.extend(self.stable_room_versions.clone()); + if self.allow_unstable_room_versions() { + room_versions.extend(self.unstable_room_versions.clone()); + }; + room_versions + } + + /// TODO: the key valid until timestamp is only honored in room version > 4 + /// Remove the outdated keys and insert the new ones. + /// + /// This doesn't actually check that the keys provided are newer than the old set. + pub fn add_signing_key( + &self, + origin: &ServerName, + new_keys: ServerSigningKeys, + ) -> Result, VerifyKey>> { + // Not atomic, but this is not critical + let signingkeys = self.server_signingkeys.get(origin.as_bytes())?; + + let mut keys = signingkeys + .and_then(|keys| serde_json::from_slice(&keys).ok()) + .unwrap_or_else(|| { + // Just insert "now", it doesn't matter + ServerSigningKeys::new(origin.to_owned(), MilliSecondsSinceUnixEpoch::now()) + }); + + let ServerSigningKeys { + verify_keys, + old_verify_keys, + .. + } = new_keys; + + keys.verify_keys.extend(verify_keys.into_iter()); + keys.old_verify_keys.extend(old_verify_keys.into_iter()); + + self.server_signingkeys.insert( + origin.as_bytes(), + &serde_json::to_vec(&keys).expect("serversigningkeys can be serialized"), + )?; + + let mut tree = keys.verify_keys; + tree.extend( + keys.old_verify_keys + .into_iter() + .map(|old| (old.0, VerifyKey::new(old.1.key))), + ); + + Ok(tree) + } + + /// This returns an empty `Ok(BTreeMap<..>)` when there are no keys found for the server. + pub fn signing_keys_for( + &self, + origin: &ServerName, + ) -> Result, VerifyKey>> { + let signingkeys = self + .server_signingkeys + .get(origin.as_bytes())? + .and_then(|bytes| serde_json::from_slice(&bytes).ok()) + .map(|keys: ServerSigningKeys| { + let mut tree = keys.verify_keys; + tree.extend( + keys.old_verify_keys + .into_iter() + .map(|old| (old.0, VerifyKey::new(old.1.key))), + ); + tree + }) + .unwrap_or_else(BTreeMap::new); + + Ok(signingkeys) + } + + pub fn database_version(&self) -> Result { + self.globals.get(b"version")?.map_or(Ok(0), |version| { + utils::u64_from_bytes(&version) + .map_err(|_| Error::bad_database("Database version id is invalid.")) + }) + } + + pub fn bump_database_version(&self, new_version: u64) -> Result<()> { + self.globals + .insert(b"version", &new_version.to_be_bytes())?; + Ok(()) + } + + pub fn get_media_folder(&self) -> PathBuf { + let mut r = PathBuf::new(); + r.push(self.config.database_path.clone()); + r.push("media"); + r + } + + pub fn get_media_file(&self, key: &[u8]) -> PathBuf { + let mut r = PathBuf::new(); + r.push(self.config.database_path.clone()); + r.push("media"); + r.push(base64::encode_config(key, base64::URL_SAFE_NO_PAD)); + r + } +} + +fn reqwest_client_builder(config: &Config) -> Result { + let mut reqwest_client_builder = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(30)) + .timeout(Duration::from_secs(60 * 3)); + + if let Some(proxy) = config.proxy.to_proxy()? { + reqwest_client_builder = reqwest_client_builder.proxy(proxy); + } + + Ok(reqwest_client_builder) +} diff --git a/src/service/key_backups/data.rs b/src/service/key_backups/data.rs new file mode 100644 index 00000000..be1d6b18 --- /dev/null +++ b/src/service/key_backups/data.rs @@ -0,0 +1,371 @@ +use crate::{utils, Error, Result, services}; +use ruma::{ + api::client::{ + backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, + error::ErrorKind, + }, + serde::Raw, + RoomId, UserId, +}; +use std::{collections::BTreeMap, sync::Arc}; + +impl KeyBackups { + pub fn create_backup( + &self, + user_id: &UserId, + backup_metadata: &Raw, + ) -> Result { + let version = services().globals.next_count()?.to_string(); + + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + self.backupid_algorithm.insert( + &key, + &serde_json::to_vec(backup_metadata).expect("BackupAlgorithm::to_vec always works"), + )?; + self.backupid_etag + .insert(&key, &services().globals.next_count()?.to_be_bytes())?; + Ok(version) + } + + pub fn delete_backup(&self, user_id: &UserId, version: &str) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + self.backupid_algorithm.remove(&key)?; + self.backupid_etag.remove(&key)?; + + key.push(0xff); + + for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { + self.backupkeyid_backup.remove(&outdated_key)?; + } + + Ok(()) + } + + pub fn update_backup( + &self, + user_id: &UserId, + version: &str, + backup_metadata: &Raw, + ) -> Result { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + if self.backupid_algorithm.get(&key)?.is_none() { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "Tried to update nonexistent backup.", + )); + } + + self.backupid_algorithm + .insert(&key, backup_metadata.json().get().as_bytes())?; + self.backupid_etag + .insert(&key, &services().globals.next_count()?.to_be_bytes())?; + Ok(version.to_owned()) + } + + pub fn get_latest_backup_version(&self, user_id: &UserId) -> Result> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + let mut last_possible_key = prefix.clone(); + last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); + + self.backupid_algorithm + .iter_from(&last_possible_key, true) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .next() + .map(|(key, _)| { + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| Error::bad_database("backupid_algorithm key is invalid.")) + }) + .transpose() + } + + pub fn get_latest_backup( + &self, + user_id: &UserId, + ) -> Result)>> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + let mut last_possible_key = prefix.clone(); + last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); + + self.backupid_algorithm + .iter_from(&last_possible_key, true) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .next() + .map(|(key, value)| { + let version = utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))?; + + Ok(( + version, + serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("Algorithm in backupid_algorithm is invalid.") + })?, + )) + }) + .transpose() + } + + pub fn get_backup( + &self, + user_id: &UserId, + version: &str, + ) -> Result>> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + self.backupid_algorithm + .get(&key)? + .map_or(Ok(None), |bytes| { + serde_json::from_slice(&bytes) + .map_err(|_| Error::bad_database("Algorithm in backupid_algorithm is invalid.")) + }) + } + + pub fn add_key( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + session_id: &str, + key_data: &Raw, + ) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + if self.backupid_algorithm.get(&key)?.is_none() { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "Tried to update nonexistent backup.", + )); + } + + self.backupid_etag + .insert(&key, &services().globals.next_count()?.to_be_bytes())?; + + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(session_id.as_bytes()); + + self.backupkeyid_backup + .insert(&key, key_data.json().get().as_bytes())?; + + Ok(()) + } + + pub fn count_keys(&self, user_id: &UserId, version: &str) -> Result { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(version.as_bytes()); + + Ok(self.backupkeyid_backup.scan_prefix(prefix).count()) + } + + pub fn get_etag(&self, user_id: &UserId, version: &str) -> Result { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + Ok(utils::u64_from_bytes( + &self + .backupid_etag + .get(&key)? + .ok_or_else(|| Error::bad_database("Backup has no etag."))?, + ) + .map_err(|_| Error::bad_database("etag in backupid_etag invalid."))? + .to_string()) + } + + pub fn get_all( + &self, + user_id: &UserId, + version: &str, + ) -> Result, RoomKeyBackup>> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(version.as_bytes()); + prefix.push(0xff); + + let mut rooms = BTreeMap::, RoomKeyBackup>::new(); + + for result in self + .backupkeyid_backup + .scan_prefix(prefix) + .map(|(key, value)| { + let mut parts = key.rsplit(|&b| b == 0xff); + + let session_id = + utils::string_from_bytes(parts.next().ok_or_else(|| { + Error::bad_database("backupkeyid_backup key is invalid.") + })?) + .map_err(|_| { + Error::bad_database("backupkeyid_backup session_id is invalid.") + })?; + + let room_id = RoomId::parse( + utils::string_from_bytes(parts.next().ok_or_else(|| { + Error::bad_database("backupkeyid_backup key is invalid.") + })?) + .map_err(|_| Error::bad_database("backupkeyid_backup room_id is invalid."))?, + ) + .map_err(|_| { + Error::bad_database("backupkeyid_backup room_id is invalid room id.") + })?; + + let key_data = serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") + })?; + + Ok::<_, Error>((room_id, session_id, key_data)) + }) + { + let (room_id, session_id, key_data) = result?; + rooms + .entry(room_id) + .or_insert_with(|| RoomKeyBackup { + sessions: BTreeMap::new(), + }) + .sessions + .insert(session_id, key_data); + } + + Ok(rooms) + } + + pub fn get_room( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + ) -> Result>> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(version.as_bytes()); + prefix.push(0xff); + prefix.extend_from_slice(room_id.as_bytes()); + prefix.push(0xff); + + Ok(self + .backupkeyid_backup + .scan_prefix(prefix) + .map(|(key, value)| { + let mut parts = key.rsplit(|&b| b == 0xff); + + let session_id = + utils::string_from_bytes(parts.next().ok_or_else(|| { + Error::bad_database("backupkeyid_backup key is invalid.") + })?) + .map_err(|_| { + Error::bad_database("backupkeyid_backup session_id is invalid.") + })?; + + let key_data = serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") + })?; + + Ok::<_, Error>((session_id, key_data)) + }) + .filter_map(|r| r.ok()) + .collect()) + } + + pub fn get_session( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + session_id: &str, + ) -> Result>> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(session_id.as_bytes()); + + self.backupkeyid_backup + .get(&key)? + .map(|value| { + serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") + }) + }) + .transpose() + } + + pub fn delete_all_keys(&self, user_id: &UserId, version: &str) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + key.push(0xff); + + for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { + self.backupkeyid_backup.remove(&outdated_key)?; + } + + Ok(()) + } + + pub fn delete_room_keys( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + ) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + key.push(0xff); + + for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { + self.backupkeyid_backup.remove(&outdated_key)?; + } + + Ok(()) + } + + pub fn delete_room_key( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + session_id: &str, + ) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(session_id.as_bytes()); + + for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { + self.backupkeyid_backup.remove(&outdated_key)?; + } + + Ok(()) + } +} diff --git a/src/service/key_backups/mod.rs b/src/service/key_backups/mod.rs new file mode 100644 index 00000000..be1d6b18 --- /dev/null +++ b/src/service/key_backups/mod.rs @@ -0,0 +1,371 @@ +use crate::{utils, Error, Result, services}; +use ruma::{ + api::client::{ + backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup}, + error::ErrorKind, + }, + serde::Raw, + RoomId, UserId, +}; +use std::{collections::BTreeMap, sync::Arc}; + +impl KeyBackups { + pub fn create_backup( + &self, + user_id: &UserId, + backup_metadata: &Raw, + ) -> Result { + let version = services().globals.next_count()?.to_string(); + + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + self.backupid_algorithm.insert( + &key, + &serde_json::to_vec(backup_metadata).expect("BackupAlgorithm::to_vec always works"), + )?; + self.backupid_etag + .insert(&key, &services().globals.next_count()?.to_be_bytes())?; + Ok(version) + } + + pub fn delete_backup(&self, user_id: &UserId, version: &str) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + self.backupid_algorithm.remove(&key)?; + self.backupid_etag.remove(&key)?; + + key.push(0xff); + + for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { + self.backupkeyid_backup.remove(&outdated_key)?; + } + + Ok(()) + } + + pub fn update_backup( + &self, + user_id: &UserId, + version: &str, + backup_metadata: &Raw, + ) -> Result { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + if self.backupid_algorithm.get(&key)?.is_none() { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "Tried to update nonexistent backup.", + )); + } + + self.backupid_algorithm + .insert(&key, backup_metadata.json().get().as_bytes())?; + self.backupid_etag + .insert(&key, &services().globals.next_count()?.to_be_bytes())?; + Ok(version.to_owned()) + } + + pub fn get_latest_backup_version(&self, user_id: &UserId) -> Result> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + let mut last_possible_key = prefix.clone(); + last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); + + self.backupid_algorithm + .iter_from(&last_possible_key, true) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .next() + .map(|(key, _)| { + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| Error::bad_database("backupid_algorithm key is invalid.")) + }) + .transpose() + } + + pub fn get_latest_backup( + &self, + user_id: &UserId, + ) -> Result)>> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + let mut last_possible_key = prefix.clone(); + last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); + + self.backupid_algorithm + .iter_from(&last_possible_key, true) + .take_while(move |(k, _)| k.starts_with(&prefix)) + .next() + .map(|(key, value)| { + let version = utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))?; + + Ok(( + version, + serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("Algorithm in backupid_algorithm is invalid.") + })?, + )) + }) + .transpose() + } + + pub fn get_backup( + &self, + user_id: &UserId, + version: &str, + ) -> Result>> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + self.backupid_algorithm + .get(&key)? + .map_or(Ok(None), |bytes| { + serde_json::from_slice(&bytes) + .map_err(|_| Error::bad_database("Algorithm in backupid_algorithm is invalid.")) + }) + } + + pub fn add_key( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + session_id: &str, + key_data: &Raw, + ) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + if self.backupid_algorithm.get(&key)?.is_none() { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "Tried to update nonexistent backup.", + )); + } + + self.backupid_etag + .insert(&key, &services().globals.next_count()?.to_be_bytes())?; + + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(session_id.as_bytes()); + + self.backupkeyid_backup + .insert(&key, key_data.json().get().as_bytes())?; + + Ok(()) + } + + pub fn count_keys(&self, user_id: &UserId, version: &str) -> Result { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(version.as_bytes()); + + Ok(self.backupkeyid_backup.scan_prefix(prefix).count()) + } + + pub fn get_etag(&self, user_id: &UserId, version: &str) -> Result { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + + Ok(utils::u64_from_bytes( + &self + .backupid_etag + .get(&key)? + .ok_or_else(|| Error::bad_database("Backup has no etag."))?, + ) + .map_err(|_| Error::bad_database("etag in backupid_etag invalid."))? + .to_string()) + } + + pub fn get_all( + &self, + user_id: &UserId, + version: &str, + ) -> Result, RoomKeyBackup>> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(version.as_bytes()); + prefix.push(0xff); + + let mut rooms = BTreeMap::, RoomKeyBackup>::new(); + + for result in self + .backupkeyid_backup + .scan_prefix(prefix) + .map(|(key, value)| { + let mut parts = key.rsplit(|&b| b == 0xff); + + let session_id = + utils::string_from_bytes(parts.next().ok_or_else(|| { + Error::bad_database("backupkeyid_backup key is invalid.") + })?) + .map_err(|_| { + Error::bad_database("backupkeyid_backup session_id is invalid.") + })?; + + let room_id = RoomId::parse( + utils::string_from_bytes(parts.next().ok_or_else(|| { + Error::bad_database("backupkeyid_backup key is invalid.") + })?) + .map_err(|_| Error::bad_database("backupkeyid_backup room_id is invalid."))?, + ) + .map_err(|_| { + Error::bad_database("backupkeyid_backup room_id is invalid room id.") + })?; + + let key_data = serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") + })?; + + Ok::<_, Error>((room_id, session_id, key_data)) + }) + { + let (room_id, session_id, key_data) = result?; + rooms + .entry(room_id) + .or_insert_with(|| RoomKeyBackup { + sessions: BTreeMap::new(), + }) + .sessions + .insert(session_id, key_data); + } + + Ok(rooms) + } + + pub fn get_room( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + ) -> Result>> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(version.as_bytes()); + prefix.push(0xff); + prefix.extend_from_slice(room_id.as_bytes()); + prefix.push(0xff); + + Ok(self + .backupkeyid_backup + .scan_prefix(prefix) + .map(|(key, value)| { + let mut parts = key.rsplit(|&b| b == 0xff); + + let session_id = + utils::string_from_bytes(parts.next().ok_or_else(|| { + Error::bad_database("backupkeyid_backup key is invalid.") + })?) + .map_err(|_| { + Error::bad_database("backupkeyid_backup session_id is invalid.") + })?; + + let key_data = serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") + })?; + + Ok::<_, Error>((session_id, key_data)) + }) + .filter_map(|r| r.ok()) + .collect()) + } + + pub fn get_session( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + session_id: &str, + ) -> Result>> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(session_id.as_bytes()); + + self.backupkeyid_backup + .get(&key)? + .map(|value| { + serde_json::from_slice(&value).map_err(|_| { + Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") + }) + }) + .transpose() + } + + pub fn delete_all_keys(&self, user_id: &UserId, version: &str) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + key.push(0xff); + + for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { + self.backupkeyid_backup.remove(&outdated_key)?; + } + + Ok(()) + } + + pub fn delete_room_keys( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + ) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + key.push(0xff); + + for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { + self.backupkeyid_backup.remove(&outdated_key)?; + } + + Ok(()) + } + + pub fn delete_room_key( + &self, + user_id: &UserId, + version: &str, + room_id: &RoomId, + session_id: &str, + ) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(version.as_bytes()); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(session_id.as_bytes()); + + for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { + self.backupkeyid_backup.remove(&outdated_key)?; + } + + Ok(()) + } +} diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs new file mode 100644 index 00000000..1bdf6d47 --- /dev/null +++ b/src/service/media/mod.rs @@ -0,0 +1,357 @@ +use image::{imageops::FilterType, GenericImageView}; + +use super::abstraction::Tree; +use crate::{utils, Error, Result}; +use std::{mem, sync::Arc}; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncWriteExt}, +}; + +pub struct FileMeta { + pub content_disposition: Option, + pub content_type: Option, + pub file: Vec, +} + +pub struct Media { + pub(super) mediaid_file: Arc, // MediaId = MXC + WidthHeight + ContentDisposition + ContentType +} + +impl Media { + /// Uploads a file. + pub async fn create( + &self, + mxc: String, + globals: &Globals, + content_disposition: &Option<&str>, + content_type: &Option<&str>, + file: &[u8], + ) -> Result<()> { + let mut key = mxc.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&0_u32.to_be_bytes()); // Width = 0 if it's not a thumbnail + key.extend_from_slice(&0_u32.to_be_bytes()); // Height = 0 if it's not a thumbnail + key.push(0xff); + key.extend_from_slice( + content_disposition + .as_ref() + .map(|f| f.as_bytes()) + .unwrap_or_default(), + ); + key.push(0xff); + key.extend_from_slice( + content_type + .as_ref() + .map(|c| c.as_bytes()) + .unwrap_or_default(), + ); + + let path = globals.get_media_file(&key); + let mut f = File::create(path).await?; + f.write_all(file).await?; + + self.mediaid_file.insert(&key, &[])?; + Ok(()) + } + + /// Uploads or replaces a file thumbnail. + #[allow(clippy::too_many_arguments)] + pub async fn upload_thumbnail( + &self, + mxc: String, + globals: &Globals, + content_disposition: &Option, + content_type: &Option, + width: u32, + height: u32, + file: &[u8], + ) -> Result<()> { + let mut key = mxc.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&width.to_be_bytes()); + key.extend_from_slice(&height.to_be_bytes()); + key.push(0xff); + key.extend_from_slice( + content_disposition + .as_ref() + .map(|f| f.as_bytes()) + .unwrap_or_default(), + ); + key.push(0xff); + key.extend_from_slice( + content_type + .as_ref() + .map(|c| c.as_bytes()) + .unwrap_or_default(), + ); + + let path = globals.get_media_file(&key); + let mut f = File::create(path).await?; + f.write_all(file).await?; + + self.mediaid_file.insert(&key, &[])?; + + Ok(()) + } + + /// Downloads a file. + pub async fn get(&self, globals: &Globals, mxc: &str) -> Result> { + let mut prefix = mxc.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(&0_u32.to_be_bytes()); // Width = 0 if it's not a thumbnail + prefix.extend_from_slice(&0_u32.to_be_bytes()); // Height = 0 if it's not a thumbnail + prefix.push(0xff); + + let first = self.mediaid_file.scan_prefix(prefix).next(); + if let Some((key, _)) = first { + let path = globals.get_media_file(&key); + let mut file = Vec::new(); + File::open(path).await?.read_to_end(&mut file).await?; + let mut parts = key.rsplit(|&b| b == 0xff); + + let content_type = parts + .next() + .map(|bytes| { + utils::string_from_bytes(bytes).map_err(|_| { + Error::bad_database("Content type in mediaid_file is invalid unicode.") + }) + }) + .transpose()?; + + let content_disposition_bytes = parts + .next() + .ok_or_else(|| Error::bad_database("Media ID in db is invalid."))?; + + let content_disposition = if content_disposition_bytes.is_empty() { + None + } else { + Some( + utils::string_from_bytes(content_disposition_bytes).map_err(|_| { + Error::bad_database( + "Content Disposition in mediaid_file is invalid unicode.", + ) + })?, + ) + }; + + Ok(Some(FileMeta { + content_disposition, + content_type, + file, + })) + } else { + Ok(None) + } + } + + /// Returns width, height of the thumbnail and whether it should be cropped. Returns None when + /// the server should send the original file. + pub fn thumbnail_properties(&self, width: u32, height: u32) -> Option<(u32, u32, bool)> { + match (width, height) { + (0..=32, 0..=32) => Some((32, 32, true)), + (0..=96, 0..=96) => Some((96, 96, true)), + (0..=320, 0..=240) => Some((320, 240, false)), + (0..=640, 0..=480) => Some((640, 480, false)), + (0..=800, 0..=600) => Some((800, 600, false)), + _ => None, + } + } + + /// Downloads a file's thumbnail. + /// + /// Here's an example on how it works: + /// + /// - Client requests an image with width=567, height=567 + /// - Server rounds that up to (800, 600), so it doesn't have to save too many thumbnails + /// - Server rounds that up again to (958, 600) to fix the aspect ratio (only for width,height>96) + /// - Server creates the thumbnail and sends it to the user + /// + /// For width,height <= 96 the server uses another thumbnailing algorithm which crops the image afterwards. + pub async fn get_thumbnail( + &self, + mxc: &str, + globals: &Globals, + width: u32, + height: u32, + ) -> Result> { + let (width, height, crop) = self + .thumbnail_properties(width, height) + .unwrap_or((0, 0, false)); // 0, 0 because that's the original file + + let mut main_prefix = mxc.as_bytes().to_vec(); + main_prefix.push(0xff); + + let mut thumbnail_prefix = main_prefix.clone(); + thumbnail_prefix.extend_from_slice(&width.to_be_bytes()); + thumbnail_prefix.extend_from_slice(&height.to_be_bytes()); + thumbnail_prefix.push(0xff); + + let mut original_prefix = main_prefix; + original_prefix.extend_from_slice(&0_u32.to_be_bytes()); // Width = 0 if it's not a thumbnail + original_prefix.extend_from_slice(&0_u32.to_be_bytes()); // Height = 0 if it's not a thumbnail + original_prefix.push(0xff); + + let first_thumbnailprefix = self.mediaid_file.scan_prefix(thumbnail_prefix).next(); + let first_originalprefix = self.mediaid_file.scan_prefix(original_prefix).next(); + if let Some((key, _)) = first_thumbnailprefix { + // Using saved thumbnail + let path = globals.get_media_file(&key); + let mut file = Vec::new(); + File::open(path).await?.read_to_end(&mut file).await?; + let mut parts = key.rsplit(|&b| b == 0xff); + + let content_type = parts + .next() + .map(|bytes| { + utils::string_from_bytes(bytes).map_err(|_| { + Error::bad_database("Content type in mediaid_file is invalid unicode.") + }) + }) + .transpose()?; + + let content_disposition_bytes = parts + .next() + .ok_or_else(|| Error::bad_database("Media ID in db is invalid."))?; + + let content_disposition = if content_disposition_bytes.is_empty() { + None + } else { + Some( + utils::string_from_bytes(content_disposition_bytes).map_err(|_| { + Error::bad_database("Content Disposition in db is invalid.") + })?, + ) + }; + + Ok(Some(FileMeta { + content_disposition, + content_type, + file: file.to_vec(), + })) + } else if let Some((key, _)) = first_originalprefix { + // Generate a thumbnail + let path = globals.get_media_file(&key); + let mut file = Vec::new(); + File::open(path).await?.read_to_end(&mut file).await?; + + let mut parts = key.rsplit(|&b| b == 0xff); + + let content_type = parts + .next() + .map(|bytes| { + utils::string_from_bytes(bytes).map_err(|_| { + Error::bad_database("Content type in mediaid_file is invalid unicode.") + }) + }) + .transpose()?; + + let content_disposition_bytes = parts + .next() + .ok_or_else(|| Error::bad_database("Media ID in db is invalid."))?; + + let content_disposition = if content_disposition_bytes.is_empty() { + None + } else { + Some( + utils::string_from_bytes(content_disposition_bytes).map_err(|_| { + Error::bad_database( + "Content Disposition in mediaid_file is invalid unicode.", + ) + })?, + ) + }; + + if let Ok(image) = image::load_from_memory(&file) { + let original_width = image.width(); + let original_height = image.height(); + if width > original_width || height > original_height { + return Ok(Some(FileMeta { + content_disposition, + content_type, + file: file.to_vec(), + })); + } + + let thumbnail = if crop { + image.resize_to_fill(width, height, FilterType::CatmullRom) + } else { + let (exact_width, exact_height) = { + // Copied from image::dynimage::resize_dimensions + let ratio = u64::from(original_width) * u64::from(height); + let nratio = u64::from(width) * u64::from(original_height); + + let use_width = nratio <= ratio; + let intermediate = if use_width { + u64::from(original_height) * u64::from(width) + / u64::from(original_width) + } else { + u64::from(original_width) * u64::from(height) + / u64::from(original_height) + }; + if use_width { + if intermediate <= u64::from(::std::u32::MAX) { + (width, intermediate as u32) + } else { + ( + (u64::from(width) * u64::from(::std::u32::MAX) / intermediate) + as u32, + ::std::u32::MAX, + ) + } + } else if intermediate <= u64::from(::std::u32::MAX) { + (intermediate as u32, height) + } else { + ( + ::std::u32::MAX, + (u64::from(height) * u64::from(::std::u32::MAX) / intermediate) + as u32, + ) + } + }; + + image.thumbnail_exact(exact_width, exact_height) + }; + + let mut thumbnail_bytes = Vec::new(); + thumbnail.write_to(&mut thumbnail_bytes, image::ImageOutputFormat::Png)?; + + // Save thumbnail in database so we don't have to generate it again next time + let mut thumbnail_key = key.to_vec(); + let width_index = thumbnail_key + .iter() + .position(|&b| b == 0xff) + .ok_or_else(|| Error::bad_database("Media in db is invalid."))? + + 1; + let mut widthheight = width.to_be_bytes().to_vec(); + widthheight.extend_from_slice(&height.to_be_bytes()); + + thumbnail_key.splice( + width_index..width_index + 2 * mem::size_of::(), + widthheight, + ); + + let path = globals.get_media_file(&thumbnail_key); + let mut f = File::create(path).await?; + f.write_all(&thumbnail_bytes).await?; + + self.mediaid_file.insert(&thumbnail_key, &[])?; + + Ok(Some(FileMeta { + content_disposition, + content_type, + file: thumbnail_bytes.to_vec(), + })) + } else { + // Couldn't parse file to generate thumbnail, send original + Ok(Some(FileMeta { + content_disposition, + content_type, + file: file.to_vec(), + })) + } + } else { + Ok(None) + } + } +}