style(presence): reformat with cargo

This commit is contained in:
Jakub Kubík 2022-11-18 22:39:05 +01:00
parent b18b228c7c
commit 42b27f2f60
No known key found for this signature in database
GPG key ID: D3A0D5D60F3A173F
7 changed files with 93 additions and 48 deletions

View file

@ -1,5 +1,9 @@
use crate::{services, Result, Ruma};
use ruma::{api::client::presence::{get_presence, set_presence}, uint, presence::PresenceState};
use ruma::{
api::client::presence::{get_presence, set_presence},
presence::PresenceState,
uint,
};
use std::time::Duration;
/// # `PUT /_matrix/client/r0/presence/{userId}/status`
@ -27,7 +31,7 @@ pub async fn set_presence_route(
},
sender: sender_user.clone(),
},
true
true,
)?;
}

View file

@ -109,7 +109,7 @@ pub async fn set_displayname_route(
},
sender: sender_user.clone(),
},
true
true,
)?;
}
@ -245,7 +245,7 @@ pub async fn set_avatar_url_route(
},
sender: sender_user.clone(),
},
true
true,
)?;
}

View file

@ -170,12 +170,7 @@ async fn sync_helper(
.rooms
.edus
.presence
.ping_presence(
&sender_user,
false,
true,
true
)?;
.ping_presence(&sender_user, false, true, true)?;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device);

View file

@ -770,7 +770,7 @@ pub async fn send_transaction_message_route(
},
sender: user_id.clone(),
},
true
true,
)?;
}
}

View file

@ -1,6 +1,10 @@
use futures_util::{stream::FuturesUnordered, StreamExt};
use ruma::user_id;
use std::{collections::{HashMap, hash_map::Entry}, time::Duration, mem};
use std::{
collections::{hash_map::Entry, HashMap},
mem,
time::Duration,
};
use tracing::{error, info};
use ruma::{
@ -23,16 +27,24 @@ pub struct PresenceUpdate {
impl PresenceUpdate {
fn to_be_bytes(&self) -> Vec<u8> {
[self.count.to_be_bytes(), self.prev_timestamp.to_be_bytes(), self.curr_timestamp.to_be_bytes()].concat()
[
self.count.to_be_bytes(),
self.prev_timestamp.to_be_bytes(),
self.curr_timestamp.to_be_bytes(),
]
.concat()
}
fn from_be_bytes(bytes: &[u8]) -> Result<Self> {
let (count_bytes, timestamps_bytes) = bytes.split_at(mem::size_of::<u64>());
let (prev_timestamp_bytes, curr_timestamp_bytes) = timestamps_bytes.split_at(mem::size_of::<u64>());
let (prev_timestamp_bytes, curr_timestamp_bytes) =
timestamps_bytes.split_at(mem::size_of::<u64>());
Ok(Self {
count: u64_from_bytes(count_bytes).expect("count bytes from DB are valid"),
prev_timestamp: u64_from_bytes(prev_timestamp_bytes).expect("timestamp bytes from DB are valid"),
curr_timestamp: u64_from_bytes(curr_timestamp_bytes).expect("timestamp bytes from DB are valid"),
prev_timestamp: u64_from_bytes(prev_timestamp_bytes)
.expect("timestamp bytes from DB are valid"),
curr_timestamp: u64_from_bytes(curr_timestamp_bytes)
.expect("timestamp bytes from DB are valid"),
})
}
}
@ -69,33 +81,47 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
Ok(())
}
fn ping_presence(&self, user_id: &UserId, update_count: bool, update_timestamp: bool) -> Result<()> {
fn ping_presence(
&self,
user_id: &UserId,
update_count: bool,
update_timestamp: bool,
) -> Result<()> {
let now = millis_since_unix_epoch();
let presence = self.userid_presenceupdate
let presence = self
.userid_presenceupdate
.get(user_id.as_bytes())?
.map(|presence_bytes| PresenceUpdate::from_be_bytes(&presence_bytes))
.transpose()?;
let new_presence = match presence {
Some(presence) => {
PresenceUpdate {
count: if update_count { services().globals.next_count()? } else { presence.count },
prev_timestamp: if update_timestamp { presence.curr_timestamp } else { presence.prev_timestamp },
curr_timestamp: if update_timestamp { now } else { presence.curr_timestamp }
}
Some(presence) => PresenceUpdate {
count: if update_count {
services().globals.next_count()?
} else {
presence.count
},
prev_timestamp: if update_timestamp {
presence.curr_timestamp
} else {
presence.prev_timestamp
},
curr_timestamp: if update_timestamp {
now
} else {
presence.curr_timestamp
},
},
None => PresenceUpdate {
count: services().globals.current_count()?,
prev_timestamp: now,
curr_timestamp: now,
}
},
};
self.userid_presenceupdate.insert(
user_id.as_bytes(),
&*new_presence.to_be_bytes(),
)?;
self.userid_presenceupdate
.insert(user_id.as_bytes(), &*new_presence.to_be_bytes())?;
Ok(())
}
@ -103,7 +129,10 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<(u64, u64)>> {
self.userid_presenceupdate
.get(user_id.as_bytes())?
.map(|bytes| PresenceUpdate::from_be_bytes(&bytes).map(|update| (update.prev_timestamp, update.curr_timestamp)))
.map(|bytes| {
PresenceUpdate::from_be_bytes(&bytes)
.map(|update| (update.prev_timestamp, update.curr_timestamp))
})
.transpose()
}
@ -132,15 +161,16 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
Some((
UserId::parse(
utils::string_from_bytes(&user_id_bytes)
.expect("UserID bytes are a valid string")
).expect("UserID bytes from database are a valid UserID"),
.expect("UserID bytes are a valid string"),
)
.expect("UserID bytes from database are a valid UserID"),
PresenceUpdate::from_be_bytes(&update_bytes)
.expect("PresenceUpdate bytes from database are a valid PresenceUpdate"),
.expect("PresenceUpdate bytes from database are a valid PresenceUpdate"),
))
})
.filter_map(|(user_id, presence_update)| {
if presence_update.count <= since
|| !services()
|| !services()
.rooms
.state_cache
.is_joined(&user_id, room_id)
@ -157,12 +187,15 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
self.roomuserid_presenceevent
.scan_prefix(room_id.as_bytes().to_vec())
.filter_map(|(roomuserid_bytes, presence_bytes)| {
let user_id_bytes = roomuserid_bytes.split(|byte| *byte == 0xff as u8).last()?;
let user_id_bytes =
roomuserid_bytes.split(|byte| *byte == 0xff as u8).last()?;
Some((
UserId::parse(
utils::string_from_bytes(&user_id_bytes)
.expect("UserID bytes are a valid string")
).expect("UserID bytes from database are a valid UserID").to_owned(),
utils::string_from_bytes(&user_id_bytes)
.expect("UserID bytes are a valid string"),
)
.expect("UserID bytes from database are a valid UserID")
.to_owned(),
presence_bytes,
))
})
@ -172,8 +205,9 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
Some((
user_id,
parse_presence_event(&presence_bytes, *timestamp)
.expect("PresenceEvent bytes from database are a valid PresenceEvent"),
parse_presence_event(&presence_bytes, *timestamp).expect(
"PresenceEvent bytes from database are a valid PresenceEvent",
),
))
},
),

View file

@ -15,7 +15,12 @@ pub trait Data: Send + Sync {
) -> Result<()>;
/// Resets the presence timeout, so the user will stay in their current presence state.
fn ping_presence(&self, user_id: &UserId, update_count: bool, update_timestamp: bool) -> Result<()>;
fn ping_presence(
&self,
user_id: &UserId,
update_count: bool,
update_timestamp: bool,
) -> Result<()>;
/// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<(u64, u64)>>;

View file

@ -21,19 +21,26 @@ impl Service {
db,
timer_sender: sender,
};
service.presence_maintain(receiver)?;
Ok(service)
}
/// Resets the presence timeout, so the user will stay in their current presence state.
pub fn ping_presence(&self, user_id: &UserId, update_count: bool, update_timestamp: bool, spawn_timer: bool) -> Result<()> {
pub fn ping_presence(
&self,
user_id: &UserId,
update_count: bool,
update_timestamp: bool,
spawn_timer: bool,
) -> Result<()> {
if spawn_timer {
self.spawn_timer(user_id)?;
}
self.db.ping_presence(user_id, update_count, update_timestamp)
self.db
.ping_presence(user_id, update_count, update_timestamp)
}
/// Adds a presence event which will be saved until a new event replaces it.
@ -45,7 +52,7 @@ impl Service {
user_id: &UserId,
room_id: &RoomId,
presence: PresenceEvent,
spawn_timer: bool
spawn_timer: bool,
) -> Result<()> {
if spawn_timer {
self.spawn_timer(user_id)?;
@ -85,9 +92,9 @@ impl Service {
/// Spawns a task maintaining presence data
fn presence_maintain(
&self,
timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
) -> Result<()> {
&self,
timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
) -> Result<()> {
self.db.presence_maintain(timer_receiver)
}