From b18b228c7cc95da17fc60bef0ca8283ca88fe6d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Kub=C3=ADk?= Date: Fri, 18 Nov 2022 22:32:50 +0100 Subject: [PATCH] fix(presence): fix issues found when testing --- src/api/client_server/presence.rs | 19 ++- src/api/client_server/profile.rs | 2 + src/api/client_server/sync.rs | 11 +- src/api/server_server.rs | 1 + src/database/key_value/rooms/edus/presence.rs | 159 +++++++++++------- src/database/mod.rs | 3 + src/service/rooms/edus/presence/data.rs | 4 +- src/service/rooms/edus/presence/mod.rs | 58 ++++--- 8 files changed, 161 insertions(+), 96 deletions(-) diff --git a/src/api/client_server/presence.rs b/src/api/client_server/presence.rs index 7afda962..f363933e 100644 --- a/src/api/client_server/presence.rs +++ b/src/api/client_server/presence.rs @@ -1,5 +1,5 @@ -use crate::{services, utils, Result, Ruma}; -use ruma::api::client::presence::{get_presence, set_presence}; +use crate::{services, Result, Ruma}; +use ruma::{api::client::presence::{get_presence, set_presence}, uint, presence::PresenceState}; use std::time::Duration; /// # `PUT /_matrix/client/r0/presence/{userId}/status` @@ -21,16 +21,13 @@ pub async fn set_presence_route( avatar_url: services().users.avatar_url(sender_user)?, currently_active: None, displayname: services().users.displayname(sender_user)?, - last_active_ago: Some( - utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - ), + last_active_ago: Some(uint!(0)), presence: body.presence.clone(), status_msg: body.status_msg.clone(), }, sender: sender_user.clone(), }, + true )?; } @@ -69,7 +66,6 @@ pub async fn get_presence_route( if let Some(presence) = presence_event { Ok(get_presence::v3::Response { - // TODO: Should ruma just use the presenceeventcontent type here? status_msg: presence.content.status_msg, currently_active: presence.content.currently_active, last_active_ago: presence @@ -79,6 +75,11 @@ pub async fn get_presence_route( presence: presence.content.presence, }) } else { - todo!(); + Ok(get_presence::v3::Response { + status_msg: None, + currently_active: None, + last_active_ago: None, + presence: PresenceState::Offline, + }) } } diff --git a/src/api/client_server/profile.rs b/src/api/client_server/profile.rs index 5ace1777..0e667290 100644 --- a/src/api/client_server/profile.rs +++ b/src/api/client_server/profile.rs @@ -109,6 +109,7 @@ pub async fn set_displayname_route( }, sender: sender_user.clone(), }, + true )?; } @@ -244,6 +245,7 @@ pub async fn set_avatar_url_route( }, sender: sender_user.clone(), }, + true )?; } diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 94e4f5bb..e3c250c1 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -166,7 +166,16 @@ async fn sync_helper( }; // TODO: match body.set_presence { - services().rooms.edus.presence.ping_presence(&sender_user)?; + services() + .rooms + .edus + .presence + .ping_presence( + &sender_user, + false, + true, + true + )?; // Setup watchers, so if there's no response, we can wait for them let watcher = services().globals.watch(&sender_user, &sender_device); diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 9154b3ef..564843a6 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -770,6 +770,7 @@ pub async fn send_transaction_message_route( }, sender: user_id.clone(), }, + true )?; } } diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs index dae6f759..159edca6 100644 --- a/src/database/key_value/rooms/edus/presence.rs +++ b/src/database/key_value/rooms/edus/presence.rs @@ -1,7 +1,7 @@ use futures_util::{stream::FuturesUnordered, StreamExt}; use ruma::user_id; -use std::{collections::HashMap, time::Duration}; -use tracing::error; +use std::{collections::{HashMap, hash_map::Entry}, time::Duration, mem}; +use tracing::{error, info}; use ruma::{ events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId, @@ -17,19 +17,22 @@ use crate::{ pub struct PresenceUpdate { count: u64, - timestamp: u64, + prev_timestamp: u64, + curr_timestamp: u64, } impl PresenceUpdate { fn to_be_bytes(&self) -> Vec { - [self.count.to_be_bytes(), self.timestamp.to_be_bytes()].concat() + [self.count.to_be_bytes(), self.prev_timestamp.to_be_bytes(), self.curr_timestamp.to_be_bytes()].concat() } fn from_be_bytes(bytes: &[u8]) -> Result { - let (count_bytes, timestamp_bytes) = bytes.split_at(bytes.len() / 2); + let (count_bytes, timestamps_bytes) = bytes.split_at(mem::size_of::()); + let (prev_timestamp_bytes, curr_timestamp_bytes) = timestamps_bytes.split_at(mem::size_of::()); Ok(Self { count: u64_from_bytes(count_bytes).expect("count bytes from DB are valid"), - timestamp: u64_from_bytes(timestamp_bytes).expect("timestamp bytes from DB are valid"), + prev_timestamp: u64_from_bytes(prev_timestamp_bytes).expect("timestamp bytes from DB are valid"), + curr_timestamp: u64_from_bytes(curr_timestamp_bytes).expect("timestamp bytes from DB are valid"), }) } } @@ -48,14 +51,17 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { &serde_json::to_vec(&presence).expect("presence event from DB is valid"), )?; + let timestamp = match presence.content.last_active_ago { + Some(active_ago) => millis_since_unix_epoch().saturating_sub(active_ago.into()), + None => millis_since_unix_epoch(), + }; + self.userid_presenceupdate.insert( user_id.as_bytes(), &*PresenceUpdate { count: services().globals.next_count()?, - timestamp: match presence.content.last_active_ago { - Some(active_ago) => millis_since_unix_epoch().saturating_sub(active_ago.into()), - None => millis_since_unix_epoch(), - }, + prev_timestamp: timestamp, + curr_timestamp: timestamp, } .to_be_bytes(), )?; @@ -63,23 +69,41 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { Ok(()) } - fn ping_presence(&self, user_id: &UserId) -> Result<()> { + fn ping_presence(&self, user_id: &UserId, update_count: bool, update_timestamp: bool) -> Result<()> { + let now = millis_since_unix_epoch(); + + let presence = self.userid_presenceupdate + .get(user_id.as_bytes())? + .map(|presence_bytes| PresenceUpdate::from_be_bytes(&presence_bytes)) + .transpose()?; + + let new_presence = match presence { + Some(presence) => { + PresenceUpdate { + count: if update_count { services().globals.next_count()? } else { presence.count }, + prev_timestamp: if update_timestamp { presence.curr_timestamp } else { presence.prev_timestamp }, + curr_timestamp: if update_timestamp { now } else { presence.curr_timestamp } + } + }, + None => PresenceUpdate { + count: services().globals.current_count()?, + prev_timestamp: now, + curr_timestamp: now, + } + }; + self.userid_presenceupdate.insert( user_id.as_bytes(), - &*PresenceUpdate { - count: services().globals.current_count()?, - timestamp: millis_since_unix_epoch(), - } - .to_be_bytes(), + &*new_presence.to_be_bytes(), )?; Ok(()) } - fn last_presence_update(&self, user_id: &UserId) -> Result> { + fn last_presence_update(&self, user_id: &UserId) -> Result> { self.userid_presenceupdate .get(user_id.as_bytes())? - .map(|bytes| PresenceUpdate::from_be_bytes(&bytes).map(|update| update.timestamp)) + .map(|bytes| PresenceUpdate::from_be_bytes(&bytes).map(|update| (update.prev_timestamp, update.curr_timestamp))) .transpose() } @@ -101,21 +125,22 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { room_id: &RoomId, since: u64, ) -> Result + 'a>> { - let services = &services(); let user_timestamp: HashMap = self .userid_presenceupdate .iter() .filter_map(|(user_id_bytes, update_bytes)| { Some(( - OwnedUserId::from( - UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?, - ), - PresenceUpdate::from_be_bytes(&update_bytes).ok()?, + UserId::parse( + utils::string_from_bytes(&user_id_bytes) + .expect("UserID bytes are a valid string") + ).expect("UserID bytes from database are a valid UserID"), + PresenceUpdate::from_be_bytes(&update_bytes) + .expect("PresenceUpdate bytes from database are a valid PresenceUpdate"), )) }) .filter_map(|(user_id, presence_update)| { if presence_update.count <= since - || !services + || !services() .rooms .state_cache .is_joined(&user_id, room_id) @@ -124,18 +149,20 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { return None; } - Some((user_id, presence_update.timestamp)) + Some((user_id, presence_update.curr_timestamp)) }) .collect(); Ok(Box::new( self.roomuserid_presenceevent - .iter() - .filter_map(|(user_id_bytes, presence_bytes)| { + .scan_prefix(room_id.as_bytes().to_vec()) + .filter_map(|(roomuserid_bytes, presence_bytes)| { + let user_id_bytes = roomuserid_bytes.split(|byte| *byte == 0xff as u8).last()?; Some(( - OwnedUserId::from( - UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?, - ), + UserId::parse( + utils::string_from_bytes(&user_id_bytes) + .expect("UserID bytes are a valid string") + ).expect("UserID bytes from database are a valid UserID").to_owned(), presence_bytes, )) }) @@ -145,7 +172,8 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { Some(( user_id, - parse_presence_event(&presence_bytes, *timestamp).ok()?, + parse_presence_event(&presence_bytes, *timestamp) + .expect("PresenceEvent bytes from database are a valid PresenceEvent"), )) }, ), @@ -157,6 +185,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { mut timer_receiver: mpsc::UnboundedReceiver, ) -> Result<()> { let mut timers = FuturesUnordered::new(); + let mut timers_timestamp: HashMap = HashMap::new(); // TODO: Get rid of this hack timers.push(create_presence_timer( @@ -168,10 +197,11 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { loop { tokio::select! { Some(user_id) = timers.next() => { - let presence_timestamp = match services().rooms.edus.presence.last_presence_update(&user_id) { - Ok(timestamp) => match timestamp { - Some(timestamp) => timestamp, - None => continue, + info!("Processing timer for user '{}' ({})", user_id.clone(), timers.len()); + let (prev_timestamp, curr_timestamp) = match services().rooms.edus.presence.last_presence_update(&user_id) { + Ok(timestamp_tuple) => match timestamp_tuple { + Some(timestamp_tuple) => timestamp_tuple, + None => continue, }, Err(e) => { error!("{e}"); @@ -179,46 +209,49 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { } }; - let presence_state = determine_presence_state(presence_timestamp); + let prev_presence_state = determine_presence_state(prev_timestamp); + let curr_presence_state = determine_presence_state(curr_timestamp); // Continue if there is no change in state - if presence_state != PresenceState::Offline { + if prev_presence_state == curr_presence_state { continue; } - for room_id in services() - .rooms - .state_cache - .rooms_joined(&user_id) - .filter_map(|room_id| room_id.ok()) { - let presence_event = match services().rooms.edus.presence.get_presence_event(&user_id, &room_id) { - Ok(event) => match event { - Some(event) => event, - None => continue, - }, - Err(e) => { - error!("{e}"); - continue; - } - }; - - match services().rooms.edus.presence.update_presence(&user_id, &room_id, presence_event) { - Ok(()) => (), - Err(e) => { - error!("{e}"); - continue; - } - } - - // TODO: Send event over federation + match services().rooms.edus.presence.ping_presence(&user_id, true, false, false) { + Ok(_) => (), + Err(e) => error!("{e}") } + + // TODO: Notify federation sender } Some(user_id) = timer_receiver.recv() => { + let now = millis_since_unix_epoch(); + let should_send = match timers_timestamp.entry(user_id.to_owned()) { + Entry::Occupied(mut entry) => { + if now - entry.get() > 15 * 1000 { + entry.insert(now); + true + } else { + false + } + }, + Entry::Vacant(entry) => { + entry.insert(now); + true + } + }; + + if !should_send { + continue; + } + // Idle timeout timers.push(create_presence_timer(Duration::from_secs(60), user_id.clone())); // Offline timeout - timers.push(create_presence_timer(Duration::from_secs(60*15) , user_id)); + timers.push(create_presence_timer(Duration::from_secs(60*15) , user_id.clone())); + + info!("Added timers for user '{}' ({})", user_id, timers.len()); } } } diff --git a/src/database/mod.rs b/src/database/mod.rs index 7baa512a..563076e1 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -825,6 +825,9 @@ impl KeyValueDatabase { ); } + // Flush old presence data + db.userid_presenceupdate.clear()?; + services().admin.start_handler(); // Set emergency access for the conduit user diff --git a/src/service/rooms/edus/presence/data.rs b/src/service/rooms/edus/presence/data.rs index 5dc4c3cb..d90eaece 100644 --- a/src/service/rooms/edus/presence/data.rs +++ b/src/service/rooms/edus/presence/data.rs @@ -15,10 +15,10 @@ pub trait Data: Send + Sync { ) -> Result<()>; /// Resets the presence timeout, so the user will stay in their current presence state. - fn ping_presence(&self, user_id: &UserId) -> Result<()>; + fn ping_presence(&self, user_id: &UserId, update_count: bool, update_timestamp: bool) -> Result<()>; /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. - fn last_presence_update(&self, user_id: &UserId) -> Result>; + fn last_presence_update(&self, user_id: &UserId) -> Result>; /// Returns the presence event with correct last_active_ago. fn get_presence_event( diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs index faac5c76..427c4fd1 100644 --- a/src/service/rooms/edus/presence/mod.rs +++ b/src/service/rooms/edus/presence/mod.rs @@ -14,18 +14,28 @@ pub struct Service { } impl Service { + /// Builds the service and initialized the presence_maintain task pub fn build(db: &'static dyn Data) -> Result { let (sender, receiver) = mpsc::unbounded_channel(); let service = Self { db, timer_sender: sender, }; - + service.presence_maintain(receiver)?; Ok(service) } + /// Resets the presence timeout, so the user will stay in their current presence state. + pub fn ping_presence(&self, user_id: &UserId, update_count: bool, update_timestamp: bool, spawn_timer: bool) -> Result<()> { + if spawn_timer { + self.spawn_timer(user_id)?; + } + + self.db.ping_presence(user_id, update_count, update_timestamp) + } + /// Adds a presence event which will be saved until a new event replaces it. /// /// Note: This method takes a RoomId because presence updates are always bound to rooms to @@ -35,45 +45,34 @@ impl Service { user_id: &UserId, room_id: &RoomId, presence: PresenceEvent, + spawn_timer: bool ) -> Result<()> { - self.timer_sender - .send(user_id.into()) - .map_err(|_| Error::bad_database("Sender errored out"))?; + if spawn_timer { + self.spawn_timer(user_id)?; + } + self.db.update_presence(user_id, room_id, presence) } - /// Resets the presence timeout, so the user will stay in their current presence state. - pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { - self.timer_sender - .send(user_id.into()) - .map_err(|_| Error::bad_database("Sender errored out"))?; - self.db.ping_presence(user_id) - } - - pub fn last_presence_update(&self, user_id: &UserId) -> Result> { + /// Returns the timestamp of when the presence was last updated for the specified user. + pub fn last_presence_update(&self, user_id: &UserId) -> Result> { self.db.last_presence_update(user_id) } + /// Returns the saved presence event for this user with actual last_active_ago. pub fn get_presence_event( &self, user_id: &UserId, room_id: &RoomId, ) -> Result> { let last_update = match self.db.last_presence_update(user_id)? { - Some(last) => last, + Some(last) => last.1, None => return Ok(None), }; self.db.get_presence_event(room_id, user_id, last_update) } - pub fn presence_maintain( - &self, - timer_receiver: mpsc::UnboundedReceiver, - ) -> Result<()> { - self.db.presence_maintain(timer_receiver) - } - /// Returns the most recent presence updates that happened after the event with id `since`. #[tracing::instrument(skip(self, since, room_id))] pub fn presence_since( @@ -83,4 +82,21 @@ impl Service { ) -> Result>> { self.db.presence_since(room_id, since) } + + /// Spawns a task maintaining presence data + fn presence_maintain( + &self, + timer_receiver: mpsc::UnboundedReceiver, + ) -> Result<()> { + self.db.presence_maintain(timer_receiver) + } + + /// Spawns a timer for the user used by the maintenance task + fn spawn_timer(&self, user_id: &UserId) -> Result<()> { + self.timer_sender + .send(user_id.into()) + .map_err(|_| Error::bad_database("Sender errored out"))?; + + Ok(()) + } }