Merge pull request 'improvement: only remove to-device events when sure the client received them' (#139) from improve-to-device-removal into master

Reviewed-on: https://git.koesters.xyz/timo/conduit/pulls/139
This commit is contained in:
Timo Kösters 2020-07-26 14:14:08 +02:00
commit f8544bf6e3
2 changed files with 43 additions and 8 deletions

View file

@ -2659,6 +2659,9 @@ pub fn sync_route(
} }
} }
// Remove all to-device events the device received *last time*
db.users.remove_to_device_events(user_id, device_id, since)?;
Ok(sync_events::Response { Ok(sync_events::Response {
next_batch, next_batch,
rooms: sync_events::Rooms { rooms: sync_events::Rooms {
@ -2711,7 +2714,7 @@ pub fn sync_route(
}, },
device_one_time_keys_count: Default::default(), // TODO device_one_time_keys_count: Default::default(), // TODO
to_device: sync_events::ToDevice { to_device: sync_events::ToDevice {
events: db.users.take_to_device_events(user_id, device_id, 100)?, events: db.users.get_to_device_events(user_id, device_id)?,
}, },
} }
.into()) .into())

View file

@ -11,7 +11,7 @@ use ruma::{
events::{AnyToDeviceEvent, EventJson, EventType}, events::{AnyToDeviceEvent, EventJson, EventType},
identifiers::{DeviceId, UserId}, identifiers::{DeviceId, UserId},
}; };
use std::{collections::BTreeMap, convert::TryFrom, time::SystemTime}; use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime};
pub struct Users { pub struct Users {
pub(super) userid_password: sled::Tree, pub(super) userid_password: sled::Tree,
@ -660,11 +660,10 @@ impl Users {
Ok(()) Ok(())
} }
pub fn take_to_device_events( pub fn get_to_device_events(
&self, &self,
user_id: &UserId, user_id: &UserId,
device_id: &DeviceId, device_id: &DeviceId,
max: usize,
) -> Result<Vec<EventJson<AnyToDeviceEvent>>> { ) -> Result<Vec<EventJson<AnyToDeviceEvent>>> {
let mut events = Vec::new(); let mut events = Vec::new();
@ -673,18 +672,51 @@ impl Users {
prefix.extend_from_slice(device_id.as_str().as_bytes()); prefix.extend_from_slice(device_id.as_str().as_bytes());
prefix.push(0xff); prefix.push(0xff);
for result in self.todeviceid_events.scan_prefix(&prefix).take(max) { for value in self.todeviceid_events.scan_prefix(&prefix).values() {
let (key, value) = result?;
events.push( events.push(
serde_json::from_slice(&*value) serde_json::from_slice(&*value?)
.map_err(|_| Error::bad_database("Event in todeviceid_events is invalid."))?, .map_err(|_| Error::bad_database("Event in todeviceid_events is invalid."))?,
); );
self.todeviceid_events.remove(key)?;
} }
Ok(events) Ok(events)
} }
pub fn remove_to_device_events(
&self,
user_id: &UserId,
device_id: &DeviceId,
until: u64,
) -> Result<()> {
let mut prefix = user_id.to_string().as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(device_id.as_ref().as_bytes());
prefix.push(0xff);
let mut last = prefix.clone();
last.extend_from_slice(&until.to_be_bytes());
for (key, _) in self
.todeviceid_events
.range(&*prefix..=&*last)
.keys()
.map(|key| {
let key = key?;
Ok::<_, Error>((
key.clone(),
utils::u64_from_bytes(&key[key.len() - mem::size_of::<u64>()..key.len()])
.map_err(|_| Error::bad_database("ToDeviceId has invalid count bytes."))?,
))
})
.filter_map(|r| r.ok())
.take_while(|&(_, count)| count <= until)
{
self.todeviceid_events.remove(key)?;
}
Ok(())
}
pub fn update_device_metadata( pub fn update_device_metadata(
&self, &self,
user_id: &UserId, user_id: &UserId,