mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-25 01:10:37 +00:00
feat: add retry logic for MetaPeerClient (#991)
* add retry logic in meta_peer_client * impl need_retry function * create meta_peer_client using the builder pattern * cr
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3895,6 +3895,7 @@ dependencies = [
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
"dashmap",
|
||||
"derive_builder 0.12.0",
|
||||
"etcd-client",
|
||||
"futures",
|
||||
"h2",
|
||||
|
||||
@@ -20,6 +20,7 @@ common-runtime = { path = "../common/runtime" }
|
||||
common-telemetry = { path = "../common/telemetry" }
|
||||
common-time = { path = "../common/time" }
|
||||
dashmap = "5.4"
|
||||
derive_builder = "0.12"
|
||||
etcd-client = "0.10"
|
||||
futures.workspace = true
|
||||
h2 = "0.3"
|
||||
|
||||
@@ -25,7 +25,7 @@ use tokio::net::TcpListener;
|
||||
use tokio_stream::wrappers::TcpListenerStream;
|
||||
use tonic::transport::server::Router;
|
||||
|
||||
use crate::cluster::MetaPeerClient;
|
||||
use crate::cluster::MetaPeerClientBuilder;
|
||||
use crate::election::etcd::EtcdElection;
|
||||
use crate::lock::etcd::EtcdLock;
|
||||
use crate::metasrv::builder::MetaSrvBuilder;
|
||||
@@ -91,7 +91,13 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
|
||||
};
|
||||
|
||||
let in_memory = Arc::new(MemStore::default()) as ResetableKvStoreRef;
|
||||
let meta_peer_client = MetaPeerClient::new(in_memory.clone(), election.clone());
|
||||
|
||||
let meta_peer_client = MetaPeerClientBuilder::default()
|
||||
.election(election.clone())
|
||||
.in_memory(in_memory.clone())
|
||||
.build()
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap();
|
||||
|
||||
let selector = match opts.selector {
|
||||
SelectorType::LoadBased => Arc::new(LoadBasedSelector {
|
||||
|
||||
@@ -13,52 +13,93 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::cluster_client::ClusterClient;
|
||||
use api::v1::meta::{
|
||||
BatchGetRequest, BatchGetResponse, KeyValue, RangeRequest, RangeResponse, ResponseHeader,
|
||||
};
|
||||
use common_grpc::channel_manager::ChannelManager;
|
||||
use common_telemetry::warn;
|
||||
use derive_builder::Builder;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::error::{match_for_io_error, Result};
|
||||
use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX};
|
||||
use crate::metasrv::ElectionRef;
|
||||
use crate::service::store::ext::KvStoreExt;
|
||||
use crate::service::store::kv::ResetableKvStoreRef;
|
||||
use crate::{error, util};
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Builder, Clone)]
|
||||
pub struct MetaPeerClient {
|
||||
election: Option<ElectionRef>,
|
||||
in_memory: ResetableKvStoreRef,
|
||||
#[builder(default = "ChannelManager::default()")]
|
||||
channel_manager: ChannelManager,
|
||||
#[builder(default = "3")]
|
||||
max_retry_count: usize,
|
||||
#[builder(default = "1000")]
|
||||
retry_interval_ms: u64,
|
||||
}
|
||||
|
||||
impl MetaPeerClient {
|
||||
pub fn new(in_mem: ResetableKvStoreRef, election: Option<ElectionRef>) -> Self {
|
||||
Self {
|
||||
election,
|
||||
in_memory: in_mem,
|
||||
channel_manager: ChannelManager::default(),
|
||||
}
|
||||
}
|
||||
|
||||
// Get all datanode stat kvs from leader meta.
|
||||
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
|
||||
let stat_prefix = format!("{DN_STAT_PREFIX}-").into_bytes();
|
||||
let range_end = util::get_prefix_end_key(&stat_prefix);
|
||||
let req = RangeRequest {
|
||||
key: stat_prefix.clone(),
|
||||
range_end,
|
||||
..Default::default()
|
||||
};
|
||||
let key = format!("{DN_STAT_PREFIX}-").into_bytes();
|
||||
let range_end = util::get_prefix_end_key(&key);
|
||||
|
||||
let kvs = self.range(key, range_end).await?;
|
||||
|
||||
to_stat_kv_map(kvs)
|
||||
}
|
||||
|
||||
// Get datanode stat kvs from leader meta by input keys.
|
||||
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
|
||||
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
|
||||
|
||||
let kvs = self.batch_get(stat_keys).await?;
|
||||
|
||||
to_stat_kv_map(kvs)
|
||||
}
|
||||
|
||||
// Range kv information from the leader's in_mem kv store
|
||||
pub async fn range(&self, key: Vec<u8>, range_end: Vec<u8>) -> Result<Vec<KeyValue>> {
|
||||
if self.is_leader() {
|
||||
let kvs = self.in_memory.range(req).await?.kvs;
|
||||
return to_stat_kv_map(kvs);
|
||||
let request = RangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
return self.in_memory.range(request).await.map(|resp| resp.kvs);
|
||||
}
|
||||
|
||||
let max_retry_count = self.max_retry_count;
|
||||
let retry_interval_ms = self.retry_interval_ms;
|
||||
|
||||
for _ in 0..max_retry_count {
|
||||
match self.remote_range(key.clone(), range_end.clone()).await {
|
||||
Ok(kvs) => return Ok(kvs),
|
||||
Err(e) => {
|
||||
if need_retry(&e) {
|
||||
warn!("Encountered an error that need to retry, err: {:?}", e);
|
||||
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
error::ExceededRetryLimitSnafu {
|
||||
func_name: "range",
|
||||
retry_num: max_retry_count,
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn remote_range(&self, key: Vec<u8>, range_end: Vec<u8>) -> Result<Vec<KeyValue>> {
|
||||
// Safety: when self.is_leader() == false, election must not be empty.
|
||||
let election = self.election.as_ref().unwrap();
|
||||
|
||||
@@ -69,39 +110,54 @@ impl MetaPeerClient {
|
||||
.get(&leader_addr)
|
||||
.context(error::CreateChannelSnafu)?;
|
||||
|
||||
let request = tonic::Request::new(req);
|
||||
let request = tonic::Request::new(RangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let response: RangeResponse = ClusterClient::new(channel)
|
||||
.range(request)
|
||||
.await
|
||||
.context(error::BatchGetSnafu)?
|
||||
.context(error::RangeSnafu)?
|
||||
.into_inner();
|
||||
|
||||
check_resp_header(&response.header, Context { addr: &leader_addr })?;
|
||||
|
||||
to_stat_kv_map(response.kvs)
|
||||
}
|
||||
|
||||
// Get datanode stat kvs from leader meta by input keys.
|
||||
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
|
||||
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
|
||||
let stat_kvs = self.batch_get(stat_keys).await?;
|
||||
|
||||
let mut result = HashMap::with_capacity(stat_kvs.len());
|
||||
for stat_kv in stat_kvs {
|
||||
let stat_key = stat_kv.key.try_into()?;
|
||||
let stat_val = stat_kv.value.try_into()?;
|
||||
result.insert(stat_key, stat_val);
|
||||
}
|
||||
Ok(result)
|
||||
Ok(response.kvs)
|
||||
}
|
||||
|
||||
// Get kv information from the leader's in_mem kv store
|
||||
async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
|
||||
pub async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
|
||||
if self.is_leader() {
|
||||
return self.in_memory.batch_get(keys).await;
|
||||
}
|
||||
|
||||
let max_retry_count = self.max_retry_count;
|
||||
let retry_interval_ms = self.retry_interval_ms;
|
||||
|
||||
for _ in 0..max_retry_count {
|
||||
match self.remote_batch_get(keys.clone()).await {
|
||||
Ok(kvs) => return Ok(kvs),
|
||||
Err(e) => {
|
||||
if need_retry(&e) {
|
||||
warn!("Encountered an error that need to retry, err: {:?}", e);
|
||||
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
error::ExceededRetryLimitSnafu {
|
||||
func_name: "batch_get",
|
||||
retry_num: max_retry_count,
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
|
||||
// Safety: when self.is_leader() == false, election must not be empty.
|
||||
let election = self.election.as_ref().unwrap();
|
||||
|
||||
@@ -113,11 +169,11 @@ impl MetaPeerClient {
|
||||
.context(error::CreateChannelSnafu)?;
|
||||
|
||||
let request = tonic::Request::new(BatchGetRequest {
|
||||
keys: keys.clone(),
|
||||
keys,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let response: BatchGetResponse = ClusterClient::new(channel.clone())
|
||||
let response: BatchGetResponse = ClusterClient::new(channel)
|
||||
.batch_get(request)
|
||||
.await
|
||||
.context(error::BatchGetSnafu)?
|
||||
@@ -165,6 +221,16 @@ fn check_resp_header(header: &Option<ResponseHeader>, ctx: Context) -> Result<()
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn need_retry(error: &error::Error) -> bool {
|
||||
match error {
|
||||
error::Error::IsNotLeader { .. } => true,
|
||||
error::Error::Range { source, .. } | error::Error::BatchGet { source, .. } => {
|
||||
match_for_io_error(source).is_some()
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::meta::{Error, ErrorCode, KeyValue, ResponseHeader};
|
||||
|
||||
@@ -195,6 +195,15 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to batch range KVs from leader's in_memory kv store, source: {}",
|
||||
source
|
||||
))]
|
||||
Range {
|
||||
source: tonic::Status,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Response header not found"))]
|
||||
ResponseHeaderNotFound { backtrace: Backtrace },
|
||||
|
||||
@@ -213,6 +222,17 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"The number of retries for the grpc call {} exceeded the limit, {}",
|
||||
func_name,
|
||||
retry_num
|
||||
))]
|
||||
ExceededRetryLimit {
|
||||
func_name: String,
|
||||
retry_num: usize,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("An error occurred in Meta, source: {}", source))]
|
||||
MetaInternal {
|
||||
#[snafu(backtrace)]
|
||||
@@ -271,6 +291,7 @@ impl ErrorExt for Error {
|
||||
| Error::NoLeader { .. }
|
||||
| Error::CreateChannel { .. }
|
||||
| Error::BatchGet { .. }
|
||||
| Error::Range { .. }
|
||||
| Error::ResponseHeaderNotFound { .. }
|
||||
| Error::IsNotLeader { .. }
|
||||
| Error::NoMetaPeerClient { .. }
|
||||
@@ -279,6 +300,7 @@ impl ErrorExt for Error {
|
||||
| Error::Unlock { .. }
|
||||
| Error::LeaseGrant { .. }
|
||||
| Error::LockNotConfig { .. }
|
||||
| Error::ExceededRetryLimit { .. }
|
||||
| Error::StartGrpc { .. } => StatusCode::Internal,
|
||||
Error::EmptyKey { .. }
|
||||
| Error::EmptyTableName { .. }
|
||||
|
||||
@@ -16,6 +16,7 @@ use api::v1::meta::{
|
||||
cluster_server, BatchGetRequest, BatchGetResponse, Error, RangeRequest, RangeResponse,
|
||||
ResponseHeader,
|
||||
};
|
||||
use common_telemetry::warn;
|
||||
use tonic::{Request, Response};
|
||||
|
||||
use crate::metasrv::MetaSrv;
|
||||
@@ -31,6 +32,8 @@ impl cluster_server::Cluster for MetaSrv {
|
||||
header: Some(is_not_leader),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
warn!("The current meta is not leader, but a batch_get request have reached the meta. Detail: {:?}.", req);
|
||||
return Ok(Response::new(resp));
|
||||
}
|
||||
|
||||
@@ -53,6 +56,8 @@ impl cluster_server::Cluster for MetaSrv {
|
||||
header: Some(is_not_leader),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
warn!("The current meta is not leader, but a range request have reached the meta. Detail: {:?}.", req);
|
||||
return Ok(Response::new(resp));
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user