diff --git a/Cargo.toml b/Cargo.toml
index ff1785e4..d6b045f5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -107,6 +107,9 @@ sd-notify = { version = "0.4.1", optional = true }
 [target.'cfg(unix)'.dependencies]
 nix = { version = "0.26.2", features = ["resource"] }
 
+[dev-dependencies]
+tempfile = "3.2"
+
 [features]
 default = ["conduit_bin", "backend_sqlite", "backend_rocksdb", "systemd"]
 #backend_sled = ["sled"]
diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs
index 0a321054..4d9da37d 100644
--- a/src/database/abstraction.rs
+++ b/src/database/abstraction.rs
@@ -26,11 +26,16 @@ pub mod persy;
 ))]
 pub mod watchers;
 
+pub mod json_stream_export;
+
+#[cfg(test)]
+mod tests;
+
 pub trait KeyValueDatabaseEngine: Send + Sync {
     fn open(config: &Config) -> Result<Self>
     where
         Self: Sized;
-    fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
+    fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>>;
     fn flush(&self) -> Result<()>;
     fn cleanup(&self) -> Result<()> {
         Ok(())
@@ -39,6 +44,14 @@ pub trait KeyValueDatabaseEngine: Send + Sync {
         Ok("Current database engine does not support memory usage reporting.".to_owned())
     }
     fn clear_caches(&self) {}
+
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()>;
+}
+
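+/// Visitor that receives a full dump of the database.
+///
+/// For every tree, `export` implementations call `start_tree` once, then
+/// `key_value` for each entry in that tree, and finally `end_tree`.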
+pub trait KvExport {
+    fn start_tree(&mut self, name: &str) -> Result<()>;
+    fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()>;
+    fn end_tree(&mut self, name: &str) -> Result<()>;
 }
 
 pub trait KvTree: Send + Sync {
diff --git a/src/database/abstraction/heed.rs b/src/database/abstraction/heed.rs
index 9cca0975..f215f6d5 100644
--- a/src/database/abstraction/heed.rs
+++ b/src/database/abstraction/heed.rs
@@ -9,7 +9,7 @@ use std::{
     sync::{Arc, Mutex},
 };
 
-use super::{DatabaseEngine, Tree};
+use super::{KeyValueDatabaseEngine, KvExport, KvTree};
 
 type TupleOfBytes = (Vec<u8>, Vec<u8>);
 
@@ -30,8 +30,8 @@ fn convert_error(error: heed::Error) -> Error {
     }
 }
 
-impl DatabaseEngine for Engine {
-    fn open(config: &Config) -> Result<Arc<Self>> {
+impl KeyValueDatabaseEngine for Arc<Engine> {
+    fn open(config: &Config) -> Result<Self> {
         let mut env_builder = heed::EnvOpenOptions::new();
         env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte
         env_builder.max_readers(126);
@@ -49,10 +49,10 @@ impl DatabaseEngine for Engine {
         }))
     }
 
-    fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
+    fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         // Creates the db if it doesn't exist already
         Ok(Arc::new(EngineTree {
-            engine: Arc::clone(self),
+            engine: self.clone(),
             tree: Arc::new(
                 self.env
                     .create_database(Some(name))
@@ -62,10 +62,24 @@ impl DatabaseEngine for Engine {
         }))
     }
 
-    fn flush(self: &Arc<Self>) -> Result<()> {
+    fn flush(&self) -> Result<()> {
         self.env.force_sync().map_err(convert_error)?;
         Ok(())
     }
+
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
+        // Heed does not support snapshots
+        let trees: Vec<String> = unimplemented!("heed has no way to list trees");
+        for tree_name in &trees {
+            exporter.start_tree(tree_name)?;
+            let tree = self.open_tree(tree_name)?;
+            for (key, value) in tree.iter() {
+                exporter.key_value(&key, &value)?;
+            }
+            exporter.end_tree(tree_name)?;
+        }
+        Ok(())
+    }
 }
 
 impl EngineTree {
@@ -78,17 +92,7 @@ impl EngineTree {
         let (s, r) = bounded::<TupleOfBytes>(100);
         let engine = Arc::clone(&self.engine);
 
-        let lock = self.engine.iter_pool.lock().await;
-        if lock.active_count() < lock.max_count() {
-            lock.execute(move || {
-                iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
-            });
-        } else {
-            std::thread::spawn(move || {
-                iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
-            });
-        }
-
+        let lock = self.engine.iter_pool.lock().unwrap();
+        lock.execute(move || {
+            iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
+        });
+
         Box::new(r.into_iter())
     }
 }
@@ -123,7 +127,7 @@ fn iter_from_thread_work(
     }
 }
 
-impl Tree for EngineTree {
+impl KvTree for EngineTree {
     fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
         let txn = self.engine.env.read_txn().map_err(convert_error)?;
         Ok(self
@@ -143,6 +147,36 @@ impl Tree for EngineTree {
         Ok(())
     }
 
+    fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
+        let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
+        for (key, value) in iter {
+            self.tree
+                .put(&mut txn, &key.as_slice(), &value.as_slice())
+                .map_err(convert_error)?;
+            self.watchers.wake(&key);
+        }
+        txn.commit().map_err(convert_error)?;
+        Ok(())
+    }
+
+    fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
+        let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
+        for key in iter {
+            let old = self
+                .tree
+                .get(&txn, &key.as_slice())
+                .map_err(convert_error)?;
+            let new = crate::utils::increment(old.as_deref())
+                .expect("utils::increment always returns Some");
+
+            self.tree
+                .put(&mut txn, &key.as_slice(), &&*new)
+                .map_err(convert_error)?;
+        }
+        txn.commit().map_err(convert_error)?;
+        Ok(())
+    }
+
     fn remove(&self, key: &[u8]) -> Result<()> {
         let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
         self.tree.delete(&mut txn, &key).map_err(convert_error)?;
@@ -150,7 +184,7 @@ impl Tree for EngineTree {
         Ok(())
     }
 
-    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
+    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
         self.iter_from(&[], false)
     }
 
@@ -158,7 +192,7 @@ impl Tree for EngineTree {
         &self,
         from: &[u8],
         backwards: bool,
-    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> {
+    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)>> {
         self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards)
     }
 
@@ -181,7 +215,7 @@ impl Tree for EngineTree {
     fn scan_prefix<'a>(
         &'a self,
         prefix: Vec<u8>,
-    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
+    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
         Box::new(
             self.iter_from(&prefix, false)
                 .take_while(move |(key, _)| key.starts_with(&prefix)),
diff --git a/src/database/abstraction/json_stream_export.rs b/src/database/abstraction/json_stream_export.rs
new file mode 100644
index 00000000..a37f31e6
--- /dev/null
+++ b/src/database/abstraction/json_stream_export.rs
@@ -0,0 +1,77 @@
+use crate::{
+    database::abstraction::{KeyValueDatabaseEngine, KvExport},
+    Result,
+};
+use base64::{engine::general_purpose::STANDARD, Engine};
+use serde::{Deserialize, Serialize};
+use std::io::{BufRead, BufReader, Read, Write};
+
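+/// Streams the whole database as newline-delimited JSON and reads it back.
+///
+/// A usage sketch (assumes `db` and `restored` are two opened engine
+/// instances and ignores error handling; the `backup.jsonl` path is
+/// illustrative):
+///
+/// ```ignore
+/// let mut out = std::fs::File::create("backup.jsonl")?;
+/// db.export_json_stream(&mut out)?;
+///
+/// let mut input = std::fs::File::open("backup.jsonl")?;
+/// restored.import_json_stream(&mut input)?;
+/// ```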
+pub trait KeyValueJsonExporter {
+    fn export_json_stream(&self, output: &mut dyn Write) -> Result<()>;
+    fn import_json_stream(&self, input: &mut dyn Read) -> Result<()>;
+}
+
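+/// A single exported entry: one JSON object per line, with key and value
+/// base64-encoded, e.g. `{"tree":"test","key":"a2V5","value":"dmFsdWU="}`
+/// for key `key` and value `value` in tree `test`.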
+#[derive(Serialize, Deserialize)]
+struct Entry {
+    tree: String,
+    key: String,
+    value: String,
+}
+
+struct JsonExporter<'a> {
+    current_name: String,
+    write: &'a mut dyn Write,
+}
+impl<'a> KvExport for JsonExporter<'a> {
+    fn start_tree(&mut self, name: &str) -> Result<()> {
+        self.current_name = name.to_owned();
+        Ok(())
+    }
+
+    fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
+        let entry = Entry {
+            tree: self.current_name.clone(),
+            key: STANDARD.encode(key),
+            value: STANDARD.encode(value),
+        };
+        writeln!(self.write, "{}", serde_json::to_string(&entry).unwrap())?;
+        Ok(())
+    }
+
+    fn end_tree(&mut self, _name: &str) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl<T: KeyValueDatabaseEngine> KeyValueJsonExporter for T {
+    fn export_json_stream(&self, output: &mut dyn Write) -> Result<()> {
+        self.export(&mut JsonExporter {
+            current_name: Default::default(),
+            write: output,
+        })?;
+        Ok(())
+    }
+    fn import_json_stream(&self, input: &mut dyn Read) -> Result<()> {
+        let bf = BufReader::new(input);
+        // Just a cache to avoid reopening the tree for every line
+        let mut cur_tree = None;
+        for line in bf.lines() {
+            if let Ok(entry) = serde_json::from_str::<Entry>(&line?) {
+                if let (Ok(key), Ok(value)) =
+                    (STANDARD.decode(&entry.key), STANDARD.decode(&entry.value))
+                {
+                    let (tree, tree_name) = match cur_tree {
+                        Some((tree, tree_name)) if tree_name == entry.tree => (tree, tree_name),
+                        _ => {
+                            let tree = self.open_tree(&entry.tree)?;
+                            (tree, entry.tree.clone())
+                        }
+                    };
+                    tree.insert(&key, &value)?;
+                    cur_tree = Some((tree, tree_name));
+                }
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs
index 1fa7a0df..47bd1044 100644
--- a/src/database/abstraction/persy.rs
+++ b/src/database/abstraction/persy.rs
@@ -1,6 +1,6 @@
 use crate::{
     database::{
-        abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvTree},
+        abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree},
         Config,
     },
     Result,
@@ -27,7 +27,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         Ok(Arc::new(Engine { persy }))
     }
 
-    fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
+    fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         // Create if it doesn't exist
         if !self.persy.exists_index(name)? {
             let mut tx = self.persy.begin()?;
@@ -45,6 +45,22 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
     fn flush(&self) -> Result<()> {
         Ok(())
     }
+
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
+        let snapshot = self.persy.snapshot()?;
+        let indexes = snapshot.list_indexes()?;
+        for (index, _) in indexes {
+            exporter.start_tree(&index)?;
+            let data = snapshot.range::<ByteVec, ByteVec, _>(&index, ..)?;
+            for (key, values) in data {
+                for value in values {
+                    exporter.key_value(&key, &value)?;
+                }
+            }
+            exporter.end_tree(&index)?;
+        }
+        Ok(())
+    }
 }
 
 pub struct PersyTree {
diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs
index b40c4393..dd7a929e 100644
--- a/src/database/abstraction/rocksdb.rs
+++ b/src/database/abstraction/rocksdb.rs
@@ -1,4 +1,4 @@
-use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree};
+use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree};
 use crate::{utils, Result};
 use std::{
     future::Future,
@@ -11,11 +11,12 @@ pub struct Engine {
     max_open_files: i32,
     cache: rocksdb::Cache,
     old_cfs: Vec<String>,
+    database_path: String,
 }
 
-pub struct RocksDbEngineTree<'a> {
+pub struct RocksDbEngineTree {
     db: Arc<Engine>,
-    name: &'a str,
+    name: String,
     watchers: Watchers,
     write_lock: RwLock<()>,
 }
@@ -91,10 +92,11 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
             max_open_files: config.rocksdb_max_open_files,
             cache: rocksdb_cache,
             old_cfs: cfs,
+            database_path: config.database_path.clone(),
         }))
     }
 
-    fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
+    fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         if !self.old_cfs.contains(&name.to_owned()) {
             // Create if it didn't exist
             let _ = self
@@ -103,7 +105,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         }
 
         Ok(Arc::new(RocksDbEngineTree {
-            name,
+            name: name.to_owned(),
             db: Arc::clone(self),
             watchers: Watchers::default(),
             write_lock: RwLock::new(()),
@@ -133,16 +135,38 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         ))
     }
 
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
+        let snapshot = self.rocks.snapshot();
+        let column_families = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
+            &rocksdb::Options::default(),
+            &self.database_path,
+        )
+        .unwrap();
+        for column_family in column_families {
+            if let Some(handle) = self.rocks.cf_handle(&column_family) {
+                exporter.start_tree(&column_family)?;
+                let data = snapshot.iterator_cf(&handle, rocksdb::IteratorMode::Start);
+                for ele in data {
+                    if let Ok((key, value)) = ele {
+                        exporter.key_value(&key, &value)?;
+                    }
+                }
+                exporter.end_tree(&column_family)?;
+            }
+        }
+        Ok(())
+    }
+
     fn clear_caches(&self) {}
 }
 
-impl RocksDbEngineTree<'_> {
+impl RocksDbEngineTree {
     fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
-        self.db.rocks.cf_handle(self.name).unwrap()
+        self.db.rocks.cf_handle(&self.name).unwrap()
     }
 }
 
-impl KvTree for RocksDbEngineTree<'_> {
+impl KvTree for RocksDbEngineTree {
     fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
         Ok(self.db.rocks.get_cf(&self.cf(), key)?)
     }
diff --git a/src/database/abstraction/sled.rs b/src/database/abstraction/sled.rs
index 87defc57..454a4b83 100644
--- a/src/database/abstraction/sled.rs
+++ b/src/database/abstraction/sled.rs
@@ -27,6 +27,20 @@ impl DatabaseEngine for Engine {
     fn flush(self: &Arc<Self>) -> Result<()> {
         Ok(()) // noop
     }
+
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
+        // Sled does not support snapshots
+        let tree_names = self.0.tree_names();
+        for name in &tree_names {
+            let name = String::from_utf8_lossy(name);
+            exporter.start_tree(&name)?;
+            let tree = Arc::new(SledEngineTree(self.0.open_tree(name.as_bytes())?));
+            for (key, value) in tree.iter() {
+                exporter.key_value(&key, &value)?;
+            }
+            exporter.end_tree(&name)?;
+        }
+        Ok(())
+    }
 }
 
 impl Tree for SledEngineTree {
diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs
index b69efb61..a3b1dd2b 100644
--- a/src/database/abstraction/sqlite.rs
+++ b/src/database/abstraction/sqlite.rs
@@ -1,4 +1,4 @@
-use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
+use super::{watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree};
 use crate::{database::Config, Result};
 use parking_lot::{Mutex, MutexGuard};
 use rusqlite::{Connection, DatabaseName::Main, OptionalExtension};
@@ -78,6 +78,14 @@ impl Engine {
             .pragma_update(Some(Main), "wal_checkpoint", "RESTART")?;
         Ok(())
     }
+
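+    /// Builds the `SqliteTable` handle directly, without the
+    /// `CREATE TABLE IF NOT EXISTS` that `open_tree` runs first;
+    /// `export` uses this for tables that are already known to exist.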
+    pub fn open_tree_impl(self: &Arc<Self>, name: &str) -> Arc<SqliteTable> {
+        Arc::new(SqliteTable {
+            engine: Arc::clone(self),
+            name: name.to_owned(),
+            watchers: Watchers::default(),
+        })
+    }
 }
 
 impl KeyValueDatabaseEngine for Arc<Engine> {
@@ -108,11 +116,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
     fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"), [])?;
 
-        Ok(Arc::new(SqliteTable {
-            engine: Arc::clone(self),
-            name: name.to_owned(),
-            watchers: Watchers::default(),
-        }))
+        Ok(self.open_tree_impl(name))
     }
 
     fn flush(&self) -> Result<()> {
@@ -123,6 +127,27 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
     fn cleanup(&self) -> Result<()> {
         self.flush_wal()
     }
+
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
+        // TODO: rusqlite does not support snapshots yet; switch to one when it does
+        let tables: Vec<String> = {
+            let guard = self.read_lock();
+            guard
+                .prepare("SELECT name FROM sqlite_master WHERE type='table'")?
+                .query_map([], |row| row.get(0))?
+                .map(|r| r.unwrap())
+                .collect()
+        };
+        for table in &tables {
+            exporter.start_tree(table)?;
+            let tree = self.open_tree_impl(table);
+            for (key, value) in tree.iter() {
+                exporter.key_value(&key, &value)?;
+            }
+            exporter.end_tree(table)?;
+        }
+        Ok(())
+    }
 }
 
 pub struct SqliteTable {
diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
new file mode 100644
index 00000000..da9a6de4
--- /dev/null
+++ b/src/database/abstraction/tests.rs
@@ -0,0 +1,542 @@
+use crate::database::{
+    abstraction::{
+        json_stream_export::KeyValueJsonExporter, KeyValueDatabaseEngine, KvExport, KvTree,
+    },
+    Config,
+};
+use std::sync::Arc;
+use tempfile::{Builder, TempDir};
+
+fn empty_config(database_path: &str) -> Config {
+    use figment::providers::{Format, Toml};
+    Toml::from_str(&format!(
+        r#"
+server_name = "test"
+database_path = "{}"
+"#,
+        database_path
+    ))
+    .unwrap()
+}
+
+fn open_instance<T>(test_name: &str) -> (Arc<T>, TempDir)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
+    let config = empty_config(db_folder.path().to_str().unwrap());
+    let instance = Arc::<T>::open(&config).unwrap();
+    (instance, db_folder)
+}
+
+/// Make sure to keep references to the returned values for the length of the
+/// test, to avoid early cleanups that may create test issues
+fn open_tree<T>(test_name: &str) -> (Arc<dyn KvTree>, impl KeyValueDatabaseEngine, TempDir)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (instance, db_folder) = open_instance::<T>(test_name);
+    let tree = instance.open_tree("test").unwrap();
+    (tree, instance, db_folder)
+}
+
+fn insert_get<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value.to_owned()));
+}
+
+fn insert_get_replace<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value.to_owned()));
+
+    let value1 = "value1".as_bytes();
+    tree.insert(key, value1).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value1.to_owned()));
+}
+
+fn insert_get_remove<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value.to_owned()));
+    tree.remove(key).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, None);
+    // Removing a non-existing key should run seamlessly
+    tree.remove(key).unwrap();
+}
+
+fn batch_insert_get<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert_batch(
+        &mut vec![
+            (key.to_owned(), value.to_owned()),
+            (key1.to_owned(), value1.to_owned()),
+            (key2.to_owned(), value2.to_owned()),
+        ]
+        .into_iter(),
+    )
+    .unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value.to_owned()));
+    let read = tree.get(key1).unwrap();
+    assert_eq!(read, Some(value1.to_owned()));
+    let read = tree.get(key2).unwrap();
+    assert_eq!(read, Some(value2.to_owned()));
+}
+
+fn insert_iter<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    tree.insert(key1, value1).unwrap();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert(key2, value2).unwrap();
+    let mut iter = tree.iter();
+    assert_eq!(iter.next(), Some((key.to_owned(), value.to_owned())));
+    assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
+    assert_eq!(iter.next(), Some((key2.to_owned(), value2.to_owned())));
+    assert_eq!(iter.next(), None);
+}
+
+fn insert_iter_from<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    tree.insert(key1, value1).unwrap();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert(key2, value2).unwrap();
+    let mut iter = tree.iter_from(key1, false);
+    assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
+    assert_eq!(iter.next(), Some((key2.to_owned(), value2.to_owned())));
+    assert_eq!(iter.next(), None);
+    let mut iter = tree.iter_from(key1, true);
+    assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
+    assert_eq!(iter.next(), Some((key.to_owned(), value.to_owned())));
+    assert_eq!(iter.next(), None);
+}
+
+fn insert_iter_prefix<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    tree.insert(key1, value1).unwrap();
+    let key11 = "key11".as_bytes();
+    let value11 = "value11".as_bytes();
+    tree.insert(key11, value11).unwrap();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert(key2, value2).unwrap();
+    let mut iter = tree.scan_prefix(key1.to_owned());
+    assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
+    assert_eq!(iter.next(), Some((key11.to_owned(), value11.to_owned())));
+    assert_eq!(iter.next(), None);
+}
+
+fn insert_clear<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    tree.insert(key1, value1).unwrap();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert(key2, value2).unwrap();
+    assert_eq!(tree.iter().count(), 3);
+    tree.clear().unwrap();
+    assert_eq!(tree.iter().count(), 0);
+}
+
+fn increment<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    tree.increment(key).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 1);
+    tree.increment(key).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
+}
+
+fn increment_batch<T>(name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
+    let key = "key".as_bytes();
+    let key1 = "key1".as_bytes();
+    tree.increment_batch(&mut vec![key.to_owned(), key1.to_owned()].into_iter())
+        .unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 1);
+    let read = tree.get(key1).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 1);
+    tree.increment_batch(&mut vec![key.to_owned(), key1.to_owned()].into_iter())
+        .unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
+    let read = tree.get(key1).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
+}
+
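+/// In-memory `KvExport` implementation: it collects every
+/// `(tree, key, value)` triple during an export so that a test can replay
+/// the dump into a fresh instance with `import` and compare the results.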
+#[derive(Default)]
+struct TestBackup {
+    data: Vec<(String, Vec<u8>, Vec<u8>)>,
+    current_tree: String,
+}
+impl TestBackup {
+    fn import<T>(&self, store: &Arc<T>) -> crate::Result<()>
+    where
+        Arc<T>: KeyValueDatabaseEngine,
+    {
+        for (tree, k, v) in &self.data {
+            let data = store.open_tree(tree)?;
+            data.insert(k, v)?;
+        }
+        Ok(())
+    }
+}
+impl KvExport for TestBackup {
+    fn start_tree(&mut self, name: &str) -> crate::Result<()> {
+        self.current_tree = name.to_owned();
+        Ok(())
+    }
+
+    fn key_value(&mut self, key: &[u8], value: &[u8]) -> crate::Result<()> {
+        self.data
+            .push((self.current_tree.clone(), key.to_owned(), value.to_owned()));
+        Ok(())
+    }
+
+    fn end_tree(&mut self, _name: &str) -> crate::Result<()> {
+        Ok(())
+    }
+}
+
+fn insert_data<T>(instance: &Arc<T>, data: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let tree = instance.open_tree(data).unwrap();
+    let key = format!("{}", data);
+    let value = "value".as_bytes();
+    tree.insert(key.as_bytes(), value).unwrap();
+    let key1 = format!("{}1", data);
+    let value1 = "value1".as_bytes();
+    tree.insert(key1.as_bytes(), value1).unwrap();
+    let key2 = format!("{}2", data);
+    let value2 = "value2".as_bytes();
+    tree.insert(key2.as_bytes(), value2).unwrap();
+}
+
+fn check_data<T>(instance: &Arc<T>, data: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let tree = instance.open_tree(data).unwrap();
+    let key = format!("{}", data);
+    let value = "value".as_bytes();
+    let key1 = format!("{}1", data);
+    let value1 = "value1".as_bytes();
+    let key2 = format!("{}2", data);
+    let value2 = "value2".as_bytes();
+    let mut iter = tree.iter();
+    assert_eq!(
+        iter.next(),
+        Some((key.as_bytes().to_owned(), value.to_owned()))
+    );
+    assert_eq!(
+        iter.next(),
+        Some((key1.as_bytes().to_owned(), value1.to_owned()))
+    );
+    assert_eq!(
+        iter.next(),
+        Some((key2.as_bytes().to_owned(), value2.to_owned()))
+    );
+    assert_eq!(iter.next(), None);
+}
+
+fn test_export_import<T>(test_name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (instance, _db_folder) = open_instance::<T>(test_name);
+    insert_data(&instance, "one");
+    insert_data(&instance, "two");
+    let mut bk = TestBackup::default();
+    instance.export(&mut bk).unwrap();
+    let (instance_r, _db_folder) = open_instance::<T>(&format!("{}_restore", test_name));
+    bk.import(&instance_r).unwrap();
+    check_data(&instance_r, "one");
+    check_data(&instance_r, "two");
+}
+
+fn test_export_import_json<T>(test_name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (instance, _db_folder) = open_instance::<T>(test_name);
+    insert_data(&instance, "one");
+    insert_data(&instance, "two");
+    let mut buffer = Vec::new();
+    instance.export_json_stream(&mut buffer).unwrap();
+    let (instance_r, _db_folder) = open_instance::<T>(&format!("{}_restore", test_name));
+    instance_r
+        .import_json_stream(&mut std::io::Cursor::new(buffer))
+        .unwrap();
+    check_data(&instance_r, "one");
+    check_data(&instance_r, "two");
+}
+
+#[cfg(feature = "sqlite")]
+mod sqlite {
+
+    use super::*;
+    use crate::database::abstraction::sqlite::Engine;
+
+    #[test]
+    fn sqlite_insert_get() {
+        insert_get::<Engine>("sqlite_insert_get")
+    }
+
+    #[test]
+    fn sqlite_insert_replace_get() {
+        insert_get_replace::<Engine>("sqlite_insert_get_replace")
+    }
+
+    #[test]
+    fn sqlite_insert_get_remove() {
+        insert_get_remove::<Engine>("sqlite_insert_get_remove")
+    }
+
+    #[test]
+    fn sqlite_batch_insert_get() {
+        batch_insert_get::<Engine>("sqlite_batch_insert_get")
+    }
+
+    #[test]
+    fn sqlite_insert_iter() {
+        insert_iter::<Engine>("sqlite_insert_iter")
+    }
+
+    #[test]
+    fn sqlite_insert_iter_from() {
+        insert_iter_from::<Engine>("sqlite_insert_iter_from")
+    }
+
+    #[test]
+    fn sqlite_insert_iter_prefix() {
+        insert_iter_prefix::<Engine>("sqlite_insert_iter_prefix")
+    }
+
+    #[test]
+    fn sqlite_insert_clear() {
+        insert_clear::<Engine>("sqlite_insert_clear")
+    }
+
+    #[test]
+    fn sqlite_increment() {
+        increment::<Engine>("sqlite_increment")
+    }
+
+    #[test]
+    fn sqlite_increment_batch() {
+        increment_batch::<Engine>("sqlite_increment_batch")
+    }
+
+    #[test]
+    fn sqlite_export_import() {
+        test_export_import::<Engine>("sqlite_export_import")
+    }
+
+    #[test]
+    fn sqlite_export_import_json() {
+        test_export_import_json::<Engine>("sqlite_export_import_json")
+    }
+}
+
+#[cfg(feature = "rocksdb")]
+mod rocksdb {
+
+    use super::*;
+    use crate::database::abstraction::rocksdb::Engine;
+
+    #[test]
+    fn rocksdb_insert_get() {
+        insert_get::<Engine>("rocksdb_insert_get")
+    }
+
+    #[test]
+    fn rocksdb_insert_replace_get() {
+        insert_get_replace::<Engine>("rocksdb_insert_get_replace")
+    }
+
+    #[test]
+    fn rocksdb_insert_get_remove() {
+        insert_get_remove::<Engine>("rocksdb_insert_get_remove")
+    }
+
+    #[test]
+    fn rocksdb_batch_insert_get() {
+        batch_insert_get::<Engine>("rocksdb_batch_insert_get")
+    }
+
+    #[test]
+    fn rocksdb_insert_iter() {
+        insert_iter::<Engine>("rocksdb_insert_iter")
+    }
+
+    #[test]
+    fn rocksdb_insert_iter_from() {
+        insert_iter_from::<Engine>("rocksdb_insert_iter_from")
+    }
+
+    #[test]
+    fn rocksdb_insert_iter_prefix() {
+        insert_iter_prefix::<Engine>("rocksdb_insert_iter_prefix")
+    }
+
+    #[test]
+    fn rocksdb_insert_clear() {
+        insert_clear::<Engine>("rocksdb_insert_clear")
+    }
+
+    #[test]
+    fn rocksdb_increment() {
+        increment::<Engine>("rocksdb_increment")
+    }
+
+    #[test]
+    fn rocksdb_increment_batch() {
+        increment_batch::<Engine>("rocksdb_increment_batch")
+    }
+
+    #[test]
+    fn rocksdb_export_import() {
+        test_export_import::<Engine>("rocksdb_export_import")
+    }
+
+    #[test]
+    fn rocksdb_export_import_json() {
+        test_export_import_json::<Engine>("rocksdb_export_import_json")
+    }
+}
+
+#[cfg(feature = "persy")]
+mod persy {
+
+    use super::*;
+    use crate::database::abstraction::persy::Engine;
+
+    #[test]
+    fn persy_insert_get() {
+        insert_get::<Engine>("persy_insert_get")
+    }
+
+    #[test]
+    fn persy_insert_replace_get() {
+        insert_get_replace::<Engine>("persy_insert_get_replace")
+    }
+
+    #[test]
+    fn persy_insert_get_remove() {
+        insert_get_remove::<Engine>("persy_insert_get_remove")
+    }
+
+    #[test]
+    fn persy_batch_insert_get() {
+        batch_insert_get::<Engine>("persy_batch_insert_get")
+    }
+
+    #[test]
+    fn persy_insert_iter() {
+        insert_iter::<Engine>("persy_insert_iter")
+    }
+
+    #[test]
+    fn persy_insert_iter_from() {
+        insert_iter_from::<Engine>("persy_insert_iter_from")
+    }
+
+    #[test]
+    fn persy_insert_iter_prefix() {
+        insert_iter_prefix::<Engine>("persy_insert_iter_prefix")
+    }
+
+    #[test]
+    fn persy_insert_clear() {
+        insert_clear::<Engine>("persy_insert_clear")
+    }
+
+    #[test]
+    fn persy_increment() {
+        increment::<Engine>("persy_increment")
+    }
+
+    #[test]
+    fn persy_increment_batch() {
+        increment_batch::<Engine>("persy_increment_batch")
+    }
+
+    #[test]
+    fn persy_export_import() {
+        test_export_import::<Engine>("persy_export_import")
+    }
+
+    #[test]
+    fn persy_export_import_json() {
+        test_export_import_json::<Engine>("persy_export_import_json")
+    }
+}
diff --git a/src/utils/error.rs b/src/utils/error.rs
index 6e88cf59..3627fbc0 100644
--- a/src/utils/error.rs
+++ b/src/utils/error.rs
@@ -151,7 +151,7 @@ impl Error {
             #[cfg(feature = "persy")]
             Self::PersyError { .. } => db_error,
             #[cfg(feature = "heed")]
-            Self::HeedError => db_error,
+            Self::HeedError { .. } => db_error,
             #[cfg(feature = "rocksdb")]
             Self::RocksDbError { .. } => db_error,
             Self::IoError { .. } => db_error,