mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-16 10:12:58 +00:00
feat: add support for information_schema.columns (#1500)
* feat: add support for information_schema.columns * feat: remove information_schema from its view * Update src/catalog/src/information_schema.rs Co-authored-by: LFC <bayinamine@gmail.com> * fix: error on table data type * test: correct sqlness test for information schema * test: add information_schema.columns sqlness tests --------- Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod columns;
|
||||
mod tables;
|
||||
|
||||
use std::any::Any;
|
||||
@@ -23,15 +24,18 @@ use snafu::ResultExt;
|
||||
use table::table::adapter::TableAdapter;
|
||||
use table::TableRef;
|
||||
|
||||
use self::columns::InformationSchemaColumns;
|
||||
use crate::error::{DatafusionSnafu, Result, TableSchemaMismatchSnafu};
|
||||
use crate::information_schema::tables::InformationSchemaTables;
|
||||
use crate::{CatalogProviderRef, SchemaProvider};
|
||||
|
||||
const TABLES: &str = "tables";
|
||||
const COLUMNS: &str = "columns";
|
||||
|
||||
pub(crate) struct InformationSchemaProvider {
|
||||
catalog_name: String,
|
||||
catalog_provider: CatalogProviderRef,
|
||||
tables: Vec<String>,
|
||||
}
|
||||
|
||||
impl InformationSchemaProvider {
|
||||
@@ -39,6 +43,7 @@ impl InformationSchemaProvider {
|
||||
Self {
|
||||
catalog_name,
|
||||
catalog_provider,
|
||||
tables: vec![TABLES.to_string(), COLUMNS.to_string()],
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -50,31 +55,48 @@ impl SchemaProvider for InformationSchemaProvider {
|
||||
}
|
||||
|
||||
async fn table_names(&self) -> Result<Vec<String>> {
|
||||
Ok(vec![TABLES.to_string()])
|
||||
Ok(self.tables.clone())
|
||||
}
|
||||
|
||||
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
|
||||
let table = if name.eq_ignore_ascii_case(TABLES) {
|
||||
Arc::new(InformationSchemaTables::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_provider.clone(),
|
||||
))
|
||||
} else {
|
||||
return Ok(None);
|
||||
let table = match name.to_ascii_lowercase().as_ref() {
|
||||
TABLES => {
|
||||
let inner = Arc::new(InformationSchemaTables::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_provider.clone(),
|
||||
));
|
||||
Arc::new(
|
||||
StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context(
|
||||
|_| DatafusionSnafu {
|
||||
msg: format!("Failed to get InformationSchema table '{name}'"),
|
||||
},
|
||||
)?,
|
||||
)
|
||||
}
|
||||
COLUMNS => {
|
||||
let inner = Arc::new(InformationSchemaColumns::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_provider.clone(),
|
||||
));
|
||||
Arc::new(
|
||||
StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context(
|
||||
|_| DatafusionSnafu {
|
||||
msg: format!("Failed to get InformationSchema table '{name}'"),
|
||||
},
|
||||
)?,
|
||||
)
|
||||
}
|
||||
_ => {
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let table = Arc::new(
|
||||
StreamingTable::try_new(table.schema().clone(), vec![table]).with_context(|_| {
|
||||
DatafusionSnafu {
|
||||
msg: format!("Failed to get InformationSchema table '{name}'"),
|
||||
}
|
||||
})?,
|
||||
);
|
||||
let table = TableAdapter::new(table).context(TableSchemaMismatchSnafu)?;
|
||||
Ok(Some(Arc::new(table)))
|
||||
}
|
||||
|
||||
async fn table_exist(&self, name: &str) -> Result<bool> {
|
||||
Ok(matches!(name.to_ascii_lowercase().as_str(), TABLES))
|
||||
let normalized_name = name.to_ascii_lowercase();
|
||||
Ok(self.tables.contains(&normalized_name))
|
||||
}
|
||||
}
|
||||
|
||||
165
src/catalog/src/information_schema/columns.rs
Normal file
165
src/catalog/src/information_schema/columns.rs
Normal file
@@ -0,0 +1,165 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_schema::SchemaRef as ArrowSchemaRef;
|
||||
use common_query::physical_plan::TaskContext;
|
||||
use common_recordbatch::RecordBatch;
|
||||
use datafusion::datasource::streaming::PartitionStream as DfPartitionStream;
|
||||
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
|
||||
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
|
||||
use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::vectors::{StringVectorBuilder, VectorRef};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{CreateRecordBatchSnafu, Result};
|
||||
use crate::CatalogProviderRef;
|
||||
|
||||
pub(super) struct InformationSchemaColumns {
|
||||
schema: SchemaRef,
|
||||
catalog_name: String,
|
||||
catalog_provider: CatalogProviderRef,
|
||||
}
|
||||
|
||||
const TABLE_CATALOG: &str = "table_catalog";
|
||||
const TABLE_SCHEMA: &str = "table_schema";
|
||||
const TABLE_NAME: &str = "table_name";
|
||||
const COLUMN_NAME: &str = "column_name";
|
||||
const DATA_TYPE: &str = "data_type";
|
||||
|
||||
impl InformationSchemaColumns {
|
||||
pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(DATA_TYPE, ConcreteDataType::string_datatype(), false),
|
||||
]));
|
||||
Self {
|
||||
schema,
|
||||
catalog_name,
|
||||
catalog_provider,
|
||||
}
|
||||
}
|
||||
|
||||
fn builder(&self) -> InformationSchemaColumnsBuilder {
|
||||
InformationSchemaColumnsBuilder::new(
|
||||
self.schema.clone(),
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_provider.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
struct InformationSchemaColumnsBuilder {
|
||||
schema: SchemaRef,
|
||||
catalog_name: String,
|
||||
catalog_provider: CatalogProviderRef,
|
||||
|
||||
catalog_names: StringVectorBuilder,
|
||||
schema_names: StringVectorBuilder,
|
||||
table_names: StringVectorBuilder,
|
||||
column_names: StringVectorBuilder,
|
||||
data_types: StringVectorBuilder,
|
||||
}
|
||||
|
||||
impl InformationSchemaColumnsBuilder {
|
||||
fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
catalog_name,
|
||||
catalog_provider,
|
||||
catalog_names: StringVectorBuilder::with_capacity(42),
|
||||
schema_names: StringVectorBuilder::with_capacity(42),
|
||||
table_names: StringVectorBuilder::with_capacity(42),
|
||||
column_names: StringVectorBuilder::with_capacity(42),
|
||||
data_types: StringVectorBuilder::with_capacity(42),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct the `information_schema.tables` virtual table
|
||||
async fn make_tables(&mut self) -> Result<RecordBatch> {
|
||||
let catalog_name = self.catalog_name.clone();
|
||||
|
||||
for schema_name in self.catalog_provider.schema_names().await? {
|
||||
let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
|
||||
for table_name in schema.table_names().await? {
|
||||
let Some(table) = schema.table(&table_name).await? else { continue };
|
||||
let schema = table.schema();
|
||||
for column in schema.column_schemas() {
|
||||
self.add_column(
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_name,
|
||||
&column.name,
|
||||
column.data_type.name(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.finish()
|
||||
}
|
||||
|
||||
fn add_column(
|
||||
&mut self,
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
column_name: &str,
|
||||
data_type: &str,
|
||||
) {
|
||||
self.catalog_names.push(Some(catalog_name));
|
||||
self.schema_names.push(Some(schema_name));
|
||||
self.table_names.push(Some(table_name));
|
||||
self.column_names.push(Some(column_name));
|
||||
self.data_types.push(Some(data_type));
|
||||
}
|
||||
|
||||
fn finish(&mut self) -> Result<RecordBatch> {
|
||||
let columns: Vec<VectorRef> = vec![
|
||||
Arc::new(self.catalog_names.finish()),
|
||||
Arc::new(self.schema_names.finish()),
|
||||
Arc::new(self.table_names.finish()),
|
||||
Arc::new(self.column_names.finish()),
|
||||
Arc::new(self.data_types.finish()),
|
||||
];
|
||||
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl DfPartitionStream for InformationSchemaColumns {
|
||||
fn schema(&self) -> &ArrowSchemaRef {
|
||||
self.schema.arrow_schema()
|
||||
}
|
||||
|
||||
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
|
||||
let schema = self.schema().clone();
|
||||
let mut builder = self.builder();
|
||||
Box::pin(DfRecordBatchStreamAdapter::new(
|
||||
schema,
|
||||
futures::stream::once(async move {
|
||||
builder
|
||||
.make_tables()
|
||||
.await
|
||||
.map(|x| x.into_df_record_batch())
|
||||
.map_err(Into::into)
|
||||
}),
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -28,7 +28,6 @@ use snafu::ResultExt;
|
||||
use table::metadata::TableType;
|
||||
|
||||
use crate::error::{CreateRecordBatchSnafu, Result};
|
||||
use crate::information_schema::TABLES;
|
||||
use crate::CatalogProviderRef;
|
||||
|
||||
pub(super) struct InformationSchemaTables {
|
||||
@@ -118,16 +117,6 @@ impl InformationSchemaTablesBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
// Add a final list for the information schema tables themselves
|
||||
self.add_table(
|
||||
&catalog_name,
|
||||
INFORMATION_SCHEMA_NAME,
|
||||
TABLES,
|
||||
TableType::View,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
self.finish()
|
||||
}
|
||||
|
||||
|
||||
@@ -1234,7 +1234,7 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
|
||||
}
|
||||
|
||||
#[apply(both_instances_cases)]
|
||||
async fn test_information_schema(instance: Arc<dyn MockInstance>) {
|
||||
async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
|
||||
let is_distributed_mode = instance.is_distributed_mode();
|
||||
let instance = instance.frontend();
|
||||
|
||||
@@ -1251,23 +1251,21 @@ async fn test_information_schema(instance: Arc<dyn MockInstance>) {
|
||||
let expected = match is_distributed_mode {
|
||||
true => {
|
||||
"\
|
||||
+---------------+--------------------+------------+------------+----------+-------------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+---------------+--------------------+------------+------------+----------+-------------+
|
||||
| greptime | public | numbers | BASE TABLE | 1 | test_engine |
|
||||
| greptime | public | scripts | BASE TABLE | 1024 | mito |
|
||||
| greptime | information_schema | tables | VIEW | | |
|
||||
+---------------+--------------------+------------+------------+----------+-------------+"
|
||||
+---------------+--------------+------------+------------+----------+-------------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+---------------+--------------+------------+------------+----------+-------------+
|
||||
| greptime | public | numbers | BASE TABLE | 1 | test_engine |
|
||||
| greptime | public | scripts | BASE TABLE | 1024 | mito |
|
||||
+---------------+--------------+------------+------------+----------+-------------+"
|
||||
}
|
||||
false => {
|
||||
"\
|
||||
+---------------+--------------------+------------+------------+----------+-------------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+---------------+--------------------+------------+------------+----------+-------------+
|
||||
| greptime | public | numbers | BASE TABLE | 1 | test_engine |
|
||||
| greptime | public | scripts | BASE TABLE | 1 | mito |
|
||||
| greptime | information_schema | tables | VIEW | | |
|
||||
+---------------+--------------------+------------+------------+----------+-------------+"
|
||||
+---------------+--------------+------------+------------+----------+-------------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+---------------+--------------+------------+------------+----------+-------------+
|
||||
| greptime | public | numbers | BASE TABLE | 1 | test_engine |
|
||||
| greptime | public | scripts | BASE TABLE | 1 | mito |
|
||||
+---------------+--------------+------------+------------+----------+-------------+"
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1277,26 +1275,65 @@ async fn test_information_schema(instance: Arc<dyn MockInstance>) {
|
||||
let expected = match is_distributed_mode {
|
||||
true => {
|
||||
"\
|
||||
+-----------------+--------------------+---------------+------------+----------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+-----------------+--------------------+---------------+------------+----------+--------+
|
||||
| another_catalog | another_schema | another_table | BASE TABLE | 1025 | mito |
|
||||
| another_catalog | information_schema | tables | VIEW | | |
|
||||
+-----------------+--------------------+---------------+------------+----------+--------+"
|
||||
+-----------------+----------------+---------------+------------+----------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+-----------------+----------------+---------------+------------+----------+--------+
|
||||
| another_catalog | another_schema | another_table | BASE TABLE | 1025 | mito |
|
||||
+-----------------+----------------+---------------+------------+----------+--------+"
|
||||
}
|
||||
false => {
|
||||
"\
|
||||
+-----------------+--------------------+---------------+------------+----------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+-----------------+--------------------+---------------+------------+----------+--------+
|
||||
| another_catalog | another_schema | another_table | BASE TABLE | 1024 | mito |
|
||||
| another_catalog | information_schema | tables | VIEW | | |
|
||||
+-----------------+--------------------+---------------+------------+----------+--------+"
|
||||
+-----------------+----------------+---------------+------------+----------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+-----------------+----------------+---------------+------------+----------+--------+
|
||||
| another_catalog | another_schema | another_table | BASE TABLE | 1024 | mito |
|
||||
+-----------------+----------------+---------------+------------+----------+--------+"
|
||||
}
|
||||
};
|
||||
check_output_stream(output, expected).await;
|
||||
}
|
||||
|
||||
#[apply(both_instances_cases)]
|
||||
async fn test_information_schema_dot_columns(instance: Arc<dyn MockInstance>) {
|
||||
let instance = instance.frontend();
|
||||
|
||||
let sql = "create table another_table(i bigint time index)";
|
||||
let query_ctx = Arc::new(QueryContext::with("another_catalog", "another_schema"));
|
||||
let output = execute_sql_with(&instance, sql, query_ctx.clone()).await;
|
||||
assert!(matches!(output, Output::AffectedRows(0)));
|
||||
|
||||
// User can only see information schema under current catalog.
|
||||
// A necessary requirement to GreptimeCloud.
|
||||
let sql = "select table_catalog, table_schema, table_name, column_name, data_type from information_schema.columns order by table_name";
|
||||
|
||||
let output = execute_sql(&instance, sql).await;
|
||||
let expected = "\
|
||||
+---------------+--------------+------------+--------------+----------------------+
|
||||
| table_catalog | table_schema | table_name | column_name | data_type |
|
||||
+---------------+--------------+------------+--------------+----------------------+
|
||||
| greptime | public | numbers | number | UInt32 |
|
||||
| greptime | public | scripts | schema | String |
|
||||
| greptime | public | scripts | name | String |
|
||||
| greptime | public | scripts | script | String |
|
||||
| greptime | public | scripts | engine | String |
|
||||
| greptime | public | scripts | timestamp | TimestampMillisecond |
|
||||
| greptime | public | scripts | gmt_created | TimestampMillisecond |
|
||||
| greptime | public | scripts | gmt_modified | TimestampMillisecond |
|
||||
+---------------+--------------+------------+--------------+----------------------+";
|
||||
|
||||
check_output_stream(output, expected).await;
|
||||
|
||||
let output = execute_sql_with(&instance, sql, query_ctx).await;
|
||||
let expected = "\
|
||||
+-----------------+----------------+---------------+-------------+-----------+
|
||||
| table_catalog | table_schema | table_name | column_name | data_type |
|
||||
+-----------------+----------------+---------------+-------------+-----------+
|
||||
| another_catalog | another_schema | another_table | i | Int64 |
|
||||
+-----------------+----------------+---------------+-------------+-----------+";
|
||||
|
||||
check_output_stream(output, expected).await;
|
||||
}
|
||||
|
||||
async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
|
||||
execute_sql_with(instance, sql, QueryContext::arc()).await
|
||||
}
|
||||
|
||||
@@ -33,12 +33,23 @@ where table_catalog = 'greptime'
|
||||
and table_schema != 'public'
|
||||
order by table_schema, table_name;
|
||||
|
||||
+---------------+--------------------+------------+------------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | engine |
|
||||
+---------------+--------------------+------------+------------+--------+
|
||||
| greptime | information_schema | tables | VIEW | |
|
||||
| greptime | my_db | foo | BASE TABLE | mito |
|
||||
+---------------+--------------------+------------+------------+--------+
|
||||
+---------------+--------------+------------+------------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | engine |
|
||||
+---------------+--------------+------------+------------+--------+
|
||||
| greptime | my_db | foo | BASE TABLE | mito |
|
||||
+---------------+--------------+------------+------------+--------+
|
||||
|
||||
select table_catalog, table_schema, table_name, column_name, data_type
|
||||
from information_schema.columns
|
||||
where table_catalog = 'greptime'
|
||||
and table_schema != 'public'
|
||||
order by table_schema, table_name;
|
||||
|
||||
+---------------+--------------+------------+-------------+-----------+
|
||||
| table_catalog | table_schema | table_name | column_name | data_type |
|
||||
+---------------+--------------+------------+-------------+-----------+
|
||||
| greptime | my_db | foo | ts | Int64 |
|
||||
+---------------+--------------+------------+-------------+-----------+
|
||||
|
||||
use
|
||||
public;
|
||||
|
||||
@@ -20,5 +20,11 @@ where table_catalog = 'greptime'
|
||||
and table_schema != 'public'
|
||||
order by table_schema, table_name;
|
||||
|
||||
select table_catalog, table_schema, table_name, column_name, data_type
|
||||
from information_schema.columns
|
||||
where table_catalog = 'greptime'
|
||||
and table_schema != 'public'
|
||||
order by table_schema, table_name;
|
||||
|
||||
use
|
||||
public;
|
||||
|
||||
Reference in New Issue
Block a user