2021-06-08 16:10:00 +00:00
pub mod abstraction ;
2022-09-06 21:15:09 +00:00
pub mod key_value ;
2021-06-08 16:10:00 +00:00
2022-10-05 18:34:31 +00:00
use crate ::{
service ::{
2022-10-05 18:41:05 +00:00
rooms ::{ state_compressor ::CompressedStateEvent } ,
2022-10-05 18:34:31 +00:00
} ,
services , utils , Config , Error , PduEvent , Result , Services , SERVICES ,
} ;
2022-06-25 14:12:23 +00:00
use abstraction ::KeyValueDatabaseEngine ;
2022-10-05 18:34:31 +00:00
use abstraction ::KvTree ;
2020-03-30 11:46:18 +00:00
use directories ::ProjectDirs ;
2022-10-05 18:41:05 +00:00
use futures_util ::{ StreamExt } ;
2021-06-30 07:52:01 +00:00
use lru_cache ::LruCache ;
2022-04-07 12:11:55 +00:00
use ruma ::{
events ::{
2022-01-18 15:53:25 +00:00
push_rules ::PushRulesEventContent , room ::message ::RoomMessageEventContent ,
2022-10-05 10:45:54 +00:00
GlobalAccountDataEvent , GlobalAccountDataEventType , StateEventType ,
2022-04-07 12:11:55 +00:00
} ,
push ::Ruleset ,
2022-10-05 18:34:31 +00:00
signatures ::CanonicalJsonValue ,
DeviceId , EventId , RoomId , UserId ,
2022-04-07 12:11:55 +00:00
} ;
2021-06-08 16:24:36 +00:00
use std ::{
2021-08-01 13:14:54 +00:00
collections ::{ BTreeMap , HashMap , HashSet } ,
2021-06-08 16:24:36 +00:00
fs ::{ self , remove_dir_all } ,
io ::Write ,
2021-08-01 13:14:54 +00:00
mem ::size_of ,
2021-07-14 07:07:08 +00:00
path ::Path ,
2021-07-18 18:43:39 +00:00
sync ::{ Arc , Mutex , RwLock } ,
2021-06-08 16:24:36 +00:00
} ;
2022-10-05 18:41:05 +00:00
use tokio ::sync ::{ mpsc } ;
2022-02-03 18:52:41 +00:00
use tracing ::{ debug , error , info , warn } ;
2022-06-25 14:12:23 +00:00
pub struct KeyValueDatabase {
_db : Arc < dyn KeyValueDatabaseEngine > ,
2022-09-06 21:15:09 +00:00
//pub globals: globals::Globals,
pub ( super ) global : Arc < dyn KvTree > ,
pub ( super ) server_signingkeys : Arc < dyn KvTree > ,
//pub users: users::Users,
pub ( super ) userid_password : Arc < dyn KvTree > ,
pub ( super ) userid_displayname : Arc < dyn KvTree > ,
pub ( super ) userid_avatarurl : Arc < dyn KvTree > ,
pub ( super ) userid_blurhash : Arc < dyn KvTree > ,
pub ( super ) userdeviceid_token : Arc < dyn KvTree > ,
pub ( super ) userdeviceid_metadata : Arc < dyn KvTree > , // This is also used to check if a device exists
pub ( super ) userid_devicelistversion : Arc < dyn KvTree > , // DevicelistVersion = u64
pub ( super ) token_userdeviceid : Arc < dyn KvTree > ,
pub ( super ) onetimekeyid_onetimekeys : Arc < dyn KvTree > , // OneTimeKeyId = UserId + DeviceKeyId
pub ( super ) userid_lastonetimekeyupdate : Arc < dyn KvTree > , // LastOneTimeKeyUpdate = Count
pub ( super ) keychangeid_userid : Arc < dyn KvTree > , // KeyChangeId = UserId/RoomId + Count
pub ( super ) keyid_key : Arc < dyn KvTree > , // KeyId = UserId + KeyId (depends on key type)
pub ( super ) userid_masterkeyid : Arc < dyn KvTree > ,
pub ( super ) userid_selfsigningkeyid : Arc < dyn KvTree > ,
pub ( super ) userid_usersigningkeyid : Arc < dyn KvTree > ,
pub ( super ) userfilterid_filter : Arc < dyn KvTree > , // UserFilterId = UserId + FilterId
pub ( super ) todeviceid_events : Arc < dyn KvTree > , // ToDeviceId = UserId + DeviceId + Count
//pub uiaa: uiaa::Uiaa,
pub ( super ) userdevicesessionid_uiaainfo : Arc < dyn KvTree > , // User-interactive authentication
pub ( super ) userdevicesessionid_uiaarequest :
RwLock < BTreeMap < ( Box < UserId > , Box < DeviceId > , String ) , CanonicalJsonValue > > ,
//pub edus: RoomEdus,
pub ( super ) readreceiptid_readreceipt : Arc < dyn KvTree > , // ReadReceiptId = RoomId + Count + UserId
pub ( super ) roomuserid_privateread : Arc < dyn KvTree > , // RoomUserId = Room + User, PrivateRead = Count
pub ( super ) roomuserid_lastprivatereadupdate : Arc < dyn KvTree > , // LastPrivateReadUpdate = Count
2022-10-05 18:34:31 +00:00
pub ( super ) typingid_userid : Arc < dyn KvTree > , // TypingId = RoomId + TimeoutTime + Count
2022-09-06 21:15:09 +00:00
pub ( super ) roomid_lasttypingupdate : Arc < dyn KvTree > , // LastRoomTypingUpdate = Count
2022-10-05 18:34:31 +00:00
pub ( super ) presenceid_presence : Arc < dyn KvTree > , // PresenceId = RoomId + Count + UserId
2022-09-06 21:15:09 +00:00
pub ( super ) userid_lastpresenceupdate : Arc < dyn KvTree > , // LastPresenceUpdate = Count
//pub rooms: rooms::Rooms,
pub ( super ) pduid_pdu : Arc < dyn KvTree > , // PduId = ShortRoomId + Count
pub ( super ) eventid_pduid : Arc < dyn KvTree > ,
pub ( super ) roomid_pduleaves : Arc < dyn KvTree > ,
pub ( super ) alias_roomid : Arc < dyn KvTree > ,
pub ( super ) aliasid_alias : Arc < dyn KvTree > , // AliasId = RoomId + Count
pub ( super ) publicroomids : Arc < dyn KvTree > ,
pub ( super ) tokenids : Arc < dyn KvTree > , // TokenId = ShortRoomId + Token + PduIdCount
/// Participating servers in a room.
pub ( super ) roomserverids : Arc < dyn KvTree > , // RoomServerId = RoomId + ServerName
pub ( super ) serverroomids : Arc < dyn KvTree > , // ServerRoomId = ServerName + RoomId
pub ( super ) userroomid_joined : Arc < dyn KvTree > ,
pub ( super ) roomuserid_joined : Arc < dyn KvTree > ,
pub ( super ) roomid_joinedcount : Arc < dyn KvTree > ,
pub ( super ) roomid_invitedcount : Arc < dyn KvTree > ,
pub ( super ) roomuseroncejoinedids : Arc < dyn KvTree > ,
pub ( super ) userroomid_invitestate : Arc < dyn KvTree > , // InviteState = Vec<Raw<Pdu>>
pub ( super ) roomuserid_invitecount : Arc < dyn KvTree > , // InviteCount = Count
pub ( super ) userroomid_leftstate : Arc < dyn KvTree > ,
pub ( super ) roomuserid_leftcount : Arc < dyn KvTree > ,
pub ( super ) disabledroomids : Arc < dyn KvTree > , // Rooms where incoming federation handling is disabled
pub ( super ) lazyloadedids : Arc < dyn KvTree > , // LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId
pub ( super ) userroomid_notificationcount : Arc < dyn KvTree > , // NotifyCount = u64
pub ( super ) userroomid_highlightcount : Arc < dyn KvTree > , // HightlightCount = u64
/// Remember the current state hash of a room.
pub ( super ) roomid_shortstatehash : Arc < dyn KvTree > ,
pub ( super ) roomsynctoken_shortstatehash : Arc < dyn KvTree > ,
/// Remember the state hash at events in the past.
pub ( super ) shorteventid_shortstatehash : Arc < dyn KvTree > ,
/// StateKey = EventType + StateKey, ShortStateKey = Count
pub ( super ) statekey_shortstatekey : Arc < dyn KvTree > ,
pub ( super ) shortstatekey_statekey : Arc < dyn KvTree > ,
pub ( super ) roomid_shortroomid : Arc < dyn KvTree > ,
pub ( super ) shorteventid_eventid : Arc < dyn KvTree > ,
pub ( super ) eventid_shorteventid : Arc < dyn KvTree > ,
pub ( super ) statehash_shortstatehash : Arc < dyn KvTree > ,
pub ( super ) shortstatehash_statediff : Arc < dyn KvTree > , // StateDiff = parent (or 0) + (shortstatekey+shorteventid++) + 0_u64 + (shortstatekey+shorteventid--)
pub ( super ) shorteventid_authchain : Arc < dyn KvTree > ,
/// RoomId + EventId -> outlier PDU.
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
pub ( super ) eventid_outlierpdu : Arc < dyn KvTree > ,
pub ( super ) softfailedeventids : Arc < dyn KvTree > ,
/// RoomId + EventId -> Parent PDU EventId.
pub ( super ) referencedevents : Arc < dyn KvTree > ,
//pub account_data: account_data::AccountData,
pub ( super ) roomuserdataid_accountdata : Arc < dyn KvTree > , // RoomUserDataId = Room + User + Count + Type
pub ( super ) roomusertype_roomuserdataid : Arc < dyn KvTree > , // RoomUserType = Room + User + Type
//pub media: media::Media,
pub ( super ) mediaid_file : Arc < dyn KvTree > , // MediaId = MXC + WidthHeight + ContentDisposition + ContentType
//pub key_backups: key_backups::KeyBackups,
pub ( super ) backupid_algorithm : Arc < dyn KvTree > , // BackupId = UserId + Version(Count)
pub ( super ) backupid_etag : Arc < dyn KvTree > , // BackupId = UserId + Version(Count)
pub ( super ) backupkeyid_backup : Arc < dyn KvTree > , // BackupKeyId = UserId + Version + RoomId + SessionId
//pub transaction_ids: transaction_ids::TransactionIds,
pub ( super ) userdevicetxnid_response : Arc < dyn KvTree > , // Response can be empty (/sendToDevice) or the event id (/send)
//pub sending: sending::Sending,
pub ( super ) servername_educount : Arc < dyn KvTree > , // EduCount: Count of last EDU sync
pub ( super ) servernameevent_data : Arc < dyn KvTree > , // ServernameEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content
pub ( super ) servercurrentevent_data : Arc < dyn KvTree > , // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content
//pub appservice: appservice::Appservice,
pub ( super ) id_appserviceregistrations : Arc < dyn KvTree > ,
//pub pusher: pusher::PushData,
pub ( super ) senderkey_pusher : Arc < dyn KvTree > ,
2022-10-05 10:45:54 +00:00
pub ( super ) cached_registrations : Arc < RwLock < HashMap < String , serde_yaml ::Value > > > ,
pub ( super ) pdu_cache : Mutex < LruCache < Box < EventId > , Arc < PduEvent > > > ,
pub ( super ) shorteventid_cache : Mutex < LruCache < u64 , Arc < EventId > > > ,
pub ( super ) auth_chain_cache : Mutex < LruCache < Vec < u64 > , Arc < HashSet < u64 > > > > ,
pub ( super ) eventidshort_cache : Mutex < LruCache < Box < EventId > , u64 > > ,
pub ( super ) statekeyshort_cache : Mutex < LruCache < ( StateEventType , String ) , u64 > > ,
pub ( super ) shortstatekey_cache : Mutex < LruCache < u64 , ( StateEventType , String ) > > ,
pub ( super ) our_real_users_cache : RwLock < HashMap < Box < RoomId > , Arc < HashSet < Box < UserId > > > > > ,
pub ( super ) appservice_in_room_cache : RwLock < HashMap < Box < RoomId > , HashMap < String , bool > > > ,
pub ( super ) lazy_load_waiting :
Mutex < HashMap < ( Box < UserId > , Box < DeviceId > , Box < RoomId > , u64 ) , HashSet < Box < UserId > > > > ,
pub ( super ) stateinfo_cache : Mutex <
LruCache <
u64 ,
Vec < (
u64 , // sstatehash
HashSet < CompressedStateEvent > , // full state
HashSet < CompressedStateEvent > , // added
HashSet < CompressedStateEvent > , // removed
) > ,
> ,
> ,
pub ( super ) lasttimelinecount_cache : Mutex < HashMap < Box < RoomId > , u64 > > ,
2020-03-30 11:46:18 +00:00
}
2022-06-25 14:12:23 +00:00
impl KeyValueDatabase {
2020-04-10 11:36:57 +00:00
/// Tries to remove the old database but ignores all errors.
2020-06-09 13:13:17 +00:00
pub fn try_remove ( server_name : & str ) -> Result < ( ) > {
2020-04-11 18:03:22 +00:00
let mut path = ProjectDirs ::from ( " xyz " , " koesters " , " conduit " )
2020-11-15 11:17:21 +00:00
. ok_or_else ( | | Error ::bad_config ( " The OS didn't return a valid home directory path. " ) ) ?
2020-04-10 11:36:57 +00:00
. data_dir ( )
. to_path_buf ( ) ;
2020-05-06 13:36:44 +00:00
path . push ( server_name ) ;
2020-04-10 11:36:57 +00:00
let _ = remove_dir_all ( path ) ;
2020-06-09 13:13:17 +00:00
Ok ( ( ) )
2020-04-10 11:36:57 +00:00
}
2022-01-09 15:44:44 +00:00
fn check_db_setup ( config : & Config ) -> Result < ( ) > {
let path = Path ::new ( & config . database_path ) ;
let sled_exists = path . join ( " db " ) . exists ( ) ;
let sqlite_exists = path . join ( " conduit.db " ) . exists ( ) ;
let rocksdb_exists = path . join ( " IDENTITY " ) . exists ( ) ;
let mut count = 0 ;
if sled_exists {
count + = 1 ;
}
if sqlite_exists {
count + = 1 ;
}
if rocksdb_exists {
count + = 1 ;
}
if count > 1 {
warn! ( " Multiple databases at database_path detected " ) ;
return Ok ( ( ) ) ;
}
2022-01-19 23:10:39 +00:00
if sled_exists & & config . database_backend ! = " sled " {
return Err ( Error ::bad_config (
" Found sled at database_path, but is not specified in config. " ,
) ) ;
2022-01-09 15:44:44 +00:00
}
2022-01-19 23:10:39 +00:00
if sqlite_exists & & config . database_backend ! = " sqlite " {
return Err ( Error ::bad_config (
" Found sqlite at database_path, but is not specified in config. " ,
) ) ;
2022-01-09 15:44:44 +00:00
}
2022-01-19 23:10:39 +00:00
if rocksdb_exists & & config . database_backend ! = " rocksdb " {
return Err ( Error ::bad_config (
" Found rocksdb at database_path, but is not specified in config. " ,
) ) ;
2021-07-14 07:07:08 +00:00
}
Ok ( ( ) )
}
2020-03-30 11:46:18 +00:00
/// Load an existing database or create a new one.
2022-10-05 16:36:12 +00:00
pub async fn load_or_create ( config : Config ) -> Result < ( ) > {
Self ::check_db_setup ( & config ) ? ;
2021-07-14 07:07:08 +00:00
2021-09-07 18:41:14 +00:00
if ! Path ::new ( & config . database_path ) . exists ( ) {
std ::fs ::create_dir_all ( & config . database_path )
. map_err ( | _ | Error ::BadConfig ( " Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself. " ) ) ? ;
}
2022-06-25 14:12:23 +00:00
let builder : Arc < dyn KeyValueDatabaseEngine > = match & * config . database_backend {
2022-01-09 15:44:44 +00:00
" sqlite " = > {
#[ cfg(not(feature = " sqlite " )) ]
return Err ( Error ::BadConfig ( " Database backend not found. " ) ) ;
#[ cfg(feature = " sqlite " ) ]
2022-10-05 16:36:12 +00:00
Arc ::new ( Arc ::< abstraction ::sqlite ::Engine > ::open ( & config ) ? )
2022-01-09 15:44:44 +00:00
}
" rocksdb " = > {
#[ cfg(not(feature = " rocksdb " )) ]
return Err ( Error ::BadConfig ( " Database backend not found. " ) ) ;
#[ cfg(feature = " rocksdb " ) ]
2022-10-05 16:36:12 +00:00
Arc ::new ( Arc ::< abstraction ::rocksdb ::Engine > ::open ( & config ) ? )
2022-01-09 15:44:44 +00:00
}
2021-06-17 23:38:32 +00:00
" persy " = > {
#[ cfg(not(feature = " persy " )) ]
return Err ( Error ::BadConfig ( " Database backend not found. " ) ) ;
#[ cfg(feature = " persy " ) ]
2022-10-05 16:36:12 +00:00
Arc ::new ( Arc ::< abstraction ::persy ::Engine > ::open ( & config ) ? )
2021-06-17 23:38:32 +00:00
}
2022-01-09 15:44:44 +00:00
_ = > {
return Err ( Error ::BadConfig ( " Database backend not found. " ) ) ;
}
} ;
2020-10-21 19:43:59 +00:00
2021-05-22 08:34:19 +00:00
if config . max_request_size < 1024 {
eprintln! ( " ERROR: Max request size is less than 1KB. Please increase it. " ) ;
}
2020-03-30 11:46:18 +00:00
2022-01-20 10:51:31 +00:00
let ( admin_sender , admin_receiver ) = mpsc ::unbounded_channel ( ) ;
let ( sending_sender , sending_receiver ) = mpsc ::unbounded_channel ( ) ;
2020-11-09 11:21:04 +00:00
2022-10-05 10:45:54 +00:00
let db = Arc ::new ( Self {
2021-07-14 07:07:08 +00:00
_db : builder . clone ( ) ,
2022-10-05 18:34:31 +00:00
userid_password : builder . open_tree ( " userid_password " ) ? ,
userid_displayname : builder . open_tree ( " userid_displayname " ) ? ,
userid_avatarurl : builder . open_tree ( " userid_avatarurl " ) ? ,
userid_blurhash : builder . open_tree ( " userid_blurhash " ) ? ,
userdeviceid_token : builder . open_tree ( " userdeviceid_token " ) ? ,
userdeviceid_metadata : builder . open_tree ( " userdeviceid_metadata " ) ? ,
userid_devicelistversion : builder . open_tree ( " userid_devicelistversion " ) ? ,
token_userdeviceid : builder . open_tree ( " token_userdeviceid " ) ? ,
onetimekeyid_onetimekeys : builder . open_tree ( " onetimekeyid_onetimekeys " ) ? ,
userid_lastonetimekeyupdate : builder . open_tree ( " userid_lastonetimekeyupdate " ) ? ,
keychangeid_userid : builder . open_tree ( " keychangeid_userid " ) ? ,
keyid_key : builder . open_tree ( " keyid_key " ) ? ,
userid_masterkeyid : builder . open_tree ( " userid_masterkeyid " ) ? ,
userid_selfsigningkeyid : builder . open_tree ( " userid_selfsigningkeyid " ) ? ,
userid_usersigningkeyid : builder . open_tree ( " userid_usersigningkeyid " ) ? ,
userfilterid_filter : builder . open_tree ( " userfilterid_filter " ) ? ,
todeviceid_events : builder . open_tree ( " todeviceid_events " ) ? ,
userdevicesessionid_uiaainfo : builder . open_tree ( " userdevicesessionid_uiaainfo " ) ? ,
userdevicesessionid_uiaarequest : RwLock ::new ( BTreeMap ::new ( ) ) ,
readreceiptid_readreceipt : builder . open_tree ( " readreceiptid_readreceipt " ) ? ,
roomuserid_privateread : builder . open_tree ( " roomuserid_privateread " ) ? , // "Private" read receipt
roomuserid_lastprivatereadupdate : builder
. open_tree ( " roomuserid_lastprivatereadupdate " ) ? ,
typingid_userid : builder . open_tree ( " typingid_userid " ) ? ,
roomid_lasttypingupdate : builder . open_tree ( " roomid_lasttypingupdate " ) ? ,
presenceid_presence : builder . open_tree ( " presenceid_presence " ) ? ,
userid_lastpresenceupdate : builder . open_tree ( " userid_lastpresenceupdate " ) ? ,
pduid_pdu : builder . open_tree ( " pduid_pdu " ) ? ,
eventid_pduid : builder . open_tree ( " eventid_pduid " ) ? ,
roomid_pduleaves : builder . open_tree ( " roomid_pduleaves " ) ? ,
alias_roomid : builder . open_tree ( " alias_roomid " ) ? ,
aliasid_alias : builder . open_tree ( " aliasid_alias " ) ? ,
publicroomids : builder . open_tree ( " publicroomids " ) ? ,
tokenids : builder . open_tree ( " tokenids " ) ? ,
roomserverids : builder . open_tree ( " roomserverids " ) ? ,
serverroomids : builder . open_tree ( " serverroomids " ) ? ,
userroomid_joined : builder . open_tree ( " userroomid_joined " ) ? ,
roomuserid_joined : builder . open_tree ( " roomuserid_joined " ) ? ,
roomid_joinedcount : builder . open_tree ( " roomid_joinedcount " ) ? ,
roomid_invitedcount : builder . open_tree ( " roomid_invitedcount " ) ? ,
roomuseroncejoinedids : builder . open_tree ( " roomuseroncejoinedids " ) ? ,
userroomid_invitestate : builder . open_tree ( " userroomid_invitestate " ) ? ,
roomuserid_invitecount : builder . open_tree ( " roomuserid_invitecount " ) ? ,
userroomid_leftstate : builder . open_tree ( " userroomid_leftstate " ) ? ,
roomuserid_leftcount : builder . open_tree ( " roomuserid_leftcount " ) ? ,
disabledroomids : builder . open_tree ( " disabledroomids " ) ? ,
lazyloadedids : builder . open_tree ( " lazyloadedids " ) ? ,
userroomid_notificationcount : builder . open_tree ( " userroomid_notificationcount " ) ? ,
userroomid_highlightcount : builder . open_tree ( " userroomid_highlightcount " ) ? ,
statekey_shortstatekey : builder . open_tree ( " statekey_shortstatekey " ) ? ,
shortstatekey_statekey : builder . open_tree ( " shortstatekey_statekey " ) ? ,
shorteventid_authchain : builder . open_tree ( " shorteventid_authchain " ) ? ,
roomid_shortroomid : builder . open_tree ( " roomid_shortroomid " ) ? ,
shortstatehash_statediff : builder . open_tree ( " shortstatehash_statediff " ) ? ,
eventid_shorteventid : builder . open_tree ( " eventid_shorteventid " ) ? ,
shorteventid_eventid : builder . open_tree ( " shorteventid_eventid " ) ? ,
shorteventid_shortstatehash : builder . open_tree ( " shorteventid_shortstatehash " ) ? ,
roomid_shortstatehash : builder . open_tree ( " roomid_shortstatehash " ) ? ,
roomsynctoken_shortstatehash : builder . open_tree ( " roomsynctoken_shortstatehash " ) ? ,
statehash_shortstatehash : builder . open_tree ( " statehash_shortstatehash " ) ? ,
eventid_outlierpdu : builder . open_tree ( " eventid_outlierpdu " ) ? ,
softfailedeventids : builder . open_tree ( " softfailedeventids " ) ? ,
referencedevents : builder . open_tree ( " referencedevents " ) ? ,
roomuserdataid_accountdata : builder . open_tree ( " roomuserdataid_accountdata " ) ? ,
roomusertype_roomuserdataid : builder . open_tree ( " roomusertype_roomuserdataid " ) ? ,
mediaid_file : builder . open_tree ( " mediaid_file " ) ? ,
backupid_algorithm : builder . open_tree ( " backupid_algorithm " ) ? ,
backupid_etag : builder . open_tree ( " backupid_etag " ) ? ,
backupkeyid_backup : builder . open_tree ( " backupkeyid_backup " ) ? ,
userdevicetxnid_response : builder . open_tree ( " userdevicetxnid_response " ) ? ,
servername_educount : builder . open_tree ( " servername_educount " ) ? ,
servernameevent_data : builder . open_tree ( " servernameevent_data " ) ? ,
servercurrentevent_data : builder . open_tree ( " servercurrentevent_data " ) ? ,
id_appserviceregistrations : builder . open_tree ( " id_appserviceregistrations " ) ? ,
senderkey_pusher : builder . open_tree ( " senderkey_pusher " ) ? ,
global : builder . open_tree ( " global " ) ? ,
server_signingkeys : builder . open_tree ( " server_signingkeys " ) ? ,
cached_registrations : Arc ::new ( RwLock ::new ( HashMap ::new ( ) ) ) ,
pdu_cache : Mutex ::new ( LruCache ::new (
config
. pdu_cache_capacity
. try_into ( )
. expect ( " pdu cache capacity fits into usize " ) ,
) ) ,
auth_chain_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
) ) ,
shorteventid_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
) ) ,
eventidshort_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
) ) ,
shortstatekey_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
) ) ,
statekeyshort_cache : Mutex ::new ( LruCache ::new (
( 100_000.0 * config . conduit_cache_capacity_modifier ) as usize ,
) ) ,
our_real_users_cache : RwLock ::new ( HashMap ::new ( ) ) ,
appservice_in_room_cache : RwLock ::new ( HashMap ::new ( ) ) ,
lazy_load_waiting : Mutex ::new ( HashMap ::new ( ) ) ,
stateinfo_cache : Mutex ::new ( LruCache ::new (
( 100.0 * config . conduit_cache_capacity_modifier ) as usize ,
) ) ,
lasttimelinecount_cache : Mutex ::new ( HashMap ::new ( ) ) ,
2022-10-05 10:45:54 +00:00
} ) ;
2022-10-05 16:36:12 +00:00
let services_raw = Box ::new ( Services ::build ( Arc ::clone ( & db ) , config ) ? ) ;
2022-10-05 10:45:54 +00:00
// This is the first and only time we initialize the SERVICE static
2022-10-05 13:33:57 +00:00
* SERVICES . write ( ) . unwrap ( ) = Some ( Box ::leak ( services_raw ) ) ;
2022-10-05 10:45:54 +00:00
2022-02-03 18:52:41 +00:00
// Matrix resource ownership is based on the server name; changing it
// requires recreating the database from scratch.
2022-10-05 10:45:54 +00:00
if services ( ) . users . count ( ) ? > 0 {
2022-02-03 18:52:41 +00:00
let conduit_user =
2022-10-05 10:45:54 +00:00
UserId ::parse_with_server_name ( " conduit " , services ( ) . globals . server_name ( ) )
2022-02-03 18:52:41 +00:00
. expect ( " @conduit:server_name is valid " ) ;
2022-10-05 10:45:54 +00:00
if ! services ( ) . users . exists ( & conduit_user ) ? {
2022-02-03 18:52:41 +00:00
error! (
" The {} server user does not exist, and the database is not new. " ,
conduit_user
) ;
return Err ( Error ::bad_database (
" Cannot reuse an existing database after changing the server name, please delete the old one first. "
) ) ;
}
}
// If the database has any data, perform data migrations before starting
let latest_database_version = 11 ;
2022-10-05 10:45:54 +00:00
if services ( ) . users . count ( ) ? > 0 {
2021-07-14 07:07:08 +00:00
// MIGRATIONS
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 1 {
for ( roomserverid , _ ) in db . roomserverids . iter ( ) {
2021-07-14 07:07:08 +00:00
let mut parts = roomserverid . split ( | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . expect ( " split always returns one element " ) ;
let servername = match parts . next ( ) {
Some ( s ) = > s ,
None = > {
error! ( " Migration: Invalid roomserverid in db. " ) ;
continue ;
}
} ;
let mut serverroomid = servername . to_vec ( ) ;
serverroomid . push ( 0xff ) ;
serverroomid . extend_from_slice ( room_id ) ;
2020-11-09 11:21:04 +00:00
2022-10-05 10:45:54 +00:00
db . serverroomids . insert ( & serverroomid , & [ ] ) ? ;
2021-07-14 07:07:08 +00:00
}
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 1 ) ? ;
2021-05-17 08:25:27 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 0 -> 1 finished " ) ;
2021-05-17 08:25:27 +00:00
}
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 2 {
2021-07-14 07:07:08 +00:00
// We accidentally inserted hashed versions of "" into the db instead of just ""
2022-10-05 10:45:54 +00:00
for ( userid , password ) in db . userid_password . iter ( ) {
2021-07-14 07:07:08 +00:00
let password = utils ::string_from_bytes ( & password ) ;
2021-05-17 08:25:27 +00:00
2021-07-14 07:07:08 +00:00
let empty_hashed_password = password . map_or ( false , | password | {
argon2 ::verify_encoded ( & password , b " " ) . unwrap_or ( false )
} ) ;
2021-05-17 08:25:27 +00:00
2021-07-14 07:07:08 +00:00
if empty_hashed_password {
2022-10-05 10:45:54 +00:00
db . userid_password . insert ( & userid , b " " ) ? ;
2021-07-14 07:07:08 +00:00
}
}
2021-05-30 19:55:43 +00:00
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 2 ) ? ;
2021-06-08 16:23:24 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 1 -> 2 finished " ) ;
2021-05-30 19:55:43 +00:00
}
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 3 {
2021-07-14 07:07:08 +00:00
// Move media to filesystem
2022-10-05 10:45:54 +00:00
for ( key , content ) in db . mediaid_file . iter ( ) {
2021-07-14 10:31:38 +00:00
if content . is_empty ( ) {
2021-07-14 07:07:08 +00:00
continue ;
}
2021-05-30 19:55:43 +00:00
2022-10-05 10:45:54 +00:00
let path = services ( ) . globals . get_media_file ( & key ) ;
2021-07-14 07:07:08 +00:00
let mut file = fs ::File ::create ( path ) ? ;
file . write_all ( & content ) ? ;
2022-10-05 10:45:54 +00:00
db . mediaid_file . insert ( & key , & [ ] ) ? ;
2021-06-08 16:23:24 +00:00
}
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 3 ) ? ;
2021-06-08 16:23:24 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 2 -> 3 finished " ) ;
2021-07-14 07:07:08 +00:00
}
2021-06-12 16:40:33 +00:00
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 4 {
// Add federated users to services() as deactivated
for our_user in services ( ) . users . iter ( ) {
2021-07-14 07:07:08 +00:00
let our_user = our_user ? ;
2022-10-05 10:45:54 +00:00
if services ( ) . users . is_deactivated ( & our_user ) ? {
2021-07-14 07:07:08 +00:00
continue ;
}
2022-10-05 10:45:54 +00:00
for room in services ( ) . rooms . state_cache . rooms_joined ( & our_user ) {
for user in services ( ) . rooms . state_cache . room_members ( & room ? ) {
2021-07-14 07:07:08 +00:00
let user = user ? ;
2022-10-05 10:45:54 +00:00
if user . server_name ( ) ! = services ( ) . globals . server_name ( ) {
2021-07-14 07:07:08 +00:00
println! ( " Migration: Creating user {} " , user ) ;
2022-10-05 10:45:54 +00:00
services ( ) . users . create ( & user , None ) ? ;
2021-07-14 07:07:08 +00:00
}
2021-06-12 16:40:33 +00:00
}
}
}
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 4 ) ? ;
2021-06-12 16:40:33 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 3 -> 4 finished " ) ;
2021-07-14 07:07:08 +00:00
}
2021-07-30 10:11:06 +00:00
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 5 {
2021-07-30 10:11:06 +00:00
// Upgrade user data store
2022-10-05 10:45:54 +00:00
for ( roomuserdataid , _ ) in db . roomuserdataid_accountdata . iter ( ) {
2021-07-30 10:11:06 +00:00
let mut parts = roomuserdataid . split ( | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . unwrap ( ) ;
2021-07-30 16:05:26 +00:00
let user_id = parts . next ( ) . unwrap ( ) ;
2021-07-30 10:11:06 +00:00
let event_type = roomuserdataid . rsplit ( | & b | b = = 0xff ) . next ( ) . unwrap ( ) ;
let mut key = room_id . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( user_id ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( event_type ) ;
2022-10-05 10:45:54 +00:00
db . roomusertype_roomuserdataid
2021-07-30 10:11:06 +00:00
. insert ( & key , & roomuserdataid ) ? ;
}
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 5 ) ? ;
2021-07-30 10:11:06 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 4 -> 5 finished " ) ;
2021-07-30 10:11:06 +00:00
}
2021-08-04 19:15:01 +00:00
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 6 {
2021-08-04 19:15:01 +00:00
// Set room member count
2022-10-05 10:45:54 +00:00
for ( roomid , _ ) in db . roomid_shortstatehash . iter ( ) {
2021-11-26 23:30:28 +00:00
let string = utils ::string_from_bytes ( & roomid ) . unwrap ( ) ;
let room_id = < & RoomId > ::try_from ( string . as_str ( ) ) . unwrap ( ) ;
2022-10-05 10:45:54 +00:00
services ( ) . rooms . state_cache . update_joined_count ( room_id ) ? ;
2021-08-04 19:15:01 +00:00
}
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 6 ) ? ;
2021-08-04 19:15:01 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 5 -> 6 finished " ) ;
2021-08-04 19:15:01 +00:00
}
2021-08-01 13:14:54 +00:00
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 7 {
2021-08-01 13:14:54 +00:00
// Upgrade state store
2021-11-26 19:36:40 +00:00
let mut last_roomstates : HashMap < Box < RoomId > , u64 > = HashMap ::new ( ) ;
2021-08-12 21:04:00 +00:00
let mut current_sstatehash : Option < u64 > = None ;
2021-08-01 13:14:54 +00:00
let mut current_room = None ;
let mut current_state = HashSet ::new ( ) ;
let mut counter = 0 ;
2021-08-12 21:04:00 +00:00
let mut handle_state =
| current_sstatehash : u64 ,
current_room : & RoomId ,
current_state : HashSet < _ > ,
last_roomstates : & mut HashMap < _ , _ > | {
counter + = 1 ;
println! ( " counter: {} " , counter ) ;
let last_roomsstatehash = last_roomstates . get ( current_room ) ;
let states_parents = last_roomsstatehash . map_or_else (
| | Ok ( Vec ::new ( ) ) ,
| & last_roomsstatehash | {
2022-10-05 18:34:31 +00:00
services ( )
. rooms
. state_compressor
. load_shortstatehash_info ( dbg! ( last_roomsstatehash ) )
2021-08-12 21:04:00 +00:00
} ,
) ? ;
let ( statediffnew , statediffremoved ) =
if let Some ( parent_stateinfo ) = states_parents . last ( ) {
let statediffnew = current_state
. difference ( & parent_stateinfo . 1 )
2021-10-13 08:24:39 +00:00
. copied ( )
2021-08-12 21:04:00 +00:00
. collect ::< HashSet < _ > > ( ) ;
let statediffremoved = parent_stateinfo
. 1
. difference ( & current_state )
2021-10-13 08:24:39 +00:00
. copied ( )
2021-08-12 21:04:00 +00:00
. collect ::< HashSet < _ > > ( ) ;
( statediffnew , statediffremoved )
} else {
( current_state , HashSet ::new ( ) )
} ;
2022-10-05 10:45:54 +00:00
services ( ) . rooms . state_compressor . save_state_from_diff (
2021-08-12 21:04:00 +00:00
dbg! ( current_sstatehash ) ,
statediffnew ,
statediffremoved ,
2 , // every state change is 2 event changes on average
states_parents ,
) ? ;
/*
2022-10-05 10:45:54 +00:00
let mut tmp = services ( ) . rooms . load_shortstatehash_info ( & current_sstatehash ) ? ;
2021-08-12 21:04:00 +00:00
let state = tmp . pop ( ) . unwrap ( ) ;
println! (
" {} \t {}{:?}: {:?} + {:?} - {:?} " ,
current_room ,
" " . repeat ( tmp . len ( ) ) ,
utils ::u64_from_bytes ( & current_sstatehash ) . unwrap ( ) ,
tmp . last ( ) . map ( | b | utils ::u64_from_bytes ( & b . 0 ) . unwrap ( ) ) ,
state
. 2
. iter ( )
. map ( | b | utils ::u64_from_bytes ( & b [ size_of ::< u64 > ( ) .. ] ) . unwrap ( ) )
. collect ::< Vec < _ > > ( ) ,
state
. 3
. iter ( )
. map ( | b | utils ::u64_from_bytes ( & b [ size_of ::< u64 > ( ) .. ] ) . unwrap ( ) )
. collect ::< Vec < _ > > ( )
) ;
* /
Ok ::< _ , Error > ( ( ) )
} ;
2021-08-01 13:14:54 +00:00
for ( k , seventid ) in db . _db . open_tree ( " stateid_shorteventid " ) ? . iter ( ) {
2021-08-12 21:04:00 +00:00
let sstatehash = utils ::u64_from_bytes ( & k [ 0 .. size_of ::< u64 > ( ) ] )
. expect ( " number of bytes is correct " ) ;
2021-08-01 13:14:54 +00:00
let sstatekey = k [ size_of ::< u64 > ( ) .. ] . to_vec ( ) ;
2021-08-12 21:04:00 +00:00
if Some ( sstatehash ) ! = current_sstatehash {
if let Some ( current_sstatehash ) = current_sstatehash {
handle_state (
current_sstatehash ,
2021-11-26 19:36:40 +00:00
current_room . as_deref ( ) . unwrap ( ) ,
2021-08-12 21:04:00 +00:00
current_state ,
& mut last_roomstates ,
2021-08-01 13:14:54 +00:00
) ? ;
2021-08-12 21:04:00 +00:00
last_roomstates
. insert ( current_room . clone ( ) . unwrap ( ) , current_sstatehash ) ;
2021-08-01 13:14:54 +00:00
}
current_state = HashSet ::new ( ) ;
2021-08-12 21:04:00 +00:00
current_sstatehash = Some ( sstatehash ) ;
2021-08-01 13:14:54 +00:00
2022-10-05 18:34:31 +00:00
let event_id = db . shorteventid_eventid . get ( & seventid ) . unwrap ( ) . unwrap ( ) ;
2021-11-26 23:30:28 +00:00
let string = utils ::string_from_bytes ( & event_id ) . unwrap ( ) ;
let event_id = < & EventId > ::try_from ( string . as_str ( ) ) . unwrap ( ) ;
2022-10-05 18:34:31 +00:00
let pdu = services ( )
. rooms
. timeline
. get_pdu ( event_id )
. unwrap ( )
. unwrap ( ) ;
2021-08-01 13:14:54 +00:00
if Some ( & pdu . room_id ) ! = current_room . as_ref ( ) {
current_room = Some ( pdu . room_id . clone ( ) ) ;
}
}
let mut val = sstatekey ;
val . extend_from_slice ( & seventid ) ;
2021-08-12 21:04:00 +00:00
current_state . insert ( val . try_into ( ) . expect ( " size is correct " ) ) ;
}
if let Some ( current_sstatehash ) = current_sstatehash {
handle_state (
current_sstatehash ,
2021-11-26 19:36:40 +00:00
current_room . as_deref ( ) . unwrap ( ) ,
2021-08-12 21:04:00 +00:00
current_state ,
& mut last_roomstates ,
) ? ;
2021-08-01 13:14:54 +00:00
}
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 7 ) ? ;
2021-08-01 13:14:54 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 6 -> 7 finished " ) ;
2021-08-01 13:14:54 +00:00
}
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 8 {
2021-08-01 13:14:54 +00:00
// Generate short room ids for all rooms
2022-10-05 10:45:54 +00:00
for ( room_id , _ ) in db . roomid_shortstatehash . iter ( ) {
let shortroomid = services ( ) . globals . next_count ( ) ? . to_be_bytes ( ) ;
db . roomid_shortroomid . insert ( & room_id , & shortroomid ) ? ;
2022-02-03 18:52:41 +00:00
info! ( " Migration: 8 " ) ;
2021-08-01 13:14:54 +00:00
}
// Update pduids db layout
2022-10-05 10:45:54 +00:00
let mut batch = db . pduid_pdu . iter ( ) . filter_map ( | ( key , v ) | {
2021-08-02 20:32:28 +00:00
if ! key . starts_with ( b " ! " ) {
return None ;
}
2021-08-01 13:14:54 +00:00
let mut parts = key . splitn ( 2 , | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . unwrap ( ) ;
let count = parts . next ( ) . unwrap ( ) ;
2021-08-02 20:32:28 +00:00
let short_room_id = db
. roomid_shortroomid
2021-09-13 17:45:56 +00:00
. get ( room_id )
2021-08-02 20:32:28 +00:00
. unwrap ( )
. expect ( " shortroomid should exist " ) ;
2021-08-01 13:14:54 +00:00
let mut new_key = short_room_id ;
new_key . extend_from_slice ( count ) ;
2021-08-02 20:32:28 +00:00
Some ( ( new_key , v ) )
} ) ;
2022-10-05 10:45:54 +00:00
db . pduid_pdu . insert_batch ( & mut batch ) ? ;
2021-08-02 20:32:28 +00:00
2022-10-05 10:45:54 +00:00
let mut batch2 = db . eventid_pduid . iter ( ) . filter_map ( | ( k , value ) | {
2021-08-12 21:04:00 +00:00
if ! value . starts_with ( b " ! " ) {
return None ;
2021-08-02 20:32:28 +00:00
}
2021-08-12 21:04:00 +00:00
let mut parts = value . splitn ( 2 , | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . unwrap ( ) ;
let count = parts . next ( ) . unwrap ( ) ;
let short_room_id = db
. roomid_shortroomid
2021-09-13 17:45:56 +00:00
. get ( room_id )
2021-08-12 21:04:00 +00:00
. unwrap ( )
. expect ( " shortroomid should exist " ) ;
let mut new_value = short_room_id ;
new_value . extend_from_slice ( count ) ;
Some ( ( k , new_value ) )
} ) ;
2022-10-05 10:45:54 +00:00
db . eventid_pduid . insert_batch ( & mut batch2 ) ? ;
2021-08-01 13:14:54 +00:00
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 8 ) ? ;
2021-08-02 20:32:28 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 7 -> 8 finished " ) ;
2021-08-02 20:32:28 +00:00
}
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 9 {
2021-08-01 13:14:54 +00:00
// Update tokenids db layout
2021-08-31 19:20:03 +00:00
let mut iter = db
2021-08-21 12:24:10 +00:00
. tokenids
. iter ( )
. filter_map ( | ( key , _ ) | {
if ! key . starts_with ( b " ! " ) {
return None ;
}
let mut parts = key . splitn ( 4 , | & b | b = = 0xff ) ;
let room_id = parts . next ( ) . unwrap ( ) ;
let word = parts . next ( ) . unwrap ( ) ;
let _pdu_id_room = parts . next ( ) . unwrap ( ) ;
let pdu_id_count = parts . next ( ) . unwrap ( ) ;
2021-08-01 13:14:54 +00:00
2021-08-21 12:24:10 +00:00
let short_room_id = db
. roomid_shortroomid
2021-09-13 17:45:56 +00:00
. get ( room_id )
2021-08-21 12:24:10 +00:00
. unwrap ( )
. expect ( " shortroomid should exist " ) ;
let mut new_key = short_room_id ;
new_key . extend_from_slice ( word ) ;
new_key . push ( 0xff ) ;
new_key . extend_from_slice ( pdu_id_count ) ;
println! ( " old {:?} " , key ) ;
println! ( " new {:?} " , new_key ) ;
Some ( ( new_key , Vec ::new ( ) ) )
} )
2021-08-31 19:20:03 +00:00
. peekable ( ) ;
2021-08-02 20:32:28 +00:00
2021-08-21 12:22:21 +00:00
while iter . peek ( ) . is_some ( ) {
2022-10-05 18:34:31 +00:00
db . tokenids . insert_batch ( & mut iter . by_ref ( ) . take ( 1000 ) ) ? ;
2021-08-21 12:22:21 +00:00
println! ( " smaller batch done " ) ;
}
2021-08-02 20:32:28 +00:00
2022-02-03 18:52:41 +00:00
info! ( " Deleting starts " ) ;
2021-08-21 12:22:21 +00:00
2021-10-13 09:51:30 +00:00
let batch2 : Vec < _ > = db
2021-08-21 12:24:10 +00:00
. tokenids
. iter ( )
. filter_map ( | ( key , _ ) | {
if key . starts_with ( b " ! " ) {
println! ( " del {:?} " , key ) ;
Some ( key )
} else {
None
}
} )
2021-10-13 09:51:30 +00:00
. collect ( ) ;
2021-08-21 12:22:21 +00:00
for key in batch2 {
println! ( " del " ) ;
2022-10-05 10:45:54 +00:00
db . tokenids . remove ( & key ) ? ;
2021-08-01 13:14:54 +00:00
}
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 9 ) ? ;
2021-08-01 13:14:54 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 8 -> 9 finished " ) ;
2021-08-01 13:14:54 +00:00
}
2021-08-24 17:10:31 +00:00
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 10 {
2021-08-24 17:10:31 +00:00
// Add other direction for shortstatekeys
2022-10-05 10:45:54 +00:00
for ( statekey , shortstatekey ) in db . statekey_shortstatekey . iter ( ) {
db . shortstatekey_statekey
2021-08-24 17:10:31 +00:00
. insert ( & shortstatekey , & statekey ) ? ;
}
2021-08-25 15:40:10 +00:00
// Force E2EE device list updates so we can send them over federation
2022-10-05 10:45:54 +00:00
for user_id in services ( ) . users . iter ( ) . filter_map ( | r | r . ok ( ) ) {
2022-10-05 18:34:31 +00:00
services ( ) . users . mark_device_key_update ( & user_id ) ? ;
2021-08-25 15:40:10 +00:00
}
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 10 ) ? ;
2021-08-24 17:10:31 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 9 -> 10 finished " ) ;
2021-08-24 17:10:31 +00:00
}
2021-12-14 16:55:28 +00:00
2022-10-05 10:45:54 +00:00
if services ( ) . globals . database_version ( ) ? < 11 {
2021-12-14 16:55:28 +00:00
db . _db
. open_tree ( " userdevicesessionid_uiaarequest " ) ?
. clear ( ) ? ;
2022-10-05 10:45:54 +00:00
services ( ) . globals . bump_database_version ( 11 ) ? ;
2021-12-14 16:55:28 +00:00
2022-02-03 18:52:41 +00:00
warn! ( " Migration: 10 -> 11 finished " ) ;
2021-12-14 16:55:28 +00:00
}
2021-06-12 16:40:33 +00:00
2022-02-03 18:52:41 +00:00
assert_eq! ( 11 , latest_database_version ) ;
info! (
" Loaded {} database with version {} " ,
2022-10-05 18:34:31 +00:00
services ( ) . globals . config . database_backend ,
latest_database_version
2022-02-03 18:52:41 +00:00
) ;
} else {
2022-10-05 10:45:54 +00:00
services ( )
2022-02-03 18:52:41 +00:00
. globals
. bump_database_version ( latest_database_version ) ? ;
// Create the admin room and server user on first run
2022-10-05 10:45:54 +00:00
services ( ) . admin . create_admin_room ( ) . await ? ;
2022-02-03 18:52:41 +00:00
warn! (
" Created new {} database with version {} " ,
2022-10-05 18:34:31 +00:00
services ( ) . globals . config . database_backend ,
latest_database_version
2022-02-03 18:52:41 +00:00
) ;
}
2021-07-14 07:07:08 +00:00
2021-05-12 18:04:28 +00:00
// This data is probably outdated
2022-10-05 10:45:54 +00:00
db . presenceid_presence . clear ( ) ? ;
2021-07-14 07:07:08 +00:00
2022-10-05 10:45:54 +00:00
services ( ) . admin . start_handler ( admin_receiver ) ;
2022-04-07 12:11:55 +00:00
// Set emergency access for the conduit user
2022-10-05 10:45:54 +00:00
match set_emergency_access ( ) {
2022-04-07 12:11:55 +00:00
Ok ( pwd_set ) = > {
if pwd_set {
warn! ( " The Conduit account emergency password is set! Please unset it as soon as you finish admin account recovery! " ) ;
2022-10-05 10:45:54 +00:00
services ( ) . admin . send_message ( RoomMessageEventContent ::text_plain ( " The Conduit account emergency password is set! Please unset it as soon as you finish admin account recovery! " ) ) ;
2022-04-07 12:11:55 +00:00
}
}
Err ( e ) = > {
error! (
" Could not set the configured emergency password for the conduit user: {} " ,
e
)
}
} ;
2022-10-05 18:34:31 +00:00
services ( ) . sending . start_handler ( sending_receiver ) ;
2021-05-12 18:04:28 +00:00
2022-10-05 16:36:12 +00:00
Self ::start_cleanup_task ( ) . await ;
2021-07-14 07:07:08 +00:00
2022-10-05 10:45:54 +00:00
Ok ( ( ) )
2020-03-30 11:46:18 +00:00
}
2020-07-27 15:36:54 +00:00
2021-07-14 12:50:07 +00:00
/// Shutdown hook for the server binary.
///
/// Emits an info-level log line under the shutdown-sync target and then fires
/// the global `rotate` signal. Going by the log message, this is what notifies
/// the sync helpers that the server is going down.
#[cfg(feature = " conduit_bin ")]
pub async fn on_shutdown() {
    info!(
        target: " shutdown-sync ",
        " Received shutdown notification, notifying sync helpers... "
    );

    // Log first, then trigger the signal, same order as before.
    let shutdown_signal = &services().globals.rotate;
    shutdown_signal.fire();
}
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(self)) ]
2021-08-02 08:13:34 +00:00
pub fn flush ( & self ) -> Result < ( ) > {
2021-07-14 07:07:08 +00:00
let start = std ::time ::Instant ::now ( ) ;
let res = self . _db . flush ( ) ;
2021-07-29 06:36:01 +00:00
debug! ( " flush: took {:?} " , start . elapsed ( ) ) ;
2021-07-14 07:07:08 +00:00
res
}
2022-10-05 16:36:12 +00:00
#[ tracing::instrument ]
pub async fn start_cleanup_task ( ) {
2021-08-01 14:59:52 +00:00
use tokio ::time ::interval ;
2021-07-15 16:09:10 +00:00
#[ cfg(unix) ]
use tokio ::signal ::unix ::{ signal , SignalKind } ;
2021-07-29 06:36:01 +00:00
use tracing ::info ;
2021-07-14 07:07:08 +00:00
2021-08-01 14:59:52 +00:00
use std ::time ::{ Duration , Instant } ;
2021-07-14 07:07:08 +00:00
2022-10-05 18:34:31 +00:00
let timer_interval =
Duration ::from_secs ( services ( ) . globals . config . cleanup_second_interval as u64 ) ;
2021-07-14 07:07:08 +00:00
tokio ::spawn ( async move {
let mut i = interval ( timer_interval ) ;
2021-07-15 16:09:10 +00:00
#[ cfg(unix) ]
2021-07-14 07:07:08 +00:00
let mut s = signal ( SignalKind ::hangup ( ) ) . unwrap ( ) ;
loop {
2021-07-15 16:09:10 +00:00
#[ cfg(unix) ]
tokio ::select! {
2021-08-01 14:59:52 +00:00
_ = i . tick ( ) = > {
2022-01-09 15:44:44 +00:00
info! ( " cleanup: Timer ticked " ) ;
2021-07-14 07:07:08 +00:00
}
_ = s . recv ( ) = > {
2022-01-09 15:44:44 +00:00
info! ( " cleanup: Received SIGHUP " ) ;
2021-07-14 07:07:08 +00:00
}
} ;
2021-07-15 16:09:10 +00:00
#[ cfg(not(unix)) ]
2021-08-01 14:59:52 +00:00
{
2021-07-15 16:09:10 +00:00
i . tick ( ) . await ;
2022-01-09 15:44:44 +00:00
info! ( " cleanup: Timer ticked " )
2021-07-15 16:09:10 +00:00
}
2021-08-01 14:59:52 +00:00
let start = Instant ::now ( ) ;
2022-10-05 13:33:57 +00:00
if let Err ( e ) = services ( ) . globals . cleanup ( ) {
2022-01-09 15:44:44 +00:00
error! ( " cleanup: Errored: {} " , e ) ;
2021-07-14 07:07:08 +00:00
} else {
2022-01-09 15:44:44 +00:00
info! ( " cleanup: Finished in {:?} " , start . elapsed ( ) ) ;
2021-07-14 07:07:08 +00:00
}
}
} ) ;
}
}
2022-04-07 12:11:55 +00:00
/// Sets the emergency password and push rules for the @conduit account in case emergency password is set
2022-10-05 10:45:54 +00:00
fn set_emergency_access ( ) -> Result < bool > {
let conduit_user = UserId ::parse_with_server_name ( " conduit " , services ( ) . globals . server_name ( ) )
2022-04-07 12:11:55 +00:00
. expect ( " @conduit:server_name is a valid UserId " ) ;
2022-10-05 18:34:31 +00:00
services ( ) . users . set_password (
& conduit_user ,
services ( ) . globals . emergency_password ( ) . as_deref ( ) ,
) ? ;
2022-04-07 12:11:55 +00:00
2022-10-05 10:45:54 +00:00
let ( ruleset , res ) = match services ( ) . globals . emergency_password ( ) {
2022-04-07 12:11:55 +00:00
Some ( _ ) = > ( Ruleset ::server_default ( & conduit_user ) , Ok ( true ) ) ,
None = > ( Ruleset ::new ( ) , Ok ( false ) ) ,
} ;
2022-10-05 10:45:54 +00:00
services ( ) . account_data . update (
2022-04-07 12:11:55 +00:00
None ,
& conduit_user ,
2022-01-18 15:53:25 +00:00
GlobalAccountDataEventType ::PushRules . to_string ( ) . into ( ) ,
2022-10-05 13:33:57 +00:00
& serde_json ::to_value ( & GlobalAccountDataEvent {
2022-04-07 12:11:55 +00:00
content : PushRulesEventContent { global : ruleset } ,
2022-10-05 18:34:31 +00:00
} )
. expect ( " to json value always works " ) ,
2022-04-07 12:11:55 +00:00
) ? ;
res
}