chore: impl KvBackend for MetaPeerClient (#3076)

This commit is contained in:
Lanqing Yang
2024-01-10 06:16:03 -08:00
committed by GitHub
parent 7fad4e8356
commit d521bc9dc5
2 changed files with 177 additions and 108 deletions

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
@@ -22,8 +23,12 @@ use api::v1::meta::{
RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
};
use common_grpc::channel_manager::ChannelManager;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::rpc::store::{BatchGetRequest, RangeRequest};
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util;
use common_telemetry::warn;
@@ -49,11 +54,158 @@ pub struct MetaPeerClient {
retry_interval_ms: u64,
}
#[async_trait::async_trait]
impl TxnService for MetaPeerClient {
type Error = error::Error;
}
#[async_trait::async_trait]
impl KvBackend for MetaPeerClient {
fn name(&self) -> &str {
"MetaPeerClient"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
if self.is_leader() {
return self
.in_memory
.range(req)
.await
.context(error::KvBackendSnafu);
}
let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;
for _ in 0..max_retry_count {
match self
.remote_range(req.key.clone(), req.range_end.clone(), req.keys_only)
.await
{
Ok(res) => return Ok(res),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}
error::ExceededRetryLimitSnafu {
func_name: "range",
retry_num: max_retry_count,
}
.fail()
}
// Get kv information from the leader's in_mem kv store
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
if self.is_leader() {
return self
.in_memory
.batch_get(req)
.await
.context(error::KvBackendSnafu);
}
let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;
for _ in 0..max_retry_count {
match self.remote_batch_get(req.keys.clone()).await {
Ok(res) => return Ok(res),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}
error::ExceededRetryLimitSnafu {
func_name: "batch_get",
retry_num: max_retry_count,
}
.fail()
}
// MetaPeerClient does not support mutable methods listed below.
async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
error::UnsupportedSnafu {
operation: "put".to_string(),
}
.fail()
}
async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
error::UnsupportedSnafu {
operation: "batch put".to_string(),
}
.fail()
}
async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
error::UnsupportedSnafu {
operation: "compare and put".to_string(),
}
.fail()
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
error::UnsupportedSnafu {
operation: "delete range".to_string(),
}
.fail()
}
async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
error::UnsupportedSnafu {
operation: "batch delete".to_string(),
}
.fail()
}
async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
error::UnsupportedSnafu {
operation: "delete".to_string(),
}
.fail()
}
async fn put_conditionally(
&self,
_key: Vec<u8>,
_value: Vec<u8>,
_if_not_exists: bool,
) -> Result<bool> {
error::UnsupportedSnafu {
operation: "put conditionally".to_string(),
}
.fail()
}
}
impl MetaPeerClient {
async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
let key = format!("{DN_STAT_PREFIX}-").into_bytes();
let range_end = util::get_prefix_end_key(&key);
self.range(key, range_end, keys_only).await
let range_request = RangeRequest {
key,
range_end,
keys_only,
..Default::default()
};
self.range(range_request).await.map(|res| res.kvs)
}
// Get all datanode stat kvs from leader meta.
@@ -73,70 +225,11 @@ impl MetaPeerClient {
// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
let batch_get_req = BatchGetRequest { keys: stat_keys };
let kvs = self.batch_get(stat_keys).await?;
let res = self.batch_get(batch_get_req).await?;
to_stat_kv_map(kvs)
}
// Get kv information from the leader's in_mem kv store.
pub async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>> {
let mut kvs = self.range(key, vec![], false).await?;
Ok(if kvs.is_empty() {
None
} else {
debug_assert_eq!(kvs.len(), 1);
Some(kvs.remove(0))
})
}
// Range kv information from the leader's in_mem kv store
pub async fn range(
&self,
key: Vec<u8>,
range_end: Vec<u8>,
keys_only: bool,
) -> Result<Vec<KeyValue>> {
if self.is_leader() {
let request = RangeRequest {
key,
range_end,
..Default::default()
};
return self
.in_memory
.range(request)
.await
.map(|resp| resp.kvs)
.context(error::KvBackendSnafu);
}
let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;
for _ in 0..max_retry_count {
match self
.remote_range(key.clone(), range_end.clone(), keys_only)
.await
{
Ok(kvs) => return Ok(kvs),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}
error::ExceededRetryLimitSnafu {
func_name: "range",
retry_num: max_retry_count,
}
.fail()
to_stat_kv_map(res.kvs)
}
async fn remote_range(
@@ -144,7 +237,7 @@ impl MetaPeerClient {
key: Vec<u8>,
range_end: Vec<u8>,
keys_only: bool,
) -> Result<Vec<KeyValue>> {
) -> Result<RangeResponse> {
// Safety: when self.is_leader() == false, election must not empty.
let election = self.election.as_ref().unwrap();
@@ -170,47 +263,13 @@ impl MetaPeerClient {
check_resp_header(&response.header, Context { addr: &leader_addr })?;
Ok(response.kvs.into_iter().map(KeyValue::new).collect())
Ok(RangeResponse {
kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
more: response.more,
})
}
// Get kv information from the leader's in_mem kv store
pub async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
if self.is_leader() {
let request = BatchGetRequest { keys };
return self
.in_memory
.batch_get(request)
.await
.map(|resp| resp.kvs)
.context(error::KvBackendSnafu);
}
let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;
for _ in 0..max_retry_count {
match self.remote_batch_get(keys.clone()).await {
Ok(kvs) => return Ok(kvs),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}
error::ExceededRetryLimitSnafu {
func_name: "batch_get",
retry_num: max_retry_count,
}
.fail()
}
async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<BatchGetResponse> {
// Safety: when self.is_leader() == false, election must not empty.
let election = self.election.as_ref().unwrap();
@@ -234,7 +293,9 @@ impl MetaPeerClient {
check_resp_header(&response.header, Context { addr: &leader_addr })?;
Ok(response.kvs.into_iter().map(KeyValue::new).collect())
Ok(BatchGetResponse {
kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
})
}
// Check if the meta node is a leader node.

View File

@@ -14,6 +14,7 @@
use std::collections::HashMap;
use common_meta::kv_backend::KvBackend;
use common_meta::peer::Peer;
use common_meta::{util, ClusterId};
use common_time::util as time_util;
@@ -39,7 +40,8 @@ pub async fn lookup_alive_datanode_peer(
cluster_id,
node_id: datanode_id,
};
let Some(kv) = meta_peer_client.get(lease_key.clone().try_into()?).await? else {
let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
@@ -74,7 +76,13 @@ where
let key = get_lease_prefix(cluster_id);
let range_end = util::get_prefix_end_key(&key);
let kvs = meta_peer_client.range(key, range_end, false).await?;
let range_req = common_meta::rpc::store::RangeRequest {
key,
range_end,
keys_only: false,
..Default::default()
};
let kvs = meta_peer_client.range(range_req).await?.kvs;
let mut lease_kvs = HashMap::new();
for kv in kvs {
let lease_key: LeaseKey = kv.key.try_into()?;