WIP improvement: much better state storage

This commit is contained in:
Timo Kösters 2021-08-01 15:14:54 +02:00
parent 9410d3ef9c
commit 41dd620d74
No known key found for this signature in database
GPG key ID: 356E705610F626D5
3 changed files with 345 additions and 10 deletions

View file

@ -24,13 +24,14 @@ use rocket::{
request::{FromRequest, Request},
Shutdown, State,
};
use ruma::{DeviceId, RoomId, ServerName, UserId};
use ruma::{DeviceId, EventId, RoomId, ServerName, UserId};
use serde::{de::IgnoredAny, Deserialize};
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, HashSet},
convert::TryFrom,
fs::{self, remove_dir_all},
io::Write,
mem::size_of,
ops::Deref,
path::Path,
sync::{Arc, Mutex, RwLock},
@ -261,7 +262,12 @@ impl Database {
userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?,
statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?,
shortroomid_roomid: builder.open_tree("shortroomid_roomid")?,
roomid_shortroomid: builder.open_tree("roomid_shortroomid")?,
stateid_shorteventid: builder.open_tree("stateid_shorteventid")?,
shortstatehash_statediff: builder.open_tree("shortstatehash_statediff")?,
eventid_shorteventid: builder.open_tree("eventid_shorteventid")?,
shorteventid_eventid: builder.open_tree("shorteventid_eventid")?,
shorteventid_shortstatehash: builder.open_tree("shorteventid_shortstatehash")?,
@ -438,6 +444,334 @@ impl Database {
println!("Migration: 5 -> 6 finished");
}
fn load_shortstatehash_info(
shortstatehash: &[u8],
db: &Database,
lru: &mut LruCache<
Vec<u8>,
Vec<(
Vec<u8>,
HashSet<Vec<u8>>,
HashSet<Vec<u8>>,
HashSet<Vec<u8>>,
)>,
>,
) -> Result<
Vec<(
Vec<u8>, // sstatehash
HashSet<Vec<u8>>, // full state
HashSet<Vec<u8>>, // added
HashSet<Vec<u8>>, // removed
)>,
> {
if let Some(result) = lru.get_mut(shortstatehash) {
return Ok(result.clone());
}
let value = db
.rooms
.shortstatehash_statediff
.get(shortstatehash)?
.ok_or_else(|| Error::bad_database("State hash does not exist"))?;
let parent = value[0..size_of::<u64>()].to_vec();
let mut add_mode = true;
let mut added = HashSet::new();
let mut removed = HashSet::new();
let mut i = size_of::<u64>();
while let Some(v) = value.get(i..i + 2 * size_of::<u64>()) {
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
add_mode = false;
i += size_of::<u64>();
continue;
}
if add_mode {
added.insert(v.to_vec());
} else {
removed.insert(v.to_vec());
}
i += 2 * size_of::<u64>();
}
if parent != 0_u64.to_be_bytes() {
let mut response = load_shortstatehash_info(&parent, db, lru)?;
let mut state = response.last().unwrap().1.clone();
state.extend(added.iter().cloned());
for r in &removed {
state.remove(r);
}
response.push((shortstatehash.to_vec(), state, added, removed));
lru.insert(shortstatehash.to_vec(), response.clone());
Ok(response)
} else {
let mut response = Vec::new();
response.push((shortstatehash.to_vec(), added.clone(), added, removed));
lru.insert(shortstatehash.to_vec(), response.clone());
Ok(response)
}
}
fn update_shortstatehash_level(
current_shortstatehash: &[u8],
statediffnew: HashSet<Vec<u8>>,
statediffremoved: HashSet<Vec<u8>>,
diff_to_sibling: usize,
mut parent_states: Vec<(
Vec<u8>, // sstatehash
HashSet<Vec<u8>>, // full state
HashSet<Vec<u8>>, // added
HashSet<Vec<u8>>, // removed
)>,
db: &Database,
) -> Result<()> {
let diffsum = statediffnew.len() + statediffremoved.len();
if parent_states.len() > 3 {
// Number of layers
// To many layers, we have to go deeper
let parent = parent_states.pop().unwrap();
let mut parent_new = parent.2;
let mut parent_removed = parent.3;
for removed in statediffremoved {
if !parent_new.remove(&removed) {
parent_removed.insert(removed);
}
}
parent_new.extend(statediffnew);
update_shortstatehash_level(
current_shortstatehash,
parent_new,
parent_removed,
diffsum,
parent_states,
db,
)?;
return Ok(());
}
if parent_states.len() == 0 {
// There is no parent layer, create a new state
let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent
for new in &statediffnew {
value.extend_from_slice(&new);
}
if !statediffremoved.is_empty() {
warn!("Tried to create new state with removals");
}
db.rooms
.shortstatehash_statediff
.insert(&current_shortstatehash, &value)?;
return Ok(());
};
// Else we have two options.
// 1. We add the current diff on top of the parent layer.
// 2. We replace a layer above
let parent = parent_states.pop().unwrap();
let parent_diff = parent.2.len() + parent.3.len();
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
// Diff too big, we replace above layer(s)
let mut parent_new = parent.2;
let mut parent_removed = parent.3;
for removed in statediffremoved {
if !parent_new.remove(&removed) {
parent_removed.insert(removed);
}
}
parent_new.extend(statediffnew);
update_shortstatehash_level(
current_shortstatehash,
parent_new,
parent_removed,
diffsum,
parent_states,
db,
)?;
} else {
// Diff small enough, we add diff as layer on top of parent
let mut value = parent.0.clone();
for new in &statediffnew {
value.extend_from_slice(&new);
}
if !statediffremoved.is_empty() {
value.extend_from_slice(&0_u64.to_be_bytes());
for removed in &statediffremoved {
value.extend_from_slice(&removed);
}
}
db.rooms
.shortstatehash_statediff
.insert(&current_shortstatehash, &value)?;
}
Ok(())
}
if db.globals.database_version()? < 7 {
// Upgrade state store
let mut lru = LruCache::new(1000);
let mut last_roomstates: HashMap<RoomId, Vec<u8>> = HashMap::new();
let mut current_sstatehash: Vec<u8> = Vec::new();
let mut current_room = None;
let mut current_state = HashSet::new();
let mut counter = 0;
for (k, seventid) in db._db.open_tree("stateid_shorteventid")?.iter() {
let sstatehash = k[0..size_of::<u64>()].to_vec();
let sstatekey = k[size_of::<u64>()..].to_vec();
if sstatehash != current_sstatehash {
if !current_sstatehash.is_empty() {
counter += 1;
println!("counter: {}", counter);
let current_room = current_room.as_ref().unwrap();
let last_roomsstatehash = last_roomstates.get(&current_room);
let states_parents = last_roomsstatehash.map_or_else(
|| Ok(Vec::new()),
|last_roomsstatehash| {
load_shortstatehash_info(&last_roomsstatehash, &db, &mut lru)
},
)?;
let (statediffnew, statediffremoved) =
if let Some(parent_stateinfo) = states_parents.last() {
let statediffnew = current_state
.difference(&parent_stateinfo.1)
.cloned()
.collect::<HashSet<_>>();
let statediffremoved = parent_stateinfo
.1
.difference(&current_state)
.cloned()
.collect::<HashSet<_>>();
(statediffnew, statediffremoved)
} else {
(current_state, HashSet::new())
};
update_shortstatehash_level(
&current_sstatehash,
statediffnew,
statediffremoved,
2, // every state change is 2 event changes on average
states_parents,
&db,
)?;
/*
let mut tmp = load_shortstatehash_info(&current_sstatehash, &db)?;
let state = tmp.pop().unwrap();
println!(
"{}\t{}{:?}: {:?} + {:?} - {:?}",
current_room,
" ".repeat(tmp.len()),
utils::u64_from_bytes(&current_sstatehash).unwrap(),
tmp.last().map(|b| utils::u64_from_bytes(&b.0).unwrap()),
state
.2
.iter()
.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
.collect::<Vec<_>>(),
state
.3
.iter()
.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
.collect::<Vec<_>>()
);
*/
last_roomstates.insert(current_room.clone(), current_sstatehash);
}
current_state = HashSet::new();
current_sstatehash = sstatehash;
let event_id = db
.rooms
.shorteventid_eventid
.get(&seventid)
.unwrap()
.unwrap();
let event_id =
EventId::try_from(utils::string_from_bytes(&event_id).unwrap())
.unwrap();
let pdu = db.rooms.get_pdu(&event_id).unwrap().unwrap();
if Some(&pdu.room_id) != current_room.as_ref() {
current_room = Some(pdu.room_id.clone());
}
}
let mut val = sstatekey;
val.extend_from_slice(&seventid);
current_state.insert(val);
}
db.globals.bump_database_version(7)?;
println!("Migration: 6 -> 7 finished");
}
if db.globals.database_version()? < 8 {
// Generate short room ids for all rooms
for (room_id, _) in db.rooms.roomid_shortstatehash.iter() {
let shortroomid = db.globals.next_count()?.to_be_bytes();
db.rooms.roomid_shortroomid.insert(&room_id, &shortroomid)?;
db.rooms.shortroomid_roomid.insert(&shortroomid, &room_id)?;
}
// Update pduids db layout
for (key, v) in db.rooms.pduid_pdu.iter() {
let mut parts = key.splitn(2, |&b| b == 0xff);
let room_id = parts.next().unwrap();
let count = parts.next().unwrap();
let short_room_id = db.rooms.roomid_shortroomid.get(&room_id)?.unwrap();
let mut new_key = short_room_id;
new_key.extend_from_slice(count);
println!("{:?}", new_key);
}
// Update tokenids db layout
for (key, _) in db.rooms.tokenids.iter() {
let mut parts = key.splitn(4, |&b| b == 0xff);
let room_id = parts.next().unwrap();
let word = parts.next().unwrap();
let _pdu_id_room = parts.next().unwrap();
let pdu_id_count = parts.next().unwrap();
let short_room_id = db.rooms.roomid_shortroomid.get(&room_id)?.unwrap();
let mut new_key = short_room_id;
new_key.extend_from_slice(word);
new_key.push(0xff);
new_key.extend_from_slice(pdu_id_count);
println!("{:?}", new_key);
}
db.globals.bump_database_version(8)?;
println!("Migration: 7 -> 8 finished");
}
panic!();
}
let guard = db.read().await;

View file

@ -223,10 +223,7 @@ impl Tree for SqliteTable {
let statement = Box::leak(Box::new(
guard
.prepare(&format!(
"SELECT key, value FROM {} ORDER BY key ASC",
&self.name
))
.prepare(&format!("SELECT key, value FROM {} ORDER BY key ASC", &self.name))
.unwrap(),
));

View file

@ -47,7 +47,7 @@ pub struct Rooms {
pub(super) aliasid_alias: Arc<dyn Tree>, // AliasId = RoomId + Count
pub(super) publicroomids: Arc<dyn Tree>,
pub(super) tokenids: Arc<dyn Tree>, // TokenId = RoomId + Token + PduId
pub(super) tokenids: Arc<dyn Tree>, // TokenId = ShortRoomId + Token + PduIdCount
/// Participating servers in a room.
pub(super) roomserverids: Arc<dyn Tree>, // RoomServerId = RoomId + ServerName
@ -71,14 +71,18 @@ pub struct Rooms {
pub(super) shorteventid_shortstatehash: Arc<dyn Tree>,
/// StateKey = EventType + StateKey, ShortStateKey = Count
pub(super) statekey_shortstatekey: Arc<dyn Tree>,
pub(super) shortroomid_roomid: Arc<dyn Tree>,
pub(super) roomid_shortroomid: Arc<dyn Tree>,
pub(super) shorteventid_eventid: Arc<dyn Tree>,
/// ShortEventId = Count
pub(super) eventid_shorteventid: Arc<dyn Tree>,
/// ShortEventId = Count
pub(super) statehash_shortstatehash: Arc<dyn Tree>,
/// ShortStateHash = Count
/// StateId = ShortStateHash + ShortStateKey
/// StateId = ShortStateHash
pub(super) stateid_shorteventid: Arc<dyn Tree>,
pub(super) shortstatehash_statediff: Arc<dyn Tree>, // StateDiff = parent (or 0) + (shortstatekey+shorteventid++) + 0_u64 + (shortstatekey+shorteventid--)
/// RoomId + EventId -> outlier PDU.
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.