mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: add cpu_usage_millicores and memory_usage_bytes in information_schema.cluster_info table. (#7051)
* refactor: add `hostname` in cluster_info table Signed-off-by: zyy17 <zyylsxm@gmail.com> * chore: update information schema result Signed-off-by: zyy17 <zyylsxm@gmail.com> * feat: enable zstd for bulk memtable encoded parts (#7045) feat: enable zstd in bulk memtable Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: add `get_total_cpu_millicores()` / `get_total_cpu_cores()` / `get_total_memory_bytes()` / `get_total_memory_readable()` in common-stat Signed-off-by: zyy17 <zyylsxm@gmail.com> * feat: add `cpu_usage_millicores` and `memory_usage_bytes` in `information_schema.cluster_info` table Signed-off-by: zyy17 <zyylsxm@gmail.com> * fix: compile warning and integration test failed Signed-off-by: zyy17 <zyylsxm@gmail.com> * fix: integration test failed Signed-off-by: zyy17 <zyylsxm@gmail.com> * refactor: add `ResourceStat` Signed-off-by: zyy17 <zyylsxm@gmail.com> * refactor: apply code review comments Signed-off-by: zyy17 <zyylsxm@gmail.com> * chore: update greptime-proto Signed-off-by: zyy17 <zyylsxm@gmail.com> --------- Signed-off-by: zyy17 <zyylsxm@gmail.com> Signed-off-by: evenyag <realevenyag@gmail.com> Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -2025,7 +2025,6 @@ dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-stat",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-wal",
|
||||
@@ -2546,11 +2545,14 @@ name = "common-stat"
|
||||
version = "0.18.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
"lazy_static",
|
||||
"nix 0.30.1",
|
||||
"num_cpus",
|
||||
"prometheus",
|
||||
"sysinfo",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3907,6 +3909,7 @@ dependencies = [
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
"common-stat",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
@@ -4904,6 +4907,7 @@ dependencies = [
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
"common-stat",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
@@ -5319,7 +5323,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=72a0d22e0f5f716b2ee21bca091f87a88c36e5ca#72a0d22e0f5f716b2ee21bca091f87a88c36e5ca"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=14b9dc40bdc8288742b0cefc7bb024303b7429ef#14b9dc40bdc8288742b0cefc7bb024303b7429ef"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"prost-types 0.13.5",
|
||||
@@ -7398,6 +7402,7 @@ dependencies = [
|
||||
"common-procedure",
|
||||
"common-procedure-test",
|
||||
"common-runtime",
|
||||
"common-stat",
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
"common-version",
|
||||
@@ -12996,6 +13001,7 @@ dependencies = [
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
"common-stat",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
|
||||
@@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "72a0d22e0f5f716b2ee21bca091f87a88c36e5ca" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -33,7 +33,6 @@ use datatypes::timestamp::TimestampMillisecond;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
|
||||
UInt32VectorBuilder, UInt64VectorBuilder,
|
||||
};
|
||||
use serde::Serialize;
|
||||
use snafu::ResultExt;
|
||||
@@ -53,6 +52,8 @@ const PEER_ADDR: &str = "peer_addr";
|
||||
const PEER_HOSTNAME: &str = "peer_hostname";
|
||||
const TOTAL_CPU_MILLICORES: &str = "total_cpu_millicores";
|
||||
const TOTAL_MEMORY_BYTES: &str = "total_memory_bytes";
|
||||
const CPU_USAGE_MILLICORES: &str = "cpu_usage_millicores";
|
||||
const MEMORY_USAGE_BYTES: &str = "memory_usage_bytes";
|
||||
const VERSION: &str = "version";
|
||||
const GIT_COMMIT: &str = "git_commit";
|
||||
const START_TIME: &str = "start_time";
|
||||
@@ -67,15 +68,17 @@ const INIT_CAPACITY: usize = 42;
|
||||
/// - `peer_id`: the peer server id.
|
||||
/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
|
||||
/// - `peer_addr`: the peer gRPC address.
|
||||
/// - `peer_hostname`: the hostname of the peer.
|
||||
/// - `total_cpu_millicores`: the total CPU millicores of the peer.
|
||||
/// - `total_memory_bytes`: the total memory bytes of the peer.
|
||||
/// - `cpu_usage_millicores`: the CPU usage millicores of the peer.
|
||||
/// - `memory_usage_bytes`: the memory usage bytes of the peer.
|
||||
/// - `version`: the build package version of the peer.
|
||||
/// - `git_commit`: the build git commit hash of the peer.
|
||||
/// - `start_time`: the starting time of the peer.
|
||||
/// - `uptime`: the uptime of the peer.
|
||||
/// - `active_time`: the time since the last activity of the peer.
|
||||
/// - `node_status`: the status info of the peer.
|
||||
/// - `peer_hostname`: the hostname of the peer.
|
||||
///
|
||||
#[derive(Debug)]
|
||||
pub(super) struct InformationSchemaClusterInfo {
|
||||
@@ -99,12 +102,22 @@ impl InformationSchemaClusterInfo {
|
||||
ColumnSchema::new(PEER_HOSTNAME, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(
|
||||
TOTAL_CPU_MILLICORES,
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
false,
|
||||
),
|
||||
ColumnSchema::new(
|
||||
TOTAL_MEMORY_BYTES,
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
false,
|
||||
),
|
||||
ColumnSchema::new(
|
||||
CPU_USAGE_MILLICORES,
|
||||
ConcreteDataType::int64_datatype(),
|
||||
false,
|
||||
),
|
||||
ColumnSchema::new(
|
||||
MEMORY_USAGE_BYTES,
|
||||
ConcreteDataType::int64_datatype(),
|
||||
false,
|
||||
),
|
||||
ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
|
||||
@@ -167,8 +180,10 @@ struct InformationSchemaClusterInfoBuilder {
|
||||
peer_types: StringVectorBuilder,
|
||||
peer_addrs: StringVectorBuilder,
|
||||
peer_hostnames: StringVectorBuilder,
|
||||
cpus: UInt32VectorBuilder,
|
||||
memory_bytes: UInt64VectorBuilder,
|
||||
total_cpu_millicores: Int64VectorBuilder,
|
||||
total_memory_bytes: Int64VectorBuilder,
|
||||
cpu_usage_millicores: Int64VectorBuilder,
|
||||
memory_usage_bytes: Int64VectorBuilder,
|
||||
versions: StringVectorBuilder,
|
||||
git_commits: StringVectorBuilder,
|
||||
start_times: TimestampMillisecondVectorBuilder,
|
||||
@@ -186,8 +201,10 @@ impl InformationSchemaClusterInfoBuilder {
|
||||
peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
peer_hostnames: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
cpus: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
memory_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
total_cpu_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
total_memory_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
cpu_usage_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
memory_usage_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
@@ -243,8 +260,14 @@ impl InformationSchemaClusterInfoBuilder {
|
||||
self.start_times.push(None);
|
||||
self.uptimes.push(None);
|
||||
}
|
||||
self.cpus.push(Some(node_info.cpus));
|
||||
self.memory_bytes.push(Some(node_info.memory_bytes));
|
||||
self.total_cpu_millicores
|
||||
.push(Some(node_info.total_cpu_millicores));
|
||||
self.total_memory_bytes
|
||||
.push(Some(node_info.total_memory_bytes));
|
||||
self.cpu_usage_millicores
|
||||
.push(Some(node_info.cpu_usage_millicores));
|
||||
self.memory_usage_bytes
|
||||
.push(Some(node_info.memory_usage_bytes));
|
||||
|
||||
if node_info.last_activity_ts > 0 {
|
||||
self.active_times.push(Some(
|
||||
@@ -269,8 +292,10 @@ impl InformationSchemaClusterInfoBuilder {
|
||||
Arc::new(self.peer_types.finish()),
|
||||
Arc::new(self.peer_addrs.finish()),
|
||||
Arc::new(self.peer_hostnames.finish()),
|
||||
Arc::new(self.cpus.finish()),
|
||||
Arc::new(self.memory_bytes.finish()),
|
||||
Arc::new(self.total_cpu_millicores.finish()),
|
||||
Arc::new(self.total_memory_bytes.finish()),
|
||||
Arc::new(self.cpu_usage_millicores.finish()),
|
||||
Arc::new(self.memory_usage_bytes.finish()),
|
||||
Arc::new(self.versions.finish()),
|
||||
Arc::new(self.git_commits.finish()),
|
||||
Arc::new(self.start_times.finish()),
|
||||
|
||||
@@ -30,6 +30,7 @@ use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHand
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_stat::ResourceStatImpl;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
|
||||
use common_version::{short_version, verbose_version};
|
||||
@@ -372,11 +373,15 @@ impl StartCommand {
|
||||
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
|
||||
]);
|
||||
|
||||
let mut resource_stat = ResourceStatImpl::default();
|
||||
resource_stat.start_collect_cpu_usage();
|
||||
|
||||
let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
|
||||
&opts,
|
||||
meta_client.clone(),
|
||||
opts.heartbeat.clone(),
|
||||
Arc::new(executor),
|
||||
Arc::new(resource_stat),
|
||||
);
|
||||
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
|
||||
|
||||
@@ -30,6 +30,7 @@ use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
|
||||
use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_stat::ResourceStatImpl;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
|
||||
use common_time::timezone::set_default_timezone;
|
||||
@@ -421,11 +422,15 @@ impl StartCommand {
|
||||
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
|
||||
]);
|
||||
|
||||
let mut resource_stat = ResourceStatImpl::default();
|
||||
resource_stat.start_collect_cpu_usage();
|
||||
|
||||
let heartbeat_task = HeartbeatTask::new(
|
||||
&opts,
|
||||
meta_client.clone(),
|
||||
opts.heartbeat.clone(),
|
||||
Arc::new(executor),
|
||||
Arc::new(resource_stat),
|
||||
);
|
||||
let heartbeat_task = Some(heartbeat_task);
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ workspace = true
|
||||
common-base.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-stat.workspace = true
|
||||
config.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
object-store.workspace = true
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
pub mod config;
|
||||
pub mod error;
|
||||
pub mod utils;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_stat::{get_total_cpu_millicores, get_total_memory_readable};
|
||||
|
||||
/// `ResourceSpec` holds the static resource specifications of a node,
|
||||
/// such as CPU cores and memory capacity. These values are fixed
|
||||
/// at startup and do not change dynamically during runtime.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct ResourceSpec {
|
||||
pub cpus: i64,
|
||||
pub memory: Option<ReadableSize>,
|
||||
}
|
||||
|
||||
impl Default for ResourceSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
cpus: get_total_cpu_millicores(),
|
||||
memory: get_total_memory_readable(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -120,10 +120,16 @@ pub struct NodeInfo {
|
||||
pub start_time_ms: u64,
|
||||
// The node build cpus
|
||||
#[serde(default)]
|
||||
pub cpus: u32,
|
||||
pub total_cpu_millicores: i64,
|
||||
// The node build memory bytes
|
||||
#[serde(default)]
|
||||
pub memory_bytes: u64,
|
||||
pub total_memory_bytes: i64,
|
||||
// The node build cpu usage millicores
|
||||
#[serde(default)]
|
||||
pub cpu_usage_millicores: i64,
|
||||
// The node build memory usage bytes
|
||||
#[serde(default)]
|
||||
pub memory_usage_bytes: i64,
|
||||
// The node build hostname
|
||||
#[serde(default)]
|
||||
pub hostname: String,
|
||||
@@ -333,8 +339,10 @@ mod tests {
|
||||
version: "".to_string(),
|
||||
git_commit: "".to_string(),
|
||||
start_time_ms: 1,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
total_cpu_millicores: 0,
|
||||
total_memory_bytes: 0,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
hostname: "test_hostname".to_string(),
|
||||
};
|
||||
|
||||
|
||||
@@ -6,11 +6,14 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-base.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
lazy_static.workspace = true
|
||||
nix.workspace = true
|
||||
num_cpus.workspace = true
|
||||
prometheus.workspace = true
|
||||
sysinfo.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -117,7 +117,10 @@ pub fn get_cpu_limit_from_cgroups() -> Option<i64> {
|
||||
None
|
||||
}
|
||||
|
||||
fn get_cpu_usage() -> Option<i64> {
|
||||
/// Get the usage of cpu in millicores from cgroups filesystem.
|
||||
///
|
||||
/// - Return `None` if it's not in the cgroups v2 environment or fails to read the cpu usage.
|
||||
pub fn get_cpu_usage_from_cgroups() -> Option<i64> {
|
||||
// In certain bare-metal environments, the `/sys/fs/cgroup/cpu.stat` file may be present and reflect system-wide CPU usage rather than container-specific metrics.
|
||||
// To ensure accurate collection of container-level CPU usage, verify the existence of the `/sys/fs/cgroup/memory.current` file.
|
||||
// The presence of this file typically indicates execution within a containerized environment, thereby validating the relevance of the collected CPU usage data.
|
||||
@@ -142,6 +145,22 @@ fn get_cpu_usage() -> Option<i64> {
|
||||
fields[1].trim().parse::<i64>().ok()
|
||||
}
|
||||
|
||||
// Calculate the cpu usage in millicores from cgroups filesystem.
|
||||
//
|
||||
// - Return `0` if the current cpu usage is equal to the last cpu usage or the interval is 0.
|
||||
pub(crate) fn calculate_cpu_usage(
|
||||
current_cpu_usage_usecs: i64,
|
||||
last_cpu_usage_usecs: i64,
|
||||
interval_milliseconds: i64,
|
||||
) -> i64 {
|
||||
let diff = current_cpu_usage_usecs - last_cpu_usage_usecs;
|
||||
if diff > 0 && interval_milliseconds > 0 {
|
||||
((diff as f64 / interval_milliseconds as f64).round() as i64).max(1)
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
// Check whether the cgroup is v2.
|
||||
// - Return `true` if the cgroup is v2, otherwise return `false`.
|
||||
// - Return `None` if the detection fails or not on linux.
|
||||
@@ -230,7 +249,7 @@ impl Collector for CgroupsMetricsCollector {
|
||||
}
|
||||
|
||||
fn collect(&self) -> Vec<MetricFamily> {
|
||||
if let Some(cpu_usage) = get_cpu_usage() {
|
||||
if let Some(cpu_usage) = get_cpu_usage_from_cgroups() {
|
||||
self.cpu_usage.set(cpu_usage);
|
||||
}
|
||||
|
||||
|
||||
@@ -13,66 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
mod cgroups;
|
||||
mod resource;
|
||||
|
||||
pub use cgroups::*;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use sysinfo::System;
|
||||
|
||||
/// Get the total CPU in millicores.
|
||||
pub fn get_total_cpu_millicores() -> i64 {
|
||||
// Get CPU limit from cgroups filesystem.
|
||||
if let Some(cgroup_cpu_limit) = get_cpu_limit_from_cgroups() {
|
||||
cgroup_cpu_limit
|
||||
} else {
|
||||
// Get total CPU cores from host system.
|
||||
num_cpus::get() as i64 * 1000
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the total memory in bytes.
|
||||
pub fn get_total_memory_bytes() -> i64 {
|
||||
// Get memory limit from cgroups filesystem.
|
||||
if let Some(cgroup_memory_limit) = get_memory_limit_from_cgroups() {
|
||||
cgroup_memory_limit
|
||||
} else {
|
||||
// Get total memory from host system.
|
||||
if sysinfo::IS_SUPPORTED_SYSTEM {
|
||||
let mut sys_info = System::new();
|
||||
sys_info.refresh_memory();
|
||||
sys_info.total_memory() as i64
|
||||
} else {
|
||||
// If the system is not supported, return -1.
|
||||
-1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the total CPU cores. The result will be rounded to the nearest integer.
|
||||
/// For example, if the total CPU is 1.5 cores(1500 millicores), the result will be 2.
|
||||
pub fn get_total_cpu_cores() -> usize {
|
||||
((get_total_cpu_millicores() as f64) / 1000.0).round() as usize
|
||||
}
|
||||
|
||||
/// Get the total memory in readable size.
|
||||
pub fn get_total_memory_readable() -> Option<ReadableSize> {
|
||||
if get_total_memory_bytes() > 0 {
|
||||
Some(ReadableSize(get_total_memory_bytes() as u64))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_get_total_cpu_cores() {
|
||||
assert!(get_total_cpu_cores() > 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_total_memory_readable() {
|
||||
assert!(get_total_memory_readable().unwrap() > ReadableSize::mb(0));
|
||||
}
|
||||
}
|
||||
pub use resource::*;
|
||||
|
||||
187
src/common/stat/src/resource.rs
Normal file
187
src/common/stat/src/resource.rs
Normal file
@@ -0,0 +1,187 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
use std::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_runtime::JoinHandle;
|
||||
use common_telemetry::info;
|
||||
use sysinfo::System;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::cgroups::calculate_cpu_usage;
|
||||
use crate::{
|
||||
get_cpu_limit_from_cgroups, get_cpu_usage_from_cgroups, get_memory_limit_from_cgroups,
|
||||
get_memory_usage_from_cgroups,
|
||||
};
|
||||
|
||||
/// Get the total CPU in millicores. If the CPU limit is unset, it will return the total CPU cores from host system.
|
||||
pub fn get_total_cpu_millicores() -> i64 {
|
||||
// Get CPU limit from cgroups filesystem.
|
||||
if let Some(cgroup_cpu_limit) = get_cpu_limit_from_cgroups() {
|
||||
cgroup_cpu_limit
|
||||
} else {
|
||||
// Get total CPU cores from host system.
|
||||
num_cpus::get() as i64 * 1000
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the total memory in bytes. If the memory limit is unset, it will return the total memory from host system.
|
||||
/// If the system is not supported to get the total host memory, it will return 0.
|
||||
pub fn get_total_memory_bytes() -> i64 {
|
||||
// Get memory limit from cgroups filesystem.
|
||||
if let Some(cgroup_memory_limit) = get_memory_limit_from_cgroups() {
|
||||
cgroup_memory_limit
|
||||
} else {
|
||||
// Get total memory from host system.
|
||||
if sysinfo::IS_SUPPORTED_SYSTEM {
|
||||
let mut sys_info = System::new();
|
||||
sys_info.refresh_memory();
|
||||
sys_info.total_memory() as i64
|
||||
} else {
|
||||
// If the system is not supported, return 0
|
||||
0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the total CPU cores. The result will be rounded to the nearest integer.
|
||||
/// For example, if the total CPU is 1.5 cores(1500 millicores), the result will be 2.
|
||||
pub fn get_total_cpu_cores() -> usize {
|
||||
((get_total_cpu_millicores() as f64) / 1000.0).round() as usize
|
||||
}
|
||||
|
||||
/// Get the total memory in readable size.
|
||||
pub fn get_total_memory_readable() -> Option<ReadableSize> {
|
||||
if get_total_memory_bytes() > 0 {
|
||||
Some(ReadableSize(get_total_memory_bytes() as u64))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// A reference to a `ResourceStat` implementation.
|
||||
pub type ResourceStatRef = Arc<dyn ResourceStat + Send + Sync>;
|
||||
|
||||
/// A trait for getting resource statistics.
|
||||
pub trait ResourceStat {
|
||||
/// Get the total CPU in millicores.
|
||||
fn get_total_cpu_millicores(&self) -> i64;
|
||||
/// Get the total memory in bytes.
|
||||
fn get_total_memory_bytes(&self) -> i64;
|
||||
/// Get the CPU usage in millicores.
|
||||
fn get_cpu_usage_millicores(&self) -> i64;
|
||||
/// Get the memory usage in bytes.
|
||||
fn get_memory_usage_bytes(&self) -> i64;
|
||||
}
|
||||
|
||||
/// A implementation of `ResourceStat` trait.
|
||||
pub struct ResourceStatImpl {
|
||||
cpu_usage_millicores: Arc<AtomicI64>,
|
||||
last_cpu_usage_usecs: Arc<AtomicI64>,
|
||||
calculate_interval: Duration,
|
||||
handler: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl Default for ResourceStatImpl {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
cpu_usage_millicores: Arc::new(AtomicI64::new(0)),
|
||||
last_cpu_usage_usecs: Arc::new(AtomicI64::new(0)),
|
||||
calculate_interval: Duration::from_secs(5),
|
||||
handler: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ResourceStatImpl {
|
||||
/// Start collecting CPU usage periodically. It will calculate the CPU usage in millicores based on rate of change of CPU usage usage_usec in `/sys/fs/cgroup/cpu.stat`.
|
||||
/// It ONLY works in cgroup v2 environment.
|
||||
pub fn start_collect_cpu_usage(&mut self) {
|
||||
if self.handler.is_some() {
|
||||
return;
|
||||
}
|
||||
|
||||
let cpu_usage_millicores = self.cpu_usage_millicores.clone();
|
||||
let last_cpu_usage_usecs = self.last_cpu_usage_usecs.clone();
|
||||
let calculate_interval = self.calculate_interval;
|
||||
|
||||
let handler = common_runtime::spawn_global(async move {
|
||||
info!(
|
||||
"Starting to collect CPU usage periodically for every {} seconds",
|
||||
calculate_interval.as_secs()
|
||||
);
|
||||
loop {
|
||||
let current_cpu_usage_usecs = get_cpu_usage_from_cgroups();
|
||||
if let Some(current_cpu_usage_usecs) = current_cpu_usage_usecs {
|
||||
// Skip the first time to collect CPU usage.
|
||||
if last_cpu_usage_usecs.load(Ordering::Relaxed) == 0 {
|
||||
last_cpu_usage_usecs.store(current_cpu_usage_usecs, Ordering::Relaxed);
|
||||
continue;
|
||||
}
|
||||
let cpu_usage = calculate_cpu_usage(
|
||||
current_cpu_usage_usecs,
|
||||
last_cpu_usage_usecs.load(Ordering::Relaxed),
|
||||
calculate_interval.as_millis() as i64,
|
||||
);
|
||||
cpu_usage_millicores.store(cpu_usage, Ordering::Relaxed);
|
||||
last_cpu_usage_usecs.store(current_cpu_usage_usecs, Ordering::Relaxed);
|
||||
}
|
||||
sleep(calculate_interval).await;
|
||||
}
|
||||
});
|
||||
|
||||
self.handler = Some(handler);
|
||||
}
|
||||
}
|
||||
|
||||
impl ResourceStat for ResourceStatImpl {
|
||||
/// Get the total CPU in millicores.
|
||||
fn get_total_cpu_millicores(&self) -> i64 {
|
||||
get_total_cpu_millicores()
|
||||
}
|
||||
|
||||
/// Get the total memory in bytes.
|
||||
fn get_total_memory_bytes(&self) -> i64 {
|
||||
get_total_memory_bytes()
|
||||
}
|
||||
|
||||
/// Get the CPU usage in millicores.
|
||||
fn get_cpu_usage_millicores(&self) -> i64 {
|
||||
self.cpu_usage_millicores.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Get the memory usage in bytes.
|
||||
/// It ONLY works in cgroup v2 environment.
|
||||
fn get_memory_usage_bytes(&self) -> i64 {
|
||||
get_memory_usage_from_cgroups().unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_get_total_cpu_cores() {
|
||||
assert!(get_total_cpu_cores() > 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_total_memory_readable() {
|
||||
assert!(get_total_memory_readable().unwrap() > ReadableSize::mb(0));
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,7 @@ common-procedure.workspace = true
|
||||
common-query.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-stat.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-version.workspace = true
|
||||
|
||||
@@ -27,6 +27,7 @@ use common_meta::key::runtime_switch::RuntimeSwitchManager;
|
||||
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
pub use common_procedure::options::ProcedureConfig;
|
||||
use common_stat::ResourceStatImpl;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use common_wal::config::kafka::DatanodeKafkaConfig;
|
||||
@@ -282,6 +283,9 @@ impl DatanodeBuilder {
|
||||
open_all_regions.await?;
|
||||
}
|
||||
|
||||
let mut resource_stat = ResourceStatImpl::default();
|
||||
resource_stat.start_collect_cpu_usage();
|
||||
|
||||
let heartbeat_task = if let Some(meta_client) = meta_client {
|
||||
Some(
|
||||
HeartbeatTask::try_new(
|
||||
@@ -290,6 +294,7 @@ impl DatanodeBuilder {
|
||||
meta_client,
|
||||
cache_registry,
|
||||
self.plugins.clone(),
|
||||
Arc::new(resource_stat),
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
|
||||
@@ -20,7 +20,6 @@ use std::time::Duration;
|
||||
use api::v1::meta::heartbeat_request::NodeWorkloads;
|
||||
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
|
||||
use common_base::Plugins;
|
||||
use common_config::utils::ResourceSpec;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::datanode::REGION_STATISTIC_KEY;
|
||||
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
|
||||
@@ -31,6 +30,7 @@ use common_meta::heartbeat::handler::{
|
||||
};
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_stat::ResourceStatRef;
|
||||
use common_telemetry::{debug, error, info, trace, warn};
|
||||
use common_workload::DatanodeWorkloadType;
|
||||
use meta_client::MetaClientRef;
|
||||
@@ -63,7 +63,7 @@ pub struct HeartbeatTask {
|
||||
interval: u64,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
region_alive_keeper: Arc<RegionAliveKeeper>,
|
||||
resource_spec: ResourceSpec,
|
||||
resource_stat: ResourceStatRef,
|
||||
}
|
||||
|
||||
impl Drop for HeartbeatTask {
|
||||
@@ -80,6 +80,7 @@ impl HeartbeatTask {
|
||||
meta_client: MetaClientRef,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
plugins: Plugins,
|
||||
resource_stat: ResourceStatRef,
|
||||
) -> Result<Self> {
|
||||
let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
|
||||
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
|
||||
@@ -109,7 +110,7 @@ impl HeartbeatTask {
|
||||
interval: opts.heartbeat.interval.as_millis() as u64,
|
||||
resp_handler_executor,
|
||||
region_alive_keeper,
|
||||
resource_spec: Default::default(),
|
||||
resource_stat,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -186,6 +187,7 @@ impl HeartbeatTask {
|
||||
.context(error::HandleHeartbeatResponseSnafu)
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
/// Start heartbeat task, spawn background task.
|
||||
pub async fn start(
|
||||
&self,
|
||||
@@ -237,8 +239,9 @@ impl HeartbeatTask {
|
||||
|
||||
self.region_alive_keeper.start(Some(event_receiver)).await?;
|
||||
let mut last_sent = Instant::now();
|
||||
let cpus = self.resource_spec.cpus as u32;
|
||||
let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
|
||||
let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
|
||||
let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
|
||||
let resource_stat = self.resource_stat.clone();
|
||||
|
||||
common_runtime::spawn_hb(async move {
|
||||
let sleep = tokio::time::sleep(Duration::from_millis(0));
|
||||
@@ -252,8 +255,13 @@ impl HeartbeatTask {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: node_epoch,
|
||||
cpus,
|
||||
memory_bytes,
|
||||
total_cpu_millicores,
|
||||
total_memory_bytes,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
// TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
|
||||
cpus: total_cpu_millicores as u32,
|
||||
memory_bytes: total_memory_bytes as u64,
|
||||
hostname: hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_string_lossy()
|
||||
@@ -297,12 +305,18 @@ impl HeartbeatTask {
|
||||
let topic_stats = region_server_clone.topic_stats();
|
||||
let now = Instant::now();
|
||||
let duration_since_epoch = (now - epoch).as_millis() as u64;
|
||||
let req = HeartbeatRequest {
|
||||
let mut req = HeartbeatRequest {
|
||||
region_stats,
|
||||
topic_stats,
|
||||
duration_since_epoch,
|
||||
..heartbeat_request.clone()
|
||||
};
|
||||
|
||||
if let Some(info) = req.info.as_mut() {
|
||||
info.cpu_usage_millicores = resource_stat.get_cpu_usage_millicores();
|
||||
info.memory_usage_bytes = resource_stat.get_memory_usage_bytes();
|
||||
}
|
||||
|
||||
sleep.as_mut().reset(now + Duration::from_millis(interval));
|
||||
Some(req)
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, Peer};
|
||||
use common_config::utils::ResourceSpec;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
|
||||
@@ -26,6 +25,7 @@ use common_meta::heartbeat::handler::{
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_meta::key::flow::flow_state::FlowStat;
|
||||
use common_stat::ResourceStatRef;
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use greptime_proto::v1::meta::NodeInfo;
|
||||
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
|
||||
@@ -69,7 +69,7 @@ pub struct HeartbeatTask {
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
running: Arc<AtomicBool>,
|
||||
query_stat_size: Option<SizeReportSender>,
|
||||
resource_spec: ResourceSpec,
|
||||
resource_stat: ResourceStatRef,
|
||||
}
|
||||
|
||||
impl HeartbeatTask {
|
||||
@@ -77,11 +77,13 @@ impl HeartbeatTask {
|
||||
self.query_stat_size = Some(query_stat_size);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
opts: &FlownodeOptions,
|
||||
meta_client: Arc<MetaClient>,
|
||||
heartbeat_opts: HeartbeatOptions,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
resource_stat: ResourceStatRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
node_id: opts.node_id.unwrap_or(0),
|
||||
@@ -93,7 +95,7 @@ impl HeartbeatTask {
|
||||
resp_handler_executor,
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
query_stat_size: None,
|
||||
resource_spec: Default::default(),
|
||||
resource_stat,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,6 +148,8 @@ impl HeartbeatTask {
|
||||
heartbeat_request: &HeartbeatRequest,
|
||||
message: Option<OutgoingMessage>,
|
||||
latest_report: &Option<FlowStat>,
|
||||
cpu_usage: i64,
|
||||
memory_usage: i64,
|
||||
) -> Option<HeartbeatRequest> {
|
||||
let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
|
||||
Some(Ok(message)) => Some(message),
|
||||
@@ -170,21 +174,38 @@ impl HeartbeatTask {
|
||||
.collect(),
|
||||
});
|
||||
|
||||
Some(HeartbeatRequest {
|
||||
let mut heartbeat_request = HeartbeatRequest {
|
||||
mailbox_message,
|
||||
flow_stat,
|
||||
..heartbeat_request.clone()
|
||||
})
|
||||
};
|
||||
|
||||
if let Some(info) = heartbeat_request.info.as_mut() {
|
||||
info.cpu_usage_millicores = cpu_usage;
|
||||
info.memory_usage_bytes = memory_usage;
|
||||
}
|
||||
|
||||
fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
|
||||
Some(heartbeat_request)
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn build_node_info(
|
||||
start_time_ms: u64,
|
||||
total_cpu_millicores: i64,
|
||||
total_memory_bytes: i64,
|
||||
) -> Option<NodeInfo> {
|
||||
let build_info = common_version::build_info();
|
||||
Some(NodeInfo {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms,
|
||||
cpus,
|
||||
memory_bytes,
|
||||
total_cpu_millicores,
|
||||
total_memory_bytes,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
// TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
|
||||
cpus: total_cpu_millicores as u32,
|
||||
memory_bytes: total_memory_bytes as u64,
|
||||
hostname: hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_string_lossy()
|
||||
@@ -203,9 +224,9 @@ impl HeartbeatTask {
|
||||
id: self.node_id,
|
||||
addr: self.peer_addr.clone(),
|
||||
});
|
||||
let cpus = self.resource_spec.cpus as u32;
|
||||
let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
|
||||
|
||||
let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
|
||||
let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
|
||||
let resource_stat = self.resource_stat.clone();
|
||||
let query_stat_size = self.query_stat_size.clone();
|
||||
|
||||
common_runtime::spawn_hb(async move {
|
||||
@@ -218,7 +239,7 @@ impl HeartbeatTask {
|
||||
let heartbeat_request = HeartbeatRequest {
|
||||
peer: self_peer,
|
||||
node_epoch,
|
||||
info: Self::build_node_info(node_epoch, cpus, memory_bytes),
|
||||
info: Self::build_node_info(node_epoch, total_cpu_millicores, total_memory_bytes),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -226,7 +247,7 @@ impl HeartbeatTask {
|
||||
let req = tokio::select! {
|
||||
message = outgoing_rx.recv() => {
|
||||
if let Some(message) = message {
|
||||
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
|
||||
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report, 0, 0)
|
||||
} else {
|
||||
warn!("Sender has been dropped, exiting the heartbeat loop");
|
||||
// Receives None that means Sender was dropped, we need to break the current loop
|
||||
@@ -234,7 +255,7 @@ impl HeartbeatTask {
|
||||
}
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
Self::new_heartbeat_request(&heartbeat_request, None, &latest_report)
|
||||
Self::new_heartbeat_request(&heartbeat_request, None, &latest_report, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ common-procedure.workspace = true
|
||||
common-query.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-stat.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-version.workspace = true
|
||||
|
||||
@@ -18,12 +18,12 @@ mod tests;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
|
||||
use common_config::utils::ResourceSpec;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
|
||||
};
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_stat::ResourceStatRef;
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
|
||||
use servers::addrs;
|
||||
@@ -47,7 +47,7 @@ pub struct HeartbeatTask {
|
||||
retry_interval: Duration,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
start_time_ms: u64,
|
||||
resource_spec: ResourceSpec,
|
||||
resource_stat: ResourceStatRef,
|
||||
}
|
||||
|
||||
impl HeartbeatTask {
|
||||
@@ -56,6 +56,7 @@ impl HeartbeatTask {
|
||||
meta_client: Arc<MetaClient>,
|
||||
heartbeat_opts: HeartbeatOptions,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
resource_stat: ResourceStatRef,
|
||||
) -> Self {
|
||||
HeartbeatTask {
|
||||
// if internal grpc is configured, use its address as the peer address
|
||||
@@ -71,7 +72,7 @@ impl HeartbeatTask {
|
||||
retry_interval: heartbeat_opts.retry_interval,
|
||||
resp_handler_executor,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
resource_spec: Default::default(),
|
||||
resource_stat,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,6 +134,8 @@ impl HeartbeatTask {
|
||||
fn new_heartbeat_request(
|
||||
heartbeat_request: &HeartbeatRequest,
|
||||
message: Option<OutgoingMessage>,
|
||||
cpu_usage: i64,
|
||||
memory_usage: i64,
|
||||
) -> Option<HeartbeatRequest> {
|
||||
let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
|
||||
Some(Ok(message)) => Some(message),
|
||||
@@ -143,21 +146,38 @@ impl HeartbeatTask {
|
||||
None => None,
|
||||
};
|
||||
|
||||
Some(HeartbeatRequest {
|
||||
let mut heartbeat_request = HeartbeatRequest {
|
||||
mailbox_message,
|
||||
..heartbeat_request.clone()
|
||||
})
|
||||
};
|
||||
|
||||
if let Some(info) = heartbeat_request.info.as_mut() {
|
||||
info.memory_usage_bytes = memory_usage;
|
||||
info.cpu_usage_millicores = cpu_usage;
|
||||
}
|
||||
|
||||
fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
|
||||
Some(heartbeat_request)
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn build_node_info(
|
||||
start_time_ms: u64,
|
||||
total_cpu_millicores: i64,
|
||||
total_memory_bytes: i64,
|
||||
) -> Option<NodeInfo> {
|
||||
let build_info = common_version::build_info();
|
||||
|
||||
Some(NodeInfo {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms,
|
||||
cpus,
|
||||
memory_bytes,
|
||||
total_cpu_millicores,
|
||||
total_memory_bytes,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
// TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
|
||||
cpus: total_cpu_millicores as u32,
|
||||
memory_bytes: total_memory_bytes as u64,
|
||||
hostname: hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_string_lossy()
|
||||
@@ -177,16 +197,20 @@ impl HeartbeatTask {
|
||||
id: 0,
|
||||
addr: self.peer_addr.clone(),
|
||||
});
|
||||
let cpus = self.resource_spec.cpus as u32;
|
||||
let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
|
||||
|
||||
let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
|
||||
let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
|
||||
let resource_stat = self.resource_stat.clone();
|
||||
common_runtime::spawn_hb(async move {
|
||||
let sleep = tokio::time::sleep(Duration::from_millis(0));
|
||||
tokio::pin!(sleep);
|
||||
|
||||
let heartbeat_request = HeartbeatRequest {
|
||||
peer: self_peer,
|
||||
info: Self::build_node_info(start_time_ms, cpus, memory_bytes),
|
||||
info: Self::build_node_info(
|
||||
start_time_ms,
|
||||
total_cpu_millicores,
|
||||
total_memory_bytes,
|
||||
),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -194,7 +218,7 @@ impl HeartbeatTask {
|
||||
let req = tokio::select! {
|
||||
message = outgoing_rx.recv() => {
|
||||
if let Some(message) = message {
|
||||
Self::new_heartbeat_request(&heartbeat_request, Some(message))
|
||||
Self::new_heartbeat_request(&heartbeat_request, Some(message), 0, 0)
|
||||
} else {
|
||||
warn!("Sender has been dropped, exiting the heartbeat loop");
|
||||
// Receives None that means Sender was dropped, we need to break the current loop
|
||||
@@ -203,7 +227,7 @@ impl HeartbeatTask {
|
||||
}
|
||||
_ = &mut sleep => {
|
||||
sleep.as_mut().reset(Instant::now() + report_interval);
|
||||
Self::new_heartbeat_request(&heartbeat_request, None)
|
||||
Self::new_heartbeat_request(&heartbeat_request, None, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -24,7 +24,9 @@ mod util;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role};
|
||||
use api::v1::meta::{
|
||||
MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
|
||||
};
|
||||
pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
|
||||
use cluster::Client as ClusterClient;
|
||||
pub use cluster::ClusterKvBackend;
|
||||
@@ -371,7 +373,8 @@ impl ClusterInfo for MetaClient {
|
||||
let mut nodes = if get_metasrv_nodes {
|
||||
let last_activity_ts = -1; // Metasrv does not provide this information.
|
||||
|
||||
let (leader, followers) = cluster_client.get_metasrv_peers().await?;
|
||||
let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
|
||||
cluster_client.get_metasrv_peers().await?;
|
||||
followers
|
||||
.into_iter()
|
||||
.map(|node| {
|
||||
@@ -383,8 +386,10 @@ impl ClusterInfo for MetaClient {
|
||||
version: node_info.version,
|
||||
git_commit: node_info.git_commit,
|
||||
start_time_ms: node_info.start_time_ms,
|
||||
cpus: node_info.cpus,
|
||||
memory_bytes: node_info.memory_bytes,
|
||||
total_cpu_millicores: node_info.total_cpu_millicores,
|
||||
total_memory_bytes: node_info.total_memory_bytes,
|
||||
cpu_usage_millicores: node_info.cpu_usage_millicores,
|
||||
memory_usage_bytes: node_info.memory_usage_bytes,
|
||||
hostname: node_info.hostname,
|
||||
}
|
||||
} else {
|
||||
@@ -396,8 +401,10 @@ impl ClusterInfo for MetaClient {
|
||||
version: node.version,
|
||||
git_commit: node.git_commit,
|
||||
start_time_ms: node.start_time_ms,
|
||||
cpus: node.cpus,
|
||||
memory_bytes: node.memory_bytes,
|
||||
total_cpu_millicores: node.cpus as i64,
|
||||
total_memory_bytes: node.memory_bytes as i64,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
hostname: "".to_string(),
|
||||
}
|
||||
}
|
||||
@@ -411,8 +418,10 @@ impl ClusterInfo for MetaClient {
|
||||
version: node_info.version,
|
||||
git_commit: node_info.git_commit,
|
||||
start_time_ms: node_info.start_time_ms,
|
||||
cpus: node_info.cpus,
|
||||
memory_bytes: node_info.memory_bytes,
|
||||
total_cpu_millicores: node_info.total_cpu_millicores,
|
||||
total_memory_bytes: node_info.total_memory_bytes,
|
||||
cpu_usage_millicores: node_info.cpu_usage_millicores,
|
||||
memory_usage_bytes: node_info.memory_usage_bytes,
|
||||
hostname: node_info.hostname,
|
||||
}
|
||||
} else {
|
||||
@@ -424,8 +433,10 @@ impl ClusterInfo for MetaClient {
|
||||
version: node.version,
|
||||
git_commit: node.git_commit,
|
||||
start_time_ms: node.start_time_ms,
|
||||
cpus: node.cpus,
|
||||
memory_bytes: node.memory_bytes,
|
||||
total_cpu_millicores: node.cpus as i64,
|
||||
total_memory_bytes: node.memory_bytes as i64,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
hostname: "".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ common-meta.workspace = true
|
||||
common-options.workspace = true
|
||||
common-procedure.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-stat.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-version.workspace = true
|
||||
|
||||
@@ -243,8 +243,10 @@ mod tests {
|
||||
version: "1.0.0".to_string(),
|
||||
git_commit: "1234567890".to_string(),
|
||||
start_time_ms: current_time_millis() as u64,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
total_cpu_millicores: 0,
|
||||
total_memory_bytes: 0,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
hostname: "test_hostname".to_string(),
|
||||
};
|
||||
|
||||
@@ -269,8 +271,10 @@ mod tests {
|
||||
version: "1.0.0".to_string(),
|
||||
git_commit: "1234567890".to_string(),
|
||||
start_time_ms: current_time_millis() as u64,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
total_cpu_millicores: 0,
|
||||
total_memory_bytes: 0,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
hostname: "test_hostname".to_string(),
|
||||
};
|
||||
|
||||
@@ -307,8 +311,10 @@ mod tests {
|
||||
version: "1.0.0".to_string(),
|
||||
git_commit: "1234567890".to_string(),
|
||||
start_time_ms: last_activity_ts as u64,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
total_cpu_millicores: 0,
|
||||
total_memory_bytes: 0,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
hostname: "test_hostname".to_string(),
|
||||
};
|
||||
|
||||
|
||||
@@ -1161,8 +1161,10 @@ mod tests {
|
||||
version: "test_version".to_string(),
|
||||
git_commit: "test_git_commit".to_string(),
|
||||
start_time_ms: 0,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
total_cpu_millicores: 0,
|
||||
total_memory_bytes: 0,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
hostname: "test_hostname".to_string(),
|
||||
};
|
||||
mysql_election.register_candidate(&node_info).await.unwrap();
|
||||
|
||||
@@ -1000,8 +1000,10 @@ mod tests {
|
||||
version: "test_version".to_string(),
|
||||
git_commit: "test_git_commit".to_string(),
|
||||
start_time_ms: 0,
|
||||
cpus: 0,
|
||||
memory_bytes: 0,
|
||||
total_cpu_millicores: 0,
|
||||
total_memory_bytes: 0,
|
||||
cpu_usage_millicores: 0,
|
||||
memory_usage_bytes: 0,
|
||||
hostname: "test_hostname".to_string(),
|
||||
};
|
||||
pg_election.register_candidate(&node_info).await.unwrap();
|
||||
|
||||
@@ -52,8 +52,10 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
|
||||
version: info.version,
|
||||
git_commit: info.git_commit,
|
||||
start_time_ms: info.start_time_ms,
|
||||
cpus: info.cpus,
|
||||
memory_bytes: info.memory_bytes,
|
||||
total_cpu_millicores: info.total_cpu_millicores,
|
||||
total_memory_bytes: info.total_memory_bytes,
|
||||
cpu_usage_millicores: info.cpu_usage_millicores,
|
||||
memory_usage_bytes: info.memory_usage_bytes,
|
||||
hostname: info.hostname,
|
||||
};
|
||||
|
||||
@@ -88,8 +90,10 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
|
||||
version: info.version,
|
||||
git_commit: info.git_commit,
|
||||
start_time_ms: info.start_time_ms,
|
||||
cpus: info.cpus,
|
||||
memory_bytes: info.memory_bytes,
|
||||
total_cpu_millicores: info.total_cpu_millicores,
|
||||
total_memory_bytes: info.total_memory_bytes,
|
||||
cpu_usage_millicores: info.cpu_usage_millicores,
|
||||
memory_usage_bytes: info.memory_usage_bytes,
|
||||
hostname: info.hostname,
|
||||
};
|
||||
|
||||
@@ -142,8 +146,10 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
|
||||
version: info.version,
|
||||
git_commit: info.git_commit,
|
||||
start_time_ms: info.start_time_ms,
|
||||
cpus: info.cpus,
|
||||
memory_bytes: info.memory_bytes,
|
||||
total_cpu_millicores: info.total_cpu_millicores,
|
||||
total_memory_bytes: info.total_memory_bytes,
|
||||
cpu_usage_millicores: info.cpu_usage_millicores,
|
||||
memory_usage_bytes: info.memory_usage_bytes,
|
||||
hostname: info.hostname,
|
||||
};
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ use std::time::Duration;
|
||||
use clap::ValueEnum;
|
||||
use common_base::Plugins;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_config::utils::ResourceSpec;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_event_recorder::EventRecorderOptions;
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
@@ -47,6 +46,7 @@ use common_options::datanode::DatanodeClientOptions;
|
||||
use common_options::memory::MemoryOptions;
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
use common_procedure::options::ProcedureConfig;
|
||||
use common_stat::ResourceStatRef;
|
||||
use common_telemetry::logging::{LoggingOptions, TracingOptions};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use common_wal::config::MetasrvWalConfig;
|
||||
@@ -372,12 +372,16 @@ pub struct MetasrvNodeInfo {
|
||||
pub git_commit: String,
|
||||
// The node start timestamp in milliseconds
|
||||
pub start_time_ms: u64,
|
||||
// The node cpus
|
||||
// The node total cpu millicores
|
||||
#[serde(default)]
|
||||
pub cpus: u32,
|
||||
// The node memory bytes
|
||||
pub total_cpu_millicores: i64,
|
||||
#[serde(default)]
|
||||
pub memory_bytes: u64,
|
||||
// The node total memory bytes
|
||||
pub total_memory_bytes: i64,
|
||||
/// The node build cpu usage millicores
|
||||
pub cpu_usage_millicores: i64,
|
||||
/// The node build memory usage bytes
|
||||
pub memory_usage_bytes: i64,
|
||||
// The node hostname
|
||||
#[serde(default)]
|
||||
pub hostname: String,
|
||||
@@ -397,15 +401,19 @@ impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
|
||||
version: node_info.version.clone(),
|
||||
git_commit: node_info.git_commit.clone(),
|
||||
start_time_ms: node_info.start_time_ms,
|
||||
cpus: node_info.cpus,
|
||||
memory_bytes: node_info.memory_bytes,
|
||||
cpus: node_info.total_cpu_millicores as u32,
|
||||
memory_bytes: node_info.total_memory_bytes as u64,
|
||||
// The canonical location for node information.
|
||||
info: Some(api::v1::meta::NodeInfo {
|
||||
version: node_info.version,
|
||||
git_commit: node_info.git_commit,
|
||||
start_time_ms: node_info.start_time_ms,
|
||||
cpus: node_info.cpus,
|
||||
memory_bytes: node_info.memory_bytes,
|
||||
total_cpu_millicores: node_info.total_cpu_millicores,
|
||||
total_memory_bytes: node_info.total_memory_bytes,
|
||||
cpu_usage_millicores: node_info.cpu_usage_millicores,
|
||||
memory_usage_bytes: node_info.memory_usage_bytes,
|
||||
cpus: node_info.total_cpu_millicores as u32,
|
||||
memory_bytes: node_info.total_memory_bytes as u64,
|
||||
hostname: node_info.hostname,
|
||||
}),
|
||||
}
|
||||
@@ -517,7 +525,7 @@ pub struct Metasrv {
|
||||
region_flush_ticker: Option<RegionFlushTickerRef>,
|
||||
table_id_sequence: SequenceRef,
|
||||
reconciliation_manager: ReconciliationManagerRef,
|
||||
resource_spec: ResourceSpec,
|
||||
resource_stat: ResourceStatRef,
|
||||
|
||||
plugins: Plugins,
|
||||
}
|
||||
@@ -699,8 +707,8 @@ impl Metasrv {
|
||||
self.start_time_ms
|
||||
}
|
||||
|
||||
pub fn resource_spec(&self) -> &ResourceSpec {
|
||||
&self.resource_spec
|
||||
pub fn resource_stat(&self) -> &ResourceStatRef {
|
||||
&self.resource_stat
|
||||
}
|
||||
|
||||
pub fn node_info(&self) -> MetasrvNodeInfo {
|
||||
@@ -710,8 +718,10 @@ impl Metasrv {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: self.start_time_ms(),
|
||||
cpus: self.resource_spec().cpus as u32,
|
||||
memory_bytes: self.resource_spec().memory.unwrap_or_default().as_bytes(),
|
||||
total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
|
||||
total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
|
||||
cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
|
||||
memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
|
||||
hostname: hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_string_lossy()
|
||||
|
||||
@@ -46,6 +46,7 @@ use common_meta::stats::topic::TopicStatsRegistry;
|
||||
use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
use common_procedure::local::{LocalManager, ManagerConfig};
|
||||
use common_stat::ResourceStatImpl;
|
||||
use common_telemetry::{info, warn};
|
||||
use snafu::{ResultExt, ensure};
|
||||
use store_api::storage::MAX_REGION_SEQ;
|
||||
@@ -517,6 +518,9 @@ impl MetasrvBuilder {
|
||||
.try_start()
|
||||
.context(error::InitReconciliationManagerSnafu)?;
|
||||
|
||||
let mut resource_stat = ResourceStatImpl::default();
|
||||
resource_stat.start_collect_cpu_usage();
|
||||
|
||||
Ok(Metasrv {
|
||||
state,
|
||||
started: Arc::new(AtomicBool::new(false)),
|
||||
@@ -556,7 +560,7 @@ impl MetasrvBuilder {
|
||||
table_id_sequence,
|
||||
reconciliation_manager,
|
||||
topic_stats_registry,
|
||||
resource_spec: Default::default(),
|
||||
resource_stat: Arc::new(resource_stat),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,8 +97,10 @@ impl Metasrv {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: self.start_time_ms(),
|
||||
cpus: self.resource_spec().cpus as u32,
|
||||
memory_bytes: self.resource_spec().memory.unwrap_or_default().as_bytes(),
|
||||
total_cpu_millicores: self.resource_stat().get_total_cpu_millicores(),
|
||||
total_memory_bytes: self.resource_stat().get_total_memory_bytes(),
|
||||
cpu_usage_millicores: self.resource_stat().get_cpu_usage_millicores(),
|
||||
memory_usage_bytes: self.resource_stat().get_memory_usage_bytes(),
|
||||
hostname: hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_string_lossy()
|
||||
|
||||
@@ -24,6 +24,7 @@ use common_meta::key::flow::flow_state::FlowStat;
|
||||
use common_meta::peer::Peer;
|
||||
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
|
||||
use common_query::request::QueryRequest;
|
||||
use common_stat::{ResourceStatImpl, ResourceStatRef};
|
||||
use datanode::region_server::RegionServer;
|
||||
use flow::StreamingEngine;
|
||||
use snafu::ResultExt;
|
||||
@@ -35,15 +36,19 @@ pub struct StandaloneInformationExtension {
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
start_time_ms: u64,
|
||||
flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
|
||||
resource_stat: ResourceStatRef,
|
||||
}
|
||||
|
||||
impl StandaloneInformationExtension {
|
||||
pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
|
||||
let mut resource_stat = ResourceStatImpl::default();
|
||||
resource_stat.start_collect_cpu_usage();
|
||||
Self {
|
||||
region_server,
|
||||
procedure_manager,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
flow_streaming_engine: RwLock::new(None),
|
||||
resource_stat: Arc::new(resource_stat),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,8 +80,10 @@ impl InformationExtension for StandaloneInformationExtension {
|
||||
// Use `self.start_time_ms` instead.
|
||||
// It's not precise but enough.
|
||||
start_time_ms: self.start_time_ms,
|
||||
cpus: common_stat::get_total_cpu_millicores() as u32,
|
||||
memory_bytes: common_stat::get_total_memory_bytes() as u64,
|
||||
total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
|
||||
total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
|
||||
cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
|
||||
memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
|
||||
hostname: hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_string_lossy()
|
||||
|
||||
@@ -35,6 +35,7 @@ common-procedure.workspace = true
|
||||
common-query.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-stat.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-test-util.workspace = true
|
||||
common-time.workspace = true
|
||||
|
||||
@@ -44,6 +44,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::peer::Peer;
|
||||
use common_runtime::Builder as RuntimeBuilder;
|
||||
use common_runtime::runtime::BuilderBuild;
|
||||
use common_stat::ResourceStatImpl;
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
|
||||
use datanode::config::DatanodeOptions;
|
||||
@@ -411,11 +412,15 @@ impl GreptimeDbClusterBuilder {
|
||||
|
||||
let fe_opts = self.build_frontend_options();
|
||||
|
||||
let mut resource_stat = ResourceStatImpl::default();
|
||||
resource_stat.start_collect_cpu_usage();
|
||||
|
||||
let heartbeat_task = HeartbeatTask::new(
|
||||
&fe_opts,
|
||||
meta_client.clone(),
|
||||
HeartbeatOptions::default(),
|
||||
Arc::new(handlers_executor),
|
||||
Arc::new(resource_stat),
|
||||
);
|
||||
|
||||
let instance = FrontendBuilder::new(
|
||||
|
||||
@@ -11,8 +11,10 @@ DESC TABLE CLUSTER_INFO;
|
||||
| peer_type | String | | NO | | FIELD |
|
||||
| peer_addr | String | | YES | | FIELD |
|
||||
| peer_hostname | String | | YES | | FIELD |
|
||||
| total_cpu_millicores | UInt32 | | NO | | FIELD |
|
||||
| total_memory_bytes | UInt64 | | NO | | FIELD |
|
||||
| total_cpu_millicores | Int64 | | NO | | FIELD |
|
||||
| total_memory_bytes | Int64 | | NO | | FIELD |
|
||||
| cpu_usage_millicores | Int64 | | NO | | FIELD |
|
||||
| memory_usage_bytes | Int64 | | NO | | FIELD |
|
||||
| version | String | | NO | | FIELD |
|
||||
| git_commit | String | | NO | | FIELD |
|
||||
| start_time | TimestampMillisecond | | YES | | FIELD |
|
||||
|
||||
@@ -72,18 +72,20 @@ select * from information_schema.columns order by table_schema, table_name, colu
|
||||
| greptime | information_schema | check_constraints | constraint_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | check_constraints | constraint_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | check_constraints | constraint_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | cluster_info | active_time | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | cluster_info | git_commit | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | cluster_info | node_status | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | cluster_info | active_time | 13 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | cluster_info | cpu_usage_millicores | 7 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
|
||||
| greptime | information_schema | cluster_info | git_commit | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | cluster_info | memory_usage_bytes | 8 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
|
||||
| greptime | information_schema | cluster_info | node_status | 14 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | cluster_info | peer_addr | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | cluster_info | peer_hostname | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | cluster_info | peer_id | 1 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
|
||||
| greptime | information_schema | cluster_info | peer_type | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | cluster_info | start_time | 9 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | |
|
||||
| greptime | information_schema | cluster_info | total_cpu_millicores | 5 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
|
||||
| greptime | information_schema | cluster_info | total_memory_bytes | 6 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
|
||||
| greptime | information_schema | cluster_info | uptime | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | cluster_info | version | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | cluster_info | start_time | 11 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | |
|
||||
| greptime | information_schema | cluster_info | total_cpu_millicores | 5 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
|
||||
| greptime | information_schema | cluster_info | total_memory_bytes | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
|
||||
| greptime | information_schema | cluster_info | uptime | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | cluster_info | version | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | collation_character_set_applicability | character_set_name | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | collation_character_set_applicability | collation_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | collations | character_set_name | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
|
||||
@@ -11,8 +11,10 @@ DESC TABLE CLUSTER_INFO;
|
||||
| peer_type | String | | NO | | FIELD |
|
||||
| peer_addr | String | | YES | | FIELD |
|
||||
| peer_hostname | String | | YES | | FIELD |
|
||||
| total_cpu_millicores | UInt32 | | NO | | FIELD |
|
||||
| total_memory_bytes | UInt64 | | NO | | FIELD |
|
||||
| total_cpu_millicores | Int64 | | NO | | FIELD |
|
||||
| total_memory_bytes | Int64 | | NO | | FIELD |
|
||||
| cpu_usage_millicores | Int64 | | NO | | FIELD |
|
||||
| memory_usage_bytes | Int64 | | NO | | FIELD |
|
||||
| version | String | | NO | | FIELD |
|
||||
| git_commit | String | | NO | | FIELD |
|
||||
| start_time | TimestampMillisecond | | YES | | FIELD |
|
||||
|
||||
Reference in New Issue
Block a user