mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-28 10:50:39 +00:00
fix: let information_schema know itself (#2149)
* rename show create table Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * register information_schema on registering catalog * fix tests in standalone Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix frontend catalog manager Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * add sqlness test Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix clippy Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix clippy & typo Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * tweak sqlness test Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * rename constructor Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * rename method Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix typo (again) Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * remove redundent clones Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -16,18 +16,23 @@ mod columns;
|
||||
mod tables;
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::{
|
||||
INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME,
|
||||
INFORMATION_SCHEMA_TABLES_TABLE_ID,
|
||||
};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use futures_util::StreamExt;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::ScanRequest;
|
||||
use store_api::storage::{ScanRequest, TableId};
|
||||
use table::data_source::DataSource;
|
||||
use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
|
||||
use table::metadata::TableType;
|
||||
use table::metadata::{TableIdent, TableInfoBuilder, TableMetaBuilder, TableType};
|
||||
use table::{Result as TableResult, Table, TableRef};
|
||||
|
||||
use self::columns::InformationSchemaColumns;
|
||||
@@ -36,8 +41,8 @@ use crate::information_schema::tables::InformationSchemaTables;
|
||||
use crate::table_factory::TableFactory;
|
||||
use crate::CatalogManager;
|
||||
|
||||
const TABLES: &str = "tables";
|
||||
const COLUMNS: &str = "columns";
|
||||
pub const TABLES: &str = "tables";
|
||||
pub const COLUMNS: &str = "columns";
|
||||
|
||||
pub struct InformationSchemaProvider {
|
||||
catalog_name: String,
|
||||
@@ -51,42 +56,95 @@ impl InformationSchemaProvider {
|
||||
catalog_manager,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InformationSchemaProvider {
|
||||
/// Build a map of [TableRef] in information schema.
|
||||
/// Including `tables` and `columns`.
|
||||
pub fn build(
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
) -> HashMap<String, TableRef> {
|
||||
let mut schema = HashMap::new();
|
||||
|
||||
schema.insert(
|
||||
TABLES.to_string(),
|
||||
Arc::new(InformationTable::new(
|
||||
catalog_name.clone(),
|
||||
INFORMATION_SCHEMA_TABLES_TABLE_ID,
|
||||
TABLES.to_string(),
|
||||
Arc::new(InformationSchemaTables::new(
|
||||
catalog_name.clone(),
|
||||
catalog_manager.clone(),
|
||||
)),
|
||||
)) as _,
|
||||
);
|
||||
schema.insert(
|
||||
COLUMNS.to_string(),
|
||||
Arc::new(InformationTable::new(
|
||||
catalog_name.clone(),
|
||||
INFORMATION_SCHEMA_COLUMNS_TABLE_ID,
|
||||
COLUMNS.to_string(),
|
||||
Arc::new(InformationSchemaColumns::new(catalog_name, catalog_manager)),
|
||||
)) as _,
|
||||
);
|
||||
|
||||
schema
|
||||
}
|
||||
|
||||
pub fn table(&self, name: &str) -> Result<Option<TableRef>> {
|
||||
let stream_builder = match name.to_ascii_lowercase().as_ref() {
|
||||
TABLES => Arc::new(InformationSchemaTables::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _,
|
||||
COLUMNS => Arc::new(InformationSchemaColumns::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _,
|
||||
let (stream_builder, table_id) = match name.to_ascii_lowercase().as_ref() {
|
||||
TABLES => (
|
||||
Arc::new(InformationSchemaTables::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _,
|
||||
INFORMATION_SCHEMA_TABLES_TABLE_ID,
|
||||
),
|
||||
COLUMNS => (
|
||||
Arc::new(InformationSchemaColumns::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _,
|
||||
INFORMATION_SCHEMA_COLUMNS_TABLE_ID,
|
||||
),
|
||||
_ => {
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Some(Arc::new(InformationTable::new(stream_builder))))
|
||||
Ok(Some(Arc::new(InformationTable::new(
|
||||
self.catalog_name.clone(),
|
||||
table_id,
|
||||
name.to_string(),
|
||||
stream_builder,
|
||||
))))
|
||||
}
|
||||
|
||||
pub fn table_factory(&self, name: &str) -> Result<Option<TableFactory>> {
|
||||
let stream_builder = match name.to_ascii_lowercase().as_ref() {
|
||||
TABLES => Arc::new(InformationSchemaTables::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _,
|
||||
COLUMNS => Arc::new(InformationSchemaColumns::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _,
|
||||
let (stream_builder, table_id) = match name.to_ascii_lowercase().as_ref() {
|
||||
TABLES => (
|
||||
Arc::new(InformationSchemaTables::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _,
|
||||
INFORMATION_SCHEMA_TABLES_TABLE_ID,
|
||||
),
|
||||
COLUMNS => (
|
||||
Arc::new(InformationSchemaColumns::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _,
|
||||
INFORMATION_SCHEMA_COLUMNS_TABLE_ID,
|
||||
),
|
||||
_ => {
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
let data_source = Arc::new(InformationTable::new(stream_builder));
|
||||
let data_source = Arc::new(InformationTable::new(
|
||||
self.catalog_name.clone(),
|
||||
table_id,
|
||||
name.to_string(),
|
||||
stream_builder,
|
||||
));
|
||||
|
||||
Ok(Some(Arc::new(move || data_source.clone())))
|
||||
}
|
||||
@@ -101,12 +159,25 @@ pub trait InformationStreamBuilder: Send + Sync {
|
||||
}
|
||||
|
||||
pub struct InformationTable {
|
||||
catalog_name: String,
|
||||
table_id: TableId,
|
||||
name: String,
|
||||
stream_builder: Arc<dyn InformationStreamBuilder>,
|
||||
}
|
||||
|
||||
impl InformationTable {
|
||||
pub fn new(stream_builder: Arc<dyn InformationStreamBuilder>) -> Self {
|
||||
Self { stream_builder }
|
||||
pub fn new(
|
||||
catalog_name: String,
|
||||
table_id: TableId,
|
||||
name: String,
|
||||
stream_builder: Arc<dyn InformationStreamBuilder>,
|
||||
) -> Self {
|
||||
Self {
|
||||
catalog_name,
|
||||
table_id,
|
||||
name,
|
||||
stream_builder,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,7 +192,26 @@ impl Table for InformationTable {
|
||||
}
|
||||
|
||||
fn table_info(&self) -> table::metadata::TableInfoRef {
|
||||
unreachable!("Should not call table_info() of InformationTable directly")
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(self.stream_builder.schema())
|
||||
.primary_key_indices(vec![])
|
||||
.next_column_id(0)
|
||||
.build()
|
||||
.unwrap();
|
||||
Arc::new(
|
||||
TableInfoBuilder::default()
|
||||
.ident(TableIdent {
|
||||
table_id: self.table_id,
|
||||
version: 0,
|
||||
})
|
||||
.name(self.name.clone())
|
||||
.catalog_name(self.catalog_name.clone())
|
||||
.schema_name(INFORMATION_SCHEMA_NAME.to_string())
|
||||
.meta(table_meta)
|
||||
.table_type(TableType::Temporary)
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
fn table_type(&self) -> TableType {
|
||||
|
||||
@@ -16,7 +16,8 @@ use std::sync::{Arc, Weak};
|
||||
|
||||
use arrow_schema::SchemaRef as ArrowSchemaRef;
|
||||
use common_catalog::consts::{
|
||||
SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
|
||||
INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
|
||||
SEMANTIC_TYPE_TIME_INDEX,
|
||||
};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_query::physical_plan::TaskContext;
|
||||
@@ -31,7 +32,8 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::vectors::{StringVectorBuilder, VectorRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use super::InformationStreamBuilder;
|
||||
use super::tables::InformationSchemaTables;
|
||||
use super::{InformationStreamBuilder, COLUMNS, TABLES};
|
||||
use crate::error::{
|
||||
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
|
||||
};
|
||||
@@ -52,19 +54,22 @@ const SEMANTIC_TYPE: &str = "semantic_type";
|
||||
|
||||
impl InformationSchemaColumns {
|
||||
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Self {
|
||||
schema: Self::schema(),
|
||||
catalog_name,
|
||||
catalog_manager,
|
||||
}
|
||||
}
|
||||
|
||||
fn schema() -> SchemaRef {
|
||||
Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(DATA_TYPE, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(SEMANTIC_TYPE, ConcreteDataType::string_datatype(), false),
|
||||
]));
|
||||
Self {
|
||||
schema,
|
||||
catalog_name,
|
||||
catalog_manager,
|
||||
}
|
||||
]))
|
||||
}
|
||||
|
||||
fn builder(&self) -> InformationSchemaColumnsBuilder {
|
||||
@@ -153,14 +158,28 @@ impl InformationSchemaColumnsBuilder {
|
||||
.table_names(&catalog_name, &schema_name)
|
||||
.await?
|
||||
{
|
||||
let Some(table) = catalog_manager
|
||||
let (keys, schema) = if let Some(table) = catalog_manager
|
||||
.table(&catalog_name, &schema_name, &table_name)
|
||||
.await?
|
||||
else {
|
||||
continue;
|
||||
{
|
||||
let keys = &table.table_info().meta.primary_key_indices;
|
||||
let schema = table.schema();
|
||||
(keys.clone(), schema)
|
||||
} else {
|
||||
// TODO: this specific branch is only a workaround for FrontendCatalogManager.
|
||||
if schema_name == INFORMATION_SCHEMA_NAME {
|
||||
if table_name == COLUMNS {
|
||||
(vec![], InformationSchemaColumns::schema())
|
||||
} else if table_name == TABLES {
|
||||
(vec![], InformationSchemaTables::schema())
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let keys = &table.table_info().meta.primary_key_indices;
|
||||
let schema = table.schema();
|
||||
|
||||
for (idx, column) in schema.column_schemas().iter().enumerate() {
|
||||
let semantic_type = if column.is_time_index() {
|
||||
SEMANTIC_TYPE_TIME_INDEX
|
||||
|
||||
@@ -15,7 +15,10 @@
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use arrow_schema::SchemaRef as ArrowSchemaRef;
|
||||
use common_catalog::consts::INFORMATION_SCHEMA_NAME;
|
||||
use common_catalog::consts::{
|
||||
INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME,
|
||||
INFORMATION_SCHEMA_TABLES_TABLE_ID,
|
||||
};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_query::physical_plan::TaskContext;
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
@@ -29,6 +32,7 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::metadata::TableType;
|
||||
|
||||
use super::{COLUMNS, TABLES};
|
||||
use crate::error::{
|
||||
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
|
||||
};
|
||||
@@ -43,19 +47,22 @@ pub(super) struct InformationSchemaTables {
|
||||
|
||||
impl InformationSchemaTables {
|
||||
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Self {
|
||||
schema: Self::schema(),
|
||||
catalog_name,
|
||||
catalog_manager,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn schema() -> SchemaRef {
|
||||
Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new("table_type", ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new("table_id", ConcreteDataType::uint32_datatype(), true),
|
||||
ColumnSchema::new("engine", ConcreteDataType::string_datatype(), true),
|
||||
]));
|
||||
Self {
|
||||
schema,
|
||||
catalog_name,
|
||||
catalog_manager,
|
||||
}
|
||||
]))
|
||||
}
|
||||
|
||||
fn builder(&self) -> InformationSchemaTablesBuilder {
|
||||
@@ -137,9 +144,6 @@ impl InformationSchemaTablesBuilder {
|
||||
.context(UpgradeWeakCatalogManagerRefSnafu)?;
|
||||
|
||||
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
|
||||
if schema_name == INFORMATION_SCHEMA_NAME {
|
||||
continue;
|
||||
}
|
||||
if !catalog_manager
|
||||
.schema_exist(&catalog_name, &schema_name)
|
||||
.await?
|
||||
@@ -151,21 +155,43 @@ impl InformationSchemaTablesBuilder {
|
||||
.table_names(&catalog_name, &schema_name)
|
||||
.await?
|
||||
{
|
||||
let Some(table) = catalog_manager
|
||||
if let Some(table) = catalog_manager
|
||||
.table(&catalog_name, &schema_name, &table_name)
|
||||
.await?
|
||||
else {
|
||||
continue;
|
||||
{
|
||||
let table_info = table.table_info();
|
||||
self.add_table(
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_name,
|
||||
table.table_type(),
|
||||
Some(table_info.ident.table_id),
|
||||
Some(&table_info.meta.engine),
|
||||
);
|
||||
} else {
|
||||
// TODO: this specific branch is only a workaround for FrontendCatalogManager.
|
||||
if schema_name == INFORMATION_SCHEMA_NAME {
|
||||
if table_name == COLUMNS {
|
||||
self.add_table(
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_name,
|
||||
TableType::Temporary,
|
||||
Some(INFORMATION_SCHEMA_COLUMNS_TABLE_ID),
|
||||
None,
|
||||
);
|
||||
} else if table_name == TABLES {
|
||||
self.add_table(
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_name,
|
||||
TableType::Temporary,
|
||||
Some(INFORMATION_SCHEMA_TABLES_TABLE_ID),
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
let table_info = table.table_info();
|
||||
self.add_table(
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_name,
|
||||
table.table_type(),
|
||||
Some(table_info.ident.table_id),
|
||||
Some(&table_info.meta.engine),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -49,7 +49,7 @@ pub trait CatalogManager: Send + Sync {
|
||||
async fn start(&self) -> Result<()>;
|
||||
|
||||
/// Registers a catalog to catalog manager, returns whether the catalog exist before.
|
||||
async fn register_catalog(&self, name: String) -> Result<bool>;
|
||||
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool>;
|
||||
|
||||
/// Register a schema with catalog name and schema name. Retuens whether the
|
||||
/// schema registered.
|
||||
|
||||
@@ -43,7 +43,6 @@ use crate::error::{
|
||||
SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, TableNotExistSnafu,
|
||||
TableNotFoundSnafu, UnimplementedSnafu,
|
||||
};
|
||||
use crate::information_schema::InformationSchemaProvider;
|
||||
use crate::local::memory::MemoryCatalogManager;
|
||||
use crate::system::{
|
||||
decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX,
|
||||
@@ -51,9 +50,8 @@ use crate::system::{
|
||||
};
|
||||
use crate::tables::SystemCatalog;
|
||||
use crate::{
|
||||
handle_system_table_request, CatalogManager, CatalogManagerRef, DeregisterSchemaRequest,
|
||||
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
|
||||
RegisterTableRequest, RenameTableRequest,
|
||||
handle_system_table_request, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest,
|
||||
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
|
||||
};
|
||||
|
||||
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
|
||||
@@ -118,11 +116,18 @@ impl LocalCatalogManager {
|
||||
}
|
||||
|
||||
async fn init_system_catalog(&self) -> Result<()> {
|
||||
// register default catalog and default schema
|
||||
self.catalogs
|
||||
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string())?;
|
||||
self.catalogs.register_schema_sync(RegisterSchemaRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
})?;
|
||||
|
||||
// register SystemCatalogTable
|
||||
let _ = self
|
||||
.catalogs
|
||||
self.catalogs
|
||||
.register_catalog_sync(SYSTEM_CATALOG_NAME.to_string())?;
|
||||
let _ = self.catalogs.register_schema_sync(RegisterSchemaRequest {
|
||||
self.catalogs.register_schema_sync(RegisterSchemaRequest {
|
||||
catalog: SYSTEM_CATALOG_NAME.to_string(),
|
||||
schema: INFORMATION_SCHEMA_NAME.to_string(),
|
||||
})?;
|
||||
@@ -133,16 +138,7 @@ impl LocalCatalogManager {
|
||||
table_id: SYSTEM_CATALOG_TABLE_ID,
|
||||
table: self.system.information_schema.system.clone(),
|
||||
};
|
||||
let _ = self.catalogs.register_table(register_table_req).await?;
|
||||
|
||||
// register default catalog and default schema
|
||||
let _ = self
|
||||
.catalogs
|
||||
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string())?;
|
||||
let _ = self.catalogs.register_schema_sync(RegisterSchemaRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
})?;
|
||||
self.catalogs.register_table(register_table_req).await?;
|
||||
|
||||
// Add numbers table for test
|
||||
let numbers_table = Arc::new(NumbersTable::default());
|
||||
@@ -154,8 +150,7 @@ impl LocalCatalogManager {
|
||||
table: numbers_table,
|
||||
};
|
||||
|
||||
let _ = self
|
||||
.catalogs
|
||||
self.catalogs
|
||||
.register_table(register_number_table_req)
|
||||
.await?;
|
||||
|
||||
@@ -230,9 +225,8 @@ impl LocalCatalogManager {
|
||||
for entry in entries {
|
||||
match entry {
|
||||
Entry::Catalog(c) => {
|
||||
let _ = self
|
||||
.catalogs
|
||||
.register_catalog_if_absent(c.catalog_name.clone());
|
||||
self.catalogs
|
||||
.register_catalog_sync(c.catalog_name.clone())?;
|
||||
info!("Register catalog: {}", c.catalog_name);
|
||||
}
|
||||
Entry::Schema(s) => {
|
||||
@@ -548,13 +542,6 @@ impl CatalogManager for LocalCatalogManager {
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
) -> Result<Option<TableRef>> {
|
||||
if schema_name == INFORMATION_SCHEMA_NAME {
|
||||
let manager: CatalogManagerRef = self.catalogs.clone() as _;
|
||||
let provider =
|
||||
InformationSchemaProvider::new(catalog_name.to_string(), Arc::downgrade(&manager));
|
||||
return provider.table(table_name);
|
||||
}
|
||||
|
||||
self.catalogs
|
||||
.table(catalog_name, schema_name, table_name)
|
||||
.await
|
||||
@@ -584,8 +571,8 @@ impl CatalogManager for LocalCatalogManager {
|
||||
self.catalogs.table_names(catalog_name, schema_name).await
|
||||
}
|
||||
|
||||
async fn register_catalog(&self, name: String) -> Result<bool> {
|
||||
self.catalogs.register_catalog(name).await
|
||||
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool> {
|
||||
self.catalogs.clone().register_catalog(name).await
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
|
||||
@@ -16,9 +16,11 @@ use std::any::Any;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::{Arc, RwLock, Weak};
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
|
||||
use common_catalog::consts::{
|
||||
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID,
|
||||
};
|
||||
use metrics::{decrement_gauge, increment_gauge};
|
||||
use snafu::OptionExt;
|
||||
use table::metadata::TableId;
|
||||
@@ -28,6 +30,7 @@ use table::TableRef;
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::information_schema::InformationSchemaProvider;
|
||||
use crate::{
|
||||
CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
|
||||
@@ -42,24 +45,6 @@ pub struct MemoryCatalogManager {
|
||||
pub table_id: AtomicU32,
|
||||
}
|
||||
|
||||
impl Default for MemoryCatalogManager {
|
||||
fn default() -> Self {
|
||||
let manager = Self {
|
||||
table_id: AtomicU32::new(MIN_USER_TABLE_ID),
|
||||
catalogs: Default::default(),
|
||||
};
|
||||
|
||||
let catalog = HashMap::from([(DEFAULT_SCHEMA_NAME.to_string(), HashMap::new())]);
|
||||
let _ = manager
|
||||
.catalogs
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(DEFAULT_CATALOG_NAME.to_string(), catalog);
|
||||
|
||||
manager
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TableIdProvider for MemoryCatalogManager {
|
||||
async fn next_table_id(&self) -> table::error::Result<TableId> {
|
||||
@@ -250,7 +235,7 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn register_catalog(&self, name: String) -> Result<bool> {
|
||||
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool> {
|
||||
self.register_catalog_sync(name)
|
||||
}
|
||||
|
||||
@@ -260,6 +245,28 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
}
|
||||
|
||||
impl MemoryCatalogManager {
|
||||
/// Create a manager with some default setups
|
||||
/// (e.g. default catalog/schema and information schema)
|
||||
pub fn with_default_setup() -> Arc<Self> {
|
||||
let manager = Arc::new(Self {
|
||||
table_id: AtomicU32::new(MIN_USER_TABLE_ID),
|
||||
catalogs: Default::default(),
|
||||
});
|
||||
|
||||
// Safety: default catalog/schema is registered in order so no CatalogNotFound error will occur
|
||||
manager
|
||||
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string())
|
||||
.unwrap();
|
||||
manager
|
||||
.register_schema_sync(RegisterSchemaRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
manager
|
||||
}
|
||||
|
||||
/// Registers a catalog and return the catalog already exist
|
||||
pub fn register_catalog_if_absent(&self, name: String) -> bool {
|
||||
let mut catalogs = self.catalogs.write().unwrap();
|
||||
@@ -273,12 +280,13 @@ impl MemoryCatalogManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_catalog_sync(&self, name: String) -> Result<bool> {
|
||||
pub fn register_catalog_sync(self: &Arc<Self>, name: String) -> Result<bool> {
|
||||
let mut catalogs = self.catalogs.write().unwrap();
|
||||
|
||||
match catalogs.entry(name) {
|
||||
match catalogs.entry(name.clone()) {
|
||||
Entry::Vacant(e) => {
|
||||
e.insert(HashMap::new());
|
||||
let catalog = self.create_catalog_entry(name);
|
||||
e.insert(catalog);
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
|
||||
Ok(true)
|
||||
}
|
||||
@@ -332,9 +340,19 @@ impl MemoryCatalogManager {
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn create_catalog_entry(self: &Arc<Self>, catalog: String) -> SchemaEntries {
|
||||
let information_schema = InformationSchemaProvider::build(
|
||||
catalog,
|
||||
Arc::downgrade(self) as Weak<dyn CatalogManager>,
|
||||
);
|
||||
let mut catalog = HashMap::new();
|
||||
catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema);
|
||||
catalog
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn new_with_table(table: TableRef) -> Self {
|
||||
let manager = Self::default();
|
||||
pub fn new_with_table(table: TableRef) -> Arc<Self> {
|
||||
let manager = Self::with_default_setup();
|
||||
let request = RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
@@ -349,7 +367,7 @@ impl MemoryCatalogManager {
|
||||
|
||||
/// Create a memory catalog list contains a numbers table for test
|
||||
pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
|
||||
Ok(Arc::new(MemoryCatalogManager::default()))
|
||||
Ok(MemoryCatalogManager::with_default_setup())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -392,7 +410,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mem_manager_rename_table() {
|
||||
let catalog = MemoryCatalogManager::default();
|
||||
let catalog = MemoryCatalogManager::with_default_setup();
|
||||
let table_name = "test_table";
|
||||
assert!(!catalog
|
||||
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
|
||||
@@ -456,7 +474,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_catalog_rename_table() {
|
||||
let catalog = MemoryCatalogManager::default();
|
||||
let catalog = MemoryCatalogManager::with_default_setup();
|
||||
let table_name = "num";
|
||||
let table_id = 2333;
|
||||
let table: TableRef = Arc::new(NumbersTable::new(table_id));
|
||||
@@ -507,14 +525,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
pub fn test_register_if_absent() {
|
||||
let list = MemoryCatalogManager::default();
|
||||
let list = MemoryCatalogManager::with_default_setup();
|
||||
assert!(!list.register_catalog_if_absent("test_catalog".to_string(),));
|
||||
assert!(list.register_catalog_if_absent("test_catalog".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_catalog_deregister_table() {
|
||||
let catalog = MemoryCatalogManager::default();
|
||||
let catalog = MemoryCatalogManager::with_default_setup();
|
||||
let table_name = "foo_table";
|
||||
|
||||
let register_table_req = RegisterTableRequest {
|
||||
@@ -549,7 +567,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_catalog_deregister_schema() {
|
||||
let catalog = MemoryCatalogManager::default();
|
||||
let catalog = MemoryCatalogManager::with_default_setup();
|
||||
|
||||
// Registers a catalog, a schema, and a table.
|
||||
let catalog_name = "foo_catalog".to_string();
|
||||
@@ -567,6 +585,7 @@ mod tests {
|
||||
table: Arc::new(NumbersTable::default()),
|
||||
};
|
||||
catalog
|
||||
.clone()
|
||||
.register_catalog(catalog_name.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -67,7 +67,7 @@ impl RemoteCatalogManager {
|
||||
backend,
|
||||
system_table_requests: Default::default(),
|
||||
region_alive_keepers,
|
||||
memory_catalog_manager: Arc::new(MemoryCatalogManager::default()),
|
||||
memory_catalog_manager: MemoryCatalogManager::with_default_setup(),
|
||||
table_metadata_manager,
|
||||
}
|
||||
}
|
||||
@@ -386,6 +386,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
if remote_catalog_exists
|
||||
&& self
|
||||
.memory_catalog_manager
|
||||
.clone()
|
||||
.register_catalog(catalog.to_string())
|
||||
.await?
|
||||
{
|
||||
@@ -423,7 +424,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn register_catalog(&self, name: String) -> Result<bool> {
|
||||
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool> {
|
||||
self.memory_catalog_manager.register_catalog_sync(name)
|
||||
}
|
||||
|
||||
|
||||
@@ -130,7 +130,7 @@ mod tests {
|
||||
let query_ctx = &QueryContext::with("greptime", "public");
|
||||
|
||||
let table_provider =
|
||||
DfTableSourceProvider::new(Arc::new(MemoryCatalogManager::default()), true, query_ctx);
|
||||
DfTableSourceProvider::new(MemoryCatalogManager::with_default_setup(), true, query_ctx);
|
||||
|
||||
let table_ref = TableReference::Bare {
|
||||
table: Cow::Borrowed("table_name"),
|
||||
|
||||
@@ -26,7 +26,9 @@ mod tests {
|
||||
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
|
||||
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
|
||||
use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
|
||||
use common_catalog::consts::{
|
||||
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE,
|
||||
};
|
||||
use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
@@ -179,12 +181,17 @@ mod tests {
|
||||
catalog_manager.catalog_names().await.unwrap()
|
||||
);
|
||||
|
||||
let mut schema_names = catalog_manager
|
||||
.schema_names(DEFAULT_CATALOG_NAME)
|
||||
.await
|
||||
.unwrap();
|
||||
schema_names.sort_unstable();
|
||||
assert_eq!(
|
||||
vec![DEFAULT_SCHEMA_NAME.to_string()],
|
||||
catalog_manager
|
||||
.schema_names(DEFAULT_CATALOG_NAME)
|
||||
.await
|
||||
.unwrap()
|
||||
vec![
|
||||
INFORMATION_SCHEMA_NAME.to_string(),
|
||||
DEFAULT_SCHEMA_NAME.to_string()
|
||||
],
|
||||
schema_names
|
||||
);
|
||||
}
|
||||
|
||||
@@ -240,13 +247,18 @@ mod tests {
|
||||
async fn test_register_table() {
|
||||
let node_id = 42;
|
||||
let components = prepare_components(node_id).await;
|
||||
let mut schema_names = components
|
||||
.catalog_manager
|
||||
.schema_names(DEFAULT_CATALOG_NAME)
|
||||
.await
|
||||
.unwrap();
|
||||
schema_names.sort_unstable();
|
||||
assert_eq!(
|
||||
vec![DEFAULT_SCHEMA_NAME.to_string()],
|
||||
components
|
||||
.catalog_manager
|
||||
.schema_names(DEFAULT_CATALOG_NAME)
|
||||
.await
|
||||
.unwrap()
|
||||
vec![
|
||||
INFORMATION_SCHEMA_NAME.to_string(),
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
],
|
||||
schema_names
|
||||
);
|
||||
|
||||
// register a new table with an nonexistent catalog
|
||||
@@ -309,6 +321,7 @@ mod tests {
|
||||
// register catalog to catalog manager
|
||||
assert!(components
|
||||
.catalog_manager
|
||||
.clone()
|
||||
.register_catalog(catalog_name.clone())
|
||||
.await
|
||||
.is_ok());
|
||||
@@ -374,7 +387,7 @@ mod tests {
|
||||
.unwrap());
|
||||
|
||||
assert_eq!(
|
||||
HashSet::from([schema_name.clone()]),
|
||||
HashSet::from([schema_name.clone(), INFORMATION_SCHEMA_NAME.to_string()]),
|
||||
components
|
||||
.catalog_manager
|
||||
.schema_names(&catalog_name)
|
||||
|
||||
@@ -29,6 +29,10 @@ pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0;
|
||||
pub const SCRIPTS_TABLE_ID: u32 = 1;
|
||||
/// numbers table id
|
||||
pub const NUMBERS_TABLE_ID: u32 = 2;
|
||||
/// id for information_schema.tables
|
||||
pub const INFORMATION_SCHEMA_TABLES_TABLE_ID: u32 = 3;
|
||||
/// id for information_schema.columns
|
||||
pub const INFORMATION_SCHEMA_COLUMNS_TABLE_ID: u32 = 4;
|
||||
|
||||
pub const MITO_ENGINE: &str = "mito";
|
||||
pub const IMMUTABLE_FILE_ENGINE: &str = "file";
|
||||
|
||||
@@ -208,7 +208,7 @@ impl Instance {
|
||||
let (catalog_manager, table_id_provider, region_alive_keepers) = match opts.mode {
|
||||
Mode::Standalone => {
|
||||
if opts.enable_memory_catalog {
|
||||
let catalog = Arc::new(catalog::local::MemoryCatalogManager::default());
|
||||
let catalog = catalog::local::MemoryCatalogManager::with_default_setup();
|
||||
let table = NumbersTable::new(MIN_USER_TABLE_ID);
|
||||
|
||||
let _ = catalog
|
||||
|
||||
@@ -21,7 +21,7 @@ use catalog::error::{
|
||||
self as catalog_err, InternalSnafu, InvalidCatalogValueSnafu, InvalidSystemTableDefSnafu,
|
||||
Result as CatalogResult, TableMetadataManagerSnafu, UnimplementedSnafu,
|
||||
};
|
||||
use catalog::information_schema::InformationSchemaProvider;
|
||||
use catalog::information_schema::{InformationSchemaProvider, COLUMNS, TABLES};
|
||||
use catalog::remote::KvCacheInvalidatorRef;
|
||||
use catalog::{
|
||||
CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
|
||||
@@ -43,7 +43,7 @@ use common_telemetry::{debug, warn};
|
||||
use partition::manager::PartitionRuleManagerRef;
|
||||
use snafu::prelude::*;
|
||||
use table::metadata::TableId;
|
||||
use table::table::numbers::NumbersTable;
|
||||
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::expr_factory;
|
||||
@@ -160,7 +160,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn register_catalog(&self, _name: String) -> CatalogResult<bool> {
|
||||
async fn register_catalog(self: Arc<Self>, _name: String) -> CatalogResult<bool> {
|
||||
unimplemented!("FrontendCatalogManager does not support registering catalog")
|
||||
}
|
||||
|
||||
@@ -318,6 +318,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
.kvs;
|
||||
|
||||
let mut res = HashSet::new();
|
||||
res.insert(INFORMATION_SCHEMA_NAME.to_string());
|
||||
for KeyValue { key: k, value: _ } in kvs {
|
||||
let key =
|
||||
SchemaKey::parse(String::from_utf8_lossy(&k)).context(InvalidCatalogValueSnafu)?;
|
||||
@@ -337,7 +338,11 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
.map(|(k, _)| k)
|
||||
.collect::<Vec<String>>();
|
||||
if catalog == DEFAULT_CATALOG_NAME && schema == DEFAULT_SCHEMA_NAME {
|
||||
tables.push("numbers".to_string());
|
||||
tables.push(NUMBERS_TABLE_NAME.to_string());
|
||||
}
|
||||
if schema == INFORMATION_SCHEMA_NAME {
|
||||
tables.push(TABLES.to_string());
|
||||
tables.push(COLUMNS.to_string());
|
||||
}
|
||||
|
||||
Ok(tables)
|
||||
@@ -356,6 +361,10 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
}
|
||||
|
||||
async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
|
||||
if schema == INFORMATION_SCHEMA_NAME {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let schema_key = SchemaKey {
|
||||
catalog_name: catalog.to_string(),
|
||||
schema_name: schema.to_string(),
|
||||
@@ -370,6 +379,10 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
}
|
||||
|
||||
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
|
||||
if schema == INFORMATION_SCHEMA_NAME && (table == TABLES || table == COLUMNS) {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let key = TableNameKey::new(catalog, schema, table);
|
||||
self.table_metadata_manager
|
||||
.table_name_manager()
|
||||
@@ -387,7 +400,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
) -> CatalogResult<Option<TableRef>> {
|
||||
if catalog == DEFAULT_CATALOG_NAME
|
||||
&& schema == DEFAULT_SCHEMA_NAME
|
||||
&& table_name == "numbers"
|
||||
&& table_name == NUMBERS_TABLE_NAME
|
||||
{
|
||||
return Ok(Some(Arc::new(NumbersTable::default())));
|
||||
}
|
||||
@@ -395,9 +408,12 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
if schema == INFORMATION_SCHEMA_NAME {
|
||||
// hack: use existing cyclin reference to get Arc<Self>.
|
||||
// This can be remove by refactoring the struct into something like Arc<Inner>
|
||||
common_telemetry::info!("going to use dist instance");
|
||||
let manager = if let Some(instance) = self.dist_instance.as_ref() {
|
||||
common_telemetry::info!("dist instance exist");
|
||||
instance.catalog_manager() as _
|
||||
} else {
|
||||
common_telemetry::info!("dist instance doesn't exist");
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
|
||||
@@ -79,7 +79,7 @@ const MAX_VALUE: &str = "MAXVALUE";
|
||||
#[derive(Clone)]
|
||||
pub struct DistInstance {
|
||||
meta_client: Arc<MetaClient>,
|
||||
catalog_manager: Arc<FrontendCatalogManager>,
|
||||
pub(crate) catalog_manager: Arc<FrontendCatalogManager>,
|
||||
datanode_clients: Arc<DatanodeClients>,
|
||||
}
|
||||
|
||||
|
||||
@@ -1389,7 +1389,7 @@ mod test {
|
||||
.build()
|
||||
.unwrap();
|
||||
let table = Arc::new(EmptyTable::from_table_info(&table_info));
|
||||
let catalog_list = Arc::new(MemoryCatalogManager::default());
|
||||
let catalog_list = MemoryCatalogManager::with_default_setup();
|
||||
assert!(catalog_list
|
||||
.register_table(RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
|
||||
@@ -20,7 +20,7 @@ use async_trait::async_trait;
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::client_manager::DatanodeClients;
|
||||
use common_base::bytes::Bytes;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::table_name::TableName;
|
||||
use datafusion::common::Result;
|
||||
@@ -93,13 +93,6 @@ impl ExtensionPlanner for DistExtensionPlanner {
|
||||
return Ok(Some(input_physical_plan));
|
||||
};
|
||||
|
||||
if table_name.schema_name == INFORMATION_SCHEMA_NAME {
|
||||
return planner
|
||||
.create_physical_plan(input_plan, session_state)
|
||||
.await
|
||||
.map(Some);
|
||||
}
|
||||
|
||||
let input_schema = input_physical_plan.schema().clone();
|
||||
let input_plan = self.set_table_name(&table_name, input_plan.clone())?;
|
||||
let substrait_plan: Bytes = DFLogicalSubstraitConvertor
|
||||
|
||||
@@ -379,7 +379,7 @@ mod test {
|
||||
.build()
|
||||
.unwrap();
|
||||
let table = Arc::new(EmptyTable::from_table_info(&table_info));
|
||||
let catalog_list = Arc::new(MemoryCatalogManager::default());
|
||||
let catalog_list = MemoryCatalogManager::with_default_setup();
|
||||
assert!(catalog_list
|
||||
.register_table(RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod show;
|
||||
mod show_create_table;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
@@ -185,7 +185,7 @@ pub async fn show_tables(
|
||||
pub fn show_create_table(table: TableRef, partitions: Option<Partitions>) -> Result<Output> {
|
||||
let table_info = table.table_info();
|
||||
let table_name = &table_info.name;
|
||||
let mut stmt = show::create_table_stmt(&table_info)?;
|
||||
let mut stmt = show_create_table::create_table_stmt(&table_info)?;
|
||||
stmt.partitions = partitions;
|
||||
let sql = format!("{}", stmt);
|
||||
let columns = vec![
|
||||
|
||||
@@ -11,6 +11,9 @@
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Implementation of `SHOW CREATE TABLE` statement.
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaRef, COMMENT_KEY};
|
||||
@@ -52,7 +52,7 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
|
||||
|
||||
pub fn new_query_engine_with_table(table: MemTable) -> QueryEngineRef {
|
||||
let table = Arc::new(table);
|
||||
let catalog_manager = Arc::new(MemoryCatalogManager::new_with_table(table));
|
||||
let catalog_manager = MemoryCatalogManager::new_with_table(table);
|
||||
|
||||
QueryEngineFactory::new(catalog_manager, false).query_engine()
|
||||
}
|
||||
|
||||
@@ -49,9 +49,7 @@ where
|
||||
}
|
||||
|
||||
pub(crate) fn sample_script_engine() -> PyEngine {
|
||||
let catalog_manager = Arc::new(MemoryCatalogManager::new_with_table(Arc::new(
|
||||
NumbersTable::default(),
|
||||
)));
|
||||
let catalog_manager = MemoryCatalogManager::new_with_table(Arc::new(NumbersTable::default()));
|
||||
let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
|
||||
|
||||
PyEngine::new(query_engine.clone())
|
||||
|
||||
@@ -369,9 +369,8 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
pub(crate) fn sample_script_engine() -> PyEngine {
|
||||
let catalog_manager = Arc::new(MemoryCatalogManager::new_with_table(Arc::new(
|
||||
NumbersTable::default(),
|
||||
)));
|
||||
let catalog_manager =
|
||||
MemoryCatalogManager::new_with_table(Arc::new(NumbersTable::default()));
|
||||
let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
|
||||
|
||||
PyEngine::new(query_engine.clone())
|
||||
|
||||
@@ -202,7 +202,7 @@ impl GrpcQueryHandler for DummyInstance {
|
||||
|
||||
fn create_testing_instance(table: MemTable) -> DummyInstance {
|
||||
let table = Arc::new(table);
|
||||
let catalog_manager = Arc::new(MemoryCatalogManager::new_with_table(table));
|
||||
let catalog_manager = MemoryCatalogManager::new_with_table(table);
|
||||
let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
|
||||
DummyInstance::new(query_engine)
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ impl TestEnv {
|
||||
let state_store = Arc::new(ObjectStateStore::new(object_store));
|
||||
let procedure_manager = Arc::new(LocalManager::new(config, state_store));
|
||||
|
||||
let catalog_manager = Arc::new(MemoryCatalogManager::default());
|
||||
let catalog_manager = MemoryCatalogManager::with_default_setup();
|
||||
|
||||
TestEnv {
|
||||
dir,
|
||||
|
||||
@@ -86,6 +86,7 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
|
||||
|
||||
assert!(dn_instance
|
||||
.catalog_manager()
|
||||
.clone()
|
||||
.register_catalog("another_catalog".to_string())
|
||||
.await
|
||||
.is_ok());
|
||||
|
||||
@@ -392,11 +392,14 @@ async fn test_execute_show_databases_tables(instance: Arc<dyn MockInstance>) {
|
||||
Output::RecordBatches(databases) => {
|
||||
let databases = databases.take();
|
||||
assert_eq!(1, databases[0].num_columns());
|
||||
assert_eq!(databases[0].column(0).len(), 1);
|
||||
assert_eq!(databases[0].column(0).len(), 2);
|
||||
|
||||
assert_eq!(
|
||||
*databases[0].column(0),
|
||||
Arc::new(StringVector::from(vec![Some("public")])) as VectorRef
|
||||
Arc::new(StringVector::from(vec![
|
||||
Some("information_schema"),
|
||||
Some("public")
|
||||
])) as VectorRef
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
@@ -1390,21 +1393,25 @@ async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
|
||||
let expected = match is_distributed_mode {
|
||||
true => {
|
||||
"\
|
||||
+---------------+--------------+------------+-----------------+----------+-------------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+---------------+--------------+------------+-----------------+----------+-------------+
|
||||
| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine |
|
||||
| greptime | public | scripts | BASE TABLE | 1024 | mito |
|
||||
+---------------+--------------+------------+-----------------+----------+-------------+"
|
||||
+---------------+--------------------+------------+-----------------+----------+-------------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+---------------+--------------------+------------+-----------------+----------+-------------+
|
||||
| greptime | information_schema | columns | LOCAL TEMPORARY | 4 | |
|
||||
| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine |
|
||||
| greptime | public | scripts | BASE TABLE | 1024 | mito |
|
||||
| greptime | information_schema | tables | LOCAL TEMPORARY | 3 | |
|
||||
+---------------+--------------------+------------+-----------------+----------+-------------+"
|
||||
}
|
||||
false => {
|
||||
"\
|
||||
+---------------+--------------+------------+-----------------+----------+-------------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+---------------+--------------+------------+-----------------+----------+-------------+
|
||||
| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine |
|
||||
| greptime | public | scripts | BASE TABLE | 1 | mito |
|
||||
+---------------+--------------+------------+-----------------+----------+-------------+"
|
||||
+---------------+--------------------+------------+-----------------+----------+-------------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+---------------+--------------------+------------+-----------------+----------+-------------+
|
||||
| greptime | information_schema | columns | LOCAL TEMPORARY | 4 | |
|
||||
| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine |
|
||||
| greptime | public | scripts | BASE TABLE | 1 | mito |
|
||||
| greptime | information_schema | tables | LOCAL TEMPORARY | 3 | |
|
||||
+---------------+--------------------+------------+-----------------+----------+-------------+"
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1414,19 +1421,23 @@ async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
|
||||
let expected = match is_distributed_mode {
|
||||
true => {
|
||||
"\
|
||||
+-----------------+----------------+---------------+------------+----------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+-----------------+----------------+---------------+------------+----------+--------+
|
||||
| another_catalog | another_schema | another_table | BASE TABLE | 1025 | mito |
|
||||
+-----------------+----------------+---------------+------------+----------+--------+"
|
||||
+-----------------+--------------------+---------------+-----------------+----------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+-----------------+--------------------+---------------+-----------------+----------+--------+
|
||||
| another_catalog | another_schema | another_table | BASE TABLE | 1025 | mito |
|
||||
| another_catalog | information_schema | columns | LOCAL TEMPORARY | 4 | |
|
||||
| another_catalog | information_schema | tables | LOCAL TEMPORARY | 3 | |
|
||||
+-----------------+--------------------+---------------+-----------------+----------+--------+"
|
||||
}
|
||||
false => {
|
||||
"\
|
||||
+-----------------+----------------+---------------+------------+----------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+-----------------+----------------+---------------+------------+----------+--------+
|
||||
| another_catalog | another_schema | another_table | BASE TABLE | 1024 | mito |
|
||||
+-----------------+----------------+---------------+------------+----------+--------+"
|
||||
+-----------------+--------------------+---------------+-----------------+----------+--------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+-----------------+--------------------+---------------+-----------------+----------+--------+
|
||||
| another_catalog | another_schema | another_table | BASE TABLE | 1024 | mito |
|
||||
| another_catalog | information_schema | columns | LOCAL TEMPORARY | 4 | |
|
||||
| another_catalog | information_schema | tables | LOCAL TEMPORARY | 3 | |
|
||||
+-----------------+--------------------+---------------+-----------------+----------+--------+"
|
||||
}
|
||||
};
|
||||
check_output_stream(output, expected).await;
|
||||
@@ -1447,28 +1458,52 @@ async fn test_information_schema_dot_columns(instance: Arc<dyn MockInstance>) {
|
||||
|
||||
let output = execute_sql(&instance, sql).await;
|
||||
let expected = "\
|
||||
+---------------+--------------+------------+--------------+----------------------+---------------+
|
||||
| table_catalog | table_schema | table_name | column_name | data_type | semantic_type |
|
||||
+---------------+--------------+------------+--------------+----------------------+---------------+
|
||||
| greptime | public | numbers | number | UInt32 | PRIMARY KEY |
|
||||
| greptime | public | scripts | schema | String | PRIMARY KEY |
|
||||
| greptime | public | scripts | name | String | PRIMARY KEY |
|
||||
| greptime | public | scripts | script | String | FIELD |
|
||||
| greptime | public | scripts | engine | String | FIELD |
|
||||
| greptime | public | scripts | timestamp | TimestampMillisecond | TIME INDEX |
|
||||
| greptime | public | scripts | gmt_created | TimestampMillisecond | FIELD |
|
||||
| greptime | public | scripts | gmt_modified | TimestampMillisecond | FIELD |
|
||||
+---------------+--------------+------------+--------------+----------------------+---------------+";
|
||||
+---------------+--------------------+------------+---------------+----------------------+---------------+
|
||||
| table_catalog | table_schema | table_name | column_name | data_type | semantic_type |
|
||||
+---------------+--------------------+------------+---------------+----------------------+---------------+
|
||||
| greptime | information_schema | columns | table_catalog | String | FIELD |
|
||||
| greptime | information_schema | columns | table_schema | String | FIELD |
|
||||
| greptime | information_schema | columns | table_name | String | FIELD |
|
||||
| greptime | information_schema | columns | column_name | String | FIELD |
|
||||
| greptime | information_schema | columns | data_type | String | FIELD |
|
||||
| greptime | information_schema | columns | semantic_type | String | FIELD |
|
||||
| greptime | public | numbers | number | UInt32 | PRIMARY KEY |
|
||||
| greptime | public | scripts | schema | String | PRIMARY KEY |
|
||||
| greptime | public | scripts | name | String | PRIMARY KEY |
|
||||
| greptime | public | scripts | script | String | FIELD |
|
||||
| greptime | public | scripts | engine | String | FIELD |
|
||||
| greptime | public | scripts | timestamp | TimestampMillisecond | TIME INDEX |
|
||||
| greptime | public | scripts | gmt_created | TimestampMillisecond | FIELD |
|
||||
| greptime | public | scripts | gmt_modified | TimestampMillisecond | FIELD |
|
||||
| greptime | information_schema | tables | table_catalog | String | FIELD |
|
||||
| greptime | information_schema | tables | table_schema | String | FIELD |
|
||||
| greptime | information_schema | tables | table_name | String | FIELD |
|
||||
| greptime | information_schema | tables | table_type | String | FIELD |
|
||||
| greptime | information_schema | tables | table_id | UInt32 | FIELD |
|
||||
| greptime | information_schema | tables | engine | String | FIELD |
|
||||
+---------------+--------------------+------------+---------------+----------------------+---------------+";
|
||||
|
||||
check_output_stream(output, expected).await;
|
||||
|
||||
let output = execute_sql_with(&instance, sql, query_ctx).await;
|
||||
let expected = "\
|
||||
+-----------------+----------------+---------------+-------------+-----------+---------------+
|
||||
| table_catalog | table_schema | table_name | column_name | data_type | semantic_type |
|
||||
+-----------------+----------------+---------------+-------------+-----------+---------------+
|
||||
| another_catalog | another_schema | another_table | i | Int64 | TIME INDEX |
|
||||
+-----------------+----------------+---------------+-------------+-----------+---------------+";
|
||||
+-----------------+--------------------+---------------+---------------+-----------+---------------+
|
||||
| table_catalog | table_schema | table_name | column_name | data_type | semantic_type |
|
||||
+-----------------+--------------------+---------------+---------------+-----------+---------------+
|
||||
| another_catalog | another_schema | another_table | i | Int64 | TIME INDEX |
|
||||
| another_catalog | information_schema | columns | table_catalog | String | FIELD |
|
||||
| another_catalog | information_schema | columns | table_schema | String | FIELD |
|
||||
| another_catalog | information_schema | columns | table_name | String | FIELD |
|
||||
| another_catalog | information_schema | columns | column_name | String | FIELD |
|
||||
| another_catalog | information_schema | columns | data_type | String | FIELD |
|
||||
| another_catalog | information_schema | columns | semantic_type | String | FIELD |
|
||||
| another_catalog | information_schema | tables | table_catalog | String | FIELD |
|
||||
| another_catalog | information_schema | tables | table_schema | String | FIELD |
|
||||
| another_catalog | information_schema | tables | table_name | String | FIELD |
|
||||
| another_catalog | information_schema | tables | table_type | String | FIELD |
|
||||
| another_catalog | information_schema | tables | table_id | UInt32 | FIELD |
|
||||
| another_catalog | information_schema | tables | engine | String | FIELD |
|
||||
+-----------------+--------------------+---------------+---------------+-----------+---------------+";
|
||||
|
||||
check_output_stream(output, expected).await;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
show databases;
|
||||
|
||||
+-----------------------+
|
||||
| Schemas |
|
||||
+-----------------------+
|
||||
| information_schema |
|
||||
| public |
|
||||
| test_public_schema |
|
||||
| upper_case_table_name |
|
||||
+-----------------------+
|
||||
|
||||
use information_schema;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
show tables;
|
||||
|
||||
+---------+
|
||||
| Tables |
|
||||
+---------+
|
||||
| columns |
|
||||
| tables |
|
||||
+---------+
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
show databases;
|
||||
|
||||
use information_schema;
|
||||
|
||||
show tables;
|
||||
@@ -1,3 +1,44 @@
|
||||
-- scripts table has different table ids in different modes
|
||||
select *
|
||||
from information_schema.tables
|
||||
where table_name != 'scripts'
|
||||
order by table_schema, table_name;
|
||||
|
||||
+---------------+--------------------+------------+-----------------+----------+-------------+
|
||||
| table_catalog | table_schema | table_name | table_type | table_id | engine |
|
||||
+---------------+--------------------+------------+-----------------+----------+-------------+
|
||||
| greptime | information_schema | columns | LOCAL TEMPORARY | 4 | |
|
||||
| greptime | information_schema | tables | LOCAL TEMPORARY | 3 | |
|
||||
| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine |
|
||||
+---------------+--------------------+------------+-----------------+----------+-------------+
|
||||
|
||||
select * from information_schema.columns order by table_schema, table_name;
|
||||
|
||||
+---------------+--------------------+------------+---------------+----------------------+---------------+
|
||||
| table_catalog | table_schema | table_name | column_name | data_type | semantic_type |
|
||||
+---------------+--------------------+------------+---------------+----------------------+---------------+
|
||||
| greptime | information_schema | columns | table_catalog | String | FIELD |
|
||||
| greptime | information_schema | columns | table_schema | String | FIELD |
|
||||
| greptime | information_schema | columns | table_name | String | FIELD |
|
||||
| greptime | information_schema | columns | column_name | String | FIELD |
|
||||
| greptime | information_schema | columns | data_type | String | FIELD |
|
||||
| greptime | information_schema | columns | semantic_type | String | FIELD |
|
||||
| greptime | information_schema | tables | table_catalog | String | FIELD |
|
||||
| greptime | information_schema | tables | table_schema | String | FIELD |
|
||||
| greptime | information_schema | tables | table_name | String | FIELD |
|
||||
| greptime | information_schema | tables | table_type | String | FIELD |
|
||||
| greptime | information_schema | tables | table_id | UInt32 | FIELD |
|
||||
| greptime | information_schema | tables | engine | String | FIELD |
|
||||
| greptime | public | numbers | number | UInt32 | PRIMARY KEY |
|
||||
| greptime | public | scripts | schema | String | PRIMARY KEY |
|
||||
| greptime | public | scripts | name | String | PRIMARY KEY |
|
||||
| greptime | public | scripts | script | String | FIELD |
|
||||
| greptime | public | scripts | engine | String | FIELD |
|
||||
| greptime | public | scripts | timestamp | TimestampMillisecond | TIME INDEX |
|
||||
| greptime | public | scripts | gmt_created | TimestampMillisecond | FIELD |
|
||||
| greptime | public | scripts | gmt_modified | TimestampMillisecond | FIELD |
|
||||
+---------------+--------------------+------------+---------------+----------------------+---------------+
|
||||
|
||||
create
|
||||
database my_db;
|
||||
|
||||
@@ -29,6 +70,7 @@ select table_catalog, table_schema, table_name, table_type, engine
|
||||
from information_schema.tables
|
||||
where table_catalog = 'greptime'
|
||||
and table_schema != 'public'
|
||||
and table_schema != 'information_schema'
|
||||
order by table_schema, table_name;
|
||||
|
||||
+---------------+--------------+------------+------------+--------+
|
||||
@@ -41,6 +83,7 @@ select table_catalog, table_schema, table_name, column_name, data_type, semantic
|
||||
from information_schema.columns
|
||||
where table_catalog = 'greptime'
|
||||
and table_schema != 'public'
|
||||
and table_schema != 'information_schema'
|
||||
order by table_schema, table_name;
|
||||
|
||||
+---------------+--------------+------------+-------------+-----------+---------------+
|
||||
@@ -53,3 +96,7 @@ use public;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop schema my_db;
|
||||
|
||||
Error: 1001(Unsupported), SQL statement is not supported: drop schema my_db;, keyword: schema
|
||||
|
||||
|
||||
@@ -1,3 +1,11 @@
|
||||
-- scripts table has different table ids in different modes
|
||||
select *
|
||||
from information_schema.tables
|
||||
where table_name != 'scripts'
|
||||
order by table_schema, table_name;
|
||||
|
||||
select * from information_schema.columns order by table_schema, table_name;
|
||||
|
||||
create
|
||||
database my_db;
|
||||
|
||||
@@ -17,12 +25,16 @@ select table_catalog, table_schema, table_name, table_type, engine
|
||||
from information_schema.tables
|
||||
where table_catalog = 'greptime'
|
||||
and table_schema != 'public'
|
||||
and table_schema != 'information_schema'
|
||||
order by table_schema, table_name;
|
||||
|
||||
select table_catalog, table_schema, table_name, column_name, data_type, semantic_type
|
||||
from information_schema.columns
|
||||
where table_catalog = 'greptime'
|
||||
and table_schema != 'public'
|
||||
and table_schema != 'information_schema'
|
||||
order by table_schema, table_name;
|
||||
|
||||
use public;
|
||||
|
||||
drop schema my_db;
|
||||
|
||||
Reference in New Issue
Block a user