feat: improve region lease (#2004)

* feat: add exists api into KvBackend

* refactor: region lease

* feat: filter out inactive nodes in keep-lease

* feat: register & deregister inactive nodes

* chore: doc

* chore: ut

* chore: minor refactor

* feat: use memory_kv to store inactive node

* fix: use real error in

* chore: make inactive_node_manager's functions more compact

* chore: improve efficiency

* feat: clear inactive status on candidate node
JeremyHi
2023-07-24 11:49:14 +08:00
committed by GitHub
parent 657fcaf9d0
commit 41139ec11d
23 changed files with 662 additions and 475 deletions

View File

@@ -58,35 +58,12 @@ impl KvBackend for CachedMetaKvBackend {
&self.name
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.kv_backend.range(req).await
fn as_any(&self) -> &dyn Any {
self
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let _timer = timer!(METRIC_CATALOG_KV_GET);
let init = async {
let _timer = timer!(METRIC_CATALOG_KV_REMOTE_GET);
self.kv_backend.get(key).await.map(|val| {
val.with_context(|| CacheNotGetSnafu {
key: String::from_utf8_lossy(key),
})
})?
};
// currently moka doesn't have `optionally_try_get_with_by_ref`
// TODO(fys): change to moka method when available
// https://github.com/moka-rs/moka/issues/254
match self.cache.try_get_with_by_ref(key, init).await {
Ok(val) => Ok(Some(val)),
Err(e) => match e.as_ref() {
CacheNotGet { .. } => Ok(None),
_ => Err(e),
},
}
.map_err(|e| GetKvCache {
err_msg: e.to_string(),
})
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.kv_backend.range(req).await
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
@@ -119,6 +96,22 @@ impl KvBackend for CachedMetaKvBackend {
resp
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
self.kv_backend.batch_get(req).await
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let key = &req.key.clone();
let ret = self.kv_backend.compare_and_put(req).await;
if ret.is_ok() {
self.invalidate_key(key).await;
}
ret
}
async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let prev_kv = req.prev_kv;
@@ -159,22 +152,6 @@ impl KvBackend for CachedMetaKvBackend {
}
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
self.kv_backend.batch_get(req).await
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let key = &req.key.clone();
let ret = self.kv_backend.compare_and_put(req).await;
if ret.is_ok() {
self.invalidate_key(key).await;
}
ret
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
let from_key = &req.from_key.clone();
let to_key = &req.to_key.clone();
@@ -189,8 +166,31 @@ impl KvBackend for CachedMetaKvBackend {
ret
}
fn as_any(&self) -> &dyn Any {
self
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let _timer = timer!(METRIC_CATALOG_KV_GET);
let init = async {
let _timer = timer!(METRIC_CATALOG_KV_REMOTE_GET);
self.kv_backend.get(key).await.map(|val| {
val.with_context(|| CacheNotGetSnafu {
key: String::from_utf8_lossy(key),
})
})?
};
// currently moka doesn't have `optionally_try_get_with_by_ref`
// TODO(fys): change to moka method when available
// https://github.com/moka-rs/moka/issues/254
match self.cache.try_get_with_by_ref(key, init).await {
Ok(val) => Ok(Some(val)),
Err(e) => match e.as_ref() {
CacheNotGet { .. } => Ok(None),
_ => Err(e),
},
}
.map_err(|e| GetKvCache {
err_msg: e.to_string(),
})
}
}
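
The `get` implementation above relies on moka's `try_get_with_by_ref` plus a sentinel error to represent "key absent", since moka has no `optionally_try_get_with_by_ref` yet. Below is a minimal standalone sketch of that pattern; the `LoadError` type, the `cached_get` helper, and the "hello"/"world" loader are hypothetical and not part of this commit:

```rust
use std::sync::Arc;

use moka::future::Cache;

#[derive(Debug)]
enum LoadError {
    NotFound,        // plays the role of `CacheNotGet` above
    Backend(String), // a real backend failure
}

async fn cached_get(cache: &Cache<String, String>, key: &str) -> Result<Option<String>, String> {
    // The loader runs only on a cache miss; a missing key is reported as an
    // error so that `try_get_with_by_ref` caches nothing for it.
    let init = async {
        if key == "hello" {
            Ok("world".to_string())
        } else {
            Err(LoadError::NotFound)
        }
    };
    match cache.try_get_with_by_ref(key, init).await {
        Ok(val) => Ok(Some(val)),
        // Errors come back as `Arc<LoadError>`; absence is mapped back to
        // `Ok(None)`, while any other failure stays an error.
        Err(e) => match e.as_ref() {
            LoadError::NotFound => Ok(None),
            LoadError::Backend(msg) => Err(msg.clone()),
        },
    }
}
```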

View File

@@ -20,7 +20,7 @@ use snafu::OptionExt;
use crate::error::{Error, InvalidProtoMsgSnafu};
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(Eq, Hash, PartialEq, Clone, Debug, Default, Serialize, Deserialize)]
pub struct TableIdent {
pub catalog: String,
pub schema: String,

View File

@@ -40,12 +40,16 @@ where
{
fn name(&self) -> &str;
fn as_any(&self) -> &dyn Any;
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
async fn compare_and_put(
&self,
req: CompareAndPutRequest,
@@ -56,6 +60,35 @@ where
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse, Self::Error>;
async fn batch_delete(
&self,
req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse, Self::Error>;
/// MoveValue atomically renames the key to the given updated key.
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error>;
// The following methods are implemented based on the methods above,
// providing a higher-level interface to simplify usage.
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
let req = RangeRequest::new().with_key(key.to_vec());
let mut resp = self.range(req).await?;
Ok(if resp.kvs.is_empty() {
None
} else {
Some(resp.kvs.remove(0))
})
}
/// Checks whether the key exists, without returning the value.
/// If the value is large, this method is more efficient than `get`.
async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
let resp = self.range(req).await?;
Ok(!resp.kvs.is_empty())
}
async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
if prev_kv {
@@ -70,27 +103,4 @@ where
Ok(None)
}
}
async fn batch_delete(
&self,
req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse, Self::Error>;
/// The default `get` is implemented based on the `range` method.
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
let req = RangeRequest::new().with_key(key.to_vec());
let mut resp = self.range(req).await?;
Ok(if resp.kvs.is_empty() {
None
} else {
Some(resp.kvs.remove(0))
})
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
/// MoveValue atomically renames the key to the given updated key.
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error>;
fn as_any(&self) -> &dyn Any;
}
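
A hypothetical caller of the new `exists` helper (the `mark_once` name and the empty-value convention are illustrative, not part of this commit). Because `exists` issues a keys-only range request, the presence check never transfers the stored value:

```rust
async fn mark_once<K: KvBackend>(backend: &K, key: &[u8]) -> Result<(), K::Error> {
    // Cheap presence check: only keys travel over the wire.
    if !backend.exists(key).await? {
        let req = PutRequest {
            key: key.to_vec(),
            value: vec![], // the key itself is the signal, so the value stays empty
            prev_kv: false,
        };
        let _ = backend.put(req).await?;
    }
    Ok(())
}
```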

View File

@@ -80,6 +80,10 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
"Memory"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
let RangeRequest {
key,
@@ -155,6 +159,23 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
Ok(BatchPutResponse { prev_kvs })
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
let kvs = self.kvs.read().unwrap();
let kvs = req
.keys
.into_iter()
.filter_map(|key| {
kvs.get_key_value(&key).map(|(k, v)| KeyValue {
key: k.clone(),
value: v.clone(),
})
})
.collect::<Vec<_>>();
Ok(BatchGetResponse { kvs })
}
async fn compare_and_put(
&self,
req: CompareAndPutRequest,
@@ -251,23 +272,6 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
Ok(BatchDeleteResponse { prev_kvs })
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
let kvs = self.kvs.read().unwrap();
let kvs = req
.keys
.into_iter()
.filter_map(|key| {
kvs.get_key_value(&key).map(|(k, v)| KeyValue {
key: k.clone(),
value: v.clone(),
})
})
.collect::<Vec<_>>();
Ok(BatchGetResponse { kvs })
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error> {
let MoveValueRequest { from_key, to_key } = req;
@@ -288,10 +292,6 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
Ok(MoveValueResponse(kv))
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait]

View File

@@ -458,6 +458,9 @@ pub enum Error {
source: common_meta::error::Error,
location: Location,
},
#[snafu(display("Invalid heartbeat request: {}", err_msg))]
InvalidHeartbeatRequest { err_msg: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -469,10 +472,6 @@ impl From<Error> for tonic::Status {
}
impl ErrorExt for Error {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn status_code(&self) -> StatusCode {
match self {
Error::EtcdFailed { .. }
@@ -516,7 +515,8 @@ impl ErrorExt for Error {
| Error::InvalidStatKey { .. }
| Error::ParseNum { .. }
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. } => StatusCode::InvalidArguments,
| Error::InvalidArguments { .. }
| Error::InvalidHeartbeatRequest { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::StatKeyFromUtf8 { .. }
@@ -558,6 +558,10 @@ impl ErrorExt for Error {
Error::Other { source, .. } => source.status_code(),
}
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
// for tonic

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use common_telemetry::debug;
use common_telemetry::warn;
use super::node_stat::Stat;
use crate::error::Result;
@@ -45,8 +45,8 @@ impl HeartbeatHandler for CollectStatsHandler {
Ok(stat) => {
let _ = acc.stat.insert(stat);
}
Err(_) => {
debug!("Incomplete heartbeat data: {:?}", req);
Err(err) => {
warn!("Incomplete heartbeat data: {:?}, err: {:?}", req, err);
}
};

View File

@@ -36,7 +36,6 @@ pub(crate) struct DatanodeHeartbeat {
pub struct RegionFailureHandler {
failure_detect_runner: FailureDetectRunner,
region_failover_manager: Arc<RegionFailoverManager>,
}
impl RegionFailureHandler {
@@ -52,13 +51,8 @@ impl RegionFailureHandler {
Ok(Self {
failure_detect_runner,
region_failover_manager,
})
}
pub(crate) fn region_failover_manager(&self) -> &Arc<RegionFailoverManager> {
&self.region_failover_manager
}
}
#[async_trait]
@@ -89,9 +83,9 @@ impl HeartbeatHandler for RegionFailureHandler {
cluster_id: stat.cluster_id,
datanode_id: stat.id,
table_ident: TableIdent {
catalog: x.catalog.clone(),
schema: x.schema.clone(),
table: x.table.clone(),
catalog: x.table_ident.catalog.clone(),
schema: x.table_ident.schema.clone(),
table: x.table_ident.table.clone(),
table_id: RegionId::from(x.id).table_id(),
// TODO(#1583): Use the actual table engine.
engine: MITO_ENGINE.to_string(),
@@ -132,9 +126,13 @@ mod tests {
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
catalog: "a".to_string(),
schema: "b".to_string(),
table: "c".to_string(),
table_ident: TableIdent {
catalog: "a".to_string(),
schema: "b".to_string(),
table: "c".to_string(),
table_id: 0,
engine: "d".to_string(),
},
rcus: 0,
wcus: 0,
approximate_bytes: 0,

View File

@@ -13,9 +13,12 @@
// limitations under the License.
use api::v1::meta::HeartbeatRequest;
use common_meta::ident::TableIdent;
use common_time::util as time_util;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{Error, InvalidHeartbeatRequestSnafu};
use crate::keys::StatKey;
#[derive(Debug, Default, Serialize, Deserialize)]
@@ -40,9 +43,7 @@ pub struct Stat {
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RegionStat {
pub id: u64,
pub catalog: String,
pub schema: String,
pub table: String,
pub table_ident: TableIdent,
/// The read capacity units during this period
pub rcus: i64,
/// The write capacity units during this period
@@ -63,7 +64,7 @@ impl Stat {
}
impl TryFrom<HeartbeatRequest> for Stat {
type Error = ();
type Error = Error;
fn try_from(value: HeartbeatRequest) -> Result<Self, Self::Error> {
let HeartbeatRequest {
@@ -82,6 +83,10 @@ impl TryFrom<HeartbeatRequest> for Stat {
} else {
None
};
let region_stats = region_stats
.into_iter()
.map(RegionStat::try_from)
.collect::<Result<Vec<_>, _>>()?;
Ok(Self {
timestamp_millis: time_util::current_time_millis(),
@@ -92,31 +97,41 @@ impl TryFrom<HeartbeatRequest> for Stat {
wcus: node_stat.wcus,
table_num: node_stat.table_num,
region_num,
region_stats: region_stats.into_iter().map(RegionStat::from).collect(),
region_stats,
node_epoch,
})
}
_ => Err(()),
_ => InvalidHeartbeatRequestSnafu {
err_msg: "missing header, peer or node_stat",
}
.fail(),
}
}
}
impl From<api::v1::meta::RegionStat> for RegionStat {
fn from(value: api::v1::meta::RegionStat) -> Self {
let table = value
.table_ident
.as_ref()
.and_then(|t| t.table_name.as_ref());
Self {
impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
type Error = Error;
fn try_from(value: api::v1::meta::RegionStat) -> Result<Self, Self::Error> {
let table_ident = value.table_ident.context(InvalidHeartbeatRequestSnafu {
err_msg: "missing table_ident",
})?;
let table_ident_result = TableIdent::try_from(table_ident);
let Ok(table_ident) = table_ident_result else {
return InvalidHeartbeatRequestSnafu {
err_msg: format!("invalid table_ident: {:?}", table_ident_result.err()),
}
.fail();
};
Ok(Self {
id: value.region_id,
catalog: table.map_or("", |t| &t.catalog_name).to_string(),
schema: table.map_or("", |t| &t.schema_name).to_string(),
table: table.map_or("", |t| &t.table_name).to_string(),
table_ident,
rcus: value.rcus,
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,
approximate_rows: value.approximate_rows,
}
})
}
}

View File

@@ -13,90 +13,23 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManagerRef;
use common_meta::ClusterId;
use common_telemetry::warn;
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use store_api::storage::RegionId;
use crate::error::{Result, TableMetadataManagerSnafu};
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::inactive_node_manager::InactiveNodeManager;
use crate::metasrv::Context;
use crate::procedure::region_failover::{RegionFailoverKey, RegionFailoverManager};
/// The lease seconds of a region. It's set to two default heartbeat intervals (5 seconds × 2) plus
/// two round-trip times (2 seconds × 2 × 2), plus some extra buffer (2 seconds): 10 + 8 + 2 = 20.
// TODO(LFC): Make region lease seconds calculated from Datanode heartbeat configuration.
pub(crate) const REGION_LEASE_SECONDS: u64 = 20;
pub(crate) struct RegionLeaseHandler {
region_failover_manager: Option<Arc<RegionFailoverManager>>,
table_metadata_manager: TableMetadataManagerRef,
}
impl RegionLeaseHandler {
pub(crate) fn new(
region_failover_manager: Option<Arc<RegionFailoverManager>>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
region_failover_manager,
table_metadata_manager,
}
}
async fn find_table_ident(&self, table_id: TableId) -> Result<Option<TableIdent>> {
let value = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?;
Ok(value.map(|x| {
let table_info = &x.table_info;
TableIdent {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table: table_info.name.clone(),
table_id,
engine: table_info.meta.engine.clone(),
}
}))
}
/// Filter out the regions that are currently in failover.
/// It's meaningless to extend the lease of a region if it is in failover.
fn filter_failover_regions(
&self,
cluster_id: ClusterId,
table_ident: &TableIdent,
regions: Vec<RegionNumber>,
) -> Vec<RegionNumber> {
if let Some(region_failover_manager) = &self.region_failover_manager {
let mut region_failover_key = RegionFailoverKey {
cluster_id,
table_ident: table_ident.clone(),
region_number: 0,
};
regions
.into_iter()
.filter(|region| {
region_failover_key.region_number = *region;
!region_failover_manager.is_region_failover_running(&region_failover_key)
})
.collect()
} else {
regions
}
}
}
#[derive(Default)]
pub(crate) struct RegionLeaseHandler;
#[async_trait]
impl HeartbeatHandler for RegionLeaseHandler {
@@ -107,73 +40,56 @@ impl HeartbeatHandler for RegionLeaseHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
_: &mut Context,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let Some(stat) = acc.stat.as_ref() else { return Ok(()) };
let datanode_id = stat.id;
let mut datanode_regions = HashMap::new();
stat.region_stats.iter().for_each(|x| {
let region_id: RegionId = x.id.into();
let table_id = region_id.table_id();
datanode_regions
.entry(table_id)
let mut table_region_leases = HashMap::new();
stat.region_stats.iter().for_each(|region_stat| {
let table_ident = region_stat.table_ident.clone();
table_region_leases
.entry(table_ident)
.or_insert_with(Vec::new)
.push(RegionId::from(x.id).region_number());
.push(RegionId::from(region_stat.id).region_number());
});
let mut region_leases = Vec::with_capacity(datanode_regions.len());
for (table_id, local_regions) in datanode_regions {
let Some(table_ident) = self.find_table_ident(table_id).await? else {
warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \
Reason: table not found.");
continue;
};
let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory);
for (table_ident, region_numbers) in table_region_leases.iter_mut() {
inactive_node_manager
.retain_active_regions(
stat.cluster_id,
stat.id,
table_ident.table_id,
region_numbers,
)
.await?;
}
let Some(table_region_value) = self
.table_metadata_manager
.table_region_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)? else {
warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \
Reason: table region value not found.");
continue;
};
let Some(global_regions) = table_region_value
.region_distribution
.get(&datanode_id) else {
warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \
Reason: not expected to place the region on it.");
continue;
};
// Filter out the designated regions from table info value for the given table on the given Datanode.
let designated_regions = local_regions
.into_iter()
.filter(|x| global_regions.contains(x))
.collect::<Vec<_>>();
let designated_regions =
self.filter_failover_regions(stat.cluster_id, &table_ident, designated_regions);
region_leases.push(RegionLease {
acc.region_leases = table_region_leases
.into_iter()
.filter(|(_, regions)| !regions.is_empty()) // filter out empty region_numbers
.map(|(table_ident, regions)| RegionLease {
table_ident: Some(table_ident.into()),
regions: designated_regions,
regions,
duration_since_epoch: req.duration_since_epoch,
lease_seconds: REGION_LEASE_SECONDS,
});
}
acc.region_leases = region_leases;
})
.collect();
Ok(())
}
}
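
To make the new flow concrete, here is a hedged worked example (the datanode ID, table ID, and region numbers are invented): suppose datanode 7 heartbeats regions 1, 2, and 3 of table 42, and metasrv has previously marked regions 1 and 3 of that table inactive. The handler groups all three stats under the one shared `TableIdent`, `retain_active_regions` prunes regions 1 and 3, and the accumulator ends up with a single lease:

```rust
// Expected accumulator content for the scenario above (a sketch, not a real test;
// `table_42_ident`, `req`, and `acc` are assumed from the surrounding context).
let expected = RegionLease {
    table_ident: Some(table_42_ident.into()), // the TableIdent shared by all three stats
    regions: vec![2],                         // regions 1 and 3 were pruned as inactive
    duration_since_epoch: req.duration_since_epoch,
    lease_seconds: REGION_LEASE_SECONDS,      // 20 seconds
};
assert_eq!(acc.region_leases, vec![expected]);
```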
#[cfg(test)]
mod test {
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManager;
use common_meta::RegionIdent;
use store_api::storage::RegionNumber;
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
@@ -208,18 +124,6 @@ mod test {
table_id,
engine: "mito".to_string(),
};
let _ = region_failover_manager
.running_procedures()
.write()
.unwrap()
.insert(RegionFailoverKey {
cluster_id: 1,
table_ident: table_ident.clone(),
region_number: 1,
});
let handler =
RegionLeaseHandler::new(Some(region_failover_manager), table_metadata_manager);
let req = HeartbeatRequest {
duration_since_epoch: 1234,
@@ -229,15 +133,20 @@ mod test {
let builder = MetaSrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let ctx = &mut metasrv.new_ctx();
let handler = RegionLeaseHandler::default();
let acc = &mut HeartbeatAccumulator::default();
let new_region_stat = |region_number: RegionNumber| -> RegionStat {
let region_id = RegionId::new(table_id, region_number);
RegionStat {
id: region_id.as_u64(),
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table_name.to_string(),
table_ident: TableIdent {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table_name.to_string(),
table_id: 1,
engine: "mito".to_string(),
},
..Default::default()
}
};
@@ -248,10 +157,34 @@ mod test {
..Default::default()
});
let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory);
inactive_node_manager
.register_inactive_region(&RegionIdent {
cluster_id: 1,
datanode_id: 1,
table_ident: TableIdent {
table_id: 1,
..Default::default()
},
region_number: 1,
})
.await
.unwrap();
inactive_node_manager
.register_inactive_region(&RegionIdent {
cluster_id: 1,
datanode_id: 1,
table_ident: TableIdent {
table_id: 1,
..Default::default()
},
region_number: 3,
})
.await
.unwrap();
handler.handle(&req, ctx, acc).await.unwrap();
// regions 1 and 3 were marked inactive above,
// so only region 2's lease is extended.
assert_eq!(acc.region_leases.len(), 1);
let lease = acc.region_leases.remove(0);
assert_eq!(lease.table_ident.unwrap(), table_ident.into());

View File

@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use common_meta::rpc::store::{BatchGetRequest, PutRequest};
use common_meta::RegionIdent;
use store_api::storage::RegionNumber;
use crate::error::Result;
use crate::keys::InactiveNodeKey;
use crate::service::store::kv::ResettableKvStoreRef;
pub struct InactiveNodeManager<'a> {
store: &'a ResettableKvStoreRef,
}
impl<'a> InactiveNodeManager<'a> {
pub fn new(store: &'a ResettableKvStoreRef) -> Self {
Self { store }
}
pub async fn register_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> {
let key = InactiveNodeKey {
cluster_id: region_ident.cluster_id,
node_id: region_ident.datanode_id,
table_id: region_ident.table_ident.table_id,
region_number: region_ident.region_number,
};
let req = PutRequest {
key: key.into(),
value: vec![],
prev_kv: false,
};
self.store.put(req).await?;
Ok(())
}
pub async fn deregister_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> {
let key: Vec<u8> = InactiveNodeKey {
cluster_id: region_ident.cluster_id,
node_id: region_ident.datanode_id,
table_id: region_ident.table_ident.table_id,
region_number: region_ident.region_number,
}
.into();
self.store.delete(&key, false).await?;
Ok(())
}
/// Takes the list of regions of one table on a specific node. If metasrv has
/// marked any of those regions as inactive, they are removed in place,
/// leaving only the remaining active regions.
pub async fn retain_active_regions(
&self,
cluster_id: u64,
node_id: u64,
table_id: u32,
region_numbers: &mut Vec<RegionNumber>,
) -> Result<()> {
let key_region_numbers: Vec<(Vec<u8>, RegionNumber)> = region_numbers
.iter()
.map(|region_number| {
(
InactiveNodeKey {
cluster_id,
node_id,
table_id,
region_number: *region_number,
}
.into(),
*region_number,
)
})
.collect();
let keys = key_region_numbers
.iter()
.map(|(key, _)| key.clone())
.collect();
let resp = self.store.batch_get(BatchGetRequest { keys }).await?;
let kvs = resp.kvs;
if kvs.is_empty() {
return Ok(());
}
let inactive_keys = kvs.into_iter().map(|kv| kv.key).collect::<HashSet<_>>();
let active_region_numbers = key_region_numbers
.into_iter()
.filter(|(key, _)| !inactive_keys.contains(key))
.map(|(_, region_number)| region_number)
.collect::<Vec<_>>();
*region_numbers = active_region_numbers;
Ok(())
}
}
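
A minimal usage sketch tying the three methods together (the IDs are hypothetical; the store is assumed to be the in-memory `ResettableKvStoreRef` metasrv already holds):

```rust
use common_meta::ident::TableIdent;
use common_meta::RegionIdent;

async fn example(store: &ResettableKvStoreRef) -> Result<()> {
    let manager = InactiveNodeManager::new(store);

    // Failover marks region 1 of table 42 on datanode 7 as inactive...
    let ident = RegionIdent {
        cluster_id: 1,
        datanode_id: 7,
        table_ident: TableIdent {
            table_id: 42,
            ..Default::default()
        },
        region_number: 1,
    };
    manager.register_inactive_region(&ident).await?;

    // ...so lease renewal for that node retains only the other regions.
    let mut regions = vec![1, 2, 3];
    manager.retain_active_regions(1, 7, 42, &mut regions).await?;
    assert_eq!(regions, vec![2, 3]);

    // Once the node is activated as a candidate again, the mark is cleared.
    manager.deregister_inactive_region(&ident).await?;
    Ok(())
}
```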

View File

@@ -19,6 +19,7 @@ use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use crate::error;
use crate::error::Result;
@@ -26,6 +27,7 @@ use crate::handler::node_stat::Stat;
pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
pub(crate) const INACTIVE_NODE_PREFIX: &str = "__meta_inactive_node";
pub const DN_STAT_PREFIX: &str = "__meta_dnstat";
@@ -136,7 +138,7 @@ pub fn build_table_route_prefix(catalog: impl AsRef<str>, schema: impl AsRef<str
)
}
#[derive(Eq, PartialEq, Debug, Clone, Hash, Copy)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct StatKey {
pub cluster_id: u64,
pub node_id: u64,
@@ -242,6 +244,28 @@ impl TryFrom<Vec<u8>> for StatValue {
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct InactiveNodeKey {
pub cluster_id: u64,
pub node_id: u64,
pub table_id: u32,
pub region_number: RegionNumber,
}
impl From<InactiveNodeKey> for Vec<u8> {
fn from(value: InactiveNodeKey) -> Self {
format!(
"{}-{}-{}-{}-{}",
INACTIVE_NODE_PREFIX,
value.cluster_id,
value.node_id,
value.table_id,
value.region_number
)
.into_bytes()
}
}
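
For example (values hypothetical), the key below serializes to the byte string `__meta_inactive_node-1-7-42-3`: the prefix followed by cluster ID, node ID, table ID, and region number, joined by `-`:

```rust
// Matches the format string in the From impl above.
let key: Vec<u8> = InactiveNodeKey {
    cluster_id: 1,
    node_id: 7,
    table_id: 42,
    region_number: 3,
}
.into();
assert_eq!(key, b"__meta_inactive_node-1-7-42-3".to_vec());
```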
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -39,5 +39,6 @@ pub mod table_routes;
pub use crate::error::Result;
mod inactive_node_manager;
#[cfg(test)]
mod test_util;

View File

@@ -18,11 +18,12 @@ use std::time::Duration;
use client::client_manager::DatanodeClients;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::key::TableMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::ddl::DdlManager;
use crate::ddl::{DdlManager, DdlManagerRef};
use crate::error::Result;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
@@ -41,6 +42,7 @@ use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore};
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef};
use crate::service::store::memory::MemStore;
@@ -145,63 +147,26 @@ impl MetaSrvBuilder {
let kv_store = kv_store.unwrap_or_else(|| Arc::new(MemStore::default()));
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemStore::default()));
let leader_cached_kv_store = Arc::new(LeaderCachedKvStore::new(
Arc::new(CheckLeaderByElection(election.clone())),
kv_store.clone(),
));
let meta_peer_client = meta_peer_client.unwrap_or_else(|| {
MetaPeerClientBuilder::default()
.election(election.clone())
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap()
});
let leader_cached_kv_store = build_leader_cached_kv_store(&election, &kv_store);
let meta_peer_client = meta_peer_client
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
let pushers = Pushers::default();
let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone());
let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence);
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
let manager_config = ManagerConfig {
max_retry_times: options.procedure.max_retry_times,
retry_delay: options.procedure.retry_delay,
..Default::default()
};
let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store));
let mailbox = build_mailbox(&kv_store, &pushers);
let procedure_manager = build_procedure_manager(&options, &kv_store);
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));
let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)));
let datanode_clients = datanode_clients.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(
options.datanode.client_options.timeout_millis,
))
.connect_timeout(Duration::from_millis(
options.datanode.client_options.connect_timeout_millis,
))
.tcp_nodelay(options.datanode.client_options.tcp_nodelay);
Arc::new(DatanodeClients::new(datanode_client_channel_config))
});
// TODO(weny): consider modifying the default config of the procedure manager
let ddl_manager = Arc::new(DdlManager::new(
procedure_manager.clone(),
kv_store.clone(),
let table_metadata_manager = build_table_metadata_manager(&kv_store);
let ddl_manager = build_ddl_manager(
&options,
datanode_clients,
mailbox.clone(),
options.server_addr.clone(),
table_metadata_manager.clone(),
));
&procedure_manager,
&kv_store,
&mailbox,
&table_metadata_manager,
);
let _ = ddl_manager.try_start();
let handler_group = match handler_group {
@@ -210,36 +175,30 @@ impl MetaSrvBuilder {
let region_failover_handler = if options.disable_region_failover {
None
} else {
let select_ctx = SelectorContext {
server_addr: options.server_addr.clone(),
datanode_lease_secs: options.datanode_lease_secs,
kv_store: kv_store.clone(),
meta_peer_client: meta_peer_client.clone(),
catalog: None,
schema: None,
table: None,
};
let region_failover_manager = Arc::new(RegionFailoverManager::new(
in_memory.clone(),
mailbox.clone(),
procedure_manager.clone(),
selector.clone(),
SelectorContext {
server_addr: options.server_addr.clone(),
datanode_lease_secs: options.datanode_lease_secs,
kv_store: kv_store.clone(),
meta_peer_client: meta_peer_client.clone(),
catalog: None,
schema: None,
table: None,
},
select_ctx,
lock.clone(),
table_metadata_manager.clone(),
));
Some(
RegionFailureHandler::try_new(election.clone(), region_failover_manager)
.await?,
)
};
let region_lease_handler = RegionLeaseHandler::new(
region_failover_handler
.as_ref()
.map(|x| x.region_failover_manager().clone()),
table_metadata_manager.clone(),
);
let group = HeartbeatHandlerGroup::new(pushers);
group.add_handler(ResponseHeaderHandler::default()).await;
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
@@ -253,7 +212,7 @@ impl MetaSrvBuilder {
if let Some(region_failover_handler) = region_failover_handler {
group.add_handler(region_failover_handler).await;
}
group.add_handler(region_lease_handler).await;
group.add_handler(RegionLeaseHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;
group
}
@@ -280,6 +239,80 @@ impl MetaSrvBuilder {
}
}
fn build_leader_cached_kv_store(
election: &Option<ElectionRef>,
kv_store: &KvStoreRef,
) -> Arc<LeaderCachedKvStore> {
Arc::new(LeaderCachedKvStore::new(
Arc::new(CheckLeaderByElection(election.clone())),
kv_store.clone(),
))
}
fn build_default_meta_peer_client(
election: &Option<ElectionRef>,
in_memory: &ResettableKvStoreRef,
) -> MetaPeerClientRef {
MetaPeerClientBuilder::default()
.election(election.clone())
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap()
}
fn build_mailbox(kv_store: &KvStoreRef, pushers: &Pushers) -> MailboxRef {
let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone());
HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
}
fn build_procedure_manager(options: &MetaSrvOptions, kv_store: &KvStoreRef) -> ProcedureManagerRef {
let manager_config = ManagerConfig {
max_retry_times: options.procedure.max_retry_times,
retry_delay: options.procedure.retry_delay,
..Default::default()
};
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
Arc::new(LocalManager::new(manager_config, state_store))
}
fn build_table_metadata_manager(kv_store: &KvStoreRef) -> TableMetadataManagerRef {
Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)))
}
fn build_ddl_manager(
options: &MetaSrvOptions,
datanode_clients: Option<Arc<DatanodeClients>>,
procedure_manager: &ProcedureManagerRef,
kv_store: &KvStoreRef,
mailbox: &MailboxRef,
table_metadata_manager: &TableMetadataManagerRef,
) -> DdlManagerRef {
let datanode_clients = datanode_clients.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(
options.datanode.client_options.timeout_millis,
))
.connect_timeout(Duration::from_millis(
options.datanode.client_options.connect_timeout_millis,
))
.tcp_nodelay(options.datanode.client_options.tcp_nodelay);
Arc::new(DatanodeClients::new(datanode_client_channel_config))
});
// TODO(weny): consider modifying the default config of the procedure manager
Arc::new(DdlManager::new(
procedure_manager.clone(),
kv_store.clone(),
datanode_clients,
mailbox.clone(),
options.server_addr.clone(),
table_metadata_manager.clone(),
))
}
impl Default for MetaSrvBuilder {
fn default() -> Self {
Self::new()

View File

@@ -45,6 +45,7 @@ use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataMan
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::service::mailbox::MailboxRef;
use crate::service::store::kv::ResettableKvStoreRef;
const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
const CLOSE_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(2);
@@ -68,6 +69,7 @@ impl From<RegionIdent> for RegionFailoverKey {
}
pub(crate) struct RegionFailoverManager {
in_memory: ResettableKvStoreRef,
mailbox: MailboxRef,
procedure_manager: ProcedureManagerRef,
selector: SelectorRef,
@@ -90,6 +92,7 @@ impl Drop for FailoverProcedureGuard {
impl RegionFailoverManager {
pub(crate) fn new(
in_memory: ResettableKvStoreRef,
mailbox: MailboxRef,
procedure_manager: ProcedureManagerRef,
selector: SelectorRef,
@@ -98,6 +101,7 @@ impl RegionFailoverManager {
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
in_memory,
mailbox,
procedure_manager,
selector,
@@ -110,6 +114,7 @@ impl RegionFailoverManager {
pub(crate) fn create_context(&self) -> RegionFailoverContext {
RegionFailoverContext {
in_memory: self.in_memory.clone(),
mailbox: self.mailbox.clone(),
selector: self.selector.clone(),
selector_ctx: self.selector_ctx.clone(),
@@ -133,10 +138,6 @@ impl RegionFailoverManager {
})
}
pub(crate) fn is_region_failover_running(&self, key: &RegionFailoverKey) -> bool {
self.running_procedures.read().unwrap().contains(key)
}
fn insert_running_procedures(
&self,
failed_region: &RegionIdent,
@@ -153,11 +154,6 @@ impl RegionFailoverManager {
}
}
#[cfg(test)]
pub(crate) fn running_procedures(&self) -> Arc<RwLock<HashSet<RegionFailoverKey>>> {
self.running_procedures.clone()
}
pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
let Some(guard) = self.insert_running_procedures(failed_region) else {
warn!("Region failover procedure for region {failed_region} is already running!");
@@ -224,6 +220,7 @@ struct Node {
/// The "Context" of region failover procedure state machine.
#[derive(Clone)]
pub struct RegionFailoverContext {
pub in_memory: ResettableKvStoreRef,
pub mailbox: MailboxRef,
pub selector: SelectorRef,
pub selector_ctx: SelectorContext,
@@ -463,6 +460,7 @@ mod tests {
}
pub async fn build(self) -> TestingEnv {
let in_memory = Arc::new(MemStore::new());
let kv_store: KvStoreRef = Arc::new(MemStore::new());
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
@@ -527,6 +525,7 @@ mod tests {
TestingEnv {
context: RegionFailoverContext {
in_memory,
mailbox,
selector,
selector_ctx,

View File

@@ -29,6 +29,7 @@ use crate::error::{
Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::inactive_node_manager::InactiveNodeManager;
use crate::procedure::region_failover::OPEN_REGION_MESSAGE_TIMEOUT;
use crate::service::mailbox::{Channel, MailboxReceiver};
@@ -64,6 +65,21 @@ impl ActivateRegion {
input: instruction.to_string(),
})?;
// Ensure that metasrv will renew the lease for this candidate node.
//
// This operation is not necessarily redundant. Imagine the following scenario:
// this candidate once held the current region, but because it did not respond to
// the `close` instruction in time, metasrv marked it as an inactive node. It then
// replied, the current region failed over again, and the same node was selected
// as the candidate, so its previous inactive state must be cleared first.
let candidate = RegionIdent {
datanode_id: self.candidate.id,
..failed_region.clone()
};
InactiveNodeManager::new(&ctx.in_memory)
.deregister_inactive_region(&candidate)
.await?;
let ch = Channel::Datanode(self.candidate.id);
ctx.mailbox.send(&ch, msg, timeout).await
}

View File

@@ -30,6 +30,7 @@ use crate::error::{
};
use crate::handler::region_lease_handler::REGION_LEASE_SECONDS;
use crate::handler::HeartbeatMailbox;
use crate::inactive_node_manager::InactiveNodeManager;
use crate::procedure::region_failover::CLOSE_REGION_MESSAGE_TIMEOUT;
use crate::service::mailbox::{Channel, MailboxReceiver};
@@ -66,12 +67,17 @@ impl DeactivateRegion {
input: instruction.to_string(),
})?;
InactiveNodeManager::new(&ctx.in_memory)
.register_inactive_region(failed_region)
.await?;
let ch = Channel::Datanode(failed_region.datanode_id);
ctx.mailbox.send(&ch, msg, timeout).await
}
async fn handle_response(
self,
ctx: &RegionFailoverContext,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -87,6 +93,10 @@ impl DeactivateRegion {
}.fail();
};
if result {
InactiveNodeManager::new(&ctx.in_memory)
.deregister_inactive_region(failed_region)
.await?;
Ok(Box::new(ActivateRegion::new(self.candidate)))
} else {
// Under rare circumstances would a Datanode fail to close a Region.
@@ -138,7 +148,8 @@ impl State for DeactivateRegion {
Err(e) => return Err(e),
};
self.handle_response(mailbox_receiver, failed_region).await
self.handle_response(ctx, mailbox_receiver, failed_region)
.await
}
}
@@ -207,7 +218,7 @@ mod tests {
.unwrap();
let next_state = state
.handle_response(mailbox_receiver, &failed_region)
.handle_response(&env.context, mailbox_receiver, &failed_region)
.await
.unwrap();
assert_eq!(
@@ -251,7 +262,7 @@ mod tests {
);
let next_state = state
.handle_response(mailbox_receiver, &failed_region)
.handle_response(&env.context, mailbox_receiver, &failed_region)
.await
.unwrap();
// Timeout or not, proceed to `ActivateRegion`.

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use api::v1::meta::Peer;
use common_meta::ident::TableIdent;
use common_telemetry::warn;
use crate::error::Result;
@@ -102,9 +103,13 @@ fn contains_table(
if let Some(latest) = may_latest {
for RegionStat {
catalog,
schema,
table,
table_ident:
TableIdent {
catalog,
schema,
table,
..
},
..
} in latest.region_stats.iter()
{
@@ -121,6 +126,8 @@ fn contains_table(
#[cfg(test)]
mod tests {
use common_meta::ident::TableIdent;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::keys::StatValue;
use crate::selector::load_based::contains_table;
@@ -135,21 +142,30 @@ mod tests {
Stat {
region_stats: vec![
RegionStat {
catalog: "greptime_1".to_string(),
schema: "public_1".to_string(),
table: "demo_1".to_string(),
table_ident: TableIdent {
catalog: "greptime_1".to_string(),
schema: "public_1".to_string(),
table: "demo_1".to_string(),
..Default::default()
},
..Default::default()
},
RegionStat {
catalog: "greptime_2".to_string(),
schema: "public_2".to_string(),
table: "demo_2".to_string(),
table_ident: TableIdent {
catalog: "greptime_2".to_string(),
schema: "public_2".to_string(),
table: "demo_2".to_string(),
..Default::default()
},
..Default::default()
},
RegionStat {
catalog: "greptime_3".to_string(),
schema: "public_3".to_string(),
table: "demo_3".to_string(),
table_ident: TableIdent {
catalog: "greptime_3".to_string(),
schema: "public_3".to_string(),
table: "demo_3".to_string(),
..Default::default()
},
..Default::default()
},
],
@@ -158,21 +174,30 @@ mod tests {
Stat {
region_stats: vec![
RegionStat {
catalog: "greptime_1".to_string(),
schema: "public_1".to_string(),
table: "demo_1".to_string(),
table_ident: TableIdent {
catalog: "greptime_1".to_string(),
schema: "public_1".to_string(),
table: "demo_1".to_string(),
..Default::default()
},
..Default::default()
},
RegionStat {
catalog: "greptime_2".to_string(),
schema: "public_2".to_string(),
table: "demo_2".to_string(),
table_ident: TableIdent {
catalog: "greptime_2".to_string(),
schema: "public_2".to_string(),
table: "demo_2".to_string(),
..Default::default()
},
..Default::default()
},
RegionStat {
catalog: "greptime_3".to_string(),
schema: "public_3".to_string(),
table: "demo_3".to_string(),
table_ident: TableIdent {
catalog: "greptime_3".to_string(),
schema: "public_3".to_string(),
table: "demo_3".to_string(),
..Default::default()
},
..Default::default()
},
],
@@ -181,21 +206,30 @@ mod tests {
Stat {
region_stats: vec![
RegionStat {
catalog: "greptime_1".to_string(),
schema: "public_1".to_string(),
table: "demo_1".to_string(),
table_ident: TableIdent {
catalog: "greptime_1".to_string(),
schema: "public_1".to_string(),
table: "demo_1".to_string(),
..Default::default()
},
..Default::default()
},
RegionStat {
catalog: "greptime_2".to_string(),
schema: "public_2".to_string(),
table: "demo_2".to_string(),
table_ident: TableIdent {
catalog: "greptime_2".to_string(),
schema: "public_2".to_string(),
table: "demo_2".to_string(),
..Default::default()
},
..Default::default()
},
RegionStat {
catalog: "greptime_4".to_string(),
schema: "public_4".to_string(),
table: "demo_4".to_string(),
table_ident: TableIdent {
catalog: "greptime_4".to_string(),
schema: "public_4".to_string(),
table: "demo_4".to_string(),
..Default::default()
},
..Default::default()
},
],

View File

@@ -213,6 +213,10 @@ mod tests {
"Noop"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, _: RangeRequest) -> Result<RangeResponse> {
unreachable!()
}
@@ -240,16 +244,12 @@ mod tests {
unreachable!()
}
async fn move_value(&self, _: MoveValueRequest) -> Result<MoveValueResponse> {
unreachable!()
}
async fn batch_delete(&self, _: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
unreachable!()
}
fn as_any(&self) -> &dyn Any {
self
async fn move_value(&self, _: MoveValueRequest) -> Result<MoveValueResponse> {
unreachable!()
}
}

View File

@@ -120,6 +120,10 @@ impl KvBackend for LeaderCachedKvStore {
&self.name
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
if !self.is_leader() {
return self.store.range(req).await;
@@ -173,6 +177,24 @@ impl KvBackend for LeaderCachedKvStore {
Ok(res)
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
if !self.is_leader() {
return self.store.batch_put(req).await;
}
let ver = self.create_new_version();
let res = self.store.batch_put(req.clone()).await?;
let _ = self.cache.batch_put(req.clone()).await?;
if !self.validate_version(ver) {
let keys = req.kvs.into_iter().map(|kv| kv.key).collect::<Vec<_>>();
self.invalid_keys(keys).await?;
}
Ok(res)
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
if !self.is_leader() {
return self.store.batch_get(req).await;
@@ -220,36 +242,6 @@ impl KvBackend for LeaderCachedKvStore {
Ok(merged_res)
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
if !self.is_leader() {
return self.store.batch_put(req).await;
}
let ver = self.create_new_version();
let res = self.store.batch_put(req.clone()).await?;
let _ = self.cache.batch_put(req.clone()).await?;
if !self.validate_version(ver) {
let keys = req.kvs.into_iter().map(|kv| kv.key).collect::<Vec<_>>();
self.invalid_keys(keys).await?;
}
Ok(res)
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
if !self.is_leader() {
return self.store.batch_delete(req).await;
}
let _ = self.create_new_version();
let res = self.store.batch_delete(req.clone()).await?;
let _ = self.cache.batch_delete(req).await?;
Ok(res)
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
if !self.is_leader() {
return self.store.compare_and_put(req).await;
@@ -279,6 +271,18 @@ impl KvBackend for LeaderCachedKvStore {
Ok(res)
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
if !self.is_leader() {
return self.store.batch_delete(req).await;
}
let _ = self.create_new_version();
let res = self.store.batch_delete(req.clone()).await?;
let _ = self.cache.batch_delete(req).await?;
Ok(res)
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
if !self.is_leader() {
return self.store.move_value(req).await;
@@ -297,10 +301,6 @@ impl KvBackend for LeaderCachedKvStore {
self.invalid_keys(vec![from_key, to_key]).await?;
Ok(res)
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait::async_trait]

View File

@@ -97,6 +97,10 @@ impl KvBackend for EtcdStore {
"Etcd"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let Get { key, options } = req.try_into()?;
@@ -137,31 +141,6 @@ impl KvBackend for EtcdStore {
Ok(PutResponse { prev_kv })
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
let BatchGet { keys, options } = req.try_into()?;
let get_ops: Vec<_> = keys
.into_iter()
.map(|k| TxnOp::get(k, options.clone()))
.collect();
let txn_responses = self.do_multi_txn(get_ops).await?;
let mut kvs = vec![];
for txn_res in txn_responses {
for op_res in txn_res.op_responses() {
let get_res = match op_res {
TxnOpResponse::Get(get_res) => get_res,
_ => unreachable!(),
};
kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv));
}
}
Ok(BatchGetResponse { kvs })
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let BatchPut { kvs, options } = req.try_into()?;
@@ -189,32 +168,29 @@ impl KvBackend for EtcdStore {
Ok(BatchPutResponse { prev_kvs })
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let BatchDelete { keys, options } = req.try_into()?;
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
let BatchGet { keys, options } = req.try_into()?;
let mut prev_kvs = Vec::with_capacity(keys.len());
let delete_ops = keys
let get_ops: Vec<_> = keys
.into_iter()
.map(|k| TxnOp::delete(k, options.clone()))
.collect::<Vec<_>>();
.map(|k| TxnOp::get(k, options.clone()))
.collect();
let txn_responses = self.do_multi_txn(delete_ops).await?;
let txn_responses = self.do_multi_txn(get_ops).await?;
let mut kvs = vec![];
for txn_res in txn_responses {
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Delete(delete_res) => {
delete_res.prev_kvs().iter().for_each(|kv| {
prev_kvs.push(KvPair::from_etcd_kv(kv));
});
}
let get_res = match op_res {
TxnOpResponse::Get(get_res) => get_res,
_ => unreachable!(),
}
};
kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv));
}
}
Ok(BatchDeleteResponse { prev_kvs })
Ok(BatchGetResponse { kvs })
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
@@ -286,6 +262,34 @@ impl KvBackend for EtcdStore {
})
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let BatchDelete { keys, options } = req.try_into()?;
let mut prev_kvs = Vec::with_capacity(keys.len());
let delete_ops = keys
.into_iter()
.map(|k| TxnOp::delete(k, options.clone()))
.collect::<Vec<_>>();
let txn_responses = self.do_multi_txn(delete_ops).await?;
for txn_res in txn_responses {
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Delete(delete_res) => {
delete_res.prev_kvs().iter().for_each(|kv| {
prev_kvs.push(KvPair::from_etcd_kv(kv));
});
}
_ => unreachable!(),
}
}
}
Ok(BatchDeleteResponse { prev_kvs })
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
let MoveValue {
from_key,
@@ -358,10 +362,6 @@ impl KvBackend for EtcdStore {
}
.fail()
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait::async_trait]

View File

@@ -65,6 +65,10 @@ impl KvBackend for KvBackendAdapter {
self.0.name()
}
fn as_any(&self) -> &dyn Any {
self.0.as_any()
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
self.0
.range(req)
@@ -89,6 +93,14 @@ impl KvBackend for KvBackendAdapter {
.context(MetaSrvSnafu)
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
self.0
.batch_get(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
async fn compare_and_put(
&self,
req: CompareAndPutRequest,
@@ -122,14 +134,6 @@ impl KvBackend for KvBackendAdapter {
.context(MetaSrvSnafu)
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
self.0
.batch_get(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error> {
self.0
.move_value(req)
@@ -137,8 +141,4 @@ impl KvBackend for KvBackendAdapter {
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
fn as_any(&self) -> &dyn Any {
self.0.as_any()
}
}

View File

@@ -41,7 +41,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
let in_memory = Arc::new(MemStore::new());
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
@@ -59,6 +59,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
};
Arc::new(RegionFailoverManager::new(
in_memory,
mailbox,
procedure_manager,
selector,

View File

@@ -349,6 +349,7 @@ async fn run_region_failover_procedure(
let procedure = RegionFailoverProcedure::new(
failed_region.clone(),
RegionFailoverContext {
in_memory: meta_srv.in_memory().clone(),
mailbox: meta_srv.mailbox().clone(),
selector,
selector_ctx: SelectorContext {