mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: implement catalog manager (#129)
Implement catalog manager that provides a vision of all existing tables while instance start. Current implementation is based on local table engine, all catalog info is stored in an system catalog table.
This commit is contained in:
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -510,9 +510,18 @@ checksum = "a2698f953def977c68f935bb0dfa959375ad4638570e969e2f1e9f433cbf1af6"
|
||||
name = "catalog"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"common-error",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-telemetry",
|
||||
"datafusion",
|
||||
"datatypes",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
"table",
|
||||
"tokio",
|
||||
@@ -3015,6 +3024,8 @@ dependencies = [
|
||||
"num",
|
||||
"num-traits",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
"sql",
|
||||
"table",
|
||||
|
||||
@@ -6,9 +6,18 @@ edition = "2021"
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
async-stream = "0.3"
|
||||
common-error = { path = "../common/error" }
|
||||
common-query = { path = "../common/query" }
|
||||
common-recordbatch = { path = "../common/recordbatch" }
|
||||
common-telemetry = { path = "../common/telemetry" }
|
||||
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
|
||||
datatypes = { path = "../datatypes" }
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
table = { path = "../table" }
|
||||
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::schema::SchemaProvider;
|
||||
|
||||
/// Represent a list of named catalogs
|
||||
pub trait CatalogList: Sync + Send {
|
||||
/// Returns the catalog list as [`Any`](std::any::Any)
|
||||
/// so that it can be downcast to a specific implementation.
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
|
||||
/// Adds a new catalog to this catalog list
|
||||
/// If a catalog of the same name existed before, it is replaced in the list and returned.
|
||||
fn register_catalog(
|
||||
&self,
|
||||
name: String,
|
||||
catalog: Arc<dyn CatalogProvider>,
|
||||
) -> Option<Arc<dyn CatalogProvider>>;
|
||||
|
||||
/// Retrieves the list of available catalog names
|
||||
fn catalog_names(&self) -> Vec<String>;
|
||||
|
||||
/// Retrieves a specific catalog by name, provided it exists.
|
||||
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>;
|
||||
}
|
||||
|
||||
/// Represents a catalog, comprising a number of named schemas.
|
||||
pub trait CatalogProvider: Sync + Send {
|
||||
/// Returns the catalog provider as [`Any`](std::any::Any)
|
||||
/// so that it can be downcast to a specific implementation.
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
|
||||
/// Retrieves the list of available schema names in this catalog.
|
||||
fn schema_names(&self) -> Vec<String>;
|
||||
|
||||
/// Retrieves a specific schema from the catalog by name, provided it exists.
|
||||
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
|
||||
}
|
||||
|
||||
pub type CatalogListRef = Arc<dyn CatalogList>;
|
||||
pub type CatalogProviderRef = Arc<dyn CatalogProvider>;
|
||||
|
||||
pub const DEFAULT_CATALOG_NAME: &str = "greptime";
|
||||
pub const DEFAULT_SCHEMA_NAME: &str = "public";
|
||||
6
src/catalog/src/consts.rs
Normal file
6
src/catalog/src/consts.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
pub const SYSTEM_CATALOG_NAME: &str = "system";
|
||||
pub const INFORMATION_SCHEMA_NAME: &str = "information_schema";
|
||||
pub const SYSTEM_CATALOG_TABLE_ID: u64 = 0;
|
||||
pub const SYSTEM_CATALOG_TABLE_NAME: &str = "system_catalog";
|
||||
pub const DEFAULT_CATALOG_NAME: &str = "greptime";
|
||||
pub const DEFAULT_SCHEMA_NAME: &str = "public";
|
||||
@@ -1,11 +1,209 @@
|
||||
use datafusion::error::DataFusionError;
|
||||
use std::any::Any;
|
||||
|
||||
common_error::define_opaque_error!(Error);
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::prelude::{Snafu, StatusCode};
|
||||
use datafusion::error::DataFusionError;
|
||||
use datatypes::arrow;
|
||||
use snafu::{Backtrace, ErrorCompat};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum Error {
|
||||
#[snafu(display("Failed to open system catalog table, source: {}", source))]
|
||||
OpenSystemCatalog {
|
||||
#[snafu(backtrace)]
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create system catalog table, source: {}", source))]
|
||||
CreateSystemCatalog {
|
||||
#[snafu(backtrace)]
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("System catalog is not valid: {}", msg))]
|
||||
SystemCatalog { msg: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display(
|
||||
"System catalog table type mismatch, expected: binary, found: {:?} source: {}",
|
||||
data_type,
|
||||
source
|
||||
))]
|
||||
SystemCatalogTypeMismatch {
|
||||
data_type: arrow::datatypes::DataType,
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid system catalog entry type: {:?}", entry_type))]
|
||||
InvalidEntryType { entry_type: Option<u8> },
|
||||
|
||||
#[snafu(display("Invalid system catalog key: {:?}", key))]
|
||||
InvalidKey { key: Option<String> },
|
||||
|
||||
#[snafu(display("Catalog value is not present"))]
|
||||
EmptyValue,
|
||||
|
||||
#[snafu(display("Failed to deserialize value, source: {}", source))]
|
||||
ValueDeserialize {
|
||||
source: serde_json::error::Error,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Cannot find catalog by name: {}", catalog_name))]
|
||||
CatalogNotFound { catalog_name: String },
|
||||
|
||||
#[snafu(display("Cannot find schema, schema info: {}", schema_info))]
|
||||
SchemaNotFound { schema_info: String },
|
||||
|
||||
#[snafu(display("Table {} already exists", table))]
|
||||
TableExists { table: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Failed to register table"))]
|
||||
RegisterTable {
|
||||
#[snafu(backtrace)]
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))]
|
||||
OpenTable {
|
||||
table_info: String,
|
||||
#[snafu(backtrace)]
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Table not found while opening table, table info: {}", table_info))]
|
||||
TableNotFound { table_info: String },
|
||||
|
||||
#[snafu(display("Failed to read system catalog table records"))]
|
||||
ReadSystemCatalog {
|
||||
#[snafu(backtrace)]
|
||||
source: common_recordbatch::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build table schema for system catalog table"))]
|
||||
SystemCatalogSchema {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl From<Error> for DataFusionError {
|
||||
fn from(e: Error) -> DataFusionError {
|
||||
DataFusionError::External(Box::new(e))
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
Error::InvalidKey { .. }
|
||||
| Error::OpenSystemCatalog { .. }
|
||||
| Error::CreateSystemCatalog { .. }
|
||||
| Error::SchemaNotFound { .. }
|
||||
| Error::TableNotFound { .. }
|
||||
| Error::InvalidEntryType { .. } => StatusCode::Unexpected,
|
||||
Error::SystemCatalog { .. }
|
||||
| Error::SystemCatalogTypeMismatch { .. }
|
||||
| Error::EmptyValue
|
||||
| Error::ValueDeserialize { .. }
|
||||
| Error::CatalogNotFound { .. }
|
||||
| Error::OpenTable { .. }
|
||||
| Error::ReadSystemCatalog { .. } => StatusCode::StorageUnavailable,
|
||||
Error::RegisterTable { .. } | Error::SystemCatalogSchema { .. } => StatusCode::Internal,
|
||||
Error::TableExists { .. } => StatusCode::TableAlreadyExists,
|
||||
}
|
||||
}
|
||||
|
||||
fn backtrace_opt(&self) -> Option<&Backtrace> {
|
||||
ErrorCompat::backtrace(self)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Error> for DataFusionError {
|
||||
fn from(e: Error) -> Self {
|
||||
DataFusionError::Internal(e.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_error::mock::MockError;
|
||||
use datatypes::arrow::datatypes::DataType;
|
||||
use snafu::GenerateImplicitData;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
pub fn test_error_status_code() {
|
||||
assert_eq!(
|
||||
StatusCode::TableAlreadyExists,
|
||||
Error::TableExists {
|
||||
table: "some_table".to_string(),
|
||||
backtrace: Backtrace::generate(),
|
||||
}
|
||||
.status_code()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
StatusCode::Unexpected,
|
||||
Error::InvalidKey { key: None }.status_code()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
StatusCode::Unexpected,
|
||||
Error::OpenSystemCatalog {
|
||||
source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable))
|
||||
}
|
||||
.status_code()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
StatusCode::Unexpected,
|
||||
Error::CreateSystemCatalog {
|
||||
source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable))
|
||||
}
|
||||
.status_code()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
StatusCode::StorageUnavailable,
|
||||
Error::SystemCatalog {
|
||||
msg: "".to_string(),
|
||||
backtrace: Backtrace::generate(),
|
||||
}
|
||||
.status_code()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
StatusCode::StorageUnavailable,
|
||||
Error::SystemCatalogTypeMismatch {
|
||||
data_type: DataType::Boolean,
|
||||
source: datatypes::error::Error::UnsupportedArrowType {
|
||||
arrow_type: DataType::Boolean,
|
||||
backtrace: Backtrace::generate()
|
||||
}
|
||||
}
|
||||
.status_code()
|
||||
);
|
||||
assert_eq!(
|
||||
StatusCode::StorageUnavailable,
|
||||
Error::EmptyValue.status_code()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_errors_to_datafusion_error() {
|
||||
let e: DataFusionError = Error::TableExists {
|
||||
table: "test_table".to_string(),
|
||||
backtrace: Backtrace::generate(),
|
||||
}
|
||||
.into();
|
||||
match e {
|
||||
DataFusionError::Internal(_) => {}
|
||||
_ => {
|
||||
panic!("catalog error should be converted to DataFusionError::Internal")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,63 @@
|
||||
mod catalog;
|
||||
pub mod error;
|
||||
pub mod memory;
|
||||
mod schema;
|
||||
#![feature(assert_matches)]
|
||||
|
||||
pub use crate::catalog::{CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef};
|
||||
pub use crate::catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use crate::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
pub use crate::manager::LocalCatalogManager;
|
||||
pub use crate::schema::{SchemaProvider, SchemaProviderRef};
|
||||
mod consts;
|
||||
pub mod error;
|
||||
mod manager;
|
||||
pub mod memory;
|
||||
pub mod schema;
|
||||
mod system;
|
||||
mod tables;
|
||||
|
||||
/// Represent a list of named catalogs
|
||||
pub trait CatalogList: Sync + Send {
|
||||
/// Returns the catalog list as [`Any`](std::any::Any)
|
||||
/// so that it can be downcast to a specific implementation.
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
|
||||
/// Adds a new catalog to this catalog list
|
||||
/// If a catalog of the same name existed before, it is replaced in the list and returned.
|
||||
fn register_catalog(
|
||||
&self,
|
||||
name: String,
|
||||
catalog: CatalogProviderRef,
|
||||
) -> Option<CatalogProviderRef>;
|
||||
|
||||
/// Retrieves the list of available catalog names
|
||||
fn catalog_names(&self) -> Vec<String>;
|
||||
|
||||
/// Retrieves a specific catalog by name, provided it exists.
|
||||
fn catalog(&self, name: &str) -> Option<CatalogProviderRef>;
|
||||
}
|
||||
|
||||
/// Represents a catalog, comprising a number of named schemas.
|
||||
pub trait CatalogProvider: Sync + Send {
|
||||
/// Returns the catalog provider as [`Any`](std::any::Any)
|
||||
/// so that it can be downcast to a specific implementation.
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
|
||||
/// Retrieves the list of available schema names in this catalog.
|
||||
fn schema_names(&self) -> Vec<String>;
|
||||
|
||||
/// Registers schema to this catalog.
|
||||
fn register_schema(&self, name: String, schema: SchemaProviderRef)
|
||||
-> Option<SchemaProviderRef>;
|
||||
|
||||
/// Retrieves a specific schema from the catalog by name, provided it exists.
|
||||
fn schema(&self, name: &str) -> Option<SchemaProviderRef>;
|
||||
}
|
||||
|
||||
pub type CatalogListRef = Arc<dyn CatalogList>;
|
||||
pub type CatalogProviderRef = Arc<dyn CatalogProvider>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait CatalogManager: CatalogList {
|
||||
async fn start(&self) -> error::Result<()>;
|
||||
}
|
||||
|
||||
pub type CatalogManagerRef = Arc<dyn CatalogManager>;
|
||||
|
||||
233
src/catalog/src/manager.rs
Normal file
233
src/catalog/src/manager.rs
Normal file
@@ -0,0 +1,233 @@
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_recordbatch::RecordBatch;
|
||||
use common_telemetry::info;
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::vectors::{BinaryVector, UInt8Vector};
|
||||
use futures_util::StreamExt;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::engine::{EngineContext, TableEngineRef};
|
||||
use table::requests::OpenTableRequest;
|
||||
use table::table::numbers::NumbersTable;
|
||||
|
||||
use super::error::Result;
|
||||
use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME};
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, SchemaNotFoundSnafu,
|
||||
SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use crate::system::{decode_system_catalog, Entry, SystemCatalogTable, TableEntry};
|
||||
use crate::tables::SystemCatalog;
|
||||
use crate::{
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, SchemaProvider,
|
||||
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
|
||||
};
|
||||
|
||||
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
|
||||
pub struct LocalCatalogManager {
|
||||
system: Arc<SystemCatalog>,
|
||||
catalogs: Arc<MemoryCatalogList>,
|
||||
engine: TableEngineRef,
|
||||
}
|
||||
|
||||
impl LocalCatalogManager {
|
||||
/// Create a new [CatalogManager] with given user catalogs and table engine
|
||||
pub async fn try_new(engine: TableEngineRef) -> Result<Self> {
|
||||
let table = SystemCatalogTable::new(engine.clone()).await?;
|
||||
let memory_catalog_list = crate::memory::new_memory_catalog_list()?;
|
||||
let system_catalog = Arc::new(SystemCatalog::new(
|
||||
table,
|
||||
memory_catalog_list.clone(),
|
||||
engine.clone(),
|
||||
));
|
||||
Ok(Self {
|
||||
system: system_catalog,
|
||||
catalogs: memory_catalog_list,
|
||||
engine,
|
||||
})
|
||||
}
|
||||
|
||||
/// Scan all entries from system catalog table
|
||||
pub async fn init(&self) -> Result<()> {
|
||||
self.init_system_catalog()?;
|
||||
let mut system_records = self.system.information_schema.system.records().await?;
|
||||
while let Some(records) = system_records
|
||||
.next()
|
||||
.await
|
||||
.transpose()
|
||||
.context(ReadSystemCatalogSnafu)?
|
||||
{
|
||||
self.handle_system_catalog_entries(records).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn init_system_catalog(&self) -> Result<()> {
|
||||
let system_schema = Arc::new(MemorySchemaProvider::new());
|
||||
system_schema.register_table(
|
||||
SYSTEM_CATALOG_TABLE_NAME.to_string(),
|
||||
self.system.information_schema.system.clone(),
|
||||
)?;
|
||||
let system_catalog = Arc::new(MemoryCatalogProvider::new());
|
||||
system_catalog.register_schema(INFORMATION_SCHEMA_NAME.to_string(), system_schema);
|
||||
self.catalogs
|
||||
.register_catalog(SYSTEM_CATALOG_NAME.to_string(), system_catalog);
|
||||
|
||||
let default_catalog = Arc::new(MemoryCatalogProvider::new());
|
||||
let default_schema = Arc::new(MemorySchemaProvider::new());
|
||||
|
||||
// Add numbers table for test
|
||||
// TODO(hl): remove this registration
|
||||
let table = Arc::new(NumbersTable::default());
|
||||
default_schema.register_table("numbers".to_string(), table)?;
|
||||
|
||||
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema);
|
||||
self.catalogs
|
||||
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_system_catalog_entries(&self, records: RecordBatch) -> Result<()> {
|
||||
ensure!(
|
||||
records.df_recordbatch.columns().len() >= 4,
|
||||
SystemCatalogSnafu {
|
||||
msg: format!(
|
||||
"Length mismatch: {}",
|
||||
records.df_recordbatch.columns().len()
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
let entry_type = UInt8Vector::try_from_arrow_array(&records.df_recordbatch.columns()[0])
|
||||
.with_context(|_| SystemCatalogTypeMismatchSnafu {
|
||||
data_type: records.df_recordbatch.columns()[0].data_type().clone(),
|
||||
})?;
|
||||
|
||||
let key = BinaryVector::try_from_arrow_array(&records.df_recordbatch.columns()[1])
|
||||
.with_context(|_| SystemCatalogTypeMismatchSnafu {
|
||||
data_type: records.df_recordbatch.columns()[1].data_type().clone(),
|
||||
})?;
|
||||
|
||||
let value = BinaryVector::try_from_arrow_array(&records.df_recordbatch.columns()[1])
|
||||
.with_context(|_| SystemCatalogTypeMismatchSnafu {
|
||||
data_type: records.df_recordbatch.columns()[3].data_type().clone(),
|
||||
})?;
|
||||
|
||||
for ((t, k), v) in entry_type
|
||||
.iter_data()
|
||||
.zip(key.iter_data())
|
||||
.zip(value.iter_data())
|
||||
{
|
||||
let entry = decode_system_catalog(t, k, v)?;
|
||||
match entry {
|
||||
Entry::Catalog(c) => {
|
||||
self.catalogs.register_catalog_if_absent(
|
||||
c.catalog_name.clone(),
|
||||
Arc::new(MemoryCatalogProvider::new()),
|
||||
);
|
||||
info!("Register catalog: {}", c.catalog_name);
|
||||
}
|
||||
Entry::Schema(s) => {
|
||||
let catalog =
|
||||
self.catalogs
|
||||
.catalog(&s.catalog_name)
|
||||
.context(CatalogNotFoundSnafu {
|
||||
catalog_name: &s.catalog_name,
|
||||
})?;
|
||||
catalog.register_schema(
|
||||
s.schema_name.clone(),
|
||||
Arc::new(MemorySchemaProvider::new()),
|
||||
);
|
||||
info!("Registered schema: {:?}", s);
|
||||
}
|
||||
Entry::Table(t) => {
|
||||
self.open_and_register_table(&t).await?;
|
||||
info!("Registered table: {:?}", t)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> {
|
||||
let catalog = self
|
||||
.catalogs
|
||||
.catalog(&t.catalog_name)
|
||||
.context(CatalogNotFoundSnafu {
|
||||
catalog_name: &t.catalog_name,
|
||||
})?;
|
||||
let schema = catalog
|
||||
.schema(&t.schema_name)
|
||||
.context(SchemaNotFoundSnafu {
|
||||
schema_info: format!("{}.{}", &t.catalog_name, &t.schema_name),
|
||||
})?;
|
||||
|
||||
let context = EngineContext {};
|
||||
let request = OpenTableRequest {
|
||||
catalog_name: t.catalog_name.clone(),
|
||||
schema_name: t.schema_name.clone(),
|
||||
table_name: t.table_name.clone(),
|
||||
table_id: t.table_id,
|
||||
};
|
||||
|
||||
let option = self
|
||||
.engine
|
||||
.open_table(&context, request)
|
||||
.await
|
||||
.with_context(|_| OpenTableSnafu {
|
||||
table_info: format!(
|
||||
"{}.{}.{}, id: {}",
|
||||
&t.catalog_name, &t.schema_name, &t.table_name, t.table_id
|
||||
),
|
||||
})?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_info: format!(
|
||||
"{}.{}.{}, id: {}",
|
||||
&t.catalog_name, &t.schema_name, &t.table_name, t.table_id
|
||||
),
|
||||
})?;
|
||||
|
||||
schema.register_table(t.table_name.clone(), option)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl CatalogList for LocalCatalogManager {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn register_catalog(
|
||||
&self,
|
||||
name: String,
|
||||
catalog: CatalogProviderRef,
|
||||
) -> Option<Arc<dyn CatalogProvider>> {
|
||||
self.catalogs.register_catalog(name, catalog)
|
||||
}
|
||||
|
||||
fn catalog_names(&self) -> Vec<String> {
|
||||
let mut res = self.catalogs.catalog_names();
|
||||
res.push(SYSTEM_CATALOG_NAME.to_string());
|
||||
res
|
||||
}
|
||||
|
||||
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
|
||||
if name.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) {
|
||||
Some(self.system.clone())
|
||||
} else {
|
||||
self.catalogs.catalog(name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CatalogManager for LocalCatalogManager {
|
||||
/// Start [MemoryCatalogManager] to load all information from system catalog table.
|
||||
/// Make sure table engine is initialized before starting [MemoryCatalogManager].
|
||||
async fn start(&self) -> Result<()> {
|
||||
self.init().await
|
||||
}
|
||||
}
|
||||
@@ -1,47 +1,14 @@
|
||||
use std::any::Any;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use common_error::prelude::*;
|
||||
use table::table::numbers::NumbersTable;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::catalog::{
|
||||
CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME,
|
||||
DEFAULT_SCHEMA_NAME,
|
||||
};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::error::{Result, TableExistsSnafu};
|
||||
use crate::schema::SchemaProvider;
|
||||
|
||||
/// Error implementation of memory catalog.
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum InnerError {
|
||||
#[snafu(display("Table {} already exists", table))]
|
||||
TableExists { table: String, backtrace: Backtrace },
|
||||
}
|
||||
|
||||
impl ErrorExt for InnerError {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
InnerError::TableExists { .. } => StatusCode::TableAlreadyExists,
|
||||
}
|
||||
}
|
||||
|
||||
fn backtrace_opt(&self) -> Option<&Backtrace> {
|
||||
ErrorCompat::backtrace(self)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InnerError> for Error {
|
||||
fn from(err: InnerError) -> Self {
|
||||
Self::new(err)
|
||||
}
|
||||
}
|
||||
use crate::{CatalogList, CatalogProvider, CatalogProviderRef, SchemaProviderRef};
|
||||
|
||||
/// Simple in-memory list of catalogs
|
||||
#[derive(Default)]
|
||||
@@ -50,6 +17,25 @@ pub struct MemoryCatalogList {
|
||||
pub catalogs: RwLock<HashMap<String, CatalogProviderRef>>,
|
||||
}
|
||||
|
||||
impl MemoryCatalogList {
|
||||
/// Registers a catalog and return `None` if no catalog with the same name was already
|
||||
/// registered, or `Some` with the previously registered catalog.
|
||||
pub fn register_catalog_if_absent(
|
||||
&self,
|
||||
name: String,
|
||||
catalog: Arc<dyn CatalogProvider>,
|
||||
) -> Option<CatalogProviderRef> {
|
||||
let mut catalogs = self.catalogs.write().unwrap();
|
||||
match catalogs.entry(name) {
|
||||
Entry::Occupied(v) => Some(v.get().clone()),
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(catalog);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CatalogList for MemoryCatalogList {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
@@ -93,15 +79,6 @@ impl MemoryCatalogProvider {
|
||||
schemas: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_schema(
|
||||
&self,
|
||||
name: impl Into<String>,
|
||||
schema: Arc<dyn SchemaProvider>,
|
||||
) -> Option<Arc<dyn SchemaProvider>> {
|
||||
let mut schemas = self.schemas.write().unwrap();
|
||||
schemas.insert(name.into(), schema)
|
||||
}
|
||||
}
|
||||
|
||||
impl CatalogProvider for MemoryCatalogProvider {
|
||||
@@ -114,6 +91,15 @@ impl CatalogProvider for MemoryCatalogProvider {
|
||||
schemas.keys().cloned().collect()
|
||||
}
|
||||
|
||||
fn register_schema(
|
||||
&self,
|
||||
name: String,
|
||||
schema: SchemaProviderRef,
|
||||
) -> Option<SchemaProviderRef> {
|
||||
let mut schemas = self.schemas.write().unwrap();
|
||||
schemas.insert(name, schema)
|
||||
}
|
||||
|
||||
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
|
||||
let schemas = self.schemas.read().unwrap();
|
||||
schemas.get(name).cloned()
|
||||
@@ -175,36 +161,39 @@ impl SchemaProvider for MemorySchemaProvider {
|
||||
}
|
||||
|
||||
/// Create a memory catalog list contains a numbers table for test
|
||||
pub fn new_memory_catalog_list() -> Result<CatalogListRef> {
|
||||
let schema_provider = Arc::new(MemorySchemaProvider::new());
|
||||
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
|
||||
let catalog_list = Arc::new(MemoryCatalogList::default());
|
||||
|
||||
// Add numbers table for test
|
||||
let table = Arc::new(NumbersTable::default());
|
||||
schema_provider.register_table("numbers".to_string(), table)?;
|
||||
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME, schema_provider);
|
||||
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
|
||||
|
||||
Ok(catalog_list)
|
||||
pub fn new_memory_catalog_list() -> Result<Arc<MemoryCatalogList>> {
|
||||
Ok(Arc::new(MemoryCatalogList::default()))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::prelude::StatusCode;
|
||||
use table::table::numbers::NumbersTable;
|
||||
|
||||
use super::*;
|
||||
use crate::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
|
||||
#[test]
|
||||
fn test_new_memory_catalog_list() {
|
||||
let catalog_list = new_memory_catalog_list().unwrap();
|
||||
|
||||
let catalog_provider = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap();
|
||||
let schema_provider = catalog_provider.schema(DEFAULT_SCHEMA_NAME).unwrap();
|
||||
assert!(catalog_list.catalog(DEFAULT_CATALOG_NAME).is_none());
|
||||
let default_catalog = Arc::new(MemoryCatalogProvider::default());
|
||||
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog.clone());
|
||||
|
||||
let table = schema_provider.table("numbers");
|
||||
assert!(default_catalog.schema(DEFAULT_SCHEMA_NAME).is_none());
|
||||
let default_schema = Arc::new(MemorySchemaProvider::default());
|
||||
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone());
|
||||
|
||||
default_schema
|
||||
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
|
||||
.unwrap();
|
||||
|
||||
let table = default_schema.table("numbers");
|
||||
assert!(table.is_some());
|
||||
|
||||
assert!(schema_provider.table("not_exists").is_none());
|
||||
assert!(default_schema.table("not_exists").is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -226,4 +215,21 @@ mod tests {
|
||||
assert!(err.backtrace_opt().is_some());
|
||||
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_register_if_absent() {
|
||||
let list = MemoryCatalogList::default();
|
||||
assert!(list
|
||||
.register_catalog_if_absent(
|
||||
"test_catalog".to_string(),
|
||||
Arc::new(MemoryCatalogProvider::new())
|
||||
)
|
||||
.is_none());
|
||||
list.register_catalog_if_absent(
|
||||
"test_catalog".to_string(),
|
||||
Arc::new(MemoryCatalogProvider::new()),
|
||||
)
|
||||
.unwrap();
|
||||
list.as_any().downcast_ref::<MemoryCatalogList>().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
303
src/catalog/src/system.rs
Normal file
303
src/catalog/src/system.rs
Normal file
@@ -0,0 +1,303 @@
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::logical_plan::Expr;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::engine::{EngineContext, TableEngineRef};
|
||||
use table::requests::{CreateTableRequest, OpenTableRequest};
|
||||
use table::{Table, TableRef};
|
||||
|
||||
use crate::consts::{
|
||||
INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID,
|
||||
SYSTEM_CATALOG_TABLE_NAME,
|
||||
};
|
||||
use crate::error::{
|
||||
CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu,
|
||||
OpenSystemCatalogSnafu, Result, SystemCatalogSchemaSnafu, ValueDeserializeSnafu,
|
||||
};
|
||||
|
||||
pub struct SystemCatalogTable {
|
||||
schema: SchemaRef,
|
||||
pub table: TableRef,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Table for SystemCatalogTable {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
_projection: &Option<Vec<usize>>,
|
||||
_filters: &[Expr],
|
||||
_limit: Option<usize>,
|
||||
) -> table::Result<SendableRecordBatchStream> {
|
||||
panic!("System catalog table does not support scan!")
|
||||
}
|
||||
}
|
||||
|
||||
impl SystemCatalogTable {
|
||||
pub async fn new(engine: TableEngineRef) -> Result<Self> {
|
||||
let request = OpenTableRequest {
|
||||
catalog_name: SYSTEM_CATALOG_NAME.to_string(),
|
||||
schema_name: INFORMATION_SCHEMA_NAME.to_string(),
|
||||
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
|
||||
table_id: SYSTEM_CATALOG_TABLE_ID,
|
||||
};
|
||||
let schema = Arc::new(build_system_catalog_schema()?);
|
||||
let ctx = EngineContext::default();
|
||||
|
||||
if let Some(table) = engine
|
||||
.open_table(&ctx, request)
|
||||
.await
|
||||
.context(OpenSystemCatalogSnafu)?
|
||||
{
|
||||
Ok(Self { table, schema })
|
||||
} else {
|
||||
// system catalog table is not yet created, try to create
|
||||
let request = CreateTableRequest {
|
||||
name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
|
||||
desc: Some("System catalog table".to_string()),
|
||||
schema: schema.clone(),
|
||||
primary_key_indices: vec![0, 1, 2],
|
||||
create_if_not_exists: true,
|
||||
};
|
||||
|
||||
let table = engine
|
||||
.create_table(&ctx, request)
|
||||
.await
|
||||
.context(CreateSystemCatalogSnafu)?;
|
||||
Ok(Self { table, schema })
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a stream of all entries inside system catalog table
|
||||
pub async fn records(&self) -> Result<SendableRecordBatchStream> {
|
||||
let full_projection = None;
|
||||
let stream = self.table.scan(&full_projection, &[], None).await.unwrap();
|
||||
Ok(stream)
|
||||
}
|
||||
}
|
||||
|
||||
/// Build system catalog table schema.
|
||||
/// A system catalog table consists of 4 columns, namely
|
||||
/// - entry_type: type of entry in current row, can be any variant of [EntryType].
|
||||
/// - key: a binary encoded key of entry, differs according to different entry type.
|
||||
/// - timestamp: currently not used.
|
||||
/// - value: JSON-encoded value of entry's metadata.
|
||||
fn build_system_catalog_schema() -> Result<Schema> {
|
||||
let mut cols = Vec::with_capacity(6);
|
||||
cols.push(ColumnSchema::new(
|
||||
"entry_type".to_string(),
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
false,
|
||||
));
|
||||
cols.push(ColumnSchema::new(
|
||||
"key".to_string(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
false,
|
||||
));
|
||||
cols.push(ColumnSchema::new(
|
||||
"timestamp".to_string(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
false,
|
||||
));
|
||||
cols.push(ColumnSchema::new(
|
||||
"value".to_string(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
false,
|
||||
));
|
||||
Schema::with_timestamp_index(cols, 2).context(SystemCatalogSchemaSnafu)
|
||||
}
|
||||
|
||||
pub fn decode_system_catalog(
|
||||
entry_type: Option<u8>,
|
||||
key: Option<&[u8]>,
|
||||
value: Option<&[u8]>,
|
||||
) -> Result<Entry> {
|
||||
let entry_type = entry_type.context(InvalidKeySnafu { key: None })?;
|
||||
let key = String::from_utf8_lossy(key.context(InvalidKeySnafu { key: None })?);
|
||||
|
||||
match EntryType::try_from(entry_type)? {
|
||||
EntryType::Catalog => {
|
||||
// As for catalog entry, the key is a string with format: `<catalog_name>`
|
||||
// and the value is current not used.
|
||||
let catalog_name = key.to_string();
|
||||
Ok(Entry::Catalog(CatalogEntry { catalog_name }))
|
||||
}
|
||||
EntryType::Schema => {
|
||||
// As for schema entry, the key is a string with format: `<catalog_name>.<schema_name>`
|
||||
// and the value is current not used.
|
||||
let schema_parts = key.split('.').collect::<Vec<_>>();
|
||||
ensure!(
|
||||
schema_parts.len() == 2,
|
||||
InvalidKeySnafu {
|
||||
key: Some(key.to_string())
|
||||
}
|
||||
);
|
||||
Ok(Entry::Schema(SchemaEntry {
|
||||
catalog_name: schema_parts[0].to_string(),
|
||||
schema_name: schema_parts[1].to_string(),
|
||||
}))
|
||||
}
|
||||
|
||||
EntryType::Table => {
|
||||
// As for table entry, the key is a string with format: `<catalog_name>.<schema_name>.<table_name>`
|
||||
// and the value is a JSON string with format: `{"table_id": <table_id>}`
|
||||
let table_parts = key.split('.').collect::<Vec<_>>();
|
||||
ensure!(
|
||||
table_parts.len() >= 3,
|
||||
InvalidKeySnafu {
|
||||
key: Some(key.to_string())
|
||||
}
|
||||
);
|
||||
let value = value.context(EmptyValueSnafu)?;
|
||||
let table_meta: TableEntryValue =
|
||||
serde_json::from_slice(value).context(ValueDeserializeSnafu)?;
|
||||
Ok(Entry::Table(TableEntry {
|
||||
catalog_name: table_parts[0].to_string(),
|
||||
schema_name: table_parts[1].to_string(),
|
||||
table_name: table_parts[2].to_string(),
|
||||
table_id: table_meta.table_id,
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||
pub enum EntryType {
|
||||
Catalog = 1,
|
||||
Schema = 2,
|
||||
Table = 3,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for EntryType {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
|
||||
match value {
|
||||
b if b == Self::Catalog as u8 => Ok(Self::Catalog),
|
||||
b if b == Self::Schema as u8 => Ok(Self::Schema),
|
||||
b if b == Self::Table as u8 => Ok(Self::Table),
|
||||
b => InvalidEntryTypeSnafu {
|
||||
entry_type: Some(b),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Entry {
|
||||
Catalog(CatalogEntry),
|
||||
Schema(SchemaEntry),
|
||||
Table(TableEntry),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct CatalogEntry {
|
||||
pub catalog_name: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct SchemaEntry {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct TableEntry {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub table_id: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct TableEntryValue {
|
||||
pub table_id: u64,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
pub fn test_decode_catalog_enrty() {
|
||||
let entry = decode_system_catalog(
|
||||
Some(EntryType::Catalog as u8),
|
||||
Some("some_catalog".as_bytes()),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
if let Entry::Catalog(e) = entry {
|
||||
assert_eq!("some_catalog", e.catalog_name);
|
||||
} else {
|
||||
panic!("Unexpected type: {:?}", entry);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_decode_schema_entry() {
|
||||
let entry = decode_system_catalog(
|
||||
Some(EntryType::Schema as u8),
|
||||
Some("some_catalog.some_schema".as_bytes()),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
if let Entry::Schema(e) = entry {
|
||||
assert_eq!("some_catalog", e.catalog_name);
|
||||
assert_eq!("some_schema", e.schema_name);
|
||||
} else {
|
||||
panic!("Unexpected type: {:?}", entry);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_decode_table() {
|
||||
let entry = decode_system_catalog(
|
||||
Some(EntryType::Table as u8),
|
||||
Some("some_catalog.some_schema.some_table".as_bytes()),
|
||||
Some("{\"table_id\":42}".as_bytes()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
if let Entry::Table(e) = entry {
|
||||
assert_eq!("some_catalog", e.catalog_name);
|
||||
assert_eq!("some_schema", e.schema_name);
|
||||
assert_eq!("some_table", e.table_name);
|
||||
assert_eq!(42, e.table_id);
|
||||
} else {
|
||||
panic!("Unexpected type: {:?}", entry);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
pub fn test_decode_mismatch() {
|
||||
decode_system_catalog(
|
||||
Some(EntryType::Table as u8),
|
||||
Some("some_catalog.some_schema.some_table".as_bytes()),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_entry_type() {
|
||||
assert_eq!(EntryType::Catalog, EntryType::try_from(1).unwrap());
|
||||
assert_eq!(EntryType::Schema, EntryType::try_from(2).unwrap());
|
||||
assert_eq!(EntryType::Table, EntryType::try_from(3).unwrap());
|
||||
assert!(EntryType::try_from(4).is_err());
|
||||
}
|
||||
}
|
||||
251
src/catalog/src/tables.rs
Normal file
251
src/catalog/src/tables.rs
Normal file
@@ -0,0 +1,251 @@
|
||||
// The `tables` table in system catalog keeps a record of all tables created by user.
|
||||
|
||||
use std::any::Any;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use async_stream::stream;
|
||||
use common_query::logical_plan::Expr;
|
||||
use common_recordbatch::error::Result as RecordBatchResult;
|
||||
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
|
||||
use datatypes::prelude::{ConcreteDataType, VectorBuilder};
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use futures::Stream;
|
||||
use table::engine::TableEngineRef;
|
||||
use table::{Table, TableRef};
|
||||
|
||||
use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME};
|
||||
use crate::system::SystemCatalogTable;
|
||||
use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef};
|
||||
|
||||
/// Tables holds all tables created by user.
|
||||
pub struct Tables {
|
||||
schema: SchemaRef,
|
||||
catalogs: CatalogListRef,
|
||||
engine: TableEngineRef,
|
||||
}
|
||||
|
||||
impl Tables {
|
||||
pub fn new(catalogs: CatalogListRef, engine: TableEngineRef) -> Self {
|
||||
Self {
|
||||
schema: Arc::new(build_schema_for_tables()),
|
||||
catalogs,
|
||||
engine,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Table for Tables {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
_projection: &Option<Vec<usize>>,
|
||||
_filters: &[Expr],
|
||||
_limit: Option<usize>,
|
||||
) -> table::error::Result<SendableRecordBatchStream> {
|
||||
let catalogs = self.catalogs.clone();
|
||||
let schema_ref = self.schema.clone();
|
||||
let engine_name = self.engine.name().to_string();
|
||||
|
||||
let stream = stream!({
|
||||
for catalog_name in catalogs.catalog_names() {
|
||||
let catalog = catalogs.catalog(&catalog_name).unwrap();
|
||||
for schema_name in catalog.schema_names() {
|
||||
let mut tables_in_schema = Vec::with_capacity(catalog.schema_names().len());
|
||||
let schema = catalog.schema(&schema_name).unwrap();
|
||||
for table_name in schema.table_names() {
|
||||
tables_in_schema.push(table_name);
|
||||
}
|
||||
|
||||
let vec = tables_to_record_batch(
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
tables_in_schema,
|
||||
&engine_name,
|
||||
);
|
||||
let record_batch_res = RecordBatch::new(schema_ref.clone(), vec);
|
||||
yield record_batch_res;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Box::pin(TablesRecordBatchStream {
|
||||
schema: self.schema.clone(),
|
||||
stream: Box::pin(stream),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert tables info to `RecordBatch`.
|
||||
fn tables_to_record_batch(
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
table_names: Vec<String>,
|
||||
engine: &str,
|
||||
) -> Vec<VectorRef> {
|
||||
let mut catalog_vec =
|
||||
VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
|
||||
let mut schema_vec =
|
||||
VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
|
||||
let mut table_name_vec =
|
||||
VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
|
||||
let mut engine_vec =
|
||||
VectorBuilder::with_capacity(ConcreteDataType::string_datatype(), table_names.len());
|
||||
|
||||
for table_name in table_names {
|
||||
catalog_vec.push(&Value::String(catalog_name.into()));
|
||||
schema_vec.push(&Value::String(schema_name.into()));
|
||||
table_name_vec.push(&Value::String(table_name.into()));
|
||||
engine_vec.push(&Value::String(engine.into()));
|
||||
}
|
||||
|
||||
vec![
|
||||
catalog_vec.finish(),
|
||||
schema_vec.finish(),
|
||||
table_name_vec.finish(),
|
||||
engine_vec.finish(),
|
||||
]
|
||||
}
|
||||
|
||||
pub struct TablesRecordBatchStream {
|
||||
schema: SchemaRef,
|
||||
stream: Pin<Box<dyn Stream<Item = RecordBatchResult<RecordBatch>> + Send>>,
|
||||
}
|
||||
|
||||
impl Stream for TablesRecordBatchStream {
|
||||
type Item = RecordBatchResult<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
Pin::new(&mut self.stream).poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordBatchStream for TablesRecordBatchStream {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InformationSchema {
|
||||
pub tables: Arc<Tables>,
|
||||
pub system: Arc<SystemCatalogTable>,
|
||||
}
|
||||
|
||||
impl SchemaProvider for InformationSchema {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn table_names(&self) -> Vec<String> {
|
||||
vec!["tables".to_string(), SYSTEM_CATALOG_TABLE_NAME.to_string()]
|
||||
}
|
||||
|
||||
fn table(&self, name: &str) -> Option<TableRef> {
|
||||
if name.eq_ignore_ascii_case("tables") {
|
||||
Some(self.tables.clone())
|
||||
} else if name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME) {
|
||||
Some(self.system.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn register_table(
|
||||
&self,
|
||||
_name: String,
|
||||
_table: TableRef,
|
||||
) -> crate::error::Result<Option<TableRef>> {
|
||||
panic!("System catalog & schema does not support register table")
|
||||
}
|
||||
|
||||
fn deregister_table(&self, _name: &str) -> crate::error::Result<Option<TableRef>> {
|
||||
panic!("System catalog & schema does not support deregister table")
|
||||
}
|
||||
|
||||
fn table_exist(&self, name: &str) -> bool {
|
||||
name.eq_ignore_ascii_case("tables") || name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SystemCatalog {
|
||||
pub information_schema: Arc<InformationSchema>,
|
||||
}
|
||||
|
||||
impl SystemCatalog {
|
||||
pub fn new(
|
||||
system: SystemCatalogTable,
|
||||
catalogs: CatalogListRef,
|
||||
engine: TableEngineRef,
|
||||
) -> Self {
|
||||
let schema = InformationSchema {
|
||||
tables: Arc::new(Tables::new(catalogs, engine)),
|
||||
system: Arc::new(system),
|
||||
};
|
||||
Self {
|
||||
information_schema: Arc::new(schema),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CatalogProvider for SystemCatalog {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn schema_names(&self) -> Vec<String> {
|
||||
vec![INFORMATION_SCHEMA_NAME.to_string()]
|
||||
}
|
||||
|
||||
fn register_schema(
|
||||
&self,
|
||||
_name: String,
|
||||
_schema: SchemaProviderRef,
|
||||
) -> Option<SchemaProviderRef> {
|
||||
panic!("System catalog does not support registering schema!")
|
||||
}
|
||||
|
||||
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
|
||||
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA_NAME) {
|
||||
Some(self.information_schema.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn build_schema_for_tables() -> Schema {
|
||||
let cols = vec![
|
||||
ColumnSchema::new(
|
||||
"catalog".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
),
|
||||
ColumnSchema::new(
|
||||
"schema".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
),
|
||||
ColumnSchema::new(
|
||||
"table_name".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
),
|
||||
ColumnSchema::new(
|
||||
"engine".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
),
|
||||
];
|
||||
Schema::new(cols)
|
||||
}
|
||||
@@ -1,9 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::CatalogListRef;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{NewCatalogSnafu, Result};
|
||||
use crate::error::Result;
|
||||
use crate::instance::{Instance, InstanceRef};
|
||||
use crate::server::Services;
|
||||
|
||||
@@ -28,19 +25,16 @@ impl Default for DatanodeOptions {
|
||||
pub struct Datanode {
|
||||
opts: DatanodeOptions,
|
||||
services: Services,
|
||||
_catalog_list: CatalogListRef,
|
||||
instance: InstanceRef,
|
||||
}
|
||||
|
||||
impl Datanode {
|
||||
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
|
||||
let catalog_list = catalog::memory::new_memory_catalog_list().context(NewCatalogSnafu)?;
|
||||
let instance = Arc::new(Instance::new(&opts, catalog_list.clone()).await?);
|
||||
let instance = Arc::new(Instance::new(&opts).await?);
|
||||
|
||||
Ok(Self {
|
||||
opts,
|
||||
services: Services::new(instance.clone()),
|
||||
_catalog_list: catalog_list,
|
||||
instance,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -166,9 +166,9 @@ mod tests {
|
||||
}
|
||||
|
||||
fn throw_catalog_error() -> std::result::Result<(), catalog::error::Error> {
|
||||
Err(catalog::error::Error::new(MockError::with_backtrace(
|
||||
StatusCode::Internal,
|
||||
)))
|
||||
Err(catalog::error::Error::RegisterTable {
|
||||
source: BoxedError::new(MockError::with_backtrace(StatusCode::Internal)),
|
||||
})
|
||||
}
|
||||
|
||||
fn assert_internal_error(err: &Error) {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{fs, path, sync::Arc};
|
||||
|
||||
use api::v1::InsertExpr;
|
||||
use catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_telemetry::logging::info;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
@@ -17,7 +17,8 @@ use table_engine::engine::MitoEngine;
|
||||
|
||||
use crate::datanode::DatanodeOptions;
|
||||
use crate::error::{
|
||||
self, CreateTableSnafu, ExecuteSqlSnafu, InsertSnafu, Result, TableNotFoundSnafu,
|
||||
self, CreateTableSnafu, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result,
|
||||
TableNotFoundSnafu,
|
||||
};
|
||||
use crate::server::grpc::insert::insertion_expr_to_request;
|
||||
use crate::sql::SqlHandler;
|
||||
@@ -31,33 +32,38 @@ pub struct Instance {
|
||||
table_engine: DefaultEngine,
|
||||
sql_handler: SqlHandler<DefaultEngine>,
|
||||
// Catalog list
|
||||
catalog_list: CatalogListRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
}
|
||||
|
||||
pub type InstanceRef = Arc<Instance>;
|
||||
|
||||
impl Instance {
|
||||
pub async fn new(opts: &DatanodeOptions, catalog_list: CatalogListRef) -> Result<Self> {
|
||||
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
|
||||
let log_store = create_local_file_log_store(opts).await?;
|
||||
let factory = QueryEngineFactory::new(catalog_list.clone());
|
||||
let query_engine = factory.query_engine().clone();
|
||||
let table_engine = DefaultEngine::new(
|
||||
EngineImpl::new(EngineConfig::default(), Arc::new(log_store))
|
||||
.await
|
||||
.context(error::OpenStorageEngineSnafu)?,
|
||||
);
|
||||
let catalog_manager = Arc::new(
|
||||
catalog::LocalCatalogManager::try_new(Arc::new(table_engine.clone()))
|
||||
.await
|
||||
.context(NewCatalogSnafu)?,
|
||||
);
|
||||
let factory = QueryEngineFactory::new(catalog_manager.clone());
|
||||
let query_engine = factory.query_engine().clone();
|
||||
|
||||
Ok(Self {
|
||||
query_engine,
|
||||
sql_handler: SqlHandler::new(table_engine.clone()),
|
||||
table_engine,
|
||||
catalog_list,
|
||||
catalog_manager,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn execute_grpc_insert(&self, insert_expr: InsertExpr) -> Result<Output> {
|
||||
let schema_provider = self
|
||||
.catalog_list
|
||||
.catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.unwrap()
|
||||
.schema(DEFAULT_SCHEMA_NAME)
|
||||
@@ -98,7 +104,7 @@ impl Instance {
|
||||
}
|
||||
Statement::Insert(_) => {
|
||||
let schema_provider = self
|
||||
.catalog_list
|
||||
.catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.unwrap()
|
||||
.schema(DEFAULT_SCHEMA_NAME)
|
||||
@@ -114,6 +120,10 @@ impl Instance {
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> Result<()> {
|
||||
self.catalog_manager
|
||||
.start()
|
||||
.await
|
||||
.context(NewCatalogSnafu)?;
|
||||
// FIXME(dennis): create a demo table for test
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
|
||||
@@ -142,7 +152,7 @@ impl Instance {
|
||||
.context(CreateTableSnafu { table_name })?;
|
||||
|
||||
let schema_provider = self
|
||||
.catalog_list
|
||||
.catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.unwrap()
|
||||
.schema(DEFAULT_SCHEMA_NAME)
|
||||
@@ -185,9 +195,9 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_execute_insert() {
|
||||
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
|
||||
let instance = Instance::new(&opts, catalog_list).await.unwrap();
|
||||
let instance = Instance::new(&opts).await.unwrap();
|
||||
instance.start().await.unwrap();
|
||||
|
||||
let output = instance
|
||||
@@ -205,9 +215,9 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_execute_query() {
|
||||
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
|
||||
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
|
||||
let instance = Instance::new(&opts, catalog_list).await.unwrap();
|
||||
let instance = Instance::new(&opts).await.unwrap();
|
||||
instance.start().await.unwrap();
|
||||
|
||||
let output = instance
|
||||
.execute_sql("select sum(number) from numbers limit 20")
|
||||
|
||||
@@ -59,9 +59,9 @@ mod tests {
|
||||
}
|
||||
|
||||
async fn create_extension() -> Extension<InstanceRef> {
|
||||
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
|
||||
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
|
||||
let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap());
|
||||
let instance = Arc::new(Instance::new(&opts).await.unwrap());
|
||||
instance.start().await.unwrap();
|
||||
Extension(instance)
|
||||
}
|
||||
|
||||
@@ -82,6 +82,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sql_output_rows() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let query = create_params();
|
||||
let extension = create_extension().await;
|
||||
|
||||
@@ -89,7 +90,7 @@ mod tests {
|
||||
|
||||
match json {
|
||||
HttpResponse::Json(json) => {
|
||||
assert!(json.success);
|
||||
assert!(json.success, "{:?}", json);
|
||||
assert!(json.error.is_none());
|
||||
assert!(json.output.is_some());
|
||||
|
||||
|
||||
@@ -11,9 +11,8 @@ use crate::server::http::HttpServer;
|
||||
use crate::test_util;
|
||||
|
||||
async fn make_test_app() -> Router {
|
||||
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
|
||||
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
|
||||
let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap());
|
||||
let instance = Arc::new(Instance::new(&opts).await.unwrap());
|
||||
instance.start().await.unwrap();
|
||||
let http_server = HttpServer::new(instance);
|
||||
http_server.make_app()
|
||||
|
||||
@@ -23,6 +23,8 @@ datatypes = { path = "../datatypes" }
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
metrics = "0.18"
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
sql = { path = "../sql" }
|
||||
table = { path = "../table" }
|
||||
|
||||
@@ -208,15 +208,31 @@ impl QueryExecutor for DatafusionQueryEngine {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::UInt64Array;
|
||||
use catalog::memory::{MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::{
|
||||
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
|
||||
};
|
||||
use common_recordbatch::util;
|
||||
use datafusion::field_util::FieldExt;
|
||||
use datafusion::field_util::SchemaExt;
|
||||
use table::table::numbers::NumbersTable;
|
||||
|
||||
use crate::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
|
||||
|
||||
fn create_test_engine() -> QueryEngineRef {
|
||||
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
|
||||
|
||||
let default_schema = Arc::new(MemorySchemaProvider::new());
|
||||
default_schema
|
||||
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
|
||||
.unwrap();
|
||||
let default_catalog = Arc::new(MemoryCatalogProvider::new());
|
||||
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema);
|
||||
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
|
||||
|
||||
let factory = QueryEngineFactory::new(catalog_list);
|
||||
factory.query_engine().clone()
|
||||
}
|
||||
|
||||
@@ -3,7 +3,9 @@
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::{CatalogListRef, CatalogProvider, SchemaProvider};
|
||||
use catalog::{
|
||||
CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
use datafusion::catalog::{
|
||||
catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider},
|
||||
schema::SchemaProvider as DfSchemaProvider,
|
||||
@@ -44,7 +46,7 @@ impl DfCatalogList for DfCatalogListAdapter {
|
||||
catalog: Arc<dyn DfCatalogProvider>,
|
||||
) -> Option<Arc<dyn DfCatalogProvider>> {
|
||||
let catalog_adapter = Arc::new(CatalogProviderAdapter {
|
||||
df_cataglog_provider: catalog,
|
||||
df_catalog_provider: catalog,
|
||||
runtime: self.runtime.clone(),
|
||||
});
|
||||
self.catalog_list
|
||||
@@ -73,7 +75,7 @@ impl DfCatalogList for DfCatalogListAdapter {
|
||||
|
||||
/// Datafusion's CatalogProvider -> greptime CatalogProvider
|
||||
struct CatalogProviderAdapter {
|
||||
df_cataglog_provider: Arc<dyn DfCatalogProvider>,
|
||||
df_catalog_provider: Arc<dyn DfCatalogProvider>,
|
||||
runtime: Arc<RuntimeEnv>,
|
||||
}
|
||||
|
||||
@@ -83,11 +85,19 @@ impl CatalogProvider for CatalogProviderAdapter {
|
||||
}
|
||||
|
||||
fn schema_names(&self) -> Vec<String> {
|
||||
self.df_cataglog_provider.schema_names()
|
||||
self.df_catalog_provider.schema_names()
|
||||
}
|
||||
|
||||
fn register_schema(
|
||||
&self,
|
||||
_name: String,
|
||||
_schema: SchemaProviderRef,
|
||||
) -> Option<SchemaProviderRef> {
|
||||
todo!("register_schema is not supported in Datafusion catalog provider")
|
||||
}
|
||||
|
||||
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
|
||||
self.df_cataglog_provider
|
||||
self.df_catalog_provider
|
||||
.schema(name)
|
||||
.map(|df_schema_provider| {
|
||||
Arc::new(SchemaProviderAdapter {
|
||||
@@ -100,7 +110,7 @@ impl CatalogProvider for CatalogProviderAdapter {
|
||||
|
||||
///Greptime CatalogProvider -> datafusion's CatalogProvider
|
||||
struct DfCatalogProviderAdapter {
|
||||
catalog_provider: Arc<dyn CatalogProvider>,
|
||||
catalog_provider: CatalogProviderRef,
|
||||
runtime: Arc<RuntimeEnv>,
|
||||
}
|
||||
|
||||
@@ -234,3 +244,65 @@ impl SchemaProvider for SchemaProviderAdapter {
|
||||
self.df_schema_provider.table_exist(name)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use catalog::memory::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use table::table::numbers::NumbersTable;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
pub fn test_register_schema() {
|
||||
let adapter = CatalogProviderAdapter {
|
||||
df_catalog_provider: Arc::new(
|
||||
datafusion::catalog::catalog::MemoryCatalogProvider::new(),
|
||||
),
|
||||
runtime: Arc::new(RuntimeEnv::default()),
|
||||
};
|
||||
|
||||
adapter.register_schema(
|
||||
"whatever".to_string(),
|
||||
Arc::new(MemorySchemaProvider::new()),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_register_table() {
|
||||
let adapter = DfSchemaProviderAdapter {
|
||||
runtime: Arc::new(RuntimeEnv::default()),
|
||||
schema_provider: Arc::new(MemorySchemaProvider::new()),
|
||||
};
|
||||
|
||||
adapter
|
||||
.register_table(
|
||||
"test_table".to_string(),
|
||||
Arc::new(DfTableProviderAdapter::new(Arc::new(
|
||||
NumbersTable::default(),
|
||||
))),
|
||||
)
|
||||
.unwrap();
|
||||
adapter.table("test_table").unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_register_catalog() {
|
||||
let rt = Arc::new(RuntimeEnv::default());
|
||||
let catalog_list = DfCatalogListAdapter {
|
||||
runtime: rt.clone(),
|
||||
catalog_list: new_memory_catalog_list().unwrap(),
|
||||
};
|
||||
assert!(catalog_list
|
||||
.register_catalog(
|
||||
"test_catalog".to_string(),
|
||||
Arc::new(DfCatalogProviderAdapter {
|
||||
catalog_provider: Arc::new(MemoryCatalogProvider::new()),
|
||||
runtime: rt,
|
||||
}),
|
||||
)
|
||||
.is_none());
|
||||
|
||||
catalog_list.catalog("test_catalog").unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +73,9 @@ impl ErrorExt for InnerError {
|
||||
|
||||
impl From<InnerError> for catalog::error::Error {
|
||||
fn from(e: InnerError) -> Self {
|
||||
catalog::error::Error::new(e)
|
||||
catalog::error::Error::RegisterTable {
|
||||
source: BoxedError::new(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,6 +87,8 @@ impl From<InnerError> for Error {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_error::mock::MockError;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn throw_df_error() -> Result<(), DataFusionError> {
|
||||
@@ -129,4 +133,15 @@ mod tests {
|
||||
let sql_err = raise_sql_error().err().unwrap();
|
||||
assert_eq!(sql_err.status_code(), err.status_code());
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_from_inner_error() {
|
||||
let err = InnerError::TableSchemaMismatch {
|
||||
source: table::error::Error::new(MockError::new(StatusCode::Unexpected)),
|
||||
};
|
||||
|
||||
let catalog_error = catalog::error::Error::from(err);
|
||||
// [InnerError] to [catalog::error::Error] is considered as Internal error
|
||||
assert_eq!(StatusCode::Internal, catalog_error.status_code());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,9 @@ use std::sync::Arc;
|
||||
|
||||
use arc_swap::ArcSwapOption;
|
||||
use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::{CatalogList, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use catalog::{
|
||||
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
|
||||
};
|
||||
use common_function::scalars::aggregate::AggregateFunctionMeta;
|
||||
use common_query::error::CreateAccumulatorSnafu;
|
||||
use common_query::error::Result as QueryResult;
|
||||
@@ -262,7 +264,7 @@ pub fn new_query_engine_factory(table_name: String, table: TableRef) -> QueryEng
|
||||
let catalog_list = Arc::new(MemoryCatalogList::default());
|
||||
|
||||
schema_provider.register_table(table_name, table).unwrap();
|
||||
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME, schema_provider);
|
||||
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider);
|
||||
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
|
||||
|
||||
QueryEngineFactory::new(catalog_list)
|
||||
|
||||
@@ -4,7 +4,9 @@ use std::sync::Arc;
|
||||
|
||||
use arrow::array::UInt32Array;
|
||||
use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::{CatalogList, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use catalog::{
|
||||
CatalogList, CatalogProvider, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
|
||||
};
|
||||
use common_query::prelude::{create_udf, make_scalar_function, Volatility};
|
||||
use common_recordbatch::error::Result as RecordResult;
|
||||
use common_recordbatch::{util, RecordBatch};
|
||||
@@ -23,6 +25,7 @@ use query::query_engine::{Output, QueryEngineFactory};
|
||||
use query::QueryEngine;
|
||||
use rand::Rng;
|
||||
use table::table::adapter::DfTableProviderAdapter;
|
||||
use table::table::numbers::NumbersTable;
|
||||
use test_util::MemTable;
|
||||
|
||||
use crate::pow::pow;
|
||||
@@ -87,6 +90,15 @@ async fn test_datafusion_query_engine() -> Result<()> {
|
||||
async fn test_udf() -> Result<()> {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let catalog_list = catalog::memory::new_memory_catalog_list()?;
|
||||
|
||||
let default_schema = Arc::new(MemorySchemaProvider::new());
|
||||
default_schema
|
||||
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
|
||||
.unwrap();
|
||||
let default_catalog = Arc::new(MemoryCatalogProvider::new());
|
||||
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema);
|
||||
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog);
|
||||
|
||||
let factory = QueryEngineFactory::new(catalog_list);
|
||||
let engine = factory.query_engine();
|
||||
|
||||
@@ -205,7 +217,7 @@ fn create_query_engine() -> Arc<dyn QueryEngine> {
|
||||
.register_table("float_numbers".to_string(), float_number_table)
|
||||
.unwrap();
|
||||
|
||||
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME, schema_provider);
|
||||
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider);
|
||||
catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
|
||||
|
||||
let factory = QueryEngineFactory::new(catalog_list);
|
||||
|
||||
@@ -144,7 +144,7 @@ impl<'a> ParserContext<'a> {
|
||||
/// Parses the set of valid formats
|
||||
fn parse_table_engine(&mut self) -> Result<String> {
|
||||
if !self.consume_token(ENGINE) {
|
||||
return Ok(engine::DEFAULT_ENGINE.to_string());
|
||||
return Ok(engine::MITO_ENGINE.to_string());
|
||||
}
|
||||
|
||||
self.parser
|
||||
|
||||
@@ -131,7 +131,6 @@ impl ManifestLogStorage for ManifestObjectStore {
|
||||
.is_exist()
|
||||
.await
|
||||
.context(ReadObjectSnafu { path: &self.path })?;
|
||||
|
||||
if !dir_exists {
|
||||
return Ok(ObjectStoreLogIterator {
|
||||
iter: Box::new(Vec::default().into_iter()),
|
||||
|
||||
@@ -23,11 +23,11 @@ use tokio::sync::Mutex;
|
||||
|
||||
use crate::error::{
|
||||
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
|
||||
BuildRowKeyDescriptorSnafu, MissingTimestamIndexSnafu, Result,
|
||||
BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result,
|
||||
};
|
||||
use crate::table::MitoTable;
|
||||
|
||||
pub const DEFAULT_ENGINE: &str = "mito";
|
||||
pub const MITO_ENGINE: &str = "mito";
|
||||
const INIT_COLUMN_ID: ColumnId = 0;
|
||||
|
||||
/// [TableEngine] implementation.
|
||||
@@ -49,6 +49,10 @@ impl<S: StorageEngine> MitoEngine<S> {
|
||||
|
||||
#[async_trait]
|
||||
impl<S: StorageEngine> TableEngine for MitoEngine<S> {
|
||||
fn name(&self) -> &str {
|
||||
MITO_ENGINE
|
||||
}
|
||||
|
||||
async fn create_table(
|
||||
&self,
|
||||
ctx: &EngineContext,
|
||||
@@ -127,7 +131,7 @@ fn build_row_key_desc_from_schema(
|
||||
request
|
||||
.schema
|
||||
.timestamp_column()
|
||||
.context(MissingTimestamIndexSnafu {
|
||||
.context(MissingTimestampIndexSnafu {
|
||||
table_name: &request.name,
|
||||
})?;
|
||||
let timestamp_index = request.schema.timestamp_index().unwrap();
|
||||
@@ -191,7 +195,7 @@ fn build_column_family_from_request(
|
||||
let ts_index = request
|
||||
.schema
|
||||
.timestamp_index()
|
||||
.context(MissingTimestamIndexSnafu {
|
||||
.context(MissingTimestampIndexSnafu {
|
||||
table_name: &request.name,
|
||||
})?;
|
||||
let column_schemas = request
|
||||
@@ -271,7 +275,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
// Use region meta schema instead of request schema
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(region.in_memory_metadata().schema().clone())
|
||||
.engine(DEFAULT_ENGINE)
|
||||
.engine(MITO_ENGINE)
|
||||
.next_column_id(next_column_id)
|
||||
.primary_key_indices(request.primary_key_indices.clone())
|
||||
.build()
|
||||
@@ -337,7 +341,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
//FIXME(boyan): recover table meta from table manifest
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(region.in_memory_metadata().schema().clone())
|
||||
.engine(DEFAULT_ENGINE)
|
||||
.engine(MITO_ENGINE)
|
||||
.next_column_id(INIT_COLUMN_ID)
|
||||
.primary_key_indices(Vec::default())
|
||||
.build()
|
||||
@@ -445,6 +449,7 @@ mod tests {
|
||||
|
||||
let (engine, table) = {
|
||||
let (engine, table_engine, table) = test_util::setup_mock_engine_and_table().await;
|
||||
assert_eq!(MITO_ENGINE, table_engine.name());
|
||||
// Now try to open the table again.
|
||||
let reopened = table_engine
|
||||
.open_table(&ctx, open_req.clone())
|
||||
|
||||
@@ -43,7 +43,7 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display("Missing timestamp index for table: {}", table_name))]
|
||||
MissingTimestamIndex {
|
||||
MissingTimestampIndex {
|
||||
table_name: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
@@ -118,7 +118,7 @@ impl ErrorExt for Error {
|
||||
| BuildTableMeta { .. }
|
||||
| BuildTableInfo { .. }
|
||||
| BuildRegionDescriptor { .. }
|
||||
| MissingTimestamIndex { .. } => StatusCode::InvalidArguments,
|
||||
| MissingTimestampIndex { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,9 @@ use crate::TableRef;
|
||||
/// Table engine abstraction.
|
||||
#[async_trait::async_trait]
|
||||
pub trait TableEngine: Send + Sync {
|
||||
/// Return engine name
|
||||
fn name(&self) -> &str;
|
||||
|
||||
/// Create a table by given request.
|
||||
///
|
||||
/// Return the created table.
|
||||
|
||||
Reference in New Issue
Block a user