chore: expose region info inspection table (#8178)

* chore/region-sync-diff: add region info inspection core

- `store-api`: add `RegionInfoEntry` schema and plan builder in `src/store-api/src/region_info.rs` and export it from `src/store-api/src/lib.rs`
- `mito2`: collect region runtime metadata with `MitoEngine::all_region_infos` and `RegionRoleState::as_str` in `src/mito2/src/engine.rs`, `src/mito2/src/region.rs`, `src/mito2/src/engine/basic_test.rs`, `src/mito2/Cargo.toml`, and `Cargo.lock`
- `datanode`: expose the reserved `InspectRegionInfo` provider in `src/datanode/src/region_server/catalog.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/region-sync-diff: expose region info schema table

- `information_schema.region_info`: add frontend table wiring in `src/catalog/src/system_schema/information_schema.rs`, `src/catalog/src/system_schema/information_schema/region_info.rs`, `src/catalog/src/system_schema/information_schema/table_names.rs`, and `src/common/catalog/src/consts.rs`
- `region_group` removal: drop `region_group` from `src/store-api/src/region_info.rs`, `src/mito2/src/region.rs`, and `src/mito2/src/engine/basic_test.rs`
- `SQLness coverage`: add standalone coverage in `tests/cases/standalone/common/information_schema/region_info.sql` and `tests/cases/standalone/common/information_schema/region_info.result`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/region-sync-diff: restore region group info

- `region_info` schema: restore `region_group` alongside `region_sequence` in `src/store-api/src/region_info.rs`, `src/mito2/src/region.rs`, `src/mito2/src/engine/basic_test.rs`, and `tests/cases/standalone/common/information_schema/region_info.result`
- `MitoEngine::all_region_infos`: remove redundant iterator conversion in `src/mito2/src/engine.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: sqlness

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: sqlness

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/region-sync-diff: clarify region sequence columns

- `region_info` schema: rename `sequence` to `committed_sequence` and add nullable `flushed_sequence` in `src/store-api/src/region_info.rs` and `src/mito2/src/region.rs`
- `region_info` coverage: update sequence assertions and expected metadata in `src/mito2/src/engine/basic_test.rs`, `tests/cases/standalone/common/information_schema/region_info.sql`, `tests/cases/standalone/common/information_schema/region_info.result`, and `tests/cases/standalone/common/system/information_schema.result`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/region-sync-diff: report region options errors

- `region_info` output: preserve `region_options` serialization failures as JSON error objects in `src/mito2/src/region.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-05-28 17:52:37 +08:00
committed by GitHub
parent eccd97b5c7
commit 9a4e5e8457
17 changed files with 800 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -8324,6 +8324,7 @@ dependencies = [
"either",
"futures",
"greptime-proto",
"humantime",
"humantime-serde",
"index",
"itertools 0.14.0",

View File

@@ -20,6 +20,7 @@ pub mod key_column_usage;
mod partitions;
mod procedure_info;
pub mod process_list;
mod region_info;
pub mod region_peers;
mod region_statistics;
pub mod schemata;
@@ -47,6 +48,8 @@ use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
use process_list::InformationSchemaProcessList;
use region_info::InformationSchemaRegionInfo;
use store_api::region_info::RegionInfoEntry;
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{ScanRequest, TableId};
use table::TableRef;
@@ -242,6 +245,9 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
self.catalog_manager.clone(),
),
) as _),
REGION_INFO => Some(Arc::new(InformationSchemaRegionInfo::new(
self.catalog_manager.clone(),
)) as _),
PROCESS_LIST => self
.process_manager
.as_ref()
@@ -320,6 +326,10 @@ impl InformationSchemaProvider {
REGION_STATISTICS.to_string(),
self.build_table(REGION_STATISTICS).unwrap(),
);
tables.insert(
REGION_INFO.to_string(),
self.build_table(REGION_INFO).unwrap(),
);
tables.insert(
SSTS_MANIFEST.to_string(),
self.build_table(SSTS_MANIFEST).unwrap(),
@@ -447,6 +457,8 @@ pub enum DatanodeInspectKind {
SstStorage,
/// List index metadata collected from manifest
SstIndexMeta,
/// List region runtime and manifest info
RegionInfo,
}
impl DatanodeInspectRequest {
@@ -456,6 +468,7 @@ impl DatanodeInspectRequest {
DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan),
DatanodeInspectKind::RegionInfo => RegionInfoEntry::build_plan(self.scan),
}
}
}
@@ -488,3 +501,28 @@ impl InformationExtension for NoopInformationExtension {
Ok(common_recordbatch::RecordBatches::empty().as_stream())
}
}
#[cfg(test)]
mod tests {
use store_api::region_info::RegionInfoEntry;
use super::*;
#[test]
fn test_datanode_inspect_region_info_build_plan() {
let plan = DatanodeInspectRequest {
kind: DatanodeInspectKind::RegionInfo,
scan: ScanRequest::default(),
}
.build_plan()
.unwrap();
let LogicalPlan::TableScan(scan) = plan else {
panic!("expected table scan");
};
assert_eq!(
scan.table_name.to_string(),
RegionInfoEntry::reserved_table_name_for_inspection()
);
}
}

View File

@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, Weak};
use common_catalog::consts::INFORMATION_SCHEMA_REGION_INFO_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use store_api::region_info::RegionInfoEntry;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{ProjectSchemaSnafu, Result};
use crate::information_schema::{
DatanodeInspectKind, DatanodeInspectRequest, InformationTable, REGION_INFO,
};
use crate::system_schema::utils;
/// Information schema table for region info.
pub struct InformationSchemaRegionInfo {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,
}
impl InformationSchemaRegionInfo {
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: RegionInfoEntry::schema(),
catalog_manager,
}
}
}
impl InformationTable for InformationSchemaRegionInfo {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_REGION_INFO_TABLE_ID
}
fn table_name(&self) -> &'static str {
REGION_INFO
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = if let Some(p) = request.projection_indices() {
Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
} else {
self.schema.clone()
};
let info_ext = utils::information_extension(&self.catalog_manager)?;
let req = DatanodeInspectRequest {
kind: DatanodeInspectKind::RegionInfo,
scan: request,
};
let future = async move {
info_ext
.inspect_datanode(req)
.await
.map_err(BoxedError::new)
.context(common_recordbatch::error::ExternalSnafu)
};
Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
schema,
Box::pin(future),
)))
}
}

View File

@@ -45,6 +45,7 @@ pub const CLUSTER_INFO: &str = "cluster_info";
pub const VIEWS: &str = "views";
pub const FLOWS: &str = "flows";
pub const PROCEDURE_INFO: &str = "procedure_info";
pub const REGION_INFO: &str = "region_info";
pub const REGION_STATISTICS: &str = "region_statistics";
pub const PROCESS_LIST: &str = "process_list";
pub const SSTS_MANIFEST: &str = "ssts_manifest";

View File

@@ -112,6 +112,8 @@ pub const INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID: u32 = 38;
pub const INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID: u32 = 39;
/// id for information_schema.alerts
pub const INFORMATION_SCHEMA_ALERTS_TABLE_ID: u32 = 40;
/// id for information_schema.region_info
pub const INFORMATION_SCHEMA_REGION_INFO_TABLE_ID: u32 = 41;
// ----- End of information_schema tables -----

View File

@@ -27,6 +27,7 @@ use datafusion_expr::{LogicalPlan, TableSource};
use futures::TryStreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::region_info::RegionInfoEntry;
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::RegionId;
@@ -41,6 +42,7 @@ enum InternalTableKind {
InspectSstManifest,
InspectSstStorage,
InspectSstIndexMeta,
InspectRegionInfo,
}
impl InternalTableKind {
@@ -55,6 +57,9 @@ impl InternalTableKind {
if name.eq_ignore_ascii_case(PuffinIndexMetaEntry::reserved_table_name_for_inspection()) {
return Some(Self::InspectSstIndexMeta);
}
if name.eq_ignore_ascii_case(RegionInfoEntry::reserved_table_name_for_inspection()) {
return Some(Self::InspectRegionInfo);
}
None
}
@@ -64,6 +69,7 @@ impl InternalTableKind {
Self::InspectSstManifest => server.inspect_sst_manifest_provider().await,
Self::InspectSstStorage => server.inspect_sst_storage_provider().await,
Self::InspectSstIndexMeta => server.inspect_sst_index_meta_provider().await,
Self::InspectRegionInfo => server.inspect_region_info_provider().await,
}
}
}
@@ -128,6 +134,25 @@ impl RegionServer {
let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
Ok(Arc::new(table))
}
/// Expose region info across the engine as an in-memory table.
pub async fn inspect_region_info_provider(&self) -> Result<Arc<dyn TableProvider>> {
let mito = {
let guard = self.inner.mito_engine.read().unwrap();
guard.as_ref().cloned().context(UnexpectedSnafu {
violated: "mito engine not available",
})?
};
let entries = mito.all_region_infos().await;
let schema = RegionInfoEntry::schema().arrow_schema().clone();
let batch = RegionInfoEntry::to_record_batch(&entries)
.map_err(DataFusionError::from)
.context(DataFusionSnafu)?;
let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
Ok(Arc::new(table))
}
}
/// A catalog list that resolves `TableProvider` by table name:
@@ -347,6 +372,7 @@ mod tests {
use datatypes::arrow::array::Int32Array;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::arrow::record_batch::RecordBatch;
use store_api::region_info::RegionInfoEntry;
use super::*; // bring rewrite() into scope
@@ -409,6 +435,18 @@ mod tests {
b3.reserved_table_needed,
vec![InternalTableKind::InspectSstManifest]
);
let region_info = RegionInfoEntry::reserved_table_name_for_inspection();
let plan4 = table_scan(Some(region_info), &schema, None)
.unwrap()
.build()
.unwrap();
let b4 = NameAwareDataSourceInjectorBuilder::from_plan(&plan4).unwrap();
assert!(!b4.need_region_provider);
assert_eq!(
b4.reserved_table_needed,
vec![InternalTableKind::InspectRegionInfo]
);
}
#[test]
@@ -445,6 +483,39 @@ mod tests {
}
}
#[test]
fn test_rewriter_replaces_with_region_info_reserved_source() {
let schema = test_schema();
let table_name = RegionInfoEntry::reserved_table_name_for_inspection();
let plan = table_scan(Some(table_name), &schema, None)
.unwrap()
.build()
.unwrap();
let provider = empty_mem_table();
let source = provider_as_source(provider);
let mut injector = NameAwareDataSourceInjector {
reserved_sources: {
let mut m = HashMap::new();
m.insert(InternalTableKind::InspectRegionInfo, source.clone());
m
},
region_source: None,
};
let transformed = plan.rewrite(&mut injector).unwrap();
let new_plan = transformed.data;
if let LogicalPlan::TableScan(scan) = new_plan {
let src_ptr = Arc::as_ptr(&scan.source);
let want_ptr = Arc::as_ptr(&source);
assert!(std::ptr::eq(src_ptr, want_ptr));
} else {
panic!("expected TableScan after rewrite");
}
}
#[test]
fn test_rewriter_replaces_with_region_source_for_normal() {
let schema = test_schema();

View File

@@ -53,6 +53,7 @@ dashmap.workspace = true
dotenv.workspace = true
either.workspace = true
futures.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
index.workspace = true
itertools.workspace = true

View File

@@ -117,6 +117,7 @@ use store_api::region_engine::{
RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
SyncRegionFromRequest, SyncRegionFromResponse,
};
use store_api::region_info::RegionInfoEntry;
use store_api::region_request::{
AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
@@ -653,6 +654,16 @@ impl MitoEngine {
results
}
/// Lists region info entries of all regions in the engine.
pub async fn all_region_infos(&self) -> Vec<RegionInfoEntry> {
let node_id = self.inner.workers.file_ref_manager().node_id();
self.inner
.workers
.all_regions()
.map(|region| region.region_info_entry(node_id))
.collect()
}
/// Lists all SSTs from the storage layer of all regions in the engine.
pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
let node_id = self.inner.workers.file_ref_manager().node_id();

View File

@@ -978,6 +978,58 @@ async fn test_list_ssts_with_format(
assert_eq!(debug_format, expected_storage_ssts, "{}", debug_format);
}
#[tokio::test]
async fn test_all_region_infos() {
let mut env = TestEnv::with_prefix("all-region-infos").await;
let engine = env
.create_engine(MitoConfig {
default_flat_format: true,
..Default::default()
})
.await;
let region_id = RegionId::new(1024, 7);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows_for_key("region-info", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
let entries = engine.all_region_infos().await;
let entry = entries
.iter()
.find(|entry| entry.region_id == region_id)
.expect("region info entry should exist");
assert_eq!(region_id.as_u64(), entry.region_id.as_u64());
assert_eq!(region_id.table_id(), entry.table_id);
assert_eq!(region_id.region_number(), entry.region_number);
assert_eq!(region_id.region_group(), entry.region_group);
assert_eq!(region_id.region_sequence(), entry.region_sequence);
assert!(!entry.state.is_empty());
assert_eq!("Leader", entry.role);
assert!(entry.writable);
assert_eq!(3, entry.committed_sequence);
assert_eq!(Some(3), entry.flushed_sequence);
assert!(entry.manifest_version > 0);
assert!(serde_json::from_str::<serde_json::Value>(&entry.region_options).is_ok());
assert_eq!("flat", entry.sst_format);
}
#[tokio::test]
async fn test_all_index_metas_list_all_types() {
test_all_index_metas_list_all_types_with_format(false, r#"

View File

@@ -37,6 +37,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::region_info::RegionInfoEntry;
use store_api::region_request::{PathType, StagingPartitionDirective};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{FileId, RegionId, SequenceNumber};
@@ -111,6 +112,22 @@ impl RegionRoleState {
RegionRoleState::Follower => None,
}
}
pub(crate) fn as_str(&self) -> &'static str {
match self {
RegionRoleState::Follower => "Follower",
RegionRoleState::Leader(RegionLeaderState::Writable) => "Leader(Writable)",
RegionRoleState::Leader(RegionLeaderState::Staging) => "Leader(Staging)",
RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
"Leader(EnteringStaging)"
}
RegionRoleState::Leader(RegionLeaderState::Altering) => "Leader(Altering)",
RegionRoleState::Leader(RegionLeaderState::Dropping) => "Leader(Dropping)",
RegionRoleState::Leader(RegionLeaderState::Truncating) => "Leader(Truncating)",
RegionRoleState::Leader(RegionLeaderState::Editing) => "Leader(Editing)",
RegionRoleState::Leader(RegionLeaderState::Downgrading) => "Leader(Downgrading)",
}
}
}
/// Metadata and runtime status of a region.
@@ -648,6 +665,41 @@ impl MitoRegion {
self.access_layer.clone()
}
/// Returns the region info entry of the region.
pub(crate) fn region_info_entry(&self, node_id: Option<u64>) -> RegionInfoEntry {
let region_id = self.region_id;
let version = self.version();
let state = self.state();
let role = self.region_role();
let region_options = serde_json::to_string(&version.options)
.unwrap_or_else(|err| serde_json::json!({ "error": err.to_string() }).to_string());
let sst_format = match version.options.sst_format.unwrap_or_default() {
crate::sst::FormatType::PrimaryKey => "primary_key",
crate::sst::FormatType::Flat => "flat",
}
.to_string();
RegionInfoEntry {
region_id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
region_group: region_id.region_group(),
region_sequence: region_id.region_sequence(),
state: state.as_str().to_string(),
role: role.to_string(),
writable: self.is_writable(),
committed_sequence: self.find_committed_sequence(),
flushed_sequence: Some(self.flushed_sequence()).filter(|sequence| *sequence > 0),
manifest_version: self.stats.manifest_version(),
compaction_time_window: version
.compaction_time_window
.map(|duration| humantime::format_duration(duration).to_string()),
region_options,
sst_format,
node_id,
}
}
/// Returns the SST entries of the region.
pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
let table_dir = self.table_dir();
@@ -1623,6 +1675,23 @@ mod tests {
assert!(AtomicCell::<RegionRoleState>::is_lock_free());
}
#[test]
fn test_region_role_state_as_str() {
assert_eq!("Follower", RegionRoleState::Follower.as_str());
assert_eq!(
"Leader(Writable)",
RegionRoleState::Leader(RegionLeaderState::Writable).as_str()
);
assert_eq!(
"Leader(Staging)",
RegionRoleState::Leader(RegionLeaderState::Staging).as_str()
);
assert_eq!(
"Leader(Downgrading)",
RegionRoleState::Leader(RegionLeaderState::Downgrading).as_str()
);
}
async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());

View File

@@ -23,6 +23,7 @@ mod metrics;
pub mod mito_engine_options;
pub mod path_utils;
pub mod region_engine;
pub mod region_info;
pub mod region_request;
pub mod sst_entry;
pub mod storage;

View File

@@ -0,0 +1,360 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_recordbatch::DfRecordBatch;
use datafusion_common::DataFusionError;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
use datatypes::arrow::array::{
ArrayRef, BooleanArray, StringArray, UInt8Array, UInt32Array, UInt64Array,
};
use datatypes::arrow::error::ArrowError;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use serde::{Deserialize, Serialize};
use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};
/// Runtime and manifest information of a region for inspection.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionInfoEntry {
/// The region id.
pub region_id: RegionId,
/// The table id this region belongs to.
pub table_id: TableId,
/// The region number inside the table.
pub region_number: RegionNumber,
/// The region group.
pub region_group: RegionGroup,
/// The region sequence inside the group.
pub region_sequence: RegionSeq,
/// The full runtime role/state label.
pub state: String,
/// The coarse region role.
pub role: String,
/// Whether the region accepts writes.
pub writable: bool,
/// The committed sequence of the region.
pub committed_sequence: u64,
/// The latest sequence that has been persisted into SSTs.
pub flushed_sequence: Option<u64>,
/// The manifest version of the region.
pub manifest_version: u64,
/// Human-readable compaction time window.
pub compaction_time_window: Option<String>,
/// Region options encoded as JSON.
pub region_options: String,
/// SST format used by the region.
pub sst_format: String,
/// Datanode id that reports the row.
pub node_id: Option<u64>,
}
impl RegionInfoEntry {
/// Returns the schema of the region info entry.
pub fn schema() -> SchemaRef {
use datatypes::prelude::ConcreteDataType as Ty;
Arc::new(Schema::new(vec![
ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
ColumnSchema::new("state", Ty::string_datatype(), false),
ColumnSchema::new("role", Ty::string_datatype(), false),
ColumnSchema::new("writable", Ty::boolean_datatype(), false),
ColumnSchema::new("committed_sequence", Ty::uint64_datatype(), false),
ColumnSchema::new("flushed_sequence", Ty::uint64_datatype(), true),
ColumnSchema::new("manifest_version", Ty::uint64_datatype(), false),
ColumnSchema::new("compaction_time_window", Ty::string_datatype(), true),
ColumnSchema::new("region_options", Ty::string_datatype(), false),
ColumnSchema::new("sst_format", Ty::string_datatype(), false),
ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
]))
}
/// Converts a list of region info entries to a record batch.
pub fn to_record_batch(entries: &[Self]) -> Result<DfRecordBatch, ArrowError> {
let schema = Self::schema();
let region_ids = entries.iter().map(|e| e.region_id.as_u64());
let table_ids = entries.iter().map(|e| e.table_id);
let region_numbers = entries.iter().map(|e| e.region_number);
let region_groups = entries.iter().map(|e| e.region_group);
let region_sequences = entries.iter().map(|e| e.region_sequence);
let states = entries.iter().map(|e| e.state.as_str());
let roles = entries.iter().map(|e| e.role.as_str());
let writable = entries.iter().map(|e| e.writable);
let committed_sequences = entries.iter().map(|e| e.committed_sequence);
let flushed_sequences = entries.iter().map(|e| e.flushed_sequence);
let manifest_versions = entries.iter().map(|e| e.manifest_version);
let compaction_time_windows = entries.iter().map(|e| e.compaction_time_window.as_ref());
let region_options = entries.iter().map(|e| e.region_options.as_str());
let sst_formats = entries.iter().map(|e| e.sst_format.as_str());
let node_ids = entries.iter().map(|e| e.node_id);
let columns: Vec<ArrayRef> = vec![
Arc::new(UInt64Array::from_iter_values(region_ids)),
Arc::new(UInt32Array::from_iter_values(table_ids)),
Arc::new(UInt32Array::from_iter_values(region_numbers)),
Arc::new(UInt8Array::from_iter_values(region_groups)),
Arc::new(UInt32Array::from_iter_values(region_sequences)),
Arc::new(StringArray::from_iter_values(states)),
Arc::new(StringArray::from_iter_values(roles)),
Arc::new(BooleanArray::from_iter(writable)),
Arc::new(UInt64Array::from_iter_values(committed_sequences)),
Arc::new(UInt64Array::from_iter(flushed_sequences)),
Arc::new(UInt64Array::from_iter_values(manifest_versions)),
Arc::new(StringArray::from_iter(compaction_time_windows)),
Arc::new(StringArray::from_iter_values(region_options)),
Arc::new(StringArray::from_iter_values(sst_formats)),
Arc::new(UInt64Array::from_iter(node_ids)),
];
DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
}
/// Reserved internal inspect table name for region info.
pub fn reserved_table_name_for_inspection() -> &'static str {
"__inspect/__mito/__region_info"
}
/// Builds a logical plan for scanning region info entries.
pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
let table_source = LogicalTableSource::new(Self::schema().arrow_schema().clone());
let projection = scan_request.projection_input.map(|input| input.projection);
let mut builder = LogicalPlanBuilder::scan(
Self::reserved_table_name_for_inspection(),
Arc::new(table_source),
projection,
)?;
for filter in scan_request.filters {
builder = builder.filter(filter)?;
}
if let Some(limit) = scan_request.limit {
builder = builder.limit(0, Some(limit))?;
}
builder.build()
}
}
#[cfg(test)]
mod tests {
use datafusion_common::TableReference;
use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
use datatypes::arrow::array::{
Array, BooleanArray, StringArray, UInt8Array, UInt32Array, UInt64Array,
};
use super::*;
use crate::storage::{RegionId, ScanRequest};
#[test]
fn test_region_info_schema() {
let schema = RegionInfoEntry::schema();
let columns = schema.column_schemas();
let names = columns.iter().map(|c| c.name.as_str()).collect::<Vec<_>>();
assert_eq!(
names,
vec![
"region_id",
"table_id",
"region_number",
"region_group",
"region_sequence",
"state",
"role",
"writable",
"committed_sequence",
"flushed_sequence",
"manifest_version",
"compaction_time_window",
"region_options",
"sst_format",
"node_id",
]
);
assert!(!columns[0].is_nullable());
assert!(!columns[8].is_nullable());
assert!(columns[9].is_nullable());
assert!(columns[11].is_nullable());
assert!(columns[14].is_nullable());
}
#[test]
fn test_region_info_to_record_batch() {
let region_id1 = RegionId::with_group_and_seq(10, 1, 20);
let region_id2 = RegionId::with_group_and_seq(11, 0, 21);
let entries = vec![
RegionInfoEntry {
region_id: region_id1,
table_id: region_id1.table_id(),
region_number: region_id1.region_number(),
region_group: region_id1.region_group(),
region_sequence: region_id1.region_sequence(),
state: "Leader(Writable)".to_string(),
role: "Leader".to_string(),
writable: true,
committed_sequence: 42,
flushed_sequence: Some(41),
manifest_version: 7,
compaction_time_window: Some("1h".to_string()),
region_options: "{\"sst_format\":\"flat\"}".to_string(),
sst_format: "flat".to_string(),
node_id: Some(3),
},
RegionInfoEntry {
region_id: region_id2,
table_id: region_id2.table_id(),
region_number: region_id2.region_number(),
region_group: region_id2.region_group(),
region_sequence: region_id2.region_sequence(),
state: "Follower".to_string(),
role: "Follower".to_string(),
writable: false,
committed_sequence: 9,
flushed_sequence: None,
manifest_version: 2,
compaction_time_window: None,
region_options: "{}".to_string(),
sst_format: "primary_key".to_string(),
node_id: None,
},
];
let batch = RegionInfoEntry::to_record_batch(&entries).unwrap();
assert_eq!(batch.num_rows(), 2);
let region_ids = batch
.column(0)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(region_id1.as_u64(), region_ids.value(0));
assert_eq!(region_id2.as_u64(), region_ids.value(1));
let table_ids = batch
.column(1)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap();
assert_eq!(10, table_ids.value(0));
assert_eq!(11, table_ids.value(1));
let region_groups = batch
.column(3)
.as_any()
.downcast_ref::<UInt8Array>()
.unwrap();
assert_eq!(1, region_groups.value(0));
assert_eq!(0, region_groups.value(1));
let states = batch
.column(5)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("Leader(Writable)", states.value(0));
assert_eq!("Follower", states.value(1));
let writable = batch
.column(7)
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap();
assert!(writable.value(0));
assert!(!writable.value(1));
let committed_sequences = batch
.column(8)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(42, committed_sequences.value(0));
assert_eq!(9, committed_sequences.value(1));
let flushed_sequences = batch
.column(9)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(41, flushed_sequences.value(0));
assert!(flushed_sequences.is_null(1));
let compaction_time_windows = batch
.column(11)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("1h", compaction_time_windows.value(0));
assert!(compaction_time_windows.is_null(1));
let node_ids = batch
.column(14)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(3, node_ids.value(0));
assert!(node_ids.is_null(1));
}
#[test]
fn test_region_info_build_plan() {
let projection_input = Some(vec![0, 5, 7, 11].into());
let request = ScanRequest {
projection_input,
filters: vec![binary_expr(col("writable"), Operator::Eq, lit(true))],
limit: Some(10),
..Default::default()
};
let plan = RegionInfoEntry::build_plan(request).unwrap();
let (scan, has_filter, has_limit) = extract_scan(&plan);
assert!(has_filter);
assert!(has_limit);
assert_eq!(
scan.table_name,
TableReference::bare(RegionInfoEntry::reserved_table_name_for_inspection())
);
assert_eq!(scan.projection, Some(vec![0, 5, 7, 11]));
let fields = scan.projected_schema.fields();
assert_eq!(fields.len(), 4);
assert_eq!(fields[0].name(), "region_id");
assert_eq!(fields[1].name(), "state");
assert_eq!(fields[2].name(), "writable");
assert_eq!(fields[3].name(), "compaction_time_window");
}
fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
use datafusion_expr::logical_plan::Limit;
match plan {
LogicalPlan::Filter(f) => {
let (scan, _, has_limit) = extract_scan(&f.input);
(scan, true, has_limit)
}
LogicalPlan::Limit(Limit { input, .. }) => {
let (scan, has_filter, _) = extract_scan(input);
(scan, has_filter, true)
}
LogicalPlan::TableScan(scan) => (scan, false, false),
other => panic!("unexpected plan: {other:?}"),
}
}
}

View File

@@ -0,0 +1,63 @@
DESC TABLE information_schema.region_info;
+------------------------+---------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+------------------------+---------+-----+------+---------+---------------+
| region_id | UInt64 | | NO | | FIELD |
| table_id | UInt32 | | NO | | FIELD |
| region_number | UInt32 | | NO | | FIELD |
| region_group | UInt8 | | NO | | FIELD |
| region_sequence | UInt32 | | NO | | FIELD |
| state | String | | NO | | FIELD |
| role | String | | NO | | FIELD |
| writable | Boolean | | NO | | FIELD |
| committed_sequence | UInt64 | | NO | | FIELD |
| flushed_sequence | UInt64 | | YES | | FIELD |
| manifest_version | UInt64 | | NO | | FIELD |
| compaction_time_window | String | | YES | | FIELD |
| region_options | String | | NO | | FIELD |
| sst_format | String | | NO | | FIELD |
| node_id | UInt64 | | YES | | FIELD |
+------------------------+---------+-----+------+---------+---------------+
CREATE TABLE region_info_case (
a INT PRIMARY KEY,
ts TIMESTAMP TIME INDEX,
)
WITH ("sst_format" = "flat");
Affected Rows: 0
INSERT INTO region_info_case VALUES (1, 1), (2, 2);
Affected Rows: 2
ADMIN FLUSH_TABLE('region_info_case');
+---------------------------------------+
| ADMIN FLUSH_TABLE('region_info_case') |
+---------------------------------------+
| 0 |
+---------------------------------------+
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE (\{".*"\}) <JSON>
-- SQLNESS REPLACE (-{40,}) ----------------
-- SQLNESS REPLACE (region_options\s+\|) region_options |
SELECT region_id, state, role, writable, committed_sequence, flushed_sequence, manifest_version, compaction_time_window, region_options, sst_format
FROM information_schema.region_info
WHERE region_id IN (
SELECT region_id FROM information_schema.region_peers WHERE table_name = 'region_info_case'
)
ORDER BY region_id;
+---------------+------------------+--------+----------+--------------------+------------------+------------------+------------------------+----------------+------------+
| region_id | state | role | writable | committed_sequence | flushed_sequence | manifest_version | compaction_time_window | region_options | sst_format |
+---------------+------------------+--------+----------+--------------------+------------------+------------------+------------------------+----------------+------------+
|<NUM>| Leader(Writable) | Leader | true |<NUM>|<NUM>|<NUM>| | <JSON> | flat |
+---------------+------------------+--------+----------+--------------------+------------------+------------------+------------------------+----------------+------------+
DROP TABLE region_info_case;
Affected Rows: 0

View File

@@ -0,0 +1,24 @@
DESC TABLE information_schema.region_info;
CREATE TABLE region_info_case (
a INT PRIMARY KEY,
ts TIMESTAMP TIME INDEX,
)
WITH ("sst_format" = "flat");
INSERT INTO region_info_case VALUES (1, 1), (2, 2);
ADMIN FLUSH_TABLE('region_info_case');
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE (\{".*"\}) <JSON>
-- SQLNESS REPLACE (-{40,}) ----------------
-- SQLNESS REPLACE (region_options\s+\|) region_options |
SELECT region_id, state, role, writable, committed_sequence, flushed_sequence, manifest_version, compaction_time_window, region_options, sst_format
FROM information_schema.region_info
WHERE region_id IN (
SELECT region_id FROM information_schema.region_peers WHERE table_name = 'region_info_case'
)
ORDER BY region_id;
DROP TABLE region_info_case;

View File

@@ -49,6 +49,7 @@ SHOW TABLES;
| process_list |
| profiling |
| referential_constraints |
| region_info |
| region_peers |
| region_statistics |
| routines |
@@ -99,6 +100,7 @@ SHOW FULL TABLES;
| process_list | LOCAL TEMPORARY |
| profiling | LOCAL TEMPORARY |
| referential_constraints | LOCAL TEMPORARY |
| region_info | LOCAL TEMPORARY |
| region_peers | LOCAL TEMPORARY |
| region_statistics | LOCAL TEMPORARY |
| routines | LOCAL TEMPORARY |
@@ -143,6 +145,7 @@ SHOW TABLE STATUS;
|process_list||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|profiling||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|referential_constraints||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|region_info||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|region_peers||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|region_statistics||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|routines||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||

View File

@@ -36,6 +36,7 @@ order by table_schema, table_name;
|greptime|information_schema|process_list|LOCALTEMPORARY|36|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|greptime|information_schema|profiling|LOCALTEMPORARY|19|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|greptime|information_schema|referential_constraints|LOCALTEMPORARY|20|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|greptime|information_schema|region_info|LOCALTEMPORARY|41|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|greptime|information_schema|region_peers|LOCALTEMPORARY|29|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|greptime|information_schema|region_statistics|LOCALTEMPORARY|35|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|greptime|information_schema|routines|LOCALTEMPORARY|21|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
@@ -316,6 +317,21 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | referential_constraints | unique_constraint_name | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | referential_constraints | unique_constraint_schema | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | referential_constraints | update_rule | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | region_info | committed_sequence | 9 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | region_info | compaction_time_window | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | region_info | flushed_sequence | 10 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_info | manifest_version | 11 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | region_info | node_id | 15 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_info | region_group | 4 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | |
| greptime | information_schema | region_info | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | region_info | region_number | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | information_schema | region_info | region_options | 13 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | region_info | region_sequence | 5 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | information_schema | region_info | role | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | region_info | sst_format | 14 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | region_info | state | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | region_info | table_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | information_schema | region_info | writable | 8 | | | | | | | | | | select,insert | | Boolean | boolean | FIELD | | No | boolean | | |
| greptime | information_schema | region_peers | down_seconds | 9 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | |
| greptime | information_schema | region_peers | is_leader | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | region_peers | peer_addr | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |

View File

@@ -116,6 +116,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
|greptime|information_schema|process_list|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|greptime|information_schema|profiling|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|greptime|information_schema|referential_constraints|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|greptime|information_schema|region_info|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|greptime|information_schema|region_peers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|greptime|information_schema|region_statistics|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|greptime|information_schema|routines|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|