improvement: bundle typing events and only send on changes

Fixes #67 and #49
This commit is contained in:
timokoesters 2020-06-04 11:17:36 +02:00
parent 8328eeb5ac
commit 168f2281fd
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
4 changed files with 141 additions and 98 deletions

View file

@ -4,6 +4,7 @@ use std::{
time::{Duration, SystemTime},
};
use crate::{utils, Database, MatrixResult, Ruma};
use log::{debug, warn};
use rocket::{delete, get, options, post, put, State};
use ruma_client_api::{
@ -55,7 +56,6 @@ use ruma_events::{
};
use ruma_identifiers::{DeviceId, RoomAliasId, RoomId, RoomVersionId, UserId};
use serde_json::{json, value::RawValue};
use crate::{utils, Database, MatrixResult, Ruma};
const GUEST_NAME_LENGTH: usize = 10;
const DEVICE_ID_LENGTH: usize = 10;
@ -921,18 +921,12 @@ pub fn create_typing_event_route(
_user_id: String,
) -> MatrixResult<create_typing_event::Response> {
let user_id = body.user_id.as_ref().expect("user is authenticated");
let edu = EduEvent::Typing(ruma_events::typing::TypingEvent {
content: ruma_events::typing::TypingEventContent {
user_ids: vec![user_id.clone()],
},
room_id: None, // None because it can be inferred
});
if body.typing {
db.rooms
.edus
.roomactive_add(
edu,
&user_id,
&body.room_id,
body.timeout.map(|d| d.as_millis() as u64).unwrap_or(30000)
+ utils::millis_since_unix_epoch().try_into().unwrap_or(0),
@ -940,7 +934,10 @@ pub fn create_typing_event_route(
)
.unwrap();
} else {
db.rooms.edus.roomactive_remove(edu, &body.room_id).unwrap();
db.rooms
.edus
.roomactive_remove(&user_id, &body.room_id, &db.globals)
.unwrap();
}
MatrixResult(Ok(create_typing_event::Response))
@ -2083,30 +2080,23 @@ pub fn sync_route(
let mut edus = db
.rooms
.edus
.roomactives_all(&room_id)
.roomlatests_since(&room_id, since)
.unwrap()
.map(|r| r.unwrap())
.collect::<Vec<_>>();
if edus.is_empty() {
edus.push(
EduEvent::Typing(ruma_events::typing::TypingEvent {
content: ruma_events::typing::TypingEventContent {
user_ids: Vec::new(),
},
room_id: None, // None because it can be inferred
})
.into(),
);
if db
.rooms
.edus
.last_roomactive_update(&room_id, &db.globals)
.unwrap()
> since
{
edus.push(serde_json::from_str(&serde_json::to_string(
&EduEvent::Typing(db.rooms.edus.roomactives_all(&room_id).unwrap()),
).unwrap()).unwrap());
}
edus.extend(
db.rooms
.edus
.roomlatests_since(&room_id, since)
.unwrap()
.map(|r| r.unwrap()),
);
joined_rooms.insert(
room_id.clone().try_into().unwrap(),
sync_events::JoinedRoom {
@ -2173,7 +2163,17 @@ pub fn sync_route(
.map(|r| r.unwrap())
.collect::<Vec<_>>();
edus.extend(db.rooms.edus.roomactives_all(&room_id).map(|r| r.unwrap()));
if db
.rooms
.edus
.last_roomactive_update(&room_id, &db.globals)
.unwrap()
> since
{
edus.push(serde_json::from_str(&serde_json::to_string(
&EduEvent::Typing(db.rooms.edus.roomactives_all(&room_id).unwrap()),
).unwrap()).unwrap());
}
left_rooms.insert(
room_id.clone().try_into().unwrap(),
@ -2324,7 +2324,6 @@ pub fn get_message_events_route(
#[get("/_matrix/client/r0/voip/turnServer")]
pub fn turn_server_route() -> MatrixResult<create_message_event::Response> {
warn!("TODO: turn_server_route");
MatrixResult(Err(Error {
kind: ErrorKind::NotFound,
message: "There is no turn server yet.".to_owned(),

View file

@ -70,7 +70,10 @@ impl Database {
edus: rooms::RoomEdus {
roomuserid_lastread: db.open_tree("roomuserid_lastread").unwrap(), // "Private" read receipt
roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest").unwrap(), // Read receipts
roomactiveid_roomactive: db.open_tree("roomactiveid_roomactive").unwrap(), // Typing notifs
roomactiveid_userid: db.open_tree("roomactiveid_userid").unwrap(), // Typing notifs
roomid_lastroomactiveupdate: db
.open_tree("roomid_lastroomactiveupdate")
.unwrap(),
},
pduid_pdu: db.open_tree("pduid_pdu").unwrap(),
eventid_pduid: db.open_tree("eventid_pduid").unwrap(),

View file

@ -52,31 +52,6 @@ impl Rooms {
.is_some())
}
// TODO: Remove and replace with public room dir
/// Returns a vector over all rooms.
pub fn all_rooms(&self) -> Vec<RoomId> {
let mut room_ids = self
.roomid_pduleaves
.iter()
.keys()
.map(|key| {
RoomId::try_from(
&*utils::string_from_bytes(
&key.unwrap()
.iter()
.copied()
.take_while(|&x| x != 0xff) // until delimiter
.collect::<Vec<_>>(),
)
.unwrap(),
)
.unwrap()
})
.collect::<Vec<_>>();
room_ids.dedup();
room_ids
}
/// Returns the full room state.
pub fn room_state(&self, room_id: &RoomId) -> Result<HashMap<(EventType, String), PduEvent>> {
let mut hashmap = HashMap::new();

View file

@ -1,11 +1,13 @@
use crate::{utils, Result};
use crate::{utils, Error, Result};
use ruma_events::{collections::only::Event as EduEvent, EventJson};
use ruma_identifiers::{RoomId, UserId};
use std::convert::TryFrom;
pub struct RoomEdus {
pub(in super::super) roomuserid_lastread: sled::Tree, // RoomUserId = Room + User
pub(in super::super) roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Count + UserId
pub(in super::super) roomactiveid_roomactive: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count
pub(in super::super) roomactiveid_userid: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count
pub(in super::super) roomid_lastroomactiveupdate: sled::Tree, // LastRoomActiveUpdate = Count
}
impl RoomEdus {
@ -79,10 +81,11 @@ impl RoomEdus {
.map(|(_, v)| Ok(serde_json::from_slice(&v)?)))
}
/// Adds an event that will be saved until the `timeout` timestamp (e.g. typing notifications).
/// Sets a user as typing until the timeout timestamp is reached or roomactive_remove is
/// called.
pub fn roomactive_add(
&self,
event: EduEvent,
user_id: &UserId,
room_id: &RoomId,
timeout: u64,
globals: &super::super::globals::Globals,
@ -90,9 +93,73 @@ impl RoomEdus {
let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff);
// Cleanup all outdated edus before inserting a new one
let count = globals.next_count()?.to_be_bytes();
let mut room_active_id = prefix;
room_active_id.extend_from_slice(&timeout.to_be_bytes());
room_active_id.push(0xff);
room_active_id.extend_from_slice(&count);
self.roomactiveid_userid
.insert(&room_active_id, &*user_id.to_string().as_bytes())?;
self.roomid_lastroomactiveupdate
.insert(&room_id.to_string().as_bytes(), &count)?;
Ok(())
}
/// Removes a user from typing before the timeout is reached.
pub fn roomactive_remove(
&self,
user_id: &UserId,
room_id: &RoomId,
globals: &super::super::globals::Globals,
) -> Result<()> {
let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff);
let user_id = user_id.to_string();
let mut found_outdated = false;
// Maybe there are multiple ones from calling roomactive_add multiple times
for outdated_edu in self
.roomactiveid_roomactive
.roomactiveid_userid
.scan_prefix(&prefix)
.filter_map(|r| r.ok())
.filter(|(_, v)| v == user_id.as_bytes())
{
self.roomactiveid_userid.remove(outdated_edu.0)?;
found_outdated = true;
}
if found_outdated {
self.roomid_lastroomactiveupdate.insert(
&room_id.to_string().as_bytes(),
&globals.next_count()?.to_be_bytes(),
)?;
}
Ok(())
}
/// Makes sure that typing events with old timestamps get removed.
fn roomactives_maintain(
&self,
room_id: &RoomId,
globals: &super::super::globals::Globals,
) -> Result<()> {
let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff);
let current_timestamp = utils::millis_since_unix_epoch();
let mut found_outdated = false;
// Find all outdated edus before inserting a new one
for outdated_edu in self
.roomactiveid_userid
.scan_prefix(&prefix)
.keys()
.filter_map(|r| r.ok())
@ -101,60 +168,59 @@ impl RoomEdus {
k.split(|&c| c == 0xff)
.nth(1)
.expect("roomactive has valid timestamp and delimiters"),
) < utils::millis_since_unix_epoch()
) < current_timestamp
})
{
// This is an outdated edu (time > timestamp)
self.roomlatestid_roomlatest.remove(outdated_edu)?;
found_outdated = true;
}
let mut room_active_id = prefix;
room_active_id.extend_from_slice(&timeout.to_be_bytes());
room_active_id.push(0xff);
room_active_id.extend_from_slice(&globals.next_count()?.to_be_bytes());
self.roomactiveid_roomactive
.insert(room_active_id, &*serde_json::to_string(&event)?)?;
Ok(())
}
/// Removes an active event manually (before the timeout is reached).
pub fn roomactive_remove(&self, event: EduEvent, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff);
let json = serde_json::to_string(&event)?;
// Remove outdated entries
for outdated_edu in self
.roomactiveid_roomactive
.scan_prefix(&prefix)
.filter_map(|r| r.ok())
.filter(|(_, v)| v == json.as_bytes())
{
self.roomactiveid_roomactive.remove(outdated_edu.0)?;
if found_outdated {
self.roomid_lastroomactiveupdate.insert(
&room_id.to_string().as_bytes(),
&globals.next_count()?.to_be_bytes(),
)?;
}
Ok(())
}
/// Returns an iterator over all active events (e.g. typing notifications).
pub fn roomactives_all(
pub fn last_roomactive_update(
&self,
room_id: &RoomId,
) -> impl Iterator<Item = Result<EventJson<EduEvent>>> {
globals: &super::super::globals::Globals,
) -> Result<u64> {
self.roomactives_maintain(room_id, globals)?;
Ok(self
.roomid_lastroomactiveupdate
.get(&room_id.to_string().as_bytes())?
.map(|bytes| utils::u64_from_bytes(&bytes))
.unwrap_or(0))
}
/// Returns an iterator over all active events (e.g. typing notifications).
pub fn roomactives_all(&self, room_id: &RoomId) -> Result<ruma_events::typing::TypingEvent> {
let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff);
let mut first_active_edu = prefix.clone();
first_active_edu.extend_from_slice(&utils::millis_since_unix_epoch().to_be_bytes());
let mut user_ids = Vec::new();
self.roomactiveid_roomactive
.range(first_active_edu..)
.filter_map(|r| r.ok())
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(|(_, v)| Ok(serde_json::from_slice(&v)?))
for user_id in self
.roomactiveid_userid
.scan_prefix(prefix)
.values()
.map(|user_id| Ok::<_, Error>(UserId::try_from(utils::string_from_bytes(&user_id?)?)?))
{
user_ids.push(user_id?);
}
Ok(ruma_events::typing::TypingEvent {
content: ruma_events::typing::TypingEventContent { user_ids },
room_id: None, // Can be inferred
})
}
/// Sets a private read marker at `count`.