Draft: Work in progres for backup and restore data from running conduit instance #971

Open
tglman wants to merge 12 commits from WIP_backup into next
6 changed files with 132 additions and 21 deletions
Showing only changes of commit 08cde527ac - Show all commits

View file

@ -33,7 +33,7 @@ pub trait KeyValueDatabaseEngine: Send + Sync {
fn open(config: &Config) -> Result<Self>
where
Self: Sized;
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>>;
fn flush(&self) -> Result<()>;
fn cleanup(&self) -> Result<()> {
Ok(())
@ -43,9 +43,7 @@ pub trait KeyValueDatabaseEngine: Send + Sync {
}
fn clear_caches(&self) {}
fn export(&self, _exporter: &mut Box<dyn KvExport>) -> Result<()> {
unimplemented!()
}
fn export(&self, exporter: &mut dyn KvExport) -> Result<()>;
}
pub trait KvExport {

View file

@ -49,7 +49,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
}))
}
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
// Creates the db if it doesn't exist already
Ok(Arc::new(EngineTree {
engine: self.clone(),
@ -67,7 +67,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
Ok(())
}
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
// Heed do not support snapshots
let trees: Vec<String> = unimplemented!("heed has no way lo list trees");
for tree_name in &trees {

View file

@ -27,7 +27,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
Ok(Arc::new(Engine { persy }))
}
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
// Create if it doesn't exist
if !self.persy.exists_index(name)? {
let mut tx = self.persy.begin()?;
@ -46,7 +46,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
Ok(())
}
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
let snapshot = self.persy.snapshot()?;
let indexes = snapshot.list_indexes()?;
for (index, _) in indexes {

View file

@ -14,9 +14,9 @@ pub struct Engine {
database_path: String,
}
pub struct RocksDbEngineTree<'a> {
pub struct RocksDbEngineTree {
db: Arc<Engine>,
name: &'a str,
name: String,
watchers: Watchers,
write_lock: RwLock<()>,
}
@ -96,7 +96,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
}))
}
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
if !self.old_cfs.contains(&name.to_owned()) {
// Create if it didn't exist
let _ = self
@ -105,7 +105,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
}
Ok(Arc::new(RocksDbEngineTree {
name,
name: name.to_owned(),
db: Arc::clone(self),
watchers: Watchers::default(),
write_lock: RwLock::new(()),
@ -135,7 +135,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
))
}
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
let snapshot = self.rocks.snapshot();
let column_familes = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
&rocksdb::Options::default(),
@ -160,13 +160,13 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
fn clear_caches(&self) {}
}
impl RocksDbEngineTree<'_> {
impl RocksDbEngineTree {
fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
self.db.rocks.cf_handle(self.name).unwrap()
self.db.rocks.cf_handle(&self.name).unwrap()
}
}
impl KvTree for RocksDbEngineTree<'_> {
impl KvTree for RocksDbEngineTree {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.db.rocks.get_cf(&self.cf(), key)?)
}

View file

@ -128,7 +128,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
self.flush_wal()
}
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
// TODO: rusqlite do not support snapshot yet, change this when they are supported
let tables: Vec<String> = {
let guard = self.read_lock();

View file

@ -1,5 +1,5 @@
use crate::database::{
abstraction::{KeyValueDatabaseEngine, KvTree},
abstraction::{KeyValueDatabaseEngine, KvExport, KvTree},
Config,
};
use std::sync::Arc;
@ -17,15 +17,22 @@ database_path = "{}"
.unwrap()
}
/// Make sure to keep the reference of the tree returned values for
/// the length of the test, to avoid early cleanups that may create test issues
fn open_tree<T>(test_name: &str) -> (Arc<dyn KvTree>, impl KeyValueDatabaseEngine, TempDir)
fn open_instance<T>(test_name: &str) -> (Arc<T>, TempDir)
where
Arc<T>: KeyValueDatabaseEngine,
{
let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
let config = empty_config(db_folder.path().to_str().unwrap());
let instance = Arc::<T>::open(&config).unwrap();
(instance, db_folder)
}
/// Make sure to keep the reference of the tree returned values for
/// the length of the test, to avoid early cleanups that may create test issues
fn open_tree<T>(test_name: &str) -> (Arc<dyn KvTree>, impl KeyValueDatabaseEngine, TempDir)
where
Arc<T>: KeyValueDatabaseEngine,
{
let (instance, db_folder) = open_instance(test_name);
let tree = instance.open_tree("test").unwrap();
(tree, instance, db_folder)
}
@ -226,6 +233,98 @@ where
assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
}
#[derive(Default)]
struct TestBackup {
data: Vec<(String, Vec<u8>, Vec<u8>)>,
current_tree: String,
}
impl TestBackup {
fn import<T>(&self, store: &Arc<T>) -> crate::Result<()>
where
Arc<T>: KeyValueDatabaseEngine,
{
for (tree, k, v) in &self.data {
let data = store.open_tree(&tree)?;
data.insert(&k, &v)?;
}
Ok(())
}
}
impl KvExport for TestBackup {
fn start_tree(&mut self, name: &str) -> crate::Result<()> {
self.current_tree = name.to_owned();
Ok(())
}
fn key_value(&mut self, key: &[u8], value: &[u8]) -> crate::Result<()> {
self.data
.push((self.current_tree.clone(), key.to_owned(), value.to_owned()));
Ok(())
}
fn end_tree(&mut self, _name: &str) -> crate::Result<()> {
Ok(())
}
}
fn insert_data<TT>(instance: &Arc<TT>, data: &str)
where
Arc<TT>: KeyValueDatabaseEngine,
{
let tree = instance.open_tree(data).unwrap();
let key = format!("{}", data);
let value = "value".as_bytes();
tree.insert(key.as_bytes(), value).unwrap();
let key1 = format!("{}1", data);
let value1 = "value1".as_bytes();
tree.insert(key1.as_bytes(), value1).unwrap();
let key2 = format!("{}2", data);
let value2 = "value2".as_bytes();
tree.insert(key2.as_bytes(), value2).unwrap();
}
fn check_data<TT>(instance: &Arc<TT>, data: &str)
where
Arc<TT>: KeyValueDatabaseEngine,
{
let tree = instance.open_tree(data).unwrap();
let key = format!("{}", data);
let value = "value".as_bytes();
let key1 = format!("{}1", data);
let value1 = "value1".as_bytes();
let key2 = format!("{}2", data);
let value2 = "value2".as_bytes();
let mut iter = tree.iter();
assert_eq!(
iter.next(),
Some((key.as_bytes().to_owned(), value.to_owned()))
);
assert_eq!(
iter.next(),
Some((key1.as_bytes().to_owned(), value1.to_owned()))
);
assert_eq!(
iter.next(),
Some((key2.as_bytes().to_owned(), value2.to_owned()))
);
assert_eq!(iter.next(), None);
}
fn test_export_import<T>(test_name: &str)
where
Arc<T>: KeyValueDatabaseEngine,
{
let (instance, _db_folder) = open_instance(test_name);
insert_data(&instance, "one");
insert_data(&instance, "two");
let mut bk = TestBackup::default();
instance.export(&mut bk).unwrap();
let (instance_r, _db_folder) = open_instance(&format!("{}_restore", test_name));
bk.import(&instance_r).unwrap();
check_data(&instance_r, "one");
check_data(&instance_r, "two");
}
#[cfg(feature = "sqlite")]
mod sqlite {
@ -281,6 +380,10 @@ mod sqlite {
fn sqlite_increment_batch() {
increment_batch::<Engine>("sqlite_increment_batch")
}
#[test]
fn sqlite_export_import() {
test_export_import::<Engine>("sqlite_export_import")
}
}
#[cfg(feature = "rocksdb")]
@ -338,6 +441,11 @@ mod rocksdb {
fn rocksdb_increment_batch() {
increment_batch::<Engine>("rocksdb_increment_batch")
}
#[test]
fn rocksdb_export_import() {
test_export_import::<Engine>("rocksdb_export_import")
}
}
#[cfg(feature = "persy")]
mod persy {
@ -394,4 +502,9 @@ mod persy {
fn persy_increment_batch() {
increment_batch::<Engine>("persy_increment_batch")
}
#[test]
fn persy_export_import() {
test_export_import::<Engine>("persy_export_import")
}
}