From a3e47955b82ae1a77d6173640d577b3f62488aa9 Mon Sep 17 00:00:00 2001 From: LFC Date: Fri, 7 Apr 2023 16:50:14 +0800 Subject: [PATCH] feat: information schema (#1327) * feat: basic information schema * show information schema only for current catalog * fix: fragile tests --- Cargo.lock | 35 ++- Cargo.toml | 12 +- src/catalog/Cargo.toml | 1 + src/catalog/src/datafusion.rs | 15 + .../src/datafusion/catalog_adapter.rs | 39 +-- src/catalog/src/error.rs | 32 +- src/catalog/src/information_schema.rs | 80 +++++ src/catalog/src/information_schema/tables.rs | 165 ++++++++++ src/catalog/src/lib.rs | 2 + src/catalog/src/local/manager.rs | 6 +- src/catalog/src/remote/manager.rs | 30 +- src/catalog/src/schema.rs | 23 +- src/catalog/src/table_source.rs | 29 +- src/catalog/src/tables.rs | 297 +----------------- src/common/substrait/src/df_logical.rs | 4 +- src/frontend/src/catalog.rs | 16 - src/frontend/src/tests.rs | 28 +- src/frontend/src/tests/instance_test.rs | 184 ++++++----- src/frontend/src/tests/test_util.rs | 4 +- src/meta-srv/src/mocks.rs | 6 +- src/query/src/datafusion.rs | 3 +- src/query/src/datafusion/error.rs | 11 +- src/query/src/optimizer.rs | 4 +- src/query/src/tests/mean_test.rs | 17 +- src/script/src/python/pyo3/builtins.rs | 12 +- src/script/src/python/rspython/builtins.rs | 5 +- .../common/system/information_schema.result | 48 +++ .../common/system/information_schema.sql | 24 ++ tests/runner/src/env.rs | 4 +- 29 files changed, 647 insertions(+), 489 deletions(-) create mode 100644 src/catalog/src/datafusion.rs rename src/{query => catalog}/src/datafusion/catalog_adapter.rs (89%) create mode 100644 src/catalog/src/information_schema.rs create mode 100644 src/catalog/src/information_schema/tables.rs create mode 100644 tests/cases/standalone/common/system/information_schema.result create mode 100644 tests/cases/standalone/common/system/information_schema.sql diff --git a/Cargo.lock b/Cargo.lock index 0e131015eb..4d0667b95f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1136,6 +1136,7 @@ version = "0.1.1" dependencies = [ "api", "arc-swap", + "arrow-schema", "async-stream", "async-trait", "backoff", @@ -2209,8 +2210,8 @@ dependencies = [ [[package]] name = "datafusion" -version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a" +version = "21.1.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a" dependencies = [ "ahash 0.8.3", "arrow", @@ -2256,8 +2257,8 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a" +version = "21.1.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a" dependencies = [ "arrow", "arrow-array", @@ -2270,8 +2271,8 @@ dependencies = [ [[package]] name = "datafusion-execution" -version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a" +version = "21.1.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a" dependencies = [ "dashmap", "datafusion-common", @@ -2287,8 +2288,8 @@ dependencies = [ 
[[package]] name = "datafusion-expr" -version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a" +version = "21.1.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a" dependencies = [ "ahash 0.8.3", "arrow", @@ -2298,8 +2299,8 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a" +version = "21.1.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a" dependencies = [ "arrow", "async-trait", @@ -2315,11 +2316,12 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a" +version = "21.1.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a" dependencies = [ "ahash 0.8.3", "arrow", + "arrow-array", "arrow-buffer", "arrow-schema", "blake2", @@ -2345,8 +2347,8 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a" +version = "21.1.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a" dependencies = [ "arrow", "datafusion-common", @@ -2356,9 +2358,10 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a" +version = "21.1.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a" dependencies = [ + "arrow", "arrow-schema", "datafusion-common", "datafusion-expr", diff --git a/Cargo.toml b/Cargo.toml index 07ad3cee8e..b10bc70733 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,12 +58,12 @@ arrow-schema = { version = "36.0", features = ["serde"] } async-stream = "0.3" async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" } -datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" } -datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" } -datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" } -datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" } -datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" } 
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" } +datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" } +datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" } +datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" } +datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" } futures = "0.3" futures-util = "0.3" parquet = "36.0" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 8492ab44ef..2bc1000187 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] api = { path = "../api" } arc-swap = "1.0" +arrow-schema.workspace = true async-stream.workspace = true async-trait = "0.1" backoff = { version = "0.4", features = ["tokio"] } diff --git a/src/catalog/src/datafusion.rs b/src/catalog/src/datafusion.rs new file mode 100644 index 0000000000..f54699455d --- /dev/null +++ b/src/catalog/src/datafusion.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+pub mod catalog_adapter;
diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/catalog/src/datafusion/catalog_adapter.rs
similarity index 89%
rename from src/query/src/datafusion/catalog_adapter.rs
rename to src/catalog/src/datafusion/catalog_adapter.rs
index a2e7ff5273..c83d5f1879 100644
--- a/src/query/src/datafusion/catalog_adapter.rs
+++ b/src/catalog/src/datafusion/catalog_adapter.rs
@@ -18,10 +18,6 @@ use std::any::Any;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use catalog::error::{self as catalog_error, Error};
-use catalog::{
-    CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
-};
 use common_error::prelude::BoxedError;
 use datafusion::catalog::catalog::{
     CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider,
@@ -33,7 +29,10 @@ use snafu::ResultExt;
 use table::table::adapter::{DfTableProviderAdapter, TableAdapter};
 use table::TableRef;
 
-use crate::datafusion::error;
+use crate::error::{self, Result, SchemaProviderOperationSnafu};
+use crate::{
+    CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
+};
 
 pub struct DfCatalogListAdapter {
     catalog_list: CatalogListRef,
@@ -89,7 +88,7 @@ impl CatalogProvider for CatalogProviderAdapter {
         self
     }
 
-    fn schema_names(&self) -> catalog::error::Result<Vec<String>> {
+    fn schema_names(&self) -> Result<Vec<String>> {
         Ok(self.df_catalog_provider.schema_names())
     }
 
@@ -97,11 +96,11 @@ impl CatalogProvider for CatalogProviderAdapter {
         &self,
         _name: String,
         _schema: SchemaProviderRef,
-    ) -> catalog::error::Result<Option<SchemaProviderRef>> {
+    ) -> Result<Option<SchemaProviderRef>> {
         todo!("register_schema is not supported in Datafusion catalog provider")
     }
 
-    fn schema(&self, name: &str) -> catalog::error::Result<Option<Arc<dyn SchemaProvider>>> {
+    fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
         Ok(self
             .df_catalog_provider
             .schema(name)
@@ -196,11 +195,11 @@ impl SchemaProvider for SchemaProviderAdapter {
     }
 
     /// Retrieves the list of available table names in this schema.
-    fn table_names(&self) -> Result<Vec<String>, Error> {
+    fn table_names(&self) -> Result<Vec<String>> {
         Ok(self.df_schema_provider.table_names())
     }
 
-    async fn table(&self, name: &str) -> Result<Option<TableRef>, Error> {
+    async fn table(&self, name: &str) -> Result<Option<TableRef>> {
         let table = self.df_schema_provider.table(name).await;
         let table = table.map(|table_provider| {
             match table_provider
@@ -219,11 +218,7 @@ impl SchemaProvider for SchemaProviderAdapter {
         Ok(table)
     }
 
-    fn register_table(
-        &self,
-        name: String,
-        table: TableRef,
-    ) -> catalog::error::Result<Option<TableRef>> {
+    fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
         let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
         Ok(self
             .df_schema_provider
@@ -232,43 +227,43 @@ impl SchemaProvider for SchemaProviderAdapter {
             .register_table(name, table_provider)
             .context(error::DatafusionSnafu {
                 msg: "Fail to register table to datafusion",
             })
             .map_err(BoxedError::new)
-            .context(catalog_error::SchemaProviderOperationSnafu)?
+            .context(SchemaProviderOperationSnafu)?
             .map(|_| table))
     }
 
-    fn rename_table(&self, _name: &str, _new_name: String) -> catalog_error::Result<TableRef> {
+    fn rename_table(&self, _name: &str, _new_name: String) -> Result<TableRef> {
         todo!()
     }
 
-    fn deregister_table(&self, name: &str) -> catalog::error::Result<Option<TableRef>> {
+    fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
         self.df_schema_provider
             .deregister_table(name)
             .context(error::DatafusionSnafu {
                 msg: "Fail to deregister table from datafusion",
             })
             .map_err(BoxedError::new)
-            .context(catalog_error::SchemaProviderOperationSnafu)?
+            .context(SchemaProviderOperationSnafu)?
             .map(|table| {
                 let adapter = TableAdapter::new(table)
                     .context(error::TableSchemaMismatchSnafu)
                     .map_err(BoxedError::new)
-                    .context(catalog_error::SchemaProviderOperationSnafu)?;
+                    .context(SchemaProviderOperationSnafu)?;
                 Ok(Arc::new(adapter) as _)
             })
             .transpose()
     }
 
-    fn table_exist(&self, name: &str) -> Result<bool, Error> {
+    fn table_exist(&self, name: &str) -> Result<bool> {
         Ok(self.df_schema_provider.table_exist(name))
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
     use table::table::numbers::NumbersTable;
 
     use super::*;
+    use crate::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
 
     #[test]
     #[should_panic]
diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs
index c9c46f2068..36c1e9de39 100644
--- a/src/catalog/src/error.rs
+++ b/src/catalog/src/error.rs
@@ -117,6 +117,9 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Operation {} not supported", op))]
+    NotSupported { op: String, location: Location },
+
     #[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))]
     OpenTable {
         table_info: String,
@@ -136,6 +139,12 @@ pub enum Error {
         source: common_recordbatch::error::Error,
     },
 
+    #[snafu(display("Failed to create recordbatch, source: {}", source))]
+    CreateRecordBatch {
+        #[snafu(backtrace)]
+        source: common_recordbatch::error::Error,
+    },
+
     #[snafu(display(
         "Failed to insert table creation record to system catalog, source: {}",
         source
@@ -226,6 +235,19 @@ pub enum Error {
 
     #[snafu(display("Invalid system table definition: {err_msg}"))]
     InvalidSystemTableDef { err_msg: String, location: Location },
+
+    #[snafu(display("{}: {}", msg, source))]
+    Datafusion {
+        msg: String,
+        source: DataFusionError,
+        location: Location,
+    },
+
+    #[snafu(display("Table schema mismatch, source: {}", source))]
+    TableSchemaMismatch {
+        #[snafu(backtrace)]
+        source: table::error::Error,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -247,7 +269,9 @@ impl ErrorExt for Error {
 
             Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
 
-            Error::ReadSystemCatalog { source, .. } => source.status_code(),
+            Error::ReadSystemCatalog { source, .. } | Error::CreateRecordBatch { source } => {
+                source.status_code()
+            }
             Error::InvalidCatalogValue { source, .. } | Error::CatalogEntrySerde { source } => {
                 source.status_code()
             }
@@ -264,7 +288,8 @@ impl ErrorExt for Error {
             | Error::OpenTable { source, .. }
             | Error::CreateTable { source, .. }
             | Error::DeregisterTable { source, .. }
-            | Error::RegionStats { source, .. } => source.status_code(),
+            | Error::RegionStats { source, .. }
+            | Error::TableSchemaMismatch { source } => source.status_code(),
 
             Error::MetaSrv { source, .. } => source.status_code(),
             Error::SystemCatalogTableScan { source } => source.status_code(),
@@ -274,8 +299,9 @@ impl ErrorExt for Error {
                 source.status_code()
             }
 
-            Error::Unimplemented { .. } => StatusCode::Unsupported,
+            Error::Unimplemented { .. } | Error::NotSupported { .. } => StatusCode::Unsupported,
             Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
+            Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
         }
     }
diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs
new file mode 100644
index 0000000000..f4d70ec6b9
--- /dev/null
+++ b/src/catalog/src/information_schema.rs
@@ -0,0 +1,80 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod tables;
+
+use std::any::Any;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::datasource::streaming::{PartitionStream, StreamingTable};
+use snafu::ResultExt;
+use table::table::adapter::TableAdapter;
+use table::TableRef;
+
+use crate::error::{DatafusionSnafu, Result, TableSchemaMismatchSnafu};
+use crate::information_schema::tables::InformationSchemaTables;
+use crate::{CatalogProviderRef, SchemaProvider};
+
+const TABLES: &str = "tables";
+
+pub(crate) struct InformationSchemaProvider {
+    catalog_name: String,
+    catalog_provider: CatalogProviderRef,
+}
+
+impl InformationSchemaProvider {
+    pub(crate) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
+        Self {
+            catalog_name,
+            catalog_provider,
+        }
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for InformationSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Result<Vec<String>> {
+        Ok(vec![TABLES.to_string()])
+    }
+
+    async fn table(&self, name: &str) -> Result<Option<TableRef>> {
+        let table = if name.eq_ignore_ascii_case(TABLES) {
+            Arc::new(InformationSchemaTables::new(
+                self.catalog_name.clone(),
+                self.catalog_provider.clone(),
+            ))
+        } else {
+            return Ok(None);
+        };
+
+        let table = Arc::new(
+            StreamingTable::try_new(table.schema().clone(), vec![table]).with_context(|_| {
+                DatafusionSnafu {
+                    msg: format!("Failed to get InformationSchema table '{name}'"),
+                }
+            })?,
+        );
+        let table = TableAdapter::new(table).context(TableSchemaMismatchSnafu)?;
+        Ok(Some(Arc::new(table)))
+    }
+
+    fn table_exist(&self, name: &str) -> Result<bool> {
+        Ok(matches!(name.to_ascii_lowercase().as_str(), TABLES))
+    }
+}
diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs
new file mode 100644
index 0000000000..311964c5aa
--- /dev/null
+++ b/src/catalog/src/information_schema/tables.rs
@@ -0,0 +1,165 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+use common_catalog::consts::INFORMATION_SCHEMA_NAME;
+use common_query::physical_plan::TaskContext;
+use common_recordbatch::RecordBatch;
+use datafusion::datasource::streaming::PartitionStream as DfPartitionStream;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
+use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
+use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
+use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use datatypes::vectors::StringVectorBuilder;
+use snafu::ResultExt;
+use table::metadata::TableType;
+
+use crate::error::{CreateRecordBatchSnafu, Result};
+use crate::information_schema::TABLES;
+use crate::CatalogProviderRef;
+
+pub(super) struct InformationSchemaTables {
+    schema: SchemaRef,
+    catalog_name: String,
+    catalog_provider: CatalogProviderRef,
+}
+
+impl InformationSchemaTables {
+    pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
+            ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
+            ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false),
+            ColumnSchema::new("table_type", ConcreteDataType::string_datatype(), false),
+        ]));
+        Self {
+            schema,
+            catalog_name,
+            catalog_provider,
+        }
+    }
+
+    fn builder(&self) -> InformationSchemaTablesBuilder {
+        InformationSchemaTablesBuilder::new(
+            self.schema.clone(),
+            self.catalog_name.clone(),
+            self.catalog_provider.clone(),
+        )
+    }
+}
+
+/// Builds the `information_schema.TABLES` table row by row
+///
+/// Columns are based on
+struct InformationSchemaTablesBuilder {
+    schema: SchemaRef,
+    catalog_name: String,
+    catalog_provider: CatalogProviderRef,
+
+    catalog_names: StringVectorBuilder,
+    schema_names: StringVectorBuilder,
+    table_names: StringVectorBuilder,
+    table_types: StringVectorBuilder,
+}
+
+impl InformationSchemaTablesBuilder {
+    fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
+        Self {
+            schema,
+            catalog_name,
+            catalog_provider,
+            catalog_names: StringVectorBuilder::with_capacity(42),
+            schema_names: StringVectorBuilder::with_capacity(42),
+            table_names: StringVectorBuilder::with_capacity(42),
+            table_types: StringVectorBuilder::with_capacity(42),
+        }
+    }
+
+    /// Construct the `information_schema.tables` virtual table
+    async fn make_tables(&mut self) -> Result<RecordBatch> {
+        let catalog_name = self.catalog_name.clone();
+
+        for schema_name in self.catalog_provider.schema_names()? {
+            if schema_name == INFORMATION_SCHEMA_NAME {
+                continue;
+            }
+
+            let Some(schema) = self.catalog_provider.schema(&schema_name)? else { continue };
+            for table_name in schema.table_names()? {
+                let Some(table) = schema.table(&table_name).await? else { continue };
+                self.add_table(&catalog_name, &schema_name, &table_name, table.table_type());
+            }
+        }
+
+        // Add a final list for the information schema tables themselves
+        self.add_table(
+            &catalog_name,
+            INFORMATION_SCHEMA_NAME,
+            TABLES,
+            TableType::View,
+        );
+
+        self.finish()
+    }
+
+    fn add_table(
+        &mut self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+        table_type: TableType,
+    ) {
+        self.catalog_names.push(Some(catalog_name));
+        self.schema_names.push(Some(schema_name));
+        self.table_names.push(Some(table_name));
+        self.table_types.push(Some(match table_type {
+            TableType::Base => "BASE TABLE",
+            TableType::View => "VIEW",
+            TableType::Temporary => "LOCAL TEMPORARY",
+        }));
+    }
+
+    fn finish(&mut self) -> Result<RecordBatch> {
+        let columns: Vec<VectorRef> = vec![
+            Arc::new(self.catalog_names.finish()),
+            Arc::new(self.schema_names.finish()),
+            Arc::new(self.table_names.finish()),
+            Arc::new(self.table_types.finish()),
+        ];
+        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
+    }
+}
+
+impl DfPartitionStream for InformationSchemaTables {
+    fn schema(&self) -> &ArrowSchemaRef {
+        self.schema.arrow_schema()
+    }
+
+    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
+        let schema = self.schema().clone();
+        let mut builder = self.builder();
+        Box::pin(DfRecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::once(async move {
+                builder
+                    .make_tables()
+                    .await
+                    .map(|x| x.into_df_record_batch())
+                    .map_err(Into::into)
+            }),
+        ))
+    }
+}
diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs
index 84f70044c3..41d640049d 100644
--- a/src/catalog/src/lib.rs
+++ b/src/catalog/src/lib.rs
@@ -29,8 +29,10 @@ use table::TableRef;
 use crate::error::{CreateTableSnafu, Result};
 pub use crate::schema::{SchemaProvider, SchemaProviderRef};
 
+pub mod datafusion;
 pub mod error;
 pub mod helper;
+pub(crate) mod information_schema;
 pub mod local;
 pub mod remote;
 pub mod schema;
diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs
index ed3d528f45..e546d90c75 100644
--- a/src/catalog/src/local/manager.rs
+++ b/src/catalog/src/local/manager.rs
@@ -74,7 +74,7 @@ impl LocalCatalogManager {
         })?;
         let table = SystemCatalogTable::new(engine.clone()).await?;
         let memory_catalog_list = crate::local::memory::new_memory_catalog_list()?;
-        let system_catalog = Arc::new(SystemCatalog::new(table, memory_catalog_list.clone()));
+        let system_catalog = Arc::new(SystemCatalog::new(table));
         Ok(Self {
             system: system_catalog,
             catalogs: memory_catalog_list,
@@ -305,9 +305,7 @@ impl CatalogList for LocalCatalogManager {
     }
 
     fn catalog_names(&self) -> Result<Vec<String>> {
-        let mut res = self.catalogs.catalog_names()?;
-        res.push(SYSTEM_CATALOG_NAME.to_string());
-        Ok(res)
+        self.catalogs.catalog_names()
     }
 
     fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>> {
diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs
index 5279a66ce3..5acb9b87b5 100644
--- a/src/catalog/src/remote/manager.rs
+++ b/src/catalog/src/remote/manager.rs
@@ -189,7 +189,9 @@ impl RemoteCatalogManager {
         let max_table_id = MIN_USER_TABLE_ID - 1;
 
         // initiate default catalog and schema
-        let default_catalog = self.initiate_default_catalog().await?;
+        let default_catalog = self
+            .create_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
+            .await?;
         res.insert(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
         info!("Default catalog and schema registered");
 
@@ -269,13 +271,19 @@ impl RemoteCatalogManager {
         Ok(())
     }
 
-    async fn initiate_default_catalog(&self) -> Result<CatalogProviderRef> {
-        let default_catalog = self.new_catalog_provider(DEFAULT_CATALOG_NAME);
-        let default_schema = self.new_schema_provider(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
-        default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone())?;
+    pub async fn create_catalog_and_schema(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+    ) -> Result<CatalogProviderRef> {
+        let schema_provider = self.new_schema_provider(catalog_name, schema_name);
+
+        let catalog_provider = self.new_catalog_provider(catalog_name);
+        catalog_provider.register_schema(schema_name.to_string(), schema_provider.clone())?;
+
         let schema_key = SchemaKey {
-            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
-            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
+            catalog_name: catalog_name.to_string(),
+            schema_name: schema_name.to_string(),
         }
         .to_string();
         self.backend
@@ -286,10 +294,10 @@ impl RemoteCatalogManager {
                     .context(InvalidCatalogValueSnafu)?,
             )
             .await?;
-        info!("Registered default schema");
+        info!("Created schema '{schema_key}'");
 
         let catalog_key = CatalogKey {
-            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
+            catalog_name: catalog_name.to_string(),
         }
         .to_string();
         self.backend
@@ -300,8 +308,8 @@ impl RemoteCatalogManager {
                     .context(InvalidCatalogValueSnafu)?,
             )
             .await?;
-        info!("Registered default catalog");
-        Ok(default_catalog)
+        info!("Created catalog '{catalog_key}'");
+        Ok(catalog_provider)
     }
 
     async fn open_or_create_table(
diff --git a/src/catalog/src/schema.rs b/src/catalog/src/schema.rs
index 1c9dd11744..9dcf329657 100644
--- a/src/catalog/src/schema.rs
+++ b/src/catalog/src/schema.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use table::TableRef;
 
-use crate::error::Result;
+use crate::error::{NotSupportedSnafu, Result};
 
 /// Represents a schema, comprising a number of named tables.
 #[async_trait]
@@ -35,15 +35,30 @@ pub trait SchemaProvider: Sync + Send {
 
     /// If supported by the implementation, adds a new table to this schema.
     /// If a table of the same name existed before, it returns "Table already exists" error.
-    fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>>;
+    fn register_table(&self, name: String, _table: TableRef) -> Result<Option<TableRef>> {
+        NotSupportedSnafu {
+            op: format!("register_table({name}, <table>)"),
+        }
+        .fail()
+    }
 
     /// If supported by the implementation, renames an existing table from this schema and returns it.
     /// If no table of that name exists, returns "Table not found" error.
-    fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef>;
+    fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
+        NotSupportedSnafu {
+            op: format!("rename_table({name}, {new_name})"),
+        }
+        .fail()
+    }
 
     /// If supported by the implementation, removes an existing table from this schema and returns it.
     /// If no table of that name exists, returns Ok(None).
-    fn deregister_table(&self, name: &str) -> Result<Option<TableRef>>;
+    fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
+        NotSupportedSnafu {
+            op: format!("deregister_table({name})"),
+        }
+        .fail()
+    }
 
     /// If supported by the implementation, checks the table exist in the schema provider or not.
     /// If no matched table in the schema provider, return false.
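With the schema.rs hunk above, `register_table`, `rename_table` and `deregister_table` become default methods that fail with the new `NotSupported` error, so read-only schemas such as the `InformationSchemaProvider` in this patch only implement the lookup methods. A minimal sketch of a trait consumer under the changed trait, not part of the patch itself (the `ReadOnlySchema` type is hypothetical; the trait and error paths follow the `catalog` crate as shown above):

```rust
use std::any::Any;
use std::collections::HashMap;

use async_trait::async_trait;
use catalog::error::Result;
use catalog::SchemaProvider;
use table::TableRef;

/// Hypothetical read-only schema: only the lookup methods are written out;
/// register_table/rename_table/deregister_table fall back to the
/// NotSupported defaults introduced by this patch.
struct ReadOnlySchema {
    tables: HashMap<String, TableRef>,
}

#[async_trait]
impl SchemaProvider for ReadOnlySchema {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Result<Vec<String>> {
        Ok(self.tables.keys().cloned().collect())
    }

    async fn table(&self, name: &str) -> Result<Option<TableRef>> {
        Ok(self.tables.get(name).cloned())
    }

    fn table_exist(&self, name: &str) -> Result<bool> {
        Ok(self.tables.contains_key(name))
    }
}
```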
diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index 0a7c9a5ca0..81a0840a4c 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; +use common_catalog::consts::INFORMATION_SCHEMA_NAME; use common_catalog::format_full_table_name; use datafusion::common::{ResolvedTableReference, TableReference}; use datafusion::datasource::provider_as_source; @@ -26,6 +27,7 @@ use table::table::adapter::DfTableProviderAdapter; use crate::error::{ CatalogNotFoundSnafu, QueryAccessDeniedSnafu, Result, SchemaNotFoundSnafu, TableNotExistSnafu, }; +use crate::information_schema::InformationSchemaProvider; use crate::CatalogListRef; pub struct DfTableSourceProvider { @@ -100,14 +102,25 @@ impl DfTableSourceProvider { let schema_name = table_ref.schema.as_ref(); let table_name = table_ref.table.as_ref(); - let catalog = self - .catalog_list - .catalog(catalog_name)? - .context(CatalogNotFoundSnafu { catalog_name })?; - let schema = catalog.schema(schema_name)?.context(SchemaNotFoundSnafu { - catalog: catalog_name, - schema: schema_name, - })?; + let schema = if schema_name != INFORMATION_SCHEMA_NAME { + let catalog = self + .catalog_list + .catalog(catalog_name)? + .context(CatalogNotFoundSnafu { catalog_name })?; + catalog.schema(schema_name)?.context(SchemaNotFoundSnafu { + catalog: catalog_name, + schema: schema_name, + })? + } else { + let catalog_provider = self + .catalog_list + .catalog(catalog_name)? + .context(CatalogNotFoundSnafu { catalog_name })?; + Arc::new(InformationSchemaProvider::new( + catalog_name.to_string(), + catalog_provider, + )) + }; let table = schema .table(table_name) .await? diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 64e2c65b97..8262621309 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -15,27 +15,12 @@ // The `tables` table in system catalog keeps a record of all tables created by user. use std::any::Any; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; -use async_stream::stream; use async_trait::async_trait; use common_catalog::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME}; -use common_error::ext::BoxedError; -use common_query::logical_plan::Expr; -use common_query::physical_plan::PhysicalPlanRef; -use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream}; -use datatypes::prelude::{ConcreteDataType, DataType}; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use datatypes::value::ValueRef; -use datatypes::vectors::VectorRef; -use futures::Stream; use snafu::ResultExt; -use table::error::TablesRecordBatchSnafu; -use table::metadata::{TableId, TableInfoRef}; -use table::table::scan::SimpleTableScan; +use table::metadata::TableId; use table::{Table, TableRef}; use crate::error::{self, Error, InsertCatalogRecordSnafu, Result as CatalogResult}; @@ -43,152 +28,9 @@ use crate::system::{ build_schema_insert_request, build_table_deletion_request, build_table_insert_request, SystemCatalogTable, }; -use crate::{ - CatalogListRef, CatalogProvider, DeregisterTableRequest, SchemaProvider, SchemaProviderRef, -}; - -/// Tables holds all tables created by user. 
-pub struct Tables {
-    schema: SchemaRef,
-    catalogs: CatalogListRef,
-}
-
-impl Tables {
-    pub fn new(catalogs: CatalogListRef) -> Self {
-        Self {
-            schema: Arc::new(build_schema_for_tables()),
-            catalogs,
-        }
-    }
-}
-
-#[async_trait::async_trait]
-impl Table for Tables {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-
-    fn table_info(&self) -> TableInfoRef {
-        unreachable!("Tables does not support table_info method")
-    }
-
-    async fn scan(
-        &self,
-        _projection: Option<&Vec<usize>>,
-        _filters: &[Expr],
-        _limit: Option<usize>,
-    ) -> table::error::Result<PhysicalPlanRef> {
-        let catalogs = self.catalogs.clone();
-        let schema_ref = self.schema.clone();
-
-        let stream = stream!({
-            for catalog_name in catalogs
-                .catalog_names()
-                .map_err(BoxedError::new)
-                .context(TablesRecordBatchSnafu)?
-            {
-                let catalog = catalogs
-                    .catalog(&catalog_name)
-                    .map_err(BoxedError::new)
-                    .context(TablesRecordBatchSnafu)?
-                    .unwrap();
-                for schema_name in catalog
-                    .schema_names()
-                    .map_err(BoxedError::new)
-                    .context(TablesRecordBatchSnafu)?
-                {
-                    let schema = catalog
-                        .schema(&schema_name)
-                        .map_err(BoxedError::new)
-                        .context(TablesRecordBatchSnafu)?
-                        .unwrap();
-                    let names = schema
-                        .table_names()
-                        .map_err(BoxedError::new)
-                        .context(TablesRecordBatchSnafu)?;
-                    let mut tables = Vec::with_capacity(names.len());
-
-                    for name in names {
-                        let table = schema
-                            .table(&name)
-                            .await
-                            .map_err(BoxedError::new)
-                            .context(TablesRecordBatchSnafu)?
-                            .unwrap();
-
-                        tables.push(table);
-                    }
-
-                    let vec = tables_to_record_batch(&catalog_name, &schema_name, tables);
-                    let record_batch_res = RecordBatch::new(schema_ref.clone(), vec);
-                    yield record_batch_res;
-                }
-            }
-        });
-
-        let stream = Box::pin(TablesRecordBatchStream {
-            schema: self.schema.clone(),
-            stream: Box::pin(stream),
-        });
-        Ok(Arc::new(SimpleTableScan::new(stream)))
-    }
-}
-
-/// Convert tables info to `RecordBatch`.
-fn tables_to_record_batch(
-    catalog_name: &str,
-    schema_name: &str,
-    tables: Vec<TableRef>,
-) -> Vec<VectorRef> {
-    let mut catalog_vec = ConcreteDataType::string_datatype().create_mutable_vector(tables.len());
-    let mut schema_vec = ConcreteDataType::string_datatype().create_mutable_vector(tables.len());
-    let mut table_name_vec =
-        ConcreteDataType::string_datatype().create_mutable_vector(tables.len());
-    let mut engine_vec = ConcreteDataType::string_datatype().create_mutable_vector(tables.len());
-
-    for table in tables {
-        let name = &table.table_info().name;
-        let engine = &table.table_info().meta.engine;
-        // Safety: All these vectors are string type.
-        catalog_vec.push_value_ref(ValueRef::String(catalog_name));
-        schema_vec.push_value_ref(ValueRef::String(schema_name));
-        table_name_vec.push_value_ref(ValueRef::String(name));
-        engine_vec.push_value_ref(ValueRef::String(engine));
-    }
-
-    vec![
-        catalog_vec.to_vector(),
-        schema_vec.to_vector(),
-        table_name_vec.to_vector(),
-        engine_vec.to_vector(),
-    ]
-}
-
-pub struct TablesRecordBatchStream {
-    schema: SchemaRef,
-    stream: Pin<Box<dyn Stream<Item = RecordBatchResult<RecordBatch>> + Send>>,
-}
-
-impl Stream for TablesRecordBatchStream {
-    type Item = RecordBatchResult<RecordBatch>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        Pin::new(&mut self.stream).poll_next(cx)
-    }
-}
-
-impl RecordBatchStream for TablesRecordBatchStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
+use crate::{CatalogProvider, DeregisterTableRequest, SchemaProvider, SchemaProviderRef};
 
 pub struct InformationSchema {
-    pub tables: Arc<Tables>,
     pub system: Arc<SystemCatalogTable>,
 }
 
@@ -199,41 +41,19 @@ impl SchemaProvider for InformationSchema {
     }
 
     fn table_names(&self) -> Result<Vec<String>, Error> {
-        Ok(vec![
-            "tables".to_string(),
-            SYSTEM_CATALOG_TABLE_NAME.to_string(),
-        ])
+        Ok(vec![SYSTEM_CATALOG_TABLE_NAME.to_string()])
     }
 
     async fn table(&self, name: &str) -> Result<Option<TableRef>, Error> {
-        if name.eq_ignore_ascii_case("tables") {
-            Ok(Some(self.tables.clone()))
-        } else if name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) {
+        if name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) {
             Ok(Some(self.system.clone()))
         } else {
             Ok(None)
         }
     }
 
-    fn register_table(
-        &self,
-        _name: String,
-        _table: TableRef,
-    ) -> crate::error::Result<Option<TableRef>> {
-        panic!("System catalog & schema does not support register table")
-    }
-
-    fn rename_table(&self, _name: &str, _new_name: String) -> crate::error::Result<TableRef> {
-        unimplemented!("System catalog & schema does not support rename table")
-    }
-
-    fn deregister_table(&self, _name: &str) -> crate::error::Result<Option<TableRef>> {
-        panic!("System catalog & schema does not support deregister table")
-    }
-
     fn table_exist(&self, name: &str) -> Result<bool, Error> {
-        Ok(name.eq_ignore_ascii_case("tables")
-            || name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME))
+        Ok(name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME))
     }
 }
 
@@ -242,9 +62,8 @@ pub struct SystemCatalog {
 }
 
 impl SystemCatalog {
-    pub fn new(system: SystemCatalogTable, catalogs: CatalogListRef) -> Self {
+    pub(crate) fn new(system: SystemCatalogTable) -> Self {
         let schema = InformationSchema {
-            tables: Arc::new(Tables::new(catalogs)),
             system: Arc::new(system),
         };
         Self {
@@ -322,107 +141,3 @@ impl CatalogProvider for SystemCatalog {
         }
     }
 }
-
-fn build_schema_for_tables() -> Schema {
-    let cols = vec![
-        ColumnSchema::new(
-            "catalog".to_string(),
-            ConcreteDataType::string_datatype(),
-            false,
-        ),
-        ColumnSchema::new(
-            "schema".to_string(),
-            ConcreteDataType::string_datatype(),
-            false,
-        ),
-        ColumnSchema::new(
-            "table_name".to_string(),
-            ConcreteDataType::string_datatype(),
-            false,
-        ),
-        ColumnSchema::new(
-            "engine".to_string(),
-            ConcreteDataType::string_datatype(),
-            false,
-        ),
-    ];
-    Schema::new(cols)
-}
-
-#[cfg(test)]
-mod tests {
-    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-    use common_query::physical_plan::SessionContext;
-    use futures_util::StreamExt;
-    use table::table::numbers::NumbersTable;
-
-    use super::*;
-    use crate::local::memory::new_memory_catalog_list;
-    use crate::CatalogList;
-
-    #[tokio::test]
-    async fn test_tables() {
-        let catalog_list = new_memory_catalog_list().unwrap();
-        let schema = catalog_list
-            .catalog(DEFAULT_CATALOG_NAME)
-            .unwrap()
-            .unwrap()
-            .schema(DEFAULT_SCHEMA_NAME)
-            .unwrap()
-            .unwrap();
-        schema
-            .register_table(
-                "test_table".to_string(),
-                Arc::new(NumbersTable::with_name(1, "test_table".to_string())),
-            )
-            .unwrap();
-
-        let tables = Tables::new(catalog_list);
-        let tables_stream = tables.scan(None, &[], None).await.unwrap();
-        let session_ctx = SessionContext::new();
-        let mut tables_stream = tables_stream.execute(0, session_ctx.task_ctx()).unwrap();
-
-        if let Some(t) = tables_stream.next().await {
-            let batch = t.unwrap();
-            assert_eq!(1, batch.num_rows());
-            assert_eq!(4, batch.num_columns());
-            assert_eq!(
-                ConcreteDataType::string_datatype(),
-                batch.column(0).data_type()
-            );
-            assert_eq!(
-                ConcreteDataType::string_datatype(),
-                batch.column(1).data_type()
-            );
-            assert_eq!(
-                ConcreteDataType::string_datatype(),
-                batch.column(2).data_type()
-            );
-            assert_eq!(
-                ConcreteDataType::string_datatype(),
-                batch.column(3).data_type()
-            );
-            assert_eq!(
-                "greptime",
-                batch.column(0).get_ref(0).as_string().unwrap().unwrap()
-            );
-
-            assert_eq!(
-                "public",
-                batch.column(1).get_ref(0).as_string().unwrap().unwrap()
-            );
-
-            assert_eq!(
-                "test_table",
-                batch.column(2).get_ref(0).as_string().unwrap().unwrap()
-            );
-
-            assert_eq!(
-                "test_engine",
-                batch.column(3).get_ref(0).as_string().unwrap().unwrap()
-            );
-        } else {
-            panic!("Record batch should not be empty!")
-        }
-    }
-}
diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs
index a3956fc027..faa426d338 100644
--- a/src/common/substrait/src/df_logical.rs
+++ b/src/common/substrait/src/df_logical.rs
@@ -398,7 +398,6 @@ impl DFLogicalSubstraitConvertor {
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropView(_)
             | LogicalPlan::Distinct(_)
-            | LogicalPlan::SetVariable(_)
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
             | LogicalPlan::DropTable(_)
@@ -409,7 +408,8 @@ impl DFLogicalSubstraitConvertor {
             | LogicalPlan::Prepare(_)
             | LogicalPlan::Dml(_)
             | LogicalPlan::DescribeTable(_)
-            | LogicalPlan::Unnest(_) => InvalidParametersSnafu {
+            | LogicalPlan::Unnest(_)
+            | LogicalPlan::Statement(_) => InvalidParametersSnafu {
                 reason: format!(
                     "Trying to convert DDL/DML plan to substrait proto, plan: {plan:?}",
                 ),
diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs
index d54c9e72ac..af35d1d459 100644
--- a/src/frontend/src/catalog.rs
+++ b/src/frontend/src/catalog.rs
@@ -413,22 +413,6 @@ impl SchemaProvider for FrontendSchemaProvider {
         Ok(Some(table))
     }
 
-    fn register_table(
-        &self,
-        _name: String,
-        _table: TableRef,
-    ) -> catalog::error::Result<Option<TableRef>> {
-        unimplemented!("Frontend schema provider does not support register table")
-    }
-
-    fn rename_table(&self, _name: &str, _new_name: String) -> catalog_err::Result<TableRef> {
-        unimplemented!("Frontend schema provider does not support rename table")
-    }
-
-    fn deregister_table(&self, _name: &str) -> catalog::error::Result<Option<TableRef>> {
-        unimplemented!("Frontend schema provider does not support deregister table")
-    }
-
     fn table_exist(&self, name: &str) -> catalog::error::Result<bool> {
         Ok(self.table_names()?.contains(&name.to_string()))
     }
diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs
index ccd2d9e3cd..e9e88648aa 100644
--- a/src/frontend/src/tests.rs
+++ b/src/frontend/src/tests.rs
@@ -20,7 +20,9 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
-use catalog::remote::MetaKvBackend;
+use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
+use catalog::remote::{MetaKvBackend, RemoteCatalogManager};
+use catalog::CatalogProvider;
 use client::Client;
 use common_grpc::channel_manager::ChannelManager;
 use common_runtime::Builder as RuntimeBuilder;
@@ -87,6 +89,20 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
     let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
         .await
         .unwrap();
+
+    // create another catalog and schema for testing
+    let another_catalog = Arc::new(MemoryCatalogProvider::new());
+    let _ = another_catalog
+        .register_schema(
+            "another_schema".to_string(),
+            Arc::new(MemorySchemaProvider::new()),
+        )
+        .unwrap();
+    let _ = dn_instance
+        .catalog_manager()
+        .register_catalog("another_catalog".to_string(), another_catalog)
+        .unwrap();
+
     dn_instance.start().await.unwrap();
     MockStandaloneInstance {
         instance: Arc::new(frontend_instance),
@@ -209,6 +225,16 @@ async fn create_distributed_datanode(
     );
     instance.start().await.unwrap();
 
+    // create another catalog and schema for testing
+    let _ = instance
+        .catalog_manager()
+        .as_any()
+        .downcast_ref::<RemoteCatalogManager>()
+        .unwrap()
+        .create_catalog_and_schema("another_catalog", "another_schema")
+        .await
+        .unwrap();
+
     (
         instance,
         TestGuard {
diff --git a/src/frontend/src/tests/instance_test.rs b/src/frontend/src/tests/instance_test.rs
index e2bb4083e6..26e36dad0e 100644
--- a/src/frontend/src/tests/instance_test.rs
+++ b/src/frontend/src/tests/instance_test.rs
@@ -15,7 +15,7 @@
 use std::env;
 use std::sync::Arc;
 
-use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_catalog::consts::DEFAULT_CATALOG_NAME;
 use common_query::Output;
 use common_recordbatch::util;
 use common_telemetry::logging;
@@ -23,9 +23,9 @@ use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
 use rstest::rstest;
 use rstest_reuse::apply;
 use servers::query_handler::sql::SqlQueryHandler;
-use session::context::QueryContext;
+use session::context::{QueryContext, QueryContextRef};
 
-use crate::error::Error;
+use crate::error::{Error, Result};
 use crate::instance::Instance;
 use crate::tests::test_util::{
     both_instances_cases, check_output_stream, check_unordered_output_stream, distributed,
@@ -246,8 +246,7 @@ async fn test_execute_insert_by_select(instance: Arc<dyn MockInstance>) {
 +-------+------+--------+---------------------+
 | host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
 | host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
-+-------+------+--------+---------------------+"
-        .to_string();
++-------+------+--------+---------------------+";
 
     check_output_stream(output, expected).await;
 }
@@ -451,51 +450,58 @@ async fn test_rename_table(instance: Arc<dyn MockInstance>) {
     let output = execute_sql(&instance, "create database db").await;
     assert!(matches!(output, Output::AffectedRows(1)));
 
-    let output = execute_sql_in_db(
+    let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, "db"));
+    let output = execute_sql_with(
         &instance,
         "create table demo(host string, cpu double, memory double, ts timestamp, time index(ts))",
-        "db",
+        query_ctx.clone(),
     )
     .await;
     assert!(matches!(output, Output::AffectedRows(0)));
 
     // make sure table insertion is ok before altering table name
-    let output = execute_sql_in_db(
+    let output = execute_sql_with(
         &instance,
         "insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000), ('host2', 2.2, 200, 2000)",
-        "db",
+        query_ctx.clone(),
     )
     .await;
     assert!(matches!(output, Output::AffectedRows(2)));
 
     // rename table
-    let output = execute_sql_in_db(&instance, "alter table demo rename test_table", "db").await;
+    let output = execute_sql_with(
+        &instance,
+        "alter table demo rename test_table",
+        query_ctx.clone(),
+    )
+    .await;
     assert!(matches!(output, Output::AffectedRows(0)));
 
-    let output = execute_sql_in_db(&instance, "show tables", "db").await;
+    let output = execute_sql_with(&instance, "show tables", query_ctx.clone()).await;
     let expect = "\
 +------------+
 | Tables     |
 +------------+
 | test_table |
-+------------+\
-"
-    .to_string();
++------------+";
     check_output_stream(output, expect).await;
 
-    let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await;
+    let output = execute_sql_with(
+        &instance,
+        "select * from test_table order by ts",
+        query_ctx.clone(),
+    )
+    .await;
     let expected = "\
 +-------+-----+--------+---------------------+
 | host  | cpu | memory | ts                  |
 +-------+-----+--------+---------------------+
 | host1 | 1.1 | 100.0  | 1970-01-01T00:00:01 |
 | host2 | 2.2 | 200.0  | 1970-01-01T00:00:02 |
-+-------+-----+--------+---------------------+\
-"
-    .to_string();
++-------+-----+--------+---------------------+";
     check_output_stream(output, expected).await;
 
-    try_execute_sql_in_db(&instance, "select * from demo", "db")
+    try_execute_sql_with(&instance, "select * from demo", query_ctx)
         .await
         .expect_err("no table found in expect");
 }
@@ -510,30 +516,31 @@ async fn test_create_table_after_rename_table(instance: Arc<dyn MockInstance>) {
 
     // create test table
     let table_name = "demo";
-    let output = execute_sql_in_db(
+    let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, "db"));
+    let output = execute_sql_with(
         &instance,
         &format!("create table {table_name}(host string, cpu double, memory double, ts timestamp, time index(ts))"),
-        "db",
+        query_ctx.clone(),
    )
    .await;
    assert!(matches!(output, Output::AffectedRows(0)));
 
     // rename table
     let new_table_name = "test_table";
-    let output = execute_sql_in_db(
+    let output = execute_sql_with(
         &instance,
         &format!("alter table {table_name} rename {new_table_name}"),
-        "db",
+        query_ctx.clone(),
     )
     .await;
     assert!(matches!(output, Output::AffectedRows(0)));
 
     // create table with same name
     // create test table
-    let output = execute_sql_in_db(
+    let output = execute_sql_with(
         &instance,
         &format!("create table {table_name}(host string, cpu double, memory double, ts timestamp, time index(ts))"),
-        "db",
+        query_ctx.clone(),
     )
     .await;
     assert!(matches!(output, Output::AffectedRows(0)));
@@ -544,10 +551,8 @@ async fn test_create_table_after_rename_table(instance: Arc<dyn MockInstance>) {
 +------------+
 | demo       |
 | test_table |
-+------------+\
-"
-    .to_string();
-    let output = execute_sql_in_db(&instance, "show tables", "db").await;
++------------+";
+    let output = execute_sql_with(&instance, "show tables", query_ctx).await;
     check_output_stream(output, expect).await;
 }
 
@@ -594,9 +599,7 @@ async fn test_alter_table(instance: Arc<dyn MockInstance>) {
 | host1 | 1.1 | 100.0  | 1970-01-01T00:00:01 |        |
 | host2 | 2.2 | 200.0  | 1970-01-01T00:00:02 | hello  |
 | host3 | 3.3 | 300.0  | 1970-01-01T00:00:03 |        |
-+-------+-----+--------+---------------------+--------+\
-    "
-    .to_string();
++-------+-----+--------+---------------------+--------+";
     check_output_stream(output, expected).await;
 
     // Drop a column
@@ -611,9 +614,7 @@ async fn test_alter_table(instance: Arc<dyn MockInstance>) {
 | host1 | 1.1 | 1970-01-01T00:00:01 |        |
 | host2 | 2.2 | 1970-01-01T00:00:02 | hello  |
 | host3 | 3.3 | 1970-01-01T00:00:03 |        |
-+-------+-----+---------------------+--------+\
-    "
-    .to_string();
++-------+-----+---------------------+--------+";
     check_output_stream(output, expected).await;
 
     // insert a new row
@@ -633,9 +634,7 @@ async fn test_alter_table(instance: Arc<dyn MockInstance>) {
 | host2 | 2.2   | 1970-01-01T00:00:02 | hello  |
 | host3 | 3.3   | 1970-01-01T00:00:03 |        |
 | host4 | 400.0 | 1970-01-01T00:00:04 | world  |
-+-------+-------+---------------------+--------+\
-    "
-    .to_string();
++-------+-------+---------------------+--------+";
     check_output_stream(output, expected).await;
 }
 
@@ -676,9 +675,7 @@ async fn test_insert_with_default_value_for_type(instance: Arc<dyn MockInstance>, type_n
 +-------+-----+
 | host1 | 1.1 |
 | host2 | 2.2 |
-+-------+-----+\
-    "
-    .to_string();
++-------+-----+";
     check_output_stream(output, expected).await;
 }
 
@@ -697,42 +694,39 @@ async fn test_use_database(instance: Arc<dyn MockInstance>) {
     let output = execute_sql(&instance, "create database db1").await;
     assert!(matches!(output, Output::AffectedRows(1)));
 
-    let output = execute_sql_in_db(
+    let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, "db1"));
+    let output = execute_sql_with(
         &instance,
         "create table tb1(col_i32 int, ts bigint, TIME INDEX(ts))",
-        "db1",
+        query_ctx.clone(),
     )
     .await;
     assert!(matches!(output, Output::AffectedRows(0)));
 
-    let output = execute_sql_in_db(&instance, "show tables", "db1").await;
+    let output = execute_sql_with(&instance, "show tables", query_ctx.clone()).await;
     let expected = "\
 +--------+
 | Tables |
 +--------+
 | tb1    |
-+--------+\
-    "
-    .to_string();
++--------+";
     check_output_stream(output, expected).await;
 
-    let output = execute_sql_in_db(
+    let output = execute_sql_with(
         &instance,
         r#"insert into tb1(col_i32, ts) values (1, 1655276557000)"#,
-        "db1",
+        query_ctx.clone(),
     )
     .await;
     assert!(matches!(output, Output::AffectedRows(1)));
 
-    let output = execute_sql_in_db(&instance, "select col_i32 from tb1", "db1").await;
+    let output = execute_sql_with(&instance, "select col_i32 from tb1", query_ctx.clone()).await;
     let expected = "\
 +---------+
 | col_i32 |
 +---------+
 | 1       |
-+---------+\
-    "
-    .to_string();
++---------+";
     check_output_stream(output, expected).await;
 
     // Making a particular database the default by means of the USE statement does not preclude
@@ -743,9 +737,7 @@ async fn test_use_database(instance: Arc<dyn MockInstance>) {
 | number |
 +--------+
 | 0      |
-+--------+\
-    "
-    .to_string();
++--------+";
     check_output_stream(output, expected).await;
 }
 
@@ -793,9 +785,7 @@ async fn test_delete(instance: Arc<dyn MockInstance>) {
 +-------+---------------------+------+--------+
 | host2 | 2022-06-15T07:02:38 | 77.7 | 2048.0 |
 | host3 | 2022-06-15T07:02:39 | 88.8 | 3072.0 |
-+-------+---------------------+------+--------+\
-"
-    .to_string();
++-------+---------------------+------+--------+";
     check_output_stream(output, expect).await;
 }
 
@@ -929,34 +919,82 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
 +-------+------+--------+---------------------+
 | host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
 | host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
-+-------+------+--------+---------------------+"
-                .to_string();
++-------+------+--------+---------------------+";
 
                 check_output_stream(output, expected).await;
             }
         }
     }
 }
 
+#[apply(both_instances_cases)]
+async fn test_information_schema(instance: Arc<dyn MockInstance>) {
+    let is_distributed_mode = instance.is_distributed_mode();
+
+    let instance = instance.frontend();
+
+    let sql = "create table another_table(i bigint time index)";
+    let query_ctx = Arc::new(QueryContext::with("another_catalog", "another_schema"));
+    let output = execute_sql_with(&instance, sql, query_ctx.clone()).await;
+    assert!(matches!(output, Output::AffectedRows(0)));
+
+    // User can only see information schema under current catalog.
+    // A necessary requirement for GreptimeCloud.
+    let sql = "select table_catalog, table_schema, table_name, table_type from information_schema.tables where table_type != 'SYSTEM VIEW' order by table_name";
+
+    let output = execute_sql(&instance, sql).await;
+    let expected = if is_distributed_mode {
+        "\
++---------------+--------------------+------------+------------+
+| table_catalog | table_schema       | table_name | table_type |
++---------------+--------------------+------------+------------+
+| greptime      | public             | scripts    | BASE TABLE |
+| greptime      | information_schema | tables     | VIEW       |
++---------------+--------------------+------------+------------+"
+    } else {
+        "\
++---------------+--------------------+------------+------------+
+| table_catalog | table_schema       | table_name | table_type |
++---------------+--------------------+------------+------------+
+| greptime      | public             | numbers    | BASE TABLE |
+| greptime      | public             | scripts    | BASE TABLE |
+| greptime      | information_schema | tables     | VIEW       |
++---------------+--------------------+------------+------------+"
+    };
+    check_output_stream(output, expected).await;
+
+    let output = execute_sql_with(&instance, sql, query_ctx).await;
+    let expected = "\
++-----------------+--------------------+---------------+------------+
+| table_catalog   | table_schema       | table_name    | table_type |
++-----------------+--------------------+---------------+------------+
+| another_catalog | another_schema     | another_table | BASE TABLE |
+| another_catalog | information_schema | tables        | VIEW       |
++-----------------+--------------------+---------------+------------+";
+    check_output_stream(output, expected).await;
+}
+
 async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
-    execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
+    execute_sql_with(instance, sql, QueryContext::arc()).await
 }
 
-async fn try_execute_sql(
-    instance: &Arc<Instance>,
-    sql: &str,
-) -> Result<Output, Error> {
-    try_execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
+async fn try_execute_sql(instance: &Arc<Instance>, sql: &str) -> Result<Output> {
+    try_execute_sql_with(instance, sql, QueryContext::arc()).await
 }
 
-async fn try_execute_sql_in_db(
+async fn try_execute_sql_with(
     instance: &Arc<Instance>,
     sql: &str,
-    db: &str,
-) -> Result<Output, Error> {
-    let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));
+    query_ctx: QueryContextRef,
+) -> Result<Output> {
     instance.do_query(sql, query_ctx).await.remove(0)
 }
 
-async fn execute_sql_in_db(instance: &Arc<Instance>, sql: &str, db: &str) -> Output {
-    try_execute_sql_in_db(instance, sql, db).await.unwrap()
+async fn execute_sql_with(
+    instance: &Arc<Instance>,
+    sql: &str,
+    query_ctx: QueryContextRef,
+) -> Output {
+    try_execute_sql_with(instance, sql, query_ctx)
+        .await
+        .unwrap()
 }
diff --git a/src/frontend/src/tests/test_util.rs b/src/frontend/src/tests/test_util.rs
index 534e09709e..c40e2752f2 100644
--- a/src/frontend/src/tests/test_util.rs
+++ b/src/frontend/src/tests/test_util.rs
@@ -87,14 +87,14 @@ pub(crate) fn standalone_instance_case(
 ) {
 }
 
-pub(crate) async fn check_output_stream(output: Output, expected: String) {
+pub(crate) async fn check_output_stream(output: Output, expected: &str) {
     let recordbatches = match output {
         Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
         Output::RecordBatches(recordbatches) => recordbatches,
         _ => unreachable!(),
     };
     let pretty_print = recordbatches.pretty_print().unwrap();
-    assert_eq!(pretty_print, expected, "{}", pretty_print);
+    assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print);
 }
 
 pub(crate) async fn check_unordered_output_stream(output: Output, expected: 
&str) { diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index db38a15b35..fcd08c5c6a 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::time::Duration; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::router_server::RouterServer; @@ -73,7 +74,10 @@ pub async fn mock( .await }); - let config = ChannelConfig::new(); + let config = ChannelConfig::new() + .timeout(Duration::from_secs(1)) + .connect_timeout(Duration::from_secs(1)) + .tcp_nodelay(true); let channel_manager = ChannelManager::with_config(config); // Move client to an option so we can _move_ the inner value diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 8e147a26b7..f17cef3634 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -14,7 +14,6 @@ //! Planner, QueryEngine implementations based on DataFusion. -mod catalog_adapter; mod error; mod planner; @@ -22,6 +21,7 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +pub use catalog::datafusion::catalog_adapter::DfCatalogListAdapter; use common_error::prelude::BoxedError; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::udf::create_udf; @@ -44,7 +44,6 @@ use snafu::{ensure, OptionExt, ResultExt}; use table::requests::{DeleteRequest, InsertRequest}; use table::TableRef; -pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; pub use crate::datafusion::planner::DfContextProviderAdapter; use crate::error::{ CatalogNotFoundSnafu, CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 2f52ba8b4a..569f7d4790 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -38,12 +38,6 @@ pub enum InnerError { source: datatypes::error::Error, }, - #[snafu(display("Failed to convert table schema, source: {}", source))] - TableSchemaMismatch { - #[snafu(backtrace)] - source: table::error::Error, - }, - #[snafu(display( "Failed to convert DataFusion's recordbatch stream, source: {}", source @@ -67,10 +61,7 @@ impl ErrorExt for InnerError { match self { // TODO(yingwen): Further categorize datafusion error. Datafusion { .. } => StatusCode::EngineExecuteQuery, - // This downcast should not fail in usual case. - PhysicalPlanDowncast { .. } | ConvertSchema { .. } | TableSchemaMismatch { .. } => { - StatusCode::Unexpected - } + PhysicalPlanDowncast { .. } | ConvertSchema { .. } => StatusCode::Unexpected, ConvertDfRecordBatchStream { source } => source.status_code(), ExecutePhysicalPlan { source } => source.status_code(), } diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index ef7745ccdb..a02108827e 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -93,7 +93,6 @@ impl OptimizerRule for TypeConversionRule { | LogicalPlan::DropView { .. } | LogicalPlan::Distinct { .. } | LogicalPlan::Values { .. } - | LogicalPlan::SetVariable { .. } | LogicalPlan::Analyze { .. 
diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs
index ef7745ccdb..a02108827e 100644
--- a/src/query/src/optimizer.rs
+++ b/src/query/src/optimizer.rs
@@ -93,7 +93,6 @@ impl OptimizerRule for TypeConversionRule {
             | LogicalPlan::DropView { .. }
             | LogicalPlan::Distinct { .. }
             | LogicalPlan::Values { .. }
-            | LogicalPlan::SetVariable { .. }
             | LogicalPlan::Analyze { .. } => {
                 let inputs = plan.inputs();
                 let mut new_inputs = Vec::with_capacity(inputs.len());
@@ -120,7 +119,8 @@ impl OptimizerRule for TypeConversionRule {
             | LogicalPlan::Prepare(_)
             | LogicalPlan::Dml(_)
             | LogicalPlan::DescribeTable(_)
-            | LogicalPlan::Unnest(_) => Ok(Some(plan.clone())),
+            | LogicalPlan::Unnest(_)
+            | LogicalPlan::Statement(_) => Ok(Some(plan.clone())),
         }
     }
diff --git a/src/query/src/tests/mean_test.rs b/src/query/src/tests/mean_test.rs
index 5ae9a0a605..604ad33f67 100644
--- a/src/query/src/tests/mean_test.rs
+++ b/src/query/src/tests/mean_test.rs
@@ -18,7 +18,6 @@ use datatypes::for_all_primitive_types;
 use datatypes::prelude::*;
 use datatypes::types::WrapperType;
 use datatypes::value::OrderedFloat;
-use format_num::NumberFormat;
 use num_traits::AsPrimitive;
 
 use crate::error::Result;
@@ -56,14 +55,12 @@ where
     let numbers =
         function::get_numbers_from_table::<T>(column_name, table_name, engine.clone()).await;
 
-    let expected_value = numbers.iter().map(|&n| n.as_()).collect::<Vec<f64>>();
-
-    let expected_value = inc_stats::mean(expected_value.iter().cloned()).unwrap();
-    if let Value::Float64(OrderedFloat(value)) = value {
-        let num = NumberFormat::new();
-        let value = num.format(".6e", value);
-        let expected_value = num.format(".6e", expected_value);
-        assert_eq!(value, expected_value);
-    }
+    let numbers = numbers.iter().map(|&n| n.as_()).collect::<Vec<f64>>();
+    let expected = numbers.iter().sum::<f64>() / (numbers.len() as f64);
+    let Value::Float64(OrderedFloat(value)) = value else { unreachable!() };
+    assert!(
+        (value - expected).abs() < 1e-3,
+        "expected {expected}, actual {value}"
+    );
     Ok(())
 }
diff --git a/src/script/src/python/pyo3/builtins.rs b/src/script/src/python/pyo3/builtins.rs
index 514b7e729c..8d9609421c 100644
--- a/src/script/src/python/pyo3/builtins.rs
+++ b/src/script/src/python/pyo3/builtins.rs
@@ -279,8 +279,7 @@ fn sqrt(py: Python<'_>, val: PyObject) -> PyResult<PyObject> {
 ```
 */
 bind_call_unary_math_function!(
-    sqrt, sin, cos, tan, asin, acos, atan, floor, ceil, round, trunc, abs, signum, exp, ln, log2,
-    log10
+    sqrt, sin, cos, tan, asin, acos, atan, floor, ceil, trunc, abs, signum, exp, ln, log2, log10
 );
 
 /// return a random vector range from 0 to 1 and length of len
@@ -296,6 +295,15 @@ fn random(py: Python<'_>, len: usize) -> PyResult<PyObject> {
     columnar_value_to_py_any(py, res)
 }
 
+#[pyfunction]
+fn round(py: Python<'_>, val: PyObject) -> PyResult<PyObject> {
+    let value = try_into_columnar_value(py, val)?;
+    let array = value.into_array(1);
+    let result =
+        math_expressions::round(&[array]).map_err(|e| PyValueError::new_err(format!("{e:?}")))?;
+    columnar_value_to_py_any(py, ColumnarValue::Array(result))
+}
+
 /// The macro for binding function in `datafusion_physical_expr::expressions`(most of them are aggregate function)
 macro_rules! bind_aggr_expr {
     ($FUNC_NAME:ident, $AGGR_FUNC: ident, [$($ARG: ident),*], $ARG_TY: ident, $($EXPR:ident => $idx: literal),*) => {
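The pyo3 binding above and the rspython binding below now share one shape: convert the Python value to a DataFusion `ColumnarValue`, broadcast it to a one-row array, evaluate the `round` kernel, and convert the array back. `round` leaves the unary-function macro presumably because DataFusion's kernel takes its arguments as an array slice rather than the single-value shape the macro expects. A standalone sketch of the core step; the import paths are assumptions (the patch reaches these items through its own re-exports):

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::error::Result;
use datafusion_physical_expr::math_expressions;

// The broadcast-then-evaluate core used by both bindings: wrap the scalar
// in a one-row array, then call DataFusion's kernel on the slice of args.
fn round_one(v: f64) -> Result<ArrayRef> {
    let array: ArrayRef = Arc::new(Float64Array::from(vec![v]));
    math_expressions::round(&[array])
}
```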
diff --git a/src/script/src/python/rspython/builtins.rs b/src/script/src/python/rspython/builtins.rs
index bc0c5ba155..42f4ae944b 100644
--- a/src/script/src/python/rspython/builtins.rs
+++ b/src/script/src/python/rspython/builtins.rs
@@ -545,7 +545,10 @@ pub(crate) mod greptime_builtin {
     /// simple math function, the backing implement is datafusion's `round` math function
     #[pyfunction]
     fn round(val: PyObjectRef, vm: &VirtualMachine) -> PyResult {
-        bind_call_unary_math_function!(round, vm, val);
+        let value = try_into_columnar_value(val, vm)?;
+        let array = value.into_array(1);
+        let result = math_expressions::round(&[array]).map_err(|e| from_df_err(e, vm))?;
+        try_into_py_obj(DFColValue::Array(result), vm)
     }
 
     /// simple math function, the backing implement is datafusion's `trunc` math function
diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result
new file mode 100644
index 0000000000..e375153620
--- /dev/null
+++ b/tests/cases/standalone/common/system/information_schema.result
@@ -0,0 +1,48 @@
+create
+database my_db;
+
+Affected Rows: 1
+
+use
+my_db;
+
+++
+++
+
+create table foo
+(
+    ts bigint time index
+);
+
+Affected Rows: 0
+
+select table_name
+from information_schema.tables
+where table_schema = 'my_db'
+order by table_name;
+
++------------+
+| table_name |
++------------+
+| foo        |
++------------+
+
+select table_catalog, table_schema, table_name, table_type
+from information_schema.tables
+where table_catalog = 'greptime'
+  and table_schema != 'public'
+order by table_schema, table_name;
+
++---------------+--------------------+------------+------------+
+| table_catalog | table_schema       | table_name | table_type |
++---------------+--------------------+------------+------------+
+| greptime      | information_schema | tables     | VIEW       |
+| greptime      | my_db              | foo        | BASE TABLE |
++---------------+--------------------+------------+------------+
+
+use
+public;
+
+++
+++
+
diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql
new file mode 100644
index 0000000000..871c01c49d
--- /dev/null
+++ b/tests/cases/standalone/common/system/information_schema.sql
@@ -0,0 +1,24 @@
+create
+database my_db;
+
+use
+my_db;
+
+create table foo
+(
+    ts bigint time index
+);
+
+select table_name
+from information_schema.tables
+where table_schema = 'my_db'
+order by table_name;
+
+select table_catalog, table_schema, table_name, table_type
+from information_schema.tables
+where table_catalog = 'greptime'
+  and table_schema != 'public'
+order by table_schema, table_name;
+
+use
+public;
diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs
index d3ac5b8ce2..abb60b1f72 100644
--- a/tests/runner/src/env.rs
+++ b/tests/runner/src/env.rs
@@ -169,7 +169,7 @@ impl Env {
             }
             "frontend" => {
                 args.push("--metasrv-addr=0.0.0.0:3002".to_string());
-                args.push("--http-addr=0.0.0.0:5000".to_string());
+                args.push("--http-addr=0.0.0.0:5003".to_string());
             }
             "metasrv" => {
                 args.push("--use-memory-store".to_string());
@@ -264,7 +264,7 @@ impl Database for GreptimeDB {
         }
 
         let mut client = self.client.lock().await;
-        if query.trim().starts_with("USE ") {
+        if query.trim().to_lowercase().starts_with("use ") {
            let database = query
                .split_ascii_whitespace()
                .nth(1)