13 changed files with 592 additions and 203 deletions
|
@ -38,8 +38,12 @@ max_request_size = 20_000_000 # in bytes
|
|||
# Enables registration. If set to false, no users can register on this server.
|
||||
allow_registration = true
|
||||
|
||||
# Enables federation. If set to false, this server will not federate with others (rooms from other server will not be available).
|
||||
allow_federation = true
|
||||
|
||||
# Enables presence. If set to false, the presence of users (whether they are online, idle or offline) will not be shown or processed.
|
||||
allow_presence = true
|
||||
|
||||
# Enable the display name lightning bolt on registration.
|
||||
enable_lightning_bolt = true
|
||||
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
use crate::{services, utils, Result, Ruma};
|
||||
use ruma::api::client::presence::{get_presence, set_presence};
|
||||
use crate::{services, Result, Ruma};
|
||||
use ruma::{
|
||||
api::client::presence::{get_presence, set_presence},
|
||||
presence::PresenceState,
|
||||
uint,
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
/// # `PUT /_matrix/client/r0/presence/{userId}/status`
|
||||
|
@ -21,16 +25,13 @@ pub async fn set_presence_route(
|
|||
avatar_url: services().users.avatar_url(sender_user)?,
|
||||
currently_active: None,
|
||||
displayname: services().users.displayname(sender_user)?,
|
||||
last_active_ago: Some(
|
||||
utils::millis_since_unix_epoch()
|
||||
.try_into()
|
||||
.expect("time is valid"),
|
||||
),
|
||||
last_active_ago: Some(uint!(0)),
|
||||
presence: body.presence.clone(),
|
||||
status_msg: body.status_msg.clone(),
|
||||
},
|
||||
sender: sender_user.clone(),
|
||||
},
|
||||
true,
|
||||
)?;
|
||||
}
|
||||
|
||||
|
@ -60,7 +61,7 @@ pub async fn get_presence_route(
|
|||
.rooms
|
||||
.edus
|
||||
.presence
|
||||
.get_last_presence_event(sender_user, &room_id)?
|
||||
.get_presence_event(sender_user, &room_id)?
|
||||
{
|
||||
presence_event = Some(presence);
|
||||
break;
|
||||
|
@ -69,7 +70,6 @@ pub async fn get_presence_route(
|
|||
|
||||
if let Some(presence) = presence_event {
|
||||
Ok(get_presence::v3::Response {
|
||||
// TODO: Should ruma just use the presenceeventcontent type here?
|
||||
status_msg: presence.content.status_msg,
|
||||
currently_active: presence.content.currently_active,
|
||||
last_active_ago: presence
|
||||
|
@ -79,6 +79,6 @@ pub async fn get_presence_route(
|
|||
presence: presence.content.presence,
|
||||
})
|
||||
} else {
|
||||
todo!();
|
||||
Ok(get_presence::v3::Response::new(PresenceState::Offline))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,6 +109,7 @@ pub async fn set_displayname_route(
|
|||
},
|
||||
sender: sender_user.clone(),
|
||||
},
|
||||
true,
|
||||
)?;
|
||||
}
|
||||
|
||||
|
@ -244,6 +245,7 @@ pub async fn set_avatar_url_route(
|
|||
},
|
||||
sender: sender_user.clone(),
|
||||
},
|
||||
true,
|
||||
)?;
|
||||
}
|
||||
|
||||
|
|
|
@ -166,7 +166,11 @@ async fn sync_helper(
|
|||
};
|
||||
|
||||
// TODO: match body.set_presence {
|
||||
services().rooms.edus.presence.ping_presence(&sender_user)?;
|
||||
services()
|
||||
.rooms
|
||||
.edus
|
||||
.presence
|
||||
.ping_presence(&sender_user, false, true, true)?;
|
||||
|
||||
// Setup watchers, so if there's no response, we can wait for them
|
||||
let watcher = services().globals.watch(&sender_user, &sender_device);
|
||||
|
|
|
@ -33,6 +33,7 @@ use ruma::{
|
|||
},
|
||||
directory::{IncomingFilter, IncomingRoomNetwork},
|
||||
events::{
|
||||
presence::{PresenceEvent, PresenceEventContent},
|
||||
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
|
||||
room::{
|
||||
join_rules::{JoinRule, RoomJoinRulesEventContent},
|
||||
|
@ -746,7 +747,34 @@ pub async fn send_transaction_message_route(
|
|||
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
|
||||
{
|
||||
match edu {
|
||||
Edu::Presence(_) => {}
|
||||
Edu::Presence(presence) => {
|
||||
for presence_update in presence.push {
|
||||
let user_id = presence_update.user_id;
|
||||
for room_id in services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.rooms_joined(&user_id)
|
||||
.filter_map(|room_id| room_id.ok())
|
||||
{
|
||||
services().rooms.edus.presence.update_presence(
|
||||
&user_id,
|
||||
&room_id,
|
||||
PresenceEvent {
|
||||
content: PresenceEventContent {
|
||||
avatar_url: services().users.avatar_url(&user_id)?,
|
||||
currently_active: Some(presence_update.currently_active),
|
||||
displayname: services().users.displayname(&user_id)?,
|
||||
last_active_ago: Some(presence_update.last_active_ago),
|
||||
presence: presence_update.presence.clone(),
|
||||
status_msg: presence_update.status_msg.clone(),
|
||||
},
|
||||
sender: user_id.clone(),
|
||||
},
|
||||
true,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Edu::Receipt(receipt) => {
|
||||
for (room_id, room_updates) in receipt.receipts {
|
||||
for (user_id, user_updates) in room_updates.read {
|
||||
|
|
|
@ -76,6 +76,19 @@ pub struct Config {
|
|||
|
||||
pub emergency_password: Option<String>,
|
||||
|
||||
#[serde(default = "true_fn")]
|
||||
pub allow_presence: bool,
|
||||
|
||||
#[serde(default = "default_presence_idle_timeout")]
|
||||
pub presence_idle_timeout: u64,
|
||||
#[serde(default = "default_presence_offline_timeout")]
|
||||
pub presence_offline_timeout: u64,
|
||||
|
||||
#[serde(default = "default_presence_cleanup_period")]
|
||||
pub presence_cleanup_period: u64,
|
||||
#[serde(default = "default_presence_cleanup_limit")]
|
||||
pub presence_cleanup_limit: u64,
|
||||
|
||||
#[serde(flatten)]
|
||||
pub catchall: BTreeMap<String, IgnoredAny>,
|
||||
}
|
||||
|
@ -257,6 +270,22 @@ fn default_turn_ttl() -> u64 {
|
|||
60 * 60 * 24
|
||||
}
|
||||
|
||||
fn default_presence_idle_timeout() -> u64 {
|
||||
60
|
||||
}
|
||||
|
||||
fn default_presence_offline_timeout() -> u64 {
|
||||
30 * 60
|
||||
}
|
||||
|
||||
fn default_presence_cleanup_period() -> u64 {
|
||||
24 * 60 * 60
|
||||
}
|
||||
|
||||
fn default_presence_cleanup_limit() -> u64 {
|
||||
24 * 60 * 60
|
||||
}
|
||||
|
||||
// I know, it's a great name
|
||||
pub fn default_default_room_version() -> RoomVersionId {
|
||||
RoomVersionId::V9
|
||||
|
|
|
@ -1,10 +1,53 @@
|
|||
use std::collections::HashMap;
|
||||
use futures_util::{stream::FuturesUnordered, StreamExt};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
mem,
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::{error, info};
|
||||
|
||||
use ruma::{
|
||||
events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
|
||||
};
|
||||
use tokio::{sync::mpsc, time::sleep};
|
||||
|
||||
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
|
||||
use crate::{
|
||||
database::KeyValueDatabase,
|
||||
service::{self, rooms::edus::presence::PresenceIter},
|
||||
services, utils,
|
||||
utils::{millis_since_unix_epoch, u64_from_bytes},
|
||||
Error, Result,
|
||||
};
|
||||
|
||||
pub struct PresenceUpdate {
|
||||
count: u64,
|
||||
prev_timestamp: u64,
|
||||
curr_timestamp: u64,
|
||||
}
|
||||
|
||||
impl PresenceUpdate {
|
||||
fn to_be_bytes(&self) -> Vec<u8> {
|
||||
[
|
||||
self.count.to_be_bytes(),
|
||||
self.prev_timestamp.to_be_bytes(),
|
||||
self.curr_timestamp.to_be_bytes(),
|
||||
]
|
||||
.concat()
|
||||
}
|
||||
|
||||
fn from_be_bytes(bytes: &[u8]) -> Result<Self> {
|
||||
let (count_bytes, timestamps_bytes) = bytes.split_at(mem::size_of::<u64>());
|
||||
let (prev_timestamp_bytes, curr_timestamp_bytes) =
|
||||
timestamps_bytes.split_at(mem::size_of::<u64>());
|
||||
Ok(Self {
|
||||
count: u64_from_bytes(count_bytes).expect("count bytes from DB are valid"),
|
||||
prev_timestamp: u64_from_bytes(prev_timestamp_bytes)
|
||||
.expect("timestamp bytes from DB are valid"),
|
||||
curr_timestamp: u64_from_bytes(curr_timestamp_bytes)
|
||||
.expect("timestamp bytes from DB are valid"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl service::rooms::edus::presence::Data for KeyValueDatabase {
|
||||
fn update_presence(
|
||||
|
@ -13,45 +56,82 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
|
|||
room_id: &RoomId,
|
||||
presence: PresenceEvent,
|
||||
) -> Result<()> {
|
||||
// TODO: Remove old entry? Or maybe just wipe completely from time to time?
|
||||
let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat();
|
||||
|
||||
let count = services().globals.next_count()?.to_be_bytes();
|
||||
|
||||
let mut presence_id = room_id.as_bytes().to_vec();
|
||||
presence_id.push(0xff);
|
||||
presence_id.extend_from_slice(&count);
|
||||
presence_id.push(0xff);
|
||||
presence_id.extend_from_slice(presence.sender.as_bytes());
|
||||
|
||||
self.presenceid_presence.insert(
|
||||
&presence_id,
|
||||
&serde_json::to_vec(&presence).expect("PresenceEvent can be serialized"),
|
||||
self.roomuserid_presenceevent.insert(
|
||||
&roomuser_id,
|
||||
&serde_json::to_vec(&presence).expect("presence event from DB is valid"),
|
||||
)?;
|
||||
|
||||
self.userid_lastpresenceupdate.insert(
|
||||
let timestamp = match presence.content.last_active_ago {
|
||||
Some(active_ago) => millis_since_unix_epoch().saturating_sub(active_ago.into()),
|
||||
None => millis_since_unix_epoch(),
|
||||
};
|
||||
|
||||
self.userid_presenceupdate.insert(
|
||||
user_id.as_bytes(),
|
||||
&utils::millis_since_unix_epoch().to_be_bytes(),
|
||||
&PresenceUpdate {
|
||||
count: services().globals.next_count()?,
|
||||
prev_timestamp: timestamp,
|
||||
curr_timestamp: timestamp,
|
||||
}
|
||||
.to_be_bytes(),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ping_presence(&self, user_id: &UserId) -> Result<()> {
|
||||
self.userid_lastpresenceupdate.insert(
|
||||
user_id.as_bytes(),
|
||||
&utils::millis_since_unix_epoch().to_be_bytes(),
|
||||
)?;
|
||||
fn ping_presence(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
update_count: bool,
|
||||
update_timestamp: bool,
|
||||
) -> Result<()> {
|
||||
let now = millis_since_unix_epoch();
|
||||
|
||||
let presence = self
|
||||
.userid_presenceupdate
|
||||
.get(user_id.as_bytes())?
|
||||
.map(|presence_bytes| PresenceUpdate::from_be_bytes(&presence_bytes))
|
||||
.transpose()?;
|
||||
|
||||
let new_presence = match presence {
|
||||
Some(presence) => PresenceUpdate {
|
||||
count: if update_count {
|
||||
services().globals.next_count()?
|
||||
} else {
|
||||
presence.count
|
||||
},
|
||||
prev_timestamp: if update_timestamp {
|
||||
presence.curr_timestamp
|
||||
} else {
|
||||
presence.prev_timestamp
|
||||
},
|
||||
curr_timestamp: if update_timestamp {
|
||||
now
|
||||
} else {
|
||||
presence.curr_timestamp
|
||||
},
|
||||
},
|
||||
None => PresenceUpdate {
|
||||
count: services().globals.current_count()?,
|
||||
prev_timestamp: now,
|
||||
curr_timestamp: now,
|
||||
},
|
||||
};
|
||||
|
||||
self.userid_presenceupdate
|
||||
.insert(user_id.as_bytes(), &new_presence.to_be_bytes())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
|
||||
self.userid_lastpresenceupdate
|
||||
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<(u64, u64)>> {
|
||||
self.userid_presenceupdate
|
||||
.get(user_id.as_bytes())?
|
||||
.map(|bytes| {
|
||||
utils::u64_from_bytes(&bytes).map_err(|_| {
|
||||
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
|
||||
})
|
||||
PresenceUpdate::from_be_bytes(&bytes)
|
||||
.map(|update| (update.prev_timestamp, update.curr_timestamp))
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
|
@ -60,93 +140,268 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
|
|||
&self,
|
||||
room_id: &RoomId,
|
||||
user_id: &UserId,
|
||||
count: u64,
|
||||
presence_timestamp: u64,
|
||||
) -> Result<Option<PresenceEvent>> {
|
||||
let mut presence_id = room_id.as_bytes().to_vec();
|
||||
presence_id.push(0xff);
|
||||
presence_id.extend_from_slice(&count.to_be_bytes());
|
||||
presence_id.push(0xff);
|
||||
presence_id.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
self.presenceid_presence
|
||||
.get(&presence_id)?
|
||||
.map(|value| parse_presence_event(&value))
|
||||
let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat();
|
||||
self.roomuserid_presenceevent
|
||||
.get(&roomuser_id)?
|
||||
.map(|value| parse_presence_event(&value, presence_timestamp))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
fn presence_since(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
since: u64,
|
||||
) -> Result<HashMap<OwnedUserId, PresenceEvent>> {
|
||||
let mut prefix = room_id.as_bytes().to_vec();
|
||||
prefix.push(0xff);
|
||||
|
||||
let mut first_possible_edu = prefix.clone();
|
||||
first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
|
||||
let mut hashmap = HashMap::new();
|
||||
|
||||
for (key, value) in self
|
||||
.presenceid_presence
|
||||
.iter_from(&first_possible_edu, false)
|
||||
.take_while(|(key, _)| key.starts_with(&prefix))
|
||||
fn presence_since<'a>(&'a self, room_id: &RoomId, since: u64) -> Result<PresenceIter<'a>> {
|
||||
let user_timestamp: HashMap<OwnedUserId, u64> = self
|
||||
.userid_presenceupdate
|
||||
.iter()
|
||||
.map(|(user_id_bytes, update_bytes)| {
|
||||
(
|
||||
UserId::parse(
|
||||
utils::string_from_bytes(&user_id_bytes)
|
||||
.expect("UserID bytes are a valid string"),
|
||||
)
|
||||
.expect("UserID bytes from database are a valid UserID"),
|
||||
PresenceUpdate::from_be_bytes(&update_bytes)
|
||||
.expect("PresenceUpdate bytes from database are a valid PresenceUpdate"),
|
||||
)
|
||||
})
|
||||
.filter_map(|(user_id, presence_update)| {
|
||||
if presence_update.count <= since
|
||||
|| !services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.is_joined(&user_id, room_id)
|
||||
.ok()?
|
||||
{
|
||||
let user_id = UserId::parse(
|
||||
utils::string_from_bytes(
|
||||
key.rsplit(|&b| b == 0xff)
|
||||
.next()
|
||||
.expect("rsplit always returns an element"),
|
||||
)
|
||||
.map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?,
|
||||
)
|
||||
.map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?;
|
||||
|
||||
let presence = parse_presence_event(&value)?;
|
||||
|
||||
hashmap.insert(user_id, presence);
|
||||
return None;
|
||||
}
|
||||
|
||||
Ok(hashmap)
|
||||
Some((user_id, presence_update.curr_timestamp))
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Box::new(
|
||||
self.roomuserid_presenceevent
|
||||
.scan_prefix(room_id.as_bytes().to_vec())
|
||||
.filter_map(move |(roomuserid_bytes, presence_bytes)| {
|
||||
let user_id_bytes = roomuserid_bytes.split(|byte| *byte == 0xff).last()?;
|
||||
let user_id: OwnedUserId = UserId::parse(
|
||||
utils::string_from_bytes(user_id_bytes)
|
||||
.expect("UserID bytes are a valid string"),
|
||||
)
|
||||
.expect("UserID bytes from database are a valid UserID");
|
||||
|
||||
let timestamp = user_timestamp.get(&user_id)?;
|
||||
let presence_event = parse_presence_event(&presence_bytes, *timestamp)
|
||||
.expect("PresenceEvent bytes from database are a valid PresenceEvent");
|
||||
|
||||
Some((user_id, presence_event))
|
||||
}),
|
||||
))
|
||||
}
|
||||
|
||||
/*
|
||||
fn presence_maintain(&self, db: Arc<TokioRwLock<Database>>) {
|
||||
// TODO @M0dEx: move this to a timed tasks module
|
||||
fn presence_maintain(
|
||||
&self,
|
||||
mut timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
|
||||
) -> Result<()> {
|
||||
let mut timers = FuturesUnordered::new();
|
||||
let mut timers_timestamp: HashMap<OwnedUserId, u64> = HashMap::new();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
select! {
|
||||
Some(user_id) = self.presence_timers.next() {
|
||||
// TODO @M0dEx: would it be better to acquire the lock outside the loop?
|
||||
let guard = db.read().await;
|
||||
// Wait for services to be created
|
||||
sleep(Duration::from_secs(15)).await;
|
||||
|
||||
// TODO @M0dEx: add self.presence_timers
|
||||
// TODO @M0dEx: maintain presence
|
||||
if !services().globals.allow_presence() {
|
||||
return;
|
||||
}
|
||||
|
||||
let idle_timeout = Duration::from_secs(services().globals.presence_idle_timeout());
|
||||
let offline_timeout =
|
||||
Duration::from_secs(services().globals.presence_offline_timeout());
|
||||
|
||||
// TODO: Get rid of this hack (hinting correct types to rustc)
|
||||
timers.push(create_presence_timer(
|
||||
idle_timeout,
|
||||
UserId::parse_with_server_name("conduit", services().globals.server_name())
|
||||
.expect("Conduit user always exists"),
|
||||
));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(user_id) = timers.next() => {
|
||||
info!("Processing timer for user '{}' ({})", user_id.clone(), timers.len());
|
||||
let (prev_timestamp, curr_timestamp) = match services().rooms.edus.presence.last_presence_update(&user_id) {
|
||||
Ok(timestamp_tuple) => match timestamp_tuple {
|
||||
Some(timestamp_tuple) => timestamp_tuple,
|
||||
None => continue,
|
||||
},
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let prev_presence_state = determine_presence_state(prev_timestamp);
|
||||
let curr_presence_state = determine_presence_state(curr_timestamp);
|
||||
|
||||
// Continue if there is no change in state
|
||||
if prev_presence_state == curr_presence_state {
|
||||
continue;
|
||||
}
|
||||
|
||||
match services().rooms.edus.presence.ping_presence(&user_id, true, false, false) {
|
||||
Ok(_) => (),
|
||||
Err(e) => error!("{e}")
|
||||
}
|
||||
|
||||
// TODO: Notify federation sender
|
||||
}
|
||||
Some(user_id) = timer_receiver.recv() => {
|
||||
let now = millis_since_unix_epoch();
|
||||
// Do not create timers if we added timers recently
|
||||
let should_send = match timers_timestamp.entry(user_id.to_owned()) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if now - entry.get() > 15 * 1000 {
|
||||
entry.insert(now);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
},
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(now);
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if !should_send {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Idle timeout
|
||||
timers.push(create_presence_timer(idle_timeout, user_id.clone()));
|
||||
|
||||
// Offline timeout
|
||||
timers.push(create_presence_timer(offline_timeout, user_id.clone()));
|
||||
|
||||
info!("Added timers for user '{}' ({})", user_id, timers.len());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
*/
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> {
|
||||
fn presence_cleanup(&self) -> Result<()> {
|
||||
let userid_presenceupdate = self.userid_presenceupdate.clone();
|
||||
let roomuserid_presenceevent = self.roomuserid_presenceevent.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Wait for services to be created
|
||||
sleep(Duration::from_secs(15)).await;
|
||||
|
||||
if !services().globals.allow_presence() {
|
||||
return;
|
||||
}
|
||||
|
||||
let period = Duration::from_secs(services().globals.presence_cleanup_period());
|
||||
let age_limit = Duration::from_secs(services().globals.presence_cleanup_limit());
|
||||
|
||||
loop {
|
||||
let mut removed_events: u64 = 0;
|
||||
let age_limit_curr =
|
||||
millis_since_unix_epoch().saturating_sub(age_limit.as_millis() as u64);
|
||||
|
||||
for user_id in userid_presenceupdate
|
||||
.iter()
|
||||
.map(|(user_id_bytes, update_bytes)| {
|
||||
(
|
||||
UserId::parse(
|
||||
utils::string_from_bytes(&user_id_bytes)
|
||||
.expect("UserID bytes are a valid string"),
|
||||
)
|
||||
.expect("UserID bytes from database are a valid UserID"),
|
||||
PresenceUpdate::from_be_bytes(&update_bytes).expect(
|
||||
"PresenceUpdate bytes from database are a valid PresenceUpdate",
|
||||
),
|
||||
)
|
||||
})
|
||||
.filter_map(|(user_id, presence_update)| {
|
||||
if presence_update.curr_timestamp < age_limit_curr {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(user_id)
|
||||
})
|
||||
{
|
||||
match userid_presenceupdate.remove(user_id.as_bytes()) {
|
||||
Ok(_) => (),
|
||||
Err(e) => {
|
||||
error!("An errord occured while removing a stale presence update: {e}")
|
||||
}
|
||||
}
|
||||
|
||||
for room_id in services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.rooms_joined(&user_id)
|
||||
.filter_map(|room_id| room_id.ok())
|
||||
{
|
||||
match roomuserid_presenceevent
|
||||
.remove(&[room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat())
|
||||
{
|
||||
Ok(_) => removed_events += 1,
|
||||
Err(e) => error!(
|
||||
"An errord occured while removing a stale presence event: {e}"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Cleaned up {removed_events} stale presence events!");
|
||||
sleep(period).await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_presence_timer(duration: Duration, user_id: OwnedUserId) -> OwnedUserId {
|
||||
sleep(duration).await;
|
||||
|
||||
user_id
|
||||
}
|
||||
|
||||
fn parse_presence_event(bytes: &[u8], presence_timestamp: u64) -> Result<PresenceEvent> {
|
||||
let mut presence: PresenceEvent = serde_json::from_slice(bytes)
|
||||
.map_err(|_| Error::bad_database("Invalid presence event in db."))?;
|
||||
|
||||
let current_timestamp: UInt = utils::millis_since_unix_epoch()
|
||||
.try_into()
|
||||
.expect("time is valid");
|
||||
|
||||
if presence.content.presence == PresenceState::Online {
|
||||
// Don't set last_active_ago when the user is online
|
||||
presence.content.last_active_ago = None;
|
||||
} else {
|
||||
// Convert from timestamp to duration
|
||||
presence.content.last_active_ago = presence
|
||||
.content
|
||||
.last_active_ago
|
||||
.map(|timestamp| current_timestamp - timestamp);
|
||||
}
|
||||
translate_active_ago(&mut presence, presence_timestamp);
|
||||
|
||||
Ok(presence)
|
||||
}
|
||||
|
||||
fn determine_presence_state(last_active_ago: u64) -> PresenceState {
|
||||
let globals = &services().globals;
|
||||
|
||||
if last_active_ago < globals.presence_idle_timeout() * 1000 {
|
||||
PresenceState::Online
|
||||
} else if last_active_ago < globals.presence_offline_timeout() * 1000 {
|
||||
PresenceState::Unavailable
|
||||
} else {
|
||||
PresenceState::Offline
|
||||
}
|
||||
}
|
||||
|
||||
/// Translates the timestamp representing last_active_ago to a diff from now.
|
||||
fn translate_active_ago(presence_event: &mut PresenceEvent, last_active_ts: u64) {
|
||||
let last_active_ago = millis_since_unix_epoch().saturating_sub(last_active_ts);
|
||||
|
||||
presence_event.content.presence = determine_presence_state(last_active_ago);
|
||||
|
||||
presence_event.content.last_active_ago = match presence_event.content.presence {
|
||||
PresenceState::Online => None,
|
||||
_ => Some(UInt::new_saturating(last_active_ago)),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,8 +65,8 @@ pub struct KeyValueDatabase {
|
|||
pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count
|
||||
pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count
|
||||
pub(super) roomid_lasttypingupdate: Arc<dyn KvTree>, // LastRoomTypingUpdate = Count
|
||||
pub(super) presenceid_presence: Arc<dyn KvTree>, // PresenceId = RoomId + Count + UserId
|
||||
pub(super) userid_lastpresenceupdate: Arc<dyn KvTree>, // LastPresenceUpdate = Count
|
||||
pub(super) userid_presenceupdate: Arc<dyn KvTree>, // PresenceUpdate = Count + Timestamp
|
||||
pub(super) roomuserid_presenceevent: Arc<dyn KvTree>, // PresenceEvent
|
||||
|
||||
//pub rooms: rooms::Rooms,
|
||||
pub(super) pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count
|
||||
|
@ -288,8 +288,8 @@ impl KeyValueDatabase {
|
|||
.open_tree("roomuserid_lastprivatereadupdate")?,
|
||||
typingid_userid: builder.open_tree("typingid_userid")?,
|
||||
roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?,
|
||||
presenceid_presence: builder.open_tree("presenceid_presence")?,
|
||||
userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?,
|
||||
userid_presenceupdate: builder.open_tree("userid_presenceupdate")?,
|
||||
roomuserid_presenceevent: builder.open_tree("roomuserid_presenceevent")?,
|
||||
pduid_pdu: builder.open_tree("pduid_pdu")?,
|
||||
eventid_pduid: builder.open_tree("eventid_pduid")?,
|
||||
roomid_pduleaves: builder.open_tree("roomid_pduleaves")?,
|
||||
|
@ -825,9 +825,6 @@ impl KeyValueDatabase {
|
|||
);
|
||||
}
|
||||
|
||||
// This data is probably outdated
|
||||
db.presenceid_presence.clear()?;
|
||||
|
||||
services().admin.start_handler();
|
||||
|
||||
// Set emergency access for the conduit user
|
||||
|
|
|
@ -286,6 +286,26 @@ impl Service {
|
|||
&self.config.emergency_password
|
||||
}
|
||||
|
||||
pub fn allow_presence(&self) -> bool {
|
||||
self.config.allow_presence
|
||||
}
|
||||
|
||||
pub fn presence_idle_timeout(&self) -> u64 {
|
||||
self.config.presence_idle_timeout
|
||||
}
|
||||
|
||||
pub fn presence_offline_timeout(&self) -> u64 {
|
||||
self.config.presence_offline_timeout
|
||||
}
|
||||
|
||||
pub fn presence_cleanup_period(&self) -> u64 {
|
||||
self.config.presence_cleanup_period
|
||||
}
|
||||
|
||||
pub fn presence_cleanup_limit(&self) -> u64 {
|
||||
self.config.presence_cleanup_limit
|
||||
}
|
||||
|
||||
pub fn supported_room_versions(&self) -> Vec<RoomVersionId> {
|
||||
let mut room_versions: Vec<RoomVersionId> = vec![];
|
||||
room_versions.extend(self.stable_room_versions.clone());
|
||||
|
|
|
@ -62,7 +62,7 @@ impl Services {
|
|||
auth_chain: rooms::auth_chain::Service { db },
|
||||
directory: rooms::directory::Service { db },
|
||||
edus: rooms::edus::Service {
|
||||
presence: rooms::edus::presence::Service { db },
|
||||
presence: rooms::edus::presence::Service::build(db)?,
|
||||
read_receipt: rooms::edus::read_receipt::Service { db },
|
||||
typing: rooms::edus::typing::Service { db },
|
||||
},
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use crate::Result;
|
||||
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use super::PresenceIter;
|
||||
|
||||
pub trait Data: Send + Sync {
|
||||
/// Adds a presence event which will be saved until a new event replaces it.
|
||||
|
@ -16,23 +17,29 @@ pub trait Data: Send + Sync {
|
|||
) -> Result<()>;
|
||||
|
||||
/// Resets the presence timeout, so the user will stay in their current presence state.
|
||||
fn ping_presence(&self, user_id: &UserId) -> Result<()>;
|
||||
fn ping_presence(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
update_count: bool,
|
||||
update_timestamp: bool,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
|
||||
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>>;
|
||||
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<(u64, u64)>>;
|
||||
|
||||
/// Returns the presence event with correct last_active_ago.
|
||||
fn get_presence_event(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
user_id: &UserId,
|
||||
count: u64,
|
||||
presence_timestamp: u64,
|
||||
) -> Result<Option<PresenceEvent>>;
|
||||
|
||||
/// Returns the most recent presence updates that happened after the event with id `since`.
|
||||
fn presence_since(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
since: u64,
|
||||
) -> Result<HashMap<OwnedUserId, PresenceEvent>>;
|
||||
fn presence_since<'a>(&'a self, room_id: &RoomId, since: u64) -> Result<PresenceIter<'a>>;
|
||||
|
||||
fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>)
|
||||
-> Result<()>;
|
||||
|
||||
fn presence_cleanup(&self) -> Result<()>;
|
||||
}
|
||||
|
|
|
@ -1,16 +1,55 @@
|
|||
mod data;
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub use data::Data;
|
||||
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::Result;
|
||||
use crate::{services, Error, Result};
|
||||
|
||||
pub(crate) type PresenceIter<'a> = Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)> + 'a>;
|
||||
|
||||
pub struct Service {
|
||||
pub db: &'static dyn Data,
|
||||
|
||||
// Presence timers
|
||||
timer_sender: mpsc::UnboundedSender<OwnedUserId>,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
/// Builds the service and initialized the presence_maintain task
|
||||
pub fn build(db: &'static dyn Data) -> Result<Self> {
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
let service = Self {
|
||||
db,
|
||||
timer_sender: sender,
|
||||
};
|
||||
|
||||
service.presence_maintain(receiver)?;
|
||||
service.presence_cleanup()?;
|
||||
|
||||
Ok(service)
|
||||
}
|
||||
|
||||
/// Resets the presence timeout, so the user will stay in their current presence state.
|
||||
pub fn ping_presence(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
update_count: bool,
|
||||
update_timestamp: bool,
|
||||
spawn_timer: bool,
|
||||
) -> Result<()> {
|
||||
if !services().globals.allow_presence() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if spawn_timer {
|
||||
self.spawn_timer(user_id)?;
|
||||
}
|
||||
|
||||
self.db
|
||||
.ping_presence(user_id, update_count, update_timestamp)
|
||||
}
|
||||
|
||||
/// Adds a presence event which will be saved until a new event replaces it.
|
||||
///
|
||||
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
|
||||
|
@ -20,103 +59,78 @@ impl Service {
|
|||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
presence: PresenceEvent,
|
||||
spawn_timer: bool,
|
||||
) -> Result<()> {
|
||||
if !services().globals.allow_presence() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if spawn_timer {
|
||||
self.spawn_timer(user_id)?;
|
||||
}
|
||||
|
||||
self.db.update_presence(user_id, room_id, presence)
|
||||
}
|
||||
|
||||
/// Resets the presence timeout, so the user will stay in their current presence state.
|
||||
pub fn ping_presence(&self, user_id: &UserId) -> Result<()> {
|
||||
self.db.ping_presence(user_id)
|
||||
/// Returns the timestamp of when the presence was last updated for the specified user.
|
||||
pub fn last_presence_update(&self, user_id: &UserId) -> Result<Option<(u64, u64)>> {
|
||||
if !services().globals.allow_presence() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
pub fn get_last_presence_event(
|
||||
self.db.last_presence_update(user_id)
|
||||
}
|
||||
|
||||
/// Returns the saved presence event for this user with actual last_active_ago.
|
||||
pub fn get_presence_event(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
) -> Result<Option<PresenceEvent>> {
|
||||
if !services().globals.allow_presence() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let last_update = match self.db.last_presence_update(user_id)? {
|
||||
Some(last) => last,
|
||||
Some(last) => last.1,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
self.db.get_presence_event(room_id, user_id, last_update)
|
||||
}
|
||||
|
||||
/* TODO
|
||||
/// Sets all users to offline who have been quiet for too long.
|
||||
fn _presence_maintain(
|
||||
&self,
|
||||
rooms: &super::Rooms,
|
||||
globals: &super::super::globals::Globals,
|
||||
) -> Result<()> {
|
||||
let current_timestamp = utils::millis_since_unix_epoch();
|
||||
|
||||
for (user_id_bytes, last_timestamp) in self
|
||||
.userid_lastpresenceupdate
|
||||
.iter()
|
||||
.filter_map(|(k, bytes)| {
|
||||
Some((
|
||||
k,
|
||||
utils::u64_from_bytes(&bytes)
|
||||
.map_err(|_| {
|
||||
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
|
||||
})
|
||||
.ok()?,
|
||||
))
|
||||
})
|
||||
.take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000)
|
||||
// 5 Minutes
|
||||
{
|
||||
// Send new presence events to set the user offline
|
||||
let count = globals.next_count()?.to_be_bytes();
|
||||
let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes)
|
||||
.map_err(|_| {
|
||||
Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.")
|
||||
})?
|
||||
.try_into()
|
||||
.map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?;
|
||||
for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) {
|
||||
let mut presence_id = room_id.as_bytes().to_vec();
|
||||
presence_id.push(0xff);
|
||||
presence_id.extend_from_slice(&count);
|
||||
presence_id.push(0xff);
|
||||
presence_id.extend_from_slice(&user_id_bytes);
|
||||
|
||||
self.presenceid_presence.insert(
|
||||
&presence_id,
|
||||
&serde_json::to_vec(&PresenceEvent {
|
||||
content: PresenceEventContent {
|
||||
avatar_url: None,
|
||||
currently_active: None,
|
||||
displayname: None,
|
||||
last_active_ago: Some(
|
||||
last_timestamp.try_into().expect("time is valid"),
|
||||
),
|
||||
presence: PresenceState::Offline,
|
||||
status_msg: None,
|
||||
},
|
||||
sender: user_id.to_owned(),
|
||||
})
|
||||
.expect("PresenceEvent can be serialized"),
|
||||
)?;
|
||||
}
|
||||
|
||||
self.userid_lastpresenceupdate.insert(
|
||||
user_id.as_bytes(),
|
||||
&utils::millis_since_unix_epoch().to_be_bytes(),
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}*/
|
||||
|
||||
/// Returns the most recent presence updates that happened after the event with id `since`.
|
||||
#[tracing::instrument(skip(self, since, room_id))]
|
||||
pub fn presence_since(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
since: u64,
|
||||
) -> Result<HashMap<OwnedUserId, PresenceEvent>> {
|
||||
pub fn presence_since<'a>(&'a self, room_id: &RoomId, since: u64) -> Result<PresenceIter<'a>> {
|
||||
if !services().globals.allow_presence() {
|
||||
return Ok(Box::new(std::iter::empty()));
|
||||
}
|
||||
|
||||
self.db.presence_since(room_id, since)
|
||||
}
|
||||
|
||||
/// Spawns a task maintaining presence data
|
||||
fn presence_maintain(
|
||||
&self,
|
||||
timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
|
||||
) -> Result<()> {
|
||||
self.db.presence_maintain(timer_receiver)
|
||||
}
|
||||
|
||||
fn presence_cleanup(&self) -> Result<()> {
|
||||
self.db.presence_cleanup()
|
||||
}
|
||||
|
||||
/// Spawns a timer for the user used by the maintenance task
|
||||
fn spawn_timer(&self, user_id: &UserId) -> Result<()> {
|
||||
if !services().globals.allow_presence() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.timer_sender
|
||||
.send(user_id.into())
|
||||
.map_err(|_| Error::bad_database("Sender errored out"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,8 @@ use ruma::{
|
|||
federation::{
|
||||
self,
|
||||
transactions::edu::{
|
||||
DeviceListUpdateContent, Edu, ReceiptContent, ReceiptData, ReceiptMap,
|
||||
DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
|
||||
ReceiptData, ReceiptMap,
|
||||
},
|
||||
},
|
||||
OutgoingRequest,
|
||||
|
@ -283,6 +284,34 @@ impl Service {
|
|||
.filter(|user_id| user_id.server_name() == services().globals.server_name()),
|
||||
);
|
||||
|
||||
// Look for presence updates in this room
|
||||
let presence_updates: Vec<PresenceUpdate> = services()
|
||||
.rooms
|
||||
.edus
|
||||
.presence
|
||||
.presence_since(&room_id, since)?
|
||||
.filter(|(user_id, _)| user_id.server_name() == services().globals.server_name())
|
||||
.map(|(user_id, presence_event)| PresenceUpdate {
|
||||
user_id,
|
||||
presence: presence_event.content.presence,
|
||||
status_msg: presence_event.content.status_msg,
|
||||
last_active_ago: presence_event
|
||||
.content
|
||||
.last_active_ago
|
||||
.unwrap_or_else(|| uint!(0)),
|
||||
currently_active: presence_event.content.currently_active.unwrap_or(false),
|
||||
})
|
||||
.collect();
|
||||
|
||||
let presence_content = PresenceContent {
|
||||
push: presence_updates,
|
||||
};
|
||||
|
||||
events.push(
|
||||
serde_json::to_vec(&Edu::Presence(presence_content))
|
||||
.expect("presence json can be serialized"),
|
||||
);
|
||||
|
||||
// Look for read receipts in this room
|
||||
for r in services()
|
||||
.rooms
|
||||
|
|
Loading…
Reference in a new issue