refactor: optimize channel_manager (#401)

* refactor: use dashmap in channel manager

* add benchmark for channel manager

* access field in channel use AtomicUsize

* cr
This commit is contained in:
fys
2022-11-07 16:09:01 +08:00
committed by GitHub
parent 6e99bb8490
commit c2e1b0857c
5 changed files with 222 additions and 83 deletions

89
Cargo.lock generated
View File

@@ -93,6 +93,12 @@ dependencies = [
"libc",
]
[[package]]
name = "anes"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "ansi_term"
version = "0.12.1"
@@ -797,6 +803,33 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e"
[[package]]
name = "ciborium"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0c137568cc60b904a7724001b35ce2630fd00d5d84805fbb608ab89509d788f"
dependencies = [
"ciborium-io",
"ciborium-ll",
"serde",
]
[[package]]
name = "ciborium-io"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "346de753af073cc87b52b2083a506b38ac176a44cfb05497b622e27be899b369"
[[package]]
name = "ciborium-ll"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213030a2b5a4e0c0892b6652260cf6ccac84827b83a85a534e178e3906c4cf1b"
dependencies = [
"ciborium-io",
"half",
]
[[package]]
name = "clang-sys"
version = "1.4.0"
@@ -1060,7 +1093,10 @@ dependencies = [
"common-base",
"common-error",
"common-runtime",
"criterion 0.4.0",
"dashmap",
"datafusion",
"rand 0.8.5",
"snafu",
"tokio",
"tonic",
@@ -1271,7 +1307,7 @@ dependencies = [
"atty",
"cast",
"clap 2.34.0",
"criterion-plot",
"criterion-plot 0.4.5",
"csv",
"itertools",
"lazy_static",
@@ -1288,6 +1324,32 @@ dependencies = [
"walkdir",
]
[[package]]
name = "criterion"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb"
dependencies = [
"anes",
"atty",
"cast",
"ciborium",
"clap 3.2.22",
"criterion-plot 0.5.0",
"itertools",
"lazy_static",
"num-traits",
"oorandom",
"plotters",
"rayon",
"regex",
"serde",
"serde_derive",
"serde_json",
"tinytemplate",
"walkdir",
]
[[package]]
name = "criterion-plot"
version = "0.4.5"
@@ -1298,6 +1360,16 @@ dependencies = [
"itertools",
]
[[package]]
name = "criterion-plot"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
dependencies = [
"cast",
"itertools",
]
[[package]]
name = "crossbeam"
version = "0.8.2"
@@ -1440,6 +1512,19 @@ dependencies = [
"syn",
]
[[package]]
name = "dashmap"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "datafusion"
version = "7.0.0"
@@ -5215,7 +5300,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-time",
"criterion",
"criterion 0.3.6",
"datatypes",
"futures",
"futures-util",

View File

@@ -9,6 +9,7 @@ async-trait = "0.1"
common-base = { path = "../base" }
common-error = { path = "../error" }
common-runtime = { path = "../runtime" }
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }
@@ -19,3 +20,11 @@ tower = "0.4"
package = "arrow2"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dev-dependencies]
criterion = "0.4"
rand = "0.8"
[[bench]]
name = "bench_main"
harness = false

View File

@@ -0,0 +1,7 @@
use criterion::criterion_main;
mod channel_manager;
criterion_main! {
channel_manager::benches
}

View File

@@ -0,0 +1,34 @@
use common_grpc::channel_manager::ChannelManager;
use criterion::{criterion_group, criterion_main, Criterion};
#[tokio::main]
async fn do_bench_channel_manager() {
let m = ChannelManager::new();
let task_count = 8;
let mut joins = Vec::with_capacity(task_count);
for _ in 0..task_count {
let m_clone = m.clone();
let join = tokio::spawn(async move {
for _ in 0..10000 {
let idx = rand::random::<usize>() % 100;
let ret = m_clone.get(format!("{}", idx));
assert!(ret.is_ok());
}
});
joins.push(join);
}
for join in joins {
let _ = join.await;
}
}
fn bench_channel_manager(c: &mut Criterion) {
c.bench_function("bench channel manager", |b| {
b.iter(do_bench_channel_manager);
});
}
criterion_group!(benches, bench_channel_manager);
criterion_main!(benches);

View File

@@ -1,8 +1,10 @@
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use snafu::ResultExt;
use tonic::transport::Channel as InnerChannel;
use tonic::transport::Endpoint;
@@ -17,7 +19,7 @@ const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
#[derive(Clone, Debug)]
pub struct ChannelManager {
config: ChannelConfig,
pool: Arc<Mutex<Pool>>,
pool: Arc<Pool>,
}
impl Default for ChannelManager {
@@ -32,17 +34,14 @@ impl ChannelManager {
}
pub fn with_config(config: ChannelConfig) -> Self {
let pool = Pool {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let pool = Arc::new(Pool::default());
let cloned_pool = pool.clone();
common_runtime::spawn_bg(async move {
common_runtime::spawn_bg(async {
recycle_channel_in_loop(cloned_pool, RECYCLE_CHANNEL_INTERVAL_SECS).await;
});
Self { pool, config }
Self { config, pool }
}
pub fn config(&self) -> &ChannelConfig {
@@ -51,23 +50,30 @@ impl ChannelManager {
pub fn get(&self, addr: impl AsRef<str>) -> Result<InnerChannel> {
let addr = addr.as_ref();
let mut pool = self.pool.lock().unwrap();
if let Some(ch) = pool.get_mut(addr) {
ch.access += 1;
return Ok(ch.channel.clone());
// It will acquire the read lock.
if let Some(inner_ch) = self.pool.get(addr) {
return Ok(inner_ch);
}
let endpoint = self.build_endpoint(addr)?;
// It will acquire the write lock.
let entry = match self.pool.entry(addr.to_string()) {
Entry::Occupied(entry) => {
entry.get().increase_access();
entry.into_ref()
}
Entry::Vacant(entry) => {
let endpoint = self.build_endpoint(addr)?;
let inner_channel = endpoint.connect_lazy();
let inner_channel = endpoint.connect_lazy();
let channel = Channel {
channel: inner_channel.clone(),
access: 1,
use_default_connector: true,
let channel = Channel {
channel: inner_channel,
access: AtomicUsize::new(1),
use_default_connector: true,
};
entry.insert(channel)
}
};
pool.put(addr, channel);
Ok(inner_channel)
Ok(entry.channel.clone())
}
pub fn reset_with_connector<C>(
@@ -86,11 +92,10 @@ impl ChannelManager {
let inner_channel = endpoint.connect_with_connector_lazy(connector);
let channel = Channel {
channel: inner_channel.clone(),
access: 1,
access: AtomicUsize::new(1),
use_default_connector: false,
};
let mut pool = self.pool.lock().unwrap();
pool.put(addr, channel);
self.pool.put(addr, channel);
Ok(inner_channel)
}
@@ -99,8 +104,7 @@ impl ChannelManager {
where
F: FnMut(&String, &mut Channel) -> bool,
{
let mut pool = self.pool.lock().unwrap();
pool.retain_channel(f);
self.pool.retain_channel(f);
}
fn build_endpoint(&self, addr: &str) -> Result<Endpoint> {
@@ -297,39 +301,56 @@ impl ChannelConfig {
#[derive(Debug)]
pub struct Channel {
channel: InnerChannel,
access: usize,
access: AtomicUsize,
use_default_connector: bool,
}
impl Channel {
#[inline]
pub fn access(&self) -> usize {
self.access
self.access.load(Ordering::Relaxed)
}
#[inline]
pub fn use_default_connector(&self) -> bool {
self.use_default_connector
}
#[inline]
pub fn increase_access(&self) {
self.access.fetch_add(1, Ordering::Relaxed);
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
struct Pool {
channels: HashMap<String, Channel>,
channels: DashMap<String, Channel>,
}
impl Pool {
#[inline]
fn get_mut(&mut self, addr: &str) -> Option<&mut Channel> {
self.channels.get_mut(addr)
fn get(&self, addr: &str) -> Option<InnerChannel> {
let channel = self.channels.get(addr);
channel.map(|ch| {
ch.increase_access();
ch.channel.clone()
})
}
#[inline]
fn put(&mut self, addr: &str, channel: Channel) {
fn entry(&self, addr: String) -> Entry<String, Channel> {
self.channels.entry(addr)
}
#[cfg(test)]
fn get_access(&self, addr: &str) -> Option<usize> {
let channel = self.channels.get(addr);
channel.map(|ch| ch.access())
}
fn put(&self, addr: &str, channel: Channel) {
self.channels.insert(addr.to_string(), channel);
}
#[inline]
fn retain_channel<F>(&mut self, f: F)
fn retain_channel<F>(&self, f: F)
where
F: FnMut(&String, &mut Channel) -> bool,
{
@@ -337,20 +358,12 @@ impl Pool {
}
}
async fn recycle_channel_in_loop(pool: Arc<Mutex<Pool>>, interval_secs: u64) {
async fn recycle_channel_in_loop(pool: Arc<Pool>, interval_secs: u64) {
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
loop {
interval.tick().await;
let mut pool = pool.lock().unwrap();
pool.retain_channel(|_, c| {
if c.access == 0 {
false
} else {
c.access = 0;
true
}
})
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
}
}
@@ -363,10 +376,7 @@ mod tests {
#[should_panic]
#[test]
fn test_invalid_addr() {
let pool = Pool {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let pool = Arc::new(Pool::default());
let mgr = ChannelManager {
pool,
..Default::default()
@@ -378,36 +388,31 @@ mod tests {
#[tokio::test]
async fn test_access_count() {
let pool = Pool {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let pool = Arc::new(Pool::default());
let config = ChannelConfig::new();
let mgr = ChannelManager { pool, config };
let mgr = Arc::new(ChannelManager { pool, config });
let addr = "test_uri";
for i in 0..10 {
{
let _ = mgr.get(addr).unwrap();
let mut pool = mgr.pool.lock().unwrap();
assert_eq!(i + 1, pool.get_mut(addr).unwrap().access);
}
let mut joins = Vec::with_capacity(10);
for _ in 0..10 {
let mgr_clone = mgr.clone();
let join = tokio::spawn(async move {
for _ in 0..100 {
let _ = mgr_clone.get(addr);
}
});
joins.push(join);
}
for join in joins {
join.await.unwrap();
}
let mut pool = mgr.pool.lock().unwrap();
assert_eq!(1000, mgr.pool.get_access(addr).unwrap());
assert_eq!(10, pool.get_mut(addr).unwrap().access);
mgr.pool
.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0);
pool.retain_channel(|_, c| {
if c.access == 0 {
false
} else {
c.access = 0;
true
}
});
assert_eq!(0, pool.get_mut(addr).unwrap().access);
assert_eq!(0, mgr.pool.get_access(addr).unwrap());
}
#[test]
@@ -466,10 +471,7 @@ mod tests {
#[test]
fn test_build_endpoint() {
let pool = Pool {
channels: HashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let pool = Arc::new(Pool::default());
let config = ChannelConfig::new()
.timeout(Duration::from_secs(3))
.connect_timeout(Duration::from_secs(5))
@@ -493,9 +495,11 @@ mod tests {
#[tokio::test]
async fn test_channel_with_connector() {
let pool = Pool {
channels: HashMap::default(),
channels: DashMap::default(),
};
let pool = Arc::new(Mutex::new(pool));
let pool = Arc::new(pool);
let config = ChannelConfig::new();
let mgr = ChannelManager { pool, config };