diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs index c2348492..f7345f6c 100644 --- a/src/database/key_value/rooms/edus/presence.rs +++ b/src/database/key_value/rooms/edus/presence.rs @@ -1,5 +1,4 @@ use futures_util::{stream::FuturesUnordered, StreamExt}; -use ruma::user_id; use std::{ collections::{hash_map::Entry, HashMap}, mem, @@ -157,8 +156,8 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { let user_timestamp: HashMap = self .userid_presenceupdate .iter() - .filter_map(|(user_id_bytes, update_bytes)| { - Some(( + .map(|(user_id_bytes, update_bytes)| { + ( UserId::parse( utils::string_from_bytes(&user_id_bytes) .expect("UserID bytes are a valid string"), @@ -166,7 +165,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { .expect("UserID bytes from database are a valid UserID"), PresenceUpdate::from_be_bytes(&update_bytes) .expect("PresenceUpdate bytes from database are a valid PresenceUpdate"), - )) + ) }) .filter_map(|(user_id, presence_update)| { if presence_update.count <= since @@ -301,10 +300,49 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { let period = Duration::from_secs(services().globals.presence_cleanup_period()); let age_limit = Duration::from_secs(services().globals.presence_cleanup_limit()); + let userid_presenceupdate = self.userid_presenceupdate.clone(); + let roomuserid_presenceevent = self.roomuserid_presenceevent.clone(); + tokio::spawn(async move { loop { - // TODO: Cleanup + let mut removed_events: u64 = 0; + let age_limit_curr = millis_since_unix_epoch().saturating_sub(age_limit.as_millis() as u64); + for user_id in userid_presenceupdate + .iter() + .map(|(user_id_bytes, update_bytes)| { + ( + UserId::parse( + utils::string_from_bytes(&user_id_bytes) + .expect("UserID bytes are a valid string"), + ) + .expect("UserID bytes from database are a valid UserID"), + PresenceUpdate::from_be_bytes(&update_bytes) + .expect("PresenceUpdate bytes from database are a valid PresenceUpdate"), + ) + }) + .filter_map(|(user_id, presence_update)| { + if presence_update.curr_timestamp < age_limit_curr { + return None; + } + + Some(user_id) + }) + { + for room_id in services() + .rooms + .state_cache + .rooms_joined(&user_id) + .filter_map(|room_id| room_id.ok()) + { + match roomuserid_presenceevent.remove(&*[room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat()) { + Ok(_) => removed_events += 1, + Err(e) => error!("An errord occured while removing a stale presence event: {e}") + } + } + } + + info!("Cleaned up {removed_events} stale presence events!"); sleep(period).await; } }); diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs index 7d2520d3..e14b9322 100644 --- a/src/service/rooms/edus/presence/mod.rs +++ b/src/service/rooms/edus/presence/mod.rs @@ -23,7 +23,8 @@ impl Service { }; service.presence_maintain(receiver)?; - + service.presence_cleanup()?; + Ok(service) }