feat: cluster information (#3631)

* chore: keep the same method order in KvBackend

* feat: make meta client can get all node info of cluster

* feat: cluster info data model

* feat: frontend and datanode info

* feat: list node info

* chore: remove the method: is_started

* fix: scan key prefix

* chore: impl From for NodeInfoKey

* chore: doc for trait and struct

* chore: reuse the error

* chore: refactor two collec cluster info handlers

* chore: remove inline

* chore: refactor two collec cluster info handlers
This commit is contained in:
JeremyHi
2024-04-08 15:48:36 +08:00
committed by GitHub
parent e920f95902
commit 12286f07ac
24 changed files with 879 additions and 194 deletions

View File

@@ -364,6 +364,10 @@ impl KvBackend for MetaKvBackend {
"MetaKvBackend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.client
.range(req)
@@ -372,27 +376,6 @@ impl KvBackend for MetaKvBackend {
.context(ExternalSnafu)
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let mut response = self
.client
.range(RangeRequest::new().with_key(key))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue {
key: kv.take_key(),
value: kv.take_value(),
}))
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
self.client
.batch_put(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
self.client
.put(req)
@@ -401,17 +384,9 @@ impl KvBackend for MetaKvBackend {
.context(ExternalSnafu)
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
self.client
.delete_range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
self.client
.batch_delete(req)
.batch_put(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
@@ -436,8 +411,33 @@ impl KvBackend for MetaKvBackend {
.context(ExternalSnafu)
}
fn as_any(&self) -> &dyn Any {
self
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
self.client
.delete_range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
self.client
.batch_delete(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let mut response = self
.client
.range(RangeRequest::new().with_key(key))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue {
key: kv.take_key(),
value: kv.take_value(),
}))
}
}

View File

@@ -0,0 +1,300 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::str::FromStr;
use common_error::ext::ErrorExt;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
InvalidRoleSnafu, ParseNumSnafu, Result,
};
use crate::peer::Peer;
const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
lazy_static! {
static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!(
"^{CLUSTER_NODE_INFO_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
))
.unwrap();
}
/// [ClusterInfo] provides information about the cluster.
#[async_trait::async_trait]
pub trait ClusterInfo {
type Error: ErrorExt;
/// List all nodes by role in the cluster. If `role` is `None`, list all nodes.
async fn list_nodes(
&self,
role: Option<Role>,
) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
// TODO(jeremy): Other info, like region status, etc.
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
pub cluster_id: u64,
/// The role of the node. It can be [Role::Datanode], [Role::Frontend], or [Role::Metasrv].
pub role: Role,
/// The node id.
pub node_id: u64,
}
impl NodeInfoKey {
pub fn key_prefix_with_cluster_id(cluster_id: u64) -> String {
format!("{}-{}-", CLUSTER_NODE_INFO_PREFIX, cluster_id)
}
pub fn key_prefix_with_role(cluster_id: u64, role: Role) -> String {
format!(
"{}-{}-{}-",
CLUSTER_NODE_INFO_PREFIX,
cluster_id,
i32::from(role)
)
}
}
/// The information of a node in the cluster.
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeInfo {
/// The peer information. [node_id, address]
pub peer: Peer,
/// Last activity time in milliseconds.
pub last_activity_ts: i64,
/// The status of the node. Different roles have different node status.
pub status: NodeStatus,
}
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum Role {
Datanode,
Frontend,
Metasrv,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum NodeStatus {
Datanode(DatanodeStatus),
Frontend(FrontendStatus),
Metasrv(MetasrvStatus),
}
/// The status of a datanode.
#[derive(Debug, Serialize, Deserialize)]
pub struct DatanodeStatus {
/// The read capacity units during this period.
pub rcus: i64,
/// The write capacity units during this period.
pub wcus: i64,
/// How many leader regions on this node.
pub leader_regions: usize,
/// How many follower regions on this node.
pub follower_regions: usize,
}
/// The status of a frontend.
#[derive(Debug, Serialize, Deserialize)]
pub struct FrontendStatus {}
/// The status of a metasrv.
#[derive(Debug, Serialize, Deserialize)]
pub struct MetasrvStatus {
pub is_leader: bool,
}
impl FromStr for NodeInfoKey {
type Err = Error;
fn from_str(key: &str) -> Result<Self> {
let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN
.captures(key)
.context(InvalidNodeInfoKeySnafu { key })?;
ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key });
let cluster_id = caps[1].to_string();
let role = caps[2].to_string();
let node_id = caps[3].to_string();
let cluster_id: u64 = cluster_id.parse().context(ParseNumSnafu {
err_msg: format!("invalid cluster_id: {cluster_id}"),
})?;
let role: i32 = role.parse().context(ParseNumSnafu {
err_msg: format!("invalid role {role}"),
})?;
let role = Role::try_from(role)?;
let node_id: u64 = node_id.parse().context(ParseNumSnafu {
err_msg: format!("invalid node_id: {node_id}"),
})?;
Ok(Self {
cluster_id,
role,
node_id,
})
}
}
impl TryFrom<Vec<u8>> for NodeInfoKey {
type Error = Error;
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(FromUtf8Snafu {
name: "NodeInfoKey",
})
.map(|x| x.parse())?
}
}
impl From<NodeInfoKey> for Vec<u8> {
fn from(key: NodeInfoKey) -> Self {
format!(
"{}-{}-{}-{}",
CLUSTER_NODE_INFO_PREFIX,
key.cluster_id,
i32::from(key.role),
key.node_id
)
.into_bytes()
}
}
impl FromStr for NodeInfo {
type Err = Error;
fn from_str(value: &str) -> Result<Self> {
serde_json::from_str(value).context(DecodeJsonSnafu)
}
}
impl TryFrom<Vec<u8>> for NodeInfo {
type Error = Error;
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(FromUtf8Snafu { name: "NodeInfo" })
.map(|x| x.parse())?
}
}
impl TryFrom<NodeInfo> for Vec<u8> {
type Error = Error;
fn try_from(info: NodeInfo) -> Result<Self> {
Ok(serde_json::to_string(&info)
.context(EncodeJsonSnafu)?
.into_bytes())
}
}
impl From<Role> for i32 {
fn from(role: Role) -> Self {
match role {
Role::Datanode => 0,
Role::Frontend => 1,
Role::Metasrv => 2,
}
}
}
impl TryFrom<i32> for Role {
type Error = Error;
fn try_from(role: i32) -> Result<Self> {
match role {
0 => Ok(Self::Datanode),
1 => Ok(Self::Frontend),
2 => Ok(Self::Metasrv),
_ => InvalidRoleSnafu { role }.fail(),
}
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use crate::cluster::Role::{Datanode, Frontend};
use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
use crate::peer::Peer;
#[test]
fn test_node_info_key_round_trip() {
let key = NodeInfoKey {
cluster_id: 1,
role: Datanode,
node_id: 2,
};
let key_bytes: Vec<u8> = key.into();
let new_key: NodeInfoKey = key_bytes.try_into().unwrap();
assert_eq!(1, new_key.cluster_id);
assert_eq!(Datanode, new_key.role);
assert_eq!(2, new_key.node_id);
}
#[test]
fn test_node_info_round_trip() {
let node_info = NodeInfo {
peer: Peer {
id: 1,
addr: "127.0.0.1".to_string(),
},
last_activity_ts: 123,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 1,
wcus: 2,
leader_regions: 3,
follower_regions: 4,
}),
};
let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();
assert_matches!(
new_node_info,
NodeInfo {
peer: Peer { id: 1, .. },
last_activity_ts: 123,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 1,
wcus: 2,
leader_regions: 3,
follower_regions: 4,
}),
}
);
}
#[test]
fn test_node_info_key_prefix() {
let prefix = NodeInfoKey::key_prefix_with_cluster_id(1);
assert_eq!(prefix, "__meta_cluster_node_info-1-");
let prefix = NodeInfoKey::key_prefix_with_role(2, Frontend);
assert_eq!(prefix, "__meta_cluster_node_info-2-1-");
}
}

View File

@@ -406,6 +406,28 @@ pub enum Error {
#[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
CreateLogicalTablesInvalidArguments { err_msg: String, location: Location },
#[snafu(display("Invalid node info key: {}", key))]
InvalidNodeInfoKey { key: String, location: Location },
#[snafu(display("Failed to parse number: {}", err_msg))]
ParseNum {
err_msg: String,
#[snafu(source)]
error: std::num::ParseIntError,
location: Location,
},
#[snafu(display("Invalid role: {}", role))]
InvalidRole { role: i32, location: Location },
#[snafu(display("Failed to parse {} from utf8", name))]
FromUtf8 {
name: String,
#[snafu(source)]
error: std::string::FromUtf8Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -452,6 +474,7 @@ impl ErrorExt for Error {
| EmptyTopicPool { .. }
| UnexpectedLogicalRouteTable { .. }
| ProcedureOutput { .. }
| FromUtf8 { .. }
| MetadataCorruption { .. } => StatusCode::Unexpected,
SendMessage { .. }
@@ -486,6 +509,9 @@ impl ErrorExt for Error {
ParseProcedureId { .. }
| InvalidNumTopics { .. }
| SchemaNotFound { .. }
| InvalidNodeInfoKey { .. }
| ParseNum { .. }
| InvalidRole { .. }
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -18,6 +18,7 @@
#![feature(let_chains)]
pub mod cache_invalidator;
pub mod cluster;
pub mod datanode_manager;
pub mod ddl;
pub mod ddl_manager;

View File

@@ -17,7 +17,6 @@ pub mod lock;
pub mod procedure;
pub mod router;
pub mod store;
pub mod util;
use std::fmt::{Display, Formatter};

View File

@@ -26,9 +26,9 @@ use api::v1::meta::{
ResponseHeader as PbResponseHeader,
};
use crate::error;
use crate::error::Result;
use crate::rpc::{util, KeyValue};
use crate::rpc::KeyValue;
use crate::{error, util};
pub fn to_range(key: Vec<u8>, range_end: Vec<u8>) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
match (&key[..], &range_end[..]) {

View File

@@ -1,46 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::ResponseHeader;
use crate::error;
use crate::error::Result;
#[inline]
pub fn check_response_header(header: Option<&ResponseHeader>) -> Result<()> {
if let Some(header) = header {
if let Some(error) = &header.error {
let code = error.code;
let err_msg = &error.err_msg;
return error::IllegalServerStateSnafu { code, err_msg }.fail();
}
}
Ok(())
}
/// Get prefix end key of `key`.
#[inline]
pub fn get_prefix_end_key(key: &[u8]) -> Vec<u8> {
for (i, v) in key.iter().enumerate().rev() {
if *v < 0xFF {
let mut end = Vec::from(&key[..=i]);
end[i] = *v + 1;
return end;
}
}
// next prefix does not exist (e.g., 0xffff);
vec![0]
}

View File

@@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::ResponseHeader;
use crate::error::{IllegalServerStateSnafu, Result};
/// Get prefix end key of `key`.
#[inline]
pub fn get_prefix_end_key(key: &[u8]) -> Vec<u8> {
for (i, v) in key.iter().enumerate().rev() {
if *v < 0xFF {
@@ -27,8 +30,19 @@ pub fn get_prefix_end_key(key: &[u8]) -> Vec<u8> {
vec![0]
}
pub fn check_response_header(header: Option<&ResponseHeader>) -> Result<()> {
if let Some(header) = header {
if let Some(error) = &header.error {
let code = error.code;
let err_msg = &error.err_msg;
return IllegalServerStateSnafu { code, err_msg }.fail();
}
}
Ok(())
}
/// Get next prefix key of `key`.
#[inline]
pub fn get_next_prefix_key(key: &[u8]) -> Vec<u8> {
let mut next = Vec::with_capacity(key.len() + 1);
next.extend_from_slice(key);

View File

@@ -144,12 +144,14 @@ impl Instance {
let channel_manager = ChannelManager::with_config(channel_config);
let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config);
let cluster_id = 0; // TODO(jeremy): read from config
let mut meta_client = MetaClientBuilder::new(cluster_id, 0, Role::Frontend)
let cluster_id = 0; // It is currently a reserved field and has not been enabled.
let member_id = 0; // Frontend does not need a member id.
let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Frontend)
.enable_router()
.enable_store()
.enable_heartbeat()
.enable_procedure()
.enable_access_cluster_info()
.channel_manager(channel_manager)
.ddl_channel_manager(ddl_channel_manager)
.build();

View File

@@ -18,9 +18,12 @@ mod load_balance;
mod lock;
mod procedure;
mod cluster;
mod store;
mod util;
use api::v1::meta::Role;
use cluster::Client as ClusterClient;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
@@ -50,6 +53,7 @@ pub type Id = (u64, u64);
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
@@ -60,6 +64,7 @@ pub struct MetaClientBuilder {
enable_store: bool,
enable_lock: bool,
enable_procedure: bool,
enable_access_cluster_info: bool,
channel_manager: Option<ChannelManager>,
ddl_channel_manager: Option<ChannelManager>,
heartbeat_channel_manager: Option<ChannelManager>,
@@ -109,6 +114,13 @@ impl MetaClientBuilder {
}
}
pub fn enable_access_cluster_info(self) -> Self {
Self {
enable_access_cluster_info: true,
..self
}
}
pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
Self {
channel_manager: Some(channel_manager),
@@ -159,7 +171,7 @@ impl MetaClientBuilder {
client.lock = Some(LockClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_procedure {
let mgr = self.ddl_channel_manager.unwrap_or(mgr);
let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
client.procedure = Some(ProcedureClient::new(
self.id,
self.role,
@@ -167,6 +179,14 @@ impl MetaClientBuilder {
DEFAULT_SUBMIT_DDL_MAX_RETRY,
));
}
if self.enable_access_cluster_info {
client.cluster = Some(ClusterClient::new(
self.id,
self.role,
mgr,
DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
))
}
client
}
@@ -180,6 +200,7 @@ pub struct MetaClient {
store: Option<StoreClient>,
lock: Option<LockClient>,
procedure: Option<ProcedureClient>,
cluster: Option<ClusterClient>,
}
#[async_trait::async_trait]
@@ -254,9 +275,13 @@ impl MetaClient {
info!("Lock client started");
}
if let Some(client) = &mut self.procedure {
client.start(urls).await?;
client.start(urls.clone()).await?;
info!("DDL client started");
}
if let Some(client) = &mut self.cluster {
client.start(urls).await?;
info!("Cluster client started");
}
Ok(())
}
@@ -493,7 +518,6 @@ mod tests {
let _ = meta_client.heartbeat_client().unwrap();
assert!(meta_client.store_client().is_err());
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_router()
@@ -508,7 +532,6 @@ mod tests {
assert!(meta_client.heartbeat_client().is_err());
let _ = meta_client.store_client().unwrap();
meta_client.start(urls).await.unwrap();
assert!(meta_client.store_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new(1, 2, Role::Datanode)
.enable_heartbeat()
@@ -520,8 +543,6 @@ mod tests {
let _ = meta_client.heartbeat_client().unwrap();
let _ = meta_client.store_client().unwrap();
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
assert!(meta_client.store_client().unwrap().is_started().await);
}
#[tokio::test]

View File

@@ -0,0 +1,242 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use std::sync::Arc;
use api::greptime_proto::v1;
use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{ResponseHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::cluster;
use common_meta::cluster::{ClusterInfo, NodeInfo, NodeInfoKey};
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse};
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use tonic::Status;
use crate::client::ask_leader::AskLeader;
use crate::client::{util, Id};
use crate::error::{
ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu, Result,
RetryTimesExceededSnafu,
};
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
ask_leader: None,
max_retry,
}));
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
}
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let inner = self.inner.read().await;
inner.range(req).await
}
#[allow(dead_code)]
pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
let inner = self.inner.read().await;
inner.batch_get(req).await
}
}
#[async_trait::async_trait]
impl ClusterInfo for Client {
type Error = Error;
async fn list_nodes(&self, role: Option<cluster::Role>) -> Result<Vec<NodeInfo>> {
let cluster_id = self.inner.read().await.id.0;
let key_prefix = match role {
None => NodeInfoKey::key_prefix_with_cluster_id(cluster_id),
Some(role) => NodeInfoKey::key_prefix_with_role(cluster_id, role),
};
let req = RangeRequest::new().with_prefix(key_prefix);
let res = self.range(req).await?;
res.kvs
.into_iter()
.map(|kv| NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu))
.collect::<Result<Vec<_>>>()
}
}
#[derive(Debug)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
max_retry: usize,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
ensure!(
!self.is_started(),
IllegalGrpcClientStateSnafu {
err_msg: "Cluster client already started",
}
);
let peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<Vec<_>>();
self.ask_leader = Some(AskLeader::new(
self.id,
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));
Ok(())
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;
Ok(ClusterClient::new(channel))
}
#[inline]
fn is_started(&self) -> bool {
self.ask_leader.is_some()
}
fn ask_leader(&self) -> Result<&AskLeader> {
ensure!(
self.is_started(),
IllegalGrpcClientStateSnafu {
err_msg: "Cluster client not start"
}
);
Ok(self.ask_leader.as_ref().unwrap())
}
async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
where
R: Future<Output = std::result::Result<T, Status>>,
F: Fn(ClusterClient<Channel>) -> R,
H: Fn(&T) -> &Option<ResponseHeader>,
{
let ask_leader = self.ask_leader()?;
let mut times = 0;
while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
let client = self.make_client(leader)?;
match body_fn(client).await {
Ok(res) => {
if util::is_not_leader(get_header(&res)) {
warn!("Failed to {task} to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("Cluster client updated to new leader addr: {leader}");
times += 1;
continue;
}
return Ok(res);
}
Err(status) => {
// The leader may be unreachable.
if util::is_unreachable(&status) {
warn!("Failed to {task} to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
info!("Cluster client updated to new leader addr: {leader}");
times += 1;
continue;
} else {
return Err(Error::from(status));
}
}
}
} else if let Err(err) = ask_leader.ask_leader().await {
return Err(err);
}
}
RetryTimesExceededSnafu {
msg: "Failed to {task}",
times: self.max_retry,
}
.fail()
}
async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
self.with_retry(
"range",
move |mut client| {
let inner_req = tonic::Request::new(v1::meta::RangeRequest::from(request.clone()));
async move { client.range(inner_req).await.map(|res| res.into_inner()) }
},
|res| &res.header,
)
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
async fn batch_get(&self, request: BatchGetRequest) -> Result<BatchGetResponse> {
self.with_retry(
"batch_get",
move |mut client| {
let inner_req =
tonic::Request::new(v1::meta::BatchGetRequest::from(request.clone()));
async move {
client
.batch_get(inner_req)
.await
.map(|res| res.into_inner())
}
},
|res| &res.header,
)
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
}

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::rpc::util;
use common_meta::util;
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
@@ -128,11 +128,6 @@ impl Client {
inner.ask_leader().await?;
inner.heartbeat().await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
}
#[derive(Debug)]
@@ -267,17 +262,6 @@ impl Inner {
mod test {
use super::*;
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
@@ -285,7 +269,6 @@ mod test {
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(

View File

@@ -53,11 +53,6 @@ impl Client {
inner.start(urls).await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
let inner = self.inner.read().await;
inner.lock(req).await
@@ -155,17 +150,6 @@ impl Inner {
mod tests {
use super::*;
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
@@ -173,7 +157,6 @@ mod tests {
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(

View File

@@ -18,8 +18,8 @@ use std::time::Duration;
use api::v1::meta::procedure_service_client::ProcedureServiceClient;
use api::v1::meta::{
DdlTaskRequest, DdlTaskResponse, ErrorCode, MigrateRegionRequest, MigrateRegionResponse,
ProcedureId, ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role,
DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureId,
ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role,
};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
@@ -27,10 +27,10 @@ use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use tonic::{Code, Status};
use tonic::Status;
use crate::client::ask_leader::AskLeader;
use crate::client::Id;
use crate::client::{util, Id};
use crate::error;
use crate::error::Result;
@@ -61,11 +61,6 @@ impl Client {
inner.start(urls).await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result<DdlTaskResponse> {
let inner = self.inner.read().await;
inner.submit_ddl_task(req).await
@@ -173,7 +168,7 @@ impl Inner {
let client = self.make_client(leader)?;
match body_fn(client).await {
Ok(res) => {
if is_not_leader(get_header(&res)) {
if util::is_not_leader(get_header(&res)) {
warn!("Failed to {task} to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
@@ -184,7 +179,7 @@ impl Inner {
}
Err(status) => {
// The leader may be unreachable.
if is_unreachable(&status) {
if util::is_unreachable(&status) {
warn!("Failed to {task} to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
info!("Procedure client updated to new leader addr: {leader}");
@@ -282,17 +277,3 @@ impl Inner {
.await
}
}
fn is_unreachable(status: &Status) -> bool {
status.code() == Code::Unavailable || status.code() == Code::DeadlineExceeded
}
fn is_not_leader(header: &Option<ResponseHeader>) -> bool {
if let Some(header) = header {
if let Some(err) = header.error.as_ref() {
return err.code == ErrorCode::NotLeader as i32;
}
}
false
}

View File

@@ -57,11 +57,6 @@ impl Client {
inner.start(urls).await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let inner = self.inner.read().await;
inner.range(req).await
@@ -254,17 +249,6 @@ impl Inner {
mod test {
use super::*;
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
@@ -272,7 +256,6 @@ mod test {
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(

View File

@@ -0,0 +1,32 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{ErrorCode, ResponseHeader};
use tonic::{Code, Status};
pub(crate) fn is_unreachable(status: &Status) -> bool {
status.code() == Code::Unavailable || status.code() == Code::DeadlineExceeded
}
pub(crate) fn is_not_leader(header: &Option<ResponseHeader>) -> bool {
let Some(header) = header else {
return false;
};
let Some(err) = header.error.as_ref() else {
return false;
};
err.code == ErrorCode::NotLeader as i32
}

View File

@@ -105,6 +105,21 @@ impl KvBackend for MetaPeerClient {
.fail()
}
// MetaPeerClient does not support mutable methods listed below.
async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
error::UnsupportedSnafu {
operation: "put".to_string(),
}
.fail()
}
async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
error::UnsupportedSnafu {
operation: "batch put".to_string(),
}
.fail()
}
// Get kv information from the leader's in_mem kv store
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
if self.is_leader() {
@@ -139,21 +154,6 @@ impl KvBackend for MetaPeerClient {
.fail()
}
// MetaPeerClient does not support mutable methods listed below.
async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
error::UnsupportedSnafu {
operation: "put".to_string(),
}
.fail()
}
async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
error::UnsupportedSnafu {
operation: "batch put".to_string(),
}
.fail()
}
async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
error::UnsupportedSnafu {
operation: "compare and put".to_string(),
@@ -175,13 +175,6 @@ impl KvBackend for MetaPeerClient {
.fail()
}
async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
error::UnsupportedSnafu {
operation: "delete".to_string(),
}
.fail()
}
async fn put_conditionally(
&self,
_key: Vec<u8>,
@@ -193,6 +186,13 @@ impl KvBackend for MetaPeerClient {
}
.fail()
}
async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
error::UnsupportedSnafu {
operation: "delete".to_string(),
}
.fail()
}
}
impl MetaPeerClient {

View File

@@ -654,6 +654,18 @@ pub enum Error {
err_msg: String,
source: common_meta::error::Error,
},
#[snafu(display("Failed to save cluster info"))]
SaveClusterInfo {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Invalid cluster info format"))]
InvalidClusterInfoFormat {
location: Location,
source: common_meta::error::Error,
},
}
impl Error {
@@ -746,6 +758,8 @@ impl ErrorExt for Error {
| Error::MigrationAbort { .. }
| Error::MigrationRunning { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::SaveClusterInfo { source, .. }
| Error::InvalidClusterInfoFormat { source, .. } => source.status_code(),
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. }

View File

@@ -41,6 +41,7 @@ use crate::service::mailbox::{
};
pub mod check_leader_handler;
pub mod collect_cluster_info_handler;
pub mod collect_stats_handler;
pub mod failure_handler;
pub mod filter_inactive_region_stats;

View File

@@ -0,0 +1,143 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::cluster;
use common_meta::cluster::{DatanodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
use common_meta::peer::Peer;
use common_meta::rpc::store::PutRequest;
use snafu::ResultExt;
use store_api::region_engine::RegionRole;
use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::Result;
/// The handler to collect cluster info from the heartbeat request of frontend.
pub struct CollectFrontendClusterInfoHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Frontend
}
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some((key, peer)) = extract_base_info(req, Role::Frontend) else {
return Ok(HandleControl::Continue);
};
let value = NodeInfo {
peer,
last_activity_ts: common_time::util::current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
};
save_to_mem_store(key, value, ctx).await?;
Ok(HandleControl::Continue)
}
}
/// The handler to collect cluster info from the heartbeat request of datanode.
pub struct CollectDatanodeClusterInfoHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some((key, peer)) = extract_base_info(req, Role::Datanode) else {
return Ok(HandleControl::Continue);
};
let Some(stat) = &acc.stat else {
return Ok(HandleControl::Continue);
};
let leader_regions = stat
.region_stats
.iter()
.filter(|s| s.role == RegionRole::Leader)
.count();
let follower_regions = stat.region_stats.len() - leader_regions;
let value = NodeInfo {
peer,
last_activity_ts: stat.timestamp_millis,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: stat.rcus,
wcus: stat.wcus,
leader_regions,
follower_regions,
}),
};
save_to_mem_store(key, value, ctx).await?;
Ok(HandleControl::Continue)
}
}
fn extract_base_info(req: &HeartbeatRequest, role: Role) -> Option<(NodeInfoKey, Peer)> {
let HeartbeatRequest { header, peer, .. } = req;
let Some(header) = &header else {
return None;
};
let Some(peer) = &peer else {
return None;
};
Some((
NodeInfoKey {
cluster_id: header.cluster_id,
role: match role {
Role::Datanode => cluster::Role::Datanode,
Role::Frontend => cluster::Role::Frontend,
},
node_id: peer.id,
},
Peer::from(peer.clone()),
))
}
async fn save_to_mem_store(key: NodeInfoKey, value: NodeInfo, ctx: &mut Context) -> Result<()> {
let key = key.into();
let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
let put_req = PutRequest {
key,
value,
..Default::default()
};
ctx.in_memory
.put(put_req)
.await
.context(SaveClusterInfoSnafu)?;
Ok(())
}

View File

@@ -22,8 +22,8 @@ pub struct OnLeaderStartHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for OnLeaderStartHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
fn is_acceptable(&self, _: Role) -> bool {
true
}
async fn handle(

View File

@@ -22,7 +22,7 @@ pub struct ResponseHeaderHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for ResponseHeaderHandler {
fn is_acceptable(&self, _role: Role) -> bool {
fn is_acceptable(&self, _: Role) -> bool {
true
}

View File

@@ -40,6 +40,9 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_cluster_info_handler::{
CollectDatanodeClusterInfoHandler, CollectFrontendClusterInfoHandler,
};
use crate::handler::collect_stats_handler::CollectStatsHandler;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
@@ -298,6 +301,8 @@ impl MetaSrvBuilder {
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(CollectStatsHandler).await;
group.add_handler(CollectDatanodeClusterInfoHandler).await;
group.add_handler(CollectFrontendClusterInfoHandler).await;
group.add_handler(MailboxHandler).await;
group.add_handler(region_lease_handler).await;
group.add_handler(FilterInactiveRegionStatsHandler).await;

View File

@@ -348,6 +348,7 @@ impl GreptimeDbClusterBuilder {
.enable_heartbeat()
.channel_manager(meta_srv.channel_manager)
.enable_procedure()
.enable_access_cluster_info()
.build();
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);