mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-18 22:10:42 +00:00
feat: add CPU, memory and node status info to cluster_info (#6897)
* feat: add CPU and memory info to `cluster_info` Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: add `node_status` to `cluster_info` table Signed-off-by: WenyXu <wenymedia@gmail.com> * test: update sqlness Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: update proto Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -32,6 +32,7 @@ common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-version.workspace = true
|
||||
common-workload.workspace = true
|
||||
dashmap.workspace = true
|
||||
datafusion.workspace = true
|
||||
datatypes.workspace = true
|
||||
@@ -48,6 +49,7 @@ prometheus.workspace = true
|
||||
promql-parser.workspace = true
|
||||
rand.workspace = true
|
||||
rustc-hash.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
session.workspace = true
|
||||
snafu.workspace = true
|
||||
|
||||
@@ -18,10 +18,11 @@ use std::time::Duration;
|
||||
use arrow_schema::SchemaRef as ArrowSchemaRef;
|
||||
use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cluster::NodeInfo;
|
||||
use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus};
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
|
||||
use common_time::timestamp::Timestamp;
|
||||
use common_workload::DatanodeWorkloadType;
|
||||
use datafusion::execution::TaskContext;
|
||||
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
|
||||
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
|
||||
@@ -32,7 +33,9 @@ use datatypes::timestamp::TimestampMillisecond;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
|
||||
UInt32VectorBuilder, UInt64VectorBuilder,
|
||||
};
|
||||
use serde::Serialize;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::{ScanRequest, TableId};
|
||||
|
||||
@@ -41,14 +44,20 @@ use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
|
||||
use crate::system_schema::information_schema::{CLUSTER_INFO, InformationTable, Predicates};
|
||||
use crate::system_schema::utils;
|
||||
|
||||
const PEER_TYPE_FRONTEND: &str = "FRONTEND";
|
||||
const PEER_TYPE_METASRV: &str = "METASRV";
|
||||
|
||||
const PEER_ID: &str = "peer_id";
|
||||
const PEER_TYPE: &str = "peer_type";
|
||||
const PEER_ADDR: &str = "peer_addr";
|
||||
const CPUS: &str = "cpus";
|
||||
const MEMORY_BYTES: &str = "memory_bytes";
|
||||
const VERSION: &str = "version";
|
||||
const GIT_COMMIT: &str = "git_commit";
|
||||
const START_TIME: &str = "start_time";
|
||||
const UPTIME: &str = "uptime";
|
||||
const ACTIVE_TIME: &str = "active_time";
|
||||
const NODE_STATUS: &str = "node_status";
|
||||
|
||||
const INIT_CAPACITY: usize = 42;
|
||||
|
||||
@@ -57,11 +66,14 @@ const INIT_CAPACITY: usize = 42;
|
||||
/// - `peer_id`: the peer server id.
|
||||
/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
|
||||
/// - `peer_addr`: the peer gRPC address.
|
||||
/// - `cpus`: the number of CPUs of the peer.
|
||||
/// - `memory_bytes`: the memory bytes of the peer.
|
||||
/// - `version`: the build package version of the peer.
|
||||
/// - `git_commit`: the build git commit hash of the peer.
|
||||
/// - `start_time`: the starting time of the peer.
|
||||
/// - `uptime`: the uptime of the peer.
|
||||
/// - `active_time`: the time since the last activity of the peer.
|
||||
/// - `node_status`: the status info of the peer.
|
||||
///
|
||||
#[derive(Debug)]
|
||||
pub(super) struct InformationSchemaClusterInfo {
|
||||
@@ -82,6 +94,8 @@ impl InformationSchemaClusterInfo {
|
||||
ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
|
||||
ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(CPUS, ConcreteDataType::uint32_datatype(), false),
|
||||
ColumnSchema::new(MEMORY_BYTES, ConcreteDataType::uint64_datatype(), false),
|
||||
ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(
|
||||
@@ -91,6 +105,7 @@ impl InformationSchemaClusterInfo {
|
||||
),
|
||||
ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(NODE_STATUS, ConcreteDataType::string_datatype(), true),
|
||||
]))
|
||||
}
|
||||
|
||||
@@ -140,11 +155,14 @@ struct InformationSchemaClusterInfoBuilder {
|
||||
peer_ids: Int64VectorBuilder,
|
||||
peer_types: StringVectorBuilder,
|
||||
peer_addrs: StringVectorBuilder,
|
||||
cpus: UInt32VectorBuilder,
|
||||
memory_bytes: UInt64VectorBuilder,
|
||||
versions: StringVectorBuilder,
|
||||
git_commits: StringVectorBuilder,
|
||||
start_times: TimestampMillisecondVectorBuilder,
|
||||
uptimes: StringVectorBuilder,
|
||||
active_times: StringVectorBuilder,
|
||||
node_status: StringVectorBuilder,
|
||||
}
|
||||
|
||||
impl InformationSchemaClusterInfoBuilder {
|
||||
@@ -155,11 +173,14 @@ impl InformationSchemaClusterInfoBuilder {
|
||||
peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
cpus: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
memory_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
node_status: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,9 +197,10 @@ impl InformationSchemaClusterInfoBuilder {
|
||||
|
||||
fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
|
||||
let peer_type = node_info.status.role_name();
|
||||
let peer_id = peer_id(peer_type, node_info.peer.id);
|
||||
|
||||
let row = [
|
||||
(PEER_ID, &Value::from(node_info.peer.id)),
|
||||
(PEER_ID, &Value::from(peer_id)),
|
||||
(PEER_TYPE, &Value::from(peer_type)),
|
||||
(PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
|
||||
(VERSION, &Value::from(node_info.version.as_str())),
|
||||
@@ -189,13 +211,7 @@ impl InformationSchemaClusterInfoBuilder {
|
||||
return;
|
||||
}
|
||||
|
||||
if peer_type == "FRONTEND" || peer_type == "METASRV" {
|
||||
// Always set peer_id to be -1 for frontends and metasrvs
|
||||
self.peer_ids.push(Some(-1));
|
||||
} else {
|
||||
self.peer_ids.push(Some(node_info.peer.id as i64));
|
||||
}
|
||||
|
||||
self.peer_ids.push(Some(peer_id));
|
||||
self.peer_types.push(Some(peer_type));
|
||||
self.peer_addrs.push(Some(&node_info.peer.addr));
|
||||
self.versions.push(Some(&node_info.version));
|
||||
@@ -212,6 +228,8 @@ impl InformationSchemaClusterInfoBuilder {
|
||||
self.start_times.push(None);
|
||||
self.uptimes.push(None);
|
||||
}
|
||||
self.cpus.push(Some(node_info.cpus));
|
||||
self.memory_bytes.push(Some(node_info.memory_bytes));
|
||||
|
||||
if node_info.last_activity_ts > 0 {
|
||||
self.active_times.push(Some(
|
||||
@@ -220,6 +238,8 @@ impl InformationSchemaClusterInfoBuilder {
|
||||
} else {
|
||||
self.active_times.push(None);
|
||||
}
|
||||
self.node_status
|
||||
.push(format_node_status(&node_info).as_deref());
|
||||
}
|
||||
|
||||
fn format_duration_since(ts: u64) -> String {
|
||||
@@ -233,11 +253,14 @@ impl InformationSchemaClusterInfoBuilder {
|
||||
Arc::new(self.peer_ids.finish()),
|
||||
Arc::new(self.peer_types.finish()),
|
||||
Arc::new(self.peer_addrs.finish()),
|
||||
Arc::new(self.cpus.finish()),
|
||||
Arc::new(self.memory_bytes.finish()),
|
||||
Arc::new(self.versions.finish()),
|
||||
Arc::new(self.git_commits.finish()),
|
||||
Arc::new(self.start_times.finish()),
|
||||
Arc::new(self.uptimes.finish()),
|
||||
Arc::new(self.active_times.finish()),
|
||||
Arc::new(self.node_status.finish()),
|
||||
];
|
||||
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
|
||||
}
|
||||
@@ -263,3 +286,56 @@ impl DfPartitionStream for InformationSchemaClusterInfo {
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn peer_id(peer_type: &str, peer_id: u64) -> i64 {
|
||||
if peer_type == PEER_TYPE_FRONTEND || peer_type == PEER_TYPE_METASRV {
|
||||
-1
|
||||
} else {
|
||||
peer_id as i64
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct DisplayMetasrvStatus {
|
||||
is_leader: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct DisplayDatanodeStatus {
|
||||
workloads: Vec<DatanodeWorkloadType>,
|
||||
leader_regions: usize,
|
||||
follower_regions: usize,
|
||||
}
|
||||
|
||||
impl From<&DatanodeStatus> for DisplayDatanodeStatus {
|
||||
fn from(status: &DatanodeStatus) -> Self {
|
||||
Self {
|
||||
workloads: status
|
||||
.workloads
|
||||
.types
|
||||
.iter()
|
||||
.flat_map(|w| DatanodeWorkloadType::from_i32(*w))
|
||||
.collect(),
|
||||
leader_regions: status.leader_regions,
|
||||
follower_regions: status.follower_regions,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn format_node_status(node_info: &NodeInfo) -> Option<String> {
|
||||
match &node_info.status {
|
||||
NodeStatus::Datanode(datanode_status) => {
|
||||
serde_json::to_string(&DisplayDatanodeStatus::from(datanode_status)).ok()
|
||||
}
|
||||
NodeStatus::Frontend(_) => None,
|
||||
NodeStatus::Flownode(_) => None,
|
||||
NodeStatus::Metasrv(metasrv_status) => {
|
||||
if metasrv_status.is_leader {
|
||||
serde_json::to_string(&DisplayMetasrvStatus { is_leader: true }).ok()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
NodeStatus::Standalone => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -787,6 +787,10 @@ impl InformationExtension for StandaloneInformationExtension {
|
||||
// Use `self.start_time_ms` instead.
|
||||
// It's not precise but enough.
|
||||
start_time_ms: self.start_time_ms,
|
||||
cpus: common_config::utils::get_cpus() as u32,
|
||||
memory_bytes: common_config::utils::get_sys_total_memory()
|
||||
.unwrap_or_default()
|
||||
.as_bytes(),
|
||||
};
|
||||
Ok(vec![node_info])
|
||||
}
|
||||
|
||||
@@ -39,6 +39,24 @@ pub fn get_sys_total_memory() -> Option<ReadableSize> {
|
||||
}
|
||||
}
|
||||
|
||||
/// `ResourceSpec` holds the static resource specifications of a node,
|
||||
/// such as CPU cores and memory capacity. These values are fixed
|
||||
/// at startup and do not change dynamically during runtime.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct ResourceSpec {
|
||||
pub cpus: usize,
|
||||
pub memory: Option<ReadableSize>,
|
||||
}
|
||||
|
||||
impl Default for ResourceSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
cpus: get_cpus(),
|
||||
memory: get_sys_total_memory(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -118,6 +118,12 @@ pub struct NodeInfo {
|
||||
pub git_commit: String,
|
||||
// The node star timestamp
|
||||
pub start_time_ms: u64,
|
||||
// The node build cpus
|
||||
#[serde(default)]
|
||||
pub cpus: u32,
|
||||
// The node build memory bytes
|
||||
#[serde(default)]
|
||||
pub memory_bytes: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
|
||||
@@ -324,6 +330,8 @@ mod tests {
|
||||
version: "".to_string(),
|
||||
git_commit: "".to_string(),
|
||||
start_time_ms: 1,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
};
|
||||
|
||||
let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
|
||||
|
||||
@@ -20,6 +20,7 @@ use std::time::Duration;
|
||||
use api::v1::meta::heartbeat_request::NodeWorkloads;
|
||||
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
|
||||
use common_base::Plugins;
|
||||
use common_config::utils::ResourceSpec;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::datanode::REGION_STATISTIC_KEY;
|
||||
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
|
||||
@@ -62,6 +63,7 @@ pub struct HeartbeatTask {
|
||||
interval: u64,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
region_alive_keeper: Arc<RegionAliveKeeper>,
|
||||
resource_spec: ResourceSpec,
|
||||
}
|
||||
|
||||
impl Drop for HeartbeatTask {
|
||||
@@ -104,6 +106,7 @@ impl HeartbeatTask {
|
||||
interval: opts.heartbeat.interval.as_millis() as u64,
|
||||
resp_handler_executor,
|
||||
region_alive_keeper,
|
||||
resource_spec: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -231,6 +234,8 @@ impl HeartbeatTask {
|
||||
|
||||
self.region_alive_keeper.start(Some(event_receiver)).await?;
|
||||
let mut last_sent = Instant::now();
|
||||
let cpus = self.resource_spec.cpus as u32;
|
||||
let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
|
||||
|
||||
common_runtime::spawn_hb(async move {
|
||||
let sleep = tokio::time::sleep(Duration::from_millis(0));
|
||||
@@ -244,7 +249,8 @@ impl HeartbeatTask {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: node_epoch,
|
||||
cpus: num_cpus::get() as u32,
|
||||
cpus,
|
||||
memory_bytes,
|
||||
}),
|
||||
node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
|
||||
types: workload_types.iter().map(|w| w.to_i32()).collect(),
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, Peer};
|
||||
use common_config::utils::ResourceSpec;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
|
||||
@@ -68,6 +69,7 @@ pub struct HeartbeatTask {
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
running: Arc<AtomicBool>,
|
||||
query_stat_size: Option<SizeReportSender>,
|
||||
resource_spec: ResourceSpec,
|
||||
}
|
||||
|
||||
impl HeartbeatTask {
|
||||
@@ -91,6 +93,7 @@ impl HeartbeatTask {
|
||||
resp_handler_executor,
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
query_stat_size: None,
|
||||
resource_spec: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,13 +177,14 @@ impl HeartbeatTask {
|
||||
})
|
||||
}
|
||||
|
||||
fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
|
||||
fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
|
||||
let build_info = common_version::build_info();
|
||||
Some(NodeInfo {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms,
|
||||
cpus: num_cpus::get() as u32,
|
||||
cpus,
|
||||
memory_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -195,6 +199,8 @@ impl HeartbeatTask {
|
||||
id: self.node_id,
|
||||
addr: self.peer_addr.clone(),
|
||||
});
|
||||
let cpus = self.resource_spec.cpus as u32;
|
||||
let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
|
||||
|
||||
let query_stat_size = self.query_stat_size.clone();
|
||||
|
||||
@@ -208,7 +214,7 @@ impl HeartbeatTask {
|
||||
let heartbeat_request = HeartbeatRequest {
|
||||
peer: self_peer,
|
||||
node_epoch,
|
||||
info: Self::build_node_info(node_epoch),
|
||||
info: Self::build_node_info(node_epoch, cpus, memory_bytes),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ mod tests;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
|
||||
use common_config::utils::ResourceSpec;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
|
||||
};
|
||||
@@ -46,6 +47,7 @@ pub struct HeartbeatTask {
|
||||
retry_interval: Duration,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
start_time_ms: u64,
|
||||
resource_spec: ResourceSpec,
|
||||
}
|
||||
|
||||
impl HeartbeatTask {
|
||||
@@ -69,6 +71,7 @@ impl HeartbeatTask {
|
||||
retry_interval: heartbeat_opts.retry_interval,
|
||||
resp_handler_executor,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
resource_spec: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,14 +149,15 @@ impl HeartbeatTask {
|
||||
})
|
||||
}
|
||||
|
||||
fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
|
||||
fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
|
||||
let build_info = common_version::build_info();
|
||||
|
||||
Some(NodeInfo {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms,
|
||||
cpus: num_cpus::get() as u32,
|
||||
cpus,
|
||||
memory_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -169,6 +173,8 @@ impl HeartbeatTask {
|
||||
id: 0,
|
||||
addr: self.peer_addr.clone(),
|
||||
});
|
||||
let cpus = self.resource_spec.cpus as u32;
|
||||
let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
|
||||
|
||||
common_runtime::spawn_hb(async move {
|
||||
let sleep = tokio::time::sleep(Duration::from_millis(0));
|
||||
@@ -176,7 +182,7 @@ impl HeartbeatTask {
|
||||
|
||||
let heartbeat_request = HeartbeatRequest {
|
||||
peer: self_peer,
|
||||
info: Self::build_node_info(start_time_ms),
|
||||
info: Self::build_node_info(start_time_ms, cpus, memory_bytes),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -372,6 +372,8 @@ impl ClusterInfo for MetaClient {
|
||||
version: node.version,
|
||||
git_commit: node.git_commit,
|
||||
start_time_ms: node.start_time_ms,
|
||||
cpus: node.cpus,
|
||||
memory_bytes: node.memory_bytes,
|
||||
})
|
||||
.chain(leader.into_iter().map(|node| NodeInfo {
|
||||
peer: node.peer.unwrap_or_default(),
|
||||
@@ -380,6 +382,8 @@ impl ClusterInfo for MetaClient {
|
||||
version: node.version,
|
||||
git_commit: node.git_commit,
|
||||
start_time_ms: node.start_time_ms,
|
||||
cpus: node.cpus,
|
||||
memory_bytes: node.memory_bytes,
|
||||
}))
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
|
||||
@@ -128,7 +128,7 @@ impl Election for EtcdElection {
|
||||
|
||||
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
|
||||
let mut lease_client = self.client.lease_client();
|
||||
let res = lease_client
|
||||
let res: etcd_client::LeaseGrantResponse = lease_client
|
||||
.grant(CANDIDATE_LEASE_SECS as i64, None)
|
||||
.await
|
||||
.context(error::EtcdFailedSnafu)?;
|
||||
|
||||
@@ -1161,6 +1161,8 @@ mod tests {
|
||||
version: "test_version".to_string(),
|
||||
git_commit: "test_git_commit".to_string(),
|
||||
start_time_ms: 0,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
};
|
||||
mysql_election.register_candidate(&node_info).await.unwrap();
|
||||
}
|
||||
|
||||
@@ -1000,6 +1000,8 @@ mod tests {
|
||||
version: "test_version".to_string(),
|
||||
git_commit: "test_git_commit".to_string(),
|
||||
start_time_ms: 0,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
};
|
||||
pg_election.register_candidate(&node_info).await.unwrap();
|
||||
}
|
||||
|
||||
@@ -52,6 +52,8 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
|
||||
version: info.version,
|
||||
git_commit: info.git_commit,
|
||||
start_time_ms: info.start_time_ms,
|
||||
cpus: info.cpus,
|
||||
memory_bytes: info.memory_bytes,
|
||||
};
|
||||
|
||||
put_into_memory_store(ctx, key, value).await?;
|
||||
@@ -85,6 +87,8 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
|
||||
version: info.version,
|
||||
git_commit: info.git_commit,
|
||||
start_time_ms: info.start_time_ms,
|
||||
cpus: info.cpus,
|
||||
memory_bytes: info.memory_bytes,
|
||||
};
|
||||
|
||||
put_into_memory_store(ctx, key, value).await?;
|
||||
@@ -136,6 +140,8 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
|
||||
version: info.version,
|
||||
git_commit: info.git_commit,
|
||||
start_time_ms: info.start_time_ms,
|
||||
cpus: info.cpus,
|
||||
memory_bytes: info.memory_bytes,
|
||||
};
|
||||
|
||||
put_into_memory_store(ctx, key, value).await?;
|
||||
|
||||
@@ -467,6 +467,8 @@ mod tests {
|
||||
version: "1.0.0".to_string(),
|
||||
git_commit: "1234567890".to_string(),
|
||||
start_time_ms: current_time_millis() as u64,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
};
|
||||
|
||||
let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);
|
||||
@@ -490,6 +492,8 @@ mod tests {
|
||||
version: "1.0.0".to_string(),
|
||||
git_commit: "1234567890".to_string(),
|
||||
start_time_ms: current_time_millis() as u64,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
};
|
||||
|
||||
in_memory
|
||||
@@ -526,6 +530,8 @@ mod tests {
|
||||
version: "1.0.0".to_string(),
|
||||
git_commit: "1234567890".to_string(),
|
||||
start_time_ms: last_activity_ts as u64,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
};
|
||||
|
||||
let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);
|
||||
|
||||
@@ -22,6 +22,7 @@ use std::time::Duration;
|
||||
use clap::ValueEnum;
|
||||
use common_base::Plugins;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_config::utils::ResourceSpec;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_event_recorder::EventRecorderOptions;
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
@@ -372,6 +373,12 @@ pub struct MetasrvNodeInfo {
|
||||
pub git_commit: String,
|
||||
// The node start timestamp in milliseconds
|
||||
pub start_time_ms: u64,
|
||||
// The node cpus
|
||||
#[serde(default)]
|
||||
pub cpus: u32,
|
||||
#[serde(default)]
|
||||
// The node memory bytes
|
||||
pub memory_bytes: u64,
|
||||
}
|
||||
|
||||
impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
|
||||
@@ -384,6 +391,8 @@ impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
|
||||
version: node_info.version,
|
||||
git_commit: node_info.git_commit,
|
||||
start_time_ms: node_info.start_time_ms,
|
||||
cpus: node_info.cpus,
|
||||
memory_bytes: node_info.memory_bytes,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -498,6 +507,7 @@ pub struct Metasrv {
|
||||
region_flush_ticker: Option<RegionFlushTickerRef>,
|
||||
table_id_sequence: SequenceRef,
|
||||
reconciliation_manager: ReconciliationManagerRef,
|
||||
resource_spec: ResourceSpec,
|
||||
|
||||
plugins: Plugins,
|
||||
}
|
||||
@@ -679,6 +689,10 @@ impl Metasrv {
|
||||
self.start_time_ms
|
||||
}
|
||||
|
||||
pub fn resource_spec(&self) -> &ResourceSpec {
|
||||
&self.resource_spec
|
||||
}
|
||||
|
||||
pub fn node_info(&self) -> MetasrvNodeInfo {
|
||||
let build_info = common_version::build_info();
|
||||
MetasrvNodeInfo {
|
||||
@@ -686,6 +700,8 @@ impl Metasrv {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: self.start_time_ms(),
|
||||
cpus: self.resource_spec().cpus as u32,
|
||||
memory_bytes: self.resource_spec().memory.unwrap_or_default().as_bytes(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -564,6 +564,7 @@ impl MetasrvBuilder {
|
||||
table_id_sequence,
|
||||
reconciliation_manager,
|
||||
topic_stats_registry,
|
||||
resource_spec: Default::default(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,6 +98,8 @@ impl Metasrv {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: self.start_time_ms(),
|
||||
cpus: self.resource_spec().cpus as u32,
|
||||
memory_bytes: self.resource_spec().memory.unwrap_or_default().as_bytes(),
|
||||
}
|
||||
.into()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user