diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs
index fb29b84d97..9e5fa135fa 100644
--- a/src/catalog/src/information_schema.rs
+++ b/src/catalog/src/information_schema.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+mod columns;
 mod tables;

 use std::any::Any;
@@ -23,15 +24,18 @@ use snafu::ResultExt;
 use table::table::adapter::TableAdapter;
 use table::TableRef;

+use self::columns::InformationSchemaColumns;
 use crate::error::{DatafusionSnafu, Result, TableSchemaMismatchSnafu};
 use crate::information_schema::tables::InformationSchemaTables;
 use crate::{CatalogProviderRef, SchemaProvider};

 const TABLES: &str = "tables";
+const COLUMNS: &str = "columns";

 pub(crate) struct InformationSchemaProvider {
     catalog_name: String,
     catalog_provider: CatalogProviderRef,
+    tables: Vec<String>,
 }

 impl InformationSchemaProvider {
@@ -39,6 +43,7 @@ impl InformationSchemaProvider {
         Self {
             catalog_name,
             catalog_provider,
+            tables: vec![TABLES.to_string(), COLUMNS.to_string()],
         }
     }
 }
@@ -50,31 +55,48 @@ impl SchemaProvider for InformationSchemaProvider {
     }

     async fn table_names(&self) -> Result<Vec<String>> {
-        Ok(vec![TABLES.to_string()])
+        Ok(self.tables.clone())
     }

     async fn table(&self, name: &str) -> Result<Option<TableRef>> {
-        let table = if name.eq_ignore_ascii_case(TABLES) {
-            Arc::new(InformationSchemaTables::new(
-                self.catalog_name.clone(),
-                self.catalog_provider.clone(),
-            ))
-        } else {
-            return Ok(None);
+        let table = match name.to_ascii_lowercase().as_ref() {
+            TABLES => {
+                let inner = Arc::new(InformationSchemaTables::new(
+                    self.catalog_name.clone(),
+                    self.catalog_provider.clone(),
+                ));
+                Arc::new(
+                    StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context(
+                        |_| DatafusionSnafu {
+                            msg: format!("Failed to get InformationSchema table '{name}'"),
+                        },
+                    )?,
+                )
+            }
+            COLUMNS => {
+                let inner = Arc::new(InformationSchemaColumns::new(
+                    self.catalog_name.clone(),
+                    self.catalog_provider.clone(),
+                ));
+                Arc::new(
+                    StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context(
+                        |_| DatafusionSnafu {
+                            msg: format!("Failed to get InformationSchema table '{name}'"),
+                        },
+                    )?,
+                )
+            }
+            _ => {
+                return Ok(None);
+            }
         };
-        let table = Arc::new(
-            StreamingTable::try_new(table.schema().clone(), vec![table]).with_context(|_| {
-                DatafusionSnafu {
-                    msg: format!("Failed to get InformationSchema table '{name}'"),
-                }
-            })?,
-        );
         let table = TableAdapter::new(table).context(TableSchemaMismatchSnafu)?;
         Ok(Some(Arc::new(table)))
     }

     async fn table_exist(&self, name: &str) -> Result<bool> {
-        Ok(matches!(name.to_ascii_lowercase().as_str(), TABLES))
+        let normalized_name = name.to_ascii_lowercase();
+        Ok(self.tables.contains(&normalized_name))
     }
 }
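Note: the two `match` arms above differ only in which `PartitionStream` they construct. A small helper could factor out the shared `StreamingTable` wrapping; the sketch below is not part of the patch, assumes the same module and imports as the file above, and the name `to_streaming_table` is hypothetical.

```rust
use std::sync::Arc;

use datafusion::datasource::streaming::{PartitionStream as DfPartitionStream, StreamingTable};
use snafu::ResultExt;

use crate::error::{DatafusionSnafu, Result};

/// Hypothetical helper: wrap an information-schema partition stream into a
/// DataFusion `StreamingTable`, exactly as both match arms do above.
fn to_streaming_table(
    name: &str,
    inner: Arc<dyn DfPartitionStream>,
) -> Result<Arc<StreamingTable>> {
    let table = StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context(|_| {
        DatafusionSnafu {
            msg: format!("Failed to get InformationSchema table '{name}'"),
        }
    })?;
    Ok(Arc::new(table))
}
```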
diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs
new file mode 100644
index 0000000000..0babffb63f
--- /dev/null
+++ b/src/catalog/src/information_schema/columns.rs
@@ -0,0 +1,165 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+use common_query::physical_plan::TaskContext;
+use common_recordbatch::RecordBatch;
+use datafusion::datasource::streaming::PartitionStream as DfPartitionStream;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
+use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
+use datatypes::prelude::{ConcreteDataType, DataType};
+use datatypes::scalars::ScalarVectorBuilder;
+use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use datatypes::vectors::{StringVectorBuilder, VectorRef};
+use snafu::ResultExt;
+
+use crate::error::{CreateRecordBatchSnafu, Result};
+use crate::CatalogProviderRef;
+
+pub(super) struct InformationSchemaColumns {
+    schema: SchemaRef,
+    catalog_name: String,
+    catalog_provider: CatalogProviderRef,
+}
+
+const TABLE_CATALOG: &str = "table_catalog";
+const TABLE_SCHEMA: &str = "table_schema";
+const TABLE_NAME: &str = "table_name";
+const COLUMN_NAME: &str = "column_name";
+const DATA_TYPE: &str = "data_type";
+
+impl InformationSchemaColumns {
+    pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
+            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
+            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
+            ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
+            ColumnSchema::new(DATA_TYPE, ConcreteDataType::string_datatype(), false),
+        ]));
+        Self {
+            schema,
+            catalog_name,
+            catalog_provider,
+        }
+    }
+
+    fn builder(&self) -> InformationSchemaColumnsBuilder {
+        InformationSchemaColumnsBuilder::new(
+            self.schema.clone(),
+            self.catalog_name.clone(),
+            self.catalog_provider.clone(),
+        )
+    }
+}
+
+struct InformationSchemaColumnsBuilder {
+    schema: SchemaRef,
+    catalog_name: String,
+    catalog_provider: CatalogProviderRef,
+
+    catalog_names: StringVectorBuilder,
+    schema_names: StringVectorBuilder,
+    table_names: StringVectorBuilder,
+    column_names: StringVectorBuilder,
+    data_types: StringVectorBuilder,
+}
+
+impl InformationSchemaColumnsBuilder {
+    fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
+        Self {
+            schema,
+            catalog_name,
+            catalog_provider,
+            catalog_names: StringVectorBuilder::with_capacity(42),
+            schema_names: StringVectorBuilder::with_capacity(42),
+            table_names: StringVectorBuilder::with_capacity(42),
+            column_names: StringVectorBuilder::with_capacity(42),
+            data_types: StringVectorBuilder::with_capacity(42),
+        }
+    }
+
+    /// Construct the `information_schema.columns` virtual table
+    async fn make_columns(&mut self) -> Result<RecordBatch> {
+        let catalog_name = self.catalog_name.clone();
+
+        for schema_name in self.catalog_provider.schema_names().await? {
+            let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
+            for table_name in schema.table_names().await? {
+                let Some(table) = schema.table(&table_name).await? else { continue };
+                let schema = table.schema();
+                for column in schema.column_schemas() {
+                    self.add_column(
+                        &catalog_name,
+                        &schema_name,
+                        &table_name,
+                        &column.name,
+                        column.data_type.name(),
+                    );
+                }
+            }
+        }
+
+        self.finish()
+    }
+
+    fn add_column(
+        &mut self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+        column_name: &str,
+        data_type: &str,
+    ) {
+        self.catalog_names.push(Some(catalog_name));
+        self.schema_names.push(Some(schema_name));
+        self.table_names.push(Some(table_name));
+        self.column_names.push(Some(column_name));
+        self.data_types.push(Some(data_type));
+    }
+
+    fn finish(&mut self) -> Result<RecordBatch> {
+        let columns: Vec<VectorRef> = vec![
+            Arc::new(self.catalog_names.finish()),
+            Arc::new(self.schema_names.finish()),
+            Arc::new(self.table_names.finish()),
+            Arc::new(self.column_names.finish()),
+            Arc::new(self.data_types.finish()),
+        ];
+        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
+    }
+}
+
+impl DfPartitionStream for InformationSchemaColumns {
+    fn schema(&self) -> &ArrowSchemaRef {
+        self.schema.arrow_schema()
+    }
+
+    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
+        let schema = self.schema().clone();
+        let mut builder = self.builder();
+        Box::pin(DfRecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::once(async move {
+                builder
+                    .make_columns()
+                    .await
+                    .map(|x| x.into_df_record_batch())
+                    .map_err(Into::into)
+            }),
+        ))
+    }
+}
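One detail worth noting in `execute` above: the catalog walk is wrapped in `futures::stream::once`, so no schema is scanned until DataFusion actually polls the stream. Below is a minimal standalone illustration of that deferred-execution pattern, using only the public `futures` and `tokio` crates; the async block body is a stand-in for `make_columns`, not GreptimeDB code.

```rust
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Nothing inside the async block runs until the stream is polled,
    // mirroring how `execute` defers the catalog walk until scan time.
    let stream = futures::stream::once(async {
        println!("scanning catalog now");
        Ok::<Vec<&str>, std::io::Error>(vec!["number", "UInt32"])
    });
    futures::pin_mut!(stream);
    while let Some(batch) = stream.next().await {
        println!("batch: {:?}", batch.unwrap());
    }
}
```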
diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs
index 2f9d200159..1a5afa9d3e 100644
--- a/src/catalog/src/information_schema/tables.rs
+++ b/src/catalog/src/information_schema/tables.rs
@@ -28,7 +28,6 @@ use snafu::ResultExt;
 use table::metadata::TableType;

 use crate::error::{CreateRecordBatchSnafu, Result};
-use crate::information_schema::TABLES;
 use crate::CatalogProviderRef;

 pub(super) struct InformationSchemaTables {
@@ -118,16 +117,6 @@ impl InformationSchemaTablesBuilder {
             }
         }

-        // Add a final list for the information schema tables themselves
-        self.add_table(
-            &catalog_name,
-            INFORMATION_SCHEMA_NAME,
-            TABLES,
-            TableType::View,
-            None,
-            None,
-        );
-
         self.finish()
     }
diff --git a/src/frontend/src/tests/instance_test.rs b/src/frontend/src/tests/instance_test.rs
index fa7325544b..17c59666d1 100644
--- a/src/frontend/src/tests/instance_test.rs
+++ b/src/frontend/src/tests/instance_test.rs
@@ -1234,7 +1234,7 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
 }

 #[apply(both_instances_cases)]
-async fn test_information_schema(instance: Arc<dyn MockInstance>) {
+async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
     let is_distributed_mode = instance.is_distributed_mode();
     let instance = instance.frontend();

@@ -1251,23 +1251,21 @@ async fn test_information_schema(instance: Arc<dyn MockInstance>) {
     let expected = match is_distributed_mode {
         true => {
             "\
-+---------------+--------------------+------------+------------+----------+-------------+
-| table_catalog | table_schema       | table_name | table_type | table_id | engine      |
-+---------------+--------------------+------------+------------+----------+-------------+
-| greptime      | public             | numbers    | BASE TABLE | 1        | test_engine |
-| greptime      | public             | scripts    | BASE TABLE | 1024     | mito        |
-| greptime      | information_schema | tables     | VIEW       |          |             |
-+---------------+--------------------+------------+------------+----------+-------------+"
++---------------+--------------+------------+------------+----------+-------------+
+| table_catalog | table_schema | table_name | table_type | table_id | engine      |
++---------------+--------------+------------+------------+----------+-------------+
+| greptime      | public       | numbers    | BASE TABLE | 1        | test_engine |
+| greptime      | public       | scripts    | BASE TABLE | 1024     | mito        |
++---------------+--------------+------------+------------+----------+-------------+"
         }
         false => {
             "\
-+---------------+--------------------+------------+------------+----------+-------------+
-| table_catalog | table_schema       | table_name | table_type | table_id | engine      |
-+---------------+--------------------+------------+------------+----------+-------------+
-| greptime      | public             | numbers    | BASE TABLE | 1        | test_engine |
-| greptime      | public             | scripts    | BASE TABLE | 1        | mito        |
-| greptime      | information_schema | tables     | VIEW       |          |             |
-+---------------+--------------------+------------+------------+----------+-------------+"
++---------------+--------------+------------+------------+----------+-------------+
+| table_catalog | table_schema | table_name | table_type | table_id | engine      |
++---------------+--------------+------------+------------+----------+-------------+
+| greptime      | public       | numbers    | BASE TABLE | 1        | test_engine |
+| greptime      | public       | scripts    | BASE TABLE | 1        | mito        |
++---------------+--------------+------------+------------+----------+-------------+"
         }
     };

@@ -1277,26 +1275,65 @@ async fn test_information_schema(instance: Arc<dyn MockInstance>) {
     let expected = match is_distributed_mode {
         true => {
             "\
-+-----------------+--------------------+---------------+------------+----------+--------+
-| table_catalog   | table_schema       | table_name    | table_type | table_id | engine |
-+-----------------+--------------------+---------------+------------+----------+--------+
-| another_catalog | another_schema     | another_table | BASE TABLE | 1025     | mito   |
-| another_catalog | information_schema | tables        | VIEW       |          |        |
-+-----------------+--------------------+---------------+------------+----------+--------+"
++-----------------+----------------+---------------+------------+----------+--------+
+| table_catalog   | table_schema   | table_name    | table_type | table_id | engine |
++-----------------+----------------+---------------+------------+----------+--------+
+| another_catalog | another_schema | another_table | BASE TABLE | 1025     | mito   |
++-----------------+----------------+---------------+------------+----------+--------+"
         }
         false => {
             "\
-+-----------------+--------------------+---------------+------------+----------+--------+
-| table_catalog   | table_schema       | table_name    | table_type | table_id | engine |
-+-----------------+--------------------+---------------+------------+----------+--------+
-| another_catalog | another_schema     | another_table | BASE TABLE | 1024     | mito   |
-| another_catalog | information_schema | tables        | VIEW       |          |        |
-+-----------------+--------------------+---------------+------------+----------+--------+"
++-----------------+----------------+---------------+------------+----------+--------+
+| table_catalog   | table_schema   | table_name    | table_type | table_id | engine |
++-----------------+----------------+---------------+------------+----------+--------+
+| another_catalog | another_schema | another_table | BASE TABLE | 1024     | mito   |
++-----------------+----------------+---------------+------------+----------+--------+"
         }
     };

     check_output_stream(output, expected).await;
 }

+#[apply(both_instances_cases)]
+async fn test_information_schema_dot_columns(instance: Arc<dyn MockInstance>) {
+    let instance = instance.frontend();
+
+    let sql = "create table another_table(i bigint time index)";
+    let query_ctx = Arc::new(QueryContext::with("another_catalog", "another_schema"));
+    let output = execute_sql_with(&instance, sql, query_ctx.clone()).await;
+    assert!(matches!(output, Output::AffectedRows(0)));
+
+    // Users can only see the information schema under the current catalog.
+    // This is a necessary requirement for GreptimeCloud.
+    let sql = "select table_catalog, table_schema, table_name, column_name, data_type from information_schema.columns order by table_name";
+
+    let output = execute_sql(&instance, sql).await;
+    let expected = "\
++---------------+--------------+------------+--------------+----------------------+
+| table_catalog | table_schema | table_name | column_name  | data_type            |
++---------------+--------------+------------+--------------+----------------------+
+| greptime      | public       | numbers    | number       | UInt32               |
+| greptime      | public       | scripts    | schema       | String               |
+| greptime      | public       | scripts    | name         | String               |
+| greptime      | public       | scripts    | script       | String               |
+| greptime      | public       | scripts    | engine       | String               |
+| greptime      | public       | scripts    | timestamp    | TimestampMillisecond |
+| greptime      | public       | scripts    | gmt_created  | TimestampMillisecond |
+| greptime      | public       | scripts    | gmt_modified | TimestampMillisecond |
++---------------+--------------+------------+--------------+----------------------+";
+
+    check_output_stream(output, expected).await;
+
+    let output = execute_sql_with(&instance, sql, query_ctx).await;
+    let expected = "\
++-----------------+----------------+---------------+-------------+-----------+
+| table_catalog   | table_schema   | table_name    | column_name | data_type |
++-----------------+----------------+---------------+-------------+-----------+
+| another_catalog | another_schema | another_table | i           | Int64     |
++-----------------+----------------+---------------+-------------+-----------+";
+
+    check_output_stream(output, expected).await;
+}
+
 async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
     execute_sql_with(instance, sql, QueryContext::arc()).await
 }
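The new test asserts on the full listing. If a narrower check were wanted later, it could look like the following sketch, which is not part of the patch: it reuses the `execute_sql` and `check_output_stream` helpers from the test above, and the expected rows are derived from the `numbers` entry in that full listing.

```rust
// Sketch only: a filtered assertion in the same style as the tests above.
let sql = "select table_catalog, table_schema, table_name, column_name, data_type \
           from information_schema.columns where table_name = 'numbers'";
let output = execute_sql(&instance, sql).await;
let expected = "\
+---------------+--------------+------------+-------------+-----------+
| table_catalog | table_schema | table_name | column_name | data_type |
+---------------+--------------+------------+-------------+-----------+
| greptime      | public       | numbers    | number      | UInt32    |
+---------------+--------------+------------+-------------+-----------+";
check_output_stream(output, expected).await;
```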
diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result
index ca9b0ffa73..c7b9ab4e1a 100644
--- a/tests/cases/standalone/common/system/information_schema.result
+++ b/tests/cases/standalone/common/system/information_schema.result
@@ -33,12 +33,23 @@ where table_catalog = 'greptime'
   and table_schema != 'public'
 order by table_schema, table_name;

-+---------------+--------------------+------------+------------+--------+
-| table_catalog | table_schema       | table_name | table_type | engine |
-+---------------+--------------------+------------+------------+--------+
-| greptime      | information_schema | tables     | VIEW       |        |
-| greptime      | my_db              | foo        | BASE TABLE | mito   |
-+---------------+--------------------+------------+------------+--------+
++---------------+--------------+------------+------------+--------+
+| table_catalog | table_schema | table_name | table_type | engine |
++---------------+--------------+------------+------------+--------+
+| greptime      | my_db        | foo        | BASE TABLE | mito   |
++---------------+--------------+------------+------------+--------+
+
+select table_catalog, table_schema, table_name, column_name, data_type
+from information_schema.columns
+where table_catalog = 'greptime'
+  and table_schema != 'public'
+order by table_schema, table_name;
+
++---------------+--------------+------------+-------------+-----------+
+| table_catalog | table_schema | table_name | column_name | data_type |
++---------------+--------------+------------+-------------+-----------+
+| greptime      | my_db        | foo        | ts          | Int64     |
++---------------+--------------+------------+-------------+-----------+

 use public;
diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql
index ac3a683d02..b33a72075f 100644
--- a/tests/cases/standalone/common/system/information_schema.sql
+++ b/tests/cases/standalone/common/system/information_schema.sql
@@ -20,5 +20,11 @@ where table_catalog = 'greptime'
   and table_schema != 'public'
 order by table_schema, table_name;

+select table_catalog, table_schema, table_name, column_name, data_type
+from information_schema.columns
+where table_catalog = 'greptime'
+  and table_schema != 'public'
+order by table_schema, table_name;
+
 use public;
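Finally, a standalone sketch of the lookup rule that `table_exist` in information_schema.rs now implements: names are normalized to ASCII lowercase before the membership test, so lookups are case-insensitive. Plain Rust, runnable on its own; the hard-coded list mirrors the provider's `tables` field.

```rust
fn main() {
    // Mirrors InformationSchemaProvider::table_exist: normalize, then contains().
    let tables = vec!["tables".to_string(), "columns".to_string()];
    for name in ["COLUMNS", "Columns", "columns", "views"] {
        let exists = tables.contains(&name.to_ascii_lowercase());
        println!("table_exist({name:?}) = {exists}");
    }
}
```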