diff --git a/Cargo.lock b/Cargo.lock index 6039d477e6..22e51f0b73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,6 +93,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "ansi_term" version = "0.12.1" @@ -797,6 +803,33 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e" +[[package]] +name = "ciborium" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c137568cc60b904a7724001b35ce2630fd00d5d84805fbb608ab89509d788f" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "346de753af073cc87b52b2083a506b38ac176a44cfb05497b622e27be899b369" + +[[package]] +name = "ciborium-ll" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213030a2b5a4e0c0892b6652260cf6ccac84827b83a85a534e178e3906c4cf1b" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clang-sys" version = "1.4.0" @@ -1060,7 +1093,10 @@ dependencies = [ "common-base", "common-error", "common-runtime", + "criterion 0.4.0", + "dashmap", "datafusion", + "rand 0.8.5", "snafu", "tokio", "tonic", @@ -1271,7 +1307,7 @@ dependencies = [ "atty", "cast", "clap 2.34.0", - "criterion-plot", + "criterion-plot 0.4.5", "csv", "itertools", "lazy_static", @@ -1288,6 +1324,32 @@ dependencies = [ "walkdir", ] +[[package]] +name = "criterion" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" +dependencies = [ + "anes", + "atty", + "cast", + "ciborium", + "clap 3.2.22", + "criterion-plot 0.5.0", + "itertools", + "lazy_static", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + [[package]] name = "criterion-plot" version = "0.4.5" @@ -1298,6 +1360,16 @@ dependencies = [ "itertools", ] +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + [[package]] name = "crossbeam" version = "0.8.2" @@ -1440,6 +1512,19 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "datafusion" version = "7.0.0" @@ -5215,7 +5300,7 @@ dependencies = [ "common-runtime", "common-telemetry", "common-time", - "criterion", + "criterion 0.3.6", "datatypes", "futures", "futures-util", diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index 1bbd38e86c..e72abc1996 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -9,6 +9,7 @@ async-trait = "0.1" common-base = { path = "../base" } common-error = { path = "../error" } common-runtime = { path = "../runtime" } +dashmap = "5.4" datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.0", features = ["full"] } @@ -19,3 +20,11 @@ tower = "0.4" package = "arrow2" version = "0.10" features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] + +[dev-dependencies] +criterion = "0.4" +rand = "0.8" + +[[bench]] +name = "bench_main" +harness = false diff --git a/src/common/grpc/benches/bench_main.rs b/src/common/grpc/benches/bench_main.rs new file mode 100644 index 0000000000..b1e7a22bf2 --- /dev/null +++ b/src/common/grpc/benches/bench_main.rs @@ -0,0 +1,7 @@ +use criterion::criterion_main; + +mod channel_manager; + +criterion_main! { + channel_manager::benches +} diff --git a/src/common/grpc/benches/channel_manager.rs b/src/common/grpc/benches/channel_manager.rs new file mode 100644 index 0000000000..c9aa17bd86 --- /dev/null +++ b/src/common/grpc/benches/channel_manager.rs @@ -0,0 +1,34 @@ +use common_grpc::channel_manager::ChannelManager; +use criterion::{criterion_group, criterion_main, Criterion}; + +#[tokio::main] +async fn do_bench_channel_manager() { + let m = ChannelManager::new(); + let task_count = 8; + let mut joins = Vec::with_capacity(task_count); + + for _ in 0..task_count { + let m_clone = m.clone(); + let join = tokio::spawn(async move { + for _ in 0..10000 { + let idx = rand::random::() % 100; + let ret = m_clone.get(format!("{}", idx)); + assert!(ret.is_ok()); + } + }); + joins.push(join); + } + + for join in joins { + let _ = join.await; + } +} + +fn bench_channel_manager(c: &mut Criterion) { + c.bench_function("bench channel manager", |b| { + b.iter(do_bench_channel_manager); + }); +} + +criterion_group!(benches, bench_channel_manager); +criterion_main!(benches); diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index 0e31ebcdfa..a209bb7b10 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -1,8 +1,10 @@ -use std::collections::HashMap; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::Mutex; use std::time::Duration; +use dashmap::mapref::entry::Entry; +use dashmap::DashMap; use snafu::ResultExt; use tonic::transport::Channel as InnerChannel; use tonic::transport::Endpoint; @@ -17,7 +19,7 @@ const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; #[derive(Clone, Debug)] pub struct ChannelManager { config: ChannelConfig, - pool: Arc>, + pool: Arc, } impl Default for ChannelManager { @@ -32,17 +34,14 @@ impl ChannelManager { } pub fn with_config(config: ChannelConfig) -> Self { - let pool = Pool { - channels: HashMap::default(), - }; - let pool = Arc::new(Mutex::new(pool)); + let pool = Arc::new(Pool::default()); let cloned_pool = pool.clone(); - common_runtime::spawn_bg(async move { + common_runtime::spawn_bg(async { recycle_channel_in_loop(cloned_pool, RECYCLE_CHANNEL_INTERVAL_SECS).await; }); - Self { pool, config } + Self { config, pool } } pub fn config(&self) -> &ChannelConfig { @@ -51,23 +50,30 @@ impl ChannelManager { pub fn get(&self, addr: impl AsRef) -> Result { let addr = addr.as_ref(); - let mut pool = self.pool.lock().unwrap(); - if let Some(ch) = pool.get_mut(addr) { - ch.access += 1; - return Ok(ch.channel.clone()); + // It will acquire the read lock. + if let Some(inner_ch) = self.pool.get(addr) { + return Ok(inner_ch); } - let endpoint = self.build_endpoint(addr)?; + // It will acquire the write lock. + let entry = match self.pool.entry(addr.to_string()) { + Entry::Occupied(entry) => { + entry.get().increase_access(); + entry.into_ref() + } + Entry::Vacant(entry) => { + let endpoint = self.build_endpoint(addr)?; + let inner_channel = endpoint.connect_lazy(); - let inner_channel = endpoint.connect_lazy(); - let channel = Channel { - channel: inner_channel.clone(), - access: 1, - use_default_connector: true, + let channel = Channel { + channel: inner_channel, + access: AtomicUsize::new(1), + use_default_connector: true, + }; + entry.insert(channel) + } }; - pool.put(addr, channel); - - Ok(inner_channel) + Ok(entry.channel.clone()) } pub fn reset_with_connector( @@ -86,11 +92,10 @@ impl ChannelManager { let inner_channel = endpoint.connect_with_connector_lazy(connector); let channel = Channel { channel: inner_channel.clone(), - access: 1, + access: AtomicUsize::new(1), use_default_connector: false, }; - let mut pool = self.pool.lock().unwrap(); - pool.put(addr, channel); + self.pool.put(addr, channel); Ok(inner_channel) } @@ -99,8 +104,7 @@ impl ChannelManager { where F: FnMut(&String, &mut Channel) -> bool, { - let mut pool = self.pool.lock().unwrap(); - pool.retain_channel(f); + self.pool.retain_channel(f); } fn build_endpoint(&self, addr: &str) -> Result { @@ -297,39 +301,56 @@ impl ChannelConfig { #[derive(Debug)] pub struct Channel { channel: InnerChannel, - access: usize, + access: AtomicUsize, use_default_connector: bool, } impl Channel { #[inline] pub fn access(&self) -> usize { - self.access + self.access.load(Ordering::Relaxed) } #[inline] pub fn use_default_connector(&self) -> bool { self.use_default_connector } + + #[inline] + pub fn increase_access(&self) { + self.access.fetch_add(1, Ordering::Relaxed); + } } -#[derive(Debug)] + +#[derive(Debug, Default)] struct Pool { - channels: HashMap, + channels: DashMap, } impl Pool { - #[inline] - fn get_mut(&mut self, addr: &str) -> Option<&mut Channel> { - self.channels.get_mut(addr) + fn get(&self, addr: &str) -> Option { + let channel = self.channels.get(addr); + channel.map(|ch| { + ch.increase_access(); + ch.channel.clone() + }) } - #[inline] - fn put(&mut self, addr: &str, channel: Channel) { + fn entry(&self, addr: String) -> Entry { + self.channels.entry(addr) + } + + #[cfg(test)] + fn get_access(&self, addr: &str) -> Option { + let channel = self.channels.get(addr); + channel.map(|ch| ch.access()) + } + + fn put(&self, addr: &str, channel: Channel) { self.channels.insert(addr.to_string(), channel); } - #[inline] - fn retain_channel(&mut self, f: F) + fn retain_channel(&self, f: F) where F: FnMut(&String, &mut Channel) -> bool, { @@ -337,20 +358,12 @@ impl Pool { } } -async fn recycle_channel_in_loop(pool: Arc>, interval_secs: u64) { +async fn recycle_channel_in_loop(pool: Arc, interval_secs: u64) { let mut interval = tokio::time::interval(Duration::from_secs(interval_secs)); loop { interval.tick().await; - let mut pool = pool.lock().unwrap(); - pool.retain_channel(|_, c| { - if c.access == 0 { - false - } else { - c.access = 0; - true - } - }) + pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0) } } @@ -363,10 +376,7 @@ mod tests { #[should_panic] #[test] fn test_invalid_addr() { - let pool = Pool { - channels: HashMap::default(), - }; - let pool = Arc::new(Mutex::new(pool)); + let pool = Arc::new(Pool::default()); let mgr = ChannelManager { pool, ..Default::default() @@ -378,36 +388,31 @@ mod tests { #[tokio::test] async fn test_access_count() { - let pool = Pool { - channels: HashMap::default(), - }; - let pool = Arc::new(Mutex::new(pool)); + let pool = Arc::new(Pool::default()); let config = ChannelConfig::new(); - let mgr = ChannelManager { pool, config }; + let mgr = Arc::new(ChannelManager { pool, config }); let addr = "test_uri"; - for i in 0..10 { - { - let _ = mgr.get(addr).unwrap(); - let mut pool = mgr.pool.lock().unwrap(); - assert_eq!(i + 1, pool.get_mut(addr).unwrap().access); - } + let mut joins = Vec::with_capacity(10); + for _ in 0..10 { + let mgr_clone = mgr.clone(); + let join = tokio::spawn(async move { + for _ in 0..100 { + let _ = mgr_clone.get(addr); + } + }); + joins.push(join); + } + for join in joins { + join.await.unwrap(); } - let mut pool = mgr.pool.lock().unwrap(); + assert_eq!(1000, mgr.pool.get_access(addr).unwrap()); - assert_eq!(10, pool.get_mut(addr).unwrap().access); + mgr.pool + .retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0); - pool.retain_channel(|_, c| { - if c.access == 0 { - false - } else { - c.access = 0; - true - } - }); - - assert_eq!(0, pool.get_mut(addr).unwrap().access); + assert_eq!(0, mgr.pool.get_access(addr).unwrap()); } #[test] @@ -466,10 +471,7 @@ mod tests { #[test] fn test_build_endpoint() { - let pool = Pool { - channels: HashMap::default(), - }; - let pool = Arc::new(Mutex::new(pool)); + let pool = Arc::new(Pool::default()); let config = ChannelConfig::new() .timeout(Duration::from_secs(3)) .connect_timeout(Duration::from_secs(5)) @@ -493,9 +495,11 @@ mod tests { #[tokio::test] async fn test_channel_with_connector() { let pool = Pool { - channels: HashMap::default(), + channels: DashMap::default(), }; - let pool = Arc::new(Mutex::new(pool)); + + let pool = Arc::new(pool); + let config = ChannelConfig::new(); let mgr = ChannelManager { pool, config };