refactor: use downgrading the region instead of closing region (#2863)

* refactor: use downgrading the region instead of closing region

* feat: enhance the tests for alive keeper

* feat: add a metric to track region lease expired

* chore: apply suggestions from CR

* chore: enable logging for test_distributed_handle_ddl_request

* refactor: simplify lease keeper

* feat: add metrics for lease keeper

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: move OpeningRegionKeeper to common_meta

* feat: register operating regions to MemoryRegionKeeper
Weny Xu authored on 2023-12-12 18:24:17 +09:00, committed by GitHub
parent 89a0d3af1e
commit cf6bba09fd
30 changed files with 803 additions and 1215 deletions
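
At its core, the change is behavioral: when a region's lease expires, the datanode's RegionAliveKeeper now downgrades the region to readonly instead of closing it, and a region is only closed when the metasrv explicitly lists it as closeable in the heartbeat response. A minimal sketch of the new expiry path, reusing names from the diff below but with the surrounding types elided:

// Sketch only: on lease expiry, downgrade the region instead of closing it.
fn on_lease_expired(region_server: &RegionServer, region_id: RegionId) {
    warn!("The region {region_id} lease is expired, set region to readonly.");
    // Ignore the result: the region may already be readonly or no longer exist.
    let _ = region_server.set_writable(region_id, false);
}

// Closing now happens only for regions the metasrv marks as closeable:
// self.close_staled_regions(&region_lease.closeable_region_ids).await;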

Cargo.lock generated
View File

@@ -1799,6 +1799,7 @@ dependencies = [
"common-telemetry",
"common-time",
"datatypes",
"derive_builder 0.12.0",
"etcd-client",
"futures",
"humantime-serde",

View File

@@ -23,6 +23,7 @@ use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
@@ -396,6 +397,7 @@ impl StartCommand {
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(StandaloneTableMetadataCreator::new(kv_backend)),
Arc::new(MemoryRegionKeeper::default()),
)
.context(InitDdlManagerSnafu)?,
);

View File

@@ -24,6 +24,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true
humantime-serde.workspace = true

View File

@@ -23,6 +23,7 @@ use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;
@@ -70,4 +71,5 @@ pub struct DdlContext {
pub datanode_manager: DatanodeManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub memory_region_keeper: MemoryRegionKeeperRef,
}

View File

@@ -18,7 +18,10 @@ use api::v1::region::{
};
use api::v1::{ColumnDef, SemanticType};
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_error::ext::BoxedError;
use common_procedure::error::{
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
@@ -35,8 +38,11 @@ use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
pub struct CreateTableProcedure {
pub context: DdlContext,
@@ -60,10 +66,18 @@ impl CreateTableProcedure {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(CreateTableProcedure {
context,
creator: TableCreator { data },
})
let mut creator = TableCreator {
data,
opening_regions: vec![],
};
creator
.register_opening_regions(&context)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(CreateTableProcedure { context, creator })
}
pub fn table_info(&self) -> &RawTableInfo {
@@ -169,6 +183,9 @@ impl CreateTableProcedure {
}
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
// Registers opening regions
self.creator.register_opening_regions(&self.context)?;
let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
@@ -226,7 +243,9 @@ impl CreateTableProcedure {
self.creator.data.state = CreateTableState::CreateMetadata;
Ok(Status::executing(true))
// Ensures that a procedure recovering from a crash restarts from the `DatanodeCreateRegions` stage.
// TODO(weny): Add more tests.
Ok(Status::executing(false))
}
async fn on_create_metadata(&self) -> Result<Status> {
@@ -282,7 +301,10 @@ impl Procedure for CreateTableProcedure {
}
pub struct TableCreator {
/// The serializable data.
pub data: CreateTableData,
/// The guards of opening regions.
pub opening_regions: Vec<OperatingRegionGuard>,
}
impl TableCreator {
@@ -294,8 +316,36 @@ impl TableCreator {
task,
region_routes,
},
opening_regions: vec![],
}
}
/// Registers the opening regions if they have not been registered yet.
pub fn register_opening_regions(&mut self, context: &DdlContext) -> Result<()> {
let region_routes = &self.data.region_routes;
let opening_regions = operating_leader_regions(region_routes);
if self.opening_regions.len() == opening_regions.len() {
return Ok(());
}
let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
for (region_id, datanode_id) in opening_regions {
let guard = context
.memory_region_keeper
.register(datanode_id, region_id)
.context(error::RegionOperatingRaceSnafu {
region_id,
peer_id: datanode_id,
})?;
opening_region_guards.push(guard);
}
self.opening_regions = opening_region_guards;
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]

View File

@@ -26,7 +26,7 @@ use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, info};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::engine::TableReference;
@@ -42,12 +42,19 @@ use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::DropTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
pub struct DropTableProcedure {
/// The context of procedure runtime.
pub context: DdlContext,
/// The serializable data.
pub data: DropTableData,
/// The guards of dropping regions.
pub dropping_regions: Vec<OperatingRegionGuard>,
}
#[allow(dead_code)]
@@ -64,12 +71,17 @@ impl DropTableProcedure {
Self {
context,
data: DropTableData::new(cluster_id, task, table_route_value, table_info_value),
dropping_regions: vec![],
}
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
Ok(Self {
context,
data,
dropping_regions: vec![],
})
}
async fn on_prepare(&mut self) -> Result<Status> {
@@ -102,8 +114,42 @@ impl DropTableProcedure {
Ok(Status::executing(true))
}
/// Registers the dropping regions if they have not been registered yet.
fn register_dropping_regions(&mut self) -> Result<()> {
let region_routes = self.data.region_routes();
let dropping_regions = operating_leader_regions(region_routes);
if self.dropping_regions.len() == dropping_regions.len() {
return Ok(());
}
let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
for (region_id, datanode_id) in dropping_regions {
let guard = self
.context
.memory_region_keeper
.register(datanode_id, region_id)
.context(error::RegionOperatingRaceSnafu {
region_id,
peer_id: datanode_id,
})?;
dropping_region_guards.push(guard);
}
self.dropping_regions = dropping_region_guards;
Ok(())
}
/// Removes the table metadata.
async fn on_remove_metadata(&mut self) -> Result<Status> {
// NOTES: If the meta server crashes after `RemoveMetadata`,
// the corresponding regions of this table on the Datanode will be closed automatically,
// and any future dropping operation will fail.
// TODO(weny): Consider introducing a RegionStatus to indicate the region is being dropped.
let table_metadata_manager = &self.context.table_metadata_manager;
let table_info_value = &self.data.table_info_value;
let table_route_value = &self.data.table_route_value;

View File

@@ -37,6 +37,7 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable};
use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
@@ -52,6 +53,7 @@ pub struct DdlManager {
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
}
impl DdlManager {
@@ -62,6 +64,7 @@ impl DdlManager {
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
) -> Result<Self> {
let manager = Self {
procedure_manager,
@@ -69,6 +72,7 @@ impl DdlManager {
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
memory_region_keeper,
};
manager.register_loaders()?;
Ok(manager)
@@ -85,6 +89,7 @@ impl DdlManager {
datanode_manager: self.datanode_manager.clone(),
cache_invalidator: self.cache_invalidator.clone(),
table_metadata_manager: self.table_metadata_manager.clone(),
memory_region_keeper: self.memory_region_keeper.clone(),
}
}
@@ -446,6 +451,7 @@ mod tests {
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::rpc::router::RegionRoute;
use crate::state_store::KvStateStore;
@@ -488,6 +494,7 @@ mod tests {
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(DummyTableMetadataAllocator),
Arc::new(MemoryRegionKeeper::default()),
);
let expected_loaders = vec![

View File

@@ -19,10 +19,11 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::RegionNumber;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::peer::Peer;
use crate::DatanodeId;
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -31,6 +32,17 @@ pub enum Error {
#[snafu(display("Empty key is not allowed"))]
EmptyKey { location: Location },
#[snafu(display(
"Another procedure is operating the region: {} on peer: {}",
region_id,
peer_id
))]
RegionOperatingRace {
location: Location,
peer_id: DatanodeId,
region_id: RegionId,
},
#[snafu(display("Invalid result with a txn response: {}", err_msg))]
InvalidTxnResult { err_msg: String, location: Location },
@@ -291,7 +303,16 @@ impl ErrorExt for Error {
| SequenceOutOfRange { .. }
| UnexpectedSequenceValue { .. }
| InvalidHeartbeatResponse { .. }
| InvalidTxnResult { .. } => StatusCode::Unexpected,
| InvalidTxnResult { .. }
| EncodeJson { .. }
| DecodeJson { .. }
| PayloadNotExist { .. }
| ConvertRawKey { .. }
| DecodeProto { .. }
| BuildTableMeta { .. }
| TableRouteNotFound { .. }
| ConvertRawTableInfo { .. }
| RegionOperatingRace { .. } => StatusCode::Unexpected,
SendMessage { .. }
| GetKvCache { .. }
@@ -306,15 +327,6 @@ impl ErrorExt for Error {
TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
EncodeJson { .. }
| DecodeJson { .. }
| PayloadNotExist { .. }
| ConvertRawKey { .. }
| DecodeProto { .. }
| BuildTableMeta { .. }
| TableRouteNotFound { .. }
| ConvertRawTableInfo { .. } => StatusCode::Unexpected,
SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(),
RegisterProcedureLoader { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),

View File

@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt::Display;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use table::metadata::TableId;
use super::DeserializedValueWithBytes;
@@ -50,6 +51,7 @@ impl TableRouteValue {
}
}
/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
Self {
region_routes,
@@ -64,6 +66,14 @@ impl TableRouteValue {
pub fn version(&self) -> u64 {
self.version
}
/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
}
}
impl TableMetaKey for TableRouteKey {

View File

@@ -29,6 +29,7 @@ pub mod kv_backend;
pub mod metrics;
pub mod peer;
pub mod range_stream;
pub mod region_keeper;
pub mod rpc;
pub mod sequence;
pub mod state_store;

View File

@@ -0,0 +1,146 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use store_api::storage::RegionId;
use crate::DatanodeId;
/// A guard for an operating (i.e., creating, opening, dropping) region; the region is deregistered from the keeper when the guard is dropped.
#[derive(Debug, Clone)]
pub struct OperatingRegionGuard {
datanode_id: DatanodeId,
region_id: RegionId,
inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
}
impl Drop for OperatingRegionGuard {
fn drop(&mut self) {
let mut inner = self.inner.write().unwrap();
inner.remove(&(self.datanode_id, self.region_id));
}
}
impl OperatingRegionGuard {
/// Returns the (datanode id, region id) of the operating region.
pub fn info(&self) -> (DatanodeId, RegionId) {
(self.datanode_id, self.region_id)
}
}
pub type MemoryRegionKeeperRef = Arc<MemoryRegionKeeper>;
/// Tracks regions in memory.
///
/// It is used in the following cases:
/// - Tracks the opening regions before the corresponding metadata is created.
/// - Tracks the deleting regions after the corresponding metadata is deleted.
#[derive(Debug, Clone, Default)]
pub struct MemoryRegionKeeper {
inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
}
impl MemoryRegionKeeper {
pub fn new() -> Self {
Default::default()
}
/// Returns an [OperatingRegionGuard] if the Region(`region_id`) on Peer(`datanode_id`) is not registered yet.
pub fn register(
&self,
datanode_id: DatanodeId,
region_id: RegionId,
) -> Option<OperatingRegionGuard> {
let mut inner = self.inner.write().unwrap();
if inner.insert((datanode_id, region_id)) {
Some(OperatingRegionGuard {
datanode_id,
region_id,
inner: self.inner.clone(),
})
} else {
None
}
}
/// Returns true if the keeper contains a (`datanode_id`, `region_id`) tuple.
pub fn contains(&self, datanode_id: DatanodeId, region_id: RegionId) -> bool {
let inner = self.inner.read().unwrap();
inner.contains(&(datanode_id, region_id))
}
/// Returns the given `region_ids` with all currently-operating regions on `datanode_id` filtered out.
pub fn filter_opening_regions(
&self,
datanode_id: DatanodeId,
mut region_ids: HashSet<RegionId>,
) -> HashSet<RegionId> {
let inner = self.inner.read().unwrap();
region_ids.retain(|region_id| !inner.contains(&(datanode_id, *region_id)));
region_ids
}
/// Returns the number of elements in the tracking set.
pub fn len(&self) -> usize {
let inner = self.inner.read().unwrap();
inner.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use store_api::storage::RegionId;
use crate::region_keeper::MemoryRegionKeeper;
#[test]
fn test_opening_region_keeper() {
let keeper = MemoryRegionKeeper::new();
let guard = keeper.register(1, RegionId::from_u64(1)).unwrap();
assert!(keeper.register(1, RegionId::from_u64(1)).is_none());
let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap();
let output = keeper.filter_opening_regions(
1,
HashSet::from([
RegionId::from_u64(1),
RegionId::from_u64(2),
RegionId::from_u64(3),
]),
);
assert_eq!(output.len(), 1);
assert!(output.contains(&RegionId::from_u64(3)));
assert_eq!(keeper.len(), 2);
drop(guard);
assert_eq!(keeper.len(), 1);
assert!(keeper.contains(1, RegionId::from_u64(2)));
drop(guard2);
assert!(keeper.is_empty());
}
}
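
The returned guard is the concurrency primitive the DDL procedures above rely on: whichever procedure registers a (datanode, region) pair first holds the slot until its guard is dropped, while a competing registration gets `None` (surfaced as the new `RegionOperatingRace` error). A small illustrative sketch of that pattern; `race_example` is just a placeholder name:

// Sketch: two operations racing to register the same region on datanode 1.
use common_meta::region_keeper::MemoryRegionKeeper;
use store_api::storage::RegionId;

fn race_example() {
    let keeper = MemoryRegionKeeper::default();
    let region_id = RegionId::from_u64(1);

    // The first registration wins and holds the slot for as long as the guard lives.
    let guard = keeper.register(1, region_id).expect("first registration succeeds");

    // A second registration for the same (datanode, region) pair returns `None`;
    // the create/drop table procedures map this to a `RegionOperatingRace` error.
    assert!(keeper.register(1, region_id).is_none());

    // Dropping the guard releases the slot again.
    drop(guard);
    assert!(keeper.register(1, region_id).is_some());
}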

View File

@@ -18,6 +18,7 @@ use api::v1::meta::{
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
TableRoute as PbTableRoute,
};
use derive_builder::Builder;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use snafu::OptionExt;
@@ -27,6 +28,7 @@ use crate::error::{self, Result};
use crate::key::RegionDistribution;
use crate::peer::Peer;
use crate::table_name::TableName;
use crate::DatanodeId;
pub fn region_distribution(region_routes: &[RegionRoute]) -> Result<RegionDistribution> {
let mut regions_id_map = RegionDistribution::new();
@@ -58,6 +60,19 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
.collect()
}
/// Returns the operating leader regions with corresponding [DatanodeId].
pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
region_routes
.iter()
.filter_map(|route| {
route
.leader_peer
.as_ref()
.map(|leader| (route.region.id, leader.id))
})
.collect::<Vec<_>>()
}
/// Returns the HashMap<[RegionNumber], &[Peer]>;
///
/// If the region doesn't have a leader peer, the [Region] will be omitted.
@@ -231,12 +246,15 @@ impl From<Table> for PbTable {
}
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
pub struct RegionRoute {
pub region: Region,
#[builder(setter(into, strip_option))]
pub leader_peer: Option<Peer>,
#[builder(setter(into), default)]
pub follower_peers: Vec<Peer>,
/// `None` by default.
#[builder(setter(into, strip_option), default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub leader_status: Option<RegionStatus>,
}
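
The new `Builder` derive (backed by the `derive_builder` dependency added in the Cargo changes above) generates a `RegionRouteBuilder`, which makes it easier to assemble routes without spelling out every optional field. A rough sketch of the generated API; `build_route` and its arguments are placeholders, not code from this change:

// Sketch of the derive_builder-generated API.
fn build_route(region: Region, leader: Peer) -> RegionRoute {
    RegionRouteBuilder::default()
        .region(region)
        .leader_peer(leader) // `strip_option` wraps the value in `Some` for us.
        .build()             // `follower_peers` and `leader_status` fall back to their defaults.
        .unwrap()
}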

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -40,20 +39,14 @@ use crate::error::{self, Result};
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
use crate::region_server::RegionServer;
const MAX_CLOSE_RETRY_TIMES: usize = 10;
/// [RegionAliveKeeper] manages all `CountdownTaskHandles`.
/// [RegionAliveKeeper] manages all [CountdownTaskHandle]s.
///
/// [RegionAliveKeeper] starts a `CountdownTask` for each region. When deadline is reached,
/// the region will be closed.
/// [RegionAliveKeeper] starts a [CountdownTask] for each region. When the deadline is reached,
/// the region is set to "readonly", which ensures there are no side effects in the entire system.
///
/// The deadline is controlled by Metasrv. It works like "lease" for regions: a Datanode submits its
/// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this
/// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to
/// countdown.
///
/// On each lease extension, [RegionAliveKeeper] will reset the deadline to the corresponding time, and
/// set region's status to "writable".
/// The deadline is controlled by the meta server. The Datanode sends the info of its opened regions to the meta server
/// via heartbeats. If the meta server decides that a region should reside on this Datanode,
/// it renews the region's lease and the deadline of the corresponding [CountdownTask] is reset.
pub struct RegionAliveKeeper {
region_server: RegionServer,
tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
@@ -61,13 +54,14 @@ pub struct RegionAliveKeeper {
started: Arc<AtomicBool>,
/// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
/// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
/// non-decreasing). The heartbeat request will carry the duration since this epoch, and the
/// elapsed time when submitting heartbeats to the meta server (because [Instant] is monotonically
/// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the
/// duration acts like an "invariant point" for region's keep alive lease.
epoch: Instant,
}
impl RegionAliveKeeper {
/// Returns an empty [RegionAliveKeeper].
pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self {
Self {
region_server,
@@ -82,26 +76,16 @@ impl RegionAliveKeeper {
self.tasks.lock().await.get(&region_id).cloned()
}
/// Adds a countdown task for the given region.
/// It is a no-op if the task already exists.
pub async fn register_region(&self, region_id: RegionId) {
if self.find_handle(region_id).await.is_some() {
return;
}
let tasks = Arc::downgrade(&self.tasks);
let on_task_finished = async move {
if let Some(x) = tasks.upgrade() {
let _ = x.lock().await.remove(&region_id);
} // Else the countdown task handles map could be dropped because the keeper is dropped.
};
let handle = Arc::new(CountdownTaskHandle::new(
self.region_server.clone(),
region_id,
move |result: Option<bool>| {
info!(
"Deregister region: {region_id} after countdown task finished, result: {result:?}",
);
on_task_finished
},
));
let mut handles = self.tasks.lock().await;
@@ -118,13 +102,15 @@ impl RegionAliveKeeper {
}
}
/// Removes the countdown task for a specific region.
pub async fn deregister_region(&self, region_id: RegionId) {
if self.tasks.lock().await.remove(&region_id).is_some() {
info!("Deregister alive countdown for region {region_id}")
}
}
async fn keep_lived(&self, regions: &[GrantedRegion], deadline: Instant) {
/// Renews the lease of regions to `deadline`.
async fn renew_region_leases(&self, regions: &[GrantedRegion], deadline: Instant) {
for region in regions {
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
if let Some(handle) = self.find_handle(region_id).await {
@@ -138,6 +124,25 @@ impl RegionAliveKeeper {
}
}
async fn close_staled_region(&self, region_id: RegionId) {
info!("Closing staled region: {region_id}");
let request = RegionRequest::Close(RegionCloseRequest {});
if let Err(e) = self.region_server.handle_request(region_id, request).await {
if e.status_code() != StatusCode::RegionNotFound {
let _ = self.region_server.set_writable(region_id, false);
error!(e; "Failed to close staled region {}, set region to readonly.",region_id);
}
}
}
/// Closes staled regions.
async fn close_staled_regions(&self, regions: &[u64]) {
for region_id in regions {
self.close_staled_region(RegionId::from_u64(*region_id))
.await;
}
}
#[cfg(test)]
async fn deadline(&self, region_id: RegionId) -> Option<Instant> {
let mut deadline = None;
@@ -243,7 +248,11 @@ impl HeartbeatResponseHandler for RegionAliveKeeper {
let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
self.keep_lived(&region_lease.regions, deadline).await;
self.renew_region_leases(&region_lease.regions, deadline)
.await;
self.close_staled_regions(&region_lease.closeable_region_ids)
.await;
Ok(HandleControl::Continue)
}
}
@@ -269,19 +278,7 @@ struct CountdownTaskHandle {
impl CountdownTaskHandle {
/// Creates a new [CountdownTaskHandle] and starts the countdown task.
/// # Params
/// - `on_task_finished`: a callback to be invoked when the task is finished. Note that it will not
/// be invoked if the task is cancelled (by dropping the handle). This is because we want something
/// meaningful to be done when the task is finished, e.g. deregister the handle from the map.
/// While dropping the handle does not necessarily mean the task is finished.
fn new<Fut>(
region_server: RegionServer,
region_id: RegionId,
on_task_finished: impl FnOnce(Option<bool>) -> Fut + Send + 'static,
) -> Self
where
Fut: Future<Output = ()> + Send,
{
fn new(region_server: RegionServer, region_id: RegionId) -> Self {
let (tx, rx) = mpsc::channel(1024);
let mut countdown_task = CountdownTask {
@@ -290,8 +287,7 @@ impl CountdownTaskHandle {
rx,
};
let handler = common_runtime::spawn_bg(async move {
let result = countdown_task.run().await;
on_task_finished(result).await;
countdown_task.run().await;
});
Self {
@@ -301,6 +297,8 @@ impl CountdownTaskHandle {
}
}
/// Starts the [CountdownTask].
/// It is a no-op if the task has already started.
async fn start(&self, heartbeat_interval_millis: u64) {
if let Err(e) = self
.tx
@@ -354,8 +352,7 @@ struct CountdownTask {
}
impl CountdownTask {
// returns true if region closed successfully
async fn run(&mut self) -> Option<bool> {
async fn run(&mut self) {
// 30 years. See `Instant::far_future`.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
@@ -363,37 +360,30 @@ impl CountdownTask {
// "start countdown" command will be sent from heartbeat task).
let countdown = tokio::time::sleep_until(far_future);
tokio::pin!(countdown);
let region_id = self.region_id;
let mut started = false;
loop {
tokio::select! {
command = self.rx.recv() => {
match command {
Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
// Set first deadline in 4 heartbeats (roughly after 20 seconds from now if heartbeat
// interval is set to default 5 seconds), to make Datanode and Metasrv more tolerable to
// network or other jitters during startup.
let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
countdown.set(tokio::time::sleep_until(first_deadline));
if !started {
// Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat
// interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to
// network or other jitters during startup.
let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
countdown.set(tokio::time::sleep_until(first_deadline));
started = true;
}
},
Some(CountdownCommand::Reset((role, deadline))) => {
// The first-time granted regions might be ignored because the `first_deadline` is larger than the `region_lease_timeout`.
// Therefore, we set the writable flag here, outside of the deadline check below.
// TODO(weny): Consider setting `first_deadline` to `region_lease_timeout`.
let _ = self.region_server.set_writable(self.region_id, role.writable());
if countdown.deadline() < deadline {
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later",
(deadline - Instant::now()).as_secs_f32(),
);
countdown.set(tokio::time::sleep_until(deadline));
}
// Else the countdown could be either:
// - not started yet;
// - during startup protection;
// - received a lagging heartbeat message.
// All can be safely ignored.
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later.",
(deadline - Instant::now()).as_secs_f32(),
);
countdown.set(tokio::time::sleep_until(deadline));
},
None => {
info!(
@@ -409,130 +399,123 @@ impl CountdownTask {
}
}
() = &mut countdown => {
let result = self.close_region().await;
info!(
"Region {region_id} is closed, result: {result:?}. \
RegionAliveKeeper out.",
);
return Some(result);
warn!("The region {region_id} lease is expired, set region to readonly.");
let _ = self.region_server.set_writable(self.region_id, false);
// resets the countdown.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
countdown.as_mut().reset(far_future);
}
}
}
None
}
/// Returns if the region is closed successfully.
async fn close_region(&self) -> bool {
for retry in 0..MAX_CLOSE_RETRY_TIMES {
let request = RegionRequest::Close(RegionCloseRequest {});
match self
.region_server
.handle_request(self.region_id, request)
.await
{
Ok(_) => return true,
Err(e) if e.status_code() == StatusCode::RegionNotFound => return true,
// If region is failed to close, immediately retry. Maybe we should panic instead?
Err(e) => error!(e;
"Retry {retry}, failed to close region {}. \
For the integrity of data, retry closing and retry without wait.",
self.region_id,
),
}
}
false
}
}
#[cfg(test)]
mod test {
use api::v1::meta::RegionRole;
use mito2::config::MitoConfig;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::region_engine::RegionEngine;
use super::*;
use crate::tests::mock_region_server;
#[tokio::test(flavor = "multi_thread")]
async fn region_alive_keeper() {
let region_server = mock_region_server();
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300));
let region_id = RegionId::new(1, 2);
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
let engine = Arc::new(engine_env.create_engine(MitoConfig::default()).await);
region_server.register_engine(engine.clone());
// register a region before starting
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100));
let region_id = RegionId::new(1024, 1);
let builder = CreateRequestBuilder::new();
region_server
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();
region_server.set_writable(region_id, true).unwrap();
// Register a region before starting.
alive_keeper.register_region(region_id).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());
info!("Start the keeper");
alive_keeper.start(None).await.unwrap();
// started alive keeper should assign deadline to this region
// The started alive keeper should assign deadline to this region.
let deadline = alive_keeper.deadline(region_id).await.unwrap();
assert!(deadline >= Instant::now());
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);
// extend lease then sleep
info!("Wait for lease expired");
// Sleep to wait lease expired.
tokio::time::sleep(Duration::from_millis(500)).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
info!("Renew the region lease");
// Renew lease then sleep.
alive_keeper
.keep_lived(
.renew_region_leases(
&[GrantedRegion {
region_id: region_id.as_u64(),
role: RegionRole::Leader.into(),
role: api::v1::meta::RegionRole::Leader.into(),
}],
Instant::now() + Duration::from_millis(500),
Instant::now() + Duration::from_millis(200),
)
.await;
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());
let deadline = alive_keeper.deadline(region_id).await.unwrap();
assert!(deadline >= Instant::now());
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);
// sleep to wait lease expired
tokio::time::sleep(Duration::from_millis(1000)).await;
assert!(alive_keeper.find_handle(region_id).await.is_none());
info!("Wait for lease expired");
// Sleep to wait lease expired.
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
let deadline = alive_keeper.deadline(region_id).await.unwrap();
assert!(deadline > Instant::now() + Duration::from_secs(86400 * 365 * 29));
}
#[tokio::test(flavor = "multi_thread")]
async fn countdown_task() {
let region_server = mock_region_server();
let (tx, rx) = oneshot::channel();
let countdown_handle = CountdownTaskHandle::new(region_server, RegionId::new(9999, 2));
let countdown_handle = CountdownTaskHandle::new(
region_server,
RegionId::new(9999, 2),
|result: Option<bool>| async move {
tx.send((Instant::now(), result)).unwrap();
},
);
// if countdown task is not started, its deadline is set to far future
// If countdown task is not started, its deadline is set to far future.
assert!(
countdown_handle.deadline().await.unwrap()
> Instant::now() + Duration::from_secs(86400 * 365 * 29)
);
// the first deadline should be set to 4 * heartbeat_interval_millis
// we assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test
// The first deadline should be set to 4 * heartbeat_interval_millis.
// We assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test.
let heartbeat_interval_millis = 100;
countdown_handle.start(heartbeat_interval_millis).await;
assert!(
countdown_handle.deadline().await.unwrap()
> Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
);
tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 5)).await;
// reset deadline
// a nearer deadline will be ignored
countdown_handle
.reset_deadline(
RegionRole::Leader.into(),
Instant::now() + Duration::from_millis(heartbeat_interval_millis),
)
.await;
// No effect.
countdown_handle.start(heartbeat_interval_millis).await;
assert!(
countdown_handle.deadline().await.unwrap()
> Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
> Instant::now() + Duration::from_secs(86400 * 365 * 29)
);
// only a farther deadline will be accepted
// Reset deadline.
countdown_handle
.reset_deadline(
RegionRole::Leader.into(),
RegionRole::Leader,
Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
)
.await;
@@ -540,16 +523,5 @@ mod test {
countdown_handle.deadline().await.unwrap()
> Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
);
// wait for countdown task to finish
let before_await = Instant::now();
let (finish_instant, result) = rx.await.unwrap();
// it returns `RegionNotFound`
assert_eq!(result, Some(true));
// this task should be finished after 5 * heartbeat_interval_millis
// we assert 4 times here
assert!(
finish_instant > before_await + Duration::from_millis(heartbeat_interval_millis * 4)
);
}
}

View File

@@ -19,6 +19,7 @@ use prometheus::*;
pub const REGION_REQUEST_TYPE: &str = "datanode_region_request_type";
pub const REGION_ROLE: &str = "region_role";
pub const REGION_ID: &str = "region_id";
lazy_static! {
/// The elapsed time of handling a request in the region_server.
@@ -34,6 +35,12 @@ lazy_static! {
"last received heartbeat lease elapsed",
)
.unwrap();
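/// The regions whose lease has expired.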
pub static ref LEASE_EXPIRED_REGION: IntGaugeVec = register_int_gauge_vec!(
"lease_expired_region",
"lease expired region",
&[REGION_ID]
)
.unwrap();
/// The received region leases via heartbeat.
pub static ref HEARTBEAT_REGION_LEASES: IntGaugeVec = register_int_gauge_vec!(
"heartbeat_region_leases",

View File

@@ -12,70 +12,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::info;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use store_api::region_engine::GrantedRegion;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef};
use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse};
use crate::region::RegionLeaseKeeper;
pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,
opening_region_keeper: OpeningRegionKeeperRef,
}
impl RegionLeaseHandler {
pub fn new(
region_lease_seconds: u64,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
memory_region_keeper: MemoryRegionKeeperRef,
) -> Self {
let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager);
let region_lease_keeper =
RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
Self {
region_lease_seconds,
region_lease_keeper: Arc::new(region_lease_keeper),
opening_region_keeper,
}
}
}
fn flip_role(role: RegionRole) -> RegionRole {
match role {
RegionRole::Follower => RegionRole::Leader,
RegionRole::Leader => RegionRole::Follower,
}
}
/// Grants lease of regions.
///
/// - If a region is in an `operable` set, it will be granted an `flip_role(current)`([RegionRole]);
/// otherwise, it will be granted a `current`([RegionRole]).
/// - If a region is in a `closeable` set, it won't be granted.
fn grant(
granted_regions: &mut Vec<GrantedRegion>,
operable: &HashSet<RegionId>,
closeable: &HashSet<RegionId>,
regions: &[RegionId],
current: RegionRole,
) {
for region in regions {
if operable.contains(region) {
granted_regions.push(GrantedRegion::new(*region, flip_role(current)));
} else if closeable.contains(region) {
// Filters out the closeable regions.
} else {
granted_regions.push(GrantedRegion::new(*region, current))
}
}
}
@@ -99,76 +66,33 @@ impl HeartbeatHandler for RegionLeaseHandler {
let regions = stat.regions();
let cluster_id = stat.cluster_id;
let datanode_id = stat.id;
let mut granted_regions = Vec::with_capacity(regions.len());
let mut inactive_regions = HashSet::new();
let (leaders, followers): (Vec<_>, Vec<_>) = regions
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = self
.region_lease_keeper
.renew_region_leases(cluster_id, datanode_id, &regions)
.await?;
let renewed = renewed
.into_iter()
.map(|(id, role)| match role {
RegionRole::Follower => (None, Some(id)),
RegionRole::Leader => (Some(id), None),
.map(|(region_id, region_role)| {
GrantedRegion {
region_id,
region_role,
}
.into()
})
.unzip();
.collect::<Vec<_>>();
let leaders = leaders.into_iter().flatten().collect::<Vec<_>>();
let (downgradable, closeable) = self
.region_lease_keeper
.find_staled_leader_regions(cluster_id, datanode_id, &leaders)
.await?;
grant(
&mut granted_regions,
&downgradable,
&closeable,
&leaders,
RegionRole::Leader,
);
if !closeable.is_empty() {
info!(
"Granting region lease, found closeable leader regions: {:?} on datanode {}",
closeable, datanode_id
);
}
inactive_regions.extend(closeable);
let followers = followers.into_iter().flatten().collect::<Vec<_>>();
let (upgradeable, closeable) = self
.region_lease_keeper
.find_staled_follower_regions(cluster_id, datanode_id, &followers)
.await?;
// If a region is opening, it will be filtered out from the closeable regions set.
let closeable = self
.opening_region_keeper
.filter_opening_regions(datanode_id, closeable);
grant(
&mut granted_regions,
&upgradeable,
&closeable,
&followers,
RegionRole::Follower,
);
if !closeable.is_empty() {
info!(
"Granting region lease, found closeable follower regions {:?} on datanode {}",
closeable, datanode_id
);
}
inactive_regions.extend(closeable);
acc.inactive_region_ids = inactive_regions;
acc.region_lease = Some(RegionLease {
regions: granted_regions
.into_iter()
.map(Into::into)
.collect::<Vec<_>>(),
regions: renewed,
duration_since_epoch: req.duration_since_epoch,
lease_seconds: self.region_lease_seconds,
closeable_region_ids: vec![],
closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
});
acc.inactive_region_ids = non_exists;
Ok(HandleControl::Continue)
}
@@ -176,7 +100,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::distributed_time_constants;
@@ -184,20 +108,22 @@ mod test {
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{Region, RegionRoute, RegionStatus};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::region::lease_keeper::OpeningRegionKeeper;
fn new_test_keeper() -> RegionLeaseKeeper {
let store = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
RegionLeaseKeeper::new(table_metadata_manager)
let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
}
fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
@@ -260,7 +186,7 @@ mod test {
..Default::default()
};
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
@@ -272,6 +198,10 @@ mod test {
assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
assert_eq!(
acc.region_lease.as_ref().unwrap().closeable_region_ids,
vec![another_region_id]
);
let acc = &mut HeartbeatAccumulator::default();
@@ -297,6 +227,10 @@ mod test {
vec![GrantedRegion::new(region_id, RegionRole::Follower)],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
assert_eq!(
acc.region_lease.as_ref().unwrap().closeable_region_ids,
vec![another_region_id]
);
let opening_region_id = RegionId::new(table_id, region_number + 2);
let _guard = opening_region_keeper
@@ -331,6 +265,10 @@ mod test {
],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
assert_eq!(
acc.region_lease.as_ref().unwrap().closeable_region_ids,
vec![another_region_id]
);
}
#[tokio::test]

View File

@@ -25,6 +25,7 @@ use common_grpc::channel_manager;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::sequence::SequenceRef;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
@@ -46,7 +47,6 @@ use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::region::lease_keeper::OpeningRegionKeeperRef;
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
@@ -233,7 +233,7 @@ pub struct MetaSrv {
mailbox: MailboxRef,
ddl_executor: DdlTaskExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
plugins: Plugins,
@@ -396,8 +396,8 @@ impl MetaSrv {
&self.table_metadata_manager
}
pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef {
&self.opening_region_keeper
pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
&self.memory_region_keeper
}
pub fn publish(&self) -> Option<PublishRef> {

View File

@@ -26,6 +26,7 @@ use common_meta::distributed_time_constants;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::sequence::Sequence;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
@@ -55,7 +56,6 @@ use crate::metasrv::{
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::pubsub::PublishRef;
use crate::region::lease_keeper::OpeningRegionKeeper;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend};
@@ -211,6 +211,8 @@ impl MetaSrvBuilder {
))
});
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
let ddl_manager = build_ddl_manager(
&options,
datanode_manager,
@@ -218,8 +220,8 @@ impl MetaSrvBuilder {
&mailbox,
&table_metadata_manager,
table_metadata_allocator,
&opening_region_keeper,
)?;
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());
let handler_group = match handler_group {
Some(handler_group) => handler_group,
@@ -307,7 +309,7 @@ impl MetaSrvBuilder {
)
.await,
plugins: plugins.unwrap_or_else(Plugins::default),
opening_region_keeper,
memory_region_keeper: opening_region_keeper,
})
}
}
@@ -350,6 +352,7 @@ fn build_ddl_manager(
mailbox: &MailboxRef,
table_metadata_manager: &TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
memory_region_keeper: &MemoryRegionKeeperRef,
) -> Result<DdlManagerRef> {
let datanode_clients = datanode_clients.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
@@ -376,6 +379,7 @@ fn build_ddl_manager(
cache_invalidator,
table_metadata_manager.clone(),
table_metadata_allocator,
memory_region_keeper.clone(),
)
.context(error::InitDdlManagerSnafu)?,
))

View File

@@ -39,18 +39,6 @@ lazy_static! {
pub static ref METRIC_META_LEADER_CACHED_KV_LOAD_ELAPSED: HistogramVec =
register_histogram_vec!("meta_leader_cache_kv_load", "meta load cache", &["prefix"])
.unwrap();
/// Elapsed time to load follower region metadata.
pub static ref METRIC_META_LOAD_FOLLOWER_METADATA_ELAPSED: Histogram = register_histogram!(
"meta_load_follower_metadata_elapsed",
"meta load follower regions metadata elapsed"
)
.unwrap();
/// Elapsed time to load leader region metadata.
pub static ref METRIC_META_LOAD_LEADER_METADATA_ELAPSED: Histogram = register_histogram!(
"meta_load_leader_metadata_elapsed",
"meta load leader regions metadata elapsed"
)
.unwrap();
/// Meta kv cache hit counter.
pub static ref METRIC_META_KV_CACHE_HIT: IntCounterVec =
register_int_counter_vec!("meta_kv_cache_hit", "meta kv cache hit", &["op"]).unwrap();

View File

@@ -32,6 +32,7 @@ use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::ClusterId;
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
@@ -45,7 +46,6 @@ use tokio::time::Instant;
use self::migration_start::RegionMigrationStart;
use crate::error::{self, Error, Result};
use crate::procedure::utils::region_lock_key;
use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef};
use crate::service::mailbox::{BroadcastChannel, MailboxRef};
/// It's shared in each step and available even after recovering.
@@ -84,7 +84,7 @@ pub struct VolatileContext {
/// `opening_region_guard` should be consumed after
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
opening_region_guard: Option<OpeningRegionGuard>,
opening_region_guard: Option<OperatingRegionGuard>,
/// `table_route` is stored via previous steps for future use.
table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
/// `table_info` is stored via previous steps for future use.
@@ -126,7 +126,7 @@ pub trait ContextFactory {
pub struct ContextFactoryImpl {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
opening_region_keeper: MemoryRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
}
@@ -151,7 +151,7 @@ pub struct Context {
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
opening_region_keeper: MemoryRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
}

View File

@@ -23,6 +23,7 @@ use common_meta::instruction::{
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::rpc::router::RegionRoute;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
@@ -43,7 +44,6 @@ use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeader
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::PersistentContext;
use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef};
use crate::service::mailbox::{Channel, MailboxRef};
pub type MockHeartbeatReceiver = Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>;
@@ -83,7 +83,7 @@ impl MailboxContext {
pub struct TestingEnv {
table_metadata_manager: TableMetadataManagerRef,
mailbox_ctx: MailboxContext,
opening_region_keeper: OpeningRegionKeeperRef,
opening_region_keeper: MemoryRegionKeeperRef,
server_addr: String,
}
@@ -96,7 +96,7 @@ impl TestingEnv {
let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 1, kv_backend.clone());
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
Self {
table_metadata_manager,
@@ -127,8 +127,8 @@ impl TestingEnv {
&self.table_metadata_manager
}
/// Returns the [OpeningRegionKeeperRef]
pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef {
/// Returns the [MemoryRegionKeeperRef]
pub fn opening_region_keeper(&self) -> &MemoryRegionKeeperRef {
&self.opening_region_keeper
}

View File

@@ -174,6 +174,7 @@ mod tests {
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{Region, RegionRoute, RegionStatus};
use store_api::storage::RegionId;
@@ -182,7 +183,6 @@ mod tests {
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
use crate::region::lease_keeper::OpeningRegionKeeper;
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
@@ -301,7 +301,7 @@ mod tests {
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let opening_keeper = OpeningRegionKeeper::default();
let opening_keeper = MemoryRegionKeeper::default();
let table_id = 1024;
let table_info = new_test_table_info(table_id, vec![1]).into();
@@ -448,7 +448,7 @@ mod tests {
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let opening_keeper = OpeningRegionKeeper::default();
let opening_keeper = MemoryRegionKeeper::default();
let table_id = 1024;
let table_info = new_test_table_info(table_id, vec![1]).into();

View File

@@ -217,7 +217,7 @@ async fn test_on_datanode_create_regions() {
});
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: true }));
assert!(matches!(status, Status::Executing { persist: false }));
assert!(matches!(
procedure.creator.data.state,
CreateTableState::CreateMetadata

View File

@@ -115,6 +115,7 @@ pub mod test_data {
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::RegionRoute;
use common_meta::sequence::Sequence;
use datatypes::prelude::ConcreteDataType;
@@ -205,6 +206,7 @@ pub mod test_data {
},
)),
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
}
}
}

View File

@@ -12,38 +12,78 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod mito;
pub mod utils;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::rpc::router::RegionRoute;
use common_meta::DatanodeId;
use common_telemetry::warn;
use snafu::ResultExt;
use store_api::region_engine::RegionRole;
use store_api::storage::{RegionId, TableId};
use self::mito::find_staled_leader_regions;
use crate::error::{self, Result};
use crate::metrics;
use crate::region::lease_keeper::utils::find_staled_follower_regions;
pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
/// Renews lease of regions.
pub struct RegionLeaseKeeper {
table_metadata_manager: TableMetadataManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
}
/// The result of a region lease renewal;
/// it contains the renewed region leases and the [RegionId]s of non-existent regions.
pub struct RenewRegionLeasesResponse {
pub renewed: HashMap<RegionId, RegionRole>,
pub non_exists: HashSet<RegionId>,
}
impl RegionLeaseKeeper {
pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self {
pub fn new(
table_metadata_manager: TableMetadataManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
) -> Self {
Self {
table_metadata_manager,
memory_region_keeper,
}
}
}
fn renew_region_lease_via_region_route(
region_route: &RegionRoute,
datanode_id: DatanodeId,
region_id: RegionId,
) -> Option<(RegionId, RegionRole)> {
// If it's a leader region on this datanode.
if let Some(leader) = &region_route.leader_peer {
if leader.id == datanode_id {
let region_role = if region_route.is_leader_downgraded() {
RegionRole::Follower
} else {
RegionRole::Leader
};
return Some((region_id, region_role));
}
}
// If it's a follower region on this datanode.
if region_route
.follower_peers
.iter()
.any(|peer| peer.id == datanode_id)
{
return Some((region_id, RegionRole::Follower));
}
// The region doesn't belong to this datanode.
None
}
impl RegionLeaseKeeper {
fn collect_tables(&self, datanode_regions: &[RegionId]) -> HashMap<TableId, Vec<RegionId>> {
let mut tables = HashMap::<TableId, Vec<RegionId>>::new();
@@ -73,100 +113,68 @@ impl RegionLeaseKeeper {
Ok(metadata_subset)
}
/// Returns downgradable regions, and closeable regions.
///
/// - Downgradable regions:
/// Region's peer(`datanode_id`) is the corresponding downgraded leader peer in `region_routes`.
///
/// - closeable regions:
/// - It returns a region if it's peer(`datanode_id`) isn't the corresponding leader peer in `region_routes`.
/// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions.
/// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions.
/// - It returns a region if the region's table metadata is not found.
pub async fn find_staled_leader_regions(
/// Returns [None] if the specific region doesn't belong to the datanode.
fn renew_region_lease(
&self,
_cluster_id: u64,
datanode_id: u64,
datanode_regions: &[RegionId],
) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
let tables = self.collect_tables(datanode_regions);
let table_ids = tables.keys().copied().collect::<Vec<_>>();
table_metadata: &HashMap<TableId, TableRouteValue>,
datanode_id: DatanodeId,
region_id: RegionId,
role: RegionRole,
) -> Option<(RegionId, RegionRole)> {
// Renews the lease if it's an opening or a deleting region.
if self.memory_region_keeper.contains(datanode_id, region_id) {
return Some((region_id, role));
}
let metadata_subset = {
let _timer = metrics::METRIC_META_LOAD_LEADER_METADATA_ELAPSED.start_timer();
self.collect_tables_metadata(&table_ids).await?
};
let mut closeable_set = HashSet::new();
let mut downgradable_set = HashSet::new();
for (table_id, regions) in tables {
if let Some(metadata) = metadata_subset.get(&table_id) {
let region_routes = &metadata.region_routes;
let (downgradable, closeable) =
find_staled_leader_regions(datanode_id, &regions, region_routes);
downgradable_set.extend(downgradable);
closeable_set.extend(closeable);
} else {
warn!(
"The table {} metadata is not found, appends closeable leader regions: {:?}",
table_id, regions
);
// If table metadata is not found.
closeable_set.extend(regions);
if let Some(table_route) = table_metadata.get(&region_id.table_id()) {
if let Some(region_route) = table_route.region_route(region_id) {
return renew_region_lease_via_region_route(&region_route, datanode_id, region_id);
}
}
Ok((downgradable_set, closeable_set))
None
}
/// Returns upgradable regions, and closeable regions.
/// Renews the lease of regions for a specific datanode.
///
/// Upgradable regions:
/// - Region's peer(`datanode_id`) is the corresponding leader peer in `region_routes`.
/// The lease of regions will be renewed if:
/// - The region of the specific datanode exists in [TableRouteValue].
/// - The region of the specific datanode is opening.
///
/// closeable regions:
/// - Region's peer(`datanode_id`) isn't the corresponding leader/follower peer in `region_routes`.
/// - Region's table metadata is not found.
pub async fn find_staled_follower_regions(
/// Otherwise the leases will not be renewed,
/// and the corresponding regions will be added to the `non_exists` set of [RenewRegionLeasesResponse].
pub async fn renew_region_leases(
&self,
_cluster_id: u64,
datanode_id: u64,
datanode_regions: &[RegionId],
) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
let tables = self.collect_tables(datanode_regions);
datanode_id: DatanodeId,
regions: &[(RegionId, RegionRole)],
) -> Result<RenewRegionLeasesResponse> {
let region_ids = regions
.iter()
.map(|(region_id, _)| *region_id)
.collect::<Vec<_>>();
let tables = self.collect_tables(&region_ids);
let table_ids = tables.keys().copied().collect::<Vec<_>>();
let table_metadata = self.collect_tables_metadata(&table_ids).await?;
let metadata_subset = {
let _timer = metrics::METRIC_META_LOAD_FOLLOWER_METADATA_ELAPSED.start_timer();
self.collect_tables_metadata(&table_ids).await?
};
let mut renewed = HashMap::new();
let mut non_exists = HashSet::new();
let mut upgradable_set = HashSet::new();
let mut closeable_set = HashSet::new();
for (table_id, regions) in tables {
if let Some(metadata) = metadata_subset.get(&table_id) {
let region_routes = &metadata.region_routes;
let (upgradable, closeable) =
find_staled_follower_regions(datanode_id, &regions, region_routes);
upgradable_set.extend(upgradable);
closeable_set.extend(closeable);
} else {
warn!(
"The table {} metadata is not found, appends closeable followers regions: {:?}",
table_id, regions
);
// If table metadata is not found.
closeable_set.extend(regions);
for &(region, role) in regions {
match self.renew_region_lease(&table_metadata, datanode_id, region, role) {
Some((region, renewed_role)) => {
renewed.insert(region, renewed_role);
}
None => {
non_exists.insert(region);
}
}
}
Ok((upgradable_set, closeable_set))
Ok(RenewRegionLeasesResponse {
renewed,
non_exists,
})
}
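A usage sketch (not part of this diff) of how a heartbeat handler might drive this method; `keeper`, `datanode_id`, and the reported regions are hypothetical:
// Regions in `renewed` keep serving with the role decided here (a downgraded leader is
// granted a Follower lease), while regions in `non_exists` are unknown to the table
// metadata and are expected to be closed by the datanode.
let reported = vec![(RegionId::new(1024, 1), RegionRole::Leader)];
let RenewRegionLeasesResponse { renewed, non_exists } =
    keeper.renew_region_leases(0, datanode_id, &reported).await?;
let granted: Vec<(RegionId, RegionRole)> = renewed.into_iter().collect();
let to_close: Vec<RegionId> = non_exists.into_iter().collect();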
#[cfg(test)]
@@ -175,394 +183,217 @@ impl RegionLeaseKeeper {
}
}
#[derive(Debug, Clone)]
pub struct OpeningRegionGuard {
datanode_id: DatanodeId,
region_id: RegionId,
inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
}
impl Drop for OpeningRegionGuard {
fn drop(&mut self) {
let mut inner = self.inner.write().unwrap();
inner.remove(&(self.datanode_id, self.region_id));
}
}
impl OpeningRegionGuard {
/// Returns opening region info.
pub fn info(&self) -> (DatanodeId, RegionId) {
(self.datanode_id, self.region_id)
}
}
pub type OpeningRegionKeeperRef = Arc<OpeningRegionKeeper>;
#[derive(Debug, Clone, Default)]
/// Tracks opening regions.
pub struct OpeningRegionKeeper {
inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
}
impl OpeningRegionKeeper {
pub fn new() -> Self {
Default::default()
}
/// Returns [OpeningRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist.
pub fn register(
&self,
datanode_id: DatanodeId,
region_id: RegionId,
) -> Option<OpeningRegionGuard> {
let mut inner = self.inner.write().unwrap();
if inner.insert((datanode_id, region_id)) {
Some(OpeningRegionGuard {
datanode_id,
region_id,
inner: self.inner.clone(),
})
} else {
None
}
}
/// Returns true if the keeper contains a (`datanode_id`, `region_id`) tuple.
pub fn contains(&self, datanode_id: DatanodeId, region_id: RegionId) -> bool {
let inner = self.inner.read().unwrap();
inner.contains(&(datanode_id, region_id))
}
/// Returns the given set of regions with the opening ones filtered out.
pub fn filter_opening_regions(
&self,
datanode_id: DatanodeId,
mut region_ids: HashSet<RegionId>,
) -> HashSet<RegionId> {
let inner = self.inner.read().unwrap();
region_ids.retain(|region_id| !inner.contains(&(datanode_id, *region_id)));
region_ids
}
/// Returns the number of elements in the tracking set.
pub fn len(&self) -> usize {
let inner = self.inner.read().unwrap();
inner.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
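A small sketch (not part of this diff) of the guard pattern this keeper implements; the `MemoryRegionKeeper` it is being moved to in `common_meta::region_keeper` exposes the same `register`/`contains` API, as the tests below show:
// Registering marks (datanode_id, region_id) as operating; the entry is removed
// automatically once the returned guard is dropped.
let keeper = OpeningRegionKeeper::new();
let guard = keeper
    .register(1, RegionId::new(1024, 1))
    .expect("first registration succeeds");
assert!(keeper.contains(1, RegionId::new(1024, 1)));
// A second registration for the same pair returns None while the guard is alive.
assert!(keeper.register(1, RegionId::new(1024, 1)).is_none());
drop(guard);
assert!(!keeper.contains(1, RegionId::new(1024, 1)));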
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute, RegionStatus};
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{Region, RegionRouteBuilder, RegionStatus};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use super::{OpeningRegionKeeper, RegionLeaseKeeper};
use super::{renew_region_lease_via_region_route, RegionLeaseKeeper};
use crate::region::lease_keeper::RenewRegionLeasesResponse;
fn new_test_keeper() -> RegionLeaseKeeper {
let store = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
RegionLeaseKeeper::new(table_metadata_manager)
}
#[tokio::test]
async fn test_empty_table_routes() {
let datanode_id = 1;
let region_number = 1u32;
let region_id = RegionId::from_u64(region_number as u64);
let keeper = new_test_keeper();
let datanode_regions = vec![region_id];
let (downgradable, closeable) = keeper
.find_staled_leader_regions(0, datanode_id, &datanode_regions)
.await
.unwrap();
assert_eq!(closeable.len(), 1);
assert!(closeable.contains(&region_id));
assert!(downgradable.is_empty());
let (upgradable, closeable) = keeper
.find_staled_follower_regions(0, datanode_id, &datanode_regions)
.await
.unwrap();
assert!(upgradable.is_empty());
assert_eq!(closeable.len(), 1);
assert!(closeable.contains(&region_id));
}
#[tokio::test]
async fn test_find_closeable_regions_simple() {
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let peer = Peer::empty(datanode_id);
let table_info = new_test_table_info(table_id, vec![region_number]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
..Default::default()
}];
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
// `closeable` should be empty.
let datanode_regions = vec![region_id];
let (downgradable, closeable) = keeper
.find_staled_leader_regions(0, datanode_id, &datanode_regions)
.await
.unwrap();
assert!(closeable.is_empty());
assert!(downgradable.is_empty());
// `closeable` should be empty.
let datanode_regions = vec![];
let (downgradable, closeable) = keeper
.find_staled_leader_regions(0, datanode_id, &datanode_regions)
.await
.unwrap();
assert!(closeable.is_empty());
assert!(downgradable.is_empty());
}
#[tokio::test]
async fn test_find_closeable_regions_2() {
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let another_region_id = RegionId::new(table_id, region_number + 1);
let unknown_region_id = RegionId::new(table_id + 1, region_number);
let peer = Peer::empty(datanode_id);
let another_peer = Peer::empty(datanode_id + 1);
let table_info =
new_test_table_info(table_id, vec![region_number, region_number + 1]).into();
let region_routes = vec![
RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
..Default::default()
},
RegionRoute {
region: Region::new_test(another_region_id),
leader_peer: None,
follower_peers: vec![another_peer.clone()],
leader_status: None,
},
];
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
// Unexpected Leader region.
// `closeable` should be vec![unknown_region_id].
let datanode_regions = vec![region_id, unknown_region_id];
let (downgradable, closeable) = keeper
.find_staled_leader_regions(0, datanode_id, &datanode_regions)
.await
.unwrap();
assert_eq!(closeable.len(), 1);
assert!(closeable.contains(&unknown_region_id));
assert!(downgradable.is_empty());
// Expected as Follower region.
// `closeable` should be vec![another_region_id], because `another_region_id` is an active region of `another_peer`.
let datanode_regions = vec![another_region_id];
let (downgradable, closeable) = keeper
.find_staled_leader_regions(0, datanode_id, &datanode_regions)
.await
.unwrap();
assert_eq!(closeable.len(), 1);
assert!(closeable.contains(&another_region_id));
assert!(downgradable.is_empty());
}
#[tokio::test]
async fn test_find_staled_leader_region_downgraded() {
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let another_region_id = RegionId::new(table_id, region_number + 1);
let peer = Peer::empty(datanode_id);
let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
leader_status: Some(RegionStatus::Downgraded),
..Default::default()
}];
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
// `downgradable` should be vec![region_id], `closeable` should be vec![another_region_id].
let datanode_regions = vec![region_id, another_region_id];
let (downgradable, closeable) = keeper
.find_staled_leader_regions(0, datanode_id, &datanode_regions)
.await
.unwrap();
assert_eq!(closeable.len(), 1);
assert!(closeable.contains(&another_region_id));
assert_eq!(downgradable.len(), 1);
assert!(downgradable.contains(&region_id));
}
#[tokio::test]
async fn test_find_staled_follower_regions() {
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let peer = Peer::empty(datanode_id);
let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
..Default::default()
}];
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
// `upgradable` should be vec![region_id], `closeable` should be empty.
let datanode_regions = vec![region_id];
let (upgradable, closeable) = keeper
.find_staled_follower_regions(0, datanode_id, &datanode_regions)
.await
.unwrap();
assert!(closeable.is_empty());
assert_eq!(upgradable.len(), 1);
assert!(upgradable.contains(&region_id));
// `upgradable` should be empty, `closeable` should be vec![region_id].
let datanode_regions = vec![region_id];
let (upgradable, closeable) = keeper
.find_staled_follower_regions(0, datanode_id + 1, &datanode_regions)
.await
.unwrap();
assert!(upgradable.is_empty());
assert_eq!(closeable.len(), 1);
assert!(closeable.contains(&region_id));
}
#[tokio::test]
async fn test_find_staled_region_downgraded() {
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let peer = Peer::empty(datanode_id);
let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
leader_status: Some(RegionStatus::Downgraded),
..Default::default()
}];
let datanode_regions = vec![region_id];
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
let (upgradable, closeable) = keeper
.find_staled_follower_regions(0, datanode_id, &datanode_regions)
.await
.unwrap();
assert!(upgradable.is_empty());
assert!(closeable.is_empty());
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
RegionLeaseKeeper::new(table_metadata_manager, opening_region_keeper)
}
#[test]
fn test_opening_region_keeper() {
let keeper = OpeningRegionKeeper::new();
fn test_renew_region_lease_via_region_route() {
let region_id = RegionId::new(1024, 1);
let leader_peer_id = 1024;
let follower_peer_id = 2048;
let mut region_route = RegionRouteBuilder::default()
.region(Region::new_test(region_id))
.leader_peer(Peer::empty(leader_peer_id))
.follower_peers(vec![Peer::empty(follower_peer_id)])
.build()
.unwrap();
let guard = keeper.register(1, RegionId::from_u64(1)).unwrap();
assert!(keeper.register(1, RegionId::from_u64(1)).is_none());
let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap();
// The region doesn't belong to the datanode.
for region_id in [RegionId::new(1024, 2), region_id] {
assert!(renew_region_lease_via_region_route(&region_route, 1, region_id).is_none());
}
let output = keeper.filter_opening_regions(
1,
HashSet::from([
RegionId::from_u64(1),
RegionId::from_u64(2),
RegionId::from_u64(3),
]),
// The leader region on the datanode.
assert_eq!(
renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
Some((region_id, RegionRole::Leader))
);
// The follower region on the datanode.
assert_eq!(
renew_region_lease_via_region_route(&region_route, follower_peer_id, region_id),
Some((region_id, RegionRole::Follower))
);
assert_eq!(output.len(), 1);
assert!(output.contains(&RegionId::from_u64(3)));
assert_eq!(keeper.len(), 2);
drop(guard);
region_route.leader_status = Some(RegionStatus::Downgraded);
// The downgraded leader region on the datanode.
assert_eq!(
renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
Some((region_id, RegionRole::Follower))
);
}
assert_eq!(keeper.len(), 1);
#[tokio::test]
async fn test_renew_region_leases_non_exists_regions() {
let keeper = new_test_keeper();
assert!(keeper.contains(1, RegionId::from_u64(2)));
drop(guard2);
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(
0,
1,
&[
(RegionId::new(1024, 1), RegionRole::Follower),
(RegionId::new(1024, 2), RegionRole::Leader),
],
)
.await
.unwrap();
assert!(keeper.is_empty());
assert!(renewed.is_empty());
assert_eq!(
non_exists,
HashSet::from([RegionId::new(1024, 1), RegionId::new(1024, 2)])
);
}
#[tokio::test]
async fn test_renew_region_leases_basic() {
let region_number = 1u32;
let table_id = 1024;
let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
let region_id = RegionId::new(table_id, 1);
let leader_peer_id = 1024;
let follower_peer_id = 2048;
let region_route = RegionRouteBuilder::default()
.region(Region::new_test(region_id))
.leader_peer(Peer::empty(leader_peer_id))
.follower_peers(vec![Peer::empty(follower_peer_id)])
.build()
.unwrap();
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, vec![region_route.clone()])
.await
.unwrap();
// The region doesn't belong to the datanode.
for region_id in [RegionId::new(1024, 2), region_id] {
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(0, 1, &[(region_id, RegionRole::Follower)])
.await
.unwrap();
assert!(renewed.is_empty());
assert_eq!(non_exists, HashSet::from([region_id]));
}
// The leader region on the datanode.
for role in [RegionRole::Leader, RegionRole::Follower] {
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(0, leader_peer_id, &[(region_id, role)])
.await
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Leader)]));
}
// The follower region on the datanode.
for role in [RegionRole::Leader, RegionRole::Follower] {
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(0, follower_peer_id, &[(region_id, role)])
.await
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Follower)]));
}
let opening_region_id = RegionId::new(2048, 1);
let _guard = keeper
.memory_region_keeper
.register(leader_peer_id, opening_region_id)
.unwrap();
// The opening region on the datanode.
// NOTES: The procedure lock will ensure only one opening leader.
for role in [RegionRole::Leader, RegionRole::Follower] {
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(0, leader_peer_id, &[(opening_region_id, role)])
.await
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(renewed, HashMap::from([(opening_region_id, role)]));
}
}
#[tokio::test]
async fn test_renew_region_leases_with_downgrade_leader() {
let region_number = 1u32;
let table_id = 1024;
let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
let region_id = RegionId::new(table_id, 1);
let leader_peer_id = 1024;
let follower_peer_id = 2048;
let region_route = RegionRouteBuilder::default()
.region(Region::new_test(region_id))
.leader_peer(Peer::empty(leader_peer_id))
.follower_peers(vec![Peer::empty(follower_peer_id)])
.leader_status(RegionStatus::Downgraded)
.build()
.unwrap();
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, vec![region_route.clone()])
.await
.unwrap();
// The leader region on the datanode.
for role in [RegionRole::Leader, RegionRole::Follower] {
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(0, leader_peer_id, &[(region_id, role)])
.await
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Follower)]));
}
}
}

View File

@@ -1,122 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use common_meta::rpc::router::{
convert_to_region_leader_map, convert_to_region_leader_status_map, RegionRoute,
};
use store_api::storage::RegionId;
use super::utils::downgradable_leader_regions;
use crate::region::lease_keeper::utils::closeable_leader_region;
/// Returns Downgradable regions and closeable regions.
///
/// - Downgradable regions:
/// Region's peer(`datanode_id`) is the corresponding downgraded leader peer in `region_routes`.
///
/// - closeable regions:
/// Region's peer(`datanode_id`) isn't the corresponding leader peer in `region_routes`.
/// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions.
/// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions.
pub fn find_staled_leader_regions(
datanode_id: u64,
datanode_regions: &[RegionId],
region_routes: &[RegionRoute],
) -> (HashSet<RegionId>, HashSet<RegionId>) {
let region_leader_map = convert_to_region_leader_map(region_routes);
let region_leader_status_map = convert_to_region_leader_status_map(region_routes);
let (downgradable, closeable): (HashSet<_>, HashSet<_>) = datanode_regions
.iter()
.map(|region_id| {
(
downgradable_leader_regions(
datanode_id,
*region_id,
&region_leader_map,
&region_leader_status_map,
),
closeable_leader_region(datanode_id, *region_id, &region_leader_map),
)
})
.unzip();
let downgradable = downgradable.into_iter().flatten().collect();
let closeable = closeable.into_iter().flatten().collect();
(downgradable, closeable)
}
#[cfg(test)]
mod tests {
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
use crate::region::lease_keeper::mito::find_staled_leader_regions;
#[test]
fn test_find_staled_regions() {
let datanode_id = 1u64;
let region_number = 1u32;
let region_id = RegionId::from_u64(region_number as u64);
let peer = Peer::empty(datanode_id);
let another_peer = Peer::empty(datanode_id + 1);
let datanode_regions = vec![region_id];
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
..Default::default()
}];
// Grants lease.
// `closeable` should be empty, `region_id` is a active leader region of the `peer`
let (downgradable, closeable) =
find_staled_leader_regions(datanode_id, &datanode_regions, &region_routes);
assert!(closeable.is_empty());
assert!(downgradable.is_empty());
// Unexpected Leader region.
// `closeable` should be vec![`region_id`];
let (downgradable, closeable) =
find_staled_leader_regions(datanode_id, &datanode_regions, &[]);
assert_eq!(closeable.len(), 1);
assert!(closeable.contains(&region_id));
assert!(downgradable.is_empty());
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(another_peer.clone()),
follower_peers: vec![peer.clone()],
leader_status: None,
}];
let retained_active_regions = datanode_regions.clone();
// Expected as Follower region.
// `closeable` should be vec![`region_id`], `region_id` is RegionRole::Leader.
let (downgradable, closeable) =
find_staled_leader_regions(datanode_id, &retained_active_regions, &region_routes);
assert!(downgradable.is_empty());
assert_eq!(closeable.len(), 1);
assert!(closeable.contains(&region_id));
}
}

View File

@@ -1,340 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use common_meta::peer::Peer;
use common_meta::rpc::router::{
convert_to_region_leader_map, convert_to_region_leader_status_map, convert_to_region_peer_map,
RegionRoute, RegionStatus,
};
use store_api::storage::{RegionId, RegionNumber};
/// Returns Some(region_id) if it's not a leader region in `region_route`.
///
/// It removes a leader region if its peer(`node_id`) isn't the corresponding leader peer in `region_routes`.
pub fn closeable_leader_region(
node_id: u64,
region_id: RegionId,
region_leader_map: &HashMap<RegionNumber, &Peer>,
) -> Option<RegionId> {
let region_number = region_id.region_number();
if let Some(peer) = region_leader_map.get(&region_number) {
if peer.id == node_id {
None
} else {
Some(region_id)
}
} else {
Some(region_id)
}
}
/// Returns Some(region_id) if its peer(`node_id`) is a downgraded leader region peer in `region_route`.
pub fn downgradable_leader_regions(
node_id: u64,
region_id: RegionId,
region_leader_map: &HashMap<RegionNumber, &Peer>,
region_leader_status: &HashMap<RegionNumber, RegionStatus>,
) -> Option<RegionId> {
let region_number = region_id.region_number();
let leader_status = region_leader_status.get(&region_number);
let downgraded = matches!(leader_status, Some(RegionStatus::Downgraded));
if let Some(peer) = region_leader_map.get(&region_number) {
if peer.id == node_id && downgraded {
Some(region_id)
} else {
None
}
} else {
None
}
}
/// Returns upgradable regions, and closeable regions.
///
/// Upgradable regions:
/// - Region's peer(`datanode_id`) is the corresponding leader peer in `region_routes`.
///
/// closeable regions:
/// - Region's peer(`datanode_id`) isn't the corresponding leader/follower peer in `region_routes`.
pub fn find_staled_follower_regions(
datanode_id: u64,
datanode_regions: &[RegionId],
region_routes: &[RegionRoute],
) -> (HashSet<RegionId>, HashSet<RegionId>) {
let region_leader_map = convert_to_region_leader_map(region_routes);
let region_leader_status_map = convert_to_region_leader_status_map(region_routes);
let region_peer_map = convert_to_region_peer_map(region_routes);
let (upgradable, closeable): (HashSet<Option<RegionId>>, HashSet<Option<RegionId>>) =
datanode_regions
.iter()
.map(|region_id| {
(
upgradable_follower_region(
datanode_id,
*region_id,
&region_leader_map,
&region_leader_status_map,
),
closeable_region(datanode_id, *region_id, &region_peer_map),
)
})
.unzip();
let upgradable = upgradable.into_iter().flatten().collect();
let closeable = closeable.into_iter().flatten().collect();
(upgradable, closeable)
}
/// Returns Some(region) if its peer(`node_id`) is a leader region peer in `region_routes`.
pub fn upgradable_follower_region(
node_id: u64,
region_id: RegionId,
region_leader_map: &HashMap<RegionNumber, &Peer>,
region_leader_status: &HashMap<RegionNumber, RegionStatus>,
) -> Option<RegionId> {
let region_number = region_id.region_number();
let leader_status = region_leader_status.get(&region_number);
let downgraded = matches!(leader_status, Some(RegionStatus::Downgraded));
if let Some(peer) = region_leader_map.get(&region_number) {
if peer.id == node_id && !downgraded {
Some(region_id)
} else {
None
}
} else {
None
}
}
/// Returns Some(region) if its peer(`node_id`) isn't a leader or follower region peer in `region_routes`.
pub fn closeable_region(
node_id: u64,
region_id: RegionId,
region_peer_map: &HashMap<RegionNumber, HashSet<u64>>,
) -> Option<RegionId> {
if let Some(set) = region_peer_map.get(&region_id.region_number()) {
if set.get(&node_id).is_some() {
None
} else {
Some(region_id)
}
} else {
Some(region_id)
}
}
#[cfg(test)]
mod tests {
use common_meta::peer::Peer;
use store_api::storage::RegionId;
use super::*;
#[test]
fn test_closeable_leader_region() {
let datanode_id = 1u64;
let region_number = 1u32;
let region_id = RegionId::from_u64(region_number as u64);
let peer = Peer::empty(datanode_id);
let region_leader_map = [(region_number, &peer)].into();
// Should be None, `region_id` is an active region of `peer`.
assert_eq!(
None,
closeable_leader_region(datanode_id, region_id, &region_leader_map,)
);
// Should be Some(`region_id`), incorrect datanode_id.
assert_eq!(
Some(region_id),
closeable_leader_region(datanode_id + 1, region_id, &region_leader_map,)
);
// Should be Some(`region_id`), the `region_leader_map` is empty.
assert_eq!(
Some(region_id),
closeable_leader_region(datanode_id, region_id, &Default::default(),)
);
let another_peer = Peer::empty(datanode_id + 1);
let region_leader_map = [(region_number, &another_peer)].into();
// Should be Some(`region_id`), `region_id` is active region of `another_peer`.
assert_eq!(
Some(region_id),
closeable_leader_region(datanode_id, region_id, &region_leader_map,)
);
}
#[test]
fn test_downgradable_region() {
let datanode_id = 1u64;
let region_number = 1u32;
let region_id = RegionId::from_u64(region_number as u64);
let peer = Peer::empty(datanode_id);
let region_leader_map = [(region_number, &peer)].into();
let region_leader_status_map = [(region_number, RegionStatus::Downgraded)].into();
// Should be Some(region_id), `region_id` is a downgraded leader region.
assert_eq!(
Some(region_id),
downgradable_leader_regions(
datanode_id,
region_id,
&region_leader_map,
&region_leader_status_map
)
);
// Should be None, `region_id` is a leader region.
assert_eq!(
None,
downgradable_leader_regions(
datanode_id,
region_id,
&region_leader_map,
&Default::default(),
)
);
// Should be None, incorrect datanode_id.
assert_eq!(
None,
downgradable_leader_regions(
datanode_id + 1,
region_id,
&region_leader_map,
&region_leader_status_map
)
);
// Should be None, incorrect datanode_id.
assert_eq!(
None,
downgradable_leader_regions(
datanode_id + 1,
region_id,
&region_leader_map,
&Default::default(),
)
);
}
#[test]
fn test_closeable_follower_region() {
let region_number = 1u32;
let region_id = RegionId::from_u64(region_number as u64);
let another_region_id = RegionId::from_u64(region_number as u64 + 1);
let region_peer_map = [(region_number, HashSet::from([1, 2, 3]))].into();
// Should be None.
assert_eq!(None, closeable_region(1, region_id, &region_peer_map));
// Should be Some(`region_id`), incorrect `datanode_id`.
assert_eq!(
Some(region_id),
closeable_region(4, region_id, &region_peer_map)
);
// Should be Some(`another_region_id`), `another_region_id` doesn't exist.
assert_eq!(
Some(another_region_id),
closeable_region(1, another_region_id, &region_peer_map)
);
// Should be Some(`another_region_id`), `another_region_id` doesn't exist, incorrect `datanode_id`.
assert_eq!(
Some(another_region_id),
closeable_region(4, another_region_id, &region_peer_map)
);
}
#[test]
fn test_upgradable_follower_region() {
let datanode_id = 1u64;
let region_number = 1u32;
let region_id = RegionId::from_u64(region_number as u64);
let another_region_id = RegionId::from_u64(region_number as u64 + 1);
let peer = Peer::empty(datanode_id);
let region_leader_map = [(region_number, &peer)].into();
let region_leader_status = HashMap::new();
// Should be Some(region_id), `region_id` is a leader region.
assert_eq!(
Some(region_id),
upgradable_follower_region(
datanode_id,
region_id,
&region_leader_map,
&region_leader_status
)
);
let downgraded_leader = [(region_number, RegionStatus::Downgraded)].into();
// Should be None, `region_id` is a downgraded leader region.
assert_eq!(
None,
upgradable_follower_region(
datanode_id,
region_id,
&region_leader_map,
&downgraded_leader
)
);
// Should be None, incorrect `datanode_id`.
assert_eq!(
None,
upgradable_follower_region(
datanode_id + 1,
region_id,
&region_leader_map,
&region_leader_status
)
);
// Should be None, `another_region_id` doesn't exist in `region_leader_map`.
assert_eq!(
None,
upgradable_follower_region(
datanode_id,
another_region_id,
&region_leader_map,
&region_leader_status
)
);
// Should be None, incorrect `datanode_id` and `another_region_id` doesn't exist.
assert_eq!(
None,
upgradable_follower_region(
datanode_id + 1,
another_region_id,
&region_leader_map,
&region_leader_status
)
);
}
}

View File

@@ -392,7 +392,6 @@ impl CreateRequestBuilder {
});
RegionCreateRequest {
// We use an empty engine name as we have already located the engine.
engine: self.engine.to_string(),
column_metadatas,
primary_key: self.primary_key.clone().unwrap_or(primary_key),

View File

@@ -14,6 +14,7 @@
//! Region Engine's definition
use std::fmt::Display;
use std::sync::Arc;
use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
@@ -84,6 +85,15 @@ pub enum RegionRole {
Leader,
}
impl Display for RegionRole {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RegionRole::Follower => write!(f, "Follower"),
RegionRole::Leader => write!(f, "Leader"),
}
}
}
impl RegionRole {
pub fn writable(&self) -> bool {
matches!(self, RegionRole::Leader)

View File

@@ -44,6 +44,7 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_handle_ddl_request() {
common_telemetry::init_default_ut_logging();
let instance =
tests::create_distributed_instance("test_distributed_handle_ddl_request").await;

View File

@@ -20,6 +20,7 @@ use common_config::KvBackendConfig;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use datanode::config::DatanodeOptions;
@@ -115,6 +116,7 @@ impl GreptimeDbStandaloneBuilder {
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
Arc::new(MemoryRegionKeeper::default()),
)
.unwrap(),
);