diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index da09f81b1b..659114e517 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -15,6 +15,7 @@ use std::fmt::{Display, Formatter}; use serde::{Deserialize, Serialize}; +use store_api::storage::RegionId; use crate::ident::TableIdent; use crate::{ClusterId, DatanodeId}; @@ -27,6 +28,12 @@ pub struct RegionIdent { pub region_number: u32, } +impl RegionIdent { + pub fn get_region_id(&self) -> RegionId { + RegionId::new(self.table_ident.table_id, self.region_number) + } +} + impl Display for RegionIdent { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 750b979d22..b5d2f34b64 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -156,6 +156,9 @@ pub enum Error { #[snafu(display("Invalid datanode stat key: {}", key))] InvalidStatKey { key: String, location: Location }, + #[snafu(display("Invalid inactive region key: {}", key))] + InvalidInactiveRegionKey { key: String, location: Location }, + #[snafu(display("Failed to parse datanode lease key from utf8: {}", source))] LeaseKeyFromUtf8 { source: std::string::FromUtf8Error, @@ -180,6 +183,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse invalid region key from utf8: {}", source))] + InvalidRegionKeyFromUtf8 { + source: std::string::FromUtf8Error, + location: Location, + }, + #[snafu(display("Failed to serialize to json: {}", input))] SerializeToJson { input: String, @@ -556,6 +565,7 @@ impl ErrorExt for Error { | Error::EmptyTableName { .. } | Error::InvalidLeaseKey { .. } | Error::InvalidStatKey { .. } + | Error::InvalidInactiveRegionKey { .. } | Error::ParseNum { .. } | Error::UnsupportedSelectorType { .. } | Error::InvalidArguments { .. } @@ -565,6 +575,7 @@ impl ErrorExt for Error { | Error::LeaseValueFromUtf8 { .. } | Error::StatKeyFromUtf8 { .. } | Error::StatValueFromUtf8 { .. } + | Error::InvalidRegionKeyFromUtf8 { .. } | Error::UnexpectedSequenceValue { .. } | Error::TableRouteNotFound { .. } | Error::TableInfoNotFound { .. } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 4942b650f6..62d88fd2d3 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; -use crate::inactive_node_manager::InactiveNodeManager; +use crate::inactive_region_manager::InactiveRegionManager; use crate::metasrv::Context; pub struct RegionLeaseHandler { @@ -50,8 +50,8 @@ impl HeartbeatHandler for RegionLeaseHandler { let mut region_ids = stat.region_ids(); - let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory); - let inactive_region_ids = inactive_node_manager + let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory); + let inactive_region_ids = inactive_region_manager .retain_active_regions(stat.cluster_id, stat.id, &mut region_ids) .await?; @@ -126,8 +126,8 @@ mod test { ..Default::default() }); - let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory); - inactive_node_manager + let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory); + inactive_region_manager .register_inactive_region(&RegionIdent { cluster_id: 1, datanode_id: 1, @@ -139,7 +139,7 @@ mod test { }) .await .unwrap(); - inactive_node_manager + inactive_region_manager .register_inactive_region(&RegionIdent { cluster_id: 1, datanode_id: 1, diff --git a/src/meta-srv/src/inactive_node_manager.rs b/src/meta-srv/src/inactive_region_manager.rs similarity index 69% rename from src/meta-srv/src/inactive_node_manager.rs rename to src/meta-srv/src/inactive_region_manager.rs index 427d37f2b7..b3e110584f 100644 --- a/src/meta-srv/src/inactive_node_manager.rs +++ b/src/meta-srv/src/inactive_region_manager.rs @@ -14,29 +14,26 @@ use std::collections::HashSet; -use common_meta::rpc::store::{BatchGetRequest, PutRequest}; +use common_meta::rpc::store::{BatchGetRequest, DeleteRangeRequest, PutRequest, RangeRequest}; use common_meta::RegionIdent; -use store_api::storage::RegionId; +use metrics::{decrement_gauge, increment_gauge}; use crate::error::Result; use crate::keys::InactiveRegionKey; +use crate::metrics::METRIC_META_INACTIVE_REGIONS; use crate::service::store::kv::ResettableKvStoreRef; -pub struct InactiveNodeManager<'a> { +pub struct InactiveRegionManager<'a> { store: &'a ResettableKvStoreRef, } -impl<'a> InactiveNodeManager<'a> { +impl<'a> InactiveRegionManager<'a> { pub fn new(store: &'a ResettableKvStoreRef) -> Self { Self { store } } pub async fn register_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> { - let region_id = RegionId::new( - region_ident.table_ident.table_id, - region_ident.region_number, - ) - .as_u64(); + let region_id = region_ident.get_region_id().as_u64(); let key = InactiveRegionKey { cluster_id: region_ident.cluster_id, node_id: region_ident.datanode_id, @@ -48,15 +45,14 @@ impl<'a> InactiveNodeManager<'a> { prev_kv: false, }; self.store.put(req).await?; + + increment_gauge!(METRIC_META_INACTIVE_REGIONS, 1.0); + Ok(()) } pub async fn deregister_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> { - let region_id = RegionId::new( - region_ident.table_ident.table_id, - region_ident.region_number, - ) - .as_u64(); + let region_id = region_ident.get_region_id().as_u64(); let key: Vec = InactiveRegionKey { cluster_id: region_ident.cluster_id, node_id: region_ident.datanode_id, @@ -64,6 +60,9 @@ impl<'a> InactiveNodeManager<'a> { } .into(); self.store.delete(&key, false).await?; + + decrement_gauge!(METRIC_META_INACTIVE_REGIONS, 1.0); + Ok(()) } @@ -114,4 +113,29 @@ impl<'a> InactiveNodeManager<'a> { Ok(inactive_region_ids.into_iter().flatten().collect()) } + + /// Scan all inactive regions in the cluster. + /// + /// When will these data appear? + /// Generally, it is because the corresponding Datanode is disconnected and + /// did not respond to the `Failover` scheduling instructions of metasrv. + pub async fn scan_all_inactive_regions( + &self, + cluster_id: u64, + ) -> Result> { + let prefix = InactiveRegionKey::get_prefix_by_cluster(cluster_id); + let request = RangeRequest::new().with_prefix(prefix); + let resp = self.store.range(request).await?; + let kvs = resp.kvs; + kvs.into_iter() + .map(|kv| InactiveRegionKey::try_from(kv.key)) + .collect::>>() + } + + pub async fn clear_all_inactive_regions(&self, cluster_id: u64) -> Result<()> { + let prefix = InactiveRegionKey::get_prefix_by_cluster(cluster_id); + let request = DeleteRangeRequest::new().with_prefix(prefix); + let _ = self.store.delete_range(request).await?; + Ok(()) + } } diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index a2ca626b4d..af7bc1b1e9 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -35,6 +35,10 @@ lazy_static! { Regex::new(&format!("^{DN_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap(); static ref DATANODE_STAT_KEY_PATTERN: Regex = Regex::new(&format!("^{DN_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap(); + static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!( + "^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$" + )) + .unwrap(); } #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] @@ -164,7 +168,7 @@ impl FromStr for StatKey { fn from_str(key: &str) -> Result { let caps = DATANODE_STAT_KEY_PATTERN .captures(key) - .context(error::InvalidLeaseKeySnafu { key })?; + .context(error::InvalidStatKeySnafu { key })?; ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key }); @@ -237,13 +241,19 @@ impl TryFrom> for StatValue { } } -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct InactiveRegionKey { pub cluster_id: u64, pub node_id: u64, pub region_id: u64, } +impl InactiveRegionKey { + pub fn get_prefix_by_cluster(cluster_id: u64) -> Vec { + format!("{}-{}-", INACTIVE_REGION_PREFIX, cluster_id).into_bytes() + } +} + impl From for Vec { fn from(value: InactiveRegionKey) -> Self { format!( @@ -254,6 +264,51 @@ impl From for Vec { } } +impl FromStr for InactiveRegionKey { + type Err = error::Error; + + fn from_str(key: &str) -> Result { + let caps = INACTIVE_REGION_KEY_PATTERN + .captures(key) + .context(error::InvalidInactiveRegionKeySnafu { key })?; + + ensure!( + caps.len() == 4, + error::InvalidInactiveRegionKeySnafu { key } + ); + + let cluster_id = caps[1].to_string(); + let node_id = caps[2].to_string(); + let region_id = caps[3].to_string(); + + let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu { + err_msg: format!("invalid cluster_id: {cluster_id}"), + })?; + let node_id: u64 = node_id.parse().context(error::ParseNumSnafu { + err_msg: format!("invalid node_id: {node_id}"), + })?; + let region_id: u64 = region_id.parse().context(error::ParseNumSnafu { + err_msg: format!("invalid region_id: {region_id}"), + })?; + + Ok(Self { + cluster_id, + node_id, + region_id, + }) + } +} + +impl TryFrom> for InactiveRegionKey { + type Error = error::Error; + + fn try_from(bytes: Vec) -> Result { + String::from_utf8(bytes) + .context(error::InvalidRegionKeyFromUtf8Snafu {}) + .map(|x| x.parse())? + } +} + #[cfg(test)] mod tests { use super::*; @@ -376,4 +431,18 @@ mod tests { assert_eq!(1, stat_key.cluster_id); assert_eq!(101, stat_key.node_id); } + + #[test] + fn test_inactive_region_key_round_trip() { + let key = InactiveRegionKey { + cluster_id: 0, + node_id: 1, + region_id: 2, + }; + + let key_bytes: Vec = key.try_into().unwrap(); + let new_key: InactiveRegionKey = key_bytes.try_into().unwrap(); + + assert_eq!(new_key, key); + } } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index a7329e5353..c8ebdefd9f 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -40,7 +40,7 @@ pub mod table_routes; pub use crate::error::Result; -mod inactive_node_manager; +mod inactive_region_manager; mod greptimedb_telemetry; diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index cac6598991..709e410b0b 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -18,3 +18,4 @@ pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request"; pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute"; +pub const METRIC_META_INACTIVE_REGIONS: &str = "meta.inactive_regions"; diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index a6c835c25d..7fcb939c25 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -29,7 +29,7 @@ use crate::error::{ Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, }; use crate::handler::HeartbeatMailbox; -use crate::inactive_node_manager::InactiveNodeManager; +use crate::inactive_region_manager::InactiveRegionManager; use crate::procedure::region_failover::OPEN_REGION_MESSAGE_TIMEOUT; use crate::service::mailbox::{Channel, MailboxReceiver}; @@ -76,7 +76,7 @@ impl ActivateRegion { datanode_id: self.candidate.id, ..failed_region.clone() }; - InactiveNodeManager::new(&ctx.in_memory) + InactiveRegionManager::new(&ctx.in_memory) .deregister_inactive_region(&candidate) .await?; diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index a83108ad08..ce575f62d0 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -29,7 +29,7 @@ use crate::error::{ Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, }; use crate::handler::HeartbeatMailbox; -use crate::inactive_node_manager::InactiveNodeManager; +use crate::inactive_region_manager::InactiveRegionManager; use crate::service::mailbox::{Channel, MailboxReceiver}; #[derive(Serialize, Deserialize, Debug)] @@ -62,7 +62,7 @@ impl DeactivateRegion { let ch = Channel::Datanode(failed_region.datanode_id); // Mark the region as inactive - InactiveNodeManager::new(&ctx.in_memory) + InactiveRegionManager::new(&ctx.in_memory) .register_inactive_region(failed_region) .await?; // We first marked the region as inactive, which means that the failed region cannot @@ -93,7 +93,7 @@ impl DeactivateRegion { .fail(); }; if result { - InactiveNodeManager::new(&ctx.in_memory) + InactiveRegionManager::new(&ctx.in_memory) .deregister_inactive_region(failed_region) .await?; diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index e0366fa19d..6b3c8a61a8 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -14,10 +14,12 @@ mod health; mod heartbeat; +mod inactive_regions; mod leader; mod meta; mod node_lease; mod route; +mod util; use std::collections::HashMap; use std::convert::Infallible; @@ -89,6 +91,20 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { }, ); + let router = router.route( + "/inactive-regions/view", + inactive_regions::ViewInactiveRegionsHandler { + store: meta_srv.in_memory().clone(), + }, + ); + + let router = router.route( + "/inactive-regions/clear", + inactive_regions::ClearInactiveRegionsHandler { + store: meta_srv.in_memory().clone(), + }, + ); + let router = Router::nest("/admin", router); Admin::new(router) diff --git a/src/meta-srv/src/service/admin/inactive_regions.rs b/src/meta-srv/src/service/admin/inactive_regions.rs new file mode 100644 index 0000000000..18b1b5c48b --- /dev/null +++ b/src/meta-srv/src/service/admin/inactive_regions.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use tonic::codegen::http; + +use crate::error::{self, Result}; +use crate::inactive_region_manager::InactiveRegionManager; +use crate::keys::InactiveRegionKey; +use crate::service::admin::{util, HttpHandler}; +use crate::service::store::kv::ResettableKvStoreRef; + +pub struct ViewInactiveRegionsHandler { + pub store: ResettableKvStoreRef, +} + +#[async_trait::async_trait] +impl HttpHandler for ViewInactiveRegionsHandler { + async fn handle( + &self, + _: &str, + params: &HashMap, + ) -> Result> { + let cluster_id = util::extract_cluster_id(params)?; + + let inactive_region_manager = InactiveRegionManager::new(&self.store); + let inactive_regions = inactive_region_manager + .scan_all_inactive_regions(cluster_id) + .await?; + let result = InactiveRegions { inactive_regions }.try_into()?; + + http::Response::builder() + .status(http::StatusCode::OK) + .body(result) + .context(error::InvalidHttpBodySnafu) + } +} + +pub struct ClearInactiveRegionsHandler { + pub store: ResettableKvStoreRef, +} + +#[async_trait::async_trait] +impl HttpHandler for ClearInactiveRegionsHandler { + async fn handle( + &self, + _: &str, + params: &HashMap, + ) -> Result> { + let cluster_id = util::extract_cluster_id(params)?; + + let inactive_region_manager = InactiveRegionManager::new(&self.store); + inactive_region_manager + .clear_all_inactive_regions(cluster_id) + .await?; + + Ok(http::Response::builder() + .status(http::StatusCode::OK) + .body("Success\n".to_owned()) + .unwrap()) + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +struct InactiveRegions { + inactive_regions: Vec, +} + +impl TryFrom for String { + type Error = error::Error; + + fn try_from(value: InactiveRegions) -> Result { + serde_json::to_string(&value).context(error::SerializeToJsonSnafu { + input: format!("{value:?}"), + }) + } +} diff --git a/src/meta-srv/src/service/admin/node_lease.rs b/src/meta-srv/src/service/admin/node_lease.rs index 3b9b63f31d..3655cd3aae 100644 --- a/src/meta-srv/src/service/admin/node_lease.rs +++ b/src/meta-srv/src/service/admin/node_lease.rs @@ -15,14 +15,14 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use tonic::codegen::http; use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result}; use crate::keys::{LeaseKey, LeaseValue}; use crate::lease; -use crate::service::admin::HttpHandler; +use crate::service::admin::{util, HttpHandler}; pub struct NodeLeaseHandler { pub meta_peer_client: MetaPeerClientRef, @@ -35,15 +35,7 @@ impl HttpHandler for NodeLeaseHandler { _: &str, params: &HashMap, ) -> Result> { - let cluster_id = params - .get("cluster_id") - .map(|id| id.parse::()) - .context(error::MissingRequiredParameterSnafu { - param: "cluster_id", - })? - .context(error::ParseNumSnafu { - err_msg: "`cluster_id` is not a valid number", - })?; + let cluster_id = util::extract_cluster_id(params)?; let leases = lease::filter_datanodes(cluster_id, &self.meta_peer_client, |_, _| true).await?; diff --git a/src/meta-srv/src/service/admin/util.rs b/src/meta-srv/src/service/admin/util.rs new file mode 100644 index 0000000000..a26bee0a8d --- /dev/null +++ b/src/meta-srv/src/service/admin/util.rs @@ -0,0 +1,31 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use snafu::{OptionExt, ResultExt}; + +use crate::error::{self, Result}; + +pub fn extract_cluster_id(params: &HashMap) -> Result { + params + .get("cluster_id") + .map(|id| id.parse::()) + .context(error::MissingRequiredParameterSnafu { + param: "cluster_id", + })? + .context(error::ParseNumSnafu { + err_msg: "`cluster_id` is not a valid number", + }) +}