feat: add metric and manage tool for InactiveRegionKey (#2313)

* feat: add metric and manage tool for InactiveRegionKey

* chore: by review comment
This commit is contained in:
JeremyHi
2023-09-04 14:20:52 +08:00
committed by Ruihang Xia
parent a3d5931fca
commit 920763d7dd
13 changed files with 282 additions and 39 deletions

View File

@@ -15,6 +15,7 @@
use std::fmt::{Display, Formatter};
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use crate::ident::TableIdent;
use crate::{ClusterId, DatanodeId};
@@ -27,6 +28,12 @@ pub struct RegionIdent {
pub region_number: u32,
}
impl RegionIdent {
    /// Builds the [`RegionId`] of this region from the table id held in
    /// `table_ident` and this region's `region_number`.
    pub fn get_region_id(&self) -> RegionId {
        let table_id = self.table_ident.table_id;
        RegionId::new(table_id, self.region_number)
    }
}
impl Display for RegionIdent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(

View File

@@ -156,6 +156,9 @@ pub enum Error {
#[snafu(display("Invalid datanode stat key: {}", key))]
InvalidStatKey { key: String, location: Location },
#[snafu(display("Invalid inactive region key: {}", key))]
InvalidInactiveRegionKey { key: String, location: Location },
#[snafu(display("Failed to parse datanode lease key from utf8: {}", source))]
LeaseKeyFromUtf8 {
source: std::string::FromUtf8Error,
@@ -180,6 +183,12 @@ pub enum Error {
location: Location,
},
// Raised when the raw bytes of an inactive region key are not valid UTF-8.
// The variant name is kept as-is so existing `InvalidRegionKeyFromUtf8Snafu`
// users keep compiling; only the user-facing message is corrected
// ("inactive", not "invalid").
#[snafu(display("Failed to parse inactive region key from utf8: {}", source))]
InvalidRegionKeyFromUtf8 {
    source: std::string::FromUtf8Error,
    location: Location,
},
#[snafu(display("Failed to serialize to json: {}", input))]
SerializeToJson {
input: String,
@@ -556,6 +565,7 @@ impl ErrorExt for Error {
| Error::EmptyTableName { .. }
| Error::InvalidLeaseKey { .. }
| Error::InvalidStatKey { .. }
| Error::InvalidInactiveRegionKey { .. }
| Error::ParseNum { .. }
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
@@ -565,6 +575,7 @@ impl ErrorExt for Error {
| Error::LeaseValueFromUtf8 { .. }
| Error::StatKeyFromUtf8 { .. }
| Error::StatValueFromUtf8 { .. }
| Error::InvalidRegionKeyFromUtf8 { .. }
| Error::UnexpectedSequenceValue { .. }
| Error::TableRouteNotFound { .. }
| Error::TableInfoNotFound { .. }

View File

@@ -17,7 +17,7 @@ use async_trait::async_trait;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::inactive_node_manager::InactiveNodeManager;
use crate::inactive_region_manager::InactiveRegionManager;
use crate::metasrv::Context;
pub struct RegionLeaseHandler {
@@ -50,8 +50,8 @@ impl HeartbeatHandler for RegionLeaseHandler {
let mut region_ids = stat.region_ids();
let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory);
let inactive_region_ids = inactive_node_manager
let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory);
let inactive_region_ids = inactive_region_manager
.retain_active_regions(stat.cluster_id, stat.id, &mut region_ids)
.await?;
@@ -126,8 +126,8 @@ mod test {
..Default::default()
});
let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory);
inactive_node_manager
let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory);
inactive_region_manager
.register_inactive_region(&RegionIdent {
cluster_id: 1,
datanode_id: 1,
@@ -139,7 +139,7 @@ mod test {
})
.await
.unwrap();
inactive_node_manager
inactive_region_manager
.register_inactive_region(&RegionIdent {
cluster_id: 1,
datanode_id: 1,

View File

@@ -14,29 +14,26 @@
use std::collections::HashSet;
use common_meta::rpc::store::{BatchGetRequest, PutRequest};
use common_meta::rpc::store::{BatchGetRequest, DeleteRangeRequest, PutRequest, RangeRequest};
use common_meta::RegionIdent;
use store_api::storage::RegionId;
use metrics::{decrement_gauge, increment_gauge};
use crate::error::Result;
use crate::keys::InactiveRegionKey;
use crate::metrics::METRIC_META_INACTIVE_REGIONS;
use crate::service::store::kv::ResettableKvStoreRef;
pub struct InactiveNodeManager<'a> {
pub struct InactiveRegionManager<'a> {
store: &'a ResettableKvStoreRef,
}
impl<'a> InactiveNodeManager<'a> {
impl<'a> InactiveRegionManager<'a> {
pub fn new(store: &'a ResettableKvStoreRef) -> Self {
Self { store }
}
pub async fn register_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> {
let region_id = RegionId::new(
region_ident.table_ident.table_id,
region_ident.region_number,
)
.as_u64();
let region_id = region_ident.get_region_id().as_u64();
let key = InactiveRegionKey {
cluster_id: region_ident.cluster_id,
node_id: region_ident.datanode_id,
@@ -48,15 +45,14 @@ impl<'a> InactiveNodeManager<'a> {
prev_kv: false,
};
self.store.put(req).await?;
increment_gauge!(METRIC_META_INACTIVE_REGIONS, 1.0);
Ok(())
}
pub async fn deregister_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> {
let region_id = RegionId::new(
region_ident.table_ident.table_id,
region_ident.region_number,
)
.as_u64();
let region_id = region_ident.get_region_id().as_u64();
let key: Vec<u8> = InactiveRegionKey {
cluster_id: region_ident.cluster_id,
node_id: region_ident.datanode_id,
@@ -64,6 +60,9 @@ impl<'a> InactiveNodeManager<'a> {
}
.into();
self.store.delete(&key, false).await?;
decrement_gauge!(METRIC_META_INACTIVE_REGIONS, 1.0);
Ok(())
}
@@ -114,4 +113,29 @@ impl<'a> InactiveNodeManager<'a> {
Ok(inactive_region_ids.into_iter().flatten().collect())
}
/// Returns every inactive region key stored under the given cluster.
///
/// Such entries typically exist because the corresponding Datanode was
/// disconnected and did not respond to the `Failover` scheduling
/// instructions of metasrv.
///
/// Fails on the first stored key that cannot be decoded into an
/// [`InactiveRegionKey`].
pub async fn scan_all_inactive_regions(
    &self,
    cluster_id: u64,
) -> Result<Vec<InactiveRegionKey>> {
    let req =
        RangeRequest::new().with_prefix(InactiveRegionKey::get_prefix_by_cluster(cluster_id));
    let range_resp = self.store.range(req).await?;

    let mut keys = Vec::with_capacity(range_resp.kvs.len());
    for kv in range_resp.kvs {
        keys.push(InactiveRegionKey::try_from(kv.key)?);
    }
    Ok(keys)
}
pub async fn clear_all_inactive_regions(&self, cluster_id: u64) -> Result<()> {
let prefix = InactiveRegionKey::get_prefix_by_cluster(cluster_id);
let request = DeleteRangeRequest::new().with_prefix(prefix);
let _ = self.store.delete_range(request).await?;
Ok(())
}
}

View File

@@ -35,6 +35,10 @@ lazy_static! {
Regex::new(&format!("^{DN_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
static ref DATANODE_STAT_KEY_PATTERN: Regex =
Regex::new(&format!("^{DN_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!(
"^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
))
.unwrap();
}
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
@@ -164,7 +168,7 @@ impl FromStr for StatKey {
fn from_str(key: &str) -> Result<Self> {
let caps = DATANODE_STAT_KEY_PATTERN
.captures(key)
.context(error::InvalidLeaseKeySnafu { key })?;
.context(error::InvalidStatKeySnafu { key })?;
ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
@@ -237,13 +241,19 @@ impl TryFrom<Vec<u8>> for StatValue {
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)]
/// Key identifying one inactive region as a (cluster, node, region) triple.
///
/// The textual form accepted by `INACTIVE_REGION_KEY_PATTERN` is
/// `{INACTIVE_REGION_PREFIX}-{cluster_id}-{node_id}-{region_id}`, with every
/// id written in decimal.
pub struct InactiveRegionKey {
    /// Id of the cluster the region belongs to.
    pub cluster_id: u64,
    /// Id of the datanode the region is recorded against.
    pub node_id: u64,
    /// Region id in its `u64` representation.
    pub region_id: u64,
}
impl InactiveRegionKey {
    /// Returns the byte prefix shared by all inactive region keys of the
    /// given cluster: `{INACTIVE_REGION_PREFIX}-{cluster_id}-`.
    ///
    /// The trailing `-` keeps cluster `1` from also matching cluster `10`.
    pub fn get_prefix_by_cluster(cluster_id: u64) -> Vec<u8> {
        let prefix = format!("{INACTIVE_REGION_PREFIX}-{cluster_id}-");
        prefix.into_bytes()
    }
}
impl From<InactiveRegionKey> for Vec<u8> {
fn from(value: InactiveRegionKey) -> Self {
format!(
@@ -254,6 +264,51 @@ impl From<InactiveRegionKey> for Vec<u8> {
}
}
impl FromStr for InactiveRegionKey {
type Err = error::Error;
fn from_str(key: &str) -> Result<Self> {
let caps = INACTIVE_REGION_KEY_PATTERN
.captures(key)
.context(error::InvalidInactiveRegionKeySnafu { key })?;
ensure!(
caps.len() == 4,
error::InvalidInactiveRegionKeySnafu { key }
);
let cluster_id = caps[1].to_string();
let node_id = caps[2].to_string();
let region_id = caps[3].to_string();
let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid cluster_id: {cluster_id}"),
})?;
let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid node_id: {node_id}"),
})?;
let region_id: u64 = region_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid region_id: {region_id}"),
})?;
Ok(Self {
cluster_id,
node_id,
region_id,
})
}
}
impl TryFrom<Vec<u8>> for InactiveRegionKey {
type Error = error::Error;
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(error::InvalidRegionKeyFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -376,4 +431,18 @@ mod tests {
assert_eq!(1, stat_key.cluster_id);
assert_eq!(101, stat_key.node_id);
}
/// Encoding a key to bytes and decoding it again must yield the same key.
#[test]
fn test_inactive_region_key_round_trip() {
    let key = InactiveRegionKey {
        cluster_id: 0,
        node_id: 1,
        region_id: 2,
    };

    // Encoding goes through the infallible `From<InactiveRegionKey> for Vec<u8>`,
    // so `into` is used instead of `try_into().unwrap()`.
    let key_bytes: Vec<u8> = key.into();
    let new_key: InactiveRegionKey = key_bytes.try_into().unwrap();

    assert_eq!(new_key, key);
}
}

View File

@@ -40,7 +40,7 @@ pub mod table_routes;
pub use crate::error::Result;
mod inactive_node_manager;
mod inactive_region_manager;
mod greptimedb_telemetry;

View File

@@ -18,3 +18,4 @@ pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request";
pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";
pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";
pub const METRIC_META_INACTIVE_REGIONS: &str = "meta.inactive_regions";

View File

@@ -29,7 +29,7 @@ use crate::error::{
Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::inactive_node_manager::InactiveNodeManager;
use crate::inactive_region_manager::InactiveRegionManager;
use crate::procedure::region_failover::OPEN_REGION_MESSAGE_TIMEOUT;
use crate::service::mailbox::{Channel, MailboxReceiver};
@@ -76,7 +76,7 @@ impl ActivateRegion {
datanode_id: self.candidate.id,
..failed_region.clone()
};
InactiveNodeManager::new(&ctx.in_memory)
InactiveRegionManager::new(&ctx.in_memory)
.deregister_inactive_region(&candidate)
.await?;

View File

@@ -29,7 +29,7 @@ use crate::error::{
Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::inactive_node_manager::InactiveNodeManager;
use crate::inactive_region_manager::InactiveRegionManager;
use crate::service::mailbox::{Channel, MailboxReceiver};
#[derive(Serialize, Deserialize, Debug)]
@@ -62,7 +62,7 @@ impl DeactivateRegion {
let ch = Channel::Datanode(failed_region.datanode_id);
// Mark the region as inactive
InactiveNodeManager::new(&ctx.in_memory)
InactiveRegionManager::new(&ctx.in_memory)
.register_inactive_region(failed_region)
.await?;
// We first marked the region as inactive, which means that the failed region cannot
@@ -93,7 +93,7 @@ impl DeactivateRegion {
.fail();
};
if result {
InactiveNodeManager::new(&ctx.in_memory)
InactiveRegionManager::new(&ctx.in_memory)
.deregister_inactive_region(failed_region)
.await?;

View File

@@ -14,10 +14,12 @@
mod health;
mod heartbeat;
mod inactive_regions;
mod leader;
mod meta;
mod node_lease;
mod route;
mod util;
use std::collections::HashMap;
use std::convert::Infallible;
@@ -89,6 +91,20 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
},
);
let router = router.route(
"/inactive-regions/view",
inactive_regions::ViewInactiveRegionsHandler {
store: meta_srv.in_memory().clone(),
},
);
let router = router.route(
"/inactive-regions/clear",
inactive_regions::ClearInactiveRegionsHandler {
store: meta_srv.in_memory().clone(),
},
);
let router = Router::nest("/admin", router);
Admin::new(router)

View File

@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tonic::codegen::http;
use crate::error::{self, Result};
use crate::inactive_region_manager::InactiveRegionManager;
use crate::keys::InactiveRegionKey;
use crate::service::admin::{util, HttpHandler};
use crate::service::store::kv::ResettableKvStoreRef;
/// Admin HTTP handler that lists the inactive regions of a cluster.
pub struct ViewInactiveRegionsHandler {
    /// Store that is scanned for inactive region keys.
    pub store: ResettableKvStoreRef,
}
#[async_trait::async_trait]
impl HttpHandler for ViewInactiveRegionsHandler {
    /// Responds with the JSON-serialized list of inactive region keys of the
    /// cluster named by the `cluster_id` query parameter.
    async fn handle(
        &self,
        _: &str,
        params: &HashMap<String, String>,
    ) -> Result<http::Response<String>> {
        let cluster_id = util::extract_cluster_id(params)?;

        let keys = InactiveRegionManager::new(&self.store)
            .scan_all_inactive_regions(cluster_id)
            .await?;
        let body = String::try_from(InactiveRegions {
            inactive_regions: keys,
        })?;

        http::Response::builder()
            .status(http::StatusCode::OK)
            .body(body)
            .context(error::InvalidHttpBodySnafu)
    }
}
/// Admin HTTP handler that removes all inactive region entries of a cluster.
pub struct ClearInactiveRegionsHandler {
    /// Store from which the inactive region keys are deleted.
    pub store: ResettableKvStoreRef,
}
#[async_trait::async_trait]
impl HttpHandler for ClearInactiveRegionsHandler {
async fn handle(
&self,
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let cluster_id = util::extract_cluster_id(params)?;
let inactive_region_manager = InactiveRegionManager::new(&self.store);
inactive_region_manager
.clear_all_inactive_regions(cluster_id)
.await?;
Ok(http::Response::builder()
.status(http::StatusCode::OK)
.body("Success\n".to_owned())
.unwrap())
}
}
/// Response payload for the view endpoint.
///
/// `#[serde(transparent)]` makes it serialize as the bare list of keys
/// rather than as an object with an `inactive_regions` field.
#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
struct InactiveRegions {
    /// Keys of the inactive regions to report.
    inactive_regions: Vec<InactiveRegionKey>,
}
impl TryFrom<InactiveRegions> for String {
type Error = error::Error;
fn try_from(value: InactiveRegions) -> Result<Self> {
serde_json::to_string(&value).context(error::SerializeToJsonSnafu {
input: format!("{value:?}"),
})
}
}

View File

@@ -15,14 +15,14 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use tonic::codegen::http;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result};
use crate::keys::{LeaseKey, LeaseValue};
use crate::lease;
use crate::service::admin::HttpHandler;
use crate::service::admin::{util, HttpHandler};
pub struct NodeLeaseHandler {
pub meta_peer_client: MetaPeerClientRef,
@@ -35,15 +35,7 @@ impl HttpHandler for NodeLeaseHandler {
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let cluster_id = params
.get("cluster_id")
.map(|id| id.parse::<u64>())
.context(error::MissingRequiredParameterSnafu {
param: "cluster_id",
})?
.context(error::ParseNumSnafu {
err_msg: "`cluster_id` is not a valid number",
})?;
let cluster_id = util::extract_cluster_id(params)?;
let leases =
lease::filter_datanodes(cluster_id, &self.meta_peer_client, |_, _| true).await?;

View File

@@ -0,0 +1,31 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
pub fn extract_cluster_id(params: &HashMap<String, String>) -> Result<u64> {
params
.get("cluster_id")
.map(|id| id.parse::<u64>())
.context(error::MissingRequiredParameterSnafu {
param: "cluster_id",
})?
.context(error::ParseNumSnafu {
err_msg: "`cluster_id` is not a valid number",
})
}