feat: impl partitions and region_peers information schema (#3278)

* feat: impl partitions table

* fix: typo

* feat: impl region_peers information schema

* chore: rename region_peers to greptime_region_peers

* chore: rename statuses to upper case

* fix: comments

* chore: update partition result

* chore: remove redundant checking

* refactor: replace 42 with constant

* feat: fetch region routes in batch
dennis zhuang
2024-02-19 14:47:14 +08:00
committed by GitHub
parent 1851c20c13
commit 8b73067815
27 changed files with 1004 additions and 90 deletions

Cargo.lock generated
View File

@@ -6324,6 +6324,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datatypes",
"itertools 0.10.5",
"lazy_static",
"meta-client",
"moka",

View File

@@ -164,6 +164,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to find table partitions: #{table}"))]
FindPartitions {
source: partition::error::Error,
table: String,
},
#[snafu(display("Failed to find region routes"))]
FindRegionRoutes { source: partition::error::Error },
#[snafu(display("Failed to read system catalog table records"))]
ReadSystemCatalog {
location: Location,
@@ -254,11 +263,14 @@ impl ErrorExt for Error {
match self {
Error::InvalidKey { .. }
| Error::SchemaNotFound { .. }
| Error::CatalogNotFound { .. }
| Error::FindPartitions { .. }
| Error::FindRegionRoutes { .. }
| Error::InvalidEntryType { .. }
| Error::ParallelOpenTable { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::SystemCatalog { .. }
| Error::EmptyValue { .. }
| Error::ValueDeserialize { .. } => StatusCode::StorageUnavailable,
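Note on the snafu pattern above: each error variant generates a context selector, so a call site wraps the source error and attaches the table name in one step. A minimal self-contained sketch, with std::io::Error standing in for partition::error::Error and a hypothetical lookup function:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to find table partitions: {table}"))]
    FindPartitions {
        source: std::io::Error,
        table: String,
    },
}

// Hypothetical call site: `.context(...)` converts the source error into
// Error::FindPartitions and records which table the lookup was for.
fn find_table_partitions(table: &str) -> Result<Vec<u8>, Error> {
    std::fs::read(table).context(FindPartitionsSnafu { table })
}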

View File

@@ -15,7 +15,9 @@
mod columns;
mod key_column_usage;
mod memory_table;
mod partitions;
mod predicate;
mod region_peers;
mod runtime_metrics;
mod schemata;
mod table_names;
@@ -47,6 +49,8 @@ use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
use crate::information_schema::memory_table::{get_schema_columns, MemoryTable};
use crate::information_schema::partitions::InformationSchemaPartitions;
use crate::information_schema::region_peers::InformationSchemaRegionPeers;
use crate::information_schema::runtime_metrics::InformationSchemaMetrics;
use crate::information_schema::schemata::InformationSchemaSchemata;
use crate::information_schema::tables::InformationSchemaTables;
@@ -74,6 +78,7 @@ lazy_static! {
TRIGGERS,
GLOBAL_STATUS,
SESSION_STATUS,
PARTITIONS,
];
}
@@ -156,6 +161,10 @@ impl InformationSchemaProvider {
BUILD_INFO.to_string(),
self.build_table(BUILD_INFO).unwrap(),
);
tables.insert(
REGION_PEERS.to_string(),
self.build_table(REGION_PEERS).unwrap(),
);
}
tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
@@ -226,6 +235,14 @@ impl InformationSchemaProvider {
self.catalog_manager.clone(),
)) as _),
RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
_ => None,
}
}
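Stepping back, registering a virtual table in this provider touches three places: a name constant, a table id, and a match arm that constructs the table lazily. A simplified, dependency-free sketch of that shape (all types here are stand-ins for the real CatalogManager-backed ones):

use std::sync::Arc;

const PARTITIONS: &str = "partitions";

trait InformationTable {
    fn table_id(&self) -> u32;
    fn table_name(&self) -> &'static str;
}

struct InformationSchemaPartitions;

impl InformationTable for InformationSchemaPartitions {
    fn table_id(&self) -> u32 {
        28 // INFORMATION_SCHEMA_PARTITIONS_TABLE_ID in this change
    }

    fn table_name(&self) -> &'static str {
        PARTITIONS
    }
}

struct InformationSchemaProvider;

impl InformationSchemaProvider {
    // Unknown names fall through to None, mirroring the match arm above.
    fn information_table(&self, name: &str) -> Option<Arc<dyn InformationTable>> {
        match name {
            PARTITIONS => Some(Arc::new(InformationSchemaPartitions) as _),
            _ => None,
        }
    }
}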

View File

@@ -58,6 +58,7 @@ const COLUMN_DEFAULT: &str = "column_default";
const IS_NULLABLE: &str = "is_nullable";
const COLUMN_TYPE: &str = "column_type";
const COLUMN_COMMENT: &str = "column_comment";
const INIT_CAPACITY: usize = 42;
impl InformationSchemaColumns {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
@@ -154,16 +155,16 @@ impl InformationSchemaColumnsBuilder {
schema,
catalog_name,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
table_names: StringVectorBuilder::with_capacity(42),
column_names: StringVectorBuilder::with_capacity(42),
data_types: StringVectorBuilder::with_capacity(42),
semantic_types: StringVectorBuilder::with_capacity(42),
column_defaults: StringVectorBuilder::with_capacity(42),
is_nullables: StringVectorBuilder::with_capacity(42),
column_types: StringVectorBuilder::with_capacity(42),
column_comments: StringVectorBuilder::with_capacity(42),
catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
column_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
data_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
semantic_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
column_defaults: StringVectorBuilder::with_capacity(INIT_CAPACITY),
is_nullables: StringVectorBuilder::with_capacity(INIT_CAPACITY),
column_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
column_comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -177,13 +178,6 @@ impl InformationSchemaColumnsBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exists(&catalog_name, &schema_name)
.await?
{
continue;
}
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
while let Some(table) = stream.try_next().await? {

View File

@@ -23,10 +23,10 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use datatypes::vectors::{ConstantVector, StringVector, StringVectorBuilder, UInt32VectorBuilder};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
@@ -44,6 +44,7 @@ const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const COLUMN_NAME: &str = "column_name";
const ORDINAL_POSITION: &str = "ordinal_position";
const INIT_CAPACITY: usize = 42;
/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
pub(super) struct InformationSchemaKeyColumnUsage {
@@ -162,9 +163,6 @@ struct InformationSchemaKeyColumnUsageBuilder {
column_name: StringVectorBuilder,
ordinal_position: UInt32VectorBuilder,
position_in_unique_constraint: UInt32VectorBuilder,
referenced_table_schema: StringVectorBuilder,
referenced_table_name: StringVectorBuilder,
referenced_column_name: StringVectorBuilder,
}
impl InformationSchemaKeyColumnUsageBuilder {
@@ -177,18 +175,15 @@ impl InformationSchemaKeyColumnUsageBuilder {
schema,
catalog_name,
catalog_manager,
constraint_catalog: StringVectorBuilder::with_capacity(42),
constraint_schema: StringVectorBuilder::with_capacity(42),
constraint_name: StringVectorBuilder::with_capacity(42),
table_catalog: StringVectorBuilder::with_capacity(42),
table_schema: StringVectorBuilder::with_capacity(42),
table_name: StringVectorBuilder::with_capacity(42),
column_name: StringVectorBuilder::with_capacity(42),
ordinal_position: UInt32VectorBuilder::with_capacity(42),
position_in_unique_constraint: UInt32VectorBuilder::with_capacity(42),
referenced_table_schema: StringVectorBuilder::with_capacity(42),
referenced_table_name: StringVectorBuilder::with_capacity(42),
referenced_column_name: StringVectorBuilder::with_capacity(42),
constraint_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
constraint_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY),
constraint_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
column_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
ordinal_position: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
position_in_unique_constraint: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -301,12 +296,15 @@ impl InformationSchemaKeyColumnUsageBuilder {
self.column_name.push(Some(column_name));
self.ordinal_position.push(Some(ordinal_position));
self.position_in_unique_constraint.push(None);
self.referenced_table_schema.push(None);
self.referenced_table_name.push(None);
self.referenced_column_name.push(None);
}
fn finish(&mut self) -> Result<RecordBatch> {
let rows_num = self.table_catalog.len();
let null_string_vector = Arc::new(ConstantVector::new(
Arc::new(StringVector::from(vec![None as Option<&str>])),
rows_num,
));
let columns: Vec<VectorRef> = vec![
Arc::new(self.constraint_catalog.finish()),
Arc::new(self.constraint_schema.finish()),
@@ -317,9 +315,9 @@ impl InformationSchemaKeyColumnUsageBuilder {
Arc::new(self.column_name.finish()),
Arc::new(self.ordinal_position.finish()),
Arc::new(self.position_in_unique_constraint.finish()),
Arc::new(self.referenced_table_schema.finish()),
Arc::new(self.referenced_table_name.finish()),
Arc::new(self.referenced_column_name.finish()),
null_string_vector.clone(),
null_string_vector.clone(),
null_string_vector,
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
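The ConstantVector change above deserves a note: columns that hold the same value in every row (the referenced_* columns here, and many columns of the new partitions table) are no longer pushed row by row; a one-element vector wrapped in a ConstantVector of the batch length serves instead. A sketch assuming the datatypes crate from this repository, with the Vector trait providing len():

use std::sync::Arc;

use datatypes::prelude::Vector;
use datatypes::vectors::{ConstantVector, StringVector};

fn main() {
    let rows_num = 3;
    // A single None, logically repeated rows_num times without
    // materializing one value per row.
    let null_string_vector = Arc::new(ConstantVector::new(
        Arc::new(StringVector::from(vec![None as Option<&str>])),
        rows_num,
    ));
    assert_eq!(null_string_vector.len(), rows_num);
}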

View File

@@ -0,0 +1,399 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::datetime::DateTime;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use partition::manager::PartitionInfo;
use partition::partition::PartitionDef;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};
use super::PARTITIONS;
use crate::error::{
CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::{InformationTable, Predicates};
use crate::kvbackend::KvBackendCatalogManager;
use crate::CatalogManager;
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const PARTITION_NAME: &str = "partition_name";
const PARTITION_EXPRESSION: &str = "partition_expression";
/// The region id
const GREPTIME_PARTITION_ID: &str = "greptime_partition_id";
const INIT_CAPACITY: usize = 42;
/// The `PARTITIONS` table provides information about partitioned tables.
/// See https://dev.mysql.com/doc/refman/8.0/en/information-schema-partitions-table.html
/// We provide an extra column `greptime_partition_id` for the GreptimeDB region id.
pub(super) struct InformationSchemaPartitions {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
}
impl InformationSchemaPartitions {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
}
}
pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(PARTITION_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"subpartition_name",
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"partition_ordinal_position",
ConcreteDataType::int64_datatype(),
true,
),
ColumnSchema::new(
"subpartition_ordinal_position",
ConcreteDataType::int64_datatype(),
true,
),
ColumnSchema::new(
"partition_method",
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"subpartition_method",
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
PARTITION_EXPRESSION,
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"subpartition_expression",
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"partition_description",
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new("table_rows", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("avg_row_length", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("data_length", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("max_data_length", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("index_length", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("create_time", ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new("update_time", ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new("check_time", ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(
"partition_comment",
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new("nodegroup", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("tablespace_name", ConcreteDataType::string_datatype(), true),
ColumnSchema::new(
GREPTIME_PARTITION_ID,
ConcreteDataType::uint64_datatype(),
true,
),
]))
}
fn builder(&self) -> InformationSchemaPartitionsBuilder {
InformationSchemaPartitionsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
)
}
}
impl InformationTable for InformationSchemaPartitions {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_PARTITIONS_TABLE_ID
}
fn table_name(&self) -> &'static str {
PARTITIONS
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_partitions(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
struct InformationSchemaPartitionsBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
table_names: StringVectorBuilder,
partition_names: StringVectorBuilder,
partition_ordinal_positions: Int64VectorBuilder,
partition_expressions: StringVectorBuilder,
create_times: DateTimeVectorBuilder,
partition_ids: UInt64VectorBuilder,
}
impl InformationSchemaPartitionsBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
create_times: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
}
}
/// Construct the `information_schema.partitions` virtual table
async fn make_partitions(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let partition_manager = catalog_manager
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.map(|catalog_manager| catalog_manager.partition_manager());
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
if table_info.table_type == TableType::Temporary {
continue;
}
let table_id = table_info.ident.table_id;
let partitions = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_table_partitions(table_id)
.await
.context(FindPartitionsSnafu {
table: &table_info.name,
})?
} else {
// The current node must be a standalone instance, which contains only one partition by default.
// TODO(dennis): change it when we support multi-regions for standalone.
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}]
};
self.add_partitions(
&predicates,
&table_info,
&catalog_name,
&schema_name,
&table_info.name,
&partitions,
);
}
}
self.finish()
}
#[allow(clippy::too_many_arguments)]
fn add_partitions(
&mut self,
predicates: &Predicates,
table_info: &TableInfo,
catalog_name: &str,
schema_name: &str,
table_name: &str,
partitions: &[PartitionInfo],
) {
let row = [
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(TABLE_NAME, &Value::from(table_name)),
];
if !predicates.eval(&row) {
return;
}
for (index, partition) in partitions.iter().enumerate() {
let partition_name = format!("p{index}");
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
self.partition_names.push(Some(&partition_name));
self.partition_ordinal_positions
.push(Some((index + 1) as i64));
let expressions = if partition.partition.partition_columns().is_empty() {
None
} else {
Some(partition.partition.to_string())
};
self.partition_expressions.push(expressions.as_deref());
self.create_times.push(Some(DateTime::from(
table_info.meta.created_on.timestamp_millis(),
)));
self.partition_ids.push(Some(partition.id.as_u64()));
}
}
fn finish(&mut self) -> Result<RecordBatch> {
let rows_num = self.catalog_names.len();
let null_string_vector = Arc::new(ConstantVector::new(
Arc::new(StringVector::from(vec![None as Option<&str>])),
rows_num,
));
let null_i64_vector = Arc::new(ConstantVector::new(
Arc::new(Int64Vector::from(vec![None])),
rows_num,
));
let null_datetime_vector = Arc::new(ConstantVector::new(
Arc::new(DateTimeVector::from(vec![None])),
rows_num,
));
let partition_methods = Arc::new(ConstantVector::new(
Arc::new(StringVector::from(vec![Some("RANGE")])),
rows_num,
));
let columns: Vec<VectorRef> = vec![
Arc::new(self.catalog_names.finish()),
Arc::new(self.schema_names.finish()),
Arc::new(self.table_names.finish()),
Arc::new(self.partition_names.finish()),
null_string_vector.clone(),
Arc::new(self.partition_ordinal_positions.finish()),
null_i64_vector.clone(),
partition_methods,
null_string_vector.clone(),
Arc::new(self.partition_expressions.finish()),
null_string_vector.clone(),
null_string_vector.clone(),
// TODO(dennis): rows and index statistics info
null_i64_vector.clone(),
null_i64_vector.clone(),
null_i64_vector.clone(),
null_i64_vector.clone(),
null_i64_vector.clone(),
null_i64_vector.clone(),
Arc::new(self.create_times.finish()),
// TODO(dennis): supports update_time
null_datetime_vector.clone(),
null_datetime_vector,
null_i64_vector,
null_string_vector.clone(),
null_string_vector.clone(),
null_string_vector,
Arc::new(self.partition_ids.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
impl DfPartitionStream for InformationSchemaPartitions {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_partitions(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}

View File

@@ -0,0 +1,279 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use core::pin::pin;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt64VectorBuilder};
use futures::{StreamExt, TryStreamExt};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use super::REGION_PEERS;
use crate::error::{
CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::{InformationTable, Predicates};
use crate::kvbackend::KvBackendCatalogManager;
use crate::CatalogManager;
const REGION_ID: &str = "region_id";
const PEER_ID: &str = "peer_id";
const PEER_ADDR: &str = "peer_addr";
const IS_LEADER: &str = "is_leader";
const STATUS: &str = "status";
const DOWN_SECONDS: &str = "down_seconds";
const INIT_CAPACITY: usize = 42;
/// The `REGION_PEERS` table provides information about the region distribution and routes, including the following fields:
///
/// - `region_id`: the region id
/// - `peer_id`: the region storage datanode peer id
/// - `peer_addr`: the region storage datanode peer address
/// - `is_leader`: whether the peer is the leader
/// - `status`: the region status, `ALIVE` or `DOWNGRADED`.
/// - `down_seconds`: the duration of being offline, in seconds.
///
pub(super) struct InformationSchemaRegionPeers {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
}
impl InformationSchemaRegionPeers {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
}
}
pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(IS_LEADER, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(STATUS, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(DOWN_SECONDS, ConcreteDataType::int64_datatype(), true),
]))
}
fn builder(&self) -> InformationSchemaRegionPeersBuilder {
InformationSchemaRegionPeersBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
)
}
}
impl InformationTable for InformationSchemaRegionPeers {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID
}
fn table_name(&self) -> &'static str {
REGION_PEERS
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_region_peers(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
struct InformationSchemaRegionPeersBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
region_ids: UInt64VectorBuilder,
peer_ids: UInt64VectorBuilder,
peer_addrs: StringVectorBuilder,
is_leaders: StringVectorBuilder,
statuses: StringVectorBuilder,
down_seconds: Int64VectorBuilder,
}
impl InformationSchemaRegionPeersBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
is_leaders: StringVectorBuilder::with_capacity(INIT_CAPACITY),
statuses: StringVectorBuilder::with_capacity(INIT_CAPACITY),
down_seconds: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
}
}
/// Construct the `information_schema.region_peers` virtual table
async fn make_region_peers(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let partition_manager = catalog_manager
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.map(|catalog_manager| catalog_manager.partition_manager());
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let table_id_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Ok(None)
} else {
Ok(Some(table_info.ident.table_id))
}
});
const BATCH_SIZE: usize = 128;
// Split table ids into chunks
let mut table_id_chunks = pin!(table_id_stream.ready_chunks(BATCH_SIZE));
while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids.into_iter().collect::<Result<Vec<_>>>()?;
let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_region_routes_batch(&table_ids)
.await
.context(FindRegionRoutesSnafu)?
} else {
table_ids.into_iter().map(|id| (id, vec![])).collect()
};
for routes in table_routes.values() {
self.add_region_peers(&predicates, routes);
}
}
}
self.finish()
}
fn add_region_peers(&mut self, predicates: &Predicates, routes: &[RegionRoute]) {
for route in routes {
let region_id = route.region.id.as_u64();
let peer_id = route.leader_peer.clone().map(|p| p.id);
let peer_addr = route.leader_peer.clone().map(|p| p.addr);
let status = if let Some(status) = route.leader_status {
Some(status.as_ref().to_string())
} else {
// Alive by default
Some("ALIVE".to_string())
};
let row = [(REGION_ID, &Value::from(region_id))];
if !predicates.eval(&row) {
return;
}
// TODO(dennis): add followers.
self.region_ids.push(Some(region_id));
self.peer_ids.push(peer_id);
self.peer_addrs.push(peer_addr.as_deref());
self.is_leaders.push(Some("Yes"));
self.statuses.push(status.as_deref());
self.down_seconds
.push(route.leader_down_millis().map(|m| m / 1000));
}
}
fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.region_ids.finish()),
Arc::new(self.peer_ids.finish()),
Arc::new(self.peer_addrs.finish()),
Arc::new(self.is_leaders.finish()),
Arc::new(self.statuses.finish()),
Arc::new(self.down_seconds.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
impl DfPartitionStream for InformationSchemaRegionPeers {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_region_peers(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
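To make the row mapping above concrete: a route with no recorded leader_status is reported as ALIVE, and down_seconds is the elapsed downgrade time converted from milliseconds. A self-contained sketch with a hypothetical helper (plain types stand in for RegionRoute):

// Hypothetical helper mirroring add_region_peers: derive the `status`
// and `down_seconds` columns from a route's leader state.
fn status_and_down_seconds(
    leader_status: Option<&str>, // e.g. Some("DOWNGRADED"), per AsRefStr
    leader_down_millis: Option<i64>,
) -> (String, Option<i64>) {
    // Alive by default when no explicit status is recorded.
    let status = leader_status.unwrap_or("ALIVE").to_string();
    // Milliseconds since the downgrade, reported in whole seconds.
    (status, leader_down_millis.map(|m| m / 1000))
}

fn main() {
    assert_eq!(
        status_and_down_seconds(Some("DOWNGRADED"), Some(4200)),
        ("DOWNGRADED".to_string(), Some(4))
    );
    assert_eq!(
        status_and_down_seconds(None, None),
        ("ALIVE".to_string(), None)
    );
}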

View File

@@ -41,6 +41,7 @@ const CATALOG_NAME: &str = "catalog_name";
const SCHEMA_NAME: &str = "schema_name";
const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
const DEFAULT_COLLATION_NAME: &str = "default_collation_name";
const INIT_CAPACITY: usize = 42;
/// The `information_schema.schemata` table implementation.
pub(super) struct InformationSchemaSchemata {
@@ -144,11 +145,11 @@ impl InformationSchemaSchemataBuilder {
schema,
catalog_name,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
charset_names: StringVectorBuilder::with_capacity(42),
collation_names: StringVectorBuilder::with_capacity(42),
sql_paths: StringVectorBuilder::with_capacity(42),
catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
charset_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
collation_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
sql_paths: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -162,13 +163,6 @@ impl InformationSchemaSchemataBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exists(&catalog_name, &schema_name)
.await?
{
continue;
}
self.add_schema(&predicates, &catalog_name, &schema_name);
}

View File

@@ -39,3 +39,5 @@ pub const TRIGGERS: &str = "triggers";
pub const GLOBAL_STATUS: &str = "global_status";
pub const SESSION_STATUS: &str = "session_status";
pub const RUNTIME_METRICS: &str = "runtime_metrics";
pub const PARTITIONS: &str = "partitions";
pub const REGION_PEERS: &str = "greptime_region_peers";

View File

@@ -45,6 +45,7 @@ const TABLE_NAME: &str = "table_name";
const TABLE_TYPE: &str = "table_type";
const TABLE_ID: &str = "table_id";
const ENGINE: &str = "engine";
const INIT_CAPACITY: usize = 42;
pub(super) struct InformationSchemaTables {
schema: SchemaRef,
@@ -141,12 +142,12 @@ impl InformationSchemaTablesBuilder {
schema,
catalog_name,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
table_names: StringVectorBuilder::with_capacity(42),
table_types: StringVectorBuilder::with_capacity(42),
table_ids: UInt32VectorBuilder::with_capacity(42),
engines: StringVectorBuilder::with_capacity(42),
catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -160,13 +161,6 @@ impl InformationSchemaTablesBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exists(&catalog_name, &schema_name)
.await?
{
continue;
}
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
while let Some(table) = stream.try_next().await? {

View File

@@ -156,6 +156,7 @@ fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
}),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
});
}

View File

@@ -82,6 +82,10 @@ pub const INFORMATION_SCHEMA_GLOBAL_STATUS_TABLE_ID: u32 = 25;
pub const INFORMATION_SCHEMA_SESSION_STATUS_TABLE_ID: u32 = 26;
/// id for information_schema.RUNTIME_METRICS
pub const INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID: u32 = 27;
/// id for information_schema.PARTITIONS
pub const INFORMATION_SCHEMA_PARTITIONS_TABLE_ID: u32 = 28;
/// id for information_schema.REGION_PEERS
pub const INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID: u32 = 29;
/// ----- End of information_schema tables -----
pub const MITO_ENGINE: &str = "mito";

View File

@@ -844,6 +844,7 @@ mod tests {
use std::sync::Arc;
use bytes::Bytes;
use common_time::util::current_time_millis;
use futures::TryStreamExt;
use table::metadata::{RawTableInfo, TableInfo};
@@ -910,6 +911,7 @@ mod tests {
leader_peer: Some(Peer::new(datanode, "a2")),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
}
}
@@ -1263,6 +1265,7 @@ mod tests {
leader_peer: Some(Peer::new(datanode, "a2")),
leader_status: Some(RegionStatus::Downgraded),
follower_peers: vec![],
leader_down_since: Some(current_time_millis()),
},
RegionRoute {
region: Region {
@@ -1274,6 +1277,7 @@ mod tests {
leader_peer: Some(Peer::new(datanode, "a1")),
leader_status: None,
follower_peers: vec![],
leader_down_since: None,
},
];
let table_info: RawTableInfo =
@@ -1314,10 +1318,18 @@ mod tests {
updated_route_value.region_routes().unwrap()[0].leader_status,
Some(RegionStatus::Downgraded)
);
assert!(updated_route_value.region_routes().unwrap()[0]
.leader_down_since
.is_some());
assert_eq!(
updated_route_value.region_routes().unwrap()[1].leader_status,
Some(RegionStatus::Downgraded)
);
assert!(updated_route_value.region_routes().unwrap()[1]
.leader_down_since
.is_some());
}
async fn assert_datanode_table(

View File

@@ -457,7 +457,7 @@ mod tests {
let new_raw_v = format!("{:?}", v);
assert_eq!(
new_raw_v,
r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }], version: 0 })"#
r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }], version: 0 })"#
);
}
}

View File

@@ -18,11 +18,13 @@ use api::v1::meta::{
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
TableRoute as PbTableRoute,
};
use common_time::util::current_time_millis;
use derive_builder::Builder;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use snafu::OptionExt;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use crate::error::{self, Result};
use crate::key::RegionDistribution;
@@ -204,6 +206,7 @@ impl TableRoute {
leader_peer,
follower_peers,
leader_status: None,
leader_down_since: None,
});
}
@@ -258,10 +261,25 @@ pub struct RegionRoute {
#[builder(setter(into, strip_option), default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub leader_status: Option<RegionStatus>,
/// The start time when the leader is in `Downgraded` status.
#[serde(default)]
#[builder(default = "self.default_leader_down_since()")]
pub leader_down_since: Option<i64>,
}
impl RegionRouteBuilder {
fn default_leader_down_since(&self) -> Option<i64> {
match self.leader_status {
Some(Some(RegionStatus::Downgraded)) => Some(current_time_millis()),
_ => None,
}
}
}
/// The Status of the [Region].
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq)]
/// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum RegionStatus {
/// The following cases in which the [Region] will be downgraded.
///
@@ -292,15 +310,34 @@ impl RegionRoute {
/// **Notes:** Meta Server will stop renewing the lease for the downgraded [Region].
///
pub fn downgrade_leader(&mut self) {
self.leader_down_since = Some(current_time_millis());
self.leader_status = Some(RegionStatus::Downgraded)
}
/// Returns how long since the leader is in `Downgraded` status.
pub fn leader_down_millis(&self) -> Option<i64> {
self.leader_down_since
.map(|start| current_time_millis() - start)
}
/// Sets the leader status.
///
/// Returns true if updated.
pub fn set_leader_status(&mut self, status: Option<RegionStatus>) -> bool {
let updated = self.leader_status != status;
match (status, updated) {
(Some(RegionStatus::Downgraded), true) => {
self.leader_down_since = Some(current_time_millis());
}
(Some(RegionStatus::Downgraded), false) => {
// Do nothing if leader is still in `Downgraded` status.
}
_ => {
self.leader_down_since = None;
}
}
self.leader_status = status;
updated
}
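The transition rules in set_leader_status are easy to restate as a small state machine: entering Downgraded stamps leader_down_since, staying Downgraded keeps the original stamp, and any other transition clears it. A dependency-free sketch of exactly those rules (LeaderState is a stand-in for the relevant RegionRoute fields):

use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Clone, Copy, PartialEq)]
enum RegionStatus {
    Downgraded,
}

fn current_time_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before epoch")
        .as_millis() as i64
}

#[derive(Default)]
struct LeaderState {
    status: Option<RegionStatus>,
    down_since: Option<i64>,
}

impl LeaderState {
    fn set(&mut self, status: Option<RegionStatus>) -> bool {
        let updated = self.status != status;
        match (status, updated) {
            // Entering Downgraded: stamp the start time.
            (Some(RegionStatus::Downgraded), true) => {
                self.down_since = Some(current_time_millis());
            }
            // Still Downgraded: keep the original stamp.
            (Some(RegionStatus::Downgraded), false) => {}
            // Any other transition clears the stamp.
            _ => self.down_since = None,
        }
        self.status = status;
        updated
    }
}

fn main() {
    let mut s = LeaderState::default();
    assert!(s.set(Some(RegionStatus::Downgraded)));
    let stamped = s.down_since;
    assert!(!s.set(Some(RegionStatus::Downgraded)));
    assert_eq!(s.down_since, stamped);
    assert!(s.set(None));
    assert!(s.down_since.is_none());
}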
@@ -441,6 +478,7 @@ mod tests {
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_status: None,
leader_down_since: None,
};
assert!(!region_route.is_leader_downgraded());
@@ -462,6 +500,7 @@ mod tests {
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_status: None,
leader_down_since: None,
};
let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;

View File

@@ -296,6 +296,7 @@ mod test {
leader_peer: Some(peer.clone()),
follower_peers: vec![follower_peer.clone()],
leader_status: Some(RegionStatus::Downgraded),
leader_down_since: Some(1),
},
RegionRoute {
region: Region::new_test(another_region_id),

View File

@@ -190,6 +190,7 @@ mod tests {
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{Region, RegionRoute, RegionStatus};
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use crate::error::Error;
@@ -285,6 +286,7 @@ mod tests {
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_status: Some(RegionStatus::Downgraded),
leader_down_since: Some(current_time_millis()),
}];
env.create_physical_table_metadata(table_info, region_routes)
@@ -296,6 +298,7 @@ mod tests {
.unwrap();
assert!(!new_region_routes[0].is_leader_downgraded());
assert!(new_region_routes[0].leader_down_since.is_none());
assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
}
@@ -316,6 +319,7 @@ mod tests {
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5), Peer::empty(3)],
leader_status: Some(RegionStatus::Downgraded),
leader_down_since: Some(current_time_millis()),
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
@@ -377,6 +381,7 @@ mod tests {
leader_peer: Some(leader_peer),
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_status: None,
leader_down_since: None,
}];
env.create_physical_table_metadata(table_info, region_routes)
@@ -400,6 +405,7 @@ mod tests {
leader_peer: Some(candidate_peer),
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_status: None,
leader_down_since: None,
}];
env.create_physical_table_metadata(table_info, region_routes)
@@ -423,6 +429,7 @@ mod tests {
leader_peer: Some(candidate_peer),
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_status: Some(RegionStatus::Downgraded),
leader_down_since: None,
}];
env.create_physical_table_metadata(table_info, region_routes)

View File

@@ -50,6 +50,7 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64)
leader_peer,
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
}
}
@@ -132,6 +133,7 @@ pub(crate) async fn prepare_table_region_and_info_value(
}),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
};
// Region distribution:

View File

@@ -133,6 +133,7 @@ pub(crate) async fn create_partition_rule_manager(
leader_peer: Some(Peer::new(3, "")),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region {
@@ -151,6 +152,7 @@ pub(crate) async fn create_partition_rule_manager(
leader_peer: Some(Peer::new(2, "")),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region {
@@ -169,6 +171,7 @@ pub(crate) async fn create_partition_rule_manager(
leader_peer: Some(Peer::new(1, "")),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
]),
region_wal_options.clone(),
@@ -200,6 +203,7 @@ pub(crate) async fn create_partition_rule_manager(
leader_peer: None,
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region {
@@ -221,6 +225,7 @@ pub(crate) async fn create_partition_rule_manager(
leader_peer: None,
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region {
@@ -239,6 +244,7 @@ pub(crate) async fn create_partition_rule_manager(
leader_peer: None,
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
]),
region_wal_options,

View File

@@ -17,6 +17,7 @@ datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
moka = { workspace = true, features = ["future"] }

View File

@@ -65,7 +65,7 @@ impl PartitionRuleManager {
}
}
async fn find_region_routes(&self, table_id: TableId) -> Result<Vec<RegionRoute>> {
pub async fn find_region_routes(&self, table_id: TableId) -> Result<Vec<RegionRoute>> {
let (_, route) = self
.table_route_manager
.get_physical_table_route(table_id)
@@ -74,6 +74,29 @@ impl PartitionRuleManager {
Ok(route.region_routes)
}
pub async fn find_region_routes_batch(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
let table_routes = self
.table_route_manager
.batch_get(table_ids)
.await
.context(error::TableRouteManagerSnafu)?;
let mut table_region_routes = HashMap::with_capacity(table_routes.len());
for (table_id, table_route) in table_routes {
let region_routes = table_route
.region_routes()
.context(error::TableRouteManagerSnafu)?
.clone();
table_region_routes.insert(table_id, region_routes);
}
Ok(table_region_routes)
}
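This batch API pairs with the ready_chunks pattern used by the region_peers builder above: stream the table ids, group them into fixed-size chunks, and resolve each chunk in one metadata round trip. A self-contained sketch with a stubbed lookup (find_routes_batch is hypothetical; BATCH_SIZE matches the value in region_peers.rs):

use futures::{stream, StreamExt};

// Stub standing in for PartitionRuleManager::find_region_routes_batch.
async fn find_routes_batch(table_ids: &[u32]) -> usize {
    table_ids.len()
}

fn main() {
    futures::executor::block_on(async {
        const BATCH_SIZE: usize = 128;
        let table_ids = stream::iter(0u32..300);
        let mut chunks = table_ids.ready_chunks(BATCH_SIZE);
        let mut calls = 0;
        while let Some(batch) = chunks.next().await {
            // One round trip covers up to BATCH_SIZE tables.
            find_routes_batch(&batch).await;
            calls += 1;
        }
        assert_eq!(calls, 3); // 300 ids resolved as 128 + 128 + 44
    });
}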
pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
let region_routes = self.find_region_routes(table_id).await?;
ensure!(

View File

@@ -13,12 +13,13 @@
// limitations under the License.
use std::any::Any;
use std::fmt::Debug;
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;
use common_meta::rpc::router::Partition as MetaPartition;
use datafusion_expr::Operator;
use datatypes::prelude::Value;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
@@ -56,6 +57,29 @@ pub struct PartitionDef {
partition_bounds: Vec<PartitionBound>,
}
impl Display for PartitionBound {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Value(v) => write!(f, "{}", v),
Self::MaxValue => write!(f, "MAXVALUE"),
}
}
}
impl Display for PartitionDef {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"({}) VALUES LESS THAN ({})",
self.partition_columns.iter().join(", "),
self.partition_bounds
.iter()
.map(|b| format!("{b}"))
.join(", ")
)
}
}
impl PartitionDef {
pub fn new(partition_columns: Vec<String>, partition_bounds: Vec<PartitionBound>) -> Self {
Self {
@@ -162,6 +186,8 @@ mod tests {
PartitionBound::Value(1_i32.into()),
],
};
assert_eq!("(a, b) VALUES LESS THAN (MAXVALUE, 1)", def.to_string());
let partition: MetaPartition = def.try_into().unwrap();
assert_eq!(
r#"{"column_list":["a","b"],"value_list":["\"MaxValue\"","{\"Value\":{\"Int32\":1}}"]}"#,

View File

@@ -0,0 +1,40 @@
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION BY RANGE COLUMNS (a) (
PARTITION p0 VALUES LESS THAN (10),
PARTITION p1 VALUES LESS THAN (20),
PARTITION p2 VALUES LESS THAN (MAXVALUE),
);
Affected Rows: 0
-- SQLNESS REPLACE (\d{13}) ID
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id |
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
| greptime | public | my_table | p0 | (a) VALUES LESS THAN (10) | ID |
| greptime | public | my_table | p1 | (a) VALUES LESS THAN (20) | ID |
| greptime | public | my_table | p2 | (a) VALUES LESS THAN (MAXVALUE) | ID |
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
+---------------+---------+-----------+--------+
| region_id | peer_id | is_leader | status |
+---------------+---------+-----------+--------+
| REGION_ID | PEER_ID | Yes | ALIVE |
| REGION_ID | PEER_ID | Yes | ALIVE |
| REGION_ID | PEER_ID | Yes | ALIVE |
+---------------+---------+-----------+--------+
DROP TABLE my_table;
Affected Rows: 0

View File

@@ -0,0 +1,19 @@
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION BY RANGE COLUMNS (a) (
PARTITION p0 VALUES LESS THAN (10),
PARTITION p1 VALUES LESS THAN (20),
PARTITION p2 VALUES LESS THAN (MAXVALUE),
);
-- SQLNESS REPLACE (\d{13}) ID
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
DROP TABLE my_table;

View File

@@ -34,6 +34,7 @@ show tables;
| key_column_usage |
| optimizer_trace |
| parameters |
| partitions |
| profiling |
| referential_constraints |
| routines |

View File

@@ -26,6 +26,7 @@ order by table_schema, table_name;
| greptime | information_schema | key_column_usage | LOCAL TEMPORARY | 16 | |
| greptime | information_schema | optimizer_trace | LOCAL TEMPORARY | 17 | |
| greptime | information_schema | parameters | LOCAL TEMPORARY | 18 | |
| greptime | information_schema | partitions | LOCAL TEMPORARY | 28 | |
| greptime | information_schema | profiling | LOCAL TEMPORARY | 19 | |
| greptime | information_schema | referential_constraints | LOCAL TEMPORARY | 20 | |
| greptime | information_schema | routines | LOCAL TEMPORARY | 21 | |
@@ -182,6 +183,32 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | parameters | specific_catalog | String | FIELD | | No | String | |
| greptime | information_schema | parameters | specific_name | String | FIELD | | No | String | |
| greptime | information_schema | parameters | specific_schema | String | FIELD | | No | String | |
| greptime | information_schema | partitions | avg_row_length | Int64 | FIELD | | Yes | Int64 | |
| greptime | information_schema | partitions | check_time | DateTime | FIELD | | Yes | DateTime | |
| greptime | information_schema | partitions | checksum | Int64 | FIELD | | Yes | Int64 | |
| greptime | information_schema | partitions | create_time | DateTime | FIELD | | Yes | DateTime | |
| greptime | information_schema | partitions | data_free | Int64 | FIELD | | Yes | Int64 | |
| greptime | information_schema | partitions | data_length | Int64 | FIELD | | Yes | Int64 | |
| greptime | information_schema | partitions | greptime_partition_id | UInt64 | FIELD | | Yes | UInt64 | |
| greptime | information_schema | partitions | index_length | Int64 | FIELD | | Yes | Int64 | |
| greptime | information_schema | partitions | max_data_length | Int64 | FIELD | | Yes | Int64 | |
| greptime | information_schema | partitions | nodegroup | String | FIELD | | Yes | String | |
| greptime | information_schema | partitions | partition_comment | String | FIELD | | Yes | String | |
| greptime | information_schema | partitions | partition_description | String | FIELD | | Yes | String | |
| greptime | information_schema | partitions | partition_expression | String | FIELD | | Yes | String | |
| greptime | information_schema | partitions | partition_method | String | FIELD | | Yes | String | |
| greptime | information_schema | partitions | partition_name | String | FIELD | | No | String | |
| greptime | information_schema | partitions | partition_ordinal_position | Int64 | FIELD | | Yes | Int64 | |
| greptime | information_schema | partitions | subpartition_expression | String | FIELD | | Yes | String | |
| greptime | information_schema | partitions | subpartition_method | String | FIELD | | Yes | String | |
| greptime | information_schema | partitions | subpartition_name | String | FIELD | | Yes | String | |
| greptime | information_schema | partitions | subpartition_ordinal_position | Int64 | FIELD | | Yes | Int64 | |
| greptime | information_schema | partitions | table_catalog | String | FIELD | | No | String | |
| greptime | information_schema | partitions | table_name | String | FIELD | | No | String | |
| greptime | information_schema | partitions | table_rows | Int64 | FIELD | | Yes | Int64 | |
| greptime | information_schema | partitions | table_schema | String | FIELD | | No | String | |
| greptime | information_schema | partitions | tablespace_name | String | FIELD | | Yes | String | |
| greptime | information_schema | partitions | update_time | DateTime | FIELD | | Yes | DateTime | |
| greptime | information_schema | profiling | block_ops_in | Int64 | FIELD | | No | Int64 | |
| greptime | information_schema | profiling | block_ops_out | Int64 | FIELD | | No | Int64 | |
| greptime | information_schema | profiling | context_involuntary | Int64 | FIELD | | No | Int64 | |
@@ -528,7 +555,7 @@ select * from key_column_usage;
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
-- tables not implemented
desc table COLUMN_PRIVILEGES;
DESC TABLE COLUMN_PRIVILEGES;
+----------------+--------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
@@ -542,14 +569,14 @@ desc table COLUMN_PRIVILEGES;
| is_grantable | String | | NO | | FIELD |
+----------------+--------+-----+------+---------+---------------+
select * from COLUMN_PRIVILEGES;
SELECT * FROM COLUMN_PRIVILEGES;
+---------+---------------+--------------+------------+-------------+----------------+--------------+
| grantee | table_catalog | table_schema | table_name | column_name | privilege_type | is_grantable |
+---------+---------------+--------------+------------+-------------+----------------+--------------+
+---------+---------------+--------------+------------+-------------+----------------+--------------+
desc table COLUMN_STATISTICS;
DESC TABLE COLUMN_STATISTICS;
+-------------+--------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
@@ -560,14 +587,14 @@ desc table COLUMN_STATISTICS;
| histogram | String | | NO | | FIELD |
+-------------+--------+-----+------+---------+---------------+
select * from COLUMN_STATISTICS;
SELECT * FROM COLUMN_STATISTICS;
+-------------+------------+-------------+-----------+
| schema_name | table_name | column_name | histogram |
+-------------+------------+-------------+-----------+
+-------------+------------+-------------+-----------+
select * from CHARACTER_SETS;
SELECT * FROM CHARACTER_SETS;
+--------------------+----------------------+---------------+--------+
| character_set_name | default_collate_name | description | maxlen |
@@ -575,7 +602,7 @@ select * from CHARACTER_SETS;
| utf8 | utf8_bin | UTF-8 Unicode | 4 |
+--------------------+----------------------+---------------+--------+
select * from COLLATIONS;
SELECT * FROM COLLATIONS;
+----------------+--------------------+----+------------+-------------+---------+
| collation_name | character_set_name | id | is_default | is_compiled | sortlen |
@@ -583,7 +610,7 @@ select * from COLLATIONS;
| utf8_bin | utf8 | 1 | Yes | Yes | 1 |
+----------------+--------------------+----+------------+-------------+---------+
select * from COLLATION_CHARACTER_SET_APPLICABILITY;
SELECT * FROM COLLATION_CHARACTER_SET_APPLICABILITY;
+----------------+--------------------+
| collation_name | character_set_name |
@@ -591,7 +618,7 @@ select * from COLLATION_CHARACTER_SET_APPLICABILITY;
| utf8_bin | utf8 |
+----------------+--------------------+
desc table CHECK_CONSTRAINTS;
DESC TABLE CHECK_CONSTRAINTS;
+--------------------+--------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
@@ -602,14 +629,14 @@ desc table CHECK_CONSTRAINTS;
| check_clause | String | | NO | | FIELD |
+--------------------+--------+-----+------+---------+---------------+
select * from CHECK_CONSTRAINTS;
SELECT * FROM CHECK_CONSTRAINTS;
+--------------------+-------------------+-----------------+--------------+
| constraint_catalog | constraint_schema | constraint_name | check_clause |
+--------------------+-------------------+-----------------+--------------+
+--------------------+-------------------+-----------------+--------------+
desc table RUNTIME_METRICS;
DESC TABLE RUNTIME_METRICS;
+-------------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
@@ -622,6 +649,19 @@ desc table RUNTIME_METRICS;
| timestamp | TimestampMillisecond | | NO | | FIELD |
+-------------+----------------------+-----+------+---------+---------------+
DESC TABLE GREPTIME_REGION_PEERS;
+--------------+--------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------------+--------+-----+------+---------+---------------+
| region_id | UInt64 | | NO | | FIELD |
| peer_id | UInt64 | | YES | | FIELD |
| peer_addr | String | | YES | | FIELD |
| is_leader | String | | YES | | FIELD |
| status | String | | YES | | FIELD |
| down_seconds | Int64 | | YES | | FIELD |
+--------------+--------+-----+------+---------+---------------+
drop table my_db.foo;
Affected Rows: 0

View File

@@ -97,25 +97,27 @@ desc table key_column_usage;
select * from key_column_usage;
-- tables not implemented
desc table COLUMN_PRIVILEGES;
DESC TABLE COLUMN_PRIVILEGES;
select * from COLUMN_PRIVILEGES;
SELECT * FROM COLUMN_PRIVILEGES;
desc table COLUMN_STATISTICS;
DESC TABLE COLUMN_STATISTICS;
select * from COLUMN_STATISTICS;
SELECT * FROM COLUMN_STATISTICS;
select * from CHARACTER_SETS;
SELECT * FROM CHARACTER_SETS;
select * from COLLATIONS;
SELECT * FROM COLLATIONS;
select * from COLLATION_CHARACTER_SET_APPLICABILITY;
SELECT * FROM COLLATION_CHARACTER_SET_APPLICABILITY;
desc table CHECK_CONSTRAINTS;
DESC TABLE CHECK_CONSTRAINTS;
select * from CHECK_CONSTRAINTS;
SELECT * FROM CHECK_CONSTRAINTS;
desc table RUNTIME_METRICS;
DESC TABLE RUNTIME_METRICS;
DESC TABLE GREPTIME_REGION_PEERS;
drop table my_db.foo;