Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-03 11:52:54 +00:00.
feat: adds information_schema cluster_info table (#3832)
* feat: adds server running mode to KvBackendCatalogManager
* feat: adds MetaClient to KvBackendCatalogManager
* feat: impl information_schema.cluster_info table
* fix: forgot files
* test: update information_schema result
* feat: adds start_time and uptime to cluster_info
* chore: tweak cargo and comment
* feat: rename greptime_region_peers to region_peers
* fix: cluster_info result
* chore: simplify sqlness commands
* chore: set peer_id to -1 for frontends
* fix: move cluster_info to greptime catalog
* chore: use official proto
* feat: adds active_time
* chore: apply suggestion

Co-authored-by: Jeremyhi <jiachun_feng@proton.me>

* chore: STANDALONE for runtime_metrics

Co-authored-by: Jeremyhi <jiachun_feng@proton.me>
Co-authored-by: tison <wander4096@gmail.com>
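For orientation, here is a minimal sketch (not part of the change) of the data each `information_schema.cluster_info` row is built from, using the `NodeInfo`, `NodeStatus`, and `Peer` types that appear in the diff below; all concrete field values are illustrative.

use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::peer::Peer;

/// Roughly what the standalone branch of `make_cluster_info` reports:
/// peer_id 0, an empty peer_addr, and the local build/version metadata.
fn example_standalone_row() -> NodeInfo {
    NodeInfo {
        peer: Peer {
            id: 0,
            addr: "".to_string(),
        },
        last_activity_ts: -1, // no heartbeat in standalone mode
        status: NodeStatus::Standalone,
        version: "0.0.0".to_string(),      // illustrative; the real code uses common_version::build_info()
        git_commit: "abcdef0".to_string(), // illustrative
        start_time_ms: 1_700_000_000_000,  // illustrative
    }
}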
8  Cargo.lock (generated)
@@ -1245,6 +1245,7 @@ dependencies = [
|
||||
"catalog",
|
||||
"chrono",
|
||||
"common-catalog",
|
||||
"common-config",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
@@ -1259,6 +1260,7 @@ dependencies = [
|
||||
"datatypes",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"humantime",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"log-store",
|
||||
@@ -2861,6 +2863,7 @@ dependencies = [
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
"common-version",
|
||||
"common-wal",
|
||||
"dashmap",
|
||||
"datafusion",
|
||||
@@ -3584,6 +3587,8 @@ dependencies = [
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
"common-version",
|
||||
"datanode",
|
||||
"futures",
|
||||
"humantime-serde",
|
||||
@@ -3886,7 +3891,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b5412f72257c18410fdccbb893fa5d245b846141#b5412f72257c18410fdccbb893fa5d245b846141"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f699e240f7a6c83f139dabac8669714f08513120#f699e240f7a6c83f139dabac8669714f08513120"
|
||||
dependencies = [
|
||||
"prost 0.12.4",
|
||||
"serde",
|
||||
@@ -9141,6 +9146,7 @@ dependencies = [
|
||||
"client",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
"common-config",
|
||||
"common-error",
|
||||
"common-grpc",
|
||||
"common-macro",
|
||||
|
||||
@@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b5412f72257c18410fdccbb893fa5d245b846141" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f699e240f7a6c83f139dabac8669714f08513120" }
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
|
||||
@@ -17,6 +17,7 @@ arrow-schema.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait = "0.1"
|
||||
common-catalog.workspace = true
|
||||
common-config.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-meta.workspace = true
|
||||
@@ -30,6 +31,7 @@ datafusion.workspace = true
|
||||
datatypes.workspace = true
|
||||
futures = "0.3"
|
||||
futures-util.workspace = true
|
||||
humantime.workspace = true
|
||||
itertools.workspace = true
|
||||
lazy_static.workspace = true
|
||||
meta-client.workspace = true
|
||||
|
||||
@@ -49,6 +49,12 @@ pub enum Error {
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list nodes in cluster: {source}"))]
|
||||
ListNodes {
|
||||
location: Location,
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to re-compile script due to internal error"))]
|
||||
CompileScriptInternal {
|
||||
location: Location,
|
||||
@@ -294,6 +300,7 @@ impl ErrorExt for Error {
|
||||
}
|
||||
|
||||
Error::ListCatalogs { source, .. }
|
||||
| Error::ListNodes { source, .. }
|
||||
| Error::ListSchemas { source, .. }
|
||||
| Error::ListTables { source, .. } => source.status_code(),
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod cluster_info;
|
||||
pub mod columns;
|
||||
pub mod key_column_usage;
|
||||
mod memory_table;
|
||||
@@ -23,6 +24,7 @@ pub mod schemata;
|
||||
mod table_constraints;
|
||||
mod table_names;
|
||||
pub mod tables;
|
||||
pub(crate) mod utils;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -47,6 +49,7 @@ pub use table_names::*;
|
||||
|
||||
use self::columns::InformationSchemaColumns;
|
||||
use crate::error::Result;
|
||||
use crate::information_schema::cluster_info::InformationSchemaClusterInfo;
|
||||
use crate::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
|
||||
use crate::information_schema::memory_table::{get_schema_columns, MemoryTable};
|
||||
use crate::information_schema::partitions::InformationSchemaPartitions;
|
||||
@@ -150,6 +153,7 @@ impl InformationSchemaProvider {
|
||||
fn build_tables(&mut self) {
|
||||
let mut tables = HashMap::new();
|
||||
|
||||
// SECURITY NOTE:
// Carefully consider the tables that may expose sensitive cluster configurations,
// authentication details, and other critical information.
// Only put these tables under the `greptime` catalog to prevent information leaks.
@@ -166,6 +170,10 @@ impl InformationSchemaProvider {
|
||||
REGION_PEERS.to_string(),
|
||||
self.build_table(REGION_PEERS).unwrap(),
|
||||
);
|
||||
tables.insert(
|
||||
CLUSTER_INFO.to_string(),
|
||||
self.build_table(CLUSTER_INFO).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
|
||||
@@ -251,6 +259,9 @@ impl InformationSchemaProvider {
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _),
|
||||
CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
|
||||
self.catalog_manager.clone(),
|
||||
)) as _),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
317  src/catalog/src/information_schema/cluster_info.rs (new file)
@@ -0,0 +1,317 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::Duration;
|
||||
|
||||
use arrow_schema::SchemaRef as ArrowSchemaRef;
|
||||
use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
|
||||
use common_config::Mode;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cluster::{ClusterInfo, NodeInfo, NodeStatus};
|
||||
use common_meta::peer::Peer;
|
||||
use common_query::physical_plan::TaskContext;
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
|
||||
use common_telemetry::logging::warn;
|
||||
use common_time::timestamp::Timestamp;
|
||||
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
|
||||
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
|
||||
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
|
||||
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::timestamp::TimestampMillisecond;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
|
||||
};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::{ScanRequest, TableId};
|
||||
|
||||
use super::CLUSTER_INFO;
|
||||
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListNodesSnafu, Result};
|
||||
use crate::information_schema::{utils, InformationTable, Predicates};
|
||||
use crate::CatalogManager;
|
||||
|
||||
const PEER_ID: &str = "peer_id";
|
||||
const PEER_TYPE: &str = "peer_type";
|
||||
const PEER_ADDR: &str = "peer_addr";
|
||||
const VERSION: &str = "version";
|
||||
const GIT_COMMIT: &str = "git_commit";
|
||||
const START_TIME: &str = "start_time";
|
||||
const UPTIME: &str = "uptime";
|
||||
const ACTIVE_TIME: &str = "active_time";
|
||||
|
||||
const INIT_CAPACITY: usize = 42;
|
||||
|
||||
/// The `CLUSTER_INFO` table provides the current topology information of the cluster.
///
/// - `peer_id`: the peer server id.
/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv`, etc.
/// - `peer_addr`: the peer gRPC address.
/// - `version`: the build package version of the peer.
/// - `git_commit`: the build git commit hash of the peer.
/// - `start_time`: the starting time of the peer.
/// - `uptime`: the uptime of the peer.
/// - `active_time`: the time since the last activity of the peer.
///
pub(super) struct InformationSchemaClusterInfo {
|
||||
schema: SchemaRef,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
start_time_ms: u64,
|
||||
}
|
||||
|
||||
impl InformationSchemaClusterInfo {
|
||||
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
|
||||
Self {
|
||||
schema: Self::schema(),
|
||||
catalog_manager,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn schema() -> SchemaRef {
|
||||
Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
|
||||
ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(
|
||||
START_TIME,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
),
|
||||
ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
|
||||
]))
|
||||
}
|
||||
|
||||
fn builder(&self) -> InformationSchemaClusterInfoBuilder {
|
||||
InformationSchemaClusterInfoBuilder::new(
|
||||
self.schema.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
self.start_time_ms,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl InformationTable for InformationSchemaClusterInfo {
|
||||
fn table_id(&self) -> TableId {
|
||||
INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID
|
||||
}
|
||||
|
||||
fn table_name(&self) -> &'static str {
|
||||
CLUSTER_INFO
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
|
||||
let schema = self.schema.arrow_schema().clone();
|
||||
let mut builder = self.builder();
|
||||
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
|
||||
schema,
|
||||
futures::stream::once(async move {
|
||||
builder
|
||||
.make_cluster_info(Some(request))
|
||||
.await
|
||||
.map(|x| x.into_df_record_batch())
|
||||
.map_err(Into::into)
|
||||
}),
|
||||
));
|
||||
Ok(Box::pin(
|
||||
RecordBatchStreamAdapter::try_new(stream)
|
||||
.map_err(BoxedError::new)
|
||||
.context(InternalSnafu)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
struct InformationSchemaClusterInfoBuilder {
|
||||
schema: SchemaRef,
|
||||
start_time_ms: u64,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
|
||||
peer_ids: Int64VectorBuilder,
|
||||
peer_types: StringVectorBuilder,
|
||||
peer_addrs: StringVectorBuilder,
|
||||
versions: StringVectorBuilder,
|
||||
git_commits: StringVectorBuilder,
|
||||
start_times: TimestampMillisecondVectorBuilder,
|
||||
uptimes: StringVectorBuilder,
|
||||
active_times: StringVectorBuilder,
|
||||
}
|
||||
|
||||
impl InformationSchemaClusterInfoBuilder {
|
||||
fn new(
|
||||
schema: SchemaRef,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
start_time_ms: u64,
|
||||
) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
catalog_manager,
|
||||
peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
start_time_ms,
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct the `information_schema.cluster_info` virtual table
|
||||
async fn make_cluster_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
|
||||
let predicates = Predicates::from_scan_request(&request);
|
||||
let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone);
|
||||
|
||||
match mode {
|
||||
Mode::Standalone => {
|
||||
let build_info = common_version::build_info();
|
||||
|
||||
self.add_node_info(
|
||||
&predicates,
|
||||
NodeInfo {
|
||||
// For standalone mode:
// - id is always 0
// - peer_addr is an empty string
peer: Peer {
|
||||
id: 0,
|
||||
addr: "".to_string(),
|
||||
},
|
||||
last_activity_ts: -1,
|
||||
status: NodeStatus::Standalone,
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
// Use `self.start_time_ms` instead.
// It's not precise, but it's good enough.
start_time_ms: self.start_time_ms,
|
||||
},
|
||||
);
|
||||
}
|
||||
Mode::Distributed => {
|
||||
if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? {
|
||||
let node_infos = meta_client
|
||||
.list_nodes(None)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ListNodesSnafu)?;
|
||||
|
||||
for node_info in node_infos {
|
||||
self.add_node_info(&predicates, node_info);
|
||||
}
|
||||
} else {
|
||||
warn!("Could not find meta client in distributed mode.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.finish()
|
||||
}
|
||||
|
||||
fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
|
||||
let peer_type = node_info.status.role_name();
|
||||
|
||||
let row = [
|
||||
(PEER_ID, &Value::from(node_info.peer.id)),
|
||||
(PEER_TYPE, &Value::from(peer_type)),
|
||||
(PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
|
||||
(VERSION, &Value::from(node_info.version.as_str())),
|
||||
(GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
|
||||
];
|
||||
|
||||
if !predicates.eval(&row) {
|
||||
return;
|
||||
}
|
||||
|
||||
if peer_type == "FRONTEND" {
|
||||
// Always set peer_id to be -1 for frontends
|
||||
self.peer_ids.push(Some(-1));
|
||||
} else {
|
||||
self.peer_ids.push(Some(node_info.peer.id as i64));
|
||||
}
|
||||
|
||||
self.peer_types.push(Some(peer_type));
|
||||
self.peer_addrs.push(Some(&node_info.peer.addr));
|
||||
self.versions.push(Some(&node_info.version));
|
||||
self.git_commits.push(Some(&node_info.git_commit));
|
||||
if node_info.start_time_ms > 0 {
|
||||
self.start_times
|
||||
.push(Some(TimestampMillisecond(Timestamp::new_millisecond(
|
||||
node_info.start_time_ms as i64,
|
||||
))));
|
||||
self.uptimes.push(Some(
|
||||
Self::format_duration_since(node_info.start_time_ms).as_str(),
|
||||
));
|
||||
} else {
|
||||
self.start_times.push(None);
|
||||
self.uptimes.push(None);
|
||||
}
|
||||
|
||||
if node_info.last_activity_ts > 0 {
|
||||
self.active_times.push(Some(
|
||||
Self::format_duration_since(node_info.last_activity_ts as u64).as_str(),
|
||||
));
|
||||
} else {
|
||||
self.active_times.push(None);
|
||||
}
|
||||
}
|
||||
|
||||
fn format_duration_since(ts: u64) -> String {
|
||||
let now = common_time::util::current_time_millis() as u64;
|
||||
let duration_since = now - ts;
|
||||
humantime::format_duration(Duration::from_millis(duration_since)).to_string()
|
||||
}
|
||||
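For readers skimming the diff, a self-contained sketch of the same formatting idea as `format_duration_since` above; the `saturating_sub` guard is an addition of this sketch, not something the patch does, and the sample output is approximate.

use std::time::Duration;

/// Format "time elapsed since `ts_ms` (milliseconds since the Unix epoch)".
/// Unlike the function above, this guards against a timestamp in the future
/// instead of underflowing the u64 subtraction.
fn format_elapsed(now_ms: u64, ts_ms: u64) -> String {
    let elapsed_ms = now_ms.saturating_sub(ts_ms);
    humantime::format_duration(Duration::from_millis(elapsed_ms)).to_string()
}

// format_elapsed(3_661_000, 0) produces a string along the lines of "1h 1m 1s".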
|
||||
fn finish(&mut self) -> Result<RecordBatch> {
|
||||
let columns: Vec<VectorRef> = vec![
|
||||
Arc::new(self.peer_ids.finish()),
|
||||
Arc::new(self.peer_types.finish()),
|
||||
Arc::new(self.peer_addrs.finish()),
|
||||
Arc::new(self.versions.finish()),
|
||||
Arc::new(self.git_commits.finish()),
|
||||
Arc::new(self.start_times.finish()),
|
||||
Arc::new(self.uptimes.finish()),
|
||||
Arc::new(self.active_times.finish()),
|
||||
];
|
||||
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl DfPartitionStream for InformationSchemaClusterInfo {
|
||||
fn schema(&self) -> &ArrowSchemaRef {
|
||||
self.schema.arrow_schema()
|
||||
}
|
||||
|
||||
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
|
||||
let schema = self.schema.arrow_schema().clone();
|
||||
let mut builder = self.builder();
|
||||
Box::pin(DfRecordBatchStreamAdapter::new(
|
||||
schema,
|
||||
futures::stream::once(async move {
|
||||
builder
|
||||
.make_cluster_info(None)
|
||||
.await
|
||||
.map(|x| x.into_df_record_batch())
|
||||
.map_err(Into::into)
|
||||
}),
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -55,7 +55,7 @@ const INIT_CAPACITY: usize = 42;
|
||||
///
|
||||
/// - `region_id`: the region id
|
||||
/// - `peer_id`: the region storage datanode peer id
|
||||
/// - `peer_addr`: the region storage datanode peer address
|
||||
/// - `peer_addr`: the region storage datanode gRPC peer address
|
||||
/// - `is_leader`: whether the peer is the leader
|
||||
/// - `status`: the region status, `ALIVE` or `DOWNGRADED`.
|
||||
/// - `down_seconds`: the duration of being offline, in seconds.
|
||||
|
||||
@@ -28,8 +28,8 @@ use datatypes::prelude::{ConcreteDataType, MutableVector};
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::vectors::{
|
||||
ConstantVector, Float64VectorBuilder, StringVector, StringVectorBuilder,
|
||||
TimestampMillisecondVector, VectorRef,
|
||||
ConstantVector, Float64VectorBuilder, StringVectorBuilder, TimestampMillisecondVector,
|
||||
VectorRef,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use snafu::ResultExt;
|
||||
@@ -45,8 +45,8 @@ pub(super) struct InformationSchemaMetrics {
|
||||
const METRIC_NAME: &str = "metric_name";
|
||||
const METRIC_VALUE: &str = "value";
|
||||
const METRIC_LABELS: &str = "labels";
|
||||
const NODE: &str = "node";
|
||||
const NODE_TYPE: &str = "node_type";
|
||||
const PEER_ADDR: &str = "peer_addr";
|
||||
const PEER_TYPE: &str = "peer_type";
|
||||
const TIMESTAMP: &str = "timestamp";
|
||||
|
||||
/// The `information_schema.runtime_metrics` virtual table.
|
||||
@@ -63,8 +63,8 @@ impl InformationSchemaMetrics {
|
||||
ColumnSchema::new(METRIC_NAME, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(METRIC_VALUE, ConcreteDataType::float64_datatype(), false),
|
||||
ColumnSchema::new(METRIC_LABELS, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(NODE, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(NODE_TYPE, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(
|
||||
TIMESTAMP,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
@@ -104,6 +104,7 @@ impl InformationTable for InformationSchemaMetrics {
|
||||
.map_err(Into::into)
|
||||
}),
|
||||
));
|
||||
|
||||
Ok(Box::pin(
|
||||
RecordBatchStreamAdapter::try_new(stream)
|
||||
.map_err(BoxedError::new)
|
||||
@@ -118,6 +119,8 @@ struct InformationSchemaMetricsBuilder {
|
||||
metric_names: StringVectorBuilder,
|
||||
metric_values: Float64VectorBuilder,
|
||||
metric_labels: StringVectorBuilder,
|
||||
peer_addrs: StringVectorBuilder,
|
||||
peer_types: StringVectorBuilder,
|
||||
}
|
||||
|
||||
impl InformationSchemaMetricsBuilder {
|
||||
@@ -127,13 +130,24 @@ impl InformationSchemaMetricsBuilder {
|
||||
metric_names: StringVectorBuilder::with_capacity(42),
|
||||
metric_values: Float64VectorBuilder::with_capacity(42),
|
||||
metric_labels: StringVectorBuilder::with_capacity(42),
|
||||
peer_addrs: StringVectorBuilder::with_capacity(42),
|
||||
peer_types: StringVectorBuilder::with_capacity(42),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_metric(&mut self, metric_name: &str, labels: String, metric_value: f64) {
|
||||
fn add_metric(
|
||||
&mut self,
|
||||
metric_name: &str,
|
||||
labels: String,
|
||||
metric_value: f64,
|
||||
peer: Option<&str>,
|
||||
peer_type: &str,
|
||||
) {
|
||||
self.metric_names.push(Some(metric_name));
|
||||
self.metric_values.push(Some(metric_value));
|
||||
self.metric_labels.push(Some(&labels));
|
||||
self.peer_addrs.push(peer);
|
||||
self.peer_types.push(Some(peer_type));
|
||||
}
|
||||
|
||||
async fn make_metrics(&mut self, _request: Option<ScanRequest>) -> Result<RecordBatch> {
|
||||
@@ -170,18 +184,19 @@ impl InformationSchemaMetricsBuilder {
|
||||
.join(", "),
|
||||
// Safety: always has a sample
|
||||
ts.samples[0].value,
|
||||
// The peer column is always `None` for standalone
|
||||
None,
|
||||
"STANDALONE",
|
||||
);
|
||||
}
|
||||
|
||||
// FIXME(dennis): fetching other peers metrics
|
||||
self.finish()
|
||||
}
|
||||
|
||||
fn finish(&mut self) -> Result<RecordBatch> {
|
||||
let rows_num = self.metric_names.len();
|
||||
let unknowns = Arc::new(ConstantVector::new(
|
||||
Arc::new(StringVector::from(vec!["unknown"])),
|
||||
rows_num,
|
||||
));
|
||||
|
||||
let timestamps = Arc::new(ConstantVector::new(
|
||||
Arc::new(TimestampMillisecondVector::from_slice([
|
||||
current_time_millis(),
|
||||
@@ -193,9 +208,8 @@ impl InformationSchemaMetricsBuilder {
|
||||
Arc::new(self.metric_names.finish()),
|
||||
Arc::new(self.metric_values.finish()),
|
||||
Arc::new(self.metric_labels.finish()),
|
||||
// TODO(dennis): supports node and node_type for cluster
|
||||
unknowns.clone(),
|
||||
unknowns,
|
||||
Arc::new(self.peer_addrs.finish()),
|
||||
Arc::new(self.peer_types.finish()),
|
||||
timestamps,
|
||||
];
|
||||
|
||||
@@ -243,8 +257,8 @@ mod tests {
|
||||
assert!(result_literal.contains(METRIC_NAME));
|
||||
assert!(result_literal.contains(METRIC_VALUE));
|
||||
assert!(result_literal.contains(METRIC_LABELS));
|
||||
assert!(result_literal.contains(NODE));
|
||||
assert!(result_literal.contains(NODE_TYPE));
|
||||
assert!(result_literal.contains(PEER_ADDR));
|
||||
assert!(result_literal.contains(PEER_TYPE));
|
||||
assert!(result_literal.contains(TIMESTAMP));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,5 +40,6 @@ pub const GLOBAL_STATUS: &str = "global_status";
|
||||
pub const SESSION_STATUS: &str = "session_status";
|
||||
pub const RUNTIME_METRICS: &str = "runtime_metrics";
|
||||
pub const PARTITIONS: &str = "partitions";
|
||||
pub const REGION_PEERS: &str = "greptime_region_peers";
|
||||
pub const REGION_PEERS: &str = "region_peers";
|
||||
pub const TABLE_CONSTRAINTS: &str = "table_constraints";
|
||||
pub const CLUSTER_INFO: &str = "cluster_info";
|
||||
|
||||
53  src/catalog/src/information_schema/utils.rs (new file)
@@ -0,0 +1,53 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use common_config::Mode;
|
||||
use meta_client::client::MetaClient;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::error::{Result, UpgradeWeakCatalogManagerRefSnafu};
|
||||
use crate::kvbackend::KvBackendCatalogManager;
|
||||
use crate::CatalogManager;
|
||||
|
||||
/// Try to get the server running mode from the `[CatalogManager]` weak reference.
pub fn running_mode(catalog_manager: &Weak<dyn CatalogManager>) -> Result<Option<Mode>> {
    let catalog_manager = catalog_manager
        .upgrade()
        .context(UpgradeWeakCatalogManagerRefSnafu)?;

    Ok(catalog_manager
        .as_any()
        .downcast_ref::<KvBackendCatalogManager>()
        .map(|manager| manager.running_mode())
        .copied())
}

/// Try to get the `[MetaClient]` from the `[CatalogManager]` weak reference.
pub fn meta_client(catalog_manager: &Weak<dyn CatalogManager>) -> Result<Option<Arc<MetaClient>>> {
    let catalog_manager = catalog_manager
        .upgrade()
        .context(UpgradeWeakCatalogManagerRefSnafu)?;

    let meta_client = match catalog_manager
        .as_any()
        .downcast_ref::<KvBackendCatalogManager>()
    {
        None => None,
        Some(manager) => manager.meta_client(),
    };

    Ok(meta_client)
}
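A hypothetical caller, sketched to show how these two helpers combine; `CatalogManager`, `Mode`, and the helper signatures come from this file, while the function itself is illustrative (the real consumer is `cluster_info.rs` above).

use std::sync::Weak;

use common_config::Mode;

use crate::error::Result;
use crate::information_schema::utils;
use crate::CatalogManager;

/// Decide whether a node-listing call should go through the metasrv:
/// only in distributed mode, and only when a meta client is actually wired in.
fn should_query_metasrv(catalog_manager: &Weak<dyn CatalogManager>) -> Result<bool> {
    let mode = utils::running_mode(catalog_manager)?.unwrap_or(Mode::Standalone);
    let has_meta_client = utils::meta_client(catalog_manager)?.is_some();
    Ok(matches!(mode, Mode::Distributed) && has_meta_client)
}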
@@ -22,6 +22,7 @@ use common_catalog::consts::{
|
||||
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
|
||||
};
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_config::Mode;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::{CacheInvalidator, Context, MultiCacheInvalidator};
|
||||
use common_meta::instruction::CacheIdent;
|
||||
@@ -33,6 +34,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use futures_util::stream::BoxStream;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use meta_client::client::MetaClient;
|
||||
use moka::future::{Cache as AsyncCache, CacheBuilder};
|
||||
use moka::sync::Cache;
|
||||
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
|
||||
@@ -56,6 +58,8 @@ use crate::CatalogManager;
|
||||
/// comes from `SystemCatalog`, which is static and read-only.
|
||||
#[derive(Clone)]
|
||||
pub struct KvBackendCatalogManager {
|
||||
mode: Mode,
|
||||
meta_client: Option<Arc<MetaClient>>,
|
||||
partition_manager: PartitionRuleManagerRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
/// A sub-CatalogManager that handles system tables
|
||||
@@ -101,6 +105,8 @@ const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
impl KvBackendCatalogManager {
|
||||
pub async fn new(
|
||||
mode: Mode,
|
||||
meta_client: Option<Arc<MetaClient>>,
|
||||
backend: KvBackendRef,
|
||||
multi_cache_invalidator: Arc<MultiCacheInvalidator>,
|
||||
) -> Arc<Self> {
|
||||
@@ -113,6 +119,8 @@ impl KvBackendCatalogManager {
|
||||
.await;
|
||||
|
||||
Arc::new_cyclic(|me| Self {
|
||||
mode,
|
||||
meta_client,
|
||||
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
|
||||
system_catalog: SystemCatalog {
|
||||
@@ -127,6 +135,16 @@ impl KvBackendCatalogManager {
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the server running mode.
|
||||
pub fn running_mode(&self) -> &Mode {
|
||||
&self.mode
|
||||
}
|
||||
|
||||
/// Returns the `[MetaClient]`.
|
||||
pub fn meta_client(&self) -> Option<Arc<MetaClient>> {
|
||||
self.meta_client.clone()
|
||||
}
|
||||
|
||||
pub fn partition_manager(&self) -> PartitionRuleManagerRef {
|
||||
self.partition_manager.clone()
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ use catalog::kvbackend::{
|
||||
};
|
||||
use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_base::Plugins;
|
||||
use common_config::Mode;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_meta::cache_invalidator::MultiCacheInvalidator;
|
||||
use common_query::Output;
|
||||
@@ -256,8 +257,13 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
|
||||
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
|
||||
cached_meta_backend.clone(),
|
||||
]));
|
||||
let catalog_list =
|
||||
KvBackendCatalogManager::new(cached_meta_backend.clone(), multi_cache_invalidator).await;
|
||||
let catalog_list = KvBackendCatalogManager::new(
|
||||
Mode::Distributed,
|
||||
Some(meta_client.clone()),
|
||||
cached_meta_backend.clone(),
|
||||
multi_cache_invalidator,
|
||||
)
|
||||
.await;
|
||||
let plugins: Plugins = Default::default();
|
||||
let state = Arc::new(QueryEngineState::new(
|
||||
catalog_list,
|
||||
|
||||
@@ -253,6 +253,8 @@ impl StartCommand {
|
||||
cached_meta_backend.clone(),
|
||||
]));
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
opts.mode,
|
||||
Some(meta_client.clone()),
|
||||
cached_meta_backend.clone(),
|
||||
multi_cache_invalidator.clone(),
|
||||
)
|
||||
@@ -266,6 +268,7 @@ impl StartCommand {
|
||||
]);
|
||||
|
||||
let heartbeat_task = HeartbeatTask::new(
|
||||
&opts,
|
||||
meta_client.clone(),
|
||||
opts.heartbeat.clone(),
|
||||
Arc::new(executor),
|
||||
|
||||
@@ -404,8 +404,13 @@ impl StartCommand {
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
|
||||
let catalog_manager =
|
||||
KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await;
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
dn_opts.mode,
|
||||
None,
|
||||
kv_backend.clone(),
|
||||
multi_cache_invalidator.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let builder =
|
||||
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
|
||||
|
||||
@@ -91,6 +91,8 @@ pub const INFORMATION_SCHEMA_PARTITIONS_TABLE_ID: u32 = 28;
|
||||
pub const INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID: u32 = 29;
|
||||
/// id for information_schema.columns
|
||||
pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30;
|
||||
/// id for information_schema.cluster_info
|
||||
pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31;
|
||||
/// ----- End of information_schema tables -----
|
||||
|
||||
pub const MITO_ENGINE: &str = "mito";
|
||||
|
||||
@@ -21,6 +21,16 @@ pub fn metadata_store_dir(store_dir: &str) -> String {
|
||||
format!("{store_dir}/metadata")
|
||||
}
|
||||
|
||||
/// The server running mode.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Copy)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
    // The single-process mode.
    Standalone,
    // The distributed cluster mode.
    Distributed,
}
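A minimal sketch (using `serde_json` purely as a stand-in for whatever format the configuration actually uses) of how the `lowercase` rename shapes the serialized values:

use common_config::Mode;

fn demo() {
    // `#[serde(rename_all = "lowercase")]` means the on-disk values are
    // "standalone" and "distributed".
    let mode: Mode = serde_json::from_str("\"distributed\"").unwrap();
    assert_eq!(mode, Mode::Distributed);
    assert_eq!(
        serde_json::to_string(&Mode::Standalone).unwrap(),
        "\"standalone\""
    );
}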
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct KvBackendConfig {
|
||||
|
||||
@@ -86,6 +86,12 @@ pub struct NodeInfo {
|
||||
pub last_activity_ts: i64,
|
||||
/// The status of the node. Different roles have different node status.
|
||||
pub status: NodeStatus,
|
||||
// The node build version
|
||||
pub version: String,
|
||||
// The node build git commit hash
|
||||
pub git_commit: String,
|
||||
// The node start timestamp
|
||||
pub start_time_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
|
||||
@@ -100,6 +106,19 @@ pub enum NodeStatus {
|
||||
Datanode(DatanodeStatus),
|
||||
Frontend(FrontendStatus),
|
||||
Metasrv(MetasrvStatus),
|
||||
Standalone,
|
||||
}
|
||||
|
||||
impl NodeStatus {
|
||||
// Get the role name of the node status
|
||||
pub fn role_name(&self) -> &str {
|
||||
match self {
|
||||
NodeStatus::Datanode(_) => "DATANODE",
|
||||
NodeStatus::Frontend(_) => "FRONTEND",
|
||||
NodeStatus::Metasrv(_) => "METASRV",
|
||||
NodeStatus::Standalone => "STANDALONE",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The status of a datanode.
|
||||
@@ -271,6 +290,9 @@ mod tests {
|
||||
leader_regions: 3,
|
||||
follower_regions: 4,
|
||||
}),
|
||||
version: "".to_string(),
|
||||
git_commit: "".to_string(),
|
||||
start_time_ms: 1,
|
||||
};
|
||||
|
||||
let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
|
||||
@@ -287,6 +309,8 @@ mod tests {
|
||||
leader_regions: 3,
|
||||
follower_regions: 4,
|
||||
}),
|
||||
start_time_ms: 1,
|
||||
..
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-version.workspace = true
|
||||
common-wal.workspace = true
|
||||
dashmap.workspace = true
|
||||
datafusion.workspace = true
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, Peer, RegionRole, RegionStat, Role};
|
||||
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat, Role};
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
@@ -43,6 +43,7 @@ use crate::region_server::RegionServer;
|
||||
pub(crate) mod handler;
|
||||
pub(crate) mod task_tracker;
|
||||
|
||||
/// The datanode heartbeat task, which sends `[HeartbeatRequest]` to Metasrv periodically in the background.
|
||||
pub struct HeartbeatTask {
|
||||
node_id: u64,
|
||||
node_epoch: u64,
|
||||
@@ -246,6 +247,7 @@ impl HeartbeatTask {
|
||||
}
|
||||
}
|
||||
_ = &mut sleep => {
|
||||
let build_info = common_version::build_info();
|
||||
let region_stats = Self::load_region_stats(®ion_server_clone).await;
|
||||
let now = Instant::now();
|
||||
let duration_since_epoch = (now - epoch).as_millis() as u64;
|
||||
@@ -254,6 +256,12 @@ impl HeartbeatTask {
|
||||
region_stats,
|
||||
duration_since_epoch,
|
||||
node_epoch,
|
||||
info: Some(NodeInfo {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
// The start timestamp is the same as node_epoch currently.
|
||||
start_time_ms: node_epoch,
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
sleep.as_mut().reset(now + Duration::from_millis(interval));
|
||||
|
||||
@@ -33,6 +33,8 @@ common-query.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-version.workspace = true
|
||||
datanode.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
lazy_static.workspace = true
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::HeartbeatRequest;
|
||||
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
|
||||
use common_meta::heartbeat::handler::{
|
||||
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
|
||||
};
|
||||
@@ -30,28 +30,35 @@ use tokio::time::{Duration, Instant};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::frontend::FrontendOptions;
|
||||
|
||||
pub mod handler;
|
||||
|
||||
/// The frontend heartbeat task, which sends `[HeartbeatRequest]` to Metasrv periodically in the background.
|
||||
#[derive(Clone)]
|
||||
pub struct HeartbeatTask {
|
||||
server_addr: String,
|
||||
meta_client: Arc<MetaClient>,
|
||||
report_interval: u64,
|
||||
retry_interval: u64,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
start_time_ms: u64,
|
||||
}
|
||||
|
||||
impl HeartbeatTask {
|
||||
pub fn new(
|
||||
opts: &FrontendOptions,
|
||||
meta_client: Arc<MetaClient>,
|
||||
heartbeat_opts: HeartbeatOptions,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
) -> Self {
|
||||
HeartbeatTask {
|
||||
server_addr: opts.grpc.addr.clone(),
|
||||
meta_client,
|
||||
report_interval: heartbeat_opts.interval.as_millis() as u64,
|
||||
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
|
||||
resp_handler_executor,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,12 +109,28 @@ impl HeartbeatTask {
|
||||
});
|
||||
}
|
||||
|
||||
fn build_node_info(start_time_ms: u64) -> NodeInfo {
|
||||
let build_info = common_version::build_info();
|
||||
|
||||
NodeInfo {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms,
|
||||
}
|
||||
}
|
||||
|
||||
fn start_heartbeat_report(
|
||||
&self,
|
||||
req_sender: HeartbeatSender,
|
||||
mut outgoing_rx: Receiver<OutgoingMessage>,
|
||||
) {
|
||||
let report_interval = self.report_interval;
|
||||
let start_time_ms = self.start_time_ms;
|
||||
let self_peer = Some(Peer {
|
||||
// The peer id doesn't make sense for frontend, so we just set it to 0.
|
||||
id: 0,
|
||||
addr: self.server_addr.clone(),
|
||||
});
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let sleep = tokio::time::sleep(Duration::from_millis(0));
|
||||
@@ -121,6 +144,8 @@ impl HeartbeatTask {
|
||||
Ok(message) => {
|
||||
let req = HeartbeatRequest {
|
||||
mailbox_message: Some(message),
|
||||
peer: self_peer.clone(),
|
||||
info: Some(Self::build_node_info(start_time_ms)),
|
||||
..Default::default()
|
||||
};
|
||||
Some(req)
|
||||
@@ -138,6 +163,8 @@ impl HeartbeatTask {
|
||||
_ = &mut sleep => {
|
||||
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
|
||||
let req = HeartbeatRequest {
|
||||
peer: self_peer.clone(),
|
||||
info: Some(Self::build_node_info(start_time_ms)),
|
||||
..Default::default()
|
||||
};
|
||||
Some(req)
|
||||
|
||||
@@ -264,6 +264,12 @@ impl ClusterInfo for MetaClient {
|
||||
|
||||
let mut nodes = if get_metasrv_nodes {
|
||||
let last_activity_ts = -1; // Metasrv does not provide this information.
|
||||
|
||||
// TODO(dennis): Get Metasrv node info
|
||||
let git_commit = "unknown";
|
||||
let version = "unknown";
|
||||
let start_time_ms = 0;
|
||||
|
||||
let (leader, followers) = cluster_client.get_metasrv_peers().await?;
|
||||
followers
|
||||
.into_iter()
|
||||
@@ -271,11 +277,17 @@ impl ClusterInfo for MetaClient {
|
||||
peer,
|
||||
last_activity_ts,
|
||||
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
|
||||
version: version.to_string(),
|
||||
git_commit: git_commit.to_string(),
|
||||
start_time_ms,
|
||||
})
|
||||
.chain(leader.into_iter().map(|leader| NodeInfo {
|
||||
peer: leader,
|
||||
last_activity_ts,
|
||||
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
|
||||
version: version.to_string(),
|
||||
git_commit: git_commit.to_string(),
|
||||
start_time_ms,
|
||||
}))
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
|
||||
@@ -146,6 +146,7 @@ impl Inner {
|
||||
{
|
||||
let ask_leader = self.ask_leader()?;
|
||||
let mut times = 0;
|
||||
let mut last_error = None;
|
||||
|
||||
while times < self.max_retry {
|
||||
if let Some(leader) = &ask_leader.get_leader() {
|
||||
@@ -153,6 +154,7 @@ impl Inner {
|
||||
match body_fn(client).await {
|
||||
Ok(res) => {
|
||||
if util::is_not_leader(get_header(&res)) {
|
||||
last_error = Some(format!("{leader} is not a leader"));
|
||||
warn!("Failed to {task} to {leader}, not a leader");
|
||||
let leader = ask_leader.ask_leader().await?;
|
||||
info!("Cluster client updated to new leader addr: {leader}");
|
||||
@@ -164,6 +166,7 @@ impl Inner {
|
||||
Err(status) => {
|
||||
// The leader may be unreachable.
|
||||
if util::is_unreachable(&status) {
|
||||
last_error = Some(status.to_string());
|
||||
warn!("Failed to {task} to {leader}, source: {status}");
|
||||
let leader = ask_leader.ask_leader().await?;
|
||||
info!("Cluster client updated to new leader addr: {leader}");
|
||||
@@ -180,7 +183,7 @@ impl Inner {
|
||||
}
|
||||
|
||||
RetryTimesExceededSnafu {
|
||||
msg: "Failed to {task}",
|
||||
msg: format!("Failed to {task}, last error: {:?}", last_error),
|
||||
times: self.max_retry,
|
||||
}
|
||||
.fail()
|
||||
|
||||
@@ -162,6 +162,7 @@ impl Inner {
|
||||
{
|
||||
let ask_leader = self.ask_leader()?;
|
||||
let mut times = 0;
|
||||
let mut last_error = None;
|
||||
|
||||
while times < self.max_retry {
|
||||
if let Some(leader) = &ask_leader.get_leader() {
|
||||
@@ -169,6 +170,7 @@ impl Inner {
|
||||
match body_fn(client).await {
|
||||
Ok(res) => {
|
||||
if util::is_not_leader(get_header(&res)) {
|
||||
last_error = Some(format!("{leader} is not a leader"));
|
||||
warn!("Failed to {task} to {leader}, not a leader");
|
||||
let leader = ask_leader.ask_leader().await?;
|
||||
info!("DDL client updated to new leader addr: {leader}");
|
||||
@@ -180,6 +182,7 @@ impl Inner {
|
||||
Err(status) => {
|
||||
// The leader may be unreachable.
|
||||
if util::is_unreachable(&status) {
|
||||
last_error = Some(status.to_string());
|
||||
warn!("Failed to {task} to {leader}, source: {status}");
|
||||
let leader = ask_leader.ask_leader().await?;
|
||||
info!("Procedure client updated to new leader addr: {leader}");
|
||||
@@ -196,7 +199,7 @@ impl Inner {
|
||||
}
|
||||
|
||||
error::RetryTimesExceededSnafu {
|
||||
msg: "Failed to {task}",
|
||||
msg: format!("Failed to {task}, last error: {:?}", last_error),
|
||||
times: self.max_retry,
|
||||
}
|
||||
.fail()
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, Role};
|
||||
use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
|
||||
use common_meta::cluster;
|
||||
use common_meta::cluster::{DatanodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
|
||||
use common_meta::peer::Peer;
|
||||
@@ -40,7 +40,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
|
||||
ctx: &mut Context,
|
||||
_acc: &mut HeartbeatAccumulator,
|
||||
) -> Result<HandleControl> {
|
||||
let Some((key, peer)) = extract_base_info(req, Role::Frontend) else {
|
||||
let Some((key, peer, info)) = extract_base_info(req, Role::Frontend) else {
|
||||
return Ok(HandleControl::Continue);
|
||||
};
|
||||
|
||||
@@ -48,6 +48,9 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
|
||||
peer,
|
||||
last_activity_ts: common_time::util::current_time_millis(),
|
||||
status: NodeStatus::Frontend(FrontendStatus {}),
|
||||
version: info.version,
|
||||
git_commit: info.git_commit,
|
||||
start_time_ms: info.start_time_ms,
|
||||
};
|
||||
|
||||
save_to_mem_store(key, value, ctx).await?;
|
||||
@@ -71,7 +74,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
|
||||
ctx: &mut Context,
|
||||
acc: &mut HeartbeatAccumulator,
|
||||
) -> Result<HandleControl> {
|
||||
let Some((key, peer)) = extract_base_info(req, Role::Datanode) else {
|
||||
let Some((key, peer, info)) = extract_base_info(req, Role::Datanode) else {
|
||||
return Ok(HandleControl::Continue);
|
||||
};
|
||||
|
||||
@@ -95,6 +98,9 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
|
||||
leader_regions,
|
||||
follower_regions,
|
||||
}),
|
||||
version: info.version,
|
||||
git_commit: info.git_commit,
|
||||
start_time_ms: info.start_time_ms,
|
||||
};
|
||||
|
||||
save_to_mem_store(key, value, ctx).await?;
|
||||
@@ -103,14 +109,22 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_base_info(req: &HeartbeatRequest, role: Role) -> Option<(NodeInfoKey, Peer)> {
|
||||
let HeartbeatRequest { header, peer, .. } = req;
|
||||
fn extract_base_info(
|
||||
req: &HeartbeatRequest,
|
||||
role: Role,
|
||||
) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
|
||||
let HeartbeatRequest {
|
||||
header, peer, info, ..
|
||||
} = req;
|
||||
let Some(header) = &header else {
|
||||
return None;
|
||||
};
|
||||
let Some(peer) = &peer else {
|
||||
return None;
|
||||
};
|
||||
let Some(info) = &info else {
|
||||
return None;
|
||||
};
|
||||
|
||||
Some((
|
||||
NodeInfoKey {
|
||||
@@ -122,6 +136,7 @@ fn extract_base_info(req: &HeartbeatRequest, role: Role) -> Option<(NodeInfoKey,
|
||||
node_id: peer.id,
|
||||
},
|
||||
Peer::from(peer.clone()),
|
||||
info.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@@ -128,6 +128,7 @@ impl cluster_server::Cluster for Metasrv {
|
||||
|
||||
impl Metasrv {
|
||||
pub fn is_leader(&self) -> bool {
|
||||
self.election().map(|x| x.is_leader()).unwrap_or(false)
|
||||
// Returns true when there is no `election`, indicating the presence of only one `Metasrv` node, which is the leader.
|
||||
self.election().map(|x| x.is_leader()).unwrap_or(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ catalog.workspace = true
|
||||
chrono.workspace = true
|
||||
common-base.workspace = true
|
||||
common-catalog.workspace = true
|
||||
common-config.workspace = true
|
||||
common-error.workspace = true
|
||||
common-grpc.workspace = true
|
||||
common-macro.workspace = true
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
|
||||
use datatypes::schema::Schema;
|
||||
use query::plan::LogicalPlan;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod configurator;
|
||||
pub mod error;
|
||||
@@ -46,12 +45,7 @@ pub mod server;
|
||||
mod shutdown;
|
||||
pub mod tls;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum Mode {
|
||||
Standalone,
|
||||
Distributed,
|
||||
}
|
||||
pub use common_config::Mode;
|
||||
|
||||
/// Cached SQL and logical plan for database interfaces
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -39,6 +39,7 @@ use common_test_util::temp_dir::create_temp_dir;
|
||||
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
|
||||
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
|
||||
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
|
||||
use frontend::frontend::FrontendOptions;
|
||||
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
|
||||
use frontend::heartbeat::HeartbeatTask;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
@@ -357,6 +358,8 @@ impl GreptimeDbClusterBuilder {
|
||||
cached_meta_backend.clone(),
|
||||
]));
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
Mode::Distributed,
|
||||
Some(meta_client.clone()),
|
||||
cached_meta_backend.clone(),
|
||||
multi_cache_invalidator.clone(),
|
||||
)
|
||||
@@ -370,6 +373,7 @@ impl GreptimeDbClusterBuilder {
|
||||
]);
|
||||
|
||||
let heartbeat_task = HeartbeatTask::new(
|
||||
&FrontendOptions::default(),
|
||||
meta_client.clone(),
|
||||
HeartbeatOptions::default(),
|
||||
Arc::new(handlers_executor),
|
||||
|
||||
@@ -131,8 +131,13 @@ impl GreptimeDbStandaloneBuilder {
|
||||
table_metadata_manager.init().await.unwrap();
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
|
||||
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
|
||||
let catalog_manager =
|
||||
KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await;
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
Mode::Standalone,
|
||||
None,
|
||||
kv_backend.clone(),
|
||||
multi_cache_invalidator.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
|
||||
|
||||
|
||||
@@ -854,17 +854,21 @@ async fn find_region_distribution_by_sql(cluster: &GreptimeDbCluster) -> RegionD
|
||||
|
||||
let OutputData::Stream(stream) = run_sql(
|
||||
&cluster.frontend,
|
||||
&format!(r#"select b.peer_id as datanode_id,
|
||||
&format!(
|
||||
r#"select b.peer_id as datanode_id,
|
||||
a.greptime_partition_id as region_id
|
||||
from information_schema.partitions a left join information_schema.greptime_region_peers b
|
||||
from information_schema.partitions a left join information_schema.region_peers b
|
||||
on a.greptime_partition_id = b.region_id
|
||||
where a.table_name='{TEST_TABLE_NAME}' order by datanode_id asc"#
|
||||
),
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await.unwrap().data else {
|
||||
unreachable!();
|
||||
};
|
||||
.await
|
||||
.unwrap()
|
||||
.data
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
USE INFORMATION_SCHEMA;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DESC TABLE CLUSTER_INFO;
|
||||
|
||||
+-------------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+-------------+----------------------+-----+------+---------+---------------+
|
||||
| peer_id | Int64 | | NO | | FIELD |
|
||||
| peer_type | String | | NO | | FIELD |
|
||||
| peer_addr | String | | YES | | FIELD |
|
||||
| version | String | | NO | | FIELD |
|
||||
| git_commit | String | | NO | | FIELD |
|
||||
| start_time | TimestampMillisecond | | YES | | FIELD |
|
||||
| uptime | String | | YES | | FIELD |
|
||||
| active_time | String | | YES | | FIELD |
|
||||
+-------------+----------------------+-----+------+---------+---------------+
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
|
||||
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
|
||||
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
|
||||
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration|+++++++++
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
|
||||
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type;
|
||||
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration|+++++++++
|
||||
|
||||
USE PUBLIC;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
50  tests/cases/distributed/information_schema/cluster_info.sql (new file)
@@ -0,0 +1,50 @@
|
||||
USE INFORMATION_SCHEMA;
|
||||
|
||||
DESC TABLE CLUSTER_INFO;
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE unknown UNKNOWN
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
|
||||
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
|
||||
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type;
|
||||
|
||||
USE PUBLIC;
|
||||
@@ -24,7 +24,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres

-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
SELECT region_id, peer_id, is_leader, status FROM information_schema.region_peers ORDER BY peer_id;

+---------------+---------+-----------+--------+
| region_id | peer_id | is_leader | status |
@@ -128,7 +128,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres

-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
SELECT region_id, peer_id, is_leader, status FROM information_schema.region_peers ORDER BY peer_id;

+---------------+---------+-----------+--------+
| region_id | peer_id | is_leader | status |
@@ -148,7 +148,6 @@ INSERT INTO my_table VALUES

Affected Rows: 8


SELECT * FROM my_table;

+------+---+-------------------------+
@@ -14,7 +14,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres

-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
SELECT region_id, peer_id, is_leader, status FROM information_schema.region_peers ORDER BY peer_id;

INSERT INTO my_table VALUES
(100, 'a', 1),
@@ -54,7 +54,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres

-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
SELECT region_id, peer_id, is_leader, status FROM information_schema.region_peers ORDER BY peer_id;

INSERT INTO my_table VALUES
(100, 'a', 1),
@@ -65,7 +65,7 @@ INSERT INTO my_table VALUES
(2100, 'f', 6),
(2200, 'g', 7),
(2400, 'h', 8);


SELECT * FROM my_table;

DROP TABLE my_table;
@@ -20,6 +20,7 @@ show tables;
| build_info |
| character_sets |
| check_constraints |
| cluster_info |
| collation_character_set_applicability |
| collations |
| column_privileges |
@@ -29,13 +30,13 @@ show tables;
| events |
| files |
| global_status |
| greptime_region_peers |
| key_column_usage |
| optimizer_trace |
| parameters |
| partitions |
| profiling |
| referential_constraints |
| region_peers |
| routines |
| runtime_metrics |
| schema_privileges |
@@ -15,6 +15,7 @@ order by table_schema, table_name;
| greptime | information_schema | build_info | LOCAL TEMPORARY | 8 | |
| greptime | information_schema | character_sets | LOCAL TEMPORARY | 9 | |
| greptime | information_schema | check_constraints | LOCAL TEMPORARY | 12 | |
| greptime | information_schema | cluster_info | LOCAL TEMPORARY | 31 | |
| greptime | information_schema | collation_character_set_applicability | LOCAL TEMPORARY | 11 | |
| greptime | information_schema | collations | LOCAL TEMPORARY | 10 | |
| greptime | information_schema | column_privileges | LOCAL TEMPORARY | 6 | |
@@ -24,13 +25,13 @@ order by table_schema, table_name;
| greptime | information_schema | events | LOCAL TEMPORARY | 13 | |
| greptime | information_schema | files | LOCAL TEMPORARY | 14 | |
| greptime | information_schema | global_status | LOCAL TEMPORARY | 25 | |
| greptime | information_schema | greptime_region_peers | LOCAL TEMPORARY | 29 | |
| greptime | information_schema | key_column_usage | LOCAL TEMPORARY | 16 | |
| greptime | information_schema | optimizer_trace | LOCAL TEMPORARY | 17 | |
| greptime | information_schema | parameters | LOCAL TEMPORARY | 18 | |
| greptime | information_schema | partitions | LOCAL TEMPORARY | 28 | |
| greptime | information_schema | profiling | LOCAL TEMPORARY | 19 | |
| greptime | information_schema | referential_constraints | LOCAL TEMPORARY | 20 | |
| greptime | information_schema | region_peers | LOCAL TEMPORARY | 29 | |
| greptime | information_schema | routines | LOCAL TEMPORARY | 21 | |
| greptime | information_schema | runtime_metrics | LOCAL TEMPORARY | 27 | |
| greptime | information_schema | schema_privileges | LOCAL TEMPORARY | 22 | |
@@ -61,6 +62,14 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | check_constraints | constraint_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | check_constraints | constraint_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | check_constraints | constraint_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | cluster_info | active_time | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | cluster_info | git_commit | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | cluster_info | peer_addr | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | cluster_info | peer_id | 1 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | cluster_info | peer_type | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | cluster_info | start_time | 6 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | |
| greptime | information_schema | cluster_info | uptime | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | cluster_info | version | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | collation_character_set_applicability | character_set_name | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | collation_character_set_applicability | collation_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | collations | character_set_name | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
@@ -174,12 +183,6 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | files | version | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | global_status | variable_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | global_status | variable_value | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | greptime_region_peers | down_seconds | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | |
| greptime | information_schema | greptime_region_peers | is_leader | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | greptime_region_peers | peer_addr | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | greptime_region_peers | peer_id | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | greptime_region_peers | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | greptime_region_peers | status | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | key_column_usage | column_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | key_column_usage | constraint_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | key_column_usage | constraint_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
@@ -268,6 +271,12 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | referential_constraints | unique_constraint_name | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | referential_constraints | unique_constraint_schema | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | referential_constraints | update_rule | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | region_peers | down_seconds | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | |
| greptime | information_schema | region_peers | is_leader | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | region_peers | peer_addr | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | region_peers | peer_id | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_peers | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | region_peers | status | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | routines | character_maximum_length | 7 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | routines | character_octet_length | 8 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | routines | character_set_client | 29 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
@@ -301,8 +310,8 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | routines | sql_path | 22 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | runtime_metrics | labels | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | runtime_metrics | metric_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | runtime_metrics | node | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | runtime_metrics | node_type | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | runtime_metrics | peer_addr | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | runtime_metrics | peer_type | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | runtime_metrics | timestamp | 6 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | No | timestamp(3) | | |
| greptime | information_schema | runtime_metrics | value | 2 | | | 22 | | | | | | | select,insert | | Float64 | double | FIELD | | No | double | | |
| greptime | information_schema | schema_privileges | grantee | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
@@ -678,12 +687,12 @@ DESC TABLE RUNTIME_METRICS;
| metric_name | String | | NO | | FIELD |
| value | Float64 | | NO | | FIELD |
| labels | String | | YES | | FIELD |
| node | String | | NO | | FIELD |
| node_type | String | | NO | | FIELD |
| peer_addr | String | | YES | | FIELD |
| peer_type | String | | NO | | FIELD |
| timestamp | TimestampMillisecond | | NO | | FIELD |
+-------------+----------------------+-----+------+---------+---------------+

DESC TABLE GREPTIME_REGION_PEERS;
DESC TABLE REGION_PEERS;

+--------------+--------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
@@ -117,7 +117,7 @@ SELECT * FROM CHECK_CONSTRAINTS;

DESC TABLE RUNTIME_METRICS;

DESC TABLE GREPTIME_REGION_PEERS;
DESC TABLE REGION_PEERS;

USE INFORMATION_SCHEMA;
@@ -0,0 +1,63 @@
USE INFORMATION_SCHEMA;

Affected Rows: 0

DESC TABLE CLUSTER_INFO;

+-------------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+-------------+----------------------+-----+------+---------+---------------+
| peer_id | Int64 | | NO | | FIELD |
| peer_type | String | | NO | | FIELD |
| peer_addr | String | | YES | | FIELD |
| version | String | | NO | | FIELD |
| git_commit | String | | NO | | FIELD |
| start_time | TimestampMillisecond | | YES | | FIELD |
| uptime | String | | YES | | FIELD |
| active_time | String | | YES | | FIELD |
+-------------+----------------------+-----+------+---------+---------------+

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|STANDALONE||Version|Hash|Start_time|Duration||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'STANDALONE';

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|STANDALONE||Version|Hash|Start_time|Duration||+++++++++

SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE';

++
++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_ID = 0;

++++

SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 0;

++
++

USE PUBLIC;

Affected Rows: 0
33
tests/cases/standalone/information_schema/cluster_info.sql
Normal file
@@ -0,0 +1,33 @@
USE INFORMATION_SCHEMA;

DESC TABLE CLUSTER_INFO;

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO;

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'STANDALONE';

SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE';

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\d\.\d\.\d) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_ID = 0;

SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 0;

USE PUBLIC;