refactor: replace InactiveRegionManager with RegionLeaseKeeper (#2729)

* refactor: replace InactiveRegionManager with RegionLeaseKeeper

* refactor: remove InactiveRegionManager

* chore: remove remark_inactive_region field

* chore: apply suggestions from CR

* refactor: reduce meta server cache scope

* chore: remove redundant code
This commit is contained in:
Weny Xu
2023-11-14 14:16:13 +09:00
committed by GitHub
parent 8f3b299a45
commit dc46e96879
14 changed files with 330 additions and 408 deletions

View File

@@ -660,9 +660,7 @@ impl TableMetadataManager {
.table_route_manager()
.build_update_txn(table_id, &current_table_route_value, &new_table_route_value)?;
let txn = Txn::merge_all(vec![update_table_route_txn]);
let r = self.kv_backend.txn(txn).await?;
let r = self.kv_backend.txn(update_table_route_txn).await?;
// Checks whether metadata was already updated.
if !r.succeeded {

View File

@@ -373,12 +373,16 @@ impl CountdownTask {
countdown.set(tokio::time::sleep_until(first_deadline));
},
Some(CountdownCommand::Reset((role, deadline))) => {
// The first-time granted regions might be ignored because the `first_deadline` is larger than the `region_lease_timeout`.
// Therefore, we set writable at the outside.
// TODO(weny): Considers setting `first_deadline` to `region_lease_timeout`.
let _ = self.region_server.set_writable(self.region_id, role.writable());
if countdown.deadline() < deadline {
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later",
(deadline - Instant::now()).as_secs_f32(),
);
let _ = self.region_server.set_writable(self.region_id, role.writable());
countdown.set(tokio::time::sleep_until(deadline));
}
// Else the countdown could be either:

View File

@@ -18,6 +18,7 @@ use api::v1::meta::HeartbeatRequest;
use common_time::util as time_util;
use serde::{Deserialize, Serialize};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use crate::error::{Error, InvalidHeartbeatRequestSnafu};
use crate::keys::StatKey;
@@ -72,8 +73,12 @@ impl Stat {
}
}
pub fn region_ids(&self) -> Vec<u64> {
self.region_stats.iter().map(|s| s.id).collect()
/// Returns a tuple array containing [RegionId] and [RegionRole].
pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
self.region_stats
.iter()
.map(|s| (RegionId::from(s.id), s.role))
.collect()
}
pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<u64>) {

View File

@@ -12,22 +12,63 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{GrantedRegion, HeartbeatRequest, RegionLease, RegionRole, Role};
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::inactive_region_manager::InactiveRegionManager;
use crate::metasrv::Context;
use crate::region::lease_keeper::RegionLeaseKeeperRef;
use crate::region::RegionLeaseKeeper;
pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,
}
impl RegionLeaseHandler {
pub fn new(region_lease_seconds: u64) -> Self {
pub fn new(region_lease_seconds: u64, table_metadata_manager: TableMetadataManagerRef) -> Self {
let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager);
Self {
region_lease_seconds,
region_lease_keeper: Arc::new(region_lease_keeper),
}
}
}
/// Returns the opposite [RegionRole]: a leader becomes a follower and vice versa.
fn flip_role(role: RegionRole) -> RegionRole {
    match role {
        RegionRole::Leader => RegionRole::Follower,
        RegionRole::Follower => RegionRole::Leader,
    }
}
/// Grants lease of regions.
///
/// - A region found in the `operable` set is granted a `flip_role(current)`([RegionRole]);
///   any other region not in `closable` is granted the `current`([RegionRole]).
/// - A region found in the `closable` set is skipped: no lease is granted for it.
fn grant(
    granted_regions: &mut Vec<GrantedRegion>,
    operable: &HashSet<RegionId>,
    closable: &HashSet<RegionId>,
    regions: &[RegionId],
    current: RegionRole,
) {
    granted_regions.extend(
        regions
            .iter()
            // Closable regions are filtered out entirely.
            .filter(|region| !closable.contains(region))
            .map(|region| {
                let role = if operable.contains(region) {
                    flip_role(current)
                } else {
                    current
                };
                GrantedRegion::new(*region, role)
            }),
    );
}
@@ -41,31 +82,61 @@ impl HeartbeatHandler for RegionLeaseHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let Some(stat) = acc.stat.as_ref() else {
return Ok(());
};
let mut region_ids = stat.region_ids();
let regions = stat.regions();
let cluster_id = stat.cluster_id;
let datanode_id = stat.id;
let mut granted_regions = Vec::with_capacity(regions.len());
let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory);
let inactive_region_ids = inactive_region_manager
.retain_active_regions(stat.cluster_id, stat.id, &mut region_ids)
let (leaders, followers): (Vec<_>, Vec<_>) = regions
.into_iter()
.map(|(id, role)| match role {
RegionRole::Follower => (None, Some(id)),
RegionRole::Leader => (Some(id), None),
})
.unzip();
let leaders = leaders.into_iter().flatten().collect::<Vec<_>>();
let (downgradable, closable) = self
.region_lease_keeper
.find_staled_leader_regions(cluster_id, datanode_id, &leaders)
.await?;
let regions = region_ids
.into_iter()
.map(|region_id| GrantedRegion {
region_id,
role: RegionRole::Leader.into(),
})
.collect();
grant(
&mut granted_regions,
&downgradable,
&closable,
&leaders,
RegionRole::Leader,
);
let followers = followers.into_iter().flatten().collect::<Vec<_>>();
let (upgradeable, closable) = self
.region_lease_keeper
.find_staled_follower_regions(cluster_id, datanode_id, &followers)
.await?;
grant(
&mut granted_regions,
&upgradeable,
&closable,
&followers,
RegionRole::Follower,
);
acc.inactive_region_ids = inactive_region_ids;
acc.region_lease = Some(RegionLease {
regions,
regions: granted_regions
.into_iter()
.map(Into::into)
.collect::<Vec<_>>(),
duration_since_epoch: req.duration_since_epoch,
lease_seconds: self.region_lease_seconds,
});
@@ -76,101 +147,215 @@ impl HeartbeatHandler for RegionLeaseHandler {
#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::RegionRole;
use common_meta::distributed_time_constants;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::{distributed_time_constants, RegionIdent};
use store_api::storage::{RegionId, RegionNumber};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute, RegionStatus};
use store_api::storage::RegionId;
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::test_util;
/// Builds a [RegionLeaseKeeper] backed by a fresh in-memory kv backend for tests.
fn new_test_keeper() -> RegionLeaseKeeper {
    let store = Arc::new(MemoryKvBackend::new());

    let table_metadata_manager = Arc::new(TableMetadataManager::new(store));

    RegionLeaseKeeper::new(table_metadata_manager)
}
/// Builds a [RegionStat] with the given id/role and all metrics zeroed,
/// sufficient for lease-handler tests that only look at id and role.
fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
    RegionStat {
        id: region_id.as_u64(),
        role,
        rcus: 0,
        wcus: 0,
        approximate_bytes: 0,
        approximate_rows: 0,
        engine: String::new(),
    }
}
#[tokio::test]
async fn test_handle_region_lease() {
let region_failover_manager = test_util::create_region_failover_manager();
let kv_backend = region_failover_manager
.create_context()
.selector_ctx
.kv_backend
.clone();
async fn test_handle_upgradable_follower() {
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let another_region_id = RegionId::new(table_id, region_number + 1);
let peer = Peer::empty(datanode_id);
let follower_peer = Peer::empty(datanode_id + 1);
let table_info = new_test_table_info(table_id, vec![region_number]).into();
let cluster_id = 1;
let table_id = 1;
let table_name = "my_table";
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
test_util::prepare_table_region_and_info_value(&table_metadata_manager, table_name).await;
let req = HeartbeatRequest {
duration_since_epoch: 1234,
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
follower_peers: vec![follower_peer.clone()],
..Default::default()
};
}];
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
let builder = MetaSrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let ctx = &mut metasrv.new_ctx();
let acc = &mut HeartbeatAccumulator::default();
let new_region_stat = |region_number: RegionNumber| -> RegionStat {
let region_id = RegionId::new(table_id, region_number);
RegionStat {
id: region_id.as_u64(),
rcus: 0,
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
engine: String::new(),
role: RegionRole::Leader.into(),
}
};
acc.stat = Some(Stat {
cluster_id: 1,
id: 1,
region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)],
cluster_id,
id: peer.id,
region_stats: vec![
new_empty_region_stat(region_id, RegionRole::Follower),
new_empty_region_stat(another_region_id, RegionRole::Follower),
],
..Default::default()
});
let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory);
inactive_region_manager
.register_inactive_region(&RegionIdent {
cluster_id: 1,
datanode_id: 1,
table_id: 1,
region_number: 1,
engine: "mito2".to_string(),
})
.await
.unwrap();
inactive_region_manager
.register_inactive_region(&RegionIdent {
cluster_id: 1,
datanode_id: 1,
table_id: 1,
region_number: 3,
engine: "mito2".to_string(),
})
.await
.unwrap();
let req = HeartbeatRequest {
duration_since_epoch: 1234,
..Default::default()
};
RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS)
.handle(&req, ctx, acc)
.await
.unwrap();
assert!(acc.region_lease.is_some());
let lease = acc.region_lease.as_ref().unwrap();
assert_eq!(
lease.regions,
vec![GrantedRegion {
region_id: RegionId::new(table_id, 2).as_u64(),
role: RegionRole::Leader.into()
}]
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
);
assert_eq!(lease.duration_since_epoch, 1234);
handler.handle(&req, ctx, acc).await.unwrap();
assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
let acc = &mut HeartbeatAccumulator::default();
acc.stat = Some(Stat {
cluster_id,
id: follower_peer.id,
region_stats: vec![
new_empty_region_stat(region_id, RegionRole::Follower),
new_empty_region_stat(another_region_id, RegionRole::Follower),
],
..Default::default()
});
handler.handle(&req, ctx, acc).await.unwrap();
assert_eq!(
lease.lease_seconds,
acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS
);
assert_region_lease(
acc,
vec![GrantedRegion::new(region_id, RegionRole::Follower)],
);
}
#[tokio::test]
async fn test_handle_downgradable_leader() {
    let datanode_id = 1;
    let region_number = 1u32;
    let table_id = 10;
    // Three regions: one whose leader is downgraded, one healthy, and one
    // that is never registered in the table metadata.
    let region_id = RegionId::new(table_id, region_number);
    let another_region_id = RegionId::new(table_id, region_number + 1);
    let no_exist_region_id = RegionId::new(table_id, region_number + 2);
    let peer = Peer::empty(datanode_id);
    let follower_peer = Peer::empty(datanode_id + 1);
    let table_info = new_test_table_info(table_id, vec![region_number]).into();
    let cluster_id = 1;
    let region_routes = vec![
        // Leader marked `Downgraded`: the handler should grant only a
        // Follower lease for this region.
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(peer.clone()),
            follower_peers: vec![follower_peer.clone()],
            leader_status: Some(RegionStatus::Downgraded),
        },
        // Healthy leader: the Leader lease should be kept.
        RegionRoute {
            region: Region::new_test(another_region_id),
            leader_peer: Some(peer.clone()),
            ..Default::default()
        },
    ];
    let keeper = new_test_keeper();
    let table_metadata_manager = keeper.table_metadata_manager();
    table_metadata_manager
        .create_table_metadata(table_info, region_routes)
        .await
        .unwrap();
    let builder = MetaSrvBuilder::new();
    let metasrv = builder.build().await.unwrap();
    let ctx = &mut metasrv.new_ctx();
    let req = HeartbeatRequest {
        duration_since_epoch: 1234,
        ..Default::default()
    };
    let acc = &mut HeartbeatAccumulator::default();
    // The datanode reports all three regions as Leader.
    acc.stat = Some(Stat {
        cluster_id,
        id: peer.id,
        region_stats: vec![
            new_empty_region_stat(region_id, RegionRole::Leader),
            new_empty_region_stat(another_region_id, RegionRole::Leader),
            new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
        ],
        ..Default::default()
    });
    let handler = RegionLeaseHandler::new(
        distributed_time_constants::REGION_LEASE_SECS,
        table_metadata_manager.clone(),
    );
    handler.handle(&req, ctx, acc).await.unwrap();
    // Expect: downgraded leader -> Follower lease, healthy leader -> Leader
    // lease, unknown region -> no lease at all (closable).
    assert_region_lease(
        acc,
        vec![
            GrantedRegion::new(region_id, RegionRole::Follower),
            GrantedRegion::new(another_region_id, RegionRole::Leader),
        ],
    );
}
/// Asserts that the accumulated [RegionLease] grants exactly the `expected`
/// regions with the expected roles, ignoring ordering.
fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
    let region_lease = acc.region_lease.as_ref().unwrap().clone();

    // Key both sides by region id so the comparison is order-insensitive.
    let granted: HashMap<_, _> = region_lease
        .regions
        .into_iter()
        .map(|pb| {
            let region: GrantedRegion = pb.into();
            (region.region_id, region)
        })
        .collect();

    let expected: HashMap<_, _> = expected
        .into_iter()
        .map(|region| (region.region_id, region))
        .collect();

    assert_eq!(granted, expected);
}
}

View File

@@ -1,156 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::rpc::store::{BatchGetRequest, DeleteRangeRequest, PutRequest, RangeRequest};
use common_meta::RegionIdent;
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::keys::InactiveRegionKey;
use crate::metrics::METRIC_META_INACTIVE_REGIONS;
/// Manages the set of "inactive" region markers stored in the metasrv's
/// resettable (in-memory) kv backend.
pub struct InactiveRegionManager<'a> {
    // Borrowed store; the manager is a short-lived view over it.
    store: &'a ResettableKvBackendRef,
}
impl<'a> InactiveRegionManager<'a> {
    /// Creates a manager over the given resettable kv backend.
    pub fn new(store: &'a ResettableKvBackendRef) -> Self {
        Self { store }
    }

    /// Marks the region identified by `region_ident` as inactive by putting an
    /// [InactiveRegionKey] (with an empty value) into the store, and bumps the
    /// inactive-regions metric.
    pub async fn register_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> {
        let region_id = region_ident.get_region_id().as_u64();
        let key = InactiveRegionKey {
            cluster_id: region_ident.cluster_id,
            node_id: region_ident.datanode_id,
            region_id,
        };
        let req = PutRequest {
            key: key.into(),
            value: vec![],
            prev_kv: false,
        };
        self.store.put(req).await.context(error::KvBackendSnafu)?;

        METRIC_META_INACTIVE_REGIONS.inc();

        Ok(())
    }

    /// Removes the inactive marker for the region identified by `region_ident`
    /// and decrements the inactive-regions metric.
    pub async fn deregister_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> {
        let region_id = region_ident.get_region_id().as_u64();
        let key: Vec<u8> = InactiveRegionKey {
            cluster_id: region_ident.cluster_id,
            node_id: region_ident.datanode_id,
            region_id,
        }
        .into();
        self.store
            .delete(&key, false)
            .await
            .context(error::KvBackendSnafu)?;

        METRIC_META_INACTIVE_REGIONS.dec();

        Ok(())
    }

    /// The input is a list of regions on a specific node. If one or more regions have been
    /// set to inactive state by metasrv, the corresponding regions will be removed(update the
    /// `region_ids`), then returns the removed regions.
    pub async fn retain_active_regions(
        &self,
        cluster_id: u64,
        node_id: u64,
        region_ids: &mut Vec<u64>,
    ) -> Result<HashSet<u64>> {
        // Pair each candidate store key with its region id so that after the
        // batch-get we can map matched keys back to region ids.
        let key_region_ids = region_ids
            .iter()
            .map(|region_id| {
                (
                    InactiveRegionKey {
                        cluster_id,
                        node_id,
                        region_id: *region_id,
                    }
                    .into(),
                    *region_id,
                )
            })
            .collect::<Vec<(Vec<u8>, _)>>();
        let keys = key_region_ids.iter().map(|(key, _)| key.clone()).collect();
        let resp = self
            .store
            .batch_get(BatchGetRequest { keys })
            .await
            .context(error::KvBackendSnafu)?;
        let kvs = resp.kvs;
        // No inactive markers found: nothing to remove.
        if kvs.is_empty() {
            return Ok(HashSet::new());
        }

        // A key present in the store means the corresponding region is inactive.
        let inactive_keys = kvs.into_iter().map(|kv| kv.key).collect::<HashSet<_>>();
        let (active_region_ids, inactive_region_ids): (Vec<Option<u64>>, Vec<Option<u64>>) =
            key_region_ids
                .into_iter()
                .map(|(key, region_id)| {
                    let is_active = !inactive_keys.contains(&key);
                    if is_active {
                        (Some(region_id), None)
                    } else {
                        (None, Some(region_id))
                    }
                })
                .unzip();
        // Keep only the active regions in the caller-provided list.
        *region_ids = active_region_ids.into_iter().flatten().collect();

        Ok(inactive_region_ids.into_iter().flatten().collect())
    }

    /// Scan all inactive regions in the cluster.
    ///
    /// When will these data appear?
    /// Generally, it is because the corresponding Datanode is disconnected and
    /// did not respond to the `Failover` scheduling instructions of metasrv.
    pub async fn scan_all_inactive_regions(
        &self,
        cluster_id: u64,
    ) -> Result<Vec<InactiveRegionKey>> {
        // All markers of a cluster share a common key prefix; range-scan it.
        let prefix = InactiveRegionKey::get_prefix_by_cluster(cluster_id);
        let request = RangeRequest::new().with_prefix(prefix);
        let resp = self
            .store
            .range(request)
            .await
            .context(error::KvBackendSnafu)?;
        let kvs = resp.kvs;
        kvs.into_iter()
            .map(|kv| InactiveRegionKey::try_from(kv.key))
            .collect::<Result<Vec<_>>>()
    }

    /// Deletes every inactive-region marker belonging to the given cluster.
    pub async fn clear_all_inactive_regions(&self, cluster_id: u64) -> Result<()> {
        let prefix = InactiveRegionKey::get_prefix_by_cluster(cluster_id);
        let request = DeleteRangeRequest::new().with_prefix(prefix);
        let _ = self
            .store
            .delete_range(request)
            .await
            .context(error::KvBackendSnafu)?;
        Ok(())
    }
}

View File

@@ -40,8 +40,6 @@ pub mod table_meta_alloc;
pub use crate::error::Result;
mod inactive_region_manager;
mod greptimedb_telemetry;
#[cfg(test)]

View File

@@ -168,7 +168,6 @@ impl MetaSrvBuilder {
state.clone(),
kv_backend.clone(),
));
let kv_backend = leader_cached_kv_backend.clone() as _;
let meta_peer_client = meta_peer_client
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
@@ -177,7 +176,9 @@ impl MetaSrvBuilder {
let mailbox = build_mailbox(&kv_backend, &pushers);
let procedure_manager = build_procedure_manager(&options, &kv_backend);
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone()));
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let table_metadata_manager = Arc::new(TableMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
let selector_ctx = SelectorContext {
server_addr: options.server_addr.clone(),
@@ -227,8 +228,10 @@ impl MetaSrvBuilder {
.and_then(|plugins| plugins.get::<PublishRef>())
.map(|publish| PublishHeartbeatHandler::new(publish.clone()));
let region_lease_handler =
RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS);
let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
);
let group = HeartbeatHandlerGroup::new(pushers);
group.add_handler(ResponseHeaderHandler).await;

View File

@@ -270,8 +270,6 @@ trait State: Sync + Send + Debug {
fn status(&self) -> Status {
Status::executing(true)
}
fn remark_inactive_region_if_needed(&mut self) {}
}
/// The states transition of region failover procedure:
@@ -341,11 +339,7 @@ impl RegionFailoverProcedure {
}
fn from_json(json: &str, context: RegionFailoverContext) -> ProcedureResult<Self> {
let mut node: Node = serde_json::from_str(json).context(FromJsonSnafu)?;
// If the meta leader node dies during the execution of the procedure,
// the new leader node needs to remark the failed region as "inactive"
// to prevent it from renewing the lease.
node.state.remark_inactive_region_if_needed();
let node: Node = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { node, context })
}
}

View File

@@ -31,7 +31,6 @@ use crate::error::{
self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::inactive_region_manager::InactiveRegionManager;
use crate::procedure::region_failover::OPEN_REGION_MESSAGE_TIMEOUT;
use crate::service::mailbox::{Channel, MailboxReceiver};
@@ -104,17 +103,6 @@ impl ActivateRegion {
input: instruction.to_string(),
})?;
// Ensure that metasrv will renew the lease for this candidate node.
//
// This operation may not be redundant, imagine the following scenario:
// This candidate once had the current region, and because it did not respond to the `close`
// command in time, it was considered an inactive node by metasrv, then it replied, and the
// current region failed over again, and the node was selected as a candidate, so it needs
// to clear its previous state first.
InactiveRegionManager::new(&ctx.in_memory)
.deregister_inactive_region(&candidate_ident)
.await?;
let ch = Channel::Datanode(self.candidate.id);
ctx.mailbox.send(&ch, msg, timeout).await
}
@@ -182,23 +170,12 @@ impl State for ActivateRegion {
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
if self.remark_inactive_region {
// Remark the fail region as inactive to prevent it from renewing the lease.
InactiveRegionManager::new(&ctx.in_memory)
.register_inactive_region(failed_region)
.await?;
}
let mailbox_receiver = self
.send_open_region_message(ctx, failed_region, OPEN_REGION_MESSAGE_TIMEOUT)
.await?;
self.handle_response(mailbox_receiver, failed_region).await
}
fn remark_inactive_region_if_needed(&mut self) {
self.remark_inactive_region = true;
}
}
#[cfg(test)]

View File

@@ -30,7 +30,6 @@ use crate::error::{
self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::inactive_region_manager::InactiveRegionManager;
use crate::service::mailbox::{Channel, MailboxReceiver};
#[derive(Serialize, Deserialize, Debug)]
@@ -91,22 +90,13 @@ impl DeactivateRegion {
})?;
let ch = Channel::Datanode(failed_region.datanode_id);
// Mark the region as inactive
InactiveRegionManager::new(&ctx.in_memory)
.register_inactive_region(failed_region)
.await?;
// We first marked the region as inactive, which means that the failed region cannot
// be successfully renewed from now on, so after the lease time is exceeded, the region
// will be automatically closed.
// If the deadline is exceeded, we can proceed to the next step with confidence,
// as the expiration means that the region has been closed.
let timeout = Duration::from_secs(ctx.region_lease_secs);
ctx.mailbox.send(&ch, msg, timeout).await
}
async fn handle_response(
&self,
ctx: &RegionFailoverContext,
_ctx: &RegionFailoverContext,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -123,10 +113,6 @@ impl DeactivateRegion {
.fail();
};
if result {
InactiveRegionManager::new(&ctx.in_memory)
.deregister_inactive_region(failed_region)
.await?;
Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
} else {
// Under rare circumstances would a Datanode fail to close a Region.

View File

@@ -16,6 +16,7 @@ pub mod mito;
pub mod utils;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
@@ -26,6 +27,8 @@ use self::mito::find_staled_leader_regions;
use crate::error::{self, Result};
use crate::region::lease_keeper::utils::find_staled_follower_regions;
pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
pub struct RegionLeaseKeeper {
table_metadata_manager: TableMetadataManagerRef,
}

View File

@@ -14,7 +14,6 @@
mod health;
mod heartbeat;
mod inactive_regions;
mod leader;
mod meta;
mod node_lease;
@@ -91,20 +90,6 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
.route("/route", handler.clone())
.route("/route/help", handler);
let router = router.route(
"/inactive-regions/view",
inactive_regions::ViewInactiveRegionsHandler {
store: meta_srv.in_memory().clone(),
},
);
let router = router.route(
"/inactive-regions/clear",
inactive_regions::ClearInactiveRegionsHandler {
store: meta_srv.in_memory().clone(),
},
);
let router = Router::nest("/admin", router);
Admin::new(router)

View File

@@ -1,92 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_meta::kv_backend::ResettableKvBackendRef;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tonic::codegen::http;
use crate::error::{self, Result};
use crate::inactive_region_manager::InactiveRegionManager;
use crate::keys::InactiveRegionKey;
use crate::service::admin::{util, HttpHandler};
/// Admin HTTP handler that lists the inactive regions of a cluster.
pub struct ViewInactiveRegionsHandler {
    pub store: ResettableKvBackendRef,
}
#[async_trait::async_trait]
impl HttpHandler for ViewInactiveRegionsHandler {
    /// Returns all inactive regions of the cluster identified by the
    /// `cluster_id` query parameter, serialized as JSON in the response body.
    async fn handle(
        &self,
        _: &str,
        params: &HashMap<String, String>,
    ) -> Result<http::Response<String>> {
        let cluster_id = util::extract_cluster_id(params)?;

        let inactive_region_manager = InactiveRegionManager::new(&self.store);
        let inactive_regions = inactive_region_manager
            .scan_all_inactive_regions(cluster_id)
            .await?;

        // JSON serialization happens in `String: TryFrom<InactiveRegions>`.
        let result = InactiveRegions { inactive_regions }.try_into()?;

        http::Response::builder()
            .status(http::StatusCode::OK)
            .body(result)
            .context(error::InvalidHttpBodySnafu)
    }
}
/// Admin HTTP handler that clears all inactive-region markers of a cluster.
pub struct ClearInactiveRegionsHandler {
    pub store: ResettableKvBackendRef,
}
#[async_trait::async_trait]
impl HttpHandler for ClearInactiveRegionsHandler {
    /// Deletes every inactive-region marker of the cluster identified by the
    /// `cluster_id` query parameter and replies with a plain "Success" body.
    async fn handle(
        &self,
        _: &str,
        params: &HashMap<String, String>,
    ) -> Result<http::Response<String>> {
        let cluster_id = util::extract_cluster_id(params)?;

        InactiveRegionManager::new(&self.store)
            .clear_all_inactive_regions(cluster_id)
            .await?;

        Ok(http::Response::builder()
            .status(http::StatusCode::OK)
            .body("Success\n".to_owned())
            .unwrap())
    }
}
/// Serializable wrapper over a list of inactive region keys.
/// `#[serde(transparent)]` makes it serialize directly as the inner list.
#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
struct InactiveRegions {
    inactive_regions: Vec<InactiveRegionKey>,
}
impl TryFrom<InactiveRegions> for String {
type Error = error::Error;
fn try_from(value: InactiveRegions) -> Result<Self> {
serde_json::to_string(&value).context(error::SerializeToJsonSnafu {
input: format!("{value:?}"),
})
}
}

View File

@@ -16,7 +16,7 @@
use std::sync::Arc;
use api::greptime_proto::v1::meta::RegionRole as PbRegionRole;
use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
@@ -27,6 +27,38 @@ use crate::metadata::RegionMetadataRef;
use crate::region_request::RegionRequest;
use crate::storage::{RegionId, ScanRequest};
/// A region lease grant: pairs a [RegionId] with the [RegionRole] the
/// datanode is allowed to serve it in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GrantedRegion {
    pub region_id: RegionId,
    pub region_role: RegionRole,
}

impl GrantedRegion {
    /// Creates a [GrantedRegion] from a region id and the granted role.
    pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
        Self {
            region_id,
            region_role,
        }
    }
}
impl From<GrantedRegion> for PbGrantedRegion {
fn from(value: GrantedRegion) -> Self {
PbGrantedRegion {
region_id: value.region_id.as_u64(),
role: PbRegionRole::from(value.region_role).into(),
}
}
}
impl From<PbGrantedRegion> for GrantedRegion {
    /// Reconstructs a [GrantedRegion] from its protobuf representation.
    fn from(value: PbGrantedRegion) -> Self {
        Self {
            region_id: RegionId::from_u64(value.region_id),
            region_role: value.role().into(),
        }
    }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegionRole {
// Readonly region(mito2), Readonly region(file).