diff --git a/Cargo.lock b/Cargo.lock index 80fd2372a6..a9c387b2d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3895,6 +3895,7 @@ dependencies = [ "common-telemetry", "common-time", "dashmap", + "derive_builder 0.12.0", "etcd-client", "futures", "h2", diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 3b63781b7c..82b2c8fdac 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -20,6 +20,7 @@ common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } dashmap = "5.4" +derive_builder = "0.12" etcd-client = "0.10" futures.workspace = true h2 = "0.3" diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 243512e168..eb2d285ae9 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -25,7 +25,7 @@ use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::server::Router; -use crate::cluster::MetaPeerClient; +use crate::cluster::MetaPeerClientBuilder; use crate::election::etcd::EtcdElection; use crate::lock::etcd::EtcdLock; use crate::metasrv::builder::MetaSrvBuilder; @@ -91,7 +91,13 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { }; let in_memory = Arc::new(MemStore::default()) as ResetableKvStoreRef; - let meta_peer_client = MetaPeerClient::new(in_memory.clone(), election.clone()); + + let meta_peer_client = MetaPeerClientBuilder::default() + .election(election.clone()) + .in_memory(in_memory.clone()) + .build() + // Safety: all required fields set at initialization + .unwrap(); let selector = match opts.selector { SelectorType::LoadBased => Arc::new(LoadBasedSelector { diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index a8934d8110..f9b1b797c3 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -13,52 +13,93 @@ // limitations under the License. use std::collections::HashMap; +use std::time::Duration; use api::v1::meta::cluster_client::ClusterClient; use api::v1::meta::{ BatchGetRequest, BatchGetResponse, KeyValue, RangeRequest, RangeResponse, ResponseHeader, }; use common_grpc::channel_manager::ChannelManager; +use common_telemetry::warn; +use derive_builder::Builder; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::Result; +use crate::error::{match_for_io_error, Result}; use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX}; use crate::metasrv::ElectionRef; use crate::service::store::ext::KvStoreExt; use crate::service::store::kv::ResetableKvStoreRef; use crate::{error, util}; -#[derive(Clone)] +#[derive(Builder, Clone)] pub struct MetaPeerClient { election: Option, in_memory: ResetableKvStoreRef, + #[builder(default = "ChannelManager::default()")] channel_manager: ChannelManager, + #[builder(default = "3")] + max_retry_count: usize, + #[builder(default = "1000")] + retry_interval_ms: u64, } impl MetaPeerClient { - pub fn new(in_mem: ResetableKvStoreRef, election: Option) -> Self { - Self { - election, - in_memory: in_mem, - channel_manager: ChannelManager::default(), - } - } - // Get all datanode stat kvs from leader meta. pub async fn get_all_dn_stat_kvs(&self) -> Result> { - let stat_prefix = format!("{DN_STAT_PREFIX}-").into_bytes(); - let range_end = util::get_prefix_end_key(&stat_prefix); - let req = RangeRequest { - key: stat_prefix.clone(), - range_end, - ..Default::default() - }; + let key = format!("{DN_STAT_PREFIX}-").into_bytes(); + let range_end = util::get_prefix_end_key(&key); + let kvs = self.range(key, range_end).await?; + + to_stat_kv_map(kvs) + } + + // Get datanode stat kvs from leader meta by input keys. + pub async fn get_dn_stat_kvs(&self, keys: Vec) -> Result> { + let stat_keys = keys.into_iter().map(|key| key.into()).collect(); + + let kvs = self.batch_get(stat_keys).await?; + + to_stat_kv_map(kvs) + } + + // Range kv information from the leader's in_mem kv store + pub async fn range(&self, key: Vec, range_end: Vec) -> Result> { if self.is_leader() { - let kvs = self.in_memory.range(req).await?.kvs; - return to_stat_kv_map(kvs); + let request = RangeRequest { + key, + range_end, + ..Default::default() + }; + + return self.in_memory.range(request).await.map(|resp| resp.kvs); } + let max_retry_count = self.max_retry_count; + let retry_interval_ms = self.retry_interval_ms; + + for _ in 0..max_retry_count { + match self.remote_range(key.clone(), range_end.clone()).await { + Ok(kvs) => return Ok(kvs), + Err(e) => { + if need_retry(&e) { + warn!("Encountered an error that need to retry, err: {:?}", e); + tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await; + } else { + return Err(e); + } + } + } + } + + error::ExceededRetryLimitSnafu { + func_name: "range", + retry_num: max_retry_count, + } + .fail() + } + + async fn remote_range(&self, key: Vec, range_end: Vec) -> Result> { // Safety: when self.is_leader() == false, election must not empty. let election = self.election.as_ref().unwrap(); @@ -69,39 +110,54 @@ impl MetaPeerClient { .get(&leader_addr) .context(error::CreateChannelSnafu)?; - let request = tonic::Request::new(req); + let request = tonic::Request::new(RangeRequest { + key, + range_end, + ..Default::default() + }); let response: RangeResponse = ClusterClient::new(channel) .range(request) .await - .context(error::BatchGetSnafu)? + .context(error::RangeSnafu)? .into_inner(); check_resp_header(&response.header, Context { addr: &leader_addr })?; - to_stat_kv_map(response.kvs) - } - - // Get datanode stat kvs from leader meta by input keys. - pub async fn get_dn_stat_kvs(&self, keys: Vec) -> Result> { - let stat_keys = keys.into_iter().map(|key| key.into()).collect(); - let stat_kvs = self.batch_get(stat_keys).await?; - - let mut result = HashMap::with_capacity(stat_kvs.len()); - for stat_kv in stat_kvs { - let stat_key = stat_kv.key.try_into()?; - let stat_val = stat_kv.value.try_into()?; - result.insert(stat_key, stat_val); - } - Ok(result) + Ok(response.kvs) } // Get kv information from the leader's in_mem kv store - async fn batch_get(&self, keys: Vec>) -> Result> { + pub async fn batch_get(&self, keys: Vec>) -> Result> { if self.is_leader() { return self.in_memory.batch_get(keys).await; } + let max_retry_count = self.max_retry_count; + let retry_interval_ms = self.retry_interval_ms; + + for _ in 0..max_retry_count { + match self.remote_batch_get(keys.clone()).await { + Ok(kvs) => return Ok(kvs), + Err(e) => { + if need_retry(&e) { + warn!("Encountered an error that need to retry, err: {:?}", e); + tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await; + } else { + return Err(e); + } + } + } + } + + error::ExceededRetryLimitSnafu { + func_name: "batch_get", + retry_num: max_retry_count, + } + .fail() + } + + async fn remote_batch_get(&self, keys: Vec>) -> Result> { // Safety: when self.is_leader() == false, election must not empty. let election = self.election.as_ref().unwrap(); @@ -113,11 +169,11 @@ impl MetaPeerClient { .context(error::CreateChannelSnafu)?; let request = tonic::Request::new(BatchGetRequest { - keys: keys.clone(), + keys, ..Default::default() }); - let response: BatchGetResponse = ClusterClient::new(channel.clone()) + let response: BatchGetResponse = ClusterClient::new(channel) .batch_get(request) .await .context(error::BatchGetSnafu)? @@ -165,6 +221,16 @@ fn check_resp_header(header: &Option, ctx: Context) -> Result<() Ok(()) } +fn need_retry(error: &error::Error) -> bool { + match error { + error::Error::IsNotLeader { .. } => true, + error::Error::Range { source, .. } | error::Error::BatchGet { source, .. } => { + match_for_io_error(source).is_some() + } + _ => false, + } +} + #[cfg(test)] mod tests { use api::v1::meta::{Error, ErrorCode, KeyValue, ResponseHeader}; diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index b08fd2f2dc..fb70ea004e 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -195,6 +195,15 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display( + "Failed to batch range KVs from leader's in_memory kv store, source: {}", + source + ))] + Range { + source: tonic::Status, + backtrace: Backtrace, + }, + #[snafu(display("Response header not found"))] ResponseHeaderNotFound { backtrace: Backtrace }, @@ -213,6 +222,17 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display( + "The number of retries for the grpc call {} exceeded the limit, {}", + func_name, + retry_num + ))] + ExceededRetryLimit { + func_name: String, + retry_num: usize, + backtrace: Backtrace, + }, + #[snafu(display("An error occurred in Meta, source: {}", source))] MetaInternal { #[snafu(backtrace)] @@ -271,6 +291,7 @@ impl ErrorExt for Error { | Error::NoLeader { .. } | Error::CreateChannel { .. } | Error::BatchGet { .. } + | Error::Range { .. } | Error::ResponseHeaderNotFound { .. } | Error::IsNotLeader { .. } | Error::NoMetaPeerClient { .. } @@ -279,6 +300,7 @@ impl ErrorExt for Error { | Error::Unlock { .. } | Error::LeaseGrant { .. } | Error::LockNotConfig { .. } + | Error::ExceededRetryLimit { .. } | Error::StartGrpc { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::EmptyTableName { .. } diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs index 02e378738b..9ba265b666 100644 --- a/src/meta-srv/src/service/cluster.rs +++ b/src/meta-srv/src/service/cluster.rs @@ -16,6 +16,7 @@ use api::v1::meta::{ cluster_server, BatchGetRequest, BatchGetResponse, Error, RangeRequest, RangeResponse, ResponseHeader, }; +use common_telemetry::warn; use tonic::{Request, Response}; use crate::metasrv::MetaSrv; @@ -31,6 +32,8 @@ impl cluster_server::Cluster for MetaSrv { header: Some(is_not_leader), ..Default::default() }; + + warn!("The current meta is not leader, but a batch_get request have reached the meta. Detail: {:?}.", req); return Ok(Response::new(resp)); } @@ -53,6 +56,8 @@ impl cluster_server::Cluster for MetaSrv { header: Some(is_not_leader), ..Default::default() }; + + warn!("The current meta is not leader, but a range request have reached the meta. Detail: {:?}.", req); return Ok(Response::new(resp)); }