feat(presence): start presence timeout implementation

This commit is contained in:
Jakub Kubík 2022-11-17 19:18:28 +01:00
parent 249960b111
commit cd5a83d4e2
No known key found for this signature in database
GPG key ID: D3A0D5D60F3A173F
4 changed files with 61 additions and 13 deletions

View file

@ -1,8 +1,10 @@
use std::collections::HashMap; use futures_util::{stream::FuturesUnordered, StreamExt};
use std::{collections::HashMap, time::Duration};
use ruma::{ use ruma::{
events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
}; };
use tokio::{sync::mpsc, time::sleep};
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
@ -109,24 +111,37 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
Ok(hashmap) Ok(hashmap)
} }
/* fn presence_maintain(
fn presence_maintain(&self, db: Arc<TokioRwLock<Database>>) { &self,
// TODO @M0dEx: move this to a timed tasks module mut timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>,
) -> Result<()> {
let mut timers = FuturesUnordered::new();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
select! { tokio::select! {
Some(user_id) = self.presence_timers.next() { Some(_user_id) = timers.next() => {
// TODO @M0dEx: would it be better to acquire the lock outside the loop? // TODO: Handle presence timeouts
let guard = db.read().await; }
Some(user_id) = timer_receiver.recv() => {
// Idle timeout
timers.push(create_presence_timer(Duration::from_secs(60), user_id.clone()));
// TODO @M0dEx: add self.presence_timers // Offline timeout
// TODO @M0dEx: maintain presence timers.push(create_presence_timer(Duration::from_secs(60*15) , user_id));
} }
} }
} }
}); });
Ok(())
} }
*/ }
async fn create_presence_timer(duration: Duration, user_id: Box<UserId>) -> Box<UserId> {
sleep(duration).await;
user_id
} }
fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> { fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> {

View file

@ -62,7 +62,7 @@ impl Services {
auth_chain: rooms::auth_chain::Service { db }, auth_chain: rooms::auth_chain::Service { db },
directory: rooms::directory::Service { db }, directory: rooms::directory::Service { db },
edus: rooms::edus::Service { edus: rooms::edus::Service {
presence: rooms::edus::presence::Service { db }, presence: rooms::edus::presence::Service::build(db)?,
read_receipt: rooms::edus::read_receipt::Service { db }, read_receipt: rooms::edus::read_receipt::Service { db },
typing: rooms::edus::typing::Service { db }, typing: rooms::edus::typing::Service { db },
}, },

View file

@ -2,6 +2,7 @@ use std::collections::HashMap;
use crate::Result; use crate::Result;
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
use tokio::sync::mpsc;
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
/// Adds a presence event which will be saved until a new event replaces it. /// Adds a presence event which will be saved until a new event replaces it.
@ -35,4 +36,7 @@ pub trait Data: Send + Sync {
room_id: &RoomId, room_id: &RoomId,
since: u64, since: u64,
) -> Result<HashMap<OwnedUserId, PresenceEvent>>; ) -> Result<HashMap<OwnedUserId, PresenceEvent>>;
fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>)
-> Result<()>;
} }

View file

@ -3,14 +3,30 @@ use std::collections::HashMap;
pub use data::Data; pub use data::Data;
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
use tokio::sync::mpsc;
use crate::Result; use crate::{Error, Result};
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
// Presence timers
timer_sender: mpsc::UnboundedSender<Box<UserId>>,
} }
impl Service { impl Service {
pub fn build(db: &'static dyn Data) -> Result<Self> {
let (sender, receiver) = mpsc::unbounded_channel();
let service = Self {
db,
timer_sender: sender,
};
service.presence_maintain(receiver)?;
Ok(service)
}
/// Adds a presence event which will be saved until a new event replaces it. /// Adds a presence event which will be saved until a new event replaces it.
/// ///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to /// Note: This method takes a RoomId because presence updates are always bound to rooms to
@ -21,11 +37,17 @@ impl Service {
room_id: &RoomId, room_id: &RoomId,
presence: PresenceEvent, presence: PresenceEvent,
) -> Result<()> { ) -> Result<()> {
self.timer_sender
.send(user_id.into())
.map_err(|_| Error::bad_database("Sender errored out"))?;
self.db.update_presence(user_id, room_id, presence) self.db.update_presence(user_id, room_id, presence)
} }
/// Resets the presence timeout, so the user will stay in their current presence state. /// Resets the presence timeout, so the user will stay in their current presence state.
pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { pub fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.timer_sender
.send(user_id.into())
.map_err(|_| Error::bad_database("Sender errored out"))?;
self.db.ping_presence(user_id) self.db.ping_presence(user_id)
} }
@ -42,6 +64,13 @@ impl Service {
self.db.get_presence_event(room_id, user_id, last_update) self.db.get_presence_event(room_id, user_id, last_update)
} }
pub fn presence_maintain(
&self,
timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>,
) -> Result<()> {
self.db.presence_maintain(timer_receiver)
}
/* TODO /* TODO
/// Sets all users to offline who have been quiet for too long. /// Sets all users to offline who have been quiet for too long.
fn _presence_maintain( fn _presence_maintain(