refactor: merge catalog provider & schema provider into catalog manager (#1803)

* move  to expr_factory

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move configs into service_config

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move GrpcQueryHandler into distributed.rs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix compile and test in catalog sub-crate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix table-procedure compile and test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix query compile and tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix datanode compile and tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix catalog/query/script/servers compile

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix frontend compile

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix nextest except information_schema

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* support information_schema

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix merge errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove other structs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change deregister_table's return type to empty tuple

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-06-26 15:08:59 +08:00
committed by GitHub
parent 964d26e415
commit a95f8767a8
50 changed files with 1412 additions and 2100 deletions

View File

@@ -192,11 +192,18 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display(
"Failed to upgrade weak catalog manager reference. location: {}",
location
))]
UpgradeWeakCatalogManagerRef { location: Location },
#[snafu(display("Failed to execute system catalog table scan, source: {}", source))]
SystemCatalogTableScanExec {
location: Location,
source: common_query::error::Error,
},
#[snafu(display("Cannot parse catalog value, source: {}", source))]
InvalidCatalogValue {
location: Location,
@@ -256,7 +263,9 @@ impl ErrorExt for Error {
| Error::EmptyValue { .. }
| Error::ValueDeserialize { .. } => StatusCode::StorageUnavailable,
Error::Generic { .. } | Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
Error::Generic { .. }
| Error::SystemCatalogTypeMismatch { .. }
| Error::UpgradeWeakCatalogManagerRef { .. } => StatusCode::Internal,
Error::ReadSystemCatalog { source, .. } | Error::CreateRecordBatch { source, .. } => {
source.status_code()

View File

@@ -67,6 +67,7 @@ pub fn build_schema_prefix(catalog_name: impl AsRef<str>) -> String {
format!("{SCHEMA_KEY_PREFIX}-{}-", catalog_name.as_ref())
}
/// Global table info has only one key across all datanodes so it does not have `node_id` field.
pub fn build_table_global_prefix(
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
@@ -78,6 +79,7 @@ pub fn build_table_global_prefix(
)
}
/// Regional table info varies between datanode, so it contains a `node_id` field.
pub fn build_table_regional_prefix(
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,

View File

@@ -16,7 +16,7 @@ mod columns;
mod tables;
use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use async_trait::async_trait;
use common_error::prelude::BoxedError;
@@ -33,46 +33,35 @@ use table::{Result as TableResult, Table, TableRef};
use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::tables::InformationSchemaTables;
use crate::{CatalogProviderRef, SchemaProvider};
use crate::CatalogManager;
const TABLES: &str = "tables";
const COLUMNS: &str = "columns";
pub(crate) struct InformationSchemaProvider {
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_provider: CatalogProviderRef,
tables: Vec<String>,
catalog_manager: Weak<dyn CatalogManager>,
}
impl InformationSchemaProvider {
pub(crate) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
catalog_name,
catalog_provider,
tables: vec![TABLES.to_string(), COLUMNS.to_string()],
catalog_manager,
}
}
}
#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
async fn table_names(&self) -> Result<Vec<String>> {
Ok(self.tables.clone())
}
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
impl InformationSchemaProvider {
pub fn table(&self, name: &str) -> Result<Option<TableRef>> {
let stream_builder = match name.to_ascii_lowercase().as_ref() {
TABLES => Arc::new(InformationSchemaTables::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)) as _,
COLUMNS => Arc::new(InformationSchemaColumns::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)) as _,
_ => {
return Ok(None);
@@ -81,11 +70,6 @@ impl SchemaProvider for InformationSchemaProvider {
Ok(Some(Arc::new(InformationTable::new(stream_builder))))
}
async fn table_exist(&self, name: &str) -> Result<bool> {
let normalized_name = name.to_ascii_lowercase();
Ok(self.tables.contains(&normalized_name))
}
}
// TODO(ruihang): make it a more generic trait:

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
@@ -29,16 +29,18 @@ use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use super::InformationStreamBuilder;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::CatalogProviderRef;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::CatalogManager;
pub(super) struct InformationSchemaColumns {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,
}
const TABLE_CATALOG: &str = "table_catalog";
@@ -49,7 +51,7 @@ const DATA_TYPE: &str = "data_type";
const SEMANTIC_TYPE: &str = "semantic_type";
impl InformationSchemaColumns {
pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
@@ -61,7 +63,7 @@ impl InformationSchemaColumns {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
}
}
@@ -69,7 +71,7 @@ impl InformationSchemaColumns {
InformationSchemaColumnsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)
}
}
@@ -103,7 +105,7 @@ impl InformationStreamBuilder for InformationSchemaColumns {
struct InformationSchemaColumnsBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,
catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
@@ -114,11 +116,15 @@ struct InformationSchemaColumnsBuilder {
}
impl InformationSchemaColumnsBuilder {
fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
table_names: StringVectorBuilder::with_capacity(42),
@@ -131,11 +137,23 @@ impl InformationSchemaColumnsBuilder {
/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
for schema_name in self.catalog_provider.schema_names().await? {
let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
for table_name in schema.table_names().await? {
let Some(table) = schema.table(&table_name).await? else { continue };
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.await?
{
continue;
}
for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue };
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
for (idx, column) in schema.column_schemas().iter().enumerate() {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_NAME;
@@ -26,21 +26,23 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableType;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::InformationStreamBuilder;
use crate::CatalogProviderRef;
use crate::CatalogManager;
pub(super) struct InformationSchemaTables {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,
}
impl InformationSchemaTables {
pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
@@ -52,7 +54,7 @@ impl InformationSchemaTables {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
}
}
@@ -60,7 +62,7 @@ impl InformationSchemaTables {
InformationSchemaTablesBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)
}
}
@@ -97,7 +99,7 @@ impl InformationStreamBuilder for InformationSchemaTables {
struct InformationSchemaTablesBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,
catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
@@ -108,11 +110,15 @@ struct InformationSchemaTablesBuilder {
}
impl InformationSchemaTablesBuilder {
fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
table_names: StringVectorBuilder::with_capacity(42),
@@ -125,15 +131,27 @@ impl InformationSchemaTablesBuilder {
/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
for schema_name in self.catalog_provider.schema_names().await? {
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if schema_name == INFORMATION_SCHEMA_NAME {
continue;
}
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.await?
{
continue;
}
let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
for table_name in schema.table_names().await? {
let Some(table) = schema.table(&table_name).await? else { continue };
for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue };
let table_info = table.table_info();
self.add_table(
&catalog_name,

View File

@@ -14,6 +14,7 @@
#![feature(trait_upcasting)]
#![feature(assert_matches)]
#![feature(try_blocks)]
use std::any::Any;
use std::collections::HashMap;
@@ -29,65 +30,46 @@ use table::requests::CreateTableRequest;
use table::TableRef;
use crate::error::{CreateTableSnafu, Result};
pub use crate::schema::{SchemaProvider, SchemaProviderRef};
pub mod error;
pub mod helper;
pub(crate) mod information_schema;
pub mod information_schema;
pub mod local;
mod metrics;
pub mod remote;
pub mod schema;
pub mod system;
pub mod table_source;
pub mod tables;
/// Represents a catalog, comprising a number of named schemas.
#[async_trait::async_trait]
pub trait CatalogProvider: Sync + Send {
/// Returns the catalog provider as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Retrieves the list of available schema names in this catalog.
async fn schema_names(&self) -> Result<Vec<String>>;
/// Registers schema to this catalog.
async fn register_schema(
&self,
name: String,
schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>>;
/// Retrieves a specific schema from the catalog by name, provided it exists.
async fn schema(&self, name: &str) -> Result<Option<SchemaProviderRef>>;
}
pub type CatalogProviderRef = Arc<dyn CatalogProvider>;
#[async_trait::async_trait]
pub trait CatalogManager: Send + Sync {
fn as_any(&self) -> &dyn Any;
/// Starts a catalog manager.
async fn start(&self) -> Result<()>;
async fn register_catalog(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>>;
/// Registers a table within given catalog/schema to catalog manager,
/// returns whether the table registered.
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;
/// Deregisters a table within given catalog/schema to catalog manager,
/// returns whether the table deregistered.
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool>;
/// Registers a catalog to catalog manager, returns whether the catalog exist before.
async fn register_catalog(&self, name: String) -> Result<bool>;
/// Register a schema with catalog name and schema name. Retuens whether the
/// schema registered.
///
/// # Errors
///
/// This method will/should fail if catalog not exist
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
/// Registers a table within given catalog/schema to catalog manager,
/// returns whether the table registered.
///
/// # Errors
///
/// This method will/should fail if catalog or schema not exist
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;
/// Deregisters a table within given catalog/schema to catalog manager
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()>;
/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;
@@ -97,9 +79,15 @@ pub trait CatalogManager: Send + Sync {
async fn catalog_names(&self) -> Result<Vec<String>>;
async fn catalog(&self, catalog: &str) -> Result<Option<CatalogProviderRef>>;
async fn schema_names(&self, catalog: &str) -> Result<Vec<String>>;
async fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>>;
async fn table_names(&self, catalog: &str, schema: &str) -> Result<Vec<String>>;
async fn catalog_exist(&self, catalog: &str) -> Result<bool>;
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool>;
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool>;
/// Returns the table by catalog, schema and table name.
async fn table(
@@ -108,8 +96,6 @@ pub trait CatalogManager: Send + Sync {
schema: &str,
table_name: &str,
) -> Result<Option<TableRef>>;
fn as_any(&self) -> &dyn Any;
}
pub type CatalogManagerRef = Arc<dyn CatalogManager>;
@@ -169,14 +155,6 @@ pub struct RegisterSchemaRequest {
pub schema: String,
}
pub trait CatalogProviderFactory {
fn create(&self, catalog_name: String) -> CatalogProviderRef;
}
pub trait SchemaProviderFactory {
fn create(&self, catalog_name: String, schema_name: String) -> SchemaProviderRef;
}
pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
manager: &'a M,
engine: TableEngineRef,
@@ -233,15 +211,11 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec<Reg
let Ok(catalog_names) = catalog_manager.catalog_names().await else { return (region_number, region_stats) };
for catalog_name in catalog_names {
let Ok(Some(catalog)) = catalog_manager.catalog(&catalog_name).await else { continue };
let Ok(schema_names) = catalog.schema_names().await else { continue };
let Ok(schema_names) = catalog_manager.schema_names(&catalog_name).await else { continue };
for schema_name in schema_names {
let Ok(Some(schema)) = catalog.schema(&schema_name).await else { continue };
let Ok(table_names) = schema.table_names().await else { continue };
let Ok(table_names) = catalog_manager.table_names(&catalog_name,&schema_name).await else { continue };
for table_name in table_names {
let Ok(Some(table)) = schema.table(&table_name).await else { continue };
let Ok(Some(table)) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await else { continue };
let region_numbers = &table.table_info().meta.region_numbers;
region_number += region_numbers.len() as u64;

View File

@@ -16,6 +16,4 @@ pub mod manager;
pub mod memory;
pub use manager::LocalCatalogManager;
pub use memory::{
new_memory_catalog_list, MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider,
};
pub use memory::{new_memory_catalog_manager, MemoryCatalogManager};

View File

@@ -18,7 +18,8 @@ use std::sync::Arc;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID,
MITO_ENGINE, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME,
MITO_ENGINE, NUMBERS_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID,
SYSTEM_CATALOG_TABLE_NAME,
};
use common_catalog::format_full_table_name;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
@@ -32,7 +33,7 @@ use table::engine::manager::TableEngineManagerRef;
use table::engine::EngineContext;
use table::metadata::TableId;
use table::requests::OpenTableRequest;
use table::table::numbers::NumbersTable;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table::TableIdProvider;
use table::TableRef;
@@ -42,16 +43,16 @@ use crate::error::{
SystemCatalogTypeMismatchSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, TableNotExistSnafu,
TableNotFoundSnafu,
};
use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use crate::information_schema::InformationSchemaProvider;
use crate::local::memory::MemoryCatalogManager;
use crate::system::{
decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX,
VALUE_INDEX,
};
use crate::tables::SystemCatalog;
use crate::{
handle_system_table_request, CatalogManager, CatalogProviderRef, DeregisterTableRequest,
handle_system_table_request, CatalogManager, CatalogManagerRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProviderRef,
};
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
@@ -74,11 +75,11 @@ impl LocalCatalogManager {
engine_name: MITO_ENGINE,
})?;
let table = SystemCatalogTable::new(engine.clone()).await?;
let memory_catalog_list = crate::local::memory::new_memory_catalog_list()?;
let memory_catalog_manager = crate::local::memory::new_memory_catalog_manager()?;
let system_catalog = Arc::new(SystemCatalog::new(table));
Ok(Self {
system: system_catalog,
catalogs: memory_catalog_list,
catalogs: memory_catalog_manager,
engine_manager,
next_table_id: AtomicU32::new(MIN_USER_TABLE_ID),
init_lock: Mutex::new(false),
@@ -116,26 +117,44 @@ impl LocalCatalogManager {
}
async fn init_system_catalog(&self) -> Result<()> {
let system_schema = Arc::new(MemorySchemaProvider::new());
system_schema.register_table_sync(
SYSTEM_CATALOG_TABLE_NAME.to_string(),
self.system.information_schema.system.clone(),
)?;
let system_catalog = Arc::new(MemoryCatalogProvider::new());
system_catalog.register_schema_sync(INFORMATION_SCHEMA_NAME.to_string(), system_schema)?;
// register SystemCatalogTable
self.catalogs
.register_catalog_sync(SYSTEM_CATALOG_NAME.to_string(), system_catalog)?;
.register_catalog_sync(SYSTEM_CATALOG_NAME.to_string())?;
self.catalogs.register_schema_sync(RegisterSchemaRequest {
catalog: SYSTEM_CATALOG_NAME.to_string(),
schema: INFORMATION_SCHEMA_NAME.to_string(),
})?;
let register_table_req = RegisterTableRequest {
catalog: SYSTEM_CATALOG_NAME.to_string(),
schema: INFORMATION_SCHEMA_NAME.to_string(),
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
table_id: SYSTEM_CATALOG_TABLE_ID,
table: self.system.information_schema.system.clone(),
};
self.catalogs.register_table(register_table_req).await?;
let default_catalog = Arc::new(MemoryCatalogProvider::new());
let default_schema = Arc::new(MemorySchemaProvider::new());
// register default catalog and default schema
self.catalogs
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string())?;
self.catalogs.register_schema_sync(RegisterSchemaRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
})?;
// Add numbers table for test
let table = Arc::new(NumbersTable::default());
default_schema.register_table_sync("numbers".to_string(), table)?;
let numbers_table = Arc::new(NumbersTable::default());
let register_number_table_req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: NUMBERS_TABLE_NAME.to_string(),
table_id: NUMBERS_TABLE_ID,
table: numbers_table,
};
default_catalog.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?;
self.catalogs
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)?;
.register_table(register_number_table_req)
.await?;
Ok(())
}
@@ -207,24 +226,16 @@ impl LocalCatalogManager {
for entry in entries {
match entry {
Entry::Catalog(c) => {
self.catalogs.register_catalog_if_absent(
c.catalog_name.clone(),
Arc::new(MemoryCatalogProvider::new()),
);
self.catalogs
.register_catalog_if_absent(c.catalog_name.clone());
info!("Register catalog: {}", c.catalog_name);
}
Entry::Schema(s) => {
self.catalogs
.catalog(&s.catalog_name)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: &s.catalog_name,
})?
.register_schema(
s.schema_name.clone(),
Arc::new(MemorySchemaProvider::new()),
)
.await?;
let req = RegisterSchemaRequest {
catalog: s.catalog_name.clone(),
schema: s.schema_name.clone(),
};
self.catalogs.register_schema_sync(req)?;
info!("Registered schema: {:?}", s);
}
Entry::Table(t) => {
@@ -245,23 +256,11 @@ impl LocalCatalogManager {
}
async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> {
let catalog =
self.catalogs
.catalog(&t.catalog_name)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: &t.catalog_name,
})?;
let schema = catalog
.schema(&t.schema_name)
.await?
.context(SchemaNotFoundSnafu {
catalog: &t.catalog_name,
schema: &t.schema_name,
})?;
self.check_catalog_schema_exist(&t.catalog_name, &t.schema_name)
.await?;
let context = EngineContext {};
let request = OpenTableRequest {
let open_request = OpenTableRequest {
catalog_name: t.catalog_name.clone(),
schema_name: t.schema_name.clone(),
table_name: t.table_name.clone(),
@@ -275,8 +274,8 @@ impl LocalCatalogManager {
engine_name: &t.engine,
})?;
let option = engine
.open_table(&context, request)
let table_ref = engine
.open_table(&context, open_request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!(
@@ -291,7 +290,48 @@ impl LocalCatalogManager {
),
})?;
schema.register_table(t.table_name.clone(), option).await?;
let register_request = RegisterTableRequest {
catalog: t.catalog_name.clone(),
schema: t.schema_name.clone(),
table_name: t.table_name.clone(),
table_id: t.table_id,
table: table_ref,
};
self.catalogs.register_table(register_request).await?;
Ok(())
}
async fn check_state(&self) -> Result<()> {
let started = self.init_lock.lock().await;
ensure!(
*started,
IllegalManagerStateSnafu {
msg: "Catalog manager not started",
}
);
Ok(())
}
async fn check_catalog_schema_exist(
&self,
catalog_name: &str,
schema_name: &str,
) -> Result<()> {
if !self.catalogs.catalog_exist(catalog_name).await? {
return CatalogNotFoundSnafu { catalog_name }.fail()?;
}
if !self
.catalogs
.schema_exist(catalog_name, schema_name)
.await?
{
return SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
}
.fail()?;
}
Ok(())
}
}
@@ -312,34 +352,21 @@ impl CatalogManager for LocalCatalogManager {
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let started = self.init_lock.lock().await;
self.check_state().await?;
ensure!(
*started,
IllegalManagerStateSnafu {
msg: "Catalog manager not started",
}
);
let catalog_name = request.catalog.clone();
let schema_name = request.schema.clone();
let catalog_name = &request.catalog;
let schema_name = &request.schema;
let catalog = self
.catalogs
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog
.schema(schema_name)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?;
self.check_catalog_schema_exist(&catalog_name, &schema_name)
.await?;
{
let _lock = self.register_lock.lock().await;
if let Some(existing) = schema.table(&request.table_name).await? {
if let Some(existing) = self
.catalogs
.table(&request.catalog, &request.schema, &request.table_name)
.await?
{
if existing.table_info().ident.table_id != request.table_id {
error!(
"Unexpected table register request: {:?}, existing: {:?}",
@@ -348,8 +375,8 @@ impl CatalogManager for LocalCatalogManager {
);
return TableExistsSnafu {
table: format_full_table_name(
catalog_name,
schema_name,
&catalog_name,
&schema_name,
&request.table_name,
),
}
@@ -358,24 +385,24 @@ impl CatalogManager for LocalCatalogManager {
// Try to register table with same table id, just ignore.
Ok(false)
} else {
let engine = request.table.table_info().meta.engine.to_string();
// table does not exist
let engine = request.table.table_info().meta.engine.to_string();
let table_name = request.table_name.clone();
let table_id = request.table_id;
self.catalogs.register_table(request).await?;
self.system
.register_table(
catalog_name.clone(),
schema_name.clone(),
request.table_name.clone(),
request.table_id,
table_name,
table_id,
engine,
)
.await?;
schema
.register_table(request.table_name, request.table)
.await?;
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(catalog_name, schema_name)],
&[crate::metrics::db_label(&catalog_name, &schema_name)],
);
Ok(true)
}
@@ -383,41 +410,27 @@ impl CatalogManager for LocalCatalogManager {
}
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
let started = self.init_lock.lock().await;
ensure!(
*started,
IllegalManagerStateSnafu {
msg: "Catalog manager not started",
}
);
self.check_state().await?;
let catalog_name = &request.catalog;
let schema_name = &request.schema;
let catalog = self
.catalogs
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog
.schema(schema_name)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?;
let _lock = self.register_lock.lock().await;
self.check_catalog_schema_exist(catalog_name, schema_name)
.await?;
ensure!(
!schema.table_exist(&request.new_table_name).await?,
self.catalogs
.table(catalog_name, schema_name, &request.new_table_name)
.await?
.is_none(),
TableExistsSnafu {
table: &request.new_table_name
}
);
let old_table = schema
.table(&request.table_name)
let _lock = self.register_lock.lock().await;
let old_table = self
.catalogs
.table(catalog_name, schema_name, &request.table_name)
.await?
.context(TableNotExistSnafu {
table: &request.table_name,
@@ -435,18 +448,11 @@ impl CatalogManager for LocalCatalogManager {
)
.await?;
let renamed = schema
.rename_table(&request.table_name, request.new_table_name.clone())
.await
.is_ok();
Ok(renamed)
self.catalogs.rename_table(request).await
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
{
let started = *self.init_lock.lock().await;
ensure!(started, IllegalManagerStateSnafu { msg: "not started" });
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {
self.check_state().await?;
{
let _ = self.register_lock.lock().await;
@@ -473,52 +479,39 @@ impl CatalogManager for LocalCatalogManager {
}
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
let started = self.init_lock.lock().await;
ensure!(
*started,
IllegalManagerStateSnafu {
msg: "Catalog manager not started",
}
);
self.check_state().await?;
let catalog_name = &request.catalog;
let schema_name = &request.schema;
let catalog = self
.catalogs
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
if !self.catalogs.catalog_exist(catalog_name).await? {
return CatalogNotFoundSnafu { catalog_name }.fail()?;
}
{
let _lock = self.register_lock.lock().await;
ensure!(
catalog.schema(schema_name).await?.is_none(),
!self
.catalogs
.schema_exist(catalog_name, schema_name)
.await?,
SchemaExistsSnafu {
schema: schema_name,
}
);
self.system
.register_schema(request.catalog, schema_name.clone())
.register_schema(request.catalog.clone(), schema_name.clone())
.await?;
catalog
.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))
.await?;
Ok(true)
self.catalogs.register_schema_sync(request)
}
}
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
self.check_state().await?;
let catalog_name = request.create_table_request.catalog_name.clone();
let schema_name = request.create_table_request.schema_name.clone();
ensure!(
!*self.init_lock.lock().await,
IllegalManagerStateSnafu {
msg: "Catalog manager already started",
}
);
let mut sys_table_requests = self.system_table_requests.lock().await;
sys_table_requests.push(request);
increment_gauge!(
@@ -529,15 +522,8 @@ impl CatalogManager for LocalCatalogManager {
Ok(())
}
async fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
self.catalogs
.catalog(catalog)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.schema(schema)
.await
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
self.catalogs.schema_exist(catalog, schema).await
}
async fn table(
@@ -546,39 +532,44 @@ impl CatalogManager for LocalCatalogManager {
schema_name: &str,
table_name: &str,
) -> Result<Option<TableRef>> {
let catalog = self
.catalogs
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog
.schema(schema_name)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?;
schema.table(table_name).await
if schema_name == INFORMATION_SCHEMA_NAME {
let manager: CatalogManagerRef = self.catalogs.clone() as _;
let provider =
InformationSchemaProvider::new(catalog_name.to_string(), Arc::downgrade(&manager));
return provider.table(table_name);
}
self.catalogs
.table(catalog_name, schema_name, table_name)
.await
}
async fn catalog(&self, catalog: &str) -> Result<Option<CatalogProviderRef>> {
async fn catalog_exist(&self, catalog: &str) -> Result<bool> {
if catalog.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) {
Ok(Some(self.system.clone()))
Ok(true)
} else {
self.catalogs.catalog(catalog).await
self.catalogs.catalog_exist(catalog).await
}
}
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
self.catalogs.table_exist(catalog, schema, table).await
}
async fn catalog_names(&self) -> Result<Vec<String>> {
self.catalogs.catalog_names().await
}
async fn register_catalog(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>> {
self.catalogs.register_catalog(name, catalog).await
async fn schema_names(&self, catalog_name: &str) -> Result<Vec<String>> {
self.catalogs.schema_names(catalog_name).await
}
async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result<Vec<String>> {
self.catalogs.table_names(catalog_name, schema_name).await
}
async fn register_catalog(&self, name: String) -> Result<bool> {
self.catalogs.register_catalog(name).await
}
fn as_any(&self) -> &dyn Any {

View File

@@ -18,29 +18,27 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_telemetry::error;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use metrics::{decrement_gauge, increment_gauge};
use snafu::{ensure, OptionExt};
use snafu::OptionExt;
use table::metadata::TableId;
use table::table::TableIdProvider;
use table::TableRef;
use crate::error::{
self, CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
use crate::schema::SchemaProvider;
use crate::{
CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProviderRef,
CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, RenameTableRequest,
};
type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
/// Simple in-memory list of catalogs
pub struct MemoryCatalogManager {
/// Collection of catalogs containing schemas and ultimately Tables
pub catalogs: RwLock<HashMap<String, CatalogProviderRef>>,
pub catalogs: RwLock<HashMap<String, SchemaEntries>>,
pub table_id: AtomicU32,
}
@@ -50,13 +48,15 @@ impl Default for MemoryCatalogManager {
table_id: AtomicU32::new(MIN_USER_TABLE_ID),
catalogs: Default::default(),
};
let default_catalog = Arc::new(MemoryCatalogProvider::new());
let mut catalog = HashMap::with_capacity(1);
catalog.insert(DEFAULT_SCHEMA_NAME.to_string(), HashMap::new());
manager
.register_catalog_sync("greptime".to_string(), default_catalog.clone())
.unwrap();
default_catalog
.register_schema_sync("public".to_string(), Arc::new(MemorySchemaProvider::new()))
.unwrap();
.catalogs
.write()
.unwrap()
.insert(DEFAULT_CATALOG_NAME.to_string(), catalog);
manager
}
}
@@ -76,80 +76,75 @@ impl CatalogManager for MemoryCatalogManager {
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let schema = self
.catalog(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.schema(&request.schema)
.await?
.context(SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
let catalog = request.catalog.clone();
let schema = request.schema.clone();
let result = self.register_table_sync(request);
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
&[crate::metrics::db_label(&catalog, &schema)],
);
schema
.register_table(request.table_name, request.table)
.await
.map(|v| v.is_none())
result
}
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
let catalog = self
.catalog(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?;
let schema =
catalog
.schema(&request.schema)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
Ok(schema
.rename_table(&request.table_name, request.new_table_name)
.await
.is_ok())
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
let schema = self
.catalog(&request.catalog)
.context(CatalogNotFoundSnafu {
let mut catalogs = self.catalogs.write().unwrap();
let schema = catalogs
.get_mut(&request.catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.schema(&request.schema)
.await?
.get_mut(&request.schema)
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
decrement_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
schema
.deregister_table(&request.table_name)
.await
.map(|v| v.is_some())
// check old and new table names
if !schema.contains_key(&request.table_name) {
return TableNotFoundSnafu {
table_info: request.table_name.to_string(),
}
.fail()?;
}
if schema.contains_key(&request.new_table_name) {
return TableExistsSnafu {
table: &request.new_table_name,
}
.fail();
}
let table = schema.remove(&request.table_name).unwrap();
schema.insert(request.new_table_name, table);
Ok(true)
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {
let mut catalogs = self.catalogs.write().unwrap();
let schema = catalogs
.get_mut(&request.catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.get_mut(&request.schema)
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
let result = schema.remove(&request.table_name);
if result.is_some() {
decrement_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
}
Ok(())
}
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
let catalog = self
.catalog(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?;
catalog
.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))
.await?;
self.register_schema_sync(request)?;
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
Ok(true)
}
@@ -159,12 +154,16 @@ impl CatalogManager for MemoryCatalogManager {
Ok(())
}
async fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
if let Some(c) = self.catalog(catalog) {
c.schema(schema).await
} else {
Ok(None)
}
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
Ok(self
.catalogs
.read()
.unwrap()
.get(catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.contains_key(schema))
}
async fn table(
@@ -173,27 +172,71 @@ impl CatalogManager for MemoryCatalogManager {
schema: &str,
table_name: &str,
) -> Result<Option<TableRef>> {
let Some(catalog) = self
.catalog(catalog) else { return Ok(None)};
let Some(s) = catalog.schema(schema).await? else { return Ok(None) };
s.table(table_name).await
let result = try {
self.catalogs
.read()
.unwrap()
.get(catalog)?
.get(schema)?
.get(table_name)
.cloned()?
};
Ok(result)
}
async fn catalog(&self, catalog: &str) -> Result<Option<CatalogProviderRef>> {
Ok(self.catalogs.read().unwrap().get(catalog).cloned())
async fn catalog_exist(&self, catalog: &str) -> Result<bool> {
Ok(self.catalogs.read().unwrap().get(catalog).is_some())
}
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
let catalogs = self.catalogs.read().unwrap();
Ok(catalogs
.get(catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.get(schema)
.with_context(|| SchemaNotFoundSnafu { catalog, schema })?
.contains_key(table))
}
async fn catalog_names(&self) -> Result<Vec<String>> {
Ok(self.catalogs.read().unwrap().keys().cloned().collect())
}
async fn register_catalog(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>> {
async fn schema_names(&self, catalog_name: &str) -> Result<Vec<String>> {
Ok(self
.catalogs
.read()
.unwrap()
.get(catalog_name)
.with_context(|| CatalogNotFoundSnafu { catalog_name })?
.keys()
.cloned()
.collect())
}
async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result<Vec<String>> {
Ok(self
.catalogs
.read()
.unwrap()
.get(catalog_name)
.with_context(|| CatalogNotFoundSnafu { catalog_name })?
.get(schema_name)
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?
.keys()
.cloned()
.collect())
}
async fn register_catalog(&self, name: String) -> Result<bool> {
self.register_catalog_sync(name)?;
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
self.register_catalog_sync(name, catalog)
Ok(true)
}
fn as_any(&self) -> &dyn Any {
@@ -202,206 +245,78 @@ impl CatalogManager for MemoryCatalogManager {
}
impl MemoryCatalogManager {
/// Registers a catalog and return `None` if no catalog with the same name was already
/// registered, or `Some` with the previously registered catalog.
pub fn register_catalog_if_absent(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Option<CatalogProviderRef> {
/// Registers a catalog and return the catalog already exist
pub fn register_catalog_if_absent(&self, name: String) -> bool {
let mut catalogs = self.catalogs.write().unwrap();
let entry = catalogs.entry(name);
match entry {
Entry::Occupied(v) => Some(v.get().clone()),
Entry::Occupied(_) => true,
Entry::Vacant(v) => {
v.insert(catalog);
None
v.insert(HashMap::new());
false
}
}
}
pub fn register_catalog_sync(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>> {
pub fn register_catalog_sync(&self, name: String) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
Ok(catalogs.insert(name, catalog))
Ok(catalogs.insert(name, HashMap::new()).is_some())
}
fn catalog(&self, catalog_name: &str) -> Option<CatalogProviderRef> {
self.catalogs.read().unwrap().get(catalog_name).cloned()
}
}
impl Default for MemoryCatalogProvider {
fn default() -> Self {
Self::new()
}
}
/// Simple in-memory implementation of a catalog.
pub struct MemoryCatalogProvider {
schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
}
impl MemoryCatalogProvider {
/// Instantiates a new MemoryCatalogProvider with an empty collection of schemas.
pub fn new() -> Self {
Self {
schemas: RwLock::new(HashMap::new()),
pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get_mut(&request.catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?;
if catalog.contains_key(&request.schema) {
return Ok(false);
}
catalog.insert(request.schema, HashMap::new());
Ok(true)
}
pub fn schema_names_sync(&self) -> Result<Vec<String>> {
let schemas = self.schemas.read().unwrap();
Ok(schemas.keys().cloned().collect())
}
pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let schema = catalogs
.get_mut(&request.catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.get_mut(&request.schema)
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
pub fn register_schema_sync(
&self,
name: String,
schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>> {
let mut schemas = self.schemas.write().unwrap();
ensure!(
!schemas.contains_key(&name),
error::SchemaExistsSnafu { schema: &name }
);
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
Ok(schemas.insert(name, schema))
}
pub fn schema_sync(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
let schemas = self.schemas.read().unwrap();
Ok(schemas.get(name).cloned())
}
}
#[async_trait::async_trait]
impl CatalogProvider for MemoryCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
async fn schema_names(&self) -> Result<Vec<String>> {
self.schema_names_sync()
}
async fn register_schema(
&self,
name: String,
schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>> {
self.register_schema_sync(name, schema)
}
async fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
self.schema_sync(name)
}
}
/// Simple in-memory implementation of a schema.
pub struct MemorySchemaProvider {
tables: RwLock<HashMap<String, TableRef>>,
}
impl MemorySchemaProvider {
/// Instantiates a new MemorySchemaProvider with an empty collection of tables.
pub fn new() -> Self {
Self {
tables: RwLock::new(HashMap::new()),
}
}
pub fn register_table_sync(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
let mut tables = self.tables.write().unwrap();
if let Some(existing) = tables.get(name.as_str()) {
// if table with the same name but different table id exists, then it's a fatal bug
if existing.table_info().ident.table_id != table.table_info().ident.table_id {
error!(
"Unexpected table register: {:?}, existing: {:?}",
table.table_info(),
existing.table_info()
);
return TableExistsSnafu { table: name }.fail()?;
if schema.contains_key(&request.table_name) {
return TableExistsSnafu {
table: &request.table_name,
}
Ok(Some(existing.clone()))
} else {
Ok(tables.insert(name, table))
.fail();
}
Ok(schema.insert(request.table_name, request.table).is_none())
}
pub fn rename_table_sync(&self, name: &str, new_name: String) -> Result<TableRef> {
let mut tables = self.tables.write().unwrap();
let Some(table) = tables.remove(name) else {
return TableNotFoundSnafu {
table_info: name.to_string(),
}
.fail()?;
#[cfg(any(test, feature = "testing"))]
pub fn new_with_table(table: TableRef) -> Self {
let manager = Self::default();
let request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table.table_info().name.clone(),
table_id: table.table_info().ident.table_id,
table,
};
let e = match tables.entry(new_name) {
Entry::Vacant(e) => e,
Entry::Occupied(e) => {
return TableExistsSnafu { table: e.key() }.fail();
}
};
e.insert(table.clone());
Ok(table)
}
pub fn table_exist_sync(&self, name: &str) -> Result<bool> {
let tables = self.tables.read().unwrap();
Ok(tables.contains_key(name))
}
pub fn deregister_table_sync(&self, name: &str) -> Result<Option<TableRef>> {
let mut tables = self.tables.write().unwrap();
Ok(tables.remove(name))
}
}
impl Default for MemorySchemaProvider {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl SchemaProvider for MemorySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
async fn table_names(&self) -> Result<Vec<String>> {
let tables = self.tables.read().unwrap();
Ok(tables.keys().cloned().collect())
}
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
let tables = self.tables.read().unwrap();
Ok(tables.get(name).cloned())
}
async fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
self.register_table_sync(name, table)
}
async fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
self.rename_table_sync(name, new_name)
}
async fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
self.deregister_table_sync(name)
}
async fn table_exist(&self, name: &str) -> Result<bool> {
self.table_exist_sync(name)
manager.register_table_sync(request).unwrap();
manager
}
}
/// Create a memory catalog list contains a numbers table for test
pub fn new_memory_catalog_list() -> Result<Arc<MemoryCatalogManager>> {
pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
Ok(Arc::new(MemoryCatalogManager::default()))
}
@@ -410,88 +325,99 @@ mod tests {
use common_catalog::consts::*;
use common_error::ext::ErrorExt;
use common_error::prelude::StatusCode;
use table::table::numbers::NumbersTable;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
#[tokio::test]
async fn test_new_memory_catalog_list() {
let catalog_list = new_memory_catalog_list().unwrap();
let default_catalog = CatalogManager::catalog(&*catalog_list, DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap();
let catalog_list = new_memory_catalog_manager().unwrap();
let default_schema = default_catalog
.schema(DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
let register_request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: NUMBERS_TABLE_NAME.to_string(),
table_id: NUMBERS_TABLE_ID,
table: Arc::new(NumbersTable::default()),
};
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
catalog_list.register_table(register_request).await.unwrap();
let table = catalog_list
.table(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
NUMBERS_TABLE_NAME,
)
.await
.unwrap();
let table = default_schema.table("numbers").await.unwrap();
assert!(table.is_some());
assert!(default_schema.table("not_exists").await.unwrap().is_none());
}
#[tokio::test]
async fn test_mem_provider() {
let provider = MemorySchemaProvider::new();
let table_name = "numbers";
assert!(!provider.table_exist_sync(table_name).unwrap());
provider.deregister_table_sync(table_name).unwrap();
let test_table = NumbersTable::default();
// register table successfully
assert!(provider
.register_table_sync(table_name.to_string(), Arc::new(test_table))
assert!(catalog_list
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "not_exists")
.await
.unwrap()
.is_none());
assert!(provider.table_exist_sync(table_name).unwrap());
let other_table = NumbersTable::new(12);
let result = provider.register_table_sync(table_name.to_string(), Arc::new(other_table));
let err = result.err().unwrap();
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}
#[tokio::test]
async fn test_mem_provider_rename_table() {
let provider = MemorySchemaProvider::new();
let table_name = "num";
assert!(!provider.table_exist_sync(table_name).unwrap());
let test_table: TableRef = Arc::new(NumbersTable::default());
async fn test_mem_manager_rename_table() {
let catalog = MemoryCatalogManager::default();
let table_name = "test_table";
assert!(!catalog
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
// register test table
assert!(provider
.register_table_sync(table_name.to_string(), test_table.clone())
.unwrap()
.is_none());
assert!(provider.table_exist_sync(table_name).unwrap());
let table_id = 2333;
let register_request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
table: Arc::new(NumbersTable::new(table_id)),
};
assert!(catalog.register_table(register_request).await.unwrap());
assert!(catalog
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
// rename test table
let new_table_name = "numbers";
provider
.rename_table_sync(table_name, new_table_name.to_string())
.unwrap();
let new_table_name = "test_table_renamed";
let rename_request = RenameTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
new_table_name: new_table_name.to_string(),
table_id,
};
catalog.rename_table(rename_request).await.unwrap();
// test old table name not exist
assert!(!provider.table_exist_sync(table_name).unwrap());
provider.deregister_table_sync(table_name).unwrap();
assert!(!catalog
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
// test new table name exists
assert!(provider.table_exist_sync(new_table_name).unwrap());
let registered_table = provider.table(new_table_name).await.unwrap().unwrap();
assert_eq!(
registered_table.table_info().ident.table_id,
test_table.table_info().ident.table_id
);
assert!(catalog
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap());
let registered_table = catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap()
.unwrap();
assert_eq!(registered_table.table_info().ident.table_id, table_id);
let other_table = Arc::new(NumbersTable::new(2));
let result = provider
.register_table(new_table_name.to_string(), other_table)
.await;
let dup_register_request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: new_table_name.to_string(),
table_id: table_id + 1,
table: Arc::new(NumbersTable::new(table_id + 1)),
};
let result = catalog.register_table(dup_register_request).await;
let err = result.err().unwrap();
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}
@@ -499,16 +425,11 @@ mod tests {
#[tokio::test]
async fn test_catalog_rename_table() {
let catalog = MemoryCatalogManager::default();
let schema = catalog
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
// register table
let table_name = "num";
let table_id = 2333;
let table: TableRef = Arc::new(NumbersTable::new(table_id));
// register table
let register_table_req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
@@ -517,7 +438,11 @@ mod tests {
table,
};
assert!(catalog.register_table(register_table_req).await.unwrap());
assert!(schema.table_exist(table_name).await.unwrap());
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap()
.is_some());
// rename table
let new_table_name = "numbers_new";
@@ -529,8 +454,16 @@ mod tests {
table_id,
};
assert!(catalog.rename_table(rename_table_req).await.unwrap());
assert!(!schema.table_exist(table_name).await.unwrap());
assert!(schema.table_exist(new_table_name).await.unwrap());
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap()
.is_none());
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap()
.is_some());
let registered_table = catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
@@ -543,50 +476,42 @@ mod tests {
#[test]
pub fn test_register_if_absent() {
let list = MemoryCatalogManager::default();
assert!(list
.register_catalog_if_absent(
"test_catalog".to_string(),
Arc::new(MemoryCatalogProvider::new())
)
.is_none());
list.register_catalog_if_absent(
"test_catalog".to_string(),
Arc::new(MemoryCatalogProvider::new()),
)
.unwrap();
list.as_any()
.downcast_ref::<MemoryCatalogManager>()
.unwrap();
assert!(!list.register_catalog_if_absent("test_catalog".to_string(),));
assert!(list.register_catalog_if_absent("test_catalog".to_string()));
}
#[tokio::test]
pub async fn test_catalog_deregister_table() {
let catalog = MemoryCatalogManager::default();
let schema = catalog
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
let table_name = "foo_table";
let register_table_req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "numbers".to_string(),
table_name: table_name.to_string(),
table_id: 2333,
table: Arc::new(NumbersTable::default()),
};
catalog.register_table(register_table_req).await.unwrap();
assert!(schema.table_exist("numbers").await.unwrap());
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap()
.is_some());
let deregister_table_req = DeregisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "numbers".to_string(),
table_name: table_name.to_string(),
};
catalog
.deregister_table(deregister_table_req)
.await
.unwrap();
assert!(!schema.table_exist("numbers").await.unwrap());
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap()
.is_none());
}
}

View File

@@ -20,7 +20,7 @@ use std::sync::Arc;
pub use client::{CachedMetaKvBackend, MetaKvBackend};
use futures::Stream;
use futures_util::StreamExt;
pub use manager::{RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider};
pub use manager::RemoteCatalogManager;
use crate::error::Error;

File diff suppressed because it is too large Load Diff

View File

@@ -1,69 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use table::TableRef;
use crate::error::{NotSupportedSnafu, Result};
/// Represents a schema, comprising a number of named tables.
#[async_trait]
pub trait SchemaProvider: Sync + Send {
/// Returns the schema provider as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Retrieves the list of available table names in this schema.
async fn table_names(&self) -> Result<Vec<String>>;
/// Retrieves a specific table from the schema by name, provided it exists.
async fn table(&self, name: &str) -> Result<Option<TableRef>>;
/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name existed before, it returns "Table already exists" error.
async fn register_table(&self, name: String, _table: TableRef) -> Result<Option<TableRef>> {
NotSupportedSnafu {
op: format!("register_table({name}, <table>)"),
}
.fail()
}
/// If supported by the implementation, renames an existing table from this schema and returns it.
/// If no table of that name exists, returns "Table not found" error.
async fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
NotSupportedSnafu {
op: format!("rename_table({name}, {new_name})"),
}
.fail()
}
/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
async fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
NotSupportedSnafu {
op: format!("deregister_table({name})"),
}
.fail()
}
/// If supported by the implementation, checks the table exist in the schema provider or not.
/// If no matched table in the schema provider, return false.
/// Otherwise, return true.
async fn table_exist(&self, name: &str) -> Result<bool>;
}
pub type SchemaProviderRef = Arc<dyn SchemaProvider>;

View File

@@ -24,10 +24,7 @@ use session::context::QueryContext;
use snafu::{ensure, OptionExt};
use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
CatalogNotFoundSnafu, QueryAccessDeniedSnafu, Result, SchemaNotFoundSnafu, TableNotExistSnafu,
};
use crate::information_schema::InformationSchemaProvider;
use crate::error::{QueryAccessDeniedSnafu, Result, TableNotExistSnafu};
use crate::CatalogManagerRef;
pub struct DfTableSourceProvider {
@@ -104,41 +101,18 @@ impl DfTableSourceProvider {
let schema_name = table_ref.schema.as_ref();
let table_name = table_ref.table.as_ref();
let schema = if schema_name != INFORMATION_SCHEMA_NAME {
let catalog = self
.catalog_manager
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
catalog
.schema(schema_name)
.await?
.context(SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?
} else {
let catalog_provider = self
.catalog_manager
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
Arc::new(InformationSchemaProvider::new(
catalog_name.to_string(),
catalog_provider,
))
};
let table = schema
.table(table_name)
let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await?
.with_context(|| TableNotExistSnafu {
table: format_full_table_name(catalog_name, schema_name, table_name),
})?;
let table = DfTableProviderAdapter::new(table);
let table = provider_as_source(Arc::new(table));
self.resolved_tables.insert(resolved_name, table.clone());
Ok(table)
let provider = DfTableProviderAdapter::new(table);
let source = provider_as_source(Arc::new(provider));
self.resolved_tables.insert(resolved_name, source.clone());
Ok(source)
}
}

View File

@@ -14,50 +14,24 @@
// The `tables` table in system catalog keeps a record of all tables created by user.
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use common_catalog::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME};
use common_telemetry::logging;
use snafu::ResultExt;
use table::metadata::TableId;
use table::{Table, TableRef};
use table::Table;
use crate::error::{self, Error, InsertCatalogRecordSnafu, Result as CatalogResult};
use crate::error::{self, InsertCatalogRecordSnafu, Result as CatalogResult};
use crate::system::{
build_schema_insert_request, build_table_deletion_request, build_table_insert_request,
SystemCatalogTable,
};
use crate::{CatalogProvider, DeregisterTableRequest, SchemaProvider, SchemaProviderRef};
use crate::DeregisterTableRequest;
pub struct InformationSchema {
pub system: Arc<SystemCatalogTable>,
}
#[async_trait]
impl SchemaProvider for InformationSchema {
fn as_any(&self) -> &dyn Any {
self
}
async fn table_names(&self) -> Result<Vec<String>, Error> {
Ok(vec![SYSTEM_CATALOG_TABLE_NAME.to_string()])
}
async fn table(&self, name: &str) -> Result<Option<TableRef>, Error> {
if name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) {
Ok(Some(self.system.clone()))
} else {
Ok(None)
}
}
async fn table_exist(&self, name: &str) -> Result<bool, Error> {
Ok(name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME))
}
}
pub struct SystemCatalog {
pub information_schema: Arc<InformationSchema>,
}
@@ -125,30 +99,3 @@ impl SystemCatalog {
.context(InsertCatalogRecordSnafu)
}
}
#[async_trait::async_trait]
impl CatalogProvider for SystemCatalog {
fn as_any(&self) -> &dyn Any {
self
}
async fn schema_names(&self) -> Result<Vec<String>, Error> {
Ok(vec![INFORMATION_SCHEMA_NAME.to_string()])
}
async fn register_schema(
&self,
_name: String,
_schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>, Error> {
panic!("System catalog does not support registering schema!")
}
async fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>, Error> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA_NAME) {
Ok(Some(self.information_schema.clone()))
} else {
Ok(None)
}
}
}

View File

@@ -24,11 +24,8 @@ mod tests {
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::mock::{MockKvBackend, MockTableEngine};
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{
CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider,
RemoteSchemaProvider,
};
use catalog::{CatalogManager, RegisterTableRequest};
use catalog::remote::{CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager};
use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::ident::TableIdent;
use datatypes::schema::RawSchema;
@@ -40,6 +37,7 @@ mod tests {
use tokio::time::Instant;
struct TestingComponents {
#[allow(dead_code)]
kv_backend: KvBackendRef,
catalog_manager: Arc<RemoteCatalogManager>,
table_engine_manager: TableEngineManagerRef,
@@ -179,14 +177,12 @@ mod tests {
catalog_manager.catalog_names().await.unwrap()
);
let default_catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap();
assert_eq!(
vec![DEFAULT_SCHEMA_NAME.to_string()],
default_catalog.schema_names().await.unwrap()
catalog_manager
.schema_names(DEFAULT_CATALOG_NAME)
.await
.unwrap()
);
}
@@ -242,23 +238,15 @@ mod tests {
async fn test_register_table() {
let node_id = 42;
let components = prepare_components(node_id).await;
let catalog_manager = &components.catalog_manager;
let default_catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap();
assert_eq!(
vec![DEFAULT_SCHEMA_NAME.to_string()],
default_catalog.schema_names().await.unwrap()
components
.catalog_manager
.schema_names(DEFAULT_CATALOG_NAME)
.await
.unwrap()
);
let default_schema = default_catalog
.schema(DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
// register a new table with an nonexistent catalog
let catalog_name = DEFAULT_CATALOG_NAME.to_string();
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
@@ -293,10 +281,18 @@ mod tests {
table_id,
table,
};
assert!(catalog_manager.register_table(reg_req).await.unwrap());
assert!(components
.catalog_manager
.register_table(reg_req)
.await
.unwrap());
assert_eq!(
vec![table_name],
default_schema.table_names().await.unwrap()
components
.catalog_manager
.table_names(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await
.unwrap()
);
}
@@ -304,29 +300,28 @@ mod tests {
async fn test_register_catalog_schema_table() {
let node_id = 42;
let components = prepare_components(node_id).await;
let backend = &components.kv_backend;
let catalog_manager = components.catalog_manager.clone();
let engine_manager = components.table_engine_manager.clone();
let catalog_name = "test_catalog".to_string();
let schema_name = "nonexistent_schema".to_string();
let catalog = Arc::new(RemoteCatalogProvider::new(
catalog_name.clone(),
backend.clone(),
engine_manager.clone(),
node_id,
components.region_alive_keepers.clone(),
));
// register catalog to catalog manager
CatalogManager::register_catalog(&*catalog_manager, catalog_name.clone(), catalog)
components
.catalog_manager
.register_catalog(catalog_name.clone())
.await
.unwrap();
assert_eq!(
HashSet::<String>::from_iter(
vec![DEFAULT_CATALOG_NAME.to_string(), catalog_name.clone()].into_iter()
),
HashSet::from_iter(catalog_manager.catalog_names().await.unwrap().into_iter())
HashSet::from_iter(
components
.catalog_manager
.catalog_names()
.await
.unwrap()
.into_iter()
)
);
let table_to_register = components
@@ -359,38 +354,34 @@ mod tests {
};
// this register will fail since schema does not exist yet
assert_matches!(
catalog_manager
components
.catalog_manager
.register_table(reg_req.clone())
.await
.unwrap_err(),
catalog::error::Error::SchemaNotFound { .. }
);
let new_catalog = catalog_manager
.catalog(&catalog_name)
let register_schema_request = RegisterSchemaRequest {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
};
assert!(components
.catalog_manager
.register_schema(register_schema_request)
.await
.unwrap()
.expect("catalog should exist since it's already registered");
let schema = Arc::new(RemoteSchemaProvider::new(
catalog_name.clone(),
schema_name.clone(),
node_id,
engine_manager,
backend.clone(),
components.region_alive_keepers.clone(),
));
let prev = new_catalog
.register_schema(schema_name.clone(), schema.clone())
.expect("Register schema should not fail"));
assert!(components
.catalog_manager
.register_table(reg_req)
.await
.expect("Register schema should not fail");
assert!(prev.is_none());
assert!(catalog_manager.register_table(reg_req).await.unwrap());
.unwrap());
assert_eq!(
HashSet::from([schema_name.clone()]),
new_catalog
.schema_names()
components
.catalog_manager
.schema_names(&catalog_name)
.await
.unwrap()
.into_iter()

View File

@@ -27,6 +27,8 @@ pub const MAX_SYS_TABLE_ID: u32 = MIN_USER_TABLE_ID - 1;
pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0;
/// scripts table id
pub const SCRIPTS_TABLE_ID: u32 = 1;
/// numbers table id
pub const NUMBERS_TABLE_ID: u32 = 2;
pub const MITO_ENGINE: &str = "mito";
pub const IMMUTABLE_FILE_ENGINE: &str = "file";

View File

@@ -189,15 +189,13 @@ impl CloseRegionHandler {
})? {
CloseTableResult::NotFound | CloseTableResult::Released(_) => {
// Deregister table if The table released.
let deregistered = self.deregister_table(table_ref).await?;
self.deregister_table(table_ref).await?;
if deregistered {
self.region_alive_keepers
.deregister_table(table_ident)
.await;
}
self.region_alive_keepers
.deregister_table(table_ident)
.await;
Ok(deregistered)
Ok(true)
}
CloseTableResult::PartialClosed(regions) => {
// Requires caller to update the region_numbers
@@ -220,7 +218,7 @@ impl CloseRegionHandler {
Ok(true)
}
async fn deregister_table(&self, table_ref: &TableReference<'_>) -> Result<bool> {
async fn deregister_table(&self, table_ref: &TableReference<'_>) -> Result<()> {
self.catalog_manager
.deregister_table(DeregisterTableRequest {
catalog: table_ref.catalog.to_string(),

View File

@@ -238,6 +238,7 @@ impl Instance {
}
};
catalog_manager.start().await.context(CatalogSnafu)?;
let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
let query_engine = factory.query_engine();
@@ -315,15 +316,10 @@ impl Instance {
}
pub async fn flush_tables(&self) -> Result<()> {
info!("going to flush all schemas");
info!("going to flush all schemas under {DEFAULT_CATALOG_NAME}");
let schema_list = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?
.expect("Default schema not found")
.schema_names()
.schema_names(DEFAULT_CATALOG_NAME)
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;

View File

@@ -42,9 +42,9 @@ use table::requests::CreateDatabaseRequest;
use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
self, CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu,
DeleteSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu,
JoinTaskSnafu, PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
self, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu, DeleteSnafu,
ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, JoinTaskSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::Instance;
@@ -231,19 +231,10 @@ impl DummySchemaProvider {
schema_name: String,
catalog_manager: CatalogManagerRef,
) -> Result<Self> {
let catalog = catalog_manager
.catalog(&catalog_name)
let table_names = catalog_manager
.table_names(&catalog_name, &schema_name)
.await
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: &catalog_name,
})?;
let schema = catalog
.schema(&schema_name)
.await
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu { name: &schema_name })?;
let table_names = schema.table_names().await.context(CatalogSnafu)?;
.unwrap();
Ok(Self {
catalog: catalog_name,
schema: schema_name,

View File

@@ -48,10 +48,9 @@ impl SqlHandler {
let schema = req.db_name;
if self
.catalog_manager
.schema(&catalog, &schema)
.schema_exist(&catalog, &schema)
.await
.context(CatalogSnafu)?
.is_some()
{
return if req.create_if_not_exists {
Ok(Output::AffectedRows(1))

View File

@@ -12,33 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::SchemaProviderRef;
use catalog::CatalogManagerRef;
use common_query::Output;
use snafu::{OptionExt, ResultExt};
use table::requests::FlushTableRequest;
use crate::error::{self, CatalogSnafu, DatabaseNotFoundSnafu, Result};
use crate::error::{self, CatalogSnafu, Result};
use crate::sql::SqlHandler;
impl SqlHandler {
pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result<Output> {
let schema = self
.catalog_manager
.schema(&req.catalog_name, &req.schema_name)
.await
.context(CatalogSnafu)?
.context(DatabaseNotFoundSnafu {
catalog: &req.catalog_name,
schema: &req.schema_name,
})?;
if let Some(table) = &req.table_name {
self.flush_table_inner(schema, table, req.region_number, req.wait)
.await?;
self.flush_table_inner(
&self.catalog_manager,
&req.catalog_name,
&req.schema_name,
table,
req.region_number,
req.wait,
)
.await?;
} else {
let all_table_names = schema.table_names().await.context(CatalogSnafu)?;
let all_table_names = self
.catalog_manager
.table_names(&req.catalog_name, &req.schema_name)
.await
.context(CatalogSnafu)?;
futures::future::join_all(all_table_names.iter().map(|table| {
self.flush_table_inner(schema.clone(), table, req.region_number, req.wait)
self.flush_table_inner(
&self.catalog_manager,
&req.catalog_name,
&req.schema_name,
table,
req.region_number,
req.wait,
)
}))
.await
.into_iter()
@@ -49,13 +57,15 @@ impl SqlHandler {
async fn flush_table_inner(
&self,
schema: SchemaProviderRef,
catalog_manager: &CatalogManagerRef,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region: Option<u32>,
wait: Option<bool>,
) -> Result<()> {
schema
.table(table_name)
catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(error::FindTableSnafu { table_name })?
.context(error::TableNotFoundSnafu { table_name })?

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::RegisterTableRequest;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE,
};
@@ -119,15 +120,13 @@ pub(crate) async fn create_test_table(
.await
.context(CreateTableSnafu { table_name })?;
let schema_provider = instance
.catalog_manager
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
schema_provider
.register_table(table_name.to_string(), table.clone())
.await
.unwrap();
let req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id: table.table_info().ident.table_id,
table: table.clone(),
};
instance.catalog_manager.register_table(req).await.unwrap();
Ok(table)
}

View File

@@ -17,7 +17,6 @@ use std::collections::HashSet;
use std::sync::Arc;
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use catalog::error::{
self as catalog_err, InternalSnafu, InvalidCatalogValueSnafu, InvalidSystemTableDefSnafu,
Result as CatalogResult, UnimplementedSnafu,
@@ -26,14 +25,14 @@ use catalog::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue,
};
use catalog::information_schema::InformationSchemaProvider;
use catalog::remote::{Kv, KvBackendRef, KvCacheInvalidatorRef};
use catalog::{
CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProvider, SchemaProviderRef,
CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, RenameTableRequest,
};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_meta::table_name::TableName;
use common_telemetry::warn;
@@ -129,12 +128,8 @@ impl CatalogManager for FrontendCatalogManager {
Ok(())
}
async fn register_catalog(
&self,
_name: String,
_catalog: CatalogProviderRef,
) -> CatalogResult<Option<CatalogProviderRef>> {
unimplemented!("Frontend catalog list does not support register catalog")
async fn register_catalog(&self, _name: String) -> CatalogResult<bool> {
unimplemented!("FrontendCatalogManager does not support register catalog")
}
// TODO(LFC): Handle the table caching in (de)register_table.
@@ -142,20 +137,20 @@ impl CatalogManager for FrontendCatalogManager {
Ok(true)
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> CatalogResult<bool> {
async fn deregister_table(&self, request: DeregisterTableRequest) -> CatalogResult<()> {
let table_name = TableName::new(request.catalog, request.schema, request.table_name);
self.partition_manager
.table_routes()
.invalidate_table_route(&table_name)
.await;
Ok(true)
Ok(())
}
async fn register_schema(
&self,
_request: RegisterSchemaRequest,
) -> catalog::error::Result<bool> {
unimplemented!()
unimplemented!("FrontendCatalogManager does not support register schema")
}
async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result<bool> {
@@ -270,67 +265,9 @@ impl CatalogManager for FrontendCatalogManager {
Ok(res.into_iter().collect())
}
async fn catalog(&self, catalog: &str) -> CatalogResult<Option<CatalogProviderRef>> {
let key = CatalogKey {
catalog_name: catalog.to_string(),
}
.to_string();
Ok(self.backend.get(key.as_bytes()).await?.map(|_| {
Arc::new(FrontendCatalogProvider {
catalog_name: catalog.to_string(),
catalog_manager: Arc::new(self.clone()),
}) as Arc<_>
}))
}
async fn schema(
&self,
catalog: &str,
schema: &str,
) -> catalog::error::Result<Option<SchemaProviderRef>> {
self.catalog(catalog)
.await?
.context(catalog::error::CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.schema(schema)
.await
}
async fn table(
&self,
catalog: &str,
schema: &str,
table_name: &str,
) -> catalog::error::Result<Option<TableRef>> {
self.schema(catalog, schema)
.await?
.context(catalog::error::SchemaNotFoundSnafu { catalog, schema })?
.table(table_name)
.await
}
fn as_any(&self) -> &dyn Any {
self
}
}
pub struct FrontendCatalogProvider {
catalog_name: String,
catalog_manager: Arc<FrontendCatalogManager>,
}
#[async_trait::async_trait]
impl CatalogProvider for FrontendCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
async fn schema_names(&self) -> catalog::error::Result<Vec<String>> {
let key = build_schema_prefix(&self.catalog_name);
let backend = self.catalog_manager.backend();
let mut iter = backend.range(key.as_bytes());
async fn schema_names(&self, catalog: &str) -> CatalogResult<Vec<String>> {
let key = build_schema_prefix(catalog);
let mut iter = self.backend.range(key.as_bytes());
let mut res = HashSet::new();
while let Some(r) = iter.next().await {
let Kv(k, _) = r?;
@@ -341,61 +278,13 @@ impl CatalogProvider for FrontendCatalogProvider {
Ok(res.into_iter().collect())
}
async fn register_schema(
&self,
_name: String,
_schema: SchemaProviderRef,
) -> catalog::error::Result<Option<SchemaProviderRef>> {
unimplemented!("Frontend catalog provider does not support register schema")
}
async fn schema(&self, name: &str) -> catalog::error::Result<Option<SchemaProviderRef>> {
let catalog = &self.catalog_name;
let schema_key = SchemaKey {
catalog_name: catalog.clone(),
schema_name: name.to_string(),
}
.to_string();
let val = self
.catalog_manager
.backend()
.get(schema_key.as_bytes())
.await?;
let provider = val.map(|_| {
Arc::new(FrontendSchemaProvider {
catalog_name: catalog.clone(),
schema_name: name.to_string(),
catalog_manager: self.catalog_manager.clone(),
}) as Arc<dyn SchemaProvider>
});
Ok(provider)
}
}
pub struct FrontendSchemaProvider {
catalog_name: String,
schema_name: String,
catalog_manager: Arc<FrontendCatalogManager>,
}
#[async_trait]
impl SchemaProvider for FrontendSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
async fn table_names(&self) -> catalog::error::Result<Vec<String>> {
async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult<Vec<String>> {
let mut tables = vec![];
if self.catalog_name == DEFAULT_CATALOG_NAME && self.schema_name == DEFAULT_SCHEMA_NAME {
if catalog == DEFAULT_CATALOG_NAME && schema == DEFAULT_SCHEMA_NAME {
tables.push("numbers".to_string());
}
let key = build_table_global_prefix(&self.catalog_name, &self.schema_name);
let backend = self.catalog_manager.backend();
let iter = backend.range(key.as_bytes());
let key = build_table_global_prefix(catalog, schema);
let iter = self.backend.range(key.as_bytes());
let result = iter
.map(|r| {
let Kv(k, _) = r?;
@@ -409,20 +298,71 @@ impl SchemaProvider for FrontendSchemaProvider {
Ok(tables)
}
async fn table(&self, name: &str) -> catalog::error::Result<Option<TableRef>> {
if self.catalog_name == DEFAULT_CATALOG_NAME
&& self.schema_name == DEFAULT_SCHEMA_NAME
&& name == "numbers"
async fn catalog_exist(&self, catalog: &str) -> CatalogResult<bool> {
let key = CatalogKey {
catalog_name: catalog.to_string(),
}
.to_string();
Ok(self.backend.get(key.as_bytes()).await?.is_some())
}
async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
let schema_key = SchemaKey {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
}
.to_string();
Ok(self.backend().get(schema_key.as_bytes()).await?.is_some())
}
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
let table_global_key = TableGlobalKey {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
};
Ok(self
.backend()
.get(table_global_key.to_string().as_bytes())
.await?
.is_some())
}
async fn table(
&self,
catalog: &str,
schema: &str,
table_name: &str,
) -> CatalogResult<Option<TableRef>> {
if catalog == DEFAULT_CATALOG_NAME
&& schema == DEFAULT_SCHEMA_NAME
&& table_name == "numbers"
{
return Ok(Some(Arc::new(NumbersTable::default())));
}
if schema == INFORMATION_SCHEMA_NAME {
// hack: use existing cyclin reference to get Arc<Self>.
// This can be remove by refactoring the struct into something like Arc<Inner>
let manager = if let Some(instance) = self.dist_instance.as_ref() {
instance.catalog_manager() as _
} else {
return Ok(None);
};
let provider =
InformationSchemaProvider::new(catalog.to_string(), Arc::downgrade(&manager));
return provider.table(table_name);
}
let table_global_key = TableGlobalKey {
catalog_name: self.catalog_name.clone(),
schema_name: self.schema_name.clone(),
table_name: name.to_string(),
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table_name.to_string(),
};
let Some(kv) = self.catalog_manager.backend().get(table_global_key.to_string().as_bytes()).await? else {
let Some(kv) = self.backend().get(table_global_key.to_string().as_bytes()).await? else {
return Ok(None);
};
let v = TableGlobalValue::from_bytes(kv.1).context(InvalidCatalogValueSnafu)?;
@@ -432,14 +372,14 @@ impl SchemaProvider for FrontendSchemaProvider {
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?,
);
let table = Arc::new(DistTable::new(
TableName::new(&self.catalog_name, &self.schema_name, name),
TableName::new(catalog, schema, table_name),
table_info,
self.catalog_manager.clone(),
Arc::new(self.clone()),
));
Ok(Some(table))
}
async fn table_exist(&self, name: &str) -> catalog::error::Result<bool> {
Ok(self.table_names().await?.contains(&name.to_string()))
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -557,9 +557,8 @@ impl SqlQueryHandler for Instance {
async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
self.catalog_manager
.schema(catalog, schema)
.schema_exist(catalog, schema)
.await
.map(|s| s.is_some())
.context(error::CatalogSnafu)
}
}

View File

@@ -261,15 +261,10 @@ impl DistInstance {
schema: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
ensure!(
self.catalog_manager
.deregister_table(request)
.await
.context(CatalogSnafu)?,
error::TableNotFoundSnafu {
table_name: table_name.to_string()
}
);
self.catalog_manager
.deregister_table(request)
.await
.context(CatalogSnafu)?;
let expr = DropTableExpr {
catalog_name: table_name.catalog_name.clone(),
@@ -467,10 +462,9 @@ impl DistInstance {
let catalog = query_ctx.current_catalog();
if self
.catalog_manager
.schema(&catalog, &expr.database_name)
.schema_exist(&catalog, &expr.database_name)
.await
.context(CatalogSnafu)?
.is_some()
{
return if expr.create_if_not_exists {
Ok(Output::AffectedRows(1))

View File

@@ -96,7 +96,7 @@ impl StatementExecutor {
Statement::Use(db) => self.handle_use(db, query_ctx).await,
Statement::ShowDatabases(stmt) => self.show_databases(stmt).await,
Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
@@ -147,10 +147,9 @@ impl StatementExecutor {
let catalog = &query_ctx.current_catalog();
ensure!(
self.catalog_manager
.schema(catalog, &db)
.schema_exist(catalog, &db)
.await
.context(CatalogSnafu)?
.is_some(),
.context(CatalogSnafu)?,
SchemaNotFoundSnafu { schema_info: &db }
);

View File

@@ -15,13 +15,11 @@
use common_datasource::file_format::Format;
use common_query::Output;
use common_telemetry::info;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use crate::error;
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, InvalidCopyParameterSnafu, SchemaNotFoundSnafu,
};
use crate::error::{CatalogSnafu, InvalidCopyParameterSnafu};
use crate::statement::StatementExecutor;
pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
@@ -42,27 +40,16 @@ impl StatementExecutor {
"Copy database {}.{}, dir: {},. time: {:?}",
req.catalog_name, req.schema_name, req.location, req.time_range
);
let schema = self
let table_names = self
.catalog_manager
.catalog(&req.catalog_name)
.table_names(&req.catalog_name, &req.schema_name)
.await
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu {
catalog_name: &req.catalog_name,
})?
.schema(&req.schema_name)
.await
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu {
schema_info: &req.schema_name,
})?;
.context(CatalogSnafu)?;
let suffix = Format::try_from(&req.with)
.context(error::ParseFileFormatSnafu)?
.suffix();
let table_names = schema.table_names().await.context(CatalogSnafu)?;
let mut exported_rows = 0;
for table_name in table_names {
// TODO(hl): remove this hardcode once we've removed numbers table.

View File

@@ -21,8 +21,12 @@ use crate::error::{ExecuteStatementSnafu, Result};
use crate::statement::StatementExecutor;
impl StatementExecutor {
pub(super) async fn show_databases(&self, stmt: ShowDatabases) -> Result<Output> {
query::sql::show_databases(stmt, self.catalog_manager.clone())
pub(super) async fn show_databases(
&self,
stmt: ShowDatabases,
query_ctx: QueryContextRef,
) -> Result<Output> {
query::sql::show_databases(stmt, self.catalog_manager.clone(), query_ctx)
.await
.context(ExecuteStatementSnafu)
}

View File

@@ -46,9 +46,8 @@ use table::TableRef;
use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu,
MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, SchemaNotFoundSnafu,
TableNotFoundSnafu, UnsupportedExprSnafu,
CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTimestampColumnSnafu,
QueryExecutionSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
@@ -181,28 +180,12 @@ impl DatafusionQueryEngine {
let schema_name = table_name.schema.as_ref();
let table_name = table_name.table.as_ref();
let catalog = self
.state
self.state
.catalog_manager()
.catalog(catalog_name)
.table(catalog_name, schema_name, table_name)
.await
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu {
catalog: catalog_name,
})?;
let schema = catalog
.schema(schema_name)
.await
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu {
schema: schema_name,
})?;
let table = schema
.table(table_name)
.await
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table: table_name })?;
Ok(table)
.with_context(|| TableNotFoundSnafu { table: table_name })
}
}
@@ -395,9 +378,8 @@ mod tests {
use std::borrow::Cow::Borrowed;
use std::sync::Arc;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use catalog::{CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_query::Output;
use common_recordbatch::util;
use datafusion::prelude::{col, lit};
@@ -405,30 +387,24 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
use session::context::QueryContext;
use table::table::numbers::NumbersTable;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
use crate::parser::QueryLanguageParser;
use crate::query_engine::{QueryEngineFactory, QueryEngineRef};
async fn create_test_engine() -> QueryEngineRef {
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let catalog_manager = catalog::local::new_memory_catalog_manager().unwrap();
let req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: NUMBERS_TABLE_NAME.to_string(),
table_id: NUMBERS_TABLE_ID,
table: Arc::new(NumbersTable::default()),
};
catalog_manager.register_table(req).await.unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.await
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.await
.unwrap();
catalog_list
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
QueryEngineFactory::new(catalog_list, false).query_engine()
QueryEngineFactory::new(catalog_manager, false).query_engine()
}
#[tokio::test]

View File

@@ -138,7 +138,7 @@ mod tests {
#[test]
fn test_query_engine_factory() {
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let catalog_list = catalog::local::new_memory_catalog_manager().unwrap();
let factory = QueryEngineFactory::new(catalog_list, false);
let engine = factory.query_engine();

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
};
use common_datasource::file_format::{infer_schemas, FileFormat, Format};
use common_datasource::lister::{Lister, Source};
@@ -95,6 +95,7 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
pub async fn show_databases(
stmt: ShowDatabases,
catalog_manager: CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
ensure!(
matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)),
@@ -103,14 +104,11 @@ pub async fn show_databases(
}
);
let catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
let mut databases = catalog_manager
.schema_names(&query_ctx.current_catalog())
.await
.context(error::CatalogSnafu)?
.context(error::CatalogNotFoundSnafu {
catalog: DEFAULT_CATALOG_NAME,
})?;
let mut databases = catalog.schema_names().await.context(error::CatalogSnafu)?;
.context(error::CatalogSnafu)?;
// TODO(dennis): Specify the order of the results in catalog manager API
databases.sort();
@@ -148,12 +146,11 @@ pub async fn show_tables(
query_ctx.current_schema()
};
// TODO(sunng87): move this function into query_ctx
let schema = catalog_manager
.schema(&query_ctx.current_catalog(), &schema)
let mut tables = catalog_manager
.table_names(&query_ctx.current_catalog(), &schema)
.await
.context(error::CatalogSnafu)?
.context(error::SchemaNotFoundSnafu { schema })?;
let mut tables = schema.table_names().await.context(error::CatalogSnafu)?;
.context(error::CatalogSnafu)?;
// TODO(dennis): Specify the order of the results in schema provider API
tables.sort();

View File

@@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use catalog::local::MemoryCatalogManager;
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use session::context::QueryContext;
use table::test_util::MemTable;
use crate::parser::QueryLanguageParser;
use crate::QueryEngineRef;
use crate::{QueryEngineFactory, QueryEngineRef};
mod argmax_test;
mod argmin_test;
@@ -46,3 +50,10 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
.unwrap() else { unreachable!() };
util::collect(stream).await.unwrap()
}
pub fn new_query_engine_with_table(table: MemTable) -> QueryEngineRef {
let table = Arc::new(table);
let catalog_manager = Arc::new(MemoryCatalogManager::new_with_table(table));
QueryEngineFactory::new(catalog_manager, false).query_engine()
}

View File

@@ -14,8 +14,6 @@
use std::sync::Arc;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::RecordBatch;
use datatypes::for_all_primitive_types;
use datatypes::prelude::*;
@@ -25,14 +23,10 @@ use datatypes::vectors::Helper;
use rand::Rng;
use table::test_util::MemTable;
use crate::tests::exec_selection;
use crate::{QueryEngine, QueryEngineFactory};
pub fn create_query_engine() -> Arc<dyn QueryEngine> {
let schema_provider = Arc::new(MemorySchemaProvider::new());
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
let catalog_list = Arc::new(MemoryCatalogManager::default());
use crate::tests::{exec_selection, new_query_engine_with_table};
use crate::{QueryEngine, QueryEngineRef};
pub fn create_query_engine() -> QueryEngineRef {
let mut column_schemas = vec![];
let mut columns = vec![];
macro_rules! create_number_table {
@@ -54,19 +48,8 @@ pub fn create_query_engine() -> Arc<dyn QueryEngine> {
let schema = Arc::new(Schema::new(column_schemas.clone()));
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let number_table = Arc::new(MemTable::new("numbers", recordbatch));
schema_provider
.register_table_sync(number_table.table_name().to_string(), number_table)
.unwrap();
catalog_provider
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
QueryEngineFactory::new(catalog_list, false).query_engine()
let number_table = MemTable::new("numbers", recordbatch);
new_query_engine_with_table(number_table)
}
pub async fn get_numbers_from_table<'s, T>(

View File

@@ -16,8 +16,6 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_function::scalars::aggregate::AggregateFunctionMeta;
use common_function_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Result as QueryResult};
@@ -33,8 +31,7 @@ use num_traits::AsPrimitive;
use table::test_util::MemTable;
use crate::error::Result;
use crate::tests::exec_selection;
use crate::QueryEngineFactory;
use crate::tests::{exec_selection, new_query_engine_with_table};
#[derive(Debug, Default)]
struct MySumAccumulator<T, SumT> {
@@ -207,8 +204,7 @@ where
let recordbatch = RecordBatch::new(schema, vec![column]).unwrap();
let testing_table = MemTable::new(&table_name, recordbatch);
let factory = new_query_engine_factory(testing_table);
let engine = factory.query_engine();
let engine = new_query_engine_with_table(testing_table);
engine.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"my_sum",
@@ -224,24 +220,3 @@ where
assert_eq!(expected, pretty_print);
Ok(())
}
fn new_query_engine_factory(table: MemTable) -> QueryEngineFactory {
let table_name = table.table_name().to_string();
let table = Arc::new(table);
let schema_provider = Arc::new(MemorySchemaProvider::new());
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
let catalog_list = Arc::new(MemoryCatalogManager::default());
schema_provider
.register_table_sync(table_name, table)
.unwrap();
catalog_provider
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
QueryEngineFactory::new(catalog_list, false)
}

View File

@@ -14,8 +14,6 @@
use std::sync::Arc;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::RecordBatch;
use datatypes::for_all_primitive_types;
use datatypes::prelude::*;
@@ -25,9 +23,10 @@ use function::{create_query_engine, get_numbers_from_table};
use num_traits::AsPrimitive;
use table::test_util::MemTable;
use super::new_query_engine_with_table;
use crate::error::Result;
use crate::tests::{exec_selection, function};
use crate::{QueryEngine, QueryEngineFactory};
use crate::QueryEngine;
#[tokio::test]
async fn test_percentile_aggregator() -> Result<()> {
@@ -80,9 +79,6 @@ where
fn create_correctness_engine() -> Arc<dyn QueryEngine> {
// create engine
let schema_provider = Arc::new(MemorySchemaProvider::new());
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
let catalog_list = Arc::new(MemoryCatalogManager::default());
let mut column_schemas = vec![];
let mut columns = vec![];
@@ -96,20 +92,6 @@ fn create_correctness_engine() -> Arc<dyn QueryEngine> {
columns.push(column);
let schema = Arc::new(Schema::new(column_schemas));
let number_table = Arc::new(MemTable::new(
"corr_numbers",
RecordBatch::new(schema, columns).unwrap(),
));
schema_provider
.register_table_sync(number_table.table_name().to_string(), number_table)
.unwrap();
catalog_provider
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
QueryEngineFactory::new(catalog_list, false).query_engine()
let number_table = MemTable::new("corr_numbers", RecordBatch::new(schema, columns).unwrap());
new_query_engine_with_table(number_table)
}

View File

@@ -14,9 +14,10 @@
use std::sync::Arc;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::local::MemoryCatalogManager;
use catalog::RegisterTableRequest;
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::prelude::BoxedError;
use common_query::prelude::{create_udf, make_scalar_function, Volatility};
use common_query::Output;
@@ -29,7 +30,7 @@ use datatypes::vectors::UInt32Vector;
use session::context::QueryContext;
use snafu::ResultExt;
use table::table::adapter::DfTableProviderAdapter;
use table::table::numbers::NumbersTable;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::test_util::MemTable;
use crate::error::{QueryExecutionSnafu, Result};
@@ -43,7 +44,7 @@ use crate::tests::pow::pow;
#[tokio::test]
async fn test_datafusion_query_engine() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog::local::new_memory_catalog_list()
let catalog_list = catalog::local::new_memory_catalog_manager()
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let factory = QueryEngineFactory::new(catalog_list, false);
@@ -102,29 +103,24 @@ async fn test_datafusion_query_engine() -> Result<()> {
Ok(())
}
fn catalog_list() -> Result<Arc<MemoryCatalogManager>> {
let catalog_list = catalog::local::new_memory_catalog_list()
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
fn catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
let catalog_manager = catalog::local::new_memory_catalog_manager().unwrap();
let req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: NUMBERS_TABLE_NAME.to_string(),
table_id: NUMBERS_TABLE_ID,
table: Arc::new(NumbersTable::default()),
};
catalog_manager.register_table_sync(req).unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table_sync("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
Ok(catalog_list)
Ok(catalog_manager)
}
#[tokio::test]
async fn test_query_validate() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog_list()?;
let catalog_list = catalog_manager()?;
// set plugins
let plugins = Plugins::new();
@@ -155,7 +151,7 @@ async fn test_query_validate() -> Result<()> {
#[tokio::test]
async fn test_udf() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog_list()?;
let catalog_list = catalog_manager()?;
let factory = QueryEngineFactory::new(catalog_list, false);
let engine = factory.query_engine();

View File

@@ -15,7 +15,9 @@
use std::any::Any;
use std::sync::Arc;
use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::local::new_memory_catalog_manager;
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
@@ -114,21 +116,17 @@ fn create_test_engine() -> TimeRangeTester {
filter: Default::default(),
});
let catalog_list = new_memory_catalog_list().unwrap();
let catalog_manager = new_memory_catalog_manager().unwrap();
let req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "m".to_string(),
table_id: table.table_info().ident.table_id,
table: table.clone(),
};
catalog_manager.register_table_sync(req).unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
MemorySchemaProvider::register_table_sync(&default_schema, "m".to_string(), table.clone())
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema_sync("public".to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog_sync("greptime".to_string(), default_catalog)
.unwrap();
let engine = QueryEngineFactory::new(catalog_list, false).query_engine();
let engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
TimeRangeTester { engine, table }
}

View File

@@ -69,6 +69,7 @@ table = { path = "../table" }
tokio.workspace = true
[dev-dependencies]
catalog = { path = "../catalog", features = ["testing"] }
common-test-util = { path = "../common/test-util" }
log-store = { path = "../log-store" }
mito = { path = "../mito", features = ["test"] }

View File

@@ -15,8 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use catalog::local::MemoryCatalogManager;
use common_query::Output;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use futures::Future;
@@ -50,22 +49,10 @@ where
}
pub(crate) fn sample_script_engine() -> PyEngine {
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table_sync("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list, false);
let query_engine = factory.query_engine();
let catalog_manager = Arc::new(MemoryCatalogManager::new_with_table(Arc::new(
NumbersTable::default(),
)));
let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
PyEngine::new(query_engine.clone())
}

View File

@@ -170,6 +170,7 @@ mod tests {
.await
.unwrap(),
);
catalog_manager.start().await.unwrap();
let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
let query_engine = factory.query_engine();

View File

@@ -357,8 +357,7 @@ pub(crate) use tests::sample_script_engine;
#[cfg(test)]
mod tests {
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use catalog::local::MemoryCatalogManager;
use common_recordbatch::util;
use datatypes::prelude::ScalarVector;
use datatypes::value::Value;
@@ -369,22 +368,10 @@ mod tests {
use super::*;
pub(crate) fn sample_script_engine() -> PyEngine {
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table_sync("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list, false);
let query_engine = factory.query_engine();
let catalog_manager = Arc::new(MemoryCatalogManager::new_with_table(Arc::new(
NumbersTable::default(),
)));
let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
PyEngine::new(query_engine.clone())
}

View File

@@ -18,7 +18,7 @@ use std::sync::{Arc, RwLock};
use api::v1::greptime_request::{Request as GreptimeRequest, Request};
use api::v1::query_request::Query;
use async_trait::async_trait;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::local::MemoryCatalogManager;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
@@ -200,24 +200,9 @@ impl GrpcQueryHandler for DummyInstance {
}
fn create_testing_instance(table: MemTable) -> DummyInstance {
let table_name = table.table_name().to_string();
let table = Arc::new(table);
let schema_provider = Arc::new(MemorySchemaProvider::new());
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
let catalog_list = Arc::new(MemoryCatalogManager::default());
schema_provider
.register_table_sync(table_name, table)
.unwrap();
catalog_provider
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list, false);
let query_engine = factory.query_engine();
let catalog_manager = Arc::new(MemoryCatalogManager::new_with_table(table));
let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
DummyInstance::new(query_engine)
}

View File

@@ -29,8 +29,8 @@ use table::metadata::TableId;
use table::requests::{AlterKind, AlterTableRequest};
use crate::error::{
AccessCatalogSnafu, CatalogNotFoundSnafu, DeserializeProcedureSnafu, SchemaNotFoundSnafu,
SerializeProcedureSnafu, TableExistsSnafu, TableNotFoundSnafu,
AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableExistsSnafu,
TableNotFoundSnafu,
};
/// Procedure to alter a table.
@@ -134,26 +134,14 @@ impl AlterTableProcedure {
}
async fn on_prepare(&mut self) -> Result<Status> {
// Check whether catalog and schema exist.
let request = &self.data.request;
let catalog = self
let table = self
.catalog_manager
.catalog(&request.catalog_name)
.await
.context(AccessCatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: &request.catalog_name,
})?;
let schema = catalog
.schema(&request.schema_name)
.await
.context(AccessCatalogSnafu)?
.context(SchemaNotFoundSnafu {
name: &request.schema_name,
})?;
let table = schema
.table(&request.table_name)
.table(
&request.catalog_name,
&request.schema_name,
&request.table_name,
)
.await
.context(AccessCatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
@@ -162,12 +150,14 @@ impl AlterTableProcedure {
request.catalog_name, request.schema_name, request.table_name
),
})?;
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
ensure!(
!schema
.table_exist(new_table_name)
self.catalog_manager
.table(&request.catalog_name, &request.schema_name, new_table_name)
.await
.context(AccessCatalogSnafu)?,
.context(AccessCatalogSnafu)?
.is_none(),
TableExistsSnafu {
name: format!(
"{}.{}.{}",
@@ -341,16 +331,17 @@ mod tests {
let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap();
watcher.changed().await.unwrap();
let catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
let table = catalog_manager
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.await
.unwrap()
.unwrap();
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap();
let table = schema.table(new_table_name).await.unwrap().unwrap();
let table_info = table.table_info();
assert_eq!(new_table_name, table_info.name);
assert!(schema.table(table_name).await.unwrap().is_none());
assert!(!catalog_manager
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap());
}
}

View File

@@ -23,13 +23,12 @@ use common_procedure::{
};
use common_telemetry::logging;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference};
use table::requests::{CreateTableRequest, OpenTableRequest};
use crate::error::{
AccessCatalogSnafu, CatalogNotFoundSnafu, DeserializeProcedureSnafu, SchemaNotFoundSnafu,
SerializeProcedureSnafu,
AccessCatalogSnafu, DeserializeProcedureSnafu, SchemaNotFoundSnafu, SerializeProcedureSnafu,
};
/// Procedure to create a table.
@@ -133,34 +132,24 @@ impl CreateTableProcedure {
}
async fn on_prepare(&mut self) -> Result<Status> {
// Check whether catalog and schema exist.
let catalog = self
if !self
.catalog_manager
.catalog(&self.data.request.catalog_name)
.schema_exist(
&self.data.request.catalog_name,
&self.data.request.schema_name,
)
.await
.context(AccessCatalogSnafu)?
.with_context(|| {
logging::error!(
"Failed to create table {}, catalog not found",
self.data.table_ref()
);
CatalogNotFoundSnafu {
name: &self.data.request.catalog_name,
}
})?;
catalog
.schema(&self.data.request.schema_name)
.await
.context(AccessCatalogSnafu)?
.with_context(|| {
logging::error!(
"Failed to create table {}, schema not found",
self.data.table_ref(),
);
SchemaNotFoundSnafu {
name: &self.data.request.schema_name,
}
})?;
{
logging::error!(
"Failed to create table {}, schema not found",
self.data.table_ref(),
);
return SchemaNotFoundSnafu {
name: &self.data.request.schema_name,
}
.fail()?;
}
self.data.state = CreateTableState::EngineCreateTable;
// Assign procedure id to the subprocedure.
@@ -224,28 +213,16 @@ impl CreateTableProcedure {
}
async fn on_register_catalog(&mut self) -> Result<Status> {
let catalog = self
if self
.catalog_manager
.catalog(&self.data.request.catalog_name)
.await
.context(AccessCatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: &self.data.request.catalog_name,
})?;
let schema = catalog
.schema(&self.data.request.schema_name)
.await
.context(AccessCatalogSnafu)?
.context(SchemaNotFoundSnafu {
name: &self.data.request.schema_name,
})?;
let table_exists = schema
.table(&self.data.request.table_name)
.table_exist(
&self.data.request.catalog_name,
&self.data.request.schema_name,
&self.data.request.table_name,
)
.await
.map_err(Error::from_error_ext)?
.is_some();
if table_exists {
// Table already exists.
{
return Ok(Status::Done);
}

View File

@@ -28,8 +28,7 @@ use table::engine::{EngineContext, TableEngineProcedureRef, TableReference};
use table::requests::DropTableRequest;
use crate::error::{
AccessCatalogSnafu, DeregisterTableSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu,
TableNotFoundSnafu,
AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu,
};
/// Procedure to drop a table.
@@ -159,17 +158,10 @@ impl DropTableProcedure {
schema: self.data.request.schema_name.clone(),
table_name: self.data.request.table_name.clone(),
};
if !self
.catalog_manager
self.catalog_manager
.deregister_table(deregister_table_req)
.await
.context(AccessCatalogSnafu)?
{
return DeregisterTableSnafu {
name: request.table_ref().to_string(),
}
.fail()?;
}
.context(AccessCatalogSnafu)?;
}
self.data.state = DropTableState::EngineDropTable;
@@ -298,13 +290,11 @@ mod tests {
let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap();
watcher.changed().await.unwrap();
let catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
assert!(!catalog_manager
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.await
.unwrap()
.unwrap();
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap();
assert!(schema.table(table_name).await.unwrap().is_none());
.unwrap());
let ctx = EngineContext::default();
assert!(!table_engine.table_exists(&ctx, table_id,));
}

View File

@@ -16,6 +16,7 @@ use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_query::physical_plan::PhysicalPlanRef;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
@@ -38,6 +39,8 @@ use crate::table::{Expr, Table};
const NUMBER_COLUMN: &str = "number";
pub const NUMBERS_TABLE_NAME: &str = "numbers";
/// numbers table for test
#[derive(Debug, Clone)]
pub struct NumbersTable {
@@ -49,7 +52,7 @@ pub struct NumbersTable {
impl NumbersTable {
pub fn new(table_id: TableId) -> Self {
NumbersTable::with_name(table_id, "numbers".to_string())
NumbersTable::with_name(table_id, NUMBERS_TABLE_NAME.to_string())
}
pub fn with_name(table_id: TableId, name: String) -> Self {
@@ -74,7 +77,7 @@ impl NumbersTable {
impl Default for NumbersTable {
fn default() -> Self {
NumbersTable::new(1)
NumbersTable::new(NUMBERS_TABLE_ID)
}
}
@@ -93,8 +96,8 @@ impl Table for NumbersTable {
TableInfoBuilder::default()
.table_id(self.table_id)
.name(&self.name)
.catalog_name("greptime")
.schema_name("public")
.catalog_name(DEFAULT_CATALOG_NAME)
.schema_name(DEFAULT_SCHEMA_NAME)
.table_version(0)
.table_type(TableType::Base)
.meta(

View File

@@ -184,7 +184,7 @@ impl GreptimeDbClusterBuilder {
instance.start().await.unwrap();
// create another catalog and schema for testing
let _ = instance
instance
.catalog_manager()
.as_any()
.downcast_ref::<RemoteCatalogManager>()

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use axum::Router;
use catalog::CatalogManagerRef;
use catalog::{CatalogManagerRef, RegisterTableRequest};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE,
};
@@ -308,19 +308,14 @@ pub async fn create_test_table(
.await
.context(CreateTableSnafu { table_name })?;
let schema_provider = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
schema_provider
.register_table(table_name.to_string(), table)
.await
.unwrap();
let req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id: table.table_info().ident.table_id,
table,
};
catalog_manager.register_table(req).await.unwrap();
Ok(())
}

View File

@@ -19,7 +19,7 @@ mod test_util;
use std::collections::HashMap;
use std::sync::Arc;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::RegisterSchemaRequest;
use common_test_util::temp_dir::TempDir;
use datanode::instance::Instance as DatanodeInstance;
use frontend::instance::Instance;
@@ -71,17 +71,18 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
.await
.unwrap();
// create another catalog and schema for testing
let another_catalog = Arc::new(MemoryCatalogProvider::new());
let _ = another_catalog
.register_schema_sync(
"another_schema".to_string(),
Arc::new(MemorySchemaProvider::new()),
)
.unwrap();
let _ = dn_instance
dn_instance
.catalog_manager()
.register_catalog("another_catalog".to_string(), another_catalog)
.register_catalog("another_catalog".to_string())
.await
.unwrap();
let req = RegisterSchemaRequest {
catalog: "another_catalog".to_string(),
schema: "another_schema".to_string(),
};
dn_instance
.catalog_manager()
.register_schema(req)
.await
.unwrap();

View File

@@ -1325,7 +1325,7 @@ async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
+---------------+--------------+------------+------------+----------+-------------+
| table_catalog | table_schema | table_name | table_type | table_id | engine |
+---------------+--------------+------------+------------+----------+-------------+
| greptime | public | numbers | BASE TABLE | 1 | test_engine |
| greptime | public | numbers | BASE TABLE | 2 | test_engine |
| greptime | public | scripts | BASE TABLE | 1024 | mito |
+---------------+--------------+------------+------------+----------+-------------+"
}
@@ -1334,7 +1334,7 @@ async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
+---------------+--------------+------------+------------+----------+-------------+
| table_catalog | table_schema | table_name | table_type | table_id | engine |
+---------------+--------------+------------+------------+----------+-------------+
| greptime | public | numbers | BASE TABLE | 1 | test_engine |
| greptime | public | numbers | BASE TABLE | 2 | test_engine |
| greptime | public | scripts | BASE TABLE | 1 | mito |
+---------------+--------------+------------+------------+----------+-------------+"
}