Merge branch 'jaegerfix' into 'next'

fix: jaeger support

See merge request famedly/conduit!437
This commit is contained in:
Timo Kösters 2022-12-18 05:52:49 +00:00
commit d963ad8cc1
9 changed files with 65 additions and 29 deletions

31
Cargo.lock generated
View file

@ -419,6 +419,7 @@ dependencies = [
"tower-http", "tower-http",
"tracing", "tracing",
"tracing-flame", "tracing-flame",
"tracing-opentelemetry",
"tracing-subscriber", "tracing-subscriber",
"trust-dns-resolver", "trust-dns-resolver",
] ]
@ -574,6 +575,19 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "dashmap"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "data-encoding" name = "data-encoding"
version = "2.3.2" version = "2.3.2"
@ -1573,6 +1587,7 @@ version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22"
dependencies = [ dependencies = [
"fnv",
"futures-channel", "futures-channel",
"futures-util", "futures-util",
"indexmap", "indexmap",
@ -1590,6 +1605,8 @@ checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"crossbeam-channel", "crossbeam-channel",
"dashmap",
"fnv",
"futures-channel", "futures-channel",
"futures-executor", "futures-executor",
"futures-util", "futures-util",
@ -2891,6 +2908,20 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-opentelemetry"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de"
dependencies = [
"once_cell",
"opentelemetry",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
]
[[package]] [[package]]
name = "tracing-subscriber" name = "tracing-subscriber"
version = "0.3.16" version = "0.3.16"

View file

@ -69,6 +69,7 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
tracing-flame = "0.2.0" tracing-flame = "0.2.0"
opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] } opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] }
tracing-opentelemetry = "0.18.0"
lru-cache = "0.1.2" lru-cache = "0.1.2"
rusqlite = { version = "0.28.0", optional = true, features = ["bundled"] } rusqlite = { version = "0.28.0", optional = true, features = ["bundled"] }
parking_lot = { version = "0.12.1", optional = true } parking_lot = { version = "0.12.1", optional = true }

View file

@ -873,7 +873,7 @@ async fn sync_helper(
let since_state_ids = match since_shortstatehash { let since_state_ids = match since_shortstatehash {
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
None => BTreeMap::new(), None => HashMap::new(),
}; };
let left_event_id = match services().rooms.state_accessor.room_state_get_id( let left_event_id = match services().rooms.state_accessor.room_state_get_id(

View file

@ -1,7 +1,4 @@
use std::{ use std::{collections::HashMap, sync::Arc};
collections::{BTreeMap, HashMap},
sync::Arc,
};
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
use async_trait::async_trait; use async_trait::async_trait;
@ -9,7 +6,7 @@ use ruma::{events::StateEventType, EventId, RoomId};
#[async_trait] #[async_trait]
impl service::rooms::state_accessor::Data for KeyValueDatabase { impl service::rooms::state_accessor::Data for KeyValueDatabase {
async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> { async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> {
let full_state = services() let full_state = services()
.rooms .rooms
.state_compressor .state_compressor
@ -17,7 +14,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
.pop() .pop()
.expect("there is always one layer") .expect("there is always one layer")
.1; .1;
let mut result = BTreeMap::new(); let mut result = HashMap::new();
let mut i = 0; let mut i = 0;
for compressed in full_state.into_iter() { for compressed in full_state.into_iter() {
let parsed = services() let parsed = services()

View file

@ -26,7 +26,6 @@ use http::{
header::{self, HeaderName}, header::{self, HeaderName},
Method, StatusCode, Uri, Method, StatusCode, Uri,
}; };
use opentelemetry::trace::{FutureExt, Tracer};
use ruma::api::{ use ruma::api::{
client::{ client::{
error::{Error as RumaError, ErrorBody, ErrorKind}, error::{Error as RumaError, ErrorBody, ErrorKind},
@ -93,14 +92,29 @@ async fn main() {
if config.allow_jaeger { if config.allow_jaeger {
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_agent_pipeline() let tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_auto_split_batch(true)
.with_service_name("conduit")
.install_batch(opentelemetry::runtime::Tokio) .install_batch(opentelemetry::runtime::Tokio)
.unwrap(); .unwrap();
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let span = tracer.start("conduit"); let filter_layer = match EnvFilter::try_new(&config.log) {
start.with_current_context().await; Ok(s) => s,
drop(span); Err(e) => {
eprintln!(
"It looks like your log config is invalid. The following error occurred: {}",
e
);
EnvFilter::try_new("warn").unwrap()
}
};
println!("exporting"); let subscriber = tracing_subscriber::Registry::default()
.with(filter_layer)
.with(telemetry);
tracing::subscriber::set_global_default(subscriber).unwrap();
start.await;
println!("exporting remaining spans");
opentelemetry::global::shutdown_tracer_provider(); opentelemetry::global::shutdown_tracer_provider();
} else { } else {
let registry = tracing_subscriber::Registry::default(); let registry = tracing_subscriber::Registry::default();

View file

@ -15,7 +15,6 @@ pub struct Service {
} }
impl Service { impl Service {
#[tracing::instrument(skip(self))]
pub fn get_cached_eventid_authchain<'a>( pub fn get_cached_eventid_authchain<'a>(
&'a self, &'a self,
key: &[u64], key: &[u64],

View file

@ -7,7 +7,7 @@ use ruma::{
RoomVersionId, RoomVersionId,
}; };
use std::{ use std::{
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet}, collections::{hash_map, BTreeMap, HashMap, HashSet},
pin::Pin, pin::Pin,
sync::{Arc, RwLock, RwLockWriteGuard}, sync::{Arc, RwLock, RwLockWriteGuard},
time::{Duration, Instant, SystemTime}, time::{Duration, Instant, SystemTime},
@ -553,7 +553,7 @@ impl Service {
let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len());
for (sstatehash, prev_event) in extremity_sstatehashes { for (sstatehash, prev_event) in extremity_sstatehashes {
let mut leaf_state: BTreeMap<_, _> = services() let mut leaf_state: HashMap<_, _> = services()
.rooms .rooms
.state_accessor .state_accessor
.state_full_ids(sstatehash) .state_full_ids(sstatehash)
@ -660,7 +660,7 @@ impl Service {
) )
.await; .await;
let mut state: BTreeMap<_, Arc<EventId>> = BTreeMap::new(); let mut state: HashMap<_, Arc<EventId>> = HashMap::new();
for (pdu, _) in state_vec { for (pdu, _) in state_vec {
let state_key = pdu.state_key.clone().ok_or_else(|| { let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Found non-state pdu in state events.") Error::bad_database("Found non-state pdu in state events.")
@ -672,10 +672,10 @@ impl Service {
)?; )?;
match state.entry(shortstatekey) { match state.entry(shortstatekey) {
btree_map::Entry::Vacant(v) => { hash_map::Entry::Vacant(v) => {
v.insert(Arc::from(&*pdu.event_id)); v.insert(Arc::from(&*pdu.event_id));
} }
btree_map::Entry::Occupied(_) => return Err( hash_map::Entry::Occupied(_) => return Err(
Error::bad_database("State event's type and state_key combination exists multiple times."), Error::bad_database("State event's type and state_key combination exists multiple times."),
), ),
} }

View file

@ -1,7 +1,4 @@
use std::{ use std::{collections::HashMap, sync::Arc};
collections::{BTreeMap, HashMap},
sync::Arc,
};
use async_trait::async_trait; use async_trait::async_trait;
use ruma::{events::StateEventType, EventId, RoomId}; use ruma::{events::StateEventType, EventId, RoomId};
@ -12,7 +9,7 @@ use crate::{PduEvent, Result};
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
/// Builds a StateMap by iterating over all keys that start /// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash. /// with state_hash, this gives the full state for the given state_hash.
async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>>; async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>>;
async fn state_full( async fn state_full(
&self, &self,

View file

@ -1,8 +1,5 @@
mod data; mod data;
use std::{ use std::{collections::HashMap, sync::Arc};
collections::{BTreeMap, HashMap},
sync::Arc,
};
pub use data::Data; pub use data::Data;
use ruma::{events::StateEventType, EventId, RoomId}; use ruma::{events::StateEventType, EventId, RoomId};
@ -16,7 +13,8 @@ pub struct Service {
impl Service { impl Service {
/// Builds a StateMap by iterating over all keys that start /// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash. /// with state_hash, this gives the full state for the given state_hash.
pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> { #[tracing::instrument(skip(self))]
pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> {
self.db.state_full_ids(shortstatehash).await self.db.state_full_ids(shortstatehash).await
} }
@ -39,7 +37,6 @@ impl Service {
} }
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
#[tracing::instrument(skip(self))]
pub fn state_get( pub fn state_get(
&self, &self,
shortstatehash: u64, shortstatehash: u64,