diff --git a/Cargo.lock b/Cargo.lock index b801b342c1..63ba289947 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8324,6 +8324,7 @@ dependencies = [ "either", "futures", "greptime-proto", + "humantime", "humantime-serde", "index", "itertools 0.14.0", diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs index 9715aa9402..a35950194c 100644 --- a/src/catalog/src/system_schema/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -20,6 +20,7 @@ pub mod key_column_usage; mod partitions; mod procedure_info; pub mod process_list; +mod region_info; pub mod region_peers; mod region_statistics; pub mod schemata; @@ -47,6 +48,8 @@ use datatypes::schema::SchemaRef; use lazy_static::lazy_static; use paste::paste; use process_list::InformationSchemaProcessList; +use region_info::InformationSchemaRegionInfo; +use store_api::region_info::RegionInfoEntry; use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry}; use store_api::storage::{ScanRequest, TableId}; use table::TableRef; @@ -242,6 +245,9 @@ impl SystemSchemaProviderInner for InformationSchemaProvider { self.catalog_manager.clone(), ), ) as _), + REGION_INFO => Some(Arc::new(InformationSchemaRegionInfo::new( + self.catalog_manager.clone(), + )) as _), PROCESS_LIST => self .process_manager .as_ref() @@ -320,6 +326,10 @@ impl InformationSchemaProvider { REGION_STATISTICS.to_string(), self.build_table(REGION_STATISTICS).unwrap(), ); + tables.insert( + REGION_INFO.to_string(), + self.build_table(REGION_INFO).unwrap(), + ); tables.insert( SSTS_MANIFEST.to_string(), self.build_table(SSTS_MANIFEST).unwrap(), @@ -447,6 +457,8 @@ pub enum DatanodeInspectKind { SstStorage, /// List index metadata collected from manifest SstIndexMeta, + /// List region runtime and manifest info + RegionInfo, } impl DatanodeInspectRequest { @@ -456,6 +468,7 @@ impl DatanodeInspectRequest { DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan), DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan), DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan), + DatanodeInspectKind::RegionInfo => RegionInfoEntry::build_plan(self.scan), } } } @@ -488,3 +501,28 @@ impl InformationExtension for NoopInformationExtension { Ok(common_recordbatch::RecordBatches::empty().as_stream()) } } + +#[cfg(test)] +mod tests { + use store_api::region_info::RegionInfoEntry; + + use super::*; + + #[test] + fn test_datanode_inspect_region_info_build_plan() { + let plan = DatanodeInspectRequest { + kind: DatanodeInspectKind::RegionInfo, + scan: ScanRequest::default(), + } + .build_plan() + .unwrap(); + + let LogicalPlan::TableScan(scan) = plan else { + panic!("expected table scan"); + }; + assert_eq!( + scan.table_name.to_string(), + RegionInfoEntry::reserved_table_name_for_inspection() + ); + } +} diff --git a/src/catalog/src/system_schema/information_schema/region_info.rs b/src/catalog/src/system_schema/information_schema/region_info.rs new file mode 100644 index 0000000000..ffc9dfc7ae --- /dev/null +++ b/src/catalog/src/system_schema/information_schema/region_info.rs @@ -0,0 +1,86 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Weak}; + +use common_catalog::consts::INFORMATION_SCHEMA_REGION_INFO_TABLE_ID; +use common_error::ext::BoxedError; +use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter; +use datatypes::schema::SchemaRef; +use snafu::ResultExt; +use store_api::region_info::RegionInfoEntry; +use store_api::storage::{ScanRequest, TableId}; + +use crate::CatalogManager; +use crate::error::{ProjectSchemaSnafu, Result}; +use crate::information_schema::{ + DatanodeInspectKind, DatanodeInspectRequest, InformationTable, REGION_INFO, +}; +use crate::system_schema::utils; + +/// Information schema table for region info. +pub struct InformationSchemaRegionInfo { + schema: SchemaRef, + catalog_manager: Weak, +} + +impl InformationSchemaRegionInfo { + pub(super) fn new(catalog_manager: Weak) -> Self { + Self { + schema: RegionInfoEntry::schema(), + catalog_manager, + } + } +} + +impl InformationTable for InformationSchemaRegionInfo { + fn table_id(&self) -> TableId { + INFORMATION_SCHEMA_REGION_INFO_TABLE_ID + } + + fn table_name(&self) -> &'static str { + REGION_INFO + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self, request: ScanRequest) -> Result { + let schema = if let Some(p) = request.projection_indices() { + Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?) + } else { + self.schema.clone() + }; + + let info_ext = utils::information_extension(&self.catalog_manager)?; + let req = DatanodeInspectRequest { + kind: DatanodeInspectKind::RegionInfo, + scan: request, + }; + + let future = async move { + info_ext + .inspect_datanode(req) + .await + .map_err(BoxedError::new) + .context(common_recordbatch::error::ExternalSnafu) + }; + Ok(Box::pin(AsyncRecordBatchStreamAdapter::new( + schema, + Box::pin(future), + ))) + } +} diff --git a/src/catalog/src/system_schema/information_schema/table_names.rs b/src/catalog/src/system_schema/information_schema/table_names.rs index 2a3329fece..3a4c86487a 100644 --- a/src/catalog/src/system_schema/information_schema/table_names.rs +++ b/src/catalog/src/system_schema/information_schema/table_names.rs @@ -45,6 +45,7 @@ pub const CLUSTER_INFO: &str = "cluster_info"; pub const VIEWS: &str = "views"; pub const FLOWS: &str = "flows"; pub const PROCEDURE_INFO: &str = "procedure_info"; +pub const REGION_INFO: &str = "region_info"; pub const REGION_STATISTICS: &str = "region_statistics"; pub const PROCESS_LIST: &str = "process_list"; pub const SSTS_MANIFEST: &str = "ssts_manifest"; diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 1cd5db8a0c..dd09893177 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -112,6 +112,8 @@ pub const INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID: u32 = 38; pub const INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID: u32 = 39; /// id for information_schema.alerts pub const INFORMATION_SCHEMA_ALERTS_TABLE_ID: u32 = 40; +/// id for information_schema.region_info +pub const INFORMATION_SCHEMA_REGION_INFO_TABLE_ID: u32 = 41; // ----- End of information_schema tables ----- diff --git a/src/datanode/src/region_server/catalog.rs b/src/datanode/src/region_server/catalog.rs index 1c0f48951f..a4df422b75 100644 --- a/src/datanode/src/region_server/catalog.rs +++ b/src/datanode/src/region_server/catalog.rs @@ -27,6 +27,7 @@ use datafusion_expr::{LogicalPlan, TableSource}; use futures::TryStreamExt; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; +use store_api::region_info::RegionInfoEntry; use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry}; use store_api::storage::RegionId; @@ -41,6 +42,7 @@ enum InternalTableKind { InspectSstManifest, InspectSstStorage, InspectSstIndexMeta, + InspectRegionInfo, } impl InternalTableKind { @@ -55,6 +57,9 @@ impl InternalTableKind { if name.eq_ignore_ascii_case(PuffinIndexMetaEntry::reserved_table_name_for_inspection()) { return Some(Self::InspectSstIndexMeta); } + if name.eq_ignore_ascii_case(RegionInfoEntry::reserved_table_name_for_inspection()) { + return Some(Self::InspectRegionInfo); + } None } @@ -64,6 +69,7 @@ impl InternalTableKind { Self::InspectSstManifest => server.inspect_sst_manifest_provider().await, Self::InspectSstStorage => server.inspect_sst_storage_provider().await, Self::InspectSstIndexMeta => server.inspect_sst_index_meta_provider().await, + Self::InspectRegionInfo => server.inspect_region_info_provider().await, } } } @@ -128,6 +134,25 @@ impl RegionServer { let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?; Ok(Arc::new(table)) } + + /// Expose region info across the engine as an in-memory table. + pub async fn inspect_region_info_provider(&self) -> Result> { + let mito = { + let guard = self.inner.mito_engine.read().unwrap(); + guard.as_ref().cloned().context(UnexpectedSnafu { + violated: "mito engine not available", + })? + }; + + let entries = mito.all_region_infos().await; + let schema = RegionInfoEntry::schema().arrow_schema().clone(); + let batch = RegionInfoEntry::to_record_batch(&entries) + .map_err(DataFusionError::from) + .context(DataFusionSnafu)?; + + let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?; + Ok(Arc::new(table)) + } } /// A catalog list that resolves `TableProvider` by table name: @@ -347,6 +372,7 @@ mod tests { use datatypes::arrow::array::Int32Array; use datatypes::arrow::datatypes::{DataType, Field, Schema}; use datatypes::arrow::record_batch::RecordBatch; + use store_api::region_info::RegionInfoEntry; use super::*; // bring rewrite() into scope @@ -409,6 +435,18 @@ mod tests { b3.reserved_table_needed, vec![InternalTableKind::InspectSstManifest] ); + + let region_info = RegionInfoEntry::reserved_table_name_for_inspection(); + let plan4 = table_scan(Some(region_info), &schema, None) + .unwrap() + .build() + .unwrap(); + let b4 = NameAwareDataSourceInjectorBuilder::from_plan(&plan4).unwrap(); + assert!(!b4.need_region_provider); + assert_eq!( + b4.reserved_table_needed, + vec![InternalTableKind::InspectRegionInfo] + ); } #[test] @@ -445,6 +483,39 @@ mod tests { } } + #[test] + fn test_rewriter_replaces_with_region_info_reserved_source() { + let schema = test_schema(); + let table_name = RegionInfoEntry::reserved_table_name_for_inspection(); + let plan = table_scan(Some(table_name), &schema, None) + .unwrap() + .build() + .unwrap(); + + let provider = empty_mem_table(); + let source = provider_as_source(provider); + + let mut injector = NameAwareDataSourceInjector { + reserved_sources: { + let mut m = HashMap::new(); + m.insert(InternalTableKind::InspectRegionInfo, source.clone()); + m + }, + region_source: None, + }; + + let transformed = plan.rewrite(&mut injector).unwrap(); + let new_plan = transformed.data; + + if let LogicalPlan::TableScan(scan) = new_plan { + let src_ptr = Arc::as_ptr(&scan.source); + let want_ptr = Arc::as_ptr(&source); + assert!(std::ptr::eq(src_ptr, want_ptr)); + } else { + panic!("expected TableScan after rewrite"); + } + } + #[test] fn test_rewriter_replaces_with_region_source_for_normal() { let schema = test_schema(); diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index dde1e44ea1..3e3a18a24d 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -53,6 +53,7 @@ dashmap.workspace = true dotenv.workspace = true either.workspace = true futures.workspace = true +humantime.workspace = true humantime-serde.workspace = true index.workspace = true itertools.workspace = true diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index ec38dec105..64f7576139 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -117,6 +117,7 @@ use store_api::region_engine::{ RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState, SyncRegionFromRequest, SyncRegionFromResponse, }; +use store_api::region_info::RegionInfoEntry; use store_api::region_request::{ AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest, }; @@ -653,6 +654,16 @@ impl MitoEngine { results } + /// Lists region info entries of all regions in the engine. + pub async fn all_region_infos(&self) -> Vec { + let node_id = self.inner.workers.file_ref_manager().node_id(); + self.inner + .workers + .all_regions() + .map(|region| region.region_info_entry(node_id)) + .collect() + } + /// Lists all SSTs from the storage layer of all regions in the engine. pub fn all_ssts_from_storage(&self) -> impl Stream> { let node_id = self.inner.workers.file_ref_manager().node_id(); diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index f256f88694..e1e462f692 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -978,6 +978,58 @@ async fn test_list_ssts_with_format( assert_eq!(debug_format, expected_storage_ssts, "{}", debug_format); } +#[tokio::test] +async fn test_all_region_infos() { + let mut env = TestEnv::with_prefix("all-region-infos").await; + let engine = env + .create_engine(MitoConfig { + default_flat_format: true, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1024, 7); + let request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows_for_key("region-info", 0, 3, 0), + }; + put_rows(&engine, region_id, rows).await; + engine + .handle_request( + region_id, + RegionRequest::Flush(RegionFlushRequest::default()), + ) + .await + .unwrap(); + + let entries = engine.all_region_infos().await; + let entry = entries + .iter() + .find(|entry| entry.region_id == region_id) + .expect("region info entry should exist"); + + assert_eq!(region_id.as_u64(), entry.region_id.as_u64()); + assert_eq!(region_id.table_id(), entry.table_id); + assert_eq!(region_id.region_number(), entry.region_number); + assert_eq!(region_id.region_group(), entry.region_group); + assert_eq!(region_id.region_sequence(), entry.region_sequence); + assert!(!entry.state.is_empty()); + assert_eq!("Leader", entry.role); + assert!(entry.writable); + assert_eq!(3, entry.committed_sequence); + assert_eq!(Some(3), entry.flushed_sequence); + assert!(entry.manifest_version > 0); + assert!(serde_json::from_str::(&entry.region_options).is_ok()); + assert_eq!("flat", entry.sst_format); +} + #[tokio::test] async fn test_all_index_metas_list_all_types() { test_all_index_metas_list_all_types_with_format(false, r#" diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index c85599bf58..f6d2a17bba 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -37,6 +37,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState, }; +use store_api::region_info::RegionInfoEntry; use store_api::region_request::{PathType, StagingPartitionDirective}; use store_api::sst_entry::ManifestSstEntry; use store_api::storage::{FileId, RegionId, SequenceNumber}; @@ -111,6 +112,22 @@ impl RegionRoleState { RegionRoleState::Follower => None, } } + + pub(crate) fn as_str(&self) -> &'static str { + match self { + RegionRoleState::Follower => "Follower", + RegionRoleState::Leader(RegionLeaderState::Writable) => "Leader(Writable)", + RegionRoleState::Leader(RegionLeaderState::Staging) => "Leader(Staging)", + RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => { + "Leader(EnteringStaging)" + } + RegionRoleState::Leader(RegionLeaderState::Altering) => "Leader(Altering)", + RegionRoleState::Leader(RegionLeaderState::Dropping) => "Leader(Dropping)", + RegionRoleState::Leader(RegionLeaderState::Truncating) => "Leader(Truncating)", + RegionRoleState::Leader(RegionLeaderState::Editing) => "Leader(Editing)", + RegionRoleState::Leader(RegionLeaderState::Downgrading) => "Leader(Downgrading)", + } + } } /// Metadata and runtime status of a region. @@ -648,6 +665,41 @@ impl MitoRegion { self.access_layer.clone() } + /// Returns the region info entry of the region. + pub(crate) fn region_info_entry(&self, node_id: Option) -> RegionInfoEntry { + let region_id = self.region_id; + let version = self.version(); + let state = self.state(); + let role = self.region_role(); + let region_options = serde_json::to_string(&version.options) + .unwrap_or_else(|err| serde_json::json!({ "error": err.to_string() }).to_string()); + let sst_format = match version.options.sst_format.unwrap_or_default() { + crate::sst::FormatType::PrimaryKey => "primary_key", + crate::sst::FormatType::Flat => "flat", + } + .to_string(); + + RegionInfoEntry { + region_id, + table_id: region_id.table_id(), + region_number: region_id.region_number(), + region_group: region_id.region_group(), + region_sequence: region_id.region_sequence(), + state: state.as_str().to_string(), + role: role.to_string(), + writable: self.is_writable(), + committed_sequence: self.find_committed_sequence(), + flushed_sequence: Some(self.flushed_sequence()).filter(|sequence| *sequence > 0), + manifest_version: self.stats.manifest_version(), + compaction_time_window: version + .compaction_time_window + .map(|duration| humantime::format_duration(duration).to_string()), + region_options, + sst_format, + node_id, + } + } + /// Returns the SST entries of the region. pub async fn manifest_sst_entries(&self) -> Vec { let table_dir = self.table_dir(); @@ -1623,6 +1675,23 @@ mod tests { assert!(AtomicCell::::is_lock_free()); } + #[test] + fn test_region_role_state_as_str() { + assert_eq!("Follower", RegionRoleState::Follower.as_str()); + assert_eq!( + "Leader(Writable)", + RegionRoleState::Leader(RegionLeaderState::Writable).as_str() + ); + assert_eq!( + "Leader(Staging)", + RegionRoleState::Leader(RegionLeaderState::Staging).as_str() + ); + assert_eq!( + "Leader(Downgrading)", + RegionRoleState::Leader(RegionLeaderState::Downgrading).as_str() + ); + } + async fn build_test_region(env: &SchedulerEnv) -> MitoRegion { let builder = VersionControlBuilder::new(); let version_control = Arc::new(builder.build()); diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs index cb39875d74..f97e348842 100644 --- a/src/store-api/src/lib.rs +++ b/src/store-api/src/lib.rs @@ -23,6 +23,7 @@ mod metrics; pub mod mito_engine_options; pub mod path_utils; pub mod region_engine; +pub mod region_info; pub mod region_request; pub mod sst_entry; pub mod storage; diff --git a/src/store-api/src/region_info.rs b/src/store-api/src/region_info.rs new file mode 100644 index 0000000000..2d099d2734 --- /dev/null +++ b/src/store-api/src/region_info.rs @@ -0,0 +1,360 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_recordbatch::DfRecordBatch; +use datafusion_common::DataFusionError; +use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource}; +use datatypes::arrow::array::{ + ArrayRef, BooleanArray, StringArray, UInt8Array, UInt32Array, UInt64Array, +}; +use datatypes::arrow::error::ArrowError; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use serde::{Deserialize, Serialize}; + +use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId}; + +/// Runtime and manifest information of a region for inspection. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RegionInfoEntry { + /// The region id. + pub region_id: RegionId, + /// The table id this region belongs to. + pub table_id: TableId, + /// The region number inside the table. + pub region_number: RegionNumber, + /// The region group. + pub region_group: RegionGroup, + /// The region sequence inside the group. + pub region_sequence: RegionSeq, + /// The full runtime role/state label. + pub state: String, + /// The coarse region role. + pub role: String, + /// Whether the region accepts writes. + pub writable: bool, + /// The committed sequence of the region. + pub committed_sequence: u64, + /// The latest sequence that has been persisted into SSTs. + pub flushed_sequence: Option, + /// The manifest version of the region. + pub manifest_version: u64, + /// Human-readable compaction time window. + pub compaction_time_window: Option, + /// Region options encoded as JSON. + pub region_options: String, + /// SST format used by the region. + pub sst_format: String, + /// Datanode id that reports the row. + pub node_id: Option, +} + +impl RegionInfoEntry { + /// Returns the schema of the region info entry. + pub fn schema() -> SchemaRef { + use datatypes::prelude::ConcreteDataType as Ty; + Arc::new(Schema::new(vec![ + ColumnSchema::new("region_id", Ty::uint64_datatype(), false), + ColumnSchema::new("table_id", Ty::uint32_datatype(), false), + ColumnSchema::new("region_number", Ty::uint32_datatype(), false), + ColumnSchema::new("region_group", Ty::uint8_datatype(), false), + ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false), + ColumnSchema::new("state", Ty::string_datatype(), false), + ColumnSchema::new("role", Ty::string_datatype(), false), + ColumnSchema::new("writable", Ty::boolean_datatype(), false), + ColumnSchema::new("committed_sequence", Ty::uint64_datatype(), false), + ColumnSchema::new("flushed_sequence", Ty::uint64_datatype(), true), + ColumnSchema::new("manifest_version", Ty::uint64_datatype(), false), + ColumnSchema::new("compaction_time_window", Ty::string_datatype(), true), + ColumnSchema::new("region_options", Ty::string_datatype(), false), + ColumnSchema::new("sst_format", Ty::string_datatype(), false), + ColumnSchema::new("node_id", Ty::uint64_datatype(), true), + ])) + } + + /// Converts a list of region info entries to a record batch. + pub fn to_record_batch(entries: &[Self]) -> Result { + let schema = Self::schema(); + let region_ids = entries.iter().map(|e| e.region_id.as_u64()); + let table_ids = entries.iter().map(|e| e.table_id); + let region_numbers = entries.iter().map(|e| e.region_number); + let region_groups = entries.iter().map(|e| e.region_group); + let region_sequences = entries.iter().map(|e| e.region_sequence); + let states = entries.iter().map(|e| e.state.as_str()); + let roles = entries.iter().map(|e| e.role.as_str()); + let writable = entries.iter().map(|e| e.writable); + let committed_sequences = entries.iter().map(|e| e.committed_sequence); + let flushed_sequences = entries.iter().map(|e| e.flushed_sequence); + let manifest_versions = entries.iter().map(|e| e.manifest_version); + let compaction_time_windows = entries.iter().map(|e| e.compaction_time_window.as_ref()); + let region_options = entries.iter().map(|e| e.region_options.as_str()); + let sst_formats = entries.iter().map(|e| e.sst_format.as_str()); + let node_ids = entries.iter().map(|e| e.node_id); + + let columns: Vec = vec![ + Arc::new(UInt64Array::from_iter_values(region_ids)), + Arc::new(UInt32Array::from_iter_values(table_ids)), + Arc::new(UInt32Array::from_iter_values(region_numbers)), + Arc::new(UInt8Array::from_iter_values(region_groups)), + Arc::new(UInt32Array::from_iter_values(region_sequences)), + Arc::new(StringArray::from_iter_values(states)), + Arc::new(StringArray::from_iter_values(roles)), + Arc::new(BooleanArray::from_iter(writable)), + Arc::new(UInt64Array::from_iter_values(committed_sequences)), + Arc::new(UInt64Array::from_iter(flushed_sequences)), + Arc::new(UInt64Array::from_iter_values(manifest_versions)), + Arc::new(StringArray::from_iter(compaction_time_windows)), + Arc::new(StringArray::from_iter_values(region_options)), + Arc::new(StringArray::from_iter_values(sst_formats)), + Arc::new(UInt64Array::from_iter(node_ids)), + ]; + + DfRecordBatch::try_new(schema.arrow_schema().clone(), columns) + } + + /// Reserved internal inspect table name for region info. + pub fn reserved_table_name_for_inspection() -> &'static str { + "__inspect/__mito/__region_info" + } + + /// Builds a logical plan for scanning region info entries. + pub fn build_plan(scan_request: ScanRequest) -> Result { + let table_source = LogicalTableSource::new(Self::schema().arrow_schema().clone()); + + let projection = scan_request.projection_input.map(|input| input.projection); + let mut builder = LogicalPlanBuilder::scan( + Self::reserved_table_name_for_inspection(), + Arc::new(table_source), + projection, + )?; + + for filter in scan_request.filters { + builder = builder.filter(filter)?; + } + + if let Some(limit) = scan_request.limit { + builder = builder.limit(0, Some(limit))?; + } + + builder.build() + } +} + +#[cfg(test)] +mod tests { + use datafusion_common::TableReference; + use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit}; + use datatypes::arrow::array::{ + Array, BooleanArray, StringArray, UInt8Array, UInt32Array, UInt64Array, + }; + + use super::*; + use crate::storage::{RegionId, ScanRequest}; + + #[test] + fn test_region_info_schema() { + let schema = RegionInfoEntry::schema(); + let columns = schema.column_schemas(); + + let names = columns.iter().map(|c| c.name.as_str()).collect::>(); + assert_eq!( + names, + vec![ + "region_id", + "table_id", + "region_number", + "region_group", + "region_sequence", + "state", + "role", + "writable", + "committed_sequence", + "flushed_sequence", + "manifest_version", + "compaction_time_window", + "region_options", + "sst_format", + "node_id", + ] + ); + assert!(!columns[0].is_nullable()); + assert!(!columns[8].is_nullable()); + assert!(columns[9].is_nullable()); + assert!(columns[11].is_nullable()); + assert!(columns[14].is_nullable()); + } + + #[test] + fn test_region_info_to_record_batch() { + let region_id1 = RegionId::with_group_and_seq(10, 1, 20); + let region_id2 = RegionId::with_group_and_seq(11, 0, 21); + let entries = vec![ + RegionInfoEntry { + region_id: region_id1, + table_id: region_id1.table_id(), + region_number: region_id1.region_number(), + region_group: region_id1.region_group(), + region_sequence: region_id1.region_sequence(), + state: "Leader(Writable)".to_string(), + role: "Leader".to_string(), + writable: true, + committed_sequence: 42, + flushed_sequence: Some(41), + manifest_version: 7, + compaction_time_window: Some("1h".to_string()), + region_options: "{\"sst_format\":\"flat\"}".to_string(), + sst_format: "flat".to_string(), + node_id: Some(3), + }, + RegionInfoEntry { + region_id: region_id2, + table_id: region_id2.table_id(), + region_number: region_id2.region_number(), + region_group: region_id2.region_group(), + region_sequence: region_id2.region_sequence(), + state: "Follower".to_string(), + role: "Follower".to_string(), + writable: false, + committed_sequence: 9, + flushed_sequence: None, + manifest_version: 2, + compaction_time_window: None, + region_options: "{}".to_string(), + sst_format: "primary_key".to_string(), + node_id: None, + }, + ]; + + let batch = RegionInfoEntry::to_record_batch(&entries).unwrap(); + assert_eq!(batch.num_rows(), 2); + + let region_ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(region_id1.as_u64(), region_ids.value(0)); + assert_eq!(region_id2.as_u64(), region_ids.value(1)); + + let table_ids = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(10, table_ids.value(0)); + assert_eq!(11, table_ids.value(1)); + + let region_groups = batch + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(1, region_groups.value(0)); + assert_eq!(0, region_groups.value(1)); + + let states = batch + .column(5) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("Leader(Writable)", states.value(0)); + assert_eq!("Follower", states.value(1)); + + let writable = batch + .column(7) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(writable.value(0)); + assert!(!writable.value(1)); + + let committed_sequences = batch + .column(8) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(42, committed_sequences.value(0)); + assert_eq!(9, committed_sequences.value(1)); + + let flushed_sequences = batch + .column(9) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(41, flushed_sequences.value(0)); + assert!(flushed_sequences.is_null(1)); + + let compaction_time_windows = batch + .column(11) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("1h", compaction_time_windows.value(0)); + assert!(compaction_time_windows.is_null(1)); + + let node_ids = batch + .column(14) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(3, node_ids.value(0)); + assert!(node_ids.is_null(1)); + } + + #[test] + fn test_region_info_build_plan() { + let projection_input = Some(vec![0, 5, 7, 11].into()); + let request = ScanRequest { + projection_input, + filters: vec![binary_expr(col("writable"), Operator::Eq, lit(true))], + limit: Some(10), + ..Default::default() + }; + + let plan = RegionInfoEntry::build_plan(request).unwrap(); + let (scan, has_filter, has_limit) = extract_scan(&plan); + assert!(has_filter); + assert!(has_limit); + assert_eq!( + scan.table_name, + TableReference::bare(RegionInfoEntry::reserved_table_name_for_inspection()) + ); + assert_eq!(scan.projection, Some(vec![0, 5, 7, 11])); + + let fields = scan.projected_schema.fields(); + assert_eq!(fields.len(), 4); + assert_eq!(fields[0].name(), "region_id"); + assert_eq!(fields[1].name(), "state"); + assert_eq!(fields[2].name(), "writable"); + assert_eq!(fields[3].name(), "compaction_time_window"); + } + + fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) { + use datafusion_expr::logical_plan::Limit; + + match plan { + LogicalPlan::Filter(f) => { + let (scan, _, has_limit) = extract_scan(&f.input); + (scan, true, has_limit) + } + LogicalPlan::Limit(Limit { input, .. }) => { + let (scan, has_filter, _) = extract_scan(input); + (scan, has_filter, true) + } + LogicalPlan::TableScan(scan) => (scan, false, false), + other => panic!("unexpected plan: {other:?}"), + } + } +} diff --git a/tests/cases/standalone/common/information_schema/region_info.result b/tests/cases/standalone/common/information_schema/region_info.result new file mode 100644 index 0000000000..ed9a07add6 --- /dev/null +++ b/tests/cases/standalone/common/information_schema/region_info.result @@ -0,0 +1,63 @@ +DESC TABLE information_schema.region_info; + ++------------------------+---------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++------------------------+---------+-----+------+---------+---------------+ +| region_id | UInt64 | | NO | | FIELD | +| table_id | UInt32 | | NO | | FIELD | +| region_number | UInt32 | | NO | | FIELD | +| region_group | UInt8 | | NO | | FIELD | +| region_sequence | UInt32 | | NO | | FIELD | +| state | String | | NO | | FIELD | +| role | String | | NO | | FIELD | +| writable | Boolean | | NO | | FIELD | +| committed_sequence | UInt64 | | NO | | FIELD | +| flushed_sequence | UInt64 | | YES | | FIELD | +| manifest_version | UInt64 | | NO | | FIELD | +| compaction_time_window | String | | YES | | FIELD | +| region_options | String | | NO | | FIELD | +| sst_format | String | | NO | | FIELD | +| node_id | UInt64 | | YES | | FIELD | ++------------------------+---------+-----+------+---------+---------------+ + +CREATE TABLE region_info_case ( + a INT PRIMARY KEY, + ts TIMESTAMP TIME INDEX, +) +WITH ("sst_format" = "flat"); + +Affected Rows: 0 + +INSERT INTO region_info_case VALUES (1, 1), (2, 2); + +Affected Rows: 2 + +ADMIN FLUSH_TABLE('region_info_case'); + ++---------------------------------------+ +| ADMIN FLUSH_TABLE('region_info_case') | ++---------------------------------------+ +| 0 | ++---------------------------------------+ + +-- SQLNESS REPLACE (\s+\d+\s+) +-- SQLNESS REPLACE (\{".*"\}) +-- SQLNESS REPLACE (-{40,}) ---------------- +-- SQLNESS REPLACE (region_options\s+\|) region_options | +SELECT region_id, state, role, writable, committed_sequence, flushed_sequence, manifest_version, compaction_time_window, region_options, sst_format +FROM information_schema.region_info +WHERE region_id IN ( + SELECT region_id FROM information_schema.region_peers WHERE table_name = 'region_info_case' +) +ORDER BY region_id; + ++---------------+------------------+--------+----------+--------------------+------------------+------------------+------------------------+----------------+------------+ +| region_id | state | role | writable | committed_sequence | flushed_sequence | manifest_version | compaction_time_window | region_options | sst_format | ++---------------+------------------+--------+----------+--------------------+------------------+------------------+------------------------+----------------+------------+ +|| Leader(Writable) | Leader | true |||| | | flat | ++---------------+------------------+--------+----------+--------------------+------------------+------------------+------------------------+----------------+------------+ + +DROP TABLE region_info_case; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/information_schema/region_info.sql b/tests/cases/standalone/common/information_schema/region_info.sql new file mode 100644 index 0000000000..1aa682393f --- /dev/null +++ b/tests/cases/standalone/common/information_schema/region_info.sql @@ -0,0 +1,24 @@ +DESC TABLE information_schema.region_info; + +CREATE TABLE region_info_case ( + a INT PRIMARY KEY, + ts TIMESTAMP TIME INDEX, +) +WITH ("sst_format" = "flat"); + +INSERT INTO region_info_case VALUES (1, 1), (2, 2); + +ADMIN FLUSH_TABLE('region_info_case'); + +-- SQLNESS REPLACE (\s+\d+\s+) +-- SQLNESS REPLACE (\{".*"\}) +-- SQLNESS REPLACE (-{40,}) ---------------- +-- SQLNESS REPLACE (region_options\s+\|) region_options | +SELECT region_id, state, role, writable, committed_sequence, flushed_sequence, manifest_version, compaction_time_window, region_options, sst_format +FROM information_schema.region_info +WHERE region_id IN ( + SELECT region_id FROM information_schema.region_peers WHERE table_name = 'region_info_case' +) +ORDER BY region_id; + +DROP TABLE region_info_case; diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index d817227392..e816e989d9 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -49,6 +49,7 @@ SHOW TABLES; | process_list | | profiling | | referential_constraints | +| region_info | | region_peers | | region_statistics | | routines | @@ -99,6 +100,7 @@ SHOW FULL TABLES; | process_list | LOCAL TEMPORARY | | profiling | LOCAL TEMPORARY | | referential_constraints | LOCAL TEMPORARY | +| region_info | LOCAL TEMPORARY | | region_peers | LOCAL TEMPORARY | | region_statistics | LOCAL TEMPORARY | | routines | LOCAL TEMPORARY | @@ -143,6 +145,7 @@ SHOW TABLE STATUS; |process_list||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0||| |profiling||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0||| |referential_constraints||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0||| +|region_info||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0||| |region_peers||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0||| |region_statistics||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0||| |routines||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0||| diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 38f3ea52a4..6cff6e2ce9 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -36,6 +36,7 @@ order by table_schema, table_name; |greptime|information_schema|process_list|LOCALTEMPORARY|36|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y| |greptime|information_schema|profiling|LOCALTEMPORARY|19|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y| |greptime|information_schema|referential_constraints|LOCALTEMPORARY|20|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y| +|greptime|information_schema|region_info|LOCALTEMPORARY|41|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y| |greptime|information_schema|region_peers|LOCALTEMPORARY|29|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y| |greptime|information_schema|region_statistics|LOCALTEMPORARY|35|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y| |greptime|information_schema|routines|LOCALTEMPORARY|21|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y| @@ -316,6 +317,21 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | referential_constraints | unique_constraint_name | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | referential_constraints | unique_constraint_schema | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | referential_constraints | update_rule | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | region_info | committed_sequence | 9 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | region_info | compaction_time_window | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | region_info | flushed_sequence | 10 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_info | manifest_version | 11 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | region_info | node_id | 15 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_info | region_group | 4 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | | +| greptime | information_schema | region_info | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | region_info | region_number | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | +| greptime | information_schema | region_info | region_options | 13 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | region_info | region_sequence | 5 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | +| greptime | information_schema | region_info | role | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | region_info | sst_format | 14 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | region_info | state | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | region_info | table_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | +| greptime | information_schema | region_info | writable | 8 | | | | | | | | | | select,insert | | Boolean | boolean | FIELD | | No | boolean | | | | greptime | information_schema | region_peers | down_seconds | 9 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | | | greptime | information_schema | region_peers | is_leader | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | region_peers | peer_addr | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result index 76b9838628..4674f83c98 100644 --- a/tests/cases/standalone/common/view/create.result +++ b/tests/cases/standalone/common/view/create.result @@ -116,6 +116,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; |greptime|information_schema|process_list|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y| |greptime|information_schema|profiling|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y| |greptime|information_schema|referential_constraints|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y| +|greptime|information_schema|region_info|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y| |greptime|information_schema|region_peers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y| |greptime|information_schema|region_statistics|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y| |greptime|information_schema|routines|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|