feat(presence): finish presence cleanup task

This commit is contained in:
Jakub Kubík 2022-11-21 21:46:21 +01:00
parent f9d10e8f41
commit 8d161c6a36
No known key found for this signature in database
GPG key ID: D3A0D5D60F3A173F
2 changed files with 45 additions and 6 deletions

View file

@ -1,5 +1,4 @@
use futures_util::{stream::FuturesUnordered, StreamExt};
use ruma::user_id;
use std::{
collections::{hash_map::Entry, HashMap},
mem,
@ -157,8 +156,8 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
let user_timestamp: HashMap<OwnedUserId, u64> = self
.userid_presenceupdate
.iter()
.filter_map(|(user_id_bytes, update_bytes)| {
Some((
.map(|(user_id_bytes, update_bytes)| {
(
UserId::parse(
utils::string_from_bytes(&user_id_bytes)
.expect("UserID bytes are a valid string"),
@ -166,7 +165,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
.expect("UserID bytes from database are a valid UserID"),
PresenceUpdate::from_be_bytes(&update_bytes)
.expect("PresenceUpdate bytes from database are a valid PresenceUpdate"),
))
)
})
.filter_map(|(user_id, presence_update)| {
if presence_update.count <= since
@ -301,10 +300,49 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
let period = Duration::from_secs(services().globals.presence_cleanup_period());
let age_limit = Duration::from_secs(services().globals.presence_cleanup_limit());
let userid_presenceupdate = self.userid_presenceupdate.clone();
let roomuserid_presenceevent = self.roomuserid_presenceevent.clone();
tokio::spawn(async move {
loop {
// TODO: Cleanup
let mut removed_events: u64 = 0;
let age_limit_curr = millis_since_unix_epoch().saturating_sub(age_limit.as_millis() as u64);
for user_id in userid_presenceupdate
.iter()
.map(|(user_id_bytes, update_bytes)| {
(
UserId::parse(
utils::string_from_bytes(&user_id_bytes)
.expect("UserID bytes are a valid string"),
)
.expect("UserID bytes from database are a valid UserID"),
PresenceUpdate::from_be_bytes(&update_bytes)
.expect("PresenceUpdate bytes from database are a valid PresenceUpdate"),
)
})
.filter_map(|(user_id, presence_update)| {
if presence_update.curr_timestamp < age_limit_curr {
return None;
}
Some(user_id)
})
{
for room_id in services()
.rooms
.state_cache
.rooms_joined(&user_id)
.filter_map(|room_id| room_id.ok())
{
match roomuserid_presenceevent.remove(&*[room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat()) {
Ok(_) => removed_events += 1,
Err(e) => error!("An errord occured while removing a stale presence event: {e}")
}
}
}
info!("Cleaned up {removed_events} stale presence events!");
sleep(period).await;
}
});

View file

@ -23,7 +23,8 @@ impl Service {
};
service.presence_maintain(receiver)?;
service.presence_cleanup()?;
Ok(service)
}