refactor: catalog crate (#331)

* chore: refactor dir for local catalog manager

* refactor: CatalogProvider returns Result

* refactor: SchemaProvider returns Result

* feat: add kv operations to remote catalog

* chore: refactor some code

* feat: impl catalog initialization

* feat: add register table and register system table function

* refactor: add table_info method for Table trait

* chore: add some tests

* chore: add register schema test

* chore: fix build issue after rebase onto develop

* refactor: mock to separate file

* build: failed to compile

* fix: use a container struct to bridge KvBackend and Accessor trait

* feat: upgrade opendal to 0.17

* test: add more tests

* chore: add catalog name and schema name to table info

* chore: add catalog name and schema name to table info

* chore: rebase onto develop

* refactor: common-catalog crate

* refactor: remove remote catalog related files

* fix: compilation

* feat: add table version to TableKey

* feat: add node id to TableValue

* fix: some CR comments

* chore: change async fn create_expr_to_request to sync

* fix: add backtrace to errors

* fix: code style

* fix: CatalogManager::table also requires both catalog_name and schema_name

* chore: merge develop
Author: Lei, Huang
Date: 2022-10-26 10:50:39 +08:00 (committed by GitHub)
Parent: 7fe39e9187
Commit: 932b30d299
52 changed files with 919 additions and 314 deletions

Cargo.lock (generated)

@@ -635,18 +635,23 @@ version = "0.1.0"
dependencies = [
"async-stream",
"async-trait",
"chrono",
"common-catalog",
"common-error",
"common-query",
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-time",
"datafusion",
"datatypes",
"futures",
"futures-util",
"lazy_static",
"log-store",
"object-store",
"opendal",
"regex",
"serde",
"serde_json",
"snafu",
@@ -876,6 +881,25 @@ dependencies = [
"snafu",
]
[[package]]
name = "common-catalog"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"common-error",
"common-telemetry",
"datatypes",
"lazy_static",
"regex",
"serde",
"serde_json",
"snafu",
"table",
"tempdir",
"tokio",
]
[[package]]
name = "common-error"
version = "0.1.0"
@@ -1397,6 +1421,7 @@ dependencies = [
"catalog",
"client",
"common-base",
"common-catalog",
"common-error",
"common-grpc",
"common-query",
@@ -3743,6 +3768,7 @@ dependencies = [
"arrow2",
"async-trait",
"catalog",
"common-catalog",
"common-error",
"common-function",
"common-function-macro",
@@ -4468,6 +4494,7 @@ version = "0.1.0"
dependencies = [
"async-trait",
"catalog",
"common-catalog",
"common-error",
"common-function",
"common-query",
@@ -4608,6 +4635,7 @@ dependencies = [
"bytes",
"catalog",
"common-base",
"common-catalog",
"common-error",
"common-grpc",
"common-query",
@@ -4816,6 +4844,7 @@ name = "sql"
version = "0.1.0"
dependencies = [
"catalog",
"common-catalog",
"common-error",
"common-time",
"datatypes",
@@ -5047,6 +5076,7 @@ version = "0.1.0"
dependencies = [
"bytes",
"catalog",
"common-catalog",
"common-error",
"datafusion",
"datatypes",


@@ -5,6 +5,7 @@ members = [
"src/client",
"src/cmd",
"src/common/base",
"src/common/catalog",
"src/common/error",
"src/common/function",
"src/common/function-macro",


@@ -7,9 +7,11 @@ edition = "2021"
[dependencies]
async-stream = "0.3"
async-trait = "0.1"
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
@@ -18,12 +20,18 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch =
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"
lazy_static = "1.4"
opendal = "0.17"
regex = "1.6"
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
storage = { path = "../storage" }
table = { path = "../table" }
tokio = { version = "1.18", features = ["full"] }
[dev-dependencies]
chrono = "0.4"
log-store = { path = "../log-store" }
object-store = { path = "../object-store" }
opendal = "0.17"


@@ -115,6 +115,17 @@ pub enum Error {
#[snafu(backtrace)]
source: common_query::error::Error,
},
#[snafu(display("Cannot parse catalog value, source: {}", source))]
InvalidCatalogValue {
#[snafu(backtrace)]
source: common_catalog::error::Error,
},
#[snafu(display("IO error occurred while fetching catalog info, source: {}", source))]
Io {
backtrace: Backtrace,
source: std::io::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -129,12 +140,14 @@ impl ErrorExt for Error {
| Error::CatalogNotFound { .. }
| Error::InvalidEntryType { .. } => StatusCode::Unexpected,
Error::SystemCatalog { .. } | Error::EmptyValue | Error::ValueDeserialize { .. } => {
StatusCode::StorageUnavailable
}
Error::SystemCatalog { .. }
| Error::EmptyValue
| Error::ValueDeserialize { .. }
| Error::Io { .. } => StatusCode::StorageUnavailable,
Error::ReadSystemCatalog { source, .. } => source.status_code(),
Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Error::RegisterTable { .. } => StatusCode::Internal,
Error::TableExists { .. } => StatusCode::TableAlreadyExists,


@@ -3,18 +3,18 @@
use std::any::Any;
use std::sync::Arc;
use common_telemetry::info;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::TableId;
use table::requests::CreateTableRequest;
use table::TableRef;
pub use crate::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
pub use crate::manager::LocalCatalogManager;
use crate::error::{CreateTableSnafu, Result};
pub use crate::schema::{SchemaProvider, SchemaProviderRef};
pub mod consts;
pub mod error;
mod manager;
pub mod memory;
pub mod local;
pub mod schema;
pub mod system;
pub mod tables;
@@ -31,13 +31,13 @@ pub trait CatalogList: Sync + Send {
&self,
name: String,
catalog: CatalogProviderRef,
) -> Option<CatalogProviderRef>;
) -> Result<Option<CatalogProviderRef>>;
/// Retrieves the list of available catalog names
fn catalog_names(&self) -> Vec<String>;
fn catalog_names(&self) -> Result<Vec<String>>;
/// Retrieves a specific catalog by name, provided it exists.
fn catalog(&self, name: &str) -> Option<CatalogProviderRef>;
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>>;
}
/// Represents a catalog, comprising a number of named schemas.
@@ -47,14 +47,17 @@ pub trait CatalogProvider: Sync + Send {
fn as_any(&self) -> &dyn Any;
/// Retrieves the list of available schema names in this catalog.
fn schema_names(&self) -> Vec<String>;
fn schema_names(&self) -> Result<Vec<String>>;
/// Registers schema to this catalog.
fn register_schema(&self, name: String, schema: SchemaProviderRef)
-> Option<SchemaProviderRef>;
fn register_schema(
&self,
name: String,
schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>>;
/// Retrieves a specific schema from the catalog by name, provided it exists.
fn schema(&self, name: &str) -> Option<SchemaProviderRef>;
fn schema(&self, name: &str) -> Result<Option<SchemaProviderRef>>;
}
pub type CatalogListRef = Arc<dyn CatalogList>;
@@ -79,8 +82,8 @@ pub trait CatalogManager: CatalogList {
/// Returns the table by catalog, schema and table name.
fn table(
&self,
catalog: Option<&str>,
schema: Option<&str>,
catalog: &str,
schema: &str,
table_name: &str,
) -> error::Result<Option<TableRef>>;
}
@@ -99,9 +102,10 @@ pub struct RegisterSystemTableRequest {
pub open_hook: Option<OpenSystemTableHook>,
}
#[derive(Clone)]
pub struct RegisterTableRequest {
pub catalog: Option<String>,
pub schema: Option<String>,
pub catalog: String,
pub schema: String,
pub table_name: String,
pub table_id: TableId,
pub table: TableRef,
@@ -111,3 +115,53 @@ pub struct RegisterTableRequest {
pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String {
format!("{}.{}.{}", catalog, schema, table)
}
pub trait CatalogProviderFactory {
fn create(&self, catalog_name: String) -> CatalogProviderRef;
}
pub trait SchemaProviderFactory {
fn create(&self, catalog_name: String, schema_name: String) -> SchemaProviderRef;
}
pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
manager: &'a M,
engine: TableEngineRef,
sys_table_requests: &'a mut Vec<RegisterSystemTableRequest>,
) -> Result<()> {
for req in sys_table_requests.drain(..) {
let catalog_name = &req.create_table_request.catalog_name;
let schema_name = &req.create_table_request.schema_name;
let table_name = &req.create_table_request.table_name;
let table_id = req.create_table_request.id;
let table = if let Some(table) = manager.table(catalog_name, schema_name, table_name)? {
table
} else {
let table = engine
.create_table(&EngineContext::default(), req.create_table_request.clone())
.await
.with_context(|_| CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id: {}",
catalog_name, schema_name, table_name, table_id,
),
})?;
manager
.register_table(RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name: table_name.clone(),
table_id,
table: table.clone(),
})
.await?;
info!("Created and registered system table: {}", table_name);
table
};
if let Some(hook) = req.open_hook {
(hook)(table)?;
}
}
Ok(())
}
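For orientation, a minimal sketch (not part of the diff) of how a caller drives the reworked API; the helper function and its setup are hypothetical, while the trait methods, request fields, and constants are the ones introduced above:

use catalog::error::Result;
use catalog::{CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use table::TableRef;

// Hypothetical helper: register a table, then read it back through the now-fallible API.
async fn register_and_fetch(
    manager: &dyn CatalogManager,
    table: TableRef,
) -> Result<Option<TableRef>> {
    manager
        .register_table(RegisterTableRequest {
            // Catalog and schema are plain Strings now; the Option<String> defaults are gone.
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "numbers".to_string(),
            table_id: MIN_USER_TABLE_ID,
            table: table.clone(),
        })
        .await?;
    // CatalogManager::table likewise takes both names as &str instead of Option<&str>.
    manager.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "numbers")
}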

src/catalog/src/local.rs (new file)

@@ -0,0 +1,7 @@
pub mod manager;
pub mod memory;
pub use manager::LocalCatalogManager;
pub use memory::{
new_memory_catalog_list, MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider,
};


@@ -2,6 +2,10 @@ use std::any::Any;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID,
SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME,
};
use common_recordbatch::RecordBatch;
use common_telemetry::{debug, info};
use datatypes::prelude::ScalarVector;
@@ -15,25 +19,22 @@ use table::requests::OpenTableRequest;
use table::table::numbers::NumbersTable;
use table::TableRef;
use super::error::Result;
use crate::consts::{
INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME,
};
use crate::error::Result;
use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, IllegalManagerStateSnafu, OpenTableSnafu,
ReadSystemCatalogSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu,
SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu,
CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu,
SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu,
TableNotFoundSnafu,
};
use crate::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use crate::local::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use crate::system::{
decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX,
VALUE_INDEX,
};
use crate::tables::SystemCatalog;
use crate::{
format_full_table_name, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
CatalogProvider, CatalogProviderRef, RegisterSystemTableRequest, RegisterTableRequest,
SchemaProvider,
};
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
@@ -50,7 +51,7 @@ impl LocalCatalogManager {
/// Create a new [CatalogManager] with given user catalogs and table engine
pub async fn try_new(engine: TableEngineRef) -> Result<Self> {
let table = SystemCatalogTable::new(engine.clone()).await?;
let memory_catalog_list = crate::memory::new_memory_catalog_list()?;
let memory_catalog_list = crate::local::memory::new_memory_catalog_list()?;
let system_catalog = Arc::new(SystemCatalog::new(
table,
memory_catalog_list.clone(),
@@ -90,46 +91,7 @@ impl LocalCatalogManager {
// Processing system table hooks
let mut sys_table_requests = self.system_table_requests.lock().await;
for req in sys_table_requests.drain(..) {
let catalog_name = &req.create_table_request.catalog_name;
let schema_name = &req.create_table_request.schema_name;
let table_name = &req.create_table_request.table_name;
let table_id = req.create_table_request.id;
let table = if let Some(table) =
self.table(Some(catalog_name), Some(schema_name), table_name)?
{
table
} else {
let table = self
.engine
.create_table(&EngineContext::default(), req.create_table_request.clone())
.await
.with_context(|_| CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id: {}",
catalog_name, schema_name, table_name, table_id,
),
})?;
self.register_table(RegisterTableRequest {
catalog: Some(catalog_name.clone()),
schema: Some(schema_name.clone()),
table_name: table_name.clone(),
table_id,
table: table.clone(),
})
.await?;
info!("Created and registered system table: {}", table_name);
table
};
if let Some(hook) = req.open_hook {
(hook)(table)?;
}
}
handle_system_table_request(self, self.engine.clone(), &mut sys_table_requests).await?;
Ok(())
}
@@ -140,9 +102,9 @@ impl LocalCatalogManager {
self.system.information_schema.system.clone(),
)?;
let system_catalog = Arc::new(MemoryCatalogProvider::new());
system_catalog.register_schema(INFORMATION_SCHEMA_NAME.to_string(), system_schema);
system_catalog.register_schema(INFORMATION_SCHEMA_NAME.to_string(), system_schema)?;
self.catalogs
.register_catalog(SYSTEM_CATALOG_NAME.to_string(), system_catalog);
.register_catalog(SYSTEM_CATALOG_NAME.to_string(), system_catalog)?;
let default_catalog = Arc::new(MemoryCatalogProvider::new());
let default_schema = Arc::new(MemorySchemaProvider::new());
@@ -152,9 +114,9 @@ impl LocalCatalogManager {
let table = Arc::new(NumbersTable::default());
default_schema.register_table("numbers".to_string(), table)?;
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema);
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?;
self.catalogs
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)?;
Ok(())
}
@@ -210,14 +172,14 @@ impl LocalCatalogManager {
Entry::Schema(s) => {
let catalog =
self.catalogs
.catalog(&s.catalog_name)
.catalog(&s.catalog_name)?
.context(CatalogNotFoundSnafu {
catalog_name: &s.catalog_name,
})?;
catalog.register_schema(
s.schema_name.clone(),
Arc::new(MemorySchemaProvider::new()),
);
)?;
info!("Registered schema: {:?}", s);
}
Entry::Table(t) => {
@@ -234,12 +196,12 @@ impl LocalCatalogManager {
async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> {
let catalog = self
.catalogs
.catalog(&t.catalog_name)
.catalog(&t.catalog_name)?
.context(CatalogNotFoundSnafu {
catalog_name: &t.catalog_name,
})?;
let schema = catalog
.schema(&t.schema_name)
.schema(&t.schema_name)?
.context(SchemaNotFoundSnafu {
schema_info: format!("{}.{}", &t.catalog_name, &t.schema_name),
})?;
@@ -283,19 +245,19 @@ impl CatalogList for LocalCatalogManager {
&self,
name: String,
catalog: CatalogProviderRef,
) -> Option<Arc<dyn CatalogProvider>> {
) -> Result<Option<CatalogProviderRef>> {
self.catalogs.register_catalog(name, catalog)
}
fn catalog_names(&self) -> Vec<String> {
let mut res = self.catalogs.catalog_names();
fn catalog_names(&self) -> Result<Vec<String>> {
let mut res = self.catalogs.catalog_names()?;
res.push(SYSTEM_CATALOG_NAME.to_string());
res
Ok(res)
}
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>> {
if name.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) {
Some(self.system.clone())
Ok(Some(self.system.clone()))
} else {
self.catalogs.catalog(name)
}
@@ -304,7 +266,7 @@ impl CatalogList for LocalCatalogManager {
#[async_trait::async_trait]
impl CatalogManager for LocalCatalogManager {
/// Start [MemoryCatalogManager] to load all information from system catalog table.
/// Start [LocalCatalogManager] to load all information from system catalog table.
/// Make sure table engine is initialized before starting [MemoryCatalogManager].
async fn start(&self) -> Result<()> {
self.init().await
@@ -325,36 +287,30 @@ impl CatalogManager for LocalCatalogManager {
}
);
let catalog_name = request
.catalog
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema_name = request
.schema
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let catalog_name = &request.catalog;
let schema_name = &request.schema;
let catalog = self
.catalogs
.catalog(&catalog_name)
.context(CatalogNotFoundSnafu {
catalog_name: &catalog_name,
})?;
.catalog(catalog_name)?
.context(CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog
.schema(&schema_name)
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
schema_info: format!("{}.{}", catalog_name, schema_name),
})?;
if schema.table_exist(&request.table_name) {
if schema.table_exist(&request.table_name)? {
return TableExistsSnafu {
table: format_full_table_name(&catalog_name, &schema_name, &request.table_name),
table: format_full_table_name(catalog_name, schema_name, &request.table_name),
}
.fail();
}
self.system
.register_table(
catalog_name,
schema_name,
catalog_name.clone(),
schema_name.clone(),
request.table_name.clone(),
request.table_id,
)
@@ -380,22 +336,19 @@ impl CatalogManager for LocalCatalogManager {
fn table(
&self,
catalog: Option<&str>,
schema: Option<&str>,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> Result<Option<TableRef>> {
let catalog_name = catalog.unwrap_or(DEFAULT_CATALOG_NAME);
let schema_name = schema.unwrap_or(DEFAULT_SCHEMA_NAME);
let catalog = self
.catalogs
.catalog(catalog_name)
.catalog(catalog_name)?
.context(CatalogNotFoundSnafu { catalog_name })?;
let schema = catalog
.schema(schema_name)
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
schema_info: format!("{}.{}", catalog_name, schema_name),
})?;
Ok(schema.table(table_name))
schema.table(table_name)
}
}


@@ -23,7 +23,7 @@ impl MemoryCatalogList {
pub fn register_catalog_if_absent(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
catalog: CatalogProviderRef,
) -> Option<CatalogProviderRef> {
let mut catalogs = self.catalogs.write().unwrap();
let entry = catalogs.entry(name);
@@ -46,19 +46,19 @@ impl CatalogList for MemoryCatalogList {
&self,
name: String,
catalog: CatalogProviderRef,
) -> Option<CatalogProviderRef> {
) -> Result<Option<CatalogProviderRef>> {
let mut catalogs = self.catalogs.write().unwrap();
catalogs.insert(name, catalog)
Ok(catalogs.insert(name, catalog))
}
fn catalog_names(&self) -> Vec<String> {
fn catalog_names(&self) -> Result<Vec<String>> {
let catalogs = self.catalogs.read().unwrap();
catalogs.keys().map(|s| s.to_string()).collect()
Ok(catalogs.keys().map(|s| s.to_string()).collect())
}
fn catalog(&self, name: &str) -> Option<CatalogProviderRef> {
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>> {
let catalogs = self.catalogs.read().unwrap();
catalogs.get(name).cloned()
Ok(catalogs.get(name).cloned())
}
}
@@ -87,23 +87,23 @@ impl CatalogProvider for MemoryCatalogProvider {
self
}
fn schema_names(&self) -> Vec<String> {
fn schema_names(&self) -> Result<Vec<String>> {
let schemas = self.schemas.read().unwrap();
schemas.keys().cloned().collect()
Ok(schemas.keys().cloned().collect())
}
fn register_schema(
&self,
name: String,
schema: SchemaProviderRef,
) -> Option<SchemaProviderRef> {
) -> Result<Option<SchemaProviderRef>> {
let mut schemas = self.schemas.write().unwrap();
schemas.insert(name, schema)
Ok(schemas.insert(name, schema))
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
let schemas = self.schemas.read().unwrap();
schemas.get(name).cloned()
Ok(schemas.get(name).cloned())
}
}
@@ -132,18 +132,18 @@ impl SchemaProvider for MemorySchemaProvider {
self
}
fn table_names(&self) -> Vec<String> {
fn table_names(&self) -> Result<Vec<String>> {
let tables = self.tables.read().unwrap();
tables.keys().cloned().collect()
Ok(tables.keys().cloned().collect())
}
fn table(&self, name: &str) -> Option<TableRef> {
fn table(&self, name: &str) -> Result<Option<TableRef>> {
let tables = self.tables.read().unwrap();
tables.get(name).cloned()
Ok(tables.get(name).cloned())
}
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
if self.table_exist(name.as_str()) {
if self.table_exist(name.as_str())? {
return TableExistsSnafu { table: name }.fail()?;
}
let mut tables = self.tables.write().unwrap();
@@ -155,9 +155,9 @@ impl SchemaProvider for MemorySchemaProvider {
Ok(tables.remove(name))
}
fn table_exist(&self, name: &str) -> bool {
fn table_exist(&self, name: &str) -> Result<bool> {
let tables = self.tables.read().unwrap();
tables.contains_key(name)
Ok(tables.contains_key(name))
}
}
@@ -168,40 +168,50 @@ pub fn new_memory_catalog_list() -> Result<Arc<MemoryCatalogList>> {
#[cfg(test)]
mod tests {
use common_catalog::consts::*;
use common_error::ext::ErrorExt;
use common_error::prelude::StatusCode;
use table::table::numbers::NumbersTable;
use super::*;
use crate::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
#[test]
fn test_new_memory_catalog_list() {
let catalog_list = new_memory_catalog_list().unwrap();
assert!(catalog_list.catalog(DEFAULT_CATALOG_NAME).is_none());
assert!(catalog_list
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.is_none());
let default_catalog = Arc::new(MemoryCatalogProvider::default());
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog.clone());
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog.clone())
.unwrap();
assert!(default_catalog.schema(DEFAULT_SCHEMA_NAME).is_none());
assert!(default_catalog
.schema(DEFAULT_SCHEMA_NAME)
.unwrap()
.is_none());
let default_schema = Arc::new(MemorySchemaProvider::default());
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone());
default_catalog
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone())
.unwrap();
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let table = default_schema.table("numbers");
let table = default_schema.table("numbers").unwrap();
assert!(table.is_some());
assert!(default_schema.table("not_exists").is_none());
assert!(default_schema.table("not_exists").unwrap().is_none());
}
#[tokio::test]
async fn test_mem_provider() {
let provider = MemorySchemaProvider::new();
let table_name = "numbers";
assert!(!provider.table_exist(table_name));
assert!(!provider.table_exist(table_name).unwrap());
assert!(provider.deregister_table(table_name).unwrap().is_none());
let test_table = NumbersTable::default();
// register table successfully
@@ -209,7 +219,7 @@ mod tests {
.register_table(table_name.to_string(), Arc::new(test_table))
.unwrap()
.is_none());
assert!(provider.table_exist(table_name));
assert!(provider.table_exist(table_name).unwrap());
let other_table = NumbersTable::default();
let result = provider.register_table(table_name.to_string(), Arc::new(other_table));
let err = result.err().unwrap();


@@ -12,10 +12,10 @@ pub trait SchemaProvider: Sync + Send {
fn as_any(&self) -> &dyn Any;
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Vec<String>;
fn table_names(&self) -> Result<Vec<String>>;
/// Retrieves a specific table from the schema by name, provided it exists.
fn table(&self, name: &str) -> Option<TableRef>;
fn table(&self, name: &str) -> Result<Option<TableRef>>;
/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name existed before, it returns "Table already exists" error.
@@ -28,7 +28,7 @@ pub trait SchemaProvider: Sync + Send {
/// If supported by the implementation, checks the table exist in the schema provider or not.
/// If no matched table in the schema provider, return false.
/// Otherwise, return true.
fn table_exist(&self, name: &str) -> bool;
fn table_exist(&self, name: &str) -> Result<bool>;
}
pub type SchemaProviderRef = Arc<dyn SchemaProvider>;
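To make the new contract concrete, here is a minimal read-only implementation sketch; the SingleTableSchema type is invented for illustration and is not part of this change:

use std::any::Any;

use catalog::error::Result;
use catalog::schema::SchemaProvider;
use table::TableRef;

/// Hypothetical schema exposing exactly one table named "numbers".
struct SingleTableSchema {
    table: TableRef,
}

impl SchemaProvider for SingleTableSchema {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Result<Vec<String>> {
        Ok(vec!["numbers".to_string()])
    }

    fn table(&self, name: &str) -> Result<Option<TableRef>> {
        Ok((name == "numbers").then(|| self.table.clone()))
    }

    fn register_table(&self, _name: String, _table: TableRef) -> Result<Option<TableRef>> {
        unimplemented!("read-only schema")
    }

    fn deregister_table(&self, _name: &str) -> Result<Option<TableRef>> {
        unimplemented!("read-only schema")
    }

    fn table_exist(&self, name: &str) -> Result<bool> {
        Ok(name == "numbers")
    }
}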


@@ -2,6 +2,10 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use common_catalog::consts::{
INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID,
SYSTEM_CATALOG_TABLE_NAME,
};
use common_query::logical_plan::Expr;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::physical_plan::RuntimeEnv;
@@ -19,10 +23,6 @@ use table::metadata::{TableId, TableInfoRef};
use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest};
use table::{Table, TableRef};
use crate::consts::{
INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID,
SYSTEM_CATALOG_TABLE_NAME,
};
use crate::error::{
self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu,
OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu,


@@ -6,6 +6,8 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use async_stream::stream;
use common_catalog::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME};
use common_error::ext::BoxedError;
use common_query::logical_plan::Expr;
use common_query::physical_plan::PhysicalPlanRef;
use common_recordbatch::error::Result as RecordBatchResult;
@@ -17,12 +19,12 @@ use datatypes::vectors::VectorRef;
use futures::Stream;
use snafu::ResultExt;
use table::engine::TableEngineRef;
use table::error::TablesRecordBatchSnafu;
use table::metadata::{TableId, TableInfoRef};
use table::table::scan::SimpleTableScan;
use table::{Table, TableRef};
use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME};
use crate::error::InsertTableRecordSnafu;
use crate::error::{Error, InsertTableRecordSnafu};
use crate::system::{build_table_insert_request, SystemCatalogTable};
use crate::{
format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef,
@@ -70,12 +72,38 @@ impl Table for Tables {
let engine_name = self.engine_name.clone();
let stream = stream!({
for catalog_name in catalogs.catalog_names() {
let catalog = catalogs.catalog(&catalog_name).unwrap();
for schema_name in catalog.schema_names() {
let mut tables_in_schema = Vec::with_capacity(catalog.schema_names().len());
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
for catalog_name in catalogs
.catalog_names()
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
{
let catalog = catalogs
.catalog(&catalog_name)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
.unwrap();
for schema_name in catalog
.schema_names()
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
{
let mut tables_in_schema = Vec::with_capacity(
catalog
.schema_names()
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
.len(),
);
let schema = catalog
.schema(&schema_name)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
.unwrap();
for table_name in schema
.table_names()
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?
{
tables_in_schema.push(table_name);
}
@@ -159,17 +187,20 @@ impl SchemaProvider for InformationSchema {
self
}
fn table_names(&self) -> Vec<String> {
vec!["tables".to_string(), SYSTEM_CATALOG_TABLE_NAME.to_string()]
fn table_names(&self) -> Result<Vec<String>, Error> {
Ok(vec![
"tables".to_string(),
SYSTEM_CATALOG_TABLE_NAME.to_string(),
])
}
fn table(&self, name: &str) -> Option<TableRef> {
fn table(&self, name: &str) -> Result<Option<TableRef>, Error> {
if name.eq_ignore_ascii_case("tables") {
Some(self.tables.clone())
Ok(Some(self.tables.clone()))
} else if name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) {
Some(self.system.clone())
Ok(Some(self.system.clone()))
} else {
None
Ok(None)
}
}
@@ -185,8 +216,9 @@ impl SchemaProvider for InformationSchema {
panic!("System catalog & schema does not support deregister table")
}
fn table_exist(&self, name: &str) -> bool {
name.eq_ignore_ascii_case("tables") || name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME)
fn table_exist(&self, name: &str) -> Result<bool, Error> {
Ok(name.eq_ignore_ascii_case("tables")
|| name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME))
}
}
@@ -231,23 +263,23 @@ impl CatalogProvider for SystemCatalog {
self
}
fn schema_names(&self) -> Vec<String> {
vec![INFORMATION_SCHEMA_NAME.to_string()]
fn schema_names(&self) -> Result<Vec<String>, Error> {
Ok(vec![INFORMATION_SCHEMA_NAME.to_string()])
}
fn register_schema(
&self,
_name: String,
_schema: SchemaProviderRef,
) -> Option<SchemaProviderRef> {
) -> Result<Option<SchemaProviderRef>, Error> {
panic!("System catalog does not support registering schema!")
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>, Error> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA_NAME) {
Some(self.information_schema.clone())
Ok(Some(self.information_schema.clone()))
} else {
None
Ok(None)
}
}
}
@@ -287,7 +319,9 @@ mod tests {
use table::table::numbers::NumbersTable;
use super::*;
use crate::memory::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
use crate::local::memory::{
new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider,
};
use crate::CatalogList;
#[tokio::test]
@@ -298,8 +332,12 @@ mod tests {
schema
.register_table("test_table".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
catalog_provider.register_schema("test_schema".to_string(), schema);
catalog_list.register_catalog("test_catalog".to_string(), catalog_provider);
catalog_provider
.register_schema("test_schema".to_string(), schema)
.unwrap();
catalog_list
.register_catalog("test_catalog".to_string(), catalog_provider)
.unwrap();
let tables = Tables::new(catalog_list, "test_engine".to_string());
let tables_stream = tables.scan(&None, &[], None).await.unwrap();


@@ -27,11 +27,11 @@ tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[dev-dependencies.substrait_proto]
package = "substrait"
version = "0.2"
# TODO(ruihang): upgrade to 0.11 once substrait-rs supports it.
[dev-dependencies.prost_09]
package = "prost"
version = "0.9"
[dev-dependencies.substrait_proto]
package = "substrait"
version = "0.2"


@@ -0,0 +1,22 @@
[package]
name = "common-catalog"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
datatypes = { path = "../../datatypes" }
lazy_static = "1.4"
regex = "1.6"
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../../table" }
[dev-dependencies]
chrono = "0.4"
tempdir = "0.3"
tokio = { version = "1.0", features = ["full"] }


@@ -11,3 +11,7 @@ pub const MIN_USER_TABLE_ID: u32 = 1024;
pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0;
/// scripts table id
pub const SCRIPTS_TABLE_ID: u32 = 1;
pub(crate) const CATALOG_KEY_PREFIX: &str = "__c";
pub(crate) const SCHEMA_KEY_PREFIX: &str = "__s";
pub(crate) const TABLE_KEY_PREFIX: &str = "__t";


@@ -0,0 +1,45 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::prelude::{Snafu, StatusCode};
use snafu::{Backtrace, ErrorCompat};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Invalid catalog info: {}", key))]
InvalidCatalog { key: String, backtrace: Backtrace },
#[snafu(display("Failed to deserialize catalog entry value: {}", raw))]
DeserializeCatalogEntryValue {
raw: String,
backtrace: Backtrace,
source: serde_json::error::Error,
},
#[snafu(display("Failed to serialize catalog entry value"))]
SerializeCatalogEntryValue {
backtrace: Backtrace,
source: serde_json::error::Error,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InvalidCatalog { .. }
| Error::DeserializeCatalogEntryValue { .. }
| Error::SerializeCatalogEntryValue { .. } => StatusCode::Unexpected,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
pub type Result<T> = std::result::Result<T, Error>;


@@ -0,0 +1,283 @@
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize, Serializer};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::{TableId, TableMeta, TableVersion};
use crate::consts::{CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_KEY_PREFIX};
use crate::error::{
DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu,
};
lazy_static! {
static ref CATALOG_KEY_PATTERN: Regex = Regex::new(&format!(
"^{}-([a-zA-Z_]+)-([a-zA-Z_]+)$",
CATALOG_KEY_PREFIX
))
.unwrap();
}
lazy_static! {
static ref SCHEMA_KEY_PATTERN: Regex = Regex::new(&format!(
"^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)$",
SCHEMA_KEY_PREFIX
))
.unwrap();
}
lazy_static! {
static ref TABLE_KEY_PATTERN: Regex = Regex::new(&format!(
"^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)-([a-zA-Z_]+)$",
TABLE_KEY_PREFIX
))
.unwrap();
}
pub fn build_catalog_prefix() -> String {
format!("{}-", CATALOG_KEY_PREFIX)
}
pub fn build_schema_prefix(catalog_name: impl AsRef<str>) -> String {
format!("{}-{}-", SCHEMA_KEY_PREFIX, catalog_name.as_ref())
}
pub fn build_table_prefix(catalog_name: impl AsRef<str>, schema_name: impl AsRef<str>) -> String {
format!(
"{}-{}-{}-",
TABLE_KEY_PREFIX,
catalog_name.as_ref(),
schema_name.as_ref()
)
}
pub struct TableKey {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub version: TableVersion,
pub node_id: String,
}
impl Display for TableKey {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(TABLE_KEY_PREFIX)?;
f.write_str("-")?;
f.write_str(&self.catalog_name)?;
f.write_str("-")?;
f.write_str(&self.schema_name)?;
f.write_str("-")?;
f.write_str(&self.table_name)?;
f.write_str("-")?;
f.serialize_u64(self.version)?;
f.write_str("-")?;
f.write_str(&self.node_id)
}
}
impl TableKey {
pub fn parse<S: AsRef<str>>(s: S) -> Result<Self, Error> {
let key = s.as_ref();
let captures = TABLE_KEY_PATTERN
.captures(key)
.context(InvalidCatalogSnafu { key })?;
ensure!(captures.len() == 6, InvalidCatalogSnafu { key });
let version =
u64::from_str(&captures[4]).map_err(|_| InvalidCatalogSnafu { key }.build())?;
Ok(Self {
catalog_name: captures[1].to_string(),
schema_name: captures[2].to_string(),
table_name: captures[3].to_string(),
version,
node_id: captures[5].to_string(),
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TableValue {
pub id: TableId,
pub node_id: String,
pub meta: TableMeta,
}
impl TableValue {
pub fn parse(s: impl AsRef<str>) -> Result<Self, Error> {
serde_json::from_str(s.as_ref())
.context(DeserializeCatalogEntryValueSnafu { raw: s.as_ref() })
}
pub fn as_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(serde_json::to_string(self)
.context(SerializeCatalogEntryValueSnafu)?
.into_bytes())
}
}
pub struct CatalogKey {
pub catalog_name: String,
pub node_id: String,
}
impl Display for CatalogKey {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(CATALOG_KEY_PREFIX)?;
f.write_str("-")?;
f.write_str(&self.catalog_name)?;
f.write_str("-")?;
f.write_str(&self.node_id)
}
}
impl CatalogKey {
pub fn parse(s: impl AsRef<str>) -> Result<Self, Error> {
let key = s.as_ref();
let captures = CATALOG_KEY_PATTERN
.captures(key)
.context(InvalidCatalogSnafu { key })?;
ensure!(captures.len() == 3, InvalidCatalogSnafu { key });
Ok(Self {
catalog_name: captures[1].to_string(),
node_id: captures[2].to_string(),
})
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CatalogValue;
impl CatalogValue {
pub fn to_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(serde_json::to_string(self)
.context(SerializeCatalogEntryValueSnafu)?
.into_bytes())
}
}
pub struct SchemaKey {
pub catalog_name: String,
pub schema_name: String,
pub node_id: String,
}
impl Display for SchemaKey {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(SCHEMA_KEY_PREFIX)?;
f.write_str("-")?;
f.write_str(&self.catalog_name)?;
f.write_str("-")?;
f.write_str(&self.schema_name)?;
f.write_str("-")?;
f.write_str(&self.node_id)
}
}
impl SchemaKey {
pub fn parse(s: impl AsRef<str>) -> Result<Self, Error> {
let key = s.as_ref();
let captures = SCHEMA_KEY_PATTERN
.captures(key)
.context(InvalidCatalogSnafu { key })?;
ensure!(captures.len() == 4, InvalidCatalogSnafu { key });
Ok(Self {
catalog_name: captures[1].to_string(),
schema_name: captures[2].to_string(),
node_id: captures[3].to_string(),
})
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SchemaValue;
impl SchemaValue {
pub fn to_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(serde_json::to_string(self)
.context(SerializeCatalogEntryValueSnafu)?
.into_bytes())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use super::*;
#[test]
fn test_parse_catalog_key() {
let key = "__c-C-N";
let catalog_key = CatalogKey::parse(key).unwrap();
assert_eq!("C", catalog_key.catalog_name);
assert_eq!("N", catalog_key.node_id);
assert_eq!(key, catalog_key.to_string());
}
#[test]
fn test_parse_schema_key() {
let key = "__s-C-S-N";
let schema_key = SchemaKey::parse(key).unwrap();
assert_eq!("C", schema_key.catalog_name);
assert_eq!("S", schema_key.schema_name);
assert_eq!("N", schema_key.node_id);
assert_eq!(key, schema_key.to_string());
}
#[test]
fn test_parse_table_key() {
let key = "__t-C-S-T-42-N";
let entry = TableKey::parse(key).unwrap();
assert_eq!("C", entry.catalog_name);
assert_eq!("S", entry.schema_name);
assert_eq!("T", entry.table_name);
assert_eq!("N", entry.node_id);
assert_eq!(42, entry.version);
assert_eq!(key, &entry.to_string());
}
#[test]
fn test_build_prefix() {
assert_eq!("__c-", build_catalog_prefix());
assert_eq!("__s-CATALOG-", build_schema_prefix("CATALOG"));
assert_eq!(
"__t-CATALOG-SCHEMA-",
build_table_prefix("CATALOG", "SCHEMA")
);
}
#[test]
fn test_serialize_schema() {
let schema_ref = Arc::new(Schema::new(vec![ColumnSchema::new(
"name",
ConcreteDataType::string_datatype(),
true,
)]));
let meta = TableMeta {
schema: schema_ref,
engine: "mito".to_string(),
created_on: chrono::DateTime::default(),
primary_key_indices: vec![0, 1],
next_column_id: 3,
engine_options: Default::default(),
value_indices: vec![2, 3],
options: Default::default(),
};
let value = TableValue {
id: 42,
node_id: "localhost".to_string(),
meta,
};
let serialized = serde_json::to_string(&value).unwrap();
let deserialized = TableValue::parse(&serialized).unwrap();
assert_eq!(value, deserialized);
}
}
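For context, a small usage sketch (not part of the diff; the catalog, schema, table, and node names are made up) showing how the prefixes and keys fit together, e.g. when building range-scan prefixes for a KV backend:

use common_catalog::error::Result;
use common_catalog::{build_schema_prefix, TableKey};

fn key_roundtrip_example() -> Result<()> {
    // Prefix for range-scanning every schema entry under one catalog: "__s-my_catalog-".
    assert_eq!(build_schema_prefix("my_catalog"), "__s-my_catalog-");

    // Table keys round-trip through Display and parse; the version sits between
    // the table name and the node id.
    let key = TableKey::parse("__t-my_catalog-public-metrics-1-node_a")?;
    assert_eq!(key.schema_name, "public");
    assert_eq!(key.version, 1);
    assert_eq!(key.node_id, "node_a");
    Ok(())
}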


@@ -0,0 +1,8 @@
pub mod consts;
pub mod error;
mod helper;
pub use helper::{
build_catalog_prefix, build_schema_prefix, build_table_prefix, CatalogKey, CatalogValue,
SchemaKey, SchemaValue, TableKey, TableValue,
};


@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
bytes = "1.1"
catalog = { path = "../../catalog" }
common-catalog = { path = "../catalog" }
common-error = { path = "../error" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",


@@ -151,7 +151,7 @@ impl DFLogicalSubstraitConvertor {
// Get table handle from catalog manager
let table_ref = self
.catalog_manager
.table(Some(&catalog_name), Some(&schema_name), &table_name)
.table(&catalog_name, &schema_name, &table_name)
.map_err(BoxedError::new)
.context(InternalSnafu)?
.context(TableNotFoundSnafu {
@@ -279,11 +279,12 @@ impl DFLogicalSubstraitConvertor {
#[cfg(test)]
mod test {
use catalog::local::LocalCatalogManager;
use catalog::{
memory::{MemoryCatalogProvider, MemorySchemaProvider},
CatalogList, CatalogProvider, LocalCatalogManager, RegisterTableRequest,
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
local::{MemoryCatalogProvider, MemorySchemaProvider},
CatalogList, CatalogProvider, RegisterTableRequest,
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::schema::Schema;
use table::{requests::CreateTableRequest, test_util::EmptyTable, test_util::MockTableEngine};
@@ -300,8 +301,12 @@ mod test {
);
let schema_provider = Arc::new(MemorySchemaProvider::new());
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider);
catalog_manager.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_manager
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
catalog_manager.init().await.unwrap();
catalog_manager
@@ -338,8 +343,8 @@ mod test {
)));
catalog_manager
.register_table(RegisterTableRequest {
catalog: Some(DEFAULT_CATALOG_NAME.to_string()),
schema: Some(DEFAULT_SCHEMA_NAME.to_string()),
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: DEFAULT_TABLE_NAME.to_string(),
table_id: 1,
table: table_ref.clone(),


@@ -14,6 +14,7 @@ axum = "0.6.0-rc.2"
axum-macros = "0.3.0-rc.1"
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-query = { path = "../common/query" }


@@ -267,6 +267,18 @@ pub enum Error {
duplicated: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to access catalog, source: {}", source))]
Catalog {
#[snafu(backtrace)]
source: catalog::error::Error,
},
#[snafu(display("Failed to find table {} from catalog, source: {}", table_name, source))]
FindTable {
table_name: String,
source: catalog::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -278,7 +290,7 @@ impl ErrorExt for Error {
Error::DecodeLogicalPlan { source } => source.status_code(),
Error::ExecutePhysicalPlan { source } => source.status_code(),
Error::NewCatalog { source } => source.status_code(),
Error::FindTable { source, .. } => source.status_code(),
Error::CreateTable { source, .. }
| Error::GetTable { source, .. }
| Error::AlterTable { source, .. } => source.status_code(),
@@ -321,7 +333,8 @@ impl ErrorExt for Error {
| Error::Conversion { .. }
| Error::IntoPhysicalPlan { .. }
| Error::UnsupportedExpr { .. }
| Error::ColumnDataType { .. } => StatusCode::Internal,
| Error::ColumnDataType { .. }
| Error::Catalog { .. } => StatusCode::Internal,
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),
@@ -366,7 +379,7 @@ mod tests {
)))
}
fn throw_catalog_error() -> std::result::Result<(), catalog::error::Error> {
fn throw_catalog_error() -> catalog::error::Result<()> {
Err(catalog::error::Error::RegisterTable {
source: BoxedError::new(MockError::with_backtrace(StatusCode::Internal)),
})


@@ -47,7 +47,7 @@ impl Instance {
object_store,
));
let catalog_manager = Arc::new(
catalog::LocalCatalogManager::try_new(table_engine.clone())
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
.await
.context(NewCatalogSnafu)?,
);
@@ -96,7 +96,7 @@ impl Instance {
));
let catalog_manager = Arc::new(
catalog::LocalCatalogManager::try_new(mock_engine.clone())
catalog::local::manager::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);


@@ -3,7 +3,7 @@ use api::v1::{
ObjectExpr, ObjectResult, SelectExpr,
};
use async_trait::async_trait;
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::status_code::StatusCode;
use common_query::Output;
use common_telemetry::logging::{debug, info};
@@ -14,8 +14,8 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::requests::AddColumnRequest;
use crate::error::{
self, DecodeLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, Result, TableNotFoundSnafu,
UnsupportedExprSnafu,
self, CatalogSnafu, DecodeLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, Result,
TableNotFoundSnafu, UnsupportedExprSnafu,
};
use crate::instance::Instance;
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
@@ -99,13 +99,15 @@ impl Instance {
.catalog_manager
.catalog(catalog_name)
.unwrap()
.expect("default catalog must exist")
.schema(schema_name)
.expect("default schema must exist")
.unwrap();
let insert_batches = insert::insert_batches(values.values)?;
ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu);
let table = if let Some(table) = schema_provider.table(table_name) {
let table = if let Some(table) = schema_provider.table(table_name).context(CatalogSnafu)? {
let schema = table.schema();
if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? {
self.add_new_columns_to_table(table_name, add_columns)
@@ -124,6 +126,7 @@ impl Instance {
schema_provider
.table(table_name)
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table_name })?
};


@@ -1,5 +1,5 @@
use async_trait::async_trait;
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_query::Output;
use common_telemetry::{
@@ -38,8 +38,10 @@ impl Instance {
let schema_provider = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.expect("datafusion does not accept fallible catalog access")
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.expect("datafusion does not accept fallible catalog access")
.unwrap();
let request = self.sql_handler.insert_to_request(schema_provider, *i)?;
@@ -52,9 +54,9 @@ impl Instance {
// TODO(hl): Select table engine by engine_name
let request = self.sql_handler.create_to_request(table_id, c)?;
let catalog_name = request.catalog_name.clone();
let schema_name = request.schema_name.clone();
let table_name = request.table_name.clone();
let catalog_name = &request.catalog_name;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let table_id = request.id;
info!(
"Creating table, catalog: {:?}, schema: {:?}, table name: {:?}, table id: {}",


@@ -2,7 +2,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDef, CreateExpr};
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::{ErrorExt, StatusCode};
use common_query::Output;
use datatypes::schema::ColumnDefaultConstraint;
@@ -173,7 +173,6 @@ fn create_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
mod tests {
use std::collections::HashMap;
use catalog::MIN_USER_TABLE_ID;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
@@ -188,7 +187,7 @@ mod tests {
let expr = testing_create_expr();
let request = instance.create_expr_to_request(expr).unwrap();
assert_eq!(request.id, MIN_USER_TABLE_ID);
assert_eq!(request.id, common_catalog::consts::MIN_USER_TABLE_ID);
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");


@@ -1,9 +1,7 @@
//! sql handler
use catalog::{
schema::SchemaProviderRef, CatalogManagerRef, CatalogProviderRef, DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
};
use catalog::{schema::SchemaProviderRef, CatalogManagerRef, CatalogProviderRef};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use snafu::{OptionExt, ResultExt};
use sql::statements::show::{ShowDatabases, ShowTables};
@@ -12,7 +10,8 @@ use table::requests::*;
use table::TableRef;
use crate::error::{
CatalogNotFoundSnafu, GetTableSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
CatalogNotFoundSnafu, CatalogSnafu, GetTableSnafu, Result, SchemaNotFoundSnafu,
TableNotFoundSnafu,
};
mod alter;
@@ -63,6 +62,7 @@ impl SqlHandler {
pub(crate) fn get_default_catalog(&self) -> Result<CatalogProviderRef> {
self.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: DEFAULT_CATALOG_NAME,
})
@@ -71,10 +71,12 @@ impl SqlHandler {
pub(crate) fn get_default_schema(&self) -> Result<SchemaProviderRef> {
self.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: DEFAULT_CATALOG_NAME,
})?
.schema(DEFAULT_SCHEMA_NAME)
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu {
name: DEFAULT_SCHEMA_NAME,
})
@@ -158,13 +160,13 @@ mod tests {
self
}
fn table_names(&self) -> Vec<String> {
vec!["demo".to_string()]
fn table_names(&self) -> catalog::error::Result<Vec<String>> {
Ok(vec!["demo".to_string()])
}
fn table(&self, name: &str) -> Option<TableRef> {
fn table(&self, name: &str) -> catalog::error::Result<Option<TableRef>> {
assert_eq!(name, "demo");
Some(Arc::new(DemoTable {}))
Ok(Some(Arc::new(DemoTable {})))
}
fn register_table(
@@ -177,8 +179,8 @@ mod tests {
fn deregister_table(&self, _name: &str) -> catalog::error::Result<Option<TableRef>> {
unimplemented!();
}
fn table_exist(&self, name: &str) -> bool {
name == "demo"
fn table_exist(&self, name: &str) -> catalog::error::Result<bool> {
Ok(name == "demo")
}
}
@@ -205,7 +207,7 @@ mod tests {
));
let catalog_list = Arc::new(
catalog::LocalCatalogManager::try_new(table_engine.clone())
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
.await
.unwrap(),
);


@@ -23,8 +23,7 @@ use crate::sql::SqlHandler;
impl SqlHandler {
pub(crate) async fn create(&self, req: CreateTableRequest) -> Result<Output> {
let ctx = EngineContext {};
let catalog_name = req.catalog_name.clone();
let schema_name = req.schema_name.clone();
// determine catalog and schema from the very beginning
let table_name = req.table_name.clone();
let table_id = req.id;
@@ -37,8 +36,8 @@ impl SqlHandler {
})?;
let register_req = RegisterTableRequest {
catalog: Some(catalog_name.to_string()),
schema: Some(schema_name.to_string()),
catalog: table.table_info().catalog_name.clone(),
schema: table.table_info().schema_name.clone(),
table_name: table_name.clone(),
table_id,
table,


@@ -10,8 +10,8 @@ use sql::statements::{self, insert::Insert};
use table::requests::*;
use crate::error::{
ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlValueSnafu, Result,
TableNotFoundSnafu,
CatalogSnafu, ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu,
ParseSqlValueSnafu, Result, TableNotFoundSnafu,
};
use crate::sql::{SqlHandler, SqlRequest};
@@ -41,6 +41,7 @@ impl SqlHandler {
let table = schema_provider
.table(&table_name)
.context(CatalogSnafu)?
.context(TableNotFoundSnafu {
table_name: &table_name,
})?;


@@ -11,8 +11,8 @@ use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::show::{ShowDatabases, ShowKind, ShowTables};
use crate::error::{
ArrowComputationSnafu, CastVectorSnafu, NewRecordBatchSnafu, NewRecordBatchesSnafu, Result,
SchemaNotFoundSnafu, UnsupportedExprSnafu,
ArrowComputationSnafu, CastVectorSnafu, CatalogSnafu, NewRecordBatchSnafu,
NewRecordBatchesSnafu, Result, SchemaNotFoundSnafu, UnsupportedExprSnafu,
};
use crate::sql::SqlHandler;
@@ -43,7 +43,7 @@ impl SqlHandler {
let catalog = self.get_default_catalog()?;
// TODO(dennis): return an iterator or stream would be better.
let schemas = catalog.schema_names();
let schemas = catalog.schema_names().context(CatalogSnafu)?;
let column_schemas = vec![ColumnSchema::new(
SCHEMAS_COLUMN,
@@ -77,11 +77,14 @@ impl SqlHandler {
let schema = if let Some(name) = &stmt.database {
let catalog = self.get_default_catalog()?;
catalog.schema(name).context(SchemaNotFoundSnafu { name })?
catalog
.schema(name)
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu { name })?
} else {
self.get_default_schema()?
};
let tables = schema.table_names();
let tables = schema.table_names().context(CatalogSnafu)?;
let column_schemas = vec![ColumnSchema::new(
TABLES_COLUMN,


@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use snafu::ResultExt;
@@ -81,7 +81,9 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> {
.catalog_manager()
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap()
.unwrap();
schema_provider
.register_table(table_name.to_string(), table)
@@ -97,7 +99,7 @@ pub async fn create_mock_sql_handler() -> SqlHandler {
object_store,
));
let catalog_manager = Arc::new(
catalog::LocalCatalogManager::try_new(mock_engine.clone())
catalog::local::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);


@@ -7,6 +7,7 @@ edition = "2021"
arc-swap = "1.0"
async-trait = "0.1"
catalog = { path = "../catalog" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-function = { path = "../common/function" }
common-query = { path = "../common/query" }


@@ -233,10 +233,9 @@ mod tests {
use std::sync::Arc;
use arrow::array::UInt64Array;
use catalog::memory::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::util;
use datafusion::field_util::FieldExt;
@@ -246,15 +245,19 @@ mod tests {
use crate::query_engine::{QueryEngineFactory, QueryEngineRef};
fn create_test_engine() -> QueryEngineRef {
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema);
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
default_catalog
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
factory.query_engine().clone()


@@ -3,6 +3,7 @@
use std::any::Any;
use std::sync::Arc;
use catalog::error::Error;
use catalog::{
CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
};
@@ -45,16 +46,21 @@ impl DfCatalogList for DfCatalogListAdapter {
});
self.catalog_list
.register_catalog(name, catalog_adapter)
.expect("datafusion does not accept fallible catalog access") // TODO(hl): datafusion register catalog does not handles errors
.map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _)
}
fn catalog_names(&self) -> Vec<String> {
self.catalog_list.catalog_names()
// TODO(hl): datafusion register catalog does not handles errors
self.catalog_list
.catalog_names()
.expect("datafusion does not accept fallible catalog access")
}
fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
self.catalog_list
.catalog(name)
.expect("datafusion does not accept fallible catalog access") // TODO(hl): datafusion register catalog does not handles errors
.map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _)
}
}
@@ -69,22 +75,23 @@ impl CatalogProvider for CatalogProviderAdapter {
self
}
fn schema_names(&self) -> Vec<String> {
self.df_catalog_provider.schema_names()
fn schema_names(&self) -> catalog::error::Result<Vec<String>> {
Ok(self.df_catalog_provider.schema_names())
}
fn register_schema(
&self,
_name: String,
_schema: SchemaProviderRef,
) -> Option<SchemaProviderRef> {
) -> catalog::error::Result<Option<SchemaProviderRef>> {
todo!("register_schema is not supported in Datafusion catalog provider")
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.df_catalog_provider
fn schema(&self, name: &str) -> catalog::error::Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self
.df_catalog_provider
.schema(name)
.map(|df_schema_provider| Arc::new(SchemaProviderAdapter { df_schema_provider }) as _)
.map(|df_schema_provider| Arc::new(SchemaProviderAdapter { df_schema_provider }) as _))
}
}
@@ -99,12 +106,15 @@ impl DfCatalogProvider for DfCatalogProviderAdapter {
}
fn schema_names(&self) -> Vec<String> {
self.catalog_provider.schema_names()
self.catalog_provider
.schema_names()
.expect("datafusion does not accept fallible catalog access")
}
fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
self.catalog_provider
.schema(name)
.expect("datafusion does not accept fallible catalog access")
.map(|schema_provider| Arc::new(DfSchemaProviderAdapter { schema_provider }) as _)
}
}
@@ -120,12 +130,15 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
}
fn table_names(&self) -> Vec<String> {
self.schema_provider.table_names()
self.schema_provider
.table_names()
.expect("datafusion does not accept fallible catalog access")
}
fn table(&self, name: &str) -> Option<Arc<dyn DfTableProvider>> {
self.schema_provider
.table(name)
.expect("datafusion does not accept fallible catalog access")
.map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _)
}
@@ -149,7 +162,9 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
}
fn table_exist(&self, name: &str) -> bool {
self.schema_provider.table_exist(name)
self.schema_provider
.table_exist(name)
.expect("datafusion does not accept fallible catalog access")
}
}
@@ -164,12 +179,12 @@ impl SchemaProvider for SchemaProviderAdapter {
}
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Vec<String> {
self.df_schema_provider.table_names()
fn table_names(&self) -> Result<Vec<String>, Error> {
Ok(self.df_schema_provider.table_names())
}
fn table(&self, name: &str) -> Option<TableRef> {
self.df_schema_provider.table(name).map(|table_provider| {
fn table(&self, name: &str) -> Result<Option<TableRef>, Error> {
let table = self.df_schema_provider.table(name).map(|table_provider| {
match table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
@@ -182,7 +197,8 @@ impl SchemaProvider for SchemaProviderAdapter {
Arc::new(adapter) as _
}
}
})
});
Ok(table)
}
fn register_table(
@@ -213,14 +229,14 @@ impl SchemaProvider for SchemaProviderAdapter {
.transpose()
}
fn table_exist(&self, name: &str) -> bool {
self.df_schema_provider.table_exist(name)
fn table_exist(&self, name: &str) -> Result<bool, Error> {
Ok(self.df_schema_provider.table_exist(name))
}
}
#[cfg(test)]
mod tests {
use catalog::memory::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
use table::table::numbers::NumbersTable;
use super::*;
@@ -234,10 +250,12 @@ mod tests {
),
};
adapter.register_schema(
"whatever".to_string(),
Arc::new(MemorySchemaProvider::new()),
);
adapter
.register_schema(
"whatever".to_string(),
Arc::new(MemorySchemaProvider::new()),
)
.unwrap();
}
#[test]

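The adapters in this file bridge between the catalog crate's now-fallible traits and DataFusion's infallible ones: calls flowing from DataFusion into a Greptime catalog panic on failure via .expect(...), while calls flowing the other way are simply wrapped in Ok(...). A deliberately simplified sketch of that pattern (illustrative only; the real traits have more methods and different names and signatures):

// Stand-ins for the two trait families; not the real definitions.
trait Fallible {
    fn names(&self) -> Result<Vec<String>, String>;
}

trait Infallible {
    fn names(&self) -> Vec<String>;
}

// Fallible -> infallible (the DfCatalogProviderAdapter / DfSchemaProviderAdapter direction):
// DataFusion's trait cannot surface the error, so failure panics.
struct ToInfallible<T: Fallible>(T);

impl<T: Fallible> Infallible for ToInfallible<T> {
    fn names(&self) -> Vec<String> {
        self.0
            .names()
            .expect("datafusion does not accept fallible catalog access")
    }
}

// Infallible -> fallible (the CatalogProviderAdapter / SchemaProviderAdapter direction):
// the call cannot fail, so the result is always Ok.
struct ToFallible<T: Infallible>(T);

impl<T: Infallible> Fallible for ToFallible<T> {
    fn names(&self) -> Result<Vec<String>, String> {
        Ok(self.0.names())
    }
}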

@@ -3,7 +3,7 @@ mod state;
use std::sync::Arc;
use catalog::CatalogList;
use catalog::CatalogListRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY};
use common_query::physical_plan::PhysicalPlan;
@@ -43,7 +43,7 @@ pub struct QueryEngineFactory {
}
impl QueryEngineFactory {
pub fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
pub fn new(catalog_list: CatalogListRef) -> Self {
let query_engine = Arc::new(DatafusionQueryEngine::new(catalog_list));
for func in FUNCTION_REGISTRY.functions() {
@@ -72,7 +72,7 @@ mod tests {
#[test]
fn test_query_engine_factory() {
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let factory = QueryEngineFactory::new(catalog_list);
let engine = factory.query_engine();

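QueryEngineFactory::new now takes the catalog crate's own alias instead of spelling out the trait object. The alias itself is not part of this hunk; presumably it is declared roughly as follows (an assumption, shown here only for context):

use std::sync::Arc;

use catalog::CatalogList;

// Assumed definition of the alias used above; the real declaration lives in the catalog crate.
pub type CatalogListRef = Arc<dyn CatalogList>;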

@@ -3,6 +3,7 @@ use std::fmt;
use std::sync::{Arc, RwLock};
use catalog::CatalogListRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::physical_plan::RuntimeEnv;
use common_query::prelude::ScalarUdf;
@@ -39,10 +40,7 @@ impl fmt::Debug for QueryEngineState {
impl QueryEngineState {
pub(crate) fn new(catalog_list: CatalogListRef) -> Self {
let config = ExecutionConfig::new()
.with_default_catalog_and_schema(
catalog::DEFAULT_CATALOG_NAME,
catalog::DEFAULT_SCHEMA_NAME,
)
.with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.with_optimizer_rules(vec![
// TODO(hl): SimplifyExpressions is not exported.
Arc::new(TypeConversionRule {}),

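For reference, the default names configured here appear to resolve to the "greptime" catalog and the "public" schema, matching the literals hard-coded in the test-table-info and MemTable changes later in this diff; a small sanity check under that assumption:

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

fn main() {
    // Values inferred from the "greptime"/"public" literals used elsewhere in this commit.
    assert_eq!(DEFAULT_CATALOG_NAME, "greptime");
    assert_eq!(DEFAULT_SCHEMA_NAME, "public");
}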

@@ -1,9 +1,8 @@
use std::sync::Arc;
use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use datatypes::for_all_primitive_types;
@@ -47,8 +46,12 @@ pub fn create_query_engine() -> Arc<dyn QueryEngine> {
.register_table(number_table.table_name().to_string(), number_table)
.unwrap();
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider);
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
factory.query_engine().clone()


@@ -2,10 +2,9 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_function::scalars::aggregate::AggregateFunctionMeta;
use common_function_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::CreateAccumulatorSnafu;
@@ -246,8 +245,12 @@ fn new_query_engine_factory(table: MemTable) -> QueryEngineFactory {
let catalog_list = Arc::new(MemoryCatalogList::default());
schema_provider.register_table(table_name, table).unwrap();
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider);
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
QueryEngineFactory::new(catalog_list)
}


@@ -1,9 +1,8 @@
use std::sync::Arc;
mod function;
use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::error::Result as RecordResult;
use common_recordbatch::{util, RecordBatch};
@@ -137,8 +136,12 @@ fn create_correctness_engine() -> Arc<dyn QueryEngine> {
.register_table(number_table.table_name().to_string(), number_table)
.unwrap();
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider);
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
factory.query_engine().clone()


@@ -3,10 +3,9 @@ mod pow;
use std::sync::Arc;
use arrow::array::UInt32Array;
use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::{create_udf, make_scalar_function, Volatility};
use common_query::Output;
use common_recordbatch::error::Result as RecordResult;
@@ -34,7 +33,7 @@ use crate::pow::pow;
#[tokio::test]
async fn test_datafusion_query_engine() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog::memory::new_memory_catalog_list()?;
let catalog_list = catalog::local::new_memory_catalog_list()?;
let factory = QueryEngineFactory::new(catalog_list);
let engine = factory.query_engine();
@@ -90,15 +89,19 @@ async fn test_datafusion_query_engine() -> Result<()> {
#[tokio::test]
async fn test_udf() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog::memory::new_memory_catalog_list()?;
let catalog_list = catalog::local::new_memory_catalog_list()?;
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema);
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
default_catalog
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
let engine = factory.query_engine();
@@ -206,8 +209,12 @@ fn create_query_engine() -> Arc<dyn QueryEngine> {
.register_table(odd_number_table.table_name().to_string(), odd_number_table)
.unwrap();
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider);
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
factory.query_engine().clone()


@@ -21,6 +21,7 @@ python = [
[dependencies]
async-trait = "0.1"
catalog = { path = "../catalog" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-function = { path = "../common/function" }
common-query = { path = "../common/query" }


@@ -118,7 +118,7 @@ mod tests {
));
let catalog_manager = Arc::new(
catalog::LocalCatalogManager::try_new(mock_engine.clone())
catalog::local::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);


@@ -133,10 +133,9 @@ impl ScriptEngine for PyEngine {
#[cfg(test)]
mod tests {
use catalog::memory::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::util;
use datafusion_common::field_util::FieldExt;
use datafusion_common::field_util::SchemaExt;
@@ -149,15 +148,19 @@ mod tests {
#[tokio::test]
async fn test_compile_execute() {
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
let default_catalog = Arc::new(MemoryCatalogProvider::new());
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema);
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
default_catalog
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
let query_engine = factory.query_engine();


@@ -2,8 +2,8 @@
use std::collections::HashMap;
use std::sync::Arc;
use catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID};
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID};
use common_query::Output;
use common_recordbatch::util as record_util;
use common_telemetry::logging;
@@ -107,8 +107,8 @@ impl ScriptsTable {
let table = self
.catalog_manager
.table(
Some(DEFAULT_CATALOG_NAME),
Some(DEFAULT_SCHEMA_NAME),
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
SCRIPTS_TABLE_NAME,
)
.context(FindScriptsTableSnafu)?


@@ -10,6 +10,7 @@ axum = "0.6.0-rc.2"
axum-macros = "0.3.0-rc.1"
bytes = "1.2"
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-query = { path = "../common/query" }


@@ -2,10 +2,9 @@ use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use query::{QueryEngineFactory, QueryEngineRef};
use servers::error::Result;
@@ -73,8 +72,12 @@ fn create_testing_sql_query_handler(table: MemTable) -> SqlQueryHandlerRef {
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
let catalog_list = Arc::new(MemoryCatalogList::default());
schema_provider.register_table(table_name, table).unwrap();
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider);
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
catalog_provider
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
.unwrap();
catalog_list
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider)
.unwrap();
let factory = QueryEngineFactory::new(catalog_list);
let query_engine = factory.query_engine().clone();


@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
catalog = { path = "../catalog" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }


@@ -7,7 +7,7 @@ pub mod statement;
use std::str::FromStr;
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};


@@ -238,8 +238,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
_ctx: &EngineContext,
request: CreateTableRequest,
) -> Result<TableRef> {
let catalog_name = request.catalog_name;
let schema_name = request.schema_name;
let catalog_name = &request.catalog_name;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
if let Some(table) = self.get_table(table_name) {


@@ -58,6 +58,8 @@ pub fn build_test_table_info() -> TableInfo {
.ident(0)
.table_version(0u64)
.table_type(TableType::Base)
.catalog_name("greptime".to_string())
.schema_name("public".to_string())
.build()
.unwrap()
}

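TableInfo now records the owning catalog and schema, so builders have to supply both names. A hypothetical helper assembled only from the builder calls shown here and in the MemTable change near the end of this diff (the table::metadata and datatypes::schema::SchemaRef paths are assumptions):

use datatypes::schema::SchemaRef;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};

// Hypothetical helper: builds a TableInfo that carries catalog and schema names,
// mirroring build_test_table_info above and MemTable::new_with_catalog below.
fn table_info_for(schema: SchemaRef, table_name: &str) -> TableInfo {
    let meta = TableMetaBuilder::default()
        .schema(schema)
        .primary_key_indices(vec![])
        .value_indices(vec![])
        .engine("mock".to_string())
        .next_column_id(0)
        .engine_options(Default::default())
        .options(Default::default())
        .created_on(Default::default())
        .build()
        .unwrap();
    TableInfoBuilder::default()
        .table_id(1)
        .table_version(0u64)
        .name(table_name.to_string())
        .catalog_name("greptime".to_string())
        .schema_name("public".to_string())
        .desc(None)
        .table_type(TableType::Base)
        .meta(meta)
        .build()
        .unwrap()
}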

@@ -40,6 +40,7 @@ pub trait TableEngine: Send + Sync {
fn get_table(&self, ctx: &EngineContext, name: &str) -> Result<Option<TableRef>>;
/// Returns true when the given table exists.
/// TODO(hl): support catalog and schema
fn table_exists(&self, ctx: &EngineContext, name: &str) -> bool;
/// Drops the given table.


@@ -45,6 +45,12 @@ pub enum InnerError {
source: ArrowError,
backtrace: Backtrace,
},
#[snafu(display("Failed to create record batch for Tables, source: {}", source))]
TablesRecordBatch {
#[snafu(backtrace)]
source: BoxedError,
},
}
impl ErrorExt for InnerError {
@@ -55,6 +61,7 @@ impl ErrorExt for InnerError {
| InnerError::SchemaConversion { .. }
| InnerError::TableProjection { .. } => StatusCode::EngineExecuteQuery,
InnerError::MissingColumn { .. } => StatusCode::InvalidArguments,
InnerError::TablesRecordBatch { .. } => StatusCode::Unexpected,
}
}


@@ -15,26 +15,69 @@ use futures::Stream;
use snafu::prelude::*;
use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu};
use crate::metadata::TableInfoRef;
use crate::metadata::{
TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, TableVersion,
};
use crate::table::scan::SimpleTableScan;
use crate::Table;
#[derive(Debug, Clone)]
pub struct MemTable {
table_name: String,
info: TableInfoRef,
recordbatch: RecordBatch,
}
impl MemTable {
pub fn new(table_name: impl Into<String>, recordbatch: RecordBatch) -> Self {
Self {
table_name: table_name.into(),
recordbatch,
}
Self::new_with_catalog(
table_name,
recordbatch,
0,
"greptime".to_string(),
"public".to_string(),
)
}
pub fn new_with_catalog(
table_name: impl Into<String>,
recordbatch: RecordBatch,
table_id: TableId,
catalog_name: String,
schema_name: String,
) -> Self {
let schema = recordbatch.schema.clone();
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.value_indices(vec![])
.engine("mock".to_string())
.next_column_id(0)
.engine_options(Default::default())
.options(Default::default())
.created_on(Default::default())
.build()
.unwrap();
let info = Arc::new(
TableInfoBuilder::default()
.table_id(table_id)
.table_version(0 as TableVersion)
.name(table_name.into())
.schema_name(schema_name)
.catalog_name(catalog_name)
.desc(None)
.table_type(TableType::Base)
.meta(meta)
.build()
.unwrap(),
);
Self { info, recordbatch }
}
pub fn table_name(&self) -> &str {
&self.table_name
&self.info.name
}
/// Creates a table with 1 column and 100 rows, with table name "numbers", column name "uint32s" and
@@ -65,7 +108,7 @@ impl Table for MemTable {
}
fn table_info(&self) -> TableInfoRef {
unimplemented!()
self.info.clone()
}
async fn scan(