refactor: catalog (#1454)

* wip

* add schema_async

* remove CatalogList

* remove catalog provider and schema provider

* fix

* fix: rename table

* fix: sqlness

* fix: ignore tonic error metadata

* fix: table engine name

* feat: rename catalog_async to catalog

* respect engine name in table regional value when deregistering tables

* fix: CR
This commit is contained in:
Lei, HUANG
2023-04-26 16:36:40 +08:00
committed by GitHub
parent ef4e473e6d
commit fb9978e95d
59 changed files with 998 additions and 1227 deletions

View File

@@ -1,15 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod catalog_adapter;

View File

@@ -1,324 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Catalog adapter between datafusion and greptime query engine.
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use datafusion::catalog::catalog::{
CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider,
};
use datafusion::catalog::schema::SchemaProvider as DfSchemaProvider;
use datafusion::datasource::TableProvider as DfTableProvider;
use datafusion::error::Result as DataFusionResult;
use snafu::ResultExt;
use table::table::adapter::{DfTableProviderAdapter, TableAdapter};
use table::TableRef;
use crate::error::{self, Result, SchemaProviderOperationSnafu};
use crate::{
CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
};
pub struct DfCatalogListAdapter {
catalog_list: CatalogListRef,
}
impl DfCatalogListAdapter {
pub fn new(catalog_list: CatalogListRef) -> DfCatalogListAdapter {
DfCatalogListAdapter { catalog_list }
}
}
impl DfCatalogList for DfCatalogListAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn DfCatalogProvider>,
) -> Option<Arc<dyn DfCatalogProvider>> {
let catalog_adapter = Arc::new(CatalogProviderAdapter {
df_catalog_provider: catalog,
});
self.catalog_list
.register_catalog(name, catalog_adapter)
.expect("datafusion does not accept fallible catalog access") // TODO(hl): datafusion register catalog does not handles errors
.map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _)
}
fn catalog_names(&self) -> Vec<String> {
// TODO(hl): datafusion register catalog does not handles errors
self.catalog_list
.catalog_names()
.expect("datafusion does not accept fallible catalog access")
}
fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
self.catalog_list
.catalog(name)
.expect("datafusion does not accept fallible catalog access") // TODO(hl): datafusion register catalog does not handles errors
.map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _)
}
}
/// Datafusion's CatalogProvider -> greptime CatalogProvider
struct CatalogProviderAdapter {
df_catalog_provider: Arc<dyn DfCatalogProvider>,
}
impl CatalogProvider for CatalogProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Result<Vec<String>> {
Ok(self.df_catalog_provider.schema_names())
}
fn register_schema(
&self,
_name: String,
_schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>> {
todo!("register_schema is not supported in Datafusion catalog provider")
}
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self
.df_catalog_provider
.schema(name)
.map(|df_schema_provider| Arc::new(SchemaProviderAdapter { df_schema_provider }) as _))
}
}
///Greptime CatalogProvider -> datafusion's CatalogProvider
pub struct DfCatalogProviderAdapter {
catalog_provider: CatalogProviderRef,
}
impl DfCatalogProviderAdapter {
pub fn new(catalog_provider: CatalogProviderRef) -> Self {
Self { catalog_provider }
}
}
impl DfCatalogProvider for DfCatalogProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
self.catalog_provider
.schema_names()
.expect("datafusion does not accept fallible catalog access")
}
fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
self.catalog_provider
.schema(name)
.expect("datafusion does not accept fallible catalog access")
.map(|schema_provider| Arc::new(DfSchemaProviderAdapter { schema_provider }) as _)
}
}
/// Greptime SchemaProvider -> datafusion SchemaProvider
struct DfSchemaProviderAdapter {
schema_provider: Arc<dyn SchemaProvider>,
}
#[async_trait]
impl DfSchemaProvider for DfSchemaProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.schema_provider
.table_names()
.expect("datafusion does not accept fallible catalog access")
}
async fn table(&self, name: &str) -> Option<Arc<dyn DfTableProvider>> {
self.schema_provider
.table(name)
.await
.expect("datafusion does not accept fallible catalog access")
.map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _)
}
fn register_table(
&self,
name: String,
table: Arc<dyn DfTableProvider>,
) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
let table = Arc::new(TableAdapter::new(table)?);
match self.schema_provider.register_table(name, table)? {
Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
None => Ok(None),
}
}
fn deregister_table(&self, name: &str) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
match self.schema_provider.deregister_table(name)? {
Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
None => Ok(None),
}
}
fn table_exist(&self, name: &str) -> bool {
self.schema_provider
.table_exist(name)
.expect("datafusion does not accept fallible catalog access")
}
}
/// Datafusion SchemaProviderAdapter -> greptime SchemaProviderAdapter
struct SchemaProviderAdapter {
df_schema_provider: Arc<dyn DfSchemaProvider>,
}
#[async_trait]
impl SchemaProvider for SchemaProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Result<Vec<String>> {
Ok(self.df_schema_provider.table_names())
}
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
let table = self.df_schema_provider.table(name).await;
let table = table.map(|table_provider| {
match table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
{
Some(adapter) => adapter.table(),
None => {
// TODO(yingwen): Avoid panic here.
let adapter =
TableAdapter::new(table_provider).expect("convert datafusion table");
Arc::new(adapter) as _
}
}
});
Ok(table)
}
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
Ok(self
.df_schema_provider
.register_table(name, table_provider)
.context(error::DatafusionSnafu {
msg: "Fail to register table to datafusion",
})
.map_err(BoxedError::new)
.context(SchemaProviderOperationSnafu)?
.map(|_| table))
}
fn rename_table(&self, _name: &str, _new_name: String) -> Result<TableRef> {
todo!()
}
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
self.df_schema_provider
.deregister_table(name)
.context(error::DatafusionSnafu {
msg: "Fail to deregister table from datafusion",
})
.map_err(BoxedError::new)
.context(SchemaProviderOperationSnafu)?
.map(|table| {
let adapter = TableAdapter::new(table)
.context(error::TableSchemaMismatchSnafu)
.map_err(BoxedError::new)
.context(SchemaProviderOperationSnafu)?;
Ok(Arc::new(adapter) as _)
})
.transpose()
}
fn table_exist(&self, name: &str) -> Result<bool> {
Ok(self.df_schema_provider.table_exist(name))
}
}
#[cfg(test)]
mod tests {
use table::table::numbers::NumbersTable;
use super::*;
use crate::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
#[test]
#[should_panic]
pub fn test_register_schema() {
let adapter = CatalogProviderAdapter {
df_catalog_provider: Arc::new(
datafusion::catalog::catalog::MemoryCatalogProvider::new(),
),
};
adapter
.register_schema(
"whatever".to_string(),
Arc::new(MemorySchemaProvider::new()),
)
.unwrap();
}
#[tokio::test]
async fn test_register_table() {
let adapter = DfSchemaProviderAdapter {
schema_provider: Arc::new(MemorySchemaProvider::new()),
};
adapter
.register_table(
"test_table".to_string(),
Arc::new(DfTableProviderAdapter::new(Arc::new(
NumbersTable::default(),
))),
)
.unwrap();
adapter.table("test_table").await.unwrap();
}
#[test]
pub fn test_register_catalog() {
let catalog_list = DfCatalogListAdapter {
catalog_list: new_memory_catalog_list().unwrap(),
};
assert!(catalog_list
.register_catalog(
"test_catalog".to_string(),
Arc::new(DfCatalogProviderAdapter {
catalog_provider: Arc::new(MemoryCatalogProvider::new()),
}),
)
.is_none());
catalog_list.catalog("test_catalog").unwrap();
}
}

View File

@@ -191,6 +191,7 @@ impl TableRegionalKey {
pub struct TableRegionalValue {
pub version: TableVersion,
pub regions_ids: Vec<u32>,
pub engine_name: Option<String>,
}
pub struct CatalogKey {

View File

@@ -49,7 +49,7 @@ impl SchemaProvider for InformationSchemaProvider {
self
}
fn table_names(&self) -> Result<Vec<String>> {
async fn table_names(&self) -> Result<Vec<String>> {
Ok(vec![TABLES.to_string()])
}
@@ -74,7 +74,7 @@ impl SchemaProvider for InformationSchemaProvider {
Ok(Some(Arc::new(table)))
}
fn table_exist(&self, name: &str) -> Result<bool> {
async fn table_exist(&self, name: &str) -> Result<bool> {
Ok(matches!(name.to_ascii_lowercase().as_str(), TABLES))
}
}

View File

@@ -98,13 +98,13 @@ impl InformationSchemaTablesBuilder {
async fn make_tables(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
for schema_name in self.catalog_provider.schema_names()? {
for schema_name in self.catalog_provider.schema_names().await? {
if schema_name == INFORMATION_SCHEMA_NAME {
continue;
}
let Some(schema) = self.catalog_provider.schema(&schema_name)? else { continue };
for table_name in schema.table_names()? {
let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
for table_name in schema.table_names().await? {
let Some(table) = schema.table(&table_name).await? else { continue };
let table_info = table.table_info();
self.add_table(

View File

@@ -29,7 +29,6 @@ use table::TableRef;
use crate::error::{CreateTableSnafu, Result};
pub use crate::schema::{SchemaProvider, SchemaProviderRef};
pub mod datafusion;
pub mod error;
pub mod helper;
pub(crate) mod information_schema;
@@ -40,55 +39,40 @@ pub mod system;
pub mod table_source;
pub mod tables;
/// Represent a list of named catalogs
pub trait CatalogList: Sync + Send {
/// Returns the catalog list as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Adds a new catalog to this catalog list
/// If a catalog of the same name existed before, it is replaced in the list and returned.
fn register_catalog(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>>;
/// Retrieves the list of available catalog names
fn catalog_names(&self) -> Result<Vec<String>>;
/// Retrieves a specific catalog by name, provided it exists.
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>>;
}
/// Represents a catalog, comprising a number of named schemas.
#[async_trait::async_trait]
pub trait CatalogProvider: Sync + Send {
/// Returns the catalog provider as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Retrieves the list of available schema names in this catalog.
fn schema_names(&self) -> Result<Vec<String>>;
async fn schema_names(&self) -> Result<Vec<String>>;
/// Registers schema to this catalog.
fn register_schema(
async fn register_schema(
&self,
name: String,
schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>>;
/// Retrieves a specific schema from the catalog by name, provided it exists.
fn schema(&self, name: &str) -> Result<Option<SchemaProviderRef>>;
async fn schema(&self, name: &str) -> Result<Option<SchemaProviderRef>>;
}
pub type CatalogListRef = Arc<dyn CatalogList>;
pub type CatalogProviderRef = Arc<dyn CatalogProvider>;
#[async_trait::async_trait]
pub trait CatalogManager: CatalogList {
pub trait CatalogManager: Send + Sync {
/// Starts a catalog manager.
async fn start(&self) -> Result<()>;
async fn register_catalog(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>>;
/// Registers a table within given catalog/schema to catalog manager,
/// returns whether the table registered.
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;
@@ -108,7 +92,11 @@ pub trait CatalogManager: CatalogList {
async fn register_system_table(&self, request: RegisterSystemTableRequest)
-> error::Result<()>;
fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>>;
async fn catalog_names(&self) -> Result<Vec<String>>;
async fn catalog(&self, catalog: &str) -> Result<Option<CatalogProviderRef>>;
async fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>>;
/// Returns the table by catalog, schema and table name.
async fn table(
@@ -117,6 +105,8 @@ pub trait CatalogManager: CatalogList {
schema: &str,
table_name: &str,
) -> Result<Option<TableRef>>;
fn as_any(&self) -> &dyn Any;
}
pub type CatalogManagerRef = Arc<dyn CatalogManager>;
@@ -238,15 +228,15 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec<Reg
let mut region_number: u64 = 0;
let mut region_stats = Vec::new();
let Ok(catalog_names) = catalog_manager.catalog_names() else { return (region_number, region_stats) };
let Ok(catalog_names) = catalog_manager.catalog_names().await else { return (region_number, region_stats) };
for catalog_name in catalog_names {
let Ok(Some(catalog)) = catalog_manager.catalog(&catalog_name) else { continue };
let Ok(Some(catalog)) = catalog_manager.catalog(&catalog_name).await else { continue };
let Ok(schema_names) = catalog.schema_names() else { continue };
let Ok(schema_names) = catalog.schema_names().await else { continue };
for schema_name in schema_names {
let Ok(Some(schema)) = catalog.schema(&schema_name) else { continue };
let Ok(Some(schema)) = catalog.schema(&schema_name).await else { continue };
let Ok(table_names) = schema.table_names() else { continue };
let Ok(table_names) = schema.table_names().await else { continue };
for table_name in table_names {
let Ok(Some(table)) = schema.table(&table_name).await else { continue };

View File

@@ -48,9 +48,9 @@ use crate::system::{
};
use crate::tables::SystemCatalog;
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef,
handle_system_table_request, CatalogManager, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProviderRef,
};
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
@@ -88,7 +88,7 @@ impl LocalCatalogManager {
/// Scan all entries from system catalog table
pub async fn init(&self) -> Result<()> {
self.init_system_catalog()?;
self.init_system_catalog().await?;
let system_records = self.system.information_schema.system.records().await?;
let entries = self.collect_system_catalog_entries(system_records).await?;
let max_table_id = self.handle_system_catalog_entries(entries).await?;
@@ -114,27 +114,27 @@ impl LocalCatalogManager {
Ok(())
}
fn init_system_catalog(&self) -> Result<()> {
async fn init_system_catalog(&self) -> Result<()> {
let system_schema = Arc::new(MemorySchemaProvider::new());
system_schema.register_table(
system_schema.register_table_sync(
SYSTEM_CATALOG_TABLE_NAME.to_string(),
self.system.information_schema.system.clone(),
)?;
let system_catalog = Arc::new(MemoryCatalogProvider::new());
system_catalog.register_schema(INFORMATION_SCHEMA_NAME.to_string(), system_schema)?;
system_catalog.register_schema_sync(INFORMATION_SCHEMA_NAME.to_string(), system_schema)?;
self.catalogs
.register_catalog(SYSTEM_CATALOG_NAME.to_string(), system_catalog)?;
.register_catalog_sync(SYSTEM_CATALOG_NAME.to_string(), system_catalog)?;
let default_catalog = Arc::new(MemoryCatalogProvider::new());
let default_schema = Arc::new(MemorySchemaProvider::new());
// Add numbers table for test
let table = Arc::new(NumbersTable::default());
default_schema.register_table("numbers".to_string(), table)?;
default_schema.register_table_sync("numbers".to_string(), table)?;
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?;
default_catalog.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?;
self.catalogs
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)?;
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)?;
Ok(())
}
@@ -213,16 +213,17 @@ impl LocalCatalogManager {
info!("Register catalog: {}", c.catalog_name);
}
Entry::Schema(s) => {
let catalog =
self.catalogs
.catalog(&s.catalog_name)?
.context(CatalogNotFoundSnafu {
catalog_name: &s.catalog_name,
})?;
catalog.register_schema(
s.schema_name.clone(),
Arc::new(MemorySchemaProvider::new()),
)?;
self.catalogs
.catalog(&s.catalog_name)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: &s.catalog_name,
})?
.register_schema(
s.schema_name.clone(),
Arc::new(MemorySchemaProvider::new()),
)
.await?;
info!("Registered schema: {:?}", s);
}
Entry::Table(t) => {
@@ -243,14 +244,16 @@ impl LocalCatalogManager {
}
async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> {
let catalog = self
.catalogs
.catalog(&t.catalog_name)?
.context(CatalogNotFoundSnafu {
catalog_name: &t.catalog_name,
})?;
let catalog =
self.catalogs
.catalog(&t.catalog_name)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: &t.catalog_name,
})?;
let schema = catalog
.schema(&t.schema_name)?
.schema(&t.schema_name)
.await?
.context(SchemaNotFoundSnafu {
catalog: &t.catalog_name,
schema: &t.schema_name,
@@ -286,37 +289,11 @@ impl LocalCatalogManager {
),
})?;
schema.register_table(t.table_name.clone(), option)?;
schema.register_table(t.table_name.clone(), option).await?;
Ok(())
}
}
impl CatalogList for LocalCatalogManager {
fn as_any(&self) -> &dyn Any {
self
}
fn register_catalog(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>> {
self.catalogs.register_catalog(name, catalog)
}
fn catalog_names(&self) -> Result<Vec<String>> {
self.catalogs.catalog_names()
}
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>> {
if name.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) {
Ok(Some(self.system.clone()))
} else {
self.catalogs.catalog(name)
}
}
}
#[async_trait::async_trait]
impl TableIdProvider for LocalCatalogManager {
async fn next_table_id(&self) -> table::Result<TableId> {
@@ -347,10 +324,12 @@ impl CatalogManager for LocalCatalogManager {
let catalog = self
.catalogs
.catalog(catalog_name)?
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog
.schema(schema_name)?
.schema(schema_name)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
@@ -388,7 +367,9 @@ impl CatalogManager for LocalCatalogManager {
engine,
)
.await?;
schema.register_table(request.table_name, request.table)?;
schema
.register_table(request.table_name, request.table)
.await?;
Ok(true)
}
}
@@ -409,11 +390,13 @@ impl CatalogManager for LocalCatalogManager {
let catalog = self
.catalogs
.catalog(catalog_name)?
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog
.schema(schema_name)?
.schema(schema_name)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
@@ -421,7 +404,7 @@ impl CatalogManager for LocalCatalogManager {
let _lock = self.register_lock.lock().await;
ensure!(
!schema.table_exist(&request.new_table_name)?,
!schema.table_exist(&request.new_table_name).await?,
TableExistsSnafu {
table: &request.new_table_name
}
@@ -447,6 +430,7 @@ impl CatalogManager for LocalCatalogManager {
let renamed = schema
.rename_table(&request.table_name, request.new_table_name.clone())
.await
.is_ok();
Ok(renamed)
}
@@ -497,13 +481,14 @@ impl CatalogManager for LocalCatalogManager {
let catalog = self
.catalogs
.catalog(catalog_name)?
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
{
let _lock = self.register_lock.lock().await;
ensure!(
catalog.schema(schema_name)?.is_none(),
catalog.schema(schema_name).await?.is_none(),
SchemaExistsSnafu {
schema: schema_name,
}
@@ -511,7 +496,9 @@ impl CatalogManager for LocalCatalogManager {
self.system
.register_schema(request.catalog, schema_name.clone())
.await?;
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
catalog
.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))
.await?;
Ok(true)
}
}
@@ -530,13 +517,15 @@ impl CatalogManager for LocalCatalogManager {
Ok(())
}
fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
async fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
self.catalogs
.catalog(catalog)?
.catalog(catalog)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.schema(schema)
.await
}
async fn table(
@@ -547,16 +536,42 @@ impl CatalogManager for LocalCatalogManager {
) -> Result<Option<TableRef>> {
let catalog = self
.catalogs
.catalog(catalog_name)?
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog
.schema(schema_name)?
.schema(schema_name)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?;
schema.table(table_name).await
}
async fn catalog(&self, catalog: &str) -> Result<Option<CatalogProviderRef>> {
if catalog.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) {
Ok(Some(self.system.clone()))
} else {
self.catalogs.catalog(catalog).await
}
}
async fn catalog_names(&self) -> Result<Vec<String>> {
self.catalogs.catalog_names().await
}
async fn register_catalog(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>> {
self.catalogs.register_catalog(name, catalog).await
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]

View File

@@ -31,7 +31,7 @@ use crate::error::{
};
use crate::schema::SchemaProvider;
use crate::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProviderRef,
};
@@ -51,10 +51,10 @@ impl Default for MemoryCatalogManager {
};
let default_catalog = Arc::new(MemoryCatalogProvider::new());
manager
.register_catalog("greptime".to_string(), default_catalog.clone())
.register_catalog_sync("greptime".to_string(), default_catalog.clone())
.unwrap();
default_catalog
.register_schema("public".to_string(), Arc::new(MemorySchemaProvider::new()))
.register_schema_sync("public".to_string(), Arc::new(MemorySchemaProvider::new()))
.unwrap();
manager
}
@@ -75,70 +75,70 @@ impl CatalogManager for MemoryCatalogManager {
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
let schema = self
.catalog(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.clone();
let schema = catalog
.schema(&request.schema)?
.with_context(|| SchemaNotFoundSnafu {
.schema(&request.schema)
.await?
.context(SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
schema
.register_table(request.table_name, request.table)
.await
.map(|v| v.is_none())
}
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
let catalog = self
.catalog(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.clone();
let schema = catalog
.schema(&request.schema)?
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
let schema =
catalog
.schema(&request.schema)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
Ok(schema
.rename_table(&request.table_name, request.new_table_name)
.await
.is_ok())
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
let schema = self
.catalog(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.clone();
let schema = catalog
.schema(&request.schema)?
.schema(&request.schema)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
schema
.deregister_table(&request.table_name)
.await
.map(|v| v.is_some())
}
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
let catalog = self
.catalog(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?;
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
catalog
.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))
.await?;
Ok(true)
}
@@ -147,10 +147,9 @@ impl CatalogManager for MemoryCatalogManager {
Ok(())
}
fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
let catalogs = self.catalogs.read().unwrap();
if let Some(c) = catalogs.get(catalog) {
c.schema(schema)
async fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
if let Some(c) = self.catalog(catalog) {
c.schema(schema).await
} else {
Ok(None)
}
@@ -162,15 +161,30 @@ impl CatalogManager for MemoryCatalogManager {
schema: &str,
table_name: &str,
) -> Result<Option<TableRef>> {
let catalog = {
let c = self.catalogs.read().unwrap();
let Some(c) = c.get(catalog) else { return Ok(None) };
c.clone()
};
match catalog.schema(schema)? {
None => Ok(None),
Some(s) => s.table(table_name).await,
}
let Some(catalog) = self
.catalog(catalog) else { return Ok(None)};
let Some(s) = catalog.schema(schema).await? else { return Ok(None) };
s.table(table_name).await
}
async fn catalog(&self, catalog: &str) -> Result<Option<CatalogProviderRef>> {
Ok(self.catalogs.read().unwrap().get(catalog).cloned())
}
async fn catalog_names(&self) -> Result<Vec<String>> {
Ok(self.catalogs.read().unwrap().keys().cloned().collect())
}
async fn register_catalog(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>> {
self.register_catalog_sync(name, catalog)
}
fn as_any(&self) -> &dyn Any {
self
}
}
@@ -192,14 +206,8 @@ impl MemoryCatalogManager {
}
}
}
}
impl CatalogList for MemoryCatalogManager {
fn as_any(&self) -> &dyn Any {
self
}
fn register_catalog(
pub fn register_catalog_sync(
&self,
name: String,
catalog: CatalogProviderRef,
@@ -208,14 +216,8 @@ impl CatalogList for MemoryCatalogManager {
Ok(catalogs.insert(name, catalog))
}
fn catalog_names(&self) -> Result<Vec<String>> {
let catalogs = self.catalogs.read().unwrap();
Ok(catalogs.keys().map(|s| s.to_string()).collect())
}
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>> {
let catalogs = self.catalogs.read().unwrap();
Ok(catalogs.get(name).cloned())
fn catalog(&self, catalog_name: &str) -> Option<CatalogProviderRef> {
self.catalogs.read().unwrap().get(catalog_name).cloned()
}
}
@@ -237,19 +239,13 @@ impl MemoryCatalogProvider {
schemas: RwLock::new(HashMap::new()),
}
}
}
impl CatalogProvider for MemoryCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Result<Vec<String>> {
pub fn schema_names_sync(&self) -> Result<Vec<String>> {
let schemas = self.schemas.read().unwrap();
Ok(schemas.keys().cloned().collect())
}
fn register_schema(
pub fn register_schema_sync(
&self,
name: String,
schema: SchemaProviderRef,
@@ -262,12 +258,35 @@ impl CatalogProvider for MemoryCatalogProvider {
Ok(schemas.insert(name, schema))
}
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
pub fn schema_sync(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
let schemas = self.schemas.read().unwrap();
Ok(schemas.get(name).cloned())
}
}
#[async_trait::async_trait]
impl CatalogProvider for MemoryCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
async fn schema_names(&self) -> Result<Vec<String>> {
self.schema_names_sync()
}
async fn register_schema(
&self,
name: String,
schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>> {
self.register_schema_sync(name, schema)
}
async fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
self.schema_sync(name)
}
}
/// Simple in-memory implementation of a schema.
pub struct MemorySchemaProvider {
tables: RwLock<HashMap<String, TableRef>>,
@@ -280,31 +299,8 @@ impl MemorySchemaProvider {
tables: RwLock::new(HashMap::new()),
}
}
}
impl Default for MemorySchemaProvider {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl SchemaProvider for MemorySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Result<Vec<String>> {
let tables = self.tables.read().unwrap();
Ok(tables.keys().cloned().collect())
}
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
let tables = self.tables.read().unwrap();
Ok(tables.get(name).cloned())
}
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
pub fn register_table_sync(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
let mut tables = self.tables.write().unwrap();
if let Some(existing) = tables.get(name.as_str()) {
// if table with the same name but different table id exists, then it's a fatal bug
@@ -322,7 +318,7 @@ impl SchemaProvider for MemorySchemaProvider {
}
}
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
pub fn rename_table_sync(&self, name: &str, new_name: String) -> Result<TableRef> {
let mut tables = self.tables.write().unwrap();
let Some(table) = tables.remove(name) else {
return TableNotFoundSnafu {
@@ -340,14 +336,53 @@ impl SchemaProvider for MemorySchemaProvider {
Ok(table)
}
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
pub fn table_exist_sync(&self, name: &str) -> Result<bool> {
let tables = self.tables.read().unwrap();
Ok(tables.contains_key(name))
}
pub fn deregister_table_sync(&self, name: &str) -> Result<Option<TableRef>> {
let mut tables = self.tables.write().unwrap();
Ok(tables.remove(name))
}
}
fn table_exist(&self, name: &str) -> Result<bool> {
impl Default for MemorySchemaProvider {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl SchemaProvider for MemorySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
async fn table_names(&self) -> Result<Vec<String>> {
let tables = self.tables.read().unwrap();
Ok(tables.contains_key(name))
Ok(tables.keys().cloned().collect())
}
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
let tables = self.tables.read().unwrap();
Ok(tables.get(name).cloned())
}
async fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
self.register_table_sync(name, table)
}
async fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
self.rename_table_sync(name, new_name)
}
async fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
self.deregister_table_sync(name)
}
async fn table_exist(&self, name: &str) -> Result<bool> {
self.table_exist_sync(name)
}
}
@@ -368,15 +403,20 @@ mod tests {
#[tokio::test]
async fn test_new_memory_catalog_list() {
let catalog_list = new_memory_catalog_list().unwrap();
let default_catalog = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap().unwrap();
let default_catalog = CatalogManager::catalog(&*catalog_list, DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap();
let default_schema = default_catalog
.schema(DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.await
.unwrap();
let table = default_schema.table("numbers").await.unwrap();
@@ -388,17 +428,17 @@ mod tests {
async fn test_mem_provider() {
let provider = MemorySchemaProvider::new();
let table_name = "numbers";
assert!(!provider.table_exist(table_name).unwrap());
assert!(provider.deregister_table(table_name).unwrap().is_none());
assert!(!provider.table_exist_sync(table_name).unwrap());
provider.deregister_table_sync(table_name).unwrap();
let test_table = NumbersTable::default();
// register table successfully
assert!(provider
.register_table(table_name.to_string(), Arc::new(test_table))
.register_table_sync(table_name.to_string(), Arc::new(test_table))
.unwrap()
.is_none());
assert!(provider.table_exist(table_name).unwrap());
assert!(provider.table_exist_sync(table_name).unwrap());
let other_table = NumbersTable::new(12);
let result = provider.register_table(table_name.to_string(), Arc::new(other_table));
let result = provider.register_table_sync(table_name.to_string(), Arc::new(other_table));
let err = result.err().unwrap();
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}
@@ -407,27 +447,27 @@ mod tests {
async fn test_mem_provider_rename_table() {
let provider = MemorySchemaProvider::new();
let table_name = "num";
assert!(!provider.table_exist(table_name).unwrap());
assert!(!provider.table_exist_sync(table_name).unwrap());
let test_table: TableRef = Arc::new(NumbersTable::default());
// register test table
assert!(provider
.register_table(table_name.to_string(), test_table.clone())
.register_table_sync(table_name.to_string(), test_table.clone())
.unwrap()
.is_none());
assert!(provider.table_exist(table_name).unwrap());
assert!(provider.table_exist_sync(table_name).unwrap());
// rename test table
let new_table_name = "numbers";
provider
.rename_table(table_name, new_table_name.to_string())
.rename_table_sync(table_name, new_table_name.to_string())
.unwrap();
// test old table name not exist
assert!(!provider.table_exist(table_name).unwrap());
assert!(provider.deregister_table(table_name).unwrap().is_none());
assert!(!provider.table_exist_sync(table_name).unwrap());
provider.deregister_table_sync(table_name).unwrap();
// test new table name exists
assert!(provider.table_exist(new_table_name).unwrap());
assert!(provider.table_exist_sync(new_table_name).unwrap());
let registered_table = provider.table(new_table_name).await.unwrap().unwrap();
assert_eq!(
registered_table.table_info().ident.table_id,
@@ -435,7 +475,9 @@ mod tests {
);
let other_table = Arc::new(NumbersTable::new(2));
let result = provider.register_table(new_table_name.to_string(), other_table);
let result = provider
.register_table(new_table_name.to_string(), other_table)
.await;
let err = result.err().unwrap();
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}
@@ -445,6 +487,7 @@ mod tests {
let catalog = MemoryCatalogManager::default();
let schema = catalog
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
@@ -460,7 +503,7 @@ mod tests {
table,
};
assert!(catalog.register_table(register_table_req).await.unwrap());
assert!(schema.table_exist(table_name).unwrap());
assert!(schema.table_exist(table_name).await.unwrap());
// rename table
let new_table_name = "numbers_new";
@@ -472,8 +515,8 @@ mod tests {
table_id,
};
assert!(catalog.rename_table(rename_table_req).await.unwrap());
assert!(!schema.table_exist(table_name).unwrap());
assert!(schema.table_exist(new_table_name).unwrap());
assert!(!schema.table_exist(table_name).await.unwrap());
assert!(schema.table_exist(new_table_name).await.unwrap());
let registered_table = catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
@@ -507,6 +550,7 @@ mod tests {
let catalog = MemoryCatalogManager::default();
let schema = catalog
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
@@ -518,7 +562,7 @@ mod tests {
table: Arc::new(NumbersTable::default()),
};
catalog.register_table(register_table_req).await.unwrap();
assert!(schema.table_exist("numbers").unwrap());
assert!(schema.table_exist("numbers").await.unwrap());
let deregister_table_req = DeregisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
@@ -529,6 +573,6 @@ mod tests {
.deregister_table(deregister_table_req)
.await
.unwrap();
assert!(!schema.table_exist("numbers").unwrap());
assert!(!schema.table_exist("numbers").await.unwrap());
}
}

View File

@@ -17,19 +17,17 @@ use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use arc_swap::ArcSwap;
use async_stream::stream;
use async_trait::async_trait;
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use dashmap::DashMap;
use futures::Stream;
use futures_util::{StreamExt, TryStreamExt};
use key_lock::KeyLock;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
use table::engine::EngineContext;
use table::engine::{EngineContext, TableReference};
use table::metadata::TableId;
use table::requests::{CreateTableRequest, OpenTableRequest};
use table::TableRef;
@@ -41,13 +39,13 @@ use crate::error::{
TableExistsSnafu, UnimplementedSnafu,
};
use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
CATALOG_KEY_PREFIX,
build_catalog_prefix, build_schema_prefix, build_table_global_prefix,
build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey,
TableGlobalValue, TableRegionalKey, TableRegionalValue, CATALOG_KEY_PREFIX,
};
use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
handle_system_table_request, CatalogManager, CatalogProvider, CatalogProviderRef,
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef,
};
@@ -72,19 +70,12 @@ impl RemoteCatalogManager {
}
}
fn build_catalog_key(&self, catalog_name: impl AsRef<str>) -> CatalogKey {
CatalogKey {
catalog_name: catalog_name.as_ref().to_string(),
}
}
fn new_catalog_provider(&self, catalog_name: &str) -> CatalogProviderRef {
Arc::new(RemoteCatalogProvider {
node_id: self.node_id,
catalog_name: catalog_name.to_string(),
backend: self.backend.clone(),
schemas: Default::default(),
mutex: Default::default(),
engine_manager: self.engine_manager.clone(),
}) as _
}
@@ -92,10 +83,9 @@ impl RemoteCatalogManager {
Arc::new(RemoteSchemaProvider {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
tables: Default::default(),
node_id: self.node_id,
backend: self.backend.clone(),
mutex: Default::default(),
engine_manager: self.engine_manager.clone(),
}) as _
}
@@ -217,10 +207,12 @@ impl RemoteCatalogManager {
..
} = r?;
info!("Found schema: {}.{}", catalog_name, schema_name);
let schema = match catalog.schema(&schema_name)? {
let schema = match catalog.schema(&schema_name).await? {
None => {
let schema = self.new_schema_provider(&catalog_name, &schema_name);
catalog.register_schema(schema_name.clone(), schema.clone())?;
catalog
.register_schema(schema_name.clone(), schema.clone())
.await?;
info!("Registered schema: {}", &schema_name);
schema
}
@@ -266,7 +258,7 @@ impl RemoteCatalogManager {
let table_info = table_ref.table_info();
let table_name = &table_info.name;
let table_id = table_info.ident.table_id;
schema.register_table(table_name.clone(), table_ref)?;
schema.register_table(table_name.clone(), table_ref).await?;
info!("Registered table {}", table_name);
max_table_id = max_table_id.max(table_id);
table_num += 1;
@@ -287,7 +279,9 @@ impl RemoteCatalogManager {
let schema_provider = self.new_schema_provider(catalog_name, schema_name);
let catalog_provider = self.new_catalog_provider(catalog_name);
catalog_provider.register_schema(schema_name.to_string(), schema_provider.clone())?;
catalog_provider
.register_schema(schema_name.to_string(), schema_provider.clone())
.await?;
let schema_key = SchemaKey {
catalog_name: catalog_name.to_string(),
@@ -438,56 +432,85 @@ impl CatalogManager for RemoteCatalogManager {
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let catalog_name = request.catalog;
let schema_name = request.schema;
let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu {
catalog_name: &catalog_name,
})?;
let schema_provider =
catalog_provider
.schema(&schema_name)?
.with_context(|| SchemaNotFoundSnafu {
catalog: &catalog_name,
schema: &schema_name,
})?;
if schema_provider.table_exist(&request.table_name)? {
let schema_provider = self
.catalog(&catalog_name)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: &catalog_name,
})?
.schema(&schema_name)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: &catalog_name,
schema: &schema_name,
})?;
if schema_provider.table_exist(&request.table_name).await? {
return TableExistsSnafu {
table: format!("{}.{}.{}", &catalog_name, &schema_name, &request.table_name),
}
.fail();
}
schema_provider.register_table(request.table_name, request.table)?;
schema_provider
.register_table(request.table_name, request.table)
.await?;
Ok(true)
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
let catalog_name = &request.catalog;
let schema_name = &request.schema;
let schema = self
.schema(catalog_name, schema_name)?
let result = self
.schema(catalog_name, schema_name)
.await?
.context(SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?;
let result = schema.deregister_table(&request.table_name)?;
})?
.deregister_table(&request.table_name)
.await?;
Ok(result.is_none())
}
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
let catalog_name = request.catalog;
let schema_name = request.schema;
let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu {
catalog_name: &catalog_name,
})?;
let catalog_provider =
self.catalog(&catalog_name)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: &catalog_name,
})?;
let schema_provider = self.new_schema_provider(&catalog_name, &schema_name);
catalog_provider.register_schema(schema_name, schema_provider)?;
catalog_provider
.register_schema(schema_name, schema_provider)
.await?;
Ok(true)
}
async fn rename_table(&self, _request: RenameTableRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "rename table",
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
let old_table_key = TableRegionalKey {
catalog_name: request.catalog.clone(),
schema_name: request.schema.clone(),
table_name: request.table_name.clone(),
node_id: self.node_id,
}
.fail()
.to_string();
let Some(Kv(_, value_bytes)) = self.backend.get(old_table_key.as_bytes()).await? else {
return Ok(false)
};
let new_table_key = TableRegionalKey {
catalog_name: request.catalog.clone(),
schema_name: request.schema.clone(),
table_name: request.new_table_name,
node_id: self.node_id,
};
self.backend
.set(new_table_key.to_string().as_bytes(), &value_bytes)
.await?;
self.backend
.delete(old_table_key.to_string().as_bytes())
.await?;
Ok(true)
}
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
@@ -496,12 +519,14 @@ impl CatalogManager for RemoteCatalogManager {
Ok(())
}
fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
self.catalog(catalog)?
async fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
self.catalog(catalog)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.schema(schema)
.await
}
async fn table(
@@ -511,114 +536,67 @@ impl CatalogManager for RemoteCatalogManager {
table_name: &str,
) -> Result<Option<TableRef>> {
let catalog = self
.catalog(catalog_name)?
.catalog(catalog_name)
.await?
.with_context(|| CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog
.schema(schema_name)?
.schema(schema_name)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?;
schema.table(table_name).await
}
}
impl CatalogList for RemoteCatalogManager {
fn as_any(&self) -> &dyn Any {
self
async fn catalog(&self, catalog: &str) -> Result<Option<CatalogProviderRef>> {
let key = CatalogKey {
catalog_name: catalog.to_string(),
};
Ok(self
.backend
.get(key.to_string().as_bytes())
.await?
.map(|_| self.new_catalog_provider(catalog)))
}
fn register_catalog(
&self,
name: String,
catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>> {
let key = self.build_catalog_key(&name).to_string();
let backend = self.backend.clone();
let catalogs = self.catalogs.clone();
async fn catalog_names(&self) -> Result<Vec<String>> {
let mut stream = self.backend.range(CATALOG_KEY_PREFIX.as_bytes());
let mut catalogs = HashSet::new();
std::thread::spawn(|| {
common_runtime::block_on_write(async move {
backend
.set(
key.as_bytes(),
&CatalogValue {}
.as_bytes()
.context(InvalidCatalogValueSnafu)?,
)
.await?;
while let Some(catalog) = stream.next().await {
if let Ok(catalog) = catalog {
let catalog_key = String::from_utf8_lossy(&catalog.0);
let catalogs = catalogs.read();
let prev = catalogs.insert(name, catalog.clone());
Ok(prev)
})
})
.join()
.unwrap()
}
/// List all catalogs from metasrv
fn catalog_names(&self) -> Result<Vec<String>> {
let catalogs = self.catalogs.read();
Ok(catalogs.iter().map(|k| k.key().to_string()).collect())
}
/// Read catalog info of given name from metasrv.
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>> {
{
let catalogs = self.catalogs.read();
let catalog = catalogs.get(name);
if let Some(catalog) = catalog {
return Ok(Some(catalog.clone()));
if let Ok(key) = CatalogKey::parse(&catalog_key) {
catalogs.insert(key.catalog_name);
}
}
}
let catalogs = self.catalogs.write();
Ok(catalogs.into_iter().collect())
}
let catalog = catalogs.get(name);
if let Some(catalog) = catalog {
return Ok(Some(catalog.clone()));
}
async fn register_catalog(
&self,
name: String,
_catalog: CatalogProviderRef,
) -> Result<Option<CatalogProviderRef>> {
let key = CatalogKey { catalog_name: name }.to_string();
// TODO(hl): use compare_and_swap to prevent concurrent update
self.backend
.set(
key.as_bytes(),
&CatalogValue {}
.as_bytes()
.context(InvalidCatalogValueSnafu)?,
)
.await?;
Ok(None)
}
// It's for lack of incremental catalog syncing between datanode and meta. Here we fetch catalog
// from meta on demand. This can be removed when incremental catalog syncing is done in datanode.
let backend = self.backend.clone();
let catalogs_from_meta: HashSet<String> = std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let mut stream = backend.range(CATALOG_KEY_PREFIX.as_bytes());
let mut catalogs = HashSet::new();
while let Some(catalog) = stream.next().await {
if let Ok(catalog) = catalog {
let catalog_key = String::from_utf8_lossy(&catalog.0);
if let Ok(key) = CatalogKey::parse(&catalog_key) {
catalogs.insert(key.catalog_name);
}
}
}
catalogs
})
})
.join()
.unwrap();
catalogs.retain(|catalog_name, _| catalogs_from_meta.get(catalog_name).is_some());
for catalog in catalogs_from_meta {
catalogs
.entry(catalog.clone())
.or_insert(self.new_catalog_provider(&catalog));
}
let catalog = catalogs.get(name);
Ok(catalog.as_deref().cloned())
fn as_any(&self) -> &dyn Any {
self
}
}
@@ -626,119 +604,91 @@ pub struct RemoteCatalogProvider {
node_id: u64,
catalog_name: String,
backend: KvBackendRef,
schemas: Arc<ArcSwap<HashMap<String, SchemaProviderRef>>>,
mutex: Arc<Mutex<()>>,
engine_manager: TableEngineManagerRef,
}
impl RemoteCatalogProvider {
pub fn new(catalog_name: String, backend: KvBackendRef, node_id: u64) -> Self {
pub fn new(
catalog_name: String,
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
node_id: u64,
) -> Self {
Self {
node_id,
catalog_name,
backend,
schemas: Default::default(),
mutex: Default::default(),
engine_manager,
}
}
pub fn refresh_schemas(&self) -> Result<()> {
let schemas = self.schemas.clone();
let schema_prefix = build_schema_prefix(&self.catalog_name);
let catalog_name = self.catalog_name.clone();
let mutex = self.mutex.clone();
let backend = self.backend.clone();
let node_id = self.node_id;
std::thread::spawn(move || {
common_runtime::block_on_write(async move {
let _guard = mutex.lock().await;
let prev_schemas = schemas.load();
let mut new_schemas = HashMap::with_capacity(prev_schemas.len() + 1);
new_schemas.clone_from(&prev_schemas);
let mut remote_schemas = backend.range(schema_prefix.as_bytes());
while let Some(r) = remote_schemas.next().await {
let Kv(k, _) = r?;
let schema_key = SchemaKey::parse(&String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
if !new_schemas.contains_key(&schema_key.schema_name) {
new_schemas.insert(
schema_key.schema_name.clone(),
Arc::new(RemoteSchemaProvider::new(
catalog_name.clone(),
schema_key.schema_name,
node_id,
backend.clone(),
)),
);
}
}
schemas.store(Arc::new(new_schemas));
Ok(())
})
})
.join()
.unwrap()?;
Ok(())
}
fn build_schema_key(&self, schema_name: impl AsRef<str>) -> SchemaKey {
SchemaKey {
catalog_name: self.catalog_name.clone(),
schema_name: schema_name.as_ref().to_string(),
}
}
fn build_schema_provider(&self, schema_name: &str) -> SchemaProviderRef {
let provider = RemoteSchemaProvider {
catalog_name: self.catalog_name.clone(),
schema_name: schema_name.to_string(),
node_id: self.node_id,
backend: self.backend.clone(),
engine_manager: self.engine_manager.clone(),
};
Arc::new(provider) as Arc<_>
}
}
#[async_trait::async_trait]
impl CatalogProvider for RemoteCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Result<Vec<String>> {
self.refresh_schemas()?;
Ok(self.schemas.load().keys().cloned().collect::<Vec<_>>())
async fn schema_names(&self) -> Result<Vec<String>> {
let schema_prefix = build_schema_prefix(&self.catalog_name);
let remote_schemas = self.backend.range(schema_prefix.as_bytes());
let res = remote_schemas
.map(|kv| {
let Kv(k, _) = kv?;
let schema_key = SchemaKey::parse(String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
Ok(schema_key.schema_name)
})
.try_collect()
.await?;
Ok(res)
}
fn register_schema(
async fn register_schema(
&self,
name: String,
schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>> {
let _ = schema; // we don't care about schema provider
let key = self.build_schema_key(&name).to_string();
let backend = self.backend.clone();
let mutex = self.mutex.clone();
let schemas = self.schemas.clone();
std::thread::spawn(|| {
common_runtime::block_on_write(async move {
let _guard = mutex.lock().await;
backend
.set(
key.as_bytes(),
&SchemaValue {}
.as_bytes()
.context(InvalidCatalogValueSnafu)?,
)
.await?;
let prev_schemas = schemas.load();
let mut new_schemas = HashMap::with_capacity(prev_schemas.len() + 1);
new_schemas.clone_from(&prev_schemas);
let prev_schema = new_schemas.insert(name, schema);
schemas.store(Arc::new(new_schemas));
Ok(prev_schema)
})
})
.join()
.unwrap()
self.backend
.set(
key.as_bytes(),
&SchemaValue {}
.as_bytes()
.context(InvalidCatalogValueSnafu)?,
)
.await?;
// TODO(hl): maybe return preview schema by cas
Ok(None)
}
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
// TODO(hl): We should refresh whole catalog before calling datafusion's query engine.
self.refresh_schemas()?;
Ok(self.schemas.load().get(name).cloned())
async fn schema(&self, name: &str) -> Result<Option<SchemaProviderRef>> {
let key = self.build_schema_key(name).to_string();
Ok(self
.backend
.get(key.as_bytes())
.await?
.map(|_| self.build_schema_provider(name)))
}
}
@@ -747,8 +697,7 @@ pub struct RemoteSchemaProvider {
schema_name: String,
node_id: u64,
backend: KvBackendRef,
tables: Arc<ArcSwap<DashMap<String, TableRef>>>,
mutex: Arc<KeyLock<String>>,
engine_manager: TableEngineManagerRef,
}
impl RemoteSchemaProvider {
@@ -756,6 +705,7 @@ impl RemoteSchemaProvider {
catalog_name: String,
schema_name: String,
node_id: u64,
engine_manager: TableEngineManagerRef,
backend: KvBackendRef,
) -> Self {
Self {
@@ -763,8 +713,7 @@ impl RemoteSchemaProvider {
schema_name,
node_id,
backend,
tables: Default::default(),
mutex: Default::default(),
engine_manager,
}
}
@@ -784,89 +733,132 @@ impl SchemaProvider for RemoteSchemaProvider {
self
}
fn table_names(&self) -> Result<Vec<String>> {
Ok(self
.tables
.load()
.iter()
.map(|en| en.key().clone())
.collect::<Vec<_>>())
async fn table_names(&self) -> Result<Vec<String>> {
let key_prefix = build_table_regional_prefix(&self.catalog_name, &self.schema_name);
let iter = self.backend.range(key_prefix.as_bytes());
let table_names = iter
.map(|kv| {
let Kv(key, _) = kv?;
let regional_key = TableRegionalKey::parse(String::from_utf8_lossy(&key))
.context(InvalidCatalogValueSnafu)?;
Ok(regional_key.table_name)
})
.try_collect()
.await?;
Ok(table_names)
}
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
Ok(self.tables.load().get(name).map(|en| en.value().clone()))
let key = self.build_regional_table_key(name).to_string();
let table_opt = self
.backend
.get(key.as_bytes())
.await?
.map(|Kv(_, v)| {
let TableRegionalValue { engine_name, .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
};
let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE);
let engine = self
.engine_manager
.engine(engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?;
let table = engine
.get_table(&EngineContext {}, &reference)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;
Ok(table)
})
.transpose()?
.flatten();
Ok(table_opt)
}
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
async fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
let table_info = table.table_info();
let table_version = table_info.ident.version;
let table_value = TableRegionalValue {
version: table_version,
regions_ids: table.table_info().meta.region_numbers.clone(),
engine_name: Some(table_info.meta.engine.clone()),
};
let backend = self.backend.clone();
let mutex = self.mutex.clone();
let tables = self.tables.clone();
let table_key = self.build_regional_table_key(&name).to_string();
self.backend
.set(
table_key.as_bytes(),
&table_value.as_bytes().context(InvalidCatalogValueSnafu)?,
)
.await?;
debug!(
"Successfully set catalog table entry, key: {}, table value: {:?}",
table_key, table_value
);
let prev = std::thread::spawn(move || {
common_runtime::block_on_read(async move {
let _guard = mutex.lock(table_key.clone()).await;
backend
.set(
table_key.as_bytes(),
&table_value.as_bytes().context(InvalidCatalogValueSnafu)?,
)
.await?;
debug!(
"Successfully set catalog table entry, key: {}, table value: {:?}",
table_key, table_value
);
let tables = tables.load();
let prev = tables.insert(name, table);
Ok(prev)
})
})
.join()
.unwrap();
prev
// TODO(hl): retrieve prev table info using cas
Ok(None)
}
fn rename_table(&self, _name: &str, _new_name: String) -> Result<TableRef> {
async fn rename_table(&self, _name: &str, _new_name: String) -> Result<TableRef> {
UnimplementedSnafu {
operation: "rename table",
}
.fail()
}
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
let table_name = name.to_string();
let table_key = self.build_regional_table_key(&table_name).to_string();
let backend = self.backend.clone();
let mutex = self.mutex.clone();
let tables = self.tables.clone();
let prev = std::thread::spawn(move || {
common_runtime::block_on_read(async move {
let _guard = mutex.lock(table_key.clone()).await;
backend.delete(table_key.as_bytes()).await?;
debug!(
"Successfully deleted catalog table entry, key: {}",
table_key
);
async fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
let table_key = self.build_regional_table_key(name).to_string();
let tables = tables.load();
let prev = tables.remove(&table_name).map(|en| en.1);
Ok(prev)
let engine_opt = self
.backend
.get(table_key.as_bytes())
.await?
.map(|Kv(_, v)| {
let TableRegionalValue { engine_name, .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
Ok(engine_name)
})
})
.join()
.unwrap();
prev
.transpose()?
.flatten();
let engine_name = engine_opt.as_deref().unwrap_or_else(|| {
warn!("Cannot find table engine name for {table_key}");
MITO_ENGINE
});
self.backend.delete(table_key.as_bytes()).await?;
debug!(
"Successfully deleted catalog table entry, key: {}",
table_key
);
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
};
// deregistering table does not necessarily mean dropping the table
let table = self
.engine_manager
.engine(engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?
.get_table(&EngineContext {}, &reference)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;
Ok(table)
}
/// Checks if table exists in schema provider based on locally opened table map.
fn table_exist(&self, name: &str) -> Result<bool> {
Ok(self.tables.load().contains_key(name))
async fn table_exist(&self, name: &str) -> Result<bool> {
let key = self.build_regional_table_key(name).to_string();
Ok(self.backend.get(key.as_bytes()).await?.is_some())
}
}

View File

@@ -28,14 +28,14 @@ pub trait SchemaProvider: Sync + Send {
fn as_any(&self) -> &dyn Any;
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Result<Vec<String>>;
async fn table_names(&self) -> Result<Vec<String>>;
/// Retrieves a specific table from the schema by name, provided it exists.
async fn table(&self, name: &str) -> Result<Option<TableRef>>;
/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name existed before, it returns "Table already exists" error.
fn register_table(&self, name: String, _table: TableRef) -> Result<Option<TableRef>> {
async fn register_table(&self, name: String, _table: TableRef) -> Result<Option<TableRef>> {
NotSupportedSnafu {
op: format!("register_table({name}, <table>)"),
}
@@ -44,7 +44,7 @@ pub trait SchemaProvider: Sync + Send {
/// If supported by the implementation, renames an existing table from this schema and returns it.
/// If no table of that name exists, returns "Table not found" error.
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
async fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
NotSupportedSnafu {
op: format!("rename_table({name}, {new_name})"),
}
@@ -53,7 +53,7 @@ pub trait SchemaProvider: Sync + Send {
/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
async fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
NotSupportedSnafu {
op: format!("deregister_table({name})"),
}
@@ -63,7 +63,7 @@ pub trait SchemaProvider: Sync + Send {
/// If supported by the implementation, checks the table exist in the schema provider or not.
/// If no matched table in the schema provider, return false.
/// Otherwise, return true.
fn table_exist(&self, name: &str) -> Result<bool>;
async fn table_exist(&self, name: &str) -> Result<bool>;
}
pub type SchemaProviderRef = Arc<dyn SchemaProvider>;

View File

@@ -28,10 +28,10 @@ use crate::error::{
CatalogNotFoundSnafu, QueryAccessDeniedSnafu, Result, SchemaNotFoundSnafu, TableNotExistSnafu,
};
use crate::information_schema::InformationSchemaProvider;
use crate::CatalogListRef;
use crate::CatalogManagerRef;
pub struct DfTableSourceProvider {
catalog_list: CatalogListRef,
catalog_manager: CatalogManagerRef,
resolved_tables: HashMap<String, Arc<dyn TableSource>>,
disallow_cross_schema_query: bool,
default_catalog: String,
@@ -40,12 +40,12 @@ pub struct DfTableSourceProvider {
impl DfTableSourceProvider {
pub fn new(
catalog_list: CatalogListRef,
catalog_manager: CatalogManagerRef,
disallow_cross_schema_query: bool,
query_ctx: &QueryContext,
) -> Self {
Self {
catalog_list,
catalog_manager,
disallow_cross_schema_query,
resolved_tables: HashMap::new(),
default_catalog: query_ctx.current_catalog(),
@@ -104,17 +104,22 @@ impl DfTableSourceProvider {
let schema = if schema_name != INFORMATION_SCHEMA_NAME {
let catalog = self
.catalog_list
.catalog(catalog_name)?
.catalog_manager
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
catalog.schema(schema_name)?.context(SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?
catalog
.schema(schema_name)
.await?
.context(SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?
} else {
let catalog_provider = self
.catalog_list
.catalog(catalog_name)?
.catalog_manager
.catalog(catalog_name)
.await?
.context(CatalogNotFoundSnafu { catalog_name })?;
Arc::new(InformationSchemaProvider::new(
catalog_name.to_string(),

View File

@@ -40,7 +40,7 @@ impl SchemaProvider for InformationSchema {
self
}
fn table_names(&self) -> Result<Vec<String>, Error> {
async fn table_names(&self) -> Result<Vec<String>, Error> {
Ok(vec![SYSTEM_CATALOG_TABLE_NAME.to_string()])
}
@@ -52,7 +52,7 @@ impl SchemaProvider for InformationSchema {
}
}
fn table_exist(&self, name: &str) -> Result<bool, Error> {
async fn table_exist(&self, name: &str) -> Result<bool, Error> {
Ok(name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME))
}
}
@@ -116,16 +116,17 @@ impl SystemCatalog {
}
}
#[async_trait::async_trait]
impl CatalogProvider for SystemCatalog {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Result<Vec<String>, Error> {
async fn schema_names(&self) -> Result<Vec<String>, Error> {
Ok(vec![INFORMATION_SCHEMA_NAME.to_string()])
}
fn register_schema(
async fn register_schema(
&self,
_name: String,
_schema: SchemaProviderRef,
@@ -133,7 +134,7 @@ impl CatalogProvider for SystemCatalog {
panic!("System catalog does not support registering schema!")
}
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>, Error> {
async fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>, Error> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA_NAME) {
Ok(Some(self.information_schema.clone()))
} else {

View File

@@ -26,11 +26,11 @@ mod tests {
use catalog::remote::{
KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider,
};
use catalog::{CatalogList, CatalogManager, RegisterTableRequest};
use catalog::{CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use datatypes::schema::RawSchema;
use futures_util::StreamExt;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef};
use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;
@@ -78,35 +78,47 @@ mod tests {
async fn prepare_components(
node_id: u64,
) -> (KvBackendRef, TableEngineRef, Arc<RemoteCatalogManager>) {
) -> (
KvBackendRef,
TableEngineRef,
Arc<RemoteCatalogManager>,
TableEngineManagerRef,
) {
let backend = Arc::new(MockKvBackend::default()) as KvBackendRef;
let table_engine = Arc::new(MockTableEngine::default());
let engine_manager = Arc::new(MemoryTableEngineManager::alias(
MITO_ENGINE.to_string(),
table_engine.clone(),
));
let catalog_manager = RemoteCatalogManager::new(engine_manager, node_id, backend.clone());
let catalog_manager =
RemoteCatalogManager::new(engine_manager.clone(), node_id, backend.clone());
catalog_manager.start().await.unwrap();
(backend, table_engine, Arc::new(catalog_manager))
(
backend,
table_engine,
Arc::new(catalog_manager),
engine_manager as Arc<_>,
)
}
#[tokio::test]
async fn test_remote_catalog_default() {
common_telemetry::init_default_ut_logging();
let node_id = 42;
let (_, _, catalog_manager) = prepare_components(node_id).await;
let (_, _, catalog_manager, _) = prepare_components(node_id).await;
assert_eq!(
vec![DEFAULT_CATALOG_NAME.to_string()],
catalog_manager.catalog_names().unwrap()
catalog_manager.catalog_names().await.unwrap()
);
let default_catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap();
assert_eq!(
vec![DEFAULT_SCHEMA_NAME.to_string()],
default_catalog.schema_names().unwrap()
default_catalog.schema_names().await.unwrap()
);
}
@@ -114,7 +126,7 @@ mod tests {
async fn test_remote_catalog_register_nonexistent() {
common_telemetry::init_default_ut_logging();
let node_id = 42;
let (_, table_engine, catalog_manager) = prepare_components(node_id).await;
let (_, table_engine, catalog_manager, _) = prepare_components(node_id).await;
// register a new table with an nonexistent catalog
let catalog_name = "nonexistent_catalog".to_string();
let schema_name = "nonexistent_schema".to_string();
@@ -159,18 +171,20 @@ mod tests {
#[tokio::test]
async fn test_register_table() {
let node_id = 42;
let (_, table_engine, catalog_manager) = prepare_components(node_id).await;
let (_, table_engine, catalog_manager, _) = prepare_components(node_id).await;
let default_catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap();
assert_eq!(
vec![DEFAULT_SCHEMA_NAME.to_string()],
default_catalog.schema_names().unwrap()
default_catalog.schema_names().await.unwrap()
);
let default_schema = default_catalog
.schema(DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
@@ -208,31 +222,36 @@ mod tests {
table,
};
assert!(catalog_manager.register_table(reg_req).await.unwrap());
assert_eq!(vec![table_name], default_schema.table_names().unwrap());
assert_eq!(
vec![table_name],
default_schema.table_names().await.unwrap()
);
}
#[tokio::test]
async fn test_register_catalog_schema_table() {
let node_id = 42;
let (backend, table_engine, catalog_manager) = prepare_components(node_id).await;
let (backend, table_engine, catalog_manager, engine_manager) =
prepare_components(node_id).await;
let catalog_name = "test_catalog".to_string();
let schema_name = "nonexistent_schema".to_string();
let catalog = Arc::new(RemoteCatalogProvider::new(
catalog_name.clone(),
backend.clone(),
engine_manager.clone(),
node_id,
));
// register catalog to catalog manager
catalog_manager
.register_catalog(catalog_name.clone(), catalog)
CatalogManager::register_catalog(&*catalog_manager, catalog_name.clone(), catalog)
.await
.unwrap();
assert_eq!(
HashSet::<String>::from_iter(
vec![DEFAULT_CATALOG_NAME.to_string(), catalog_name.clone()].into_iter()
),
HashSet::from_iter(catalog_manager.catalog_names().unwrap().into_iter())
HashSet::from_iter(catalog_manager.catalog_names().await.unwrap().into_iter())
);
let table_to_register = table_engine
@@ -273,24 +292,32 @@ mod tests {
let new_catalog = catalog_manager
.catalog(&catalog_name)
.await
.unwrap()
.expect("catalog should exist since it's already registered");
let schema = Arc::new(RemoteSchemaProvider::new(
catalog_name.clone(),
schema_name.clone(),
node_id,
engine_manager,
backend.clone(),
));
let prev = new_catalog
.register_schema(schema_name.clone(), schema.clone())
.await
.expect("Register schema should not fail");
assert!(prev.is_none());
assert!(catalog_manager.register_table(reg_req).await.unwrap());
assert_eq!(
HashSet::from([schema_name.clone()]),
new_catalog.schema_names().unwrap().into_iter().collect()
new_catalog
.schema_names()
.await
.unwrap()
.into_iter()
.collect()
)
}
}

View File

@@ -35,7 +35,8 @@ partition = { path = "../partition" }
query = { path = "../query" }
rustyline = "10.1"
serde.workspace = true
servers = { path = "../servers" }
servers = { path = "../servers", features = ["dashboard"] }
session = { path = "../session" }
snafu.workspace = true
substrait = { path = "../common/substrait" }

View File

@@ -16,18 +16,18 @@ use std::sync::Arc;
use async_recursion::async_recursion;
use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
use bytes::{Buf, Bytes};
use catalog::table_source::DfTableSourceProvider;
use catalog::CatalogManagerRef;
use common_catalog::format_full_table_name;
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::catalog::CatalogList;
use datafusion::common::{DFField, DFSchema};
use datafusion::datasource::DefaultTableSource;
use datafusion::physical_plan::project_schema;
use datafusion::sql::TableReference;
use datafusion_expr::{Filter, LogicalPlan, TableScan};
use prost::Message;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use substrait_proto::proto::expression::mask_expression::{StructItem, StructSelect};
@@ -42,9 +42,9 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::context::ConvertorContext;
use crate::df_expr::{expression_from_df_expr, to_df_expr};
use crate::error::{
self, DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error,
InvalidParametersSnafu, MissingFieldSnafu, ResolveTableSnafu, SchemaNotMatchSnafu,
UnknownPlanSnafu, UnsupportedExprSnafu, UnsupportedPlanSnafu,
self, DFInternalSnafu, EmptyPlanSnafu, Error, InvalidParametersSnafu, MissingFieldSnafu,
ResolveTableSnafu, SchemaNotMatchSnafu, UnknownPlanSnafu, UnsupportedExprSnafu,
UnsupportedPlanSnafu,
};
use crate::schema::{from_schema, to_schema};
use crate::SubstraitPlan;
@@ -59,20 +59,14 @@ impl SubstraitPlan for DFLogicalSubstraitConvertorDeprecated {
async fn decode<B: Buf + Send>(
&self,
message: B,
catalog_manager: CatalogManagerRef,
_message: B,
_catalog_list: Arc<dyn CatalogList>,
) -> Result<Self::Plan, Self::Error> {
let plan = Plan::decode(message).context(DecodeRelSnafu)?;
self.convert_plan(plan, catalog_manager).await
unimplemented!()
}
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error> {
let plan = self.convert_df_plan(plan)?;
let mut buf = BytesMut::new();
plan.encode(&mut buf).context(EncodeRelSnafu)?;
Ok(buf.freeze())
unimplemented!()
}
}
@@ -538,112 +532,3 @@ fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> b
&& x.is_nullable() == y.is_nullable()
})
}
#[cfg(test)]
mod test {
use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use datafusion::common::{DFSchema, ToDFSchema};
use datafusion_expr::TableSource;
use datatypes::schema::RawSchema;
use table::engine::manager::MemoryTableEngineManager;
use table::requests::CreateTableRequest;
use table::test_util::{EmptyTable, MockTableEngine};
use super::*;
use crate::schema::test::supported_types;
const DEFAULT_TABLE_NAME: &str = "SubstraitTable";
async fn build_mock_catalog_manager() -> CatalogManagerRef {
let mock_table_engine = Arc::new(MockTableEngine::new());
let engine_manager = Arc::new(MemoryTableEngineManager::alias(
MITO_ENGINE.to_string(),
mock_table_engine.clone(),
));
let catalog_manager = Arc::new(LocalCatalogManager::try_new(engine_manager).await.unwrap());
let schema_provider = Arc::new(MemorySchemaProvider::new());
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_manager
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
catalog_manager.init().await.unwrap();
catalog_manager
}
fn build_create_table_request<N: ToString>(table_name: N) -> CreateTableRequest {
CreateTableRequest {
id: 1,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
desc: None,
schema: RawSchema::new(supported_types()),
region_numbers: vec![0],
primary_key_indices: vec![],
create_if_not_exists: true,
table_options: Default::default(),
engine: MITO_ENGINE.to_string(),
}
}
async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) {
let convertor = DFLogicalSubstraitConvertorDeprecated;
let proto = convertor.encode(plan.clone()).unwrap();
let tripped_plan = convertor.decode(proto, catalog).await.unwrap();
assert_eq!(format!("{plan:?}"), format!("{tripped_plan:?}"));
}
#[tokio::test]
async fn test_table_scan() {
let catalog_manager = build_mock_catalog_manager().await;
let table_ref = Arc::new(EmptyTable::new(build_create_table_request(
DEFAULT_TABLE_NAME,
)));
catalog_manager
.register_table(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: DEFAULT_TABLE_NAME.to_string(),
table_id: 1,
table: table_ref.clone(),
})
.await
.unwrap();
let adapter = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(table_ref),
)));
let projection = vec![1, 3, 5];
let df_schema = adapter.schema().to_dfschema().unwrap();
let projected_fields = projection
.iter()
.map(|index| df_schema.field(*index).clone())
.collect();
let projected_schema =
Arc::new(DFSchema::new_with_metadata(projected_fields, Default::default()).unwrap());
let table_name = TableReference::full(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
DEFAULT_TABLE_NAME,
);
let table_scan_plan = LogicalPlan::TableScan(TableScan {
table_name,
source: adapter,
projection: Some(projection),
projected_schema,
filters: vec![],
fetch: None,
});
logical_plan_round_trip(table_scan_plan, catalog_manager).await;
}
}

View File

@@ -16,13 +16,12 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
use catalog::CatalogManagerRef;
use datafusion::catalog::catalog::CatalogList;
use datafusion::prelude::SessionContext;
use datafusion_expr::LogicalPlan;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
use prost::Message;
use query::datafusion::DfCatalogListAdapter;
use snafu::ResultExt;
use substrait_proto::proto::Plan;
@@ -40,13 +39,11 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
async fn decode<B: Buf + Send>(
&self,
message: B,
catalog_manager: CatalogManagerRef,
catalog_list: Arc<dyn CatalogList>,
) -> Result<Self::Plan, Self::Error> {
let mut context = SessionContext::new();
let df_catalog_list = Arc::new(DfCatalogListAdapter::new(catalog_manager));
context.register_catalog_list(df_catalog_list);
let plan = Plan::decode(message).context(DecodeRelSnafu)?;
context.register_catalog_list(catalog_list);
let df_plan = from_substrait_plan(&mut context, &plan)
.await
.context(DecodeDfPlanSnafu)?;

View File

@@ -17,15 +17,18 @@
mod context;
mod df_expr;
#[allow(unused)]
mod df_logical;
mod df_substrait;
pub mod error;
mod schema;
mod types;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use catalog::CatalogManagerRef;
use datafusion::catalog::catalog::CatalogList;
pub use crate::df_substrait::DFLogicalSubstraitConvertor;
@@ -38,7 +41,7 @@ pub trait SubstraitPlan {
async fn decode<B: Buf + Send>(
&self,
message: B,
catalog_manager: CatalogManagerRef,
catalog_list: Arc<dyn CatalogList>,
) -> Result<Self::Plan, Self::Error>;
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error>;

View File

@@ -275,10 +275,12 @@ impl Instance {
let schema_list = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?
.expect("Default schema not found")
.schema_names()
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
let flush_requests = schema_list

View File

@@ -12,12 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request as GrpcRequest;
use api::v1::query_request::Query;
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequest};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_query::Output;
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::TableProvider;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::SqlStatementExecutor;
@@ -28,11 +37,12 @@ use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::engine::TableReference;
use table::requests::CreateDatabaseRequest;
use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
self, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu, DeleteSnafu,
ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, PlanStatementSnafu, Result,
TableNotFoundSnafu,
self, CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu,
DeleteSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, PlanStatementSnafu, Result,
SchemaNotFoundSnafu, TableNotFoundSnafu,
};
use crate::instance::Instance;
@@ -49,9 +59,20 @@ impl Instance {
self.sql_handler.create_database(req, query_ctx).await
}
pub(crate) async fn execute_logical(&self, plan_bytes: Vec<u8>) -> Result<Output> {
pub(crate) async fn execute_logical(
&self,
plan_bytes: Vec<u8>,
ctx: QueryContextRef,
) -> Result<Output> {
let catalog_list = new_dummy_catalog_list(
&ctx.current_catalog(),
&ctx.current_schema(),
self.catalog_manager.clone(),
)
.await?;
let logical_plan = DFLogicalSubstraitConvertor
.decode(plan_bytes.as_slice(), self.catalog_manager.clone())
.decode(plan_bytes.as_slice(), Arc::new(catalog_list) as Arc<_>)
.await
.context(DecodeLogicalPlanSnafu)?;
@@ -85,7 +106,7 @@ impl Instance {
}
}
}
Query::LogicalPlan(plan) => self.execute_logical(plan).await,
Query::LogicalPlan(plan) => self.execute_logical(plan, ctx).await,
Query::PromRangeQuery(promql) => {
let prom_query = PromQuery {
query: promql.query,
@@ -185,6 +206,89 @@ impl GrpcQueryHandler for Instance {
}
}
struct DummySchemaProvider {
catalog: String,
schema: String,
table_names: Vec<String>,
catalog_manager: CatalogManagerRef,
}
impl DummySchemaProvider {
pub async fn try_new(
catalog_name: String,
schema_name: String,
catalog_manager: CatalogManagerRef,
) -> Result<Self> {
let catalog = catalog_manager
.catalog(&catalog_name)
.await
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: &catalog_name,
})?;
let schema = catalog
.schema(&schema_name)
.await
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu { name: &schema_name })?;
let table_names = schema.table_names().await.context(CatalogSnafu)?;
Ok(Self {
catalog: catalog_name,
schema: schema_name,
table_names,
catalog_manager,
})
}
}
#[async_trait::async_trait]
impl SchemaProvider for DummySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.table_names.clone()
}
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
self.catalog_manager
.table(&self.catalog, &self.schema, name)
.await
.context(CatalogSnafu)
.ok()
.flatten()
.map(|t| Arc::new(DfTableProviderAdapter::new(t)) as Arc<_>)
}
fn table_exist(&self, name: &str) -> bool {
self.table_names.iter().any(|t| t == name)
}
}
async fn new_dummy_catalog_list(
catalog_name: &str,
schema_name: &str,
catalog_manager: CatalogManagerRef,
) -> Result<MemoryCatalogList> {
let schema_provider = DummySchemaProvider::try_new(
catalog_name.to_string(),
schema_name.to_string(),
catalog_manager,
)
.await?;
let catalog_provider = MemoryCatalogProvider::new();
catalog_provider
.register_schema(schema_name, Arc::new(schema_provider) as Arc<_>)
.unwrap();
let catalog_list = MemoryCatalogList::new();
catalog_list.register_catalog(
catalog_name.to_string(),
Arc::new(catalog_provider) as Arc<_>,
);
Ok(catalog_list)
}
#[cfg(test)]
mod test {
use api::v1::column::{SemanticType, Values};

View File

@@ -49,6 +49,7 @@ impl SqlHandler {
if self
.catalog_manager
.schema(&catalog, &schema)
.await
.context(CatalogSnafu)?
.is_some()
{

View File

@@ -25,6 +25,7 @@ impl SqlHandler {
let schema = self
.catalog_manager
.schema(&req.catalog_name, &req.schema_name)
.await
.context(CatalogSnafu)?
.context(DatabaseNotFoundSnafu {
catalog: &req.catalog_name,
@@ -35,7 +36,7 @@ impl SqlHandler {
self.flush_table_inner(schema, table, req.region_number, req.wait)
.await?;
} else {
let all_table_names = schema.table_names().context(CatalogSnafu)?;
let all_table_names = schema.table_names().await.context(CatalogSnafu)?;
futures::future::join_all(all_table_names.iter().map(|table| {
self.flush_table_inner(schema.clone(), table, req.region_number, req.wait)
}))

View File

@@ -126,10 +126,12 @@ pub(crate) async fn create_test_table(
let schema_provider = instance
.catalog_manager
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
schema_provider
.register_table(table_name.to_string(), table)
.await
.unwrap();
Ok(())
}

View File

@@ -28,7 +28,7 @@ use catalog::helper::{
};
use catalog::remote::{Kv, KvBackendRef};
use catalog::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProvider, SchemaProviderRef,
};
@@ -36,6 +36,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_telemetry::warn;
use futures::StreamExt;
use futures_util::TryStreamExt;
use meta_client::rpc::TableName;
use partition::manager::PartitionRuleManagerRef;
use snafu::prelude::*;
@@ -99,6 +100,14 @@ impl CatalogManager for FrontendCatalogManager {
Ok(())
}
async fn register_catalog(
&self,
_name: String,
_catalog: CatalogProviderRef,
) -> CatalogResult<Option<CatalogProviderRef>> {
unimplemented!("Frontend catalog list does not support register catalog")
}
// TODO(LFC): Handle the table caching in (de)register_table.
async fn register_table(&self, _request: RegisterTableRequest) -> CatalogResult<bool> {
Ok(true)
@@ -216,16 +225,49 @@ impl CatalogManager for FrontendCatalogManager {
}
}
fn schema(
async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
let key = build_catalog_prefix();
let mut iter = self.backend.range(key.as_bytes());
let mut res = HashSet::new();
while let Some(r) = iter.next().await {
let Kv(k, _) = r?;
let catalog_key = String::from_utf8_lossy(&k);
if let Ok(key) = CatalogKey::parse(catalog_key.as_ref()) {
res.insert(key.catalog_name);
} else {
warn!("invalid catalog key: {:?}", catalog_key);
}
}
Ok(res.into_iter().collect())
}
async fn catalog(&self, catalog: &str) -> CatalogResult<Option<CatalogProviderRef>> {
let key = CatalogKey {
catalog_name: catalog.to_string(),
}
.to_string();
Ok(self.backend.get(key.as_bytes()).await?.map(|_| {
Arc::new(FrontendCatalogProvider {
catalog_name: catalog.to_string(),
backend: self.backend.clone(),
partition_manager: self.partition_manager.clone(),
datanode_clients: self.datanode_clients.clone(),
}) as Arc<_>
}))
}
async fn schema(
&self,
catalog: &str,
schema: &str,
) -> catalog::error::Result<Option<SchemaProviderRef>> {
self.catalog(catalog)?
self.catalog(catalog)
.await?
.context(catalog::error::CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.schema(schema)
.await
}
async fn table(
@@ -234,65 +276,16 @@ impl CatalogManager for FrontendCatalogManager {
schema: &str,
table_name: &str,
) -> catalog::error::Result<Option<TableRef>> {
self.schema(catalog, schema)?
self.schema(catalog, schema)
.await?
.context(catalog::error::SchemaNotFoundSnafu { catalog, schema })?
.table(table_name)
.await
}
}
impl CatalogList for FrontendCatalogManager {
fn as_any(&self) -> &dyn Any {
self
}
fn register_catalog(
&self,
_name: String,
_catalog: CatalogProviderRef,
) -> catalog::error::Result<Option<CatalogProviderRef>> {
unimplemented!("Frontend catalog list does not support register catalog")
}
fn catalog_names(&self) -> catalog::error::Result<Vec<String>> {
let backend = self.backend.clone();
let res = std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let key = build_catalog_prefix();
let mut iter = backend.range(key.as_bytes());
let mut res = HashSet::new();
while let Some(r) = iter.next().await {
let Kv(k, _) = r?;
let catalog_key = String::from_utf8_lossy(&k);
if let Ok(key) = CatalogKey::parse(catalog_key.as_ref()) {
res.insert(key.catalog_name);
} else {
warn!("invalid catalog key: {:?}", catalog_key);
}
}
Ok(res.into_iter().collect())
})
})
.join()
.unwrap();
res
}
fn catalog(&self, name: &str) -> catalog::error::Result<Option<CatalogProviderRef>> {
let all_catalogs = self.catalog_names()?;
if all_catalogs.contains(&name.to_string()) {
Ok(Some(Arc::new(FrontendCatalogProvider {
catalog_name: name.to_string(),
backend: self.backend.clone(),
partition_manager: self.partition_manager.clone(),
datanode_clients: self.datanode_clients.clone(),
})))
} else {
Ok(None)
}
}
}
pub struct FrontendCatalogProvider {
@@ -302,35 +295,26 @@ pub struct FrontendCatalogProvider {
datanode_clients: Arc<DatanodeClients>,
}
#[async_trait::async_trait]
impl CatalogProvider for FrontendCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> catalog::error::Result<Vec<String>> {
let backend = self.backend.clone();
let catalog_name = self.catalog_name.clone();
let res = std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let key = build_schema_prefix(&catalog_name);
let mut iter = backend.range(key.as_bytes());
let mut res = HashSet::new();
while let Some(r) = iter.next().await {
let Kv(k, _) = r?;
let key = SchemaKey::parse(String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
res.insert(key.schema_name);
}
Ok(res.into_iter().collect())
})
})
.join()
.unwrap();
res
async fn schema_names(&self) -> catalog::error::Result<Vec<String>> {
let key = build_schema_prefix(&self.catalog_name);
let mut iter = self.backend.range(key.as_bytes());
let mut res = HashSet::new();
while let Some(r) = iter.next().await {
let Kv(k, _) = r?;
let key =
SchemaKey::parse(String::from_utf8_lossy(&k)).context(InvalidCatalogValueSnafu)?;
res.insert(key.schema_name);
}
Ok(res.into_iter().collect())
}
fn register_schema(
async fn register_schema(
&self,
_name: String,
_schema: SchemaProviderRef,
@@ -338,8 +322,8 @@ impl CatalogProvider for FrontendCatalogProvider {
unimplemented!("Frontend catalog provider does not support register schema")
}
fn schema(&self, name: &str) -> catalog::error::Result<Option<SchemaProviderRef>> {
let all_schemas = self.schema_names()?;
async fn schema(&self, name: &str) -> catalog::error::Result<Option<SchemaProviderRef>> {
let all_schemas = self.schema_names().await?;
if all_schemas.contains(&name.to_string()) {
Ok(Some(Arc::new(FrontendSchemaProvider {
catalog_name: self.catalog_name.clone(),
@@ -368,32 +352,24 @@ impl SchemaProvider for FrontendSchemaProvider {
self
}
fn table_names(&self) -> catalog::error::Result<Vec<String>> {
let backend = self.backend.clone();
let catalog_name = self.catalog_name.clone();
let schema_name = self.schema_name.clone();
std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let mut tables = vec![];
if catalog_name == DEFAULT_CATALOG_NAME && schema_name == DEFAULT_SCHEMA_NAME {
tables.push("numbers".to_string());
}
let key = build_table_global_prefix(catalog_name, schema_name);
let mut iter = backend.range(key.as_bytes());
while let Some(r) = iter.next().await {
let Kv(k, _) = r?;
let key = TableGlobalKey::parse(String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
tables.push(key.table_name);
}
Ok(tables)
async fn table_names(&self) -> catalog::error::Result<Vec<String>> {
let mut tables = vec![];
if self.catalog_name == DEFAULT_CATALOG_NAME && self.schema_name == DEFAULT_SCHEMA_NAME {
tables.push("numbers".to_string());
}
let key = build_table_global_prefix(&self.catalog_name, &self.schema_name);
let iter = self.backend.range(key.as_bytes());
let result = iter
.map(|r| {
let Kv(k, _) = r?;
let key = TableGlobalKey::parse(String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
Ok(key.table_name)
})
})
.join()
.unwrap()
.try_collect::<Vec<_>>()
.await?;
tables.extend(result);
Ok(tables)
}
async fn table(&self, name: &str) -> catalog::error::Result<Option<TableRef>> {
@@ -426,8 +402,8 @@ impl SchemaProvider for FrontendSchemaProvider {
Ok(Some(table))
}
fn table_exist(&self, name: &str) -> catalog::error::Result<bool> {
Ok(self.table_names()?.contains(&name.to_string()))
async fn table_exist(&self, name: &str) -> catalog::error::Result<bool> {
Ok(self.table_names().await?.contains(&name.to_string()))
}
}

View File

@@ -527,9 +527,10 @@ impl SqlQueryHandler for Instance {
}
}
fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
self.catalog_manager
.schema(catalog, schema)
.await
.map(|s| s.is_some())
.context(error::CatalogSnafu)
}

View File

@@ -421,6 +421,7 @@ impl DistInstance {
if self
.catalog_manager
.schema(&catalog, &expr.database_name)
.await
.context(CatalogSnafu)?
.is_some()
{

View File

@@ -86,11 +86,11 @@ impl StatementExecutor {
Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
Statement::Use(db) => self.handle_use(db, query_ctx),
Statement::Use(db) => self.handle_use(db, query_ctx).await,
Statement::ShowDatabases(stmt) => self.show_databases(stmt),
Statement::ShowDatabases(stmt) => self.show_databases(stmt).await,
Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx),
Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
Statement::Copy(stmt) => {
let req = to_copy_table_request(stmt, query_ctx)?;
@@ -126,11 +126,12 @@ impl StatementExecutor {
.context(ExecLogicalPlanSnafu)
}
fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
async fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
let catalog = &query_ctx.current_catalog();
ensure!(
self.catalog_manager
.schema(catalog, &db)
.await
.context(CatalogSnafu)?
.is_some(),
SchemaNotFoundSnafu { schema_info: &db }

View File

@@ -21,17 +21,19 @@ use crate::error::{ExecuteStatementSnafu, Result};
use crate::statement::StatementExecutor;
impl StatementExecutor {
pub(super) fn show_databases(&self, stmt: ShowDatabases) -> Result<Output> {
pub(super) async fn show_databases(&self, stmt: ShowDatabases) -> Result<Output> {
query::sql::show_databases(stmt, self.catalog_manager.clone())
.await
.context(ExecuteStatementSnafu)
}
pub(super) fn show_tables(
pub(super) async fn show_tables(
&self,
stmt: ShowTables,
query_ctx: QueryContextRef,
) -> Result<Output> {
query::sql::show_tables(stmt, self.catalog_manager.clone(), query_ctx)
.await
.context(ExecuteStatementSnafu)
}
}

View File

@@ -42,7 +42,7 @@ use snafu::prelude::*;
use store_api::storage::RegionNumber;
use table::error::TableOperationSnafu;
use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef};
use table::requests::{AlterTableRequest, DeleteRequest, InsertRequest};
use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest};
use table::table::AlterContext;
use table::{meter_insert_request, Table};
use tokio::sync::RwLock;
@@ -261,6 +261,13 @@ impl DistTable {
.context(error::CatalogSnafu)
}
async fn delete_table_global_value(&self, key: TableGlobalKey) -> Result<()> {
self.backend
.delete(key.to_string().as_bytes())
.await
.context(error::CatalogSnafu)
}
async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> {
let alter_expr = context
.get::<AlterExpr>()
@@ -297,7 +304,17 @@ impl DistTable {
value.table_info = new_info.into();
self.set_table_global_value(key, value).await
if let AlterKind::RenameTable { new_table_name } = &request.alter_kind {
let new_key = TableGlobalKey {
catalog_name: alter_expr.catalog_name.clone(),
schema_name: alter_expr.schema_name.clone(),
table_name: new_table_name.clone(),
};
self.set_table_global_value(new_key, value).await?;
self.delete_table_global_value(key).await
} else {
self.set_table_global_value(key, value).await
}
}
/// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between

View File

@@ -22,7 +22,6 @@ use std::time::Duration;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::remote::{MetaKvBackend, RemoteCatalogManager};
use catalog::CatalogProvider;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_runtime::Builder as RuntimeBuilder;
@@ -94,7 +93,7 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
// create another catalog and schema for testing
let another_catalog = Arc::new(MemoryCatalogProvider::new());
let _ = another_catalog
.register_schema(
.register_schema_sync(
"another_schema".to_string(),
Arc::new(MemorySchemaProvider::new()),
)
@@ -102,6 +101,7 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
let _ = dn_instance
.catalog_manager()
.register_catalog("another_catalog".to_string(), another_catalog)
.await
.unwrap();
dn_instance.start().await.unwrap();

View File

@@ -18,12 +18,13 @@ use std::sync::Arc;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, Location, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::client::{load_balance as lb, Id};
use crate::error;
use crate::error::Error::TonicStatus;
use crate::error::Result;
#[derive(Clone, Debug)]
@@ -122,8 +123,15 @@ impl Inner {
async fn delete(&self, mut req: DeleteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
let res = client.delete(req).await.context(error::TonicStatusSnafu)?;
let res = client.delete(req).await.map_err(|mut source| {
// FIXME(hl): here intentionally clear the metadata field so that error date does not changes which will break sqlness test.
// we can remove this hack as soon as either: sqlness supports regex result match or greptimedb supports renaming table routes
source.metadata_mut().clear();
TonicStatus {
source,
location: Location::default(),
}
})?;
Ok(res.into_inner())
}

View File

@@ -21,7 +21,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
pub use catalog::datafusion::catalog_adapter::DfCatalogListAdapter;
use common_error::prelude::BoxedError;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::udf::create_udf;
@@ -183,19 +182,20 @@ impl DatafusionQueryEngine {
let catalog = self
.state
.catalog_list()
.catalog_manager()
.catalog(catalog_name)
.await
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu {
catalog: catalog_name,
})?;
let schema =
catalog
.schema(schema_name)
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu {
schema: schema_name,
})?;
let schema = catalog
.schema(schema_name)
.await
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu {
schema: schema_name,
})?;
let table = schema
.table(table_name)
.await
@@ -377,7 +377,7 @@ mod tests {
use std::sync::Arc;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use catalog::{CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::util;
@@ -390,19 +390,21 @@ mod tests {
use crate::parser::QueryLanguageParser;
use crate::query_engine::{QueryEngineFactory, QueryEngineRef};
fn create_test_engine() -> QueryEngineRef {
async fn create_test_engine() -> QueryEngineRef {
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.await
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.await
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
QueryEngineFactory::new(catalog_list).query_engine()
@@ -410,7 +412,7 @@ mod tests {
#[tokio::test]
async fn test_sql_to_plan() {
let engine = create_test_engine();
let engine = create_test_engine().await;
let sql = "select sum(number) from numbers limit 20";
let stmt = QueryLanguageParser::parse_sql(sql).unwrap();
@@ -432,7 +434,7 @@ mod tests {
#[tokio::test]
async fn test_execute() {
let engine = create_test_engine();
let engine = create_test_engine().await;
let sql = "select sum(number) from numbers limit 20";
let stmt = QueryLanguageParser::parse_sql(sql).unwrap();
@@ -470,7 +472,7 @@ mod tests {
#[tokio::test]
async fn test_describe() {
let engine = create_test_engine();
let engine = create_test_engine().await;
let sql = "select sum(number) from numbers limit 20";
let stmt = QueryLanguageParser::parse_sql(sql).unwrap();

View File

@@ -55,7 +55,7 @@ impl DfContextProviderAdapter {
.context(DataFusionSnafu)?;
let mut table_provider = DfTableSourceProvider::new(
engine_state.catalog_list().clone(),
engine_state.catalog_manager().clone(),
engine_state.disallow_cross_schema_query(),
query_ctx.as_ref(),
);

View File

@@ -82,7 +82,7 @@ impl DfLogicalPlanner {
async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let table_provider = DfTableSourceProvider::new(
self.engine_state.catalog_list().clone(),
self.engine_state.catalog_manager().clone(),
self.engine_state.disallow_cross_schema_query(),
query_ctx.as_ref(),
);

View File

@@ -19,7 +19,7 @@ mod state;
use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogListRef;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY};
@@ -65,12 +65,12 @@ pub struct QueryEngineFactory {
}
impl QueryEngineFactory {
pub fn new(catalog_list: CatalogListRef) -> Self {
Self::new_with_plugins(catalog_list, Default::default())
pub fn new(catalog_manager: CatalogManagerRef) -> Self {
Self::new_with_plugins(catalog_manager, Default::default())
}
pub fn new_with_plugins(catalog_list: CatalogListRef, plugins: Arc<Plugins>) -> Self {
let state = Arc::new(QueryEngineState::new(catalog_list, plugins));
pub fn new_with_plugins(catalog_manager: CatalogManagerRef, plugins: Arc<Plugins>) -> Self {
let state = Arc::new(QueryEngineState::new(catalog_manager, plugins));
let query_engine = Arc::new(DatafusionQueryEngine::new(state));
register_functions(&query_engine);
Self { query_engine }

View File

@@ -17,11 +17,12 @@ use std::fmt;
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use catalog::CatalogListRef;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::physical_plan::SessionContext;
use common_query::prelude::ScalarUdf;
use datafusion::catalog::catalog::MemoryCatalogList;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
@@ -31,7 +32,6 @@ use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_optimizer::analyzer::Analyzer;
use promql::extension_plan::PromExtensionPlanner;
use crate::datafusion::DfCatalogListAdapter;
use crate::optimizer::TypeConversionRule;
use crate::query_engine::options::QueryOptions;
@@ -42,7 +42,7 @@ use crate::query_engine::options::QueryOptions;
#[derive(Clone)]
pub struct QueryEngineState {
df_context: SessionContext,
catalog_list: CatalogListRef,
catalog_manager: CatalogManagerRef,
aggregate_functions: Arc<RwLock<HashMap<String, AggregateFunctionMetaRef>>>,
plugins: Arc<Plugins>,
}
@@ -55,7 +55,7 @@ impl fmt::Debug for QueryEngineState {
}
impl QueryEngineState {
pub fn new(catalog_list: CatalogListRef, plugins: Arc<Plugins>) -> Self {
pub fn new(catalog_list: CatalogManagerRef, plugins: Arc<Plugins>) -> Self {
let runtime_env = Arc::new(RuntimeEnv::default());
let session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
// Apply the type conversion rule first.
@@ -65,7 +65,7 @@ impl QueryEngineState {
let session_state = SessionState::with_config_rt_and_catalog_list(
session_config,
runtime_env,
Arc::new(DfCatalogListAdapter::new(catalog_list.clone())),
Arc::new(MemoryCatalogList::default()), // pass a dummy catalog list
)
.with_analyzer_rules(analyzer.rules)
.with_query_planner(Arc::new(DfQueryPlanner::new()));
@@ -74,7 +74,7 @@ impl QueryEngineState {
Self {
df_context,
catalog_list,
catalog_manager: catalog_list,
aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
plugins,
}
@@ -104,8 +104,8 @@ impl QueryEngineState {
}
#[inline]
pub fn catalog_list(&self) -> &CatalogListRef {
&self.catalog_list
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
pub(crate) fn disallow_cross_schema_query(&self) -> bool {

View File

@@ -99,7 +99,10 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
]))
});
pub fn show_databases(stmt: ShowDatabases, catalog_manager: CatalogManagerRef) -> Result<Output> {
pub async fn show_databases(
stmt: ShowDatabases,
catalog_manager: CatalogManagerRef,
) -> Result<Output> {
// TODO(LFC): supports WHERE
ensure!(
matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)),
@@ -110,11 +113,12 @@ pub fn show_databases(stmt: ShowDatabases, catalog_manager: CatalogManagerRef) -
let catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.context(error::CatalogSnafu)?
.context(error::CatalogNotFoundSnafu {
catalog: DEFAULT_CATALOG_NAME,
})?;
let mut databases = catalog.schema_names().context(error::CatalogSnafu)?;
let mut databases = catalog.schema_names().await.context(error::CatalogSnafu)?;
// TODO(dennis): Specify the order of the results in catalog manager API
databases.sort();
@@ -134,7 +138,7 @@ pub fn show_databases(stmt: ShowDatabases, catalog_manager: CatalogManagerRef) -
Ok(Output::RecordBatches(records))
}
pub fn show_tables(
pub async fn show_tables(
stmt: ShowTables,
catalog_manager: CatalogManagerRef,
query_ctx: QueryContextRef,
@@ -155,9 +159,10 @@ pub fn show_tables(
// TODO(sunng87): move this function into query_ctx
let schema = catalog_manager
.schema(&query_ctx.current_catalog(), &schema)
.await
.context(error::CatalogSnafu)?
.context(error::SchemaNotFoundSnafu { schema })?;
let mut tables = schema.table_names().context(error::CatalogSnafu)?;
let mut tables = schema.table_names().await.context(error::CatalogSnafu)?;
// TODO(dennis): Specify the order of the results in schema provider API
tables.sort();

View File

@@ -15,7 +15,6 @@
use std::sync::Arc;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::RecordBatch;
use datatypes::for_all_primitive_types;
@@ -57,14 +56,14 @@ pub fn create_query_engine() -> Arc<dyn QueryEngine> {
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let number_table = Arc::new(MemTable::new("numbers", recordbatch));
schema_provider
.register_table(number_table.table_name().to_string(), number_table)
.register_table_sync(number_table.table_name().to_string(), number_table)
.unwrap();
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
QueryEngineFactory::new(catalog_list).query_engine()

View File

@@ -17,7 +17,6 @@ use std::marker::PhantomData;
use std::sync::Arc;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_function::scalars::aggregate::AggregateFunctionMeta;
use common_function_macro::{as_aggr_func_creator, AggrFuncTypeStore};
@@ -234,12 +233,14 @@ fn new_query_engine_factory(table: MemTable) -> QueryEngineFactory {
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
let catalog_list = Arc::new(MemoryCatalogManager::default());
schema_provider.register_table(table_name, table).unwrap();
schema_provider
.register_table_sync(table_name, table)
.unwrap();
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
QueryEngineFactory::new(catalog_list)

View File

@@ -15,7 +15,6 @@
use std::sync::Arc;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::RecordBatch;
use datatypes::for_all_primitive_types;
@@ -102,14 +101,14 @@ fn create_correctness_engine() -> Arc<dyn QueryEngine> {
RecordBatch::new(schema, columns).unwrap(),
));
schema_provider
.register_table(number_table.table_name().to_string(), number_table)
.register_table_sync(number_table.table_name().to_string(), number_table)
.unwrap();
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
QueryEngineFactory::new(catalog_list).query_engine()

View File

@@ -15,7 +15,6 @@
use std::sync::Arc;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
@@ -110,14 +109,14 @@ fn catalog_list() -> Result<Arc<MemoryCatalogManager>> {
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.register_table_sync("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
Ok(catalog_list)
}

View File

@@ -16,7 +16,6 @@ use std::any::Any;
use std::sync::Arc;
use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use common_recordbatch::RecordBatch;
@@ -109,14 +108,15 @@ fn create_test_engine() -> TimeRangeTester {
let catalog_list = new_memory_catalog_list().unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
MemorySchemaProvider::register_table(&default_schema, "m".to_string(), table.clone()).unwrap();
MemorySchemaProvider::register_table_sync(&default_schema, "m".to_string(), table.clone())
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema("public".to_string(), default_schema)
.register_schema_sync("public".to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog("greptime".to_string(), default_catalog)
.register_catalog_sync("greptime".to_string(), default_catalog)
.unwrap();
let engine = QueryEngineFactory::new(catalog_list).query_engine();

View File

@@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
@@ -55,14 +54,14 @@ pub(crate) fn sample_script_engine() -> PyEngine {
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.register_table_sync("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);

View File

@@ -352,7 +352,6 @@ pub(crate) use tests::sample_script_engine;
#[cfg(test)]
mod tests {
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::util;
use datatypes::prelude::ScalarVector;
@@ -368,14 +367,14 @@ mod tests {
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.register_table_sync("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);

View File

@@ -71,14 +71,14 @@ use crate::server::Server;
/// create query context from database name information, catalog and schema are
/// resolved from the name
pub(crate) fn query_context_from_db(
pub(crate) async fn query_context_from_db(
query_handler: ServerSqlQueryHandlerRef,
db: Option<String>,
) -> std::result::Result<Arc<QueryContext>, JsonResponse> {
if let Some(db) = &db {
let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db);
match query_handler.is_valid_schema(catalog, schema) {
match query_handler.is_valid_schema(catalog, schema).await {
Ok(true) => Ok(Arc::new(QueryContext::with(catalog, schema))),
Ok(false) => Err(JsonResponse::with_error(
format!("Database not found: {db}"),
@@ -698,7 +698,7 @@ mod test {
unimplemented!()
}
fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
Ok(true)
}
}

View File

@@ -52,7 +52,7 @@ pub async fn sql(
let db = query_params.db.or(form_params.db);
let resp = if let Some(sql) = &sql {
match super::query_context_from_db(sql_handler.clone(), db) {
match crate::http::query_context_from_db(sql_handler.clone(), db).await {
Ok(query_ctx) => {
JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await).await
}
@@ -102,7 +102,7 @@ pub async fn promql(
let exec_start = Instant::now();
let db = params.db.clone();
let prom_query = params.into();
let resp = match super::query_context_from_db(sql_handler.clone(), db) {
let resp = match super::query_context_from_db(sql_handler.clone(), db).await {
Ok(query_ctx) => {
JsonResponse::from_output(sql_handler.do_promql_query(&prom_query, query_ctx).await)
.await

View File

@@ -234,7 +234,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
async fn on_init<'a>(&'a mut self, database: &'a str, w: InitWriter<'a, W>) -> Result<()> {
let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(database);
ensure!(
self.query_handler.is_valid_schema(catalog, schema)?,
self.query_handler.is_valid_schema(catalog, schema).await?,
error::DatabaseNotFoundSnafu { catalog, schema }
);

View File

@@ -135,7 +135,7 @@ impl StartupHandler for PostgresServerHandler {
auth::save_startup_parameters_to_metadata(client, startup);
// check if db is valid
match resolve_db_info(client, self.query_handler.clone())? {
match resolve_db_info(client, self.query_handler.clone()).await? {
DbResolution::Resolved(catalog, schema) => {
client
.metadata_mut()
@@ -210,7 +210,7 @@ enum DbResolution {
}
/// A function extracted to resolve lifetime and readability issues:
fn resolve_db_info<C>(
async fn resolve_db_info<C>(
client: &mut C,
query_handler: ServerSqlQueryHandlerRef,
) -> PgWireResult<DbResolution>
@@ -222,6 +222,7 @@ where
let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db);
if query_handler
.is_valid_schema(catalog, schema)
.await
.map_err(|e| PgWireError::ApiError(Box::new(e)))?
{
Ok(DbResolution::Resolved(

View File

@@ -50,7 +50,7 @@ pub trait SqlQueryHandler {
query_ctx: QueryContextRef,
) -> std::result::Result<Option<Schema>, Self::Error>;
fn is_valid_schema(
async fn is_valid_schema(
&self,
catalog: &str,
schema: &str,
@@ -116,9 +116,10 @@ where
.context(error::DescribeStatementSnafu)
}
fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
self.0
.is_valid_schema(catalog, schema)
.await
.map_err(BoxedError::new)
.context(error::CheckDatabaseValiditySnafu)
}

View File

@@ -87,7 +87,7 @@ impl SqlQueryHandler for DummyInstance {
unimplemented!()
}
fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
Ok(true)
}
}

View File

@@ -85,7 +85,7 @@ impl SqlQueryHandler for DummyInstance {
unimplemented!()
}
fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
Ok(true)
}
}

View File

@@ -110,7 +110,7 @@ impl SqlQueryHandler for DummyInstance {
unimplemented!()
}
fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
Ok(true)
}
}

View File

@@ -19,7 +19,6 @@ use api::v1::greptime_request::{Request as GreptimeRequest, Request};
use api::v1::query_request::Query;
use async_trait::async_trait;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use datatypes::schema::Schema;
@@ -106,7 +105,7 @@ impl SqlQueryHandler for DummyInstance {
}
}
fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
Ok(catalog == DEFAULT_CATALOG_NAME && schema == DEFAULT_SCHEMA_NAME)
}
}
@@ -202,12 +201,14 @@ fn create_testing_instance(table: MemTable) -> DummyInstance {
let schema_provider = Arc::new(MemorySchemaProvider::new());
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
let catalog_list = Arc::new(MemoryCatalogManager::default());
schema_provider.register_table(table_name, table).unwrap();
schema_provider
.register_table_sync(table_name, table)
.unwrap();
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);

View File

@@ -138,12 +138,14 @@ impl AlterTableProcedure {
let catalog = self
.catalog_manager
.catalog(&request.catalog_name)
.await
.context(AccessCatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: &request.catalog_name,
})?;
let schema = catalog
.schema(&request.schema_name)
.await
.context(AccessCatalogSnafu)?
.context(SchemaNotFoundSnafu {
name: &request.schema_name,
@@ -163,6 +165,7 @@ impl AlterTableProcedure {
ensure!(
!schema
.table_exist(new_table_name)
.await
.context(AccessCatalogSnafu)?,
TableExistsSnafu {
name: format!(
@@ -338,9 +341,10 @@ mod tests {
let catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap();
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).unwrap().unwrap();
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap();
let table = schema.table(new_table_name).await.unwrap().unwrap();
let table_info = table.table_info();
assert_eq!(new_table_name, table_info.name);

View File

@@ -47,7 +47,7 @@ impl Procedure for CreateTableProcedure {
async fn execute(&mut self, ctx: &Context) -> Result<Status> {
match self.data.state {
CreateTableState::Prepare => self.on_prepare(),
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::EngineCreateTable => self.on_engine_create_table(ctx).await,
CreateTableState::RegisterCatalog => self.on_register_catalog().await,
}
@@ -131,11 +131,12 @@ impl CreateTableProcedure {
})
}
fn on_prepare(&mut self) -> Result<Status> {
async fn on_prepare(&mut self) -> Result<Status> {
// Check whether catalog and schema exist.
let catalog = self
.catalog_manager
.catalog(&self.data.request.catalog_name)
.await
.context(AccessCatalogSnafu)?
.with_context(|| {
logging::error!(
@@ -148,6 +149,7 @@ impl CreateTableProcedure {
})?;
catalog
.schema(&self.data.request.schema_name)
.await
.context(AccessCatalogSnafu)?
.with_context(|| {
logging::error!(
@@ -224,12 +226,14 @@ impl CreateTableProcedure {
let catalog = self
.catalog_manager
.catalog(&self.data.request.catalog_name)
.await
.context(AccessCatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: &self.data.request.catalog_name,
})?;
let schema = catalog
.schema(&self.data.request.schema_name)
.await
.context(AccessCatalogSnafu)?
.context(SchemaNotFoundSnafu {
name: &self.data.request.schema_name,

View File

@@ -291,9 +291,10 @@ mod tests {
let catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap();
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).unwrap().unwrap();
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap();
assert!(schema.table(table_name).await.unwrap().is_none());
let ctx = EngineContext::default();
assert!(!table_engine.table_exists(

View File

@@ -89,7 +89,7 @@ pub struct TableIdent {
pub version: TableVersion,
}
/// The table medatadata
/// The table metadata
/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
/// TODO(dennis): find a better way to ensure 'new_meta_builder' works when adding new fields.
#[derive(Clone, Debug, Builder, PartialEq, Eq)]
@@ -172,7 +172,15 @@ impl TableMeta {
AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
// No need to rebuild table meta when renaming tables.
AlterKind::RenameTable { .. } => Ok(TableMetaBuilder::default()),
AlterKind::RenameTable { .. } => {
let mut meta_builder = TableMetaBuilder::default();
meta_builder
.schema(self.schema.clone())
.primary_key_indices(self.primary_key_indices.clone())
.engine(self.engine.clone())
.next_column_id(self.next_column_id);
Ok(meta_builder)
}
}
}

View File

@@ -267,13 +267,16 @@ pub async fn create_test_table(
let schema_provider = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
.unwrap()
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.await
.unwrap()
.unwrap();
schema_provider
.register_table(table_name.to_string(), table)
.await
.unwrap();
Ok(())
}

View File

@@ -27,9 +27,13 @@ SELECT * from t;
ALTER TABLE t RENAME new_table;
Error: 1001(Unsupported), Operation rename table not implemented yet
Affected Rows: 0
DROP TABLE t;
Affected Rows: 1
Error: 4001(TableNotFound), Table not found: greptime.public.t
DROP TABLE new_table;
Error: 1003(Internal), status: Internal, message: "Table route not found: __meta_table_route-greptime-public-new_table-1025", details: [], metadata: MetadataMap { headers: {} }

View File

@@ -10,3 +10,6 @@ SELECT * from t;
ALTER TABLE t RENAME new_table;
DROP TABLE t;
-- TODO: this clause should success
DROP TABLE new_table;