feat: information schema (#1327)

* feat: basic information schema

* show information schema only for current catalog

* fix: fragile tests
Authored by LFC on 2023-04-07 16:50:14 +08:00, committed by GitHub
parent 554a69ea54
commit a3e47955b8
29 changed files with 647 additions and 489 deletions

Cargo.lock (generated)
View File

@@ -1136,6 +1136,7 @@ version = "0.1.1"
dependencies = [
"api",
"arc-swap",
"arrow-schema",
"async-stream",
"async-trait",
"backoff",
@@ -2209,8 +2210,8 @@ dependencies = [
[[package]]
name = "datafusion"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -2256,8 +2257,8 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
dependencies = [
"arrow",
"arrow-array",
@@ -2270,8 +2271,8 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
dependencies = [
"dashmap",
"datafusion-common",
@@ -2287,8 +2288,8 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -2298,8 +2299,8 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
dependencies = [
"arrow",
"async-trait",
@@ -2315,11 +2316,12 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
dependencies = [
"ahash 0.8.3",
"arrow",
"arrow-array",
"arrow-buffer",
"arrow-schema",
"blake2",
@@ -2345,8 +2347,8 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
dependencies = [
"arrow",
"datafusion-common",
@@ -2356,9 +2358,10 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=8e125d2ecf242b4f4b81f06839900dbb2037cc2a#8e125d2ecf242b4f4b81f06839900dbb2037cc2a"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
dependencies = [
"arrow",
"arrow-schema",
"datafusion-common",
"datafusion-expr",

View File

@@ -58,12 +58,12 @@ arrow-schema = { version = "36.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
futures = "0.3"
futures-util = "0.3"
parquet = "36.0"

View File

@@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
api = { path = "../api" }
arc-swap = "1.0"
arrow-schema.workspace = true
async-stream.workspace = true
async-trait = "0.1"
backoff = { version = "0.4", features = ["tokio"] }

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod catalog_adapter;

View File

@@ -18,10 +18,6 @@ use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use catalog::error::{self as catalog_error, Error};
use catalog::{
CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
};
use common_error::prelude::BoxedError;
use datafusion::catalog::catalog::{
CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider,
@@ -33,7 +29,10 @@ use snafu::ResultExt;
use table::table::adapter::{DfTableProviderAdapter, TableAdapter};
use table::TableRef;
use crate::datafusion::error;
use crate::error::{self, Result, SchemaProviderOperationSnafu};
use crate::{
CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
};
pub struct DfCatalogListAdapter {
catalog_list: CatalogListRef,
@@ -89,7 +88,7 @@ impl CatalogProvider for CatalogProviderAdapter {
self
}
fn schema_names(&self) -> catalog::error::Result<Vec<String>> {
fn schema_names(&self) -> Result<Vec<String>> {
Ok(self.df_catalog_provider.schema_names())
}
@@ -97,11 +96,11 @@ impl CatalogProvider for CatalogProviderAdapter {
&self,
_name: String,
_schema: SchemaProviderRef,
) -> catalog::error::Result<Option<SchemaProviderRef>> {
) -> Result<Option<SchemaProviderRef>> {
todo!("register_schema is not supported in Datafusion catalog provider")
}
fn schema(&self, name: &str) -> catalog::error::Result<Option<Arc<dyn SchemaProvider>>> {
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self
.df_catalog_provider
.schema(name)
@@ -196,11 +195,11 @@ impl SchemaProvider for SchemaProviderAdapter {
}
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Result<Vec<String>, Error> {
fn table_names(&self) -> Result<Vec<String>> {
Ok(self.df_schema_provider.table_names())
}
async fn table(&self, name: &str) -> Result<Option<TableRef>, Error> {
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
let table = self.df_schema_provider.table(name).await;
let table = table.map(|table_provider| {
match table_provider
@@ -219,11 +218,7 @@ impl SchemaProvider for SchemaProviderAdapter {
Ok(table)
}
fn register_table(
&self,
name: String,
table: TableRef,
) -> catalog::error::Result<Option<TableRef>> {
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
Ok(self
.df_schema_provider
@@ -232,43 +227,43 @@ impl SchemaProvider for SchemaProviderAdapter {
msg: "Fail to register table to datafusion",
})
.map_err(BoxedError::new)
.context(catalog_error::SchemaProviderOperationSnafu)?
.context(SchemaProviderOperationSnafu)?
.map(|_| table))
}
fn rename_table(&self, _name: &str, _new_name: String) -> catalog_error::Result<TableRef> {
fn rename_table(&self, _name: &str, _new_name: String) -> Result<TableRef> {
todo!()
}
fn deregister_table(&self, name: &str) -> catalog::error::Result<Option<TableRef>> {
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
self.df_schema_provider
.deregister_table(name)
.context(error::DatafusionSnafu {
msg: "Fail to deregister table from datafusion",
})
.map_err(BoxedError::new)
.context(catalog_error::SchemaProviderOperationSnafu)?
.context(SchemaProviderOperationSnafu)?
.map(|table| {
let adapter = TableAdapter::new(table)
.context(error::TableSchemaMismatchSnafu)
.map_err(BoxedError::new)
.context(catalog_error::SchemaProviderOperationSnafu)?;
.context(SchemaProviderOperationSnafu)?;
Ok(Arc::new(adapter) as _)
})
.transpose()
}
fn table_exist(&self, name: &str) -> Result<bool, Error> {
fn table_exist(&self, name: &str) -> Result<bool> {
Ok(self.df_schema_provider.table_exist(name))
}
}
#[cfg(test)]
mod tests {
use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
use table::table::numbers::NumbersTable;
use super::*;
use crate::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
#[test]
#[should_panic]

View File

@@ -117,6 +117,9 @@ pub enum Error {
location: Location,
},
#[snafu(display("Operation {} not supported", op))]
NotSupported { op: String, location: Location },
#[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))]
OpenTable {
table_info: String,
@@ -136,6 +139,12 @@ pub enum Error {
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to create recordbatch, source: {}", source))]
CreateRecordBatch {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display(
"Failed to insert table creation record to system catalog, source: {}",
source
@@ -226,6 +235,19 @@ pub enum Error {
#[snafu(display("Invalid system table definition: {err_msg}"))]
InvalidSystemTableDef { err_msg: String, location: Location },
#[snafu(display("{}: {}", msg, source))]
Datafusion {
msg: String,
source: DataFusionError,
location: Location,
},
#[snafu(display("Table schema mismatch, source: {}", source))]
TableSchemaMismatch {
#[snafu(backtrace)]
source: table::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -247,7 +269,9 @@ impl ErrorExt for Error {
Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
Error::ReadSystemCatalog { source, .. } => source.status_code(),
Error::ReadSystemCatalog { source, .. } | Error::CreateRecordBatch { source } => {
source.status_code()
}
Error::InvalidCatalogValue { source, .. } | Error::CatalogEntrySerde { source } => {
source.status_code()
}
@@ -264,7 +288,8 @@ impl ErrorExt for Error {
| Error::OpenTable { source, .. }
| Error::CreateTable { source, .. }
| Error::DeregisterTable { source, .. }
| Error::RegionStats { source, .. } => source.status_code(),
| Error::RegionStats { source, .. }
| Error::TableSchemaMismatch { source } => source.status_code(),
Error::MetaSrv { source, .. } => source.status_code(),
Error::SystemCatalogTableScan { source } => source.status_code(),
@@ -274,8 +299,9 @@ impl ErrorExt for Error {
source.status_code()
}
Error::Unimplemented { .. } => StatusCode::Unsupported,
Error::Unimplemented { .. } | Error::NotSupported { .. } => StatusCode::Unsupported,
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
}
}

View File

@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod tables;
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::datasource::streaming::{PartitionStream, StreamingTable};
use snafu::ResultExt;
use table::table::adapter::TableAdapter;
use table::TableRef;
use crate::error::{DatafusionSnafu, Result, TableSchemaMismatchSnafu};
use crate::information_schema::tables::InformationSchemaTables;
use crate::{CatalogProviderRef, SchemaProvider};
const TABLES: &str = "tables";
pub(crate) struct InformationSchemaProvider {
catalog_name: String,
catalog_provider: CatalogProviderRef,
}
impl InformationSchemaProvider {
pub(crate) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
Self {
catalog_name,
catalog_provider,
}
}
}
#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Result<Vec<String>> {
Ok(vec![TABLES.to_string()])
}
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
let table = if name.eq_ignore_ascii_case(TABLES) {
Arc::new(InformationSchemaTables::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
))
} else {
return Ok(None);
};
let table = Arc::new(
StreamingTable::try_new(table.schema().clone(), vec![table]).with_context(|_| {
DatafusionSnafu {
msg: format!("Failed to get InformationSchema table '{name}'"),
}
})?,
);
let table = TableAdapter::new(table).context(TableSchemaMismatchSnafu)?;
Ok(Some(Arc::new(table)))
}
fn table_exist(&self, name: &str) -> Result<bool> {
Ok(matches!(name.to_ascii_lowercase().as_str(), TABLES))
}
}
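
A rough, crate-internal usage sketch of the provider above. The helper function name and the use of MemoryCatalogProvider (the in-memory implementation that already exists in this crate) are illustrative assumptions, not part of this change:

// Hypothetical sketch: resolve the virtual `tables` table through the new provider.
// Assumes crate-internal visibility, since the provider is pub(crate).
use std::sync::Arc;

use crate::information_schema::InformationSchemaProvider;
use crate::local::MemoryCatalogProvider;
use crate::SchemaProvider;

async fn lookup_information_schema() -> crate::error::Result<()> {
    let catalog = Arc::new(MemoryCatalogProvider::new());
    let provider = InformationSchemaProvider::new("greptime".to_string(), catalog);
    // Only `tables` is exposed for now; lookups are case-insensitive and
    // unknown names resolve to None.
    assert!(provider.table_exist("tables")?);
    assert!(provider.table("TABLES").await?.is_some());
    assert!(provider.table("no_such_table").await?.is_none());
    Ok(())
}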

View File

@@ -0,0 +1,165 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_query::physical_plan::TaskContext;
use common_recordbatch::RecordBatch;
use datafusion::datasource::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::StringVectorBuilder;
use snafu::ResultExt;
use table::metadata::TableType;
use crate::error::{CreateRecordBatchSnafu, Result};
use crate::information_schema::TABLES;
use crate::CatalogProviderRef;
pub(super) struct InformationSchemaTables {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
}
impl InformationSchemaTables {
pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_type", ConcreteDataType::string_datatype(), false),
]));
Self {
schema,
catalog_name,
catalog_provider,
}
}
fn builder(&self) -> InformationSchemaTablesBuilder {
InformationSchemaTablesBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_provider.clone(),
)
}
}
/// Builds the `information_schema.TABLES` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaTablesBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
table_names: StringVectorBuilder,
table_types: StringVectorBuilder,
}
impl InformationSchemaTablesBuilder {
fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
Self {
schema,
catalog_name,
catalog_provider,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
table_names: StringVectorBuilder::with_capacity(42),
table_types: StringVectorBuilder::with_capacity(42),
}
}
/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
for schema_name in self.catalog_provider.schema_names()? {
if schema_name == INFORMATION_SCHEMA_NAME {
continue;
}
let Some(schema) = self.catalog_provider.schema(&schema_name)? else { continue };
for table_name in schema.table_names()? {
let Some(table) = schema.table(&table_name).await? else { continue };
self.add_table(&catalog_name, &schema_name, &table_name, table.table_type());
}
}
// Add a final list for the information schema tables themselves
self.add_table(
&catalog_name,
INFORMATION_SCHEMA_NAME,
TABLES,
TableType::View,
);
self.finish()
}
fn add_table(
&mut self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
table_type: TableType,
) {
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
self.table_types.push(Some(match table_type {
TableType::Base => "BASE TABLE",
TableType::View => "VIEW",
TableType::Temporary => "LOCAL TEMPORARY",
}));
}
fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.catalog_names.finish()),
Arc::new(self.schema_names.finish()),
Arc::new(self.table_names.finish()),
Arc::new(self.table_types.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
impl DfPartitionStream for InformationSchemaTables {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_tables()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
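
For orientation, the partition stream above yields a single record batch with the four string columns, one row per user table plus a final row for information_schema.tables itself. The helper below is a hypothetical sketch (it relies on the imports already at the top of this file and on the private builder); the real code path goes through StreamingTable in the module above:

// Hypothetical crate-internal helper, not part of this change.
async fn dump_tables(
    catalog_name: String,
    catalog_provider: CatalogProviderRef,
) -> Result<RecordBatch> {
    let mut builder = InformationSchemaTables::new(catalog_name, catalog_provider).builder();
    // One row per user table in the catalog, plus a final row for
    // `information_schema.tables` itself.
    let batch = builder.make_tables().await?;
    assert_eq!(4, batch.num_columns());
    Ok(batch)
}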

View File

@@ -29,8 +29,10 @@ use table::TableRef;
use crate::error::{CreateTableSnafu, Result};
pub use crate::schema::{SchemaProvider, SchemaProviderRef};
pub mod datafusion;
pub mod error;
pub mod helper;
pub(crate) mod information_schema;
pub mod local;
pub mod remote;
pub mod schema;

View File

@@ -74,7 +74,7 @@ impl LocalCatalogManager {
})?;
let table = SystemCatalogTable::new(engine.clone()).await?;
let memory_catalog_list = crate::local::memory::new_memory_catalog_list()?;
let system_catalog = Arc::new(SystemCatalog::new(table, memory_catalog_list.clone()));
let system_catalog = Arc::new(SystemCatalog::new(table));
Ok(Self {
system: system_catalog,
catalogs: memory_catalog_list,
@@ -305,9 +305,7 @@ impl CatalogList for LocalCatalogManager {
}
fn catalog_names(&self) -> Result<Vec<String>> {
let mut res = self.catalogs.catalog_names()?;
res.push(SYSTEM_CATALOG_NAME.to_string());
Ok(res)
self.catalogs.catalog_names()
}
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>> {

View File

@@ -189,7 +189,9 @@ impl RemoteCatalogManager {
let max_table_id = MIN_USER_TABLE_ID - 1;
// initiate default catalog and schema
let default_catalog = self.initiate_default_catalog().await?;
let default_catalog = self
.create_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await?;
res.insert(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
info!("Default catalog and schema registered");
@@ -269,13 +271,19 @@ impl RemoteCatalogManager {
Ok(())
}
async fn initiate_default_catalog(&self) -> Result<CatalogProviderRef> {
let default_catalog = self.new_catalog_provider(DEFAULT_CATALOG_NAME);
let default_schema = self.new_schema_provider(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone())?;
pub async fn create_catalog_and_schema(
&self,
catalog_name: &str,
schema_name: &str,
) -> Result<CatalogProviderRef> {
let schema_provider = self.new_schema_provider(catalog_name, schema_name);
let catalog_provider = self.new_catalog_provider(catalog_name);
catalog_provider.register_schema(schema_name.to_string(), schema_provider.clone())?;
let schema_key = SchemaKey {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
}
.to_string();
self.backend
@@ -286,10 +294,10 @@ impl RemoteCatalogManager {
.context(InvalidCatalogValueSnafu)?,
)
.await?;
info!("Registered default schema");
info!("Created schema '{schema_key}'");
let catalog_key = CatalogKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
catalog_name: catalog_name.to_string(),
}
.to_string();
self.backend
@@ -300,8 +308,8 @@ impl RemoteCatalogManager {
.context(InvalidCatalogValueSnafu)?,
)
.await?;
info!("Registered default catalog");
Ok(default_catalog)
info!("Created catalog '{catalog_key}");
Ok(catalog_provider)
}
async fn open_or_create_table(

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use table::TableRef;
use crate::error::Result;
use crate::error::{NotSupportedSnafu, Result};
/// Represents a schema, comprising a number of named tables.
#[async_trait]
@@ -35,15 +35,30 @@ pub trait SchemaProvider: Sync + Send {
/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name existed before, it returns "Table already exists" error.
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>>;
fn register_table(&self, name: String, _table: TableRef) -> Result<Option<TableRef>> {
NotSupportedSnafu {
op: format!("register_table({name}, <table>)"),
}
.fail()
}
/// If supported by the implementation, renames an existing table from this schema and returns it.
/// If no table of that name exists, returns "Table not found" error.
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef>;
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
NotSupportedSnafu {
op: format!("rename_table({name}, {new_name})"),
}
.fail()
}
/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>>;
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
NotSupportedSnafu {
op: format!("deregister_table({name})"),
}
.fail()
}
/// If supported by the implementation, checks the table exist in the schema provider or not.
/// If no matched table in the schema provider, return false.
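
With these defaults in place, a read-only provider (like the information_schema provider added in this change) only needs the lookup methods. A minimal sketch, assuming the trait as diffed above; the EmptySchema type is purely illustrative:

// Hypothetical minimal SchemaProvider that relies on the new default
// register_table / rename_table / deregister_table implementations.
use std::any::Any;

use async_trait::async_trait;
use table::TableRef;

use crate::error::Result;
use crate::schema::SchemaProvider;

struct EmptySchema;

#[async_trait]
impl SchemaProvider for EmptySchema {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Result<Vec<String>> {
        Ok(vec![])
    }

    async fn table(&self, _name: &str) -> Result<Option<TableRef>> {
        Ok(None)
    }

    fn table_exist(&self, _name: &str) -> Result<bool> {
        Ok(false)
    }
}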

View File

@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_catalog::format_full_table_name;
use datafusion::common::{ResolvedTableReference, TableReference};
use datafusion::datasource::provider_as_source;
@@ -26,6 +27,7 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
CatalogNotFoundSnafu, QueryAccessDeniedSnafu, Result, SchemaNotFoundSnafu, TableNotExistSnafu,
};
use crate::information_schema::InformationSchemaProvider;
use crate::CatalogListRef;
pub struct DfTableSourceProvider {
@@ -100,14 +102,25 @@ impl DfTableSourceProvider {
let schema_name = table_ref.schema.as_ref();
let table_name = table_ref.table.as_ref();
let catalog = self
.catalog_list
.catalog(catalog_name)?
.context(CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog.schema(schema_name)?.context(SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?;
let schema = if schema_name != INFORMATION_SCHEMA_NAME {
let catalog = self
.catalog_list
.catalog(catalog_name)?
.context(CatalogNotFoundSnafu { catalog_name })?;
catalog.schema(schema_name)?.context(SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?
} else {
let catalog_provider = self
.catalog_list
.catalog(catalog_name)?
.context(CatalogNotFoundSnafu { catalog_name })?;
Arc::new(InformationSchemaProvider::new(
catalog_name.to_string(),
catalog_provider,
))
};
let table = schema
.table(table_name)
.await?

View File

@@ -15,27 +15,12 @@
// The `tables` table in system catalog keeps a record of all tables created by user.
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use async_stream::stream;
use async_trait::async_trait;
use common_catalog::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME};
use common_error::ext::BoxedError;
use common_query::logical_plan::Expr;
use common_query::physical_plan::PhysicalPlanRef;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream};
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::ValueRef;
use datatypes::vectors::VectorRef;
use futures::Stream;
use snafu::ResultExt;
use table::error::TablesRecordBatchSnafu;
use table::metadata::{TableId, TableInfoRef};
use table::table::scan::SimpleTableScan;
use table::metadata::TableId;
use table::{Table, TableRef};
use crate::error::{self, Error, InsertCatalogRecordSnafu, Result as CatalogResult};
@@ -43,152 +28,9 @@ use crate::system::{
build_schema_insert_request, build_table_deletion_request, build_table_insert_request,
SystemCatalogTable,
};
use crate::{
CatalogListRef, CatalogProvider, DeregisterTableRequest, SchemaProvider, SchemaProviderRef,
};
/// Tables holds all tables created by user.
pub struct Tables {
schema: SchemaRef,
catalogs: CatalogListRef,
}
impl Tables {
pub fn new(catalogs: CatalogListRef) -> Self {
Self {
schema: Arc::new(build_schema_for_tables()),
catalogs,
}
}
}
#[async_trait::async_trait]
impl Table for Tables {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn table_info(&self) -> TableInfoRef {
unreachable!("Tables does not support table_info method")
}
async fn scan(
&self,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> table::error::Result<PhysicalPlanRef> {
let catalogs = self.catalogs.clone();
let schema_ref = self.schema.clone();
let stream = stream!({
for catalog_name in catalogs
.catalog_names()
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
{
let catalog = catalogs
.catalog(&catalog_name)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
.unwrap();
for schema_name in catalog
.schema_names()
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
{
let schema = catalog
.schema(&schema_name)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
.unwrap();
let names = schema
.table_names()
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?;
let mut tables = Vec::with_capacity(names.len());
for name in names {
let table = schema
.table(&name)
.await
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
.unwrap();
tables.push(table);
}
let vec = tables_to_record_batch(&catalog_name, &schema_name, tables);
let record_batch_res = RecordBatch::new(schema_ref.clone(), vec);
yield record_batch_res;
}
}
});
let stream = Box::pin(TablesRecordBatchStream {
schema: self.schema.clone(),
stream: Box::pin(stream),
});
Ok(Arc::new(SimpleTableScan::new(stream)))
}
}
/// Convert tables info to `RecordBatch`.
fn tables_to_record_batch(
catalog_name: &str,
schema_name: &str,
tables: Vec<TableRef>,
) -> Vec<VectorRef> {
let mut catalog_vec = ConcreteDataType::string_datatype().create_mutable_vector(tables.len());
let mut schema_vec = ConcreteDataType::string_datatype().create_mutable_vector(tables.len());
let mut table_name_vec =
ConcreteDataType::string_datatype().create_mutable_vector(tables.len());
let mut engine_vec = ConcreteDataType::string_datatype().create_mutable_vector(tables.len());
for table in tables {
let name = &table.table_info().name;
let engine = &table.table_info().meta.engine;
// Safety: All these vectors are string type.
catalog_vec.push_value_ref(ValueRef::String(catalog_name));
schema_vec.push_value_ref(ValueRef::String(schema_name));
table_name_vec.push_value_ref(ValueRef::String(name));
engine_vec.push_value_ref(ValueRef::String(engine));
}
vec![
catalog_vec.to_vector(),
schema_vec.to_vector(),
table_name_vec.to_vector(),
engine_vec.to_vector(),
]
}
pub struct TablesRecordBatchStream {
schema: SchemaRef,
stream: Pin<Box<dyn Stream<Item = RecordBatchResult<RecordBatch>> + Send>>,
}
impl Stream for TablesRecordBatchStream {
type Item = RecordBatchResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.stream).poll_next(cx)
}
}
impl RecordBatchStream for TablesRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
use crate::{CatalogProvider, DeregisterTableRequest, SchemaProvider, SchemaProviderRef};
pub struct InformationSchema {
pub tables: Arc<Tables>,
pub system: Arc<SystemCatalogTable>,
}
@@ -199,41 +41,19 @@ impl SchemaProvider for InformationSchema {
}
fn table_names(&self) -> Result<Vec<String>, Error> {
Ok(vec![
"tables".to_string(),
SYSTEM_CATALOG_TABLE_NAME.to_string(),
])
Ok(vec![SYSTEM_CATALOG_TABLE_NAME.to_string()])
}
async fn table(&self, name: &str) -> Result<Option<TableRef>, Error> {
if name.eq_ignore_ascii_case("tables") {
Ok(Some(self.tables.clone()))
} else if name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) {
if name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) {
Ok(Some(self.system.clone()))
} else {
Ok(None)
}
}
fn register_table(
&self,
_name: String,
_table: TableRef,
) -> crate::error::Result<Option<TableRef>> {
panic!("System catalog & schema does not support register table")
}
fn rename_table(&self, _name: &str, _new_name: String) -> crate::error::Result<TableRef> {
unimplemented!("System catalog & schema does not support rename table")
}
fn deregister_table(&self, _name: &str) -> crate::error::Result<Option<TableRef>> {
panic!("System catalog & schema does not support deregister table")
}
fn table_exist(&self, name: &str) -> Result<bool, Error> {
Ok(name.eq_ignore_ascii_case("tables")
|| name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME))
Ok(name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME))
}
}
@@ -242,9 +62,8 @@ pub struct SystemCatalog {
}
impl SystemCatalog {
pub fn new(system: SystemCatalogTable, catalogs: CatalogListRef) -> Self {
pub(crate) fn new(system: SystemCatalogTable) -> Self {
let schema = InformationSchema {
tables: Arc::new(Tables::new(catalogs)),
system: Arc::new(system),
};
Self {
@@ -322,107 +141,3 @@ impl CatalogProvider for SystemCatalog {
}
}
}
fn build_schema_for_tables() -> Schema {
let cols = vec![
ColumnSchema::new(
"catalog".to_string(),
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"schema".to_string(),
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"table_name".to_string(),
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"engine".to_string(),
ConcreteDataType::string_datatype(),
false,
),
];
Schema::new(cols)
}
#[cfg(test)]
mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::physical_plan::SessionContext;
use futures_util::StreamExt;
use table::table::numbers::NumbersTable;
use super::*;
use crate::local::memory::new_memory_catalog_list;
use crate::CatalogList;
#[tokio::test]
async fn test_tables() {
let catalog_list = new_memory_catalog_list().unwrap();
let schema = catalog_list
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap()
.unwrap();
schema
.register_table(
"test_table".to_string(),
Arc::new(NumbersTable::with_name(1, "test_table".to_string())),
)
.unwrap();
let tables = Tables::new(catalog_list);
let tables_stream = tables.scan(None, &[], None).await.unwrap();
let session_ctx = SessionContext::new();
let mut tables_stream = tables_stream.execute(0, session_ctx.task_ctx()).unwrap();
if let Some(t) = tables_stream.next().await {
let batch = t.unwrap();
assert_eq!(1, batch.num_rows());
assert_eq!(4, batch.num_columns());
assert_eq!(
ConcreteDataType::string_datatype(),
batch.column(0).data_type()
);
assert_eq!(
ConcreteDataType::string_datatype(),
batch.column(1).data_type()
);
assert_eq!(
ConcreteDataType::string_datatype(),
batch.column(2).data_type()
);
assert_eq!(
ConcreteDataType::string_datatype(),
batch.column(3).data_type()
);
assert_eq!(
"greptime",
batch.column(0).get_ref(0).as_string().unwrap().unwrap()
);
assert_eq!(
"public",
batch.column(1).get_ref(0).as_string().unwrap().unwrap()
);
assert_eq!(
"test_table",
batch.column(2).get_ref(0).as_string().unwrap().unwrap()
);
assert_eq!(
"test_engine",
batch.column(3).get_ref(0).as_string().unwrap().unwrap()
);
} else {
panic!("Record batch should not be empty!")
}
}
}

View File

@@ -398,7 +398,6 @@ impl DFLogicalSubstraitConvertor {
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropView(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
@@ -409,7 +408,8 @@ impl DFLogicalSubstraitConvertor {
| LogicalPlan::Prepare(_)
| LogicalPlan::Dml(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Unnest(_) => InvalidParametersSnafu {
| LogicalPlan::Unnest(_)
| LogicalPlan::Statement(_) => InvalidParametersSnafu {
reason: format!(
"Trying to convert DDL/DML plan to substrait proto, plan: {plan:?}",
),

View File

@@ -413,22 +413,6 @@ impl SchemaProvider for FrontendSchemaProvider {
Ok(Some(table))
}
fn register_table(
&self,
_name: String,
_table: TableRef,
) -> catalog::error::Result<Option<TableRef>> {
unimplemented!("Frontend schema provider does not support register table")
}
fn rename_table(&self, _name: &str, _new_name: String) -> catalog_err::Result<TableRef> {
unimplemented!("Frontend schema provider does not support rename table")
}
fn deregister_table(&self, _name: &str) -> catalog::error::Result<Option<TableRef>> {
unimplemented!("Frontend schema provider does not support deregister table")
}
fn table_exist(&self, name: &str) -> catalog::error::Result<bool> {
Ok(self.table_names()?.contains(&name.to_string()))
}

View File

@@ -20,7 +20,9 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use catalog::remote::MetaKvBackend;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::remote::{MetaKvBackend, RemoteCatalogManager};
use catalog::CatalogProvider;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_runtime::Builder as RuntimeBuilder;
@@ -87,6 +89,20 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
.await
.unwrap();
// create another catalog and schema for testing
let another_catalog = Arc::new(MemoryCatalogProvider::new());
let _ = another_catalog
.register_schema(
"another_schema".to_string(),
Arc::new(MemorySchemaProvider::new()),
)
.unwrap();
let _ = dn_instance
.catalog_manager()
.register_catalog("another_catalog".to_string(), another_catalog)
.unwrap();
dn_instance.start().await.unwrap();
MockStandaloneInstance {
instance: Arc::new(frontend_instance),
@@ -209,6 +225,16 @@ async fn create_distributed_datanode(
);
instance.start().await.unwrap();
// create another catalog and schema for testing
let _ = instance
.catalog_manager()
.as_any()
.downcast_ref::<RemoteCatalogManager>()
.unwrap()
.create_catalog_and_schema("another_catalog", "another_schema")
.await
.unwrap();
(
instance,
TestGuard {

View File

@@ -15,7 +15,7 @@
use std::env;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::Output;
use common_recordbatch::util;
use common_telemetry::logging;
@@ -23,9 +23,9 @@ use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
use rstest::rstest;
use rstest_reuse::apply;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use session::context::{QueryContext, QueryContextRef};
use crate::error::Error;
use crate::error::{Error, Result};
use crate::instance::Instance;
use crate::tests::test_util::{
both_instances_cases, check_output_stream, check_unordered_output_stream, distributed,
@@ -246,8 +246,7 @@ async fn test_execute_insert_by_select(instance: Arc<dyn MockInstance>) {
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+"
.to_string();
+-------+------+--------+---------------------+";
check_output_stream(output, expected).await;
}
@@ -451,51 +450,58 @@ async fn test_rename_table(instance: Arc<dyn MockInstance>) {
let output = execute_sql(&instance, "create database db").await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = execute_sql_in_db(
let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, "db"));
let output = execute_sql_with(
&instance,
"create table demo(host string, cpu double, memory double, ts timestamp, time index(ts))",
"db",
query_ctx.clone(),
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));
// make sure table insertion is ok before altering table name
let output = execute_sql_in_db(
let output = execute_sql_with(
&instance,
"insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000), ('host2', 2.2, 200, 2000)",
"db",
query_ctx.clone(),
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
// rename table
let output = execute_sql_in_db(&instance, "alter table demo rename test_table", "db").await;
let output = execute_sql_with(
&instance,
"alter table demo rename test_table",
query_ctx.clone(),
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));
let output = execute_sql_in_db(&instance, "show tables", "db").await;
let output = execute_sql_with(&instance, "show tables", query_ctx.clone()).await;
let expect = "\
+------------+
| Tables |
+------------+
| test_table |
+------------+\
"
.to_string();
+------------+";
check_output_stream(output, expect).await;
let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await;
let output = execute_sql_with(
&instance,
"select * from test_table order by ts",
query_ctx.clone(),
)
.await;
let expected = "\
+-------+-----+--------+---------------------+
| host | cpu | memory | ts |
+-------+-----+--------+---------------------+
| host1 | 1.1 | 100.0 | 1970-01-01T00:00:01 |
| host2 | 2.2 | 200.0 | 1970-01-01T00:00:02 |
+-------+-----+--------+---------------------+\
"
.to_string();
+-------+-----+--------+---------------------+";
check_output_stream(output, expected).await;
try_execute_sql_in_db(&instance, "select * from demo", "db")
try_execute_sql_with(&instance, "select * from demo", query_ctx)
.await
.expect_err("no table found in expect");
}
@@ -510,30 +516,31 @@ async fn test_create_table_after_rename_table(instance: Arc<dyn MockInstance>) {
// create test table
let table_name = "demo";
let output = execute_sql_in_db(
let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, "db"));
let output = execute_sql_with(
&instance,
&format!("create table {table_name}(host string, cpu double, memory double, ts timestamp, time index(ts))"),
"db",
query_ctx.clone(),
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));
// rename table
let new_table_name = "test_table";
let output = execute_sql_in_db(
let output = execute_sql_with(
&instance,
&format!("alter table {table_name} rename {new_table_name}"),
"db",
query_ctx.clone(),
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));
// create table with same name
// create test table
let output = execute_sql_in_db(
let output = execute_sql_with(
&instance,
&format!("create table {table_name}(host string, cpu double, memory double, ts timestamp, time index(ts))"),
"db",
query_ctx.clone(),
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));
@@ -544,10 +551,8 @@ async fn test_create_table_after_rename_table(instance: Arc<dyn MockInstance>) {
+------------+
| demo |
| test_table |
+------------+\
"
.to_string();
let output = execute_sql_in_db(&instance, "show tables", "db").await;
+------------+";
let output = execute_sql_with(&instance, "show tables", query_ctx).await;
check_output_stream(output, expect).await;
}
@@ -594,9 +599,7 @@ async fn test_alter_table(instance: Arc<dyn MockInstance>) {
| host1 | 1.1 | 100.0 | 1970-01-01T00:00:01 | |
| host2 | 2.2 | 200.0 | 1970-01-01T00:00:02 | hello |
| host3 | 3.3 | 300.0 | 1970-01-01T00:00:03 | |
+-------+-----+--------+---------------------+--------+\
"
.to_string();
+-------+-----+--------+---------------------+--------+";
check_output_stream(output, expected).await;
// Drop a column
@@ -611,9 +614,7 @@ async fn test_alter_table(instance: Arc<dyn MockInstance>) {
| host1 | 1.1 | 1970-01-01T00:00:01 | |
| host2 | 2.2 | 1970-01-01T00:00:02 | hello |
| host3 | 3.3 | 1970-01-01T00:00:03 | |
+-------+-----+---------------------+--------+\
"
.to_string();
+-------+-----+---------------------+--------+";
check_output_stream(output, expected).await;
// insert a new row
@@ -633,9 +634,7 @@ async fn test_alter_table(instance: Arc<dyn MockInstance>) {
| host2 | 2.2 | 1970-01-01T00:00:02 | hello |
| host3 | 3.3 | 1970-01-01T00:00:03 | |
| host4 | 400.0 | 1970-01-01T00:00:04 | world |
+-------+-------+---------------------+--------+\
"
.to_string();
+-------+-------+---------------------+--------+";
check_output_stream(output, expected).await;
}
@@ -676,9 +675,7 @@ async fn test_insert_with_default_value_for_type(instance: Arc<Instance>, type_n
+-------+-----+
| host1 | 1.1 |
| host2 | 2.2 |
+-------+-----+\
"
.to_string();
+-------+-----+";
check_output_stream(output, expected).await;
}
@@ -697,42 +694,39 @@ async fn test_use_database(instance: Arc<dyn MockInstance>) {
let output = execute_sql(&instance, "create database db1").await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = execute_sql_in_db(
let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, "db1"));
let output = execute_sql_with(
&instance,
"create table tb1(col_i32 int, ts bigint, TIME INDEX(ts))",
"db1",
query_ctx.clone(),
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));
let output = execute_sql_in_db(&instance, "show tables", "db1").await;
let output = execute_sql_with(&instance, "show tables", query_ctx.clone()).await;
let expected = "\
+--------+
| Tables |
+--------+
| tb1 |
+--------+\
"
.to_string();
+--------+";
check_output_stream(output, expected).await;
let output = execute_sql_in_db(
let output = execute_sql_with(
&instance,
r#"insert into tb1(col_i32, ts) values (1, 1655276557000)"#,
"db1",
query_ctx.clone(),
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = execute_sql_in_db(&instance, "select col_i32 from tb1", "db1").await;
let output = execute_sql_with(&instance, "select col_i32 from tb1", query_ctx.clone()).await;
let expected = "\
+---------+
| col_i32 |
+---------+
| 1 |
+---------+\
"
.to_string();
+---------+";
check_output_stream(output, expected).await;
// Making a particular database the default by means of the USE statement does not preclude
@@ -743,9 +737,7 @@ async fn test_use_database(instance: Arc<dyn MockInstance>) {
| number |
+--------+
| 0 |
+--------+\
"
.to_string();
+--------+";
check_output_stream(output, expected).await;
}
@@ -793,9 +785,7 @@ async fn test_delete(instance: Arc<dyn MockInstance>) {
+-------+---------------------+------+--------+
| host2 | 2022-06-15T07:02:38 | 77.7 | 2048.0 |
| host3 | 2022-06-15T07:02:39 | 88.8 | 3072.0 |
+-------+---------------------+------+--------+\
"
.to_string();
+-------+---------------------+------+--------+";
check_output_stream(output, expect).await;
}
@@ -929,34 +919,82 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+"
.to_string();
+-------+------+--------+---------------------+";
check_output_stream(output, expected).await;
}
}
}
}
#[apply(both_instances_cases)]
async fn test_information_schema(instance: Arc<dyn MockInstance>) {
let is_distributed_mode = instance.is_distributed_mode();
let instance = instance.frontend();
let sql = "create table another_table(i bigint time index)";
let query_ctx = Arc::new(QueryContext::with("another_catalog", "another_schema"));
let output = execute_sql_with(&instance, sql, query_ctx.clone()).await;
assert!(matches!(output, Output::AffectedRows(0)));
// Users can only see the information schema under the current catalog.
// This is a necessary requirement for GreptimeCloud.
let sql = "select table_catalog, table_schema, table_name, table_type from information_schema.tables where table_type != 'SYSTEM VIEW' order by table_name";
let output = execute_sql(&instance, sql).await;
let expected = if is_distributed_mode {
"\
+---------------+--------------------+------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+------------+------------+
| greptime | public | scripts | BASE TABLE |
| greptime | information_schema | tables | VIEW |
+---------------+--------------------+------------+------------+"
} else {
"\
+---------------+--------------------+------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+------------+------------+
| greptime | public | numbers | BASE TABLE |
| greptime | public | scripts | BASE TABLE |
| greptime | information_schema | tables | VIEW |
+---------------+--------------------+------------+------------+"
};
check_output_stream(output, expected).await;
let output = execute_sql_with(&instance, sql, query_ctx).await;
let expected = "\
+-----------------+--------------------+---------------+------------+
| table_catalog | table_schema | table_name | table_type |
+-----------------+--------------------+---------------+------------+
| another_catalog | another_schema | another_table | BASE TABLE |
| another_catalog | information_schema | tables | VIEW |
+-----------------+--------------------+---------------+------------+";
check_output_stream(output, expected).await;
}
async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
execute_sql_with(instance, sql, QueryContext::arc()).await
}
async fn try_execute_sql(
instance: &Arc<Instance>,
sql: &str,
) -> Result<Output, crate::error::Error> {
try_execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
async fn try_execute_sql(instance: &Arc<Instance>, sql: &str) -> Result<Output> {
try_execute_sql_with(instance, sql, QueryContext::arc()).await
}
async fn try_execute_sql_in_db(
async fn try_execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
db: &str,
) -> Result<Output, crate::error::Error> {
let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));
query_ctx: QueryContextRef,
) -> Result<Output> {
instance.do_query(sql, query_ctx).await.remove(0)
}
async fn execute_sql_in_db(instance: &Arc<Instance>, sql: &str, db: &str) -> Output {
try_execute_sql_in_db(instance, sql, db).await.unwrap()
async fn execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Output {
try_execute_sql_with(instance, sql, query_ctx)
.await
.unwrap()
}

View File

@@ -87,14 +87,14 @@ pub(crate) fn standalone_instance_case(
) {
}
pub(crate) async fn check_output_stream(output: Output, expected: String) {
pub(crate) async fn check_output_stream(output: Output, expected: &str) {
let recordbatches = match output {
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};
let pretty_print = recordbatches.pretty_print().unwrap();
assert_eq!(pretty_print, expected, "{}", pretty_print);
assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print);
}
pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str) {

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::router_server::RouterServer;
@@ -73,7 +74,10 @@ pub async fn mock(
.await
});
let config = ChannelConfig::new();
let config = ChannelConfig::new()
.timeout(Duration::from_secs(1))
.connect_timeout(Duration::from_secs(1))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);
// Move client to an option so we can _move_ the inner value

View File

@@ -14,7 +14,6 @@
//! Planner, QueryEngine implementations based on DataFusion.
mod catalog_adapter;
mod error;
mod planner;
@@ -22,6 +21,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
pub use catalog::datafusion::catalog_adapter::DfCatalogListAdapter;
use common_error::prelude::BoxedError;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::udf::create_udf;
@@ -44,7 +44,6 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::requests::{DeleteRequest, InsertRequest};
use table::TableRef;
pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu,

View File

@@ -38,12 +38,6 @@ pub enum InnerError {
source: datatypes::error::Error,
},
#[snafu(display("Failed to convert table schema, source: {}", source))]
TableSchemaMismatch {
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display(
"Failed to convert DataFusion's recordbatch stream, source: {}",
source
@@ -67,10 +61,7 @@ impl ErrorExt for InnerError {
match self {
// TODO(yingwen): Further categorize datafusion error.
Datafusion { .. } => StatusCode::EngineExecuteQuery,
// This downcast should not fail in usual case.
PhysicalPlanDowncast { .. } | ConvertSchema { .. } | TableSchemaMismatch { .. } => {
StatusCode::Unexpected
}
PhysicalPlanDowncast { .. } | ConvertSchema { .. } => StatusCode::Unexpected,
ConvertDfRecordBatchStream { source } => source.status_code(),
ExecutePhysicalPlan { source } => source.status_code(),
}

View File

@@ -93,7 +93,6 @@ impl OptimizerRule for TypeConversionRule {
| LogicalPlan::DropView { .. }
| LogicalPlan::Distinct { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::SetVariable { .. }
| LogicalPlan::Analyze { .. } => {
let inputs = plan.inputs();
let mut new_inputs = Vec::with_capacity(inputs.len());
@@ -120,7 +119,8 @@ impl OptimizerRule for TypeConversionRule {
| LogicalPlan::Prepare(_)
| LogicalPlan::Dml(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Unnest(_) => Ok(Some(plan.clone())),
| LogicalPlan::Unnest(_)
| LogicalPlan::Statement(_) => Ok(Some(plan.clone())),
}
}

View File

@@ -18,7 +18,6 @@ use datatypes::for_all_primitive_types;
use datatypes::prelude::*;
use datatypes::types::WrapperType;
use datatypes::value::OrderedFloat;
use format_num::NumberFormat;
use num_traits::AsPrimitive;
use crate::error::Result;
@@ -56,14 +55,12 @@ where
let numbers =
function::get_numbers_from_table::<T>(column_name, table_name, engine.clone()).await;
let expected_value = numbers.iter().map(|&n| n.as_()).collect::<Vec<f64>>();
let expected_value = inc_stats::mean(expected_value.iter().cloned()).unwrap();
if let Value::Float64(OrderedFloat(value)) = value {
let num = NumberFormat::new();
let value = num.format(".6e", value);
let expected_value = num.format(".6e", expected_value);
assert_eq!(value, expected_value);
}
let numbers = numbers.iter().map(|&n| n.as_()).collect::<Vec<f64>>();
let expected = numbers.iter().sum::<f64>() / (numbers.len() as f64);
let Value::Float64(OrderedFloat(value)) = value else { unreachable!() };
assert!(
(value - expected).abs() < 1e-3,
"expected {expected}, actual {value}"
);
Ok(())
}

View File

@@ -279,8 +279,7 @@ fn sqrt(py: Python<'_>, val: PyObject) -> PyResult<PyObject> {
```
*/
bind_call_unary_math_function!(
sqrt, sin, cos, tan, asin, acos, atan, floor, ceil, round, trunc, abs, signum, exp, ln, log2,
log10
sqrt, sin, cos, tan, asin, acos, atan, floor, ceil, trunc, abs, signum, exp, ln, log2, log10
);
/// return a random vector range from 0 to 1 and length of len
@@ -296,6 +295,15 @@ fn random(py: Python<'_>, len: usize) -> PyResult<PyObject> {
columnar_value_to_py_any(py, res)
}
#[pyfunction]
fn round(py: Python<'_>, val: PyObject) -> PyResult<PyObject> {
let value = try_into_columnar_value(py, val)?;
let array = value.into_array(1);
let result =
math_expressions::round(&[array]).map_err(|e| PyValueError::new_err(format!("{e:?}")))?;
columnar_value_to_py_any(py, ColumnarValue::Array(result))
}
/// The macro for binding function in `datafusion_physical_expr::expressions`(most of them are aggregate function)
macro_rules! bind_aggr_expr {
($FUNC_NAME:ident, $AGGR_FUNC: ident, [$($ARG: ident),*], $ARG_TY: ident, $($EXPR:ident => $idx: literal),*) => {

View File

@@ -545,7 +545,10 @@ pub(crate) mod greptime_builtin {
/// simple math function, the backing implement is datafusion's `round` math function
#[pyfunction]
fn round(val: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
bind_call_unary_math_function!(round, vm, val);
let value = try_into_columnar_value(val, vm)?;
let array = value.into_array(1);
let result = math_expressions::round(&[array]).map_err(|e| from_df_err(e, vm))?;
try_into_py_obj(DFColValue::Array(result), vm)
}
/// simple math function, the backing implement is datafusion's `trunc` math function

View File

@@ -0,0 +1,48 @@
create
database my_db;
Affected Rows: 1
use
my_db;
++
++
create table foo
(
ts bigint time index
);
Affected Rows: 0
select table_name
from information_schema.tables
where table_schema = 'my_db'
order by table_name;
+------------+
| table_name |
+------------+
| foo |
+------------+
select table_catalog, table_schema, table_name, table_type
from information_schema.tables
where table_catalog = 'greptime'
and table_schema != 'public'
order by table_schema, table_name;
+---------------+--------------------+------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+------------+------------+
| greptime | information_schema | tables | VIEW |
| greptime | my_db | foo | BASE TABLE |
+---------------+--------------------+------------+------------+
use
public;
++
++

View File

@@ -0,0 +1,24 @@
create
database my_db;
use
my_db;
create table foo
(
ts bigint time index
);
select table_name
from information_schema.tables
where table_schema = 'my_db'
order by table_name;
select table_catalog, table_schema, table_name, table_type
from information_schema.tables
where table_catalog = 'greptime'
and table_schema != 'public'
order by table_schema, table_name;
use
public;

View File

@@ -169,7 +169,7 @@ impl Env {
}
"frontend" => {
args.push("--metasrv-addr=0.0.0.0:3002".to_string());
args.push("--http-addr=0.0.0.0:5000".to_string());
args.push("--http-addr=0.0.0.0:5003".to_string());
}
"metasrv" => {
args.push("--use-memory-store".to_string());
@@ -264,7 +264,7 @@ impl Database for GreptimeDB {
}
let mut client = self.client.lock().await;
if query.trim().starts_with("USE ") {
if query.trim().to_lowercase().starts_with("use ") {
let database = query
.split_ascii_whitespace()
.nth(1)