diff --git a/Cargo.lock b/Cargo.lock index c269433a84..aaca510c99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -644,10 +644,16 @@ dependencies = [ "datatypes", "futures", "futures-util", + "log-store", + "object-store", + "opendal", "serde", "serde_json", "snafu", + "storage", "table", + "table-engine", + "tempdir", "tokio", ] @@ -4752,6 +4758,7 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" name = "sql" version = "0.1.0" dependencies = [ + "catalog", "common-error", "common-time", "datatypes", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 2d981d8e35..6525783485 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -22,4 +22,10 @@ snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../table" } [dev-dependencies] +log-store = { path = "../log-store" } +object-store = { path = "../object-store" } +opendal = "0.17" +storage = { path = "../storage" } +table-engine = { path = "../table-engine" } +tempdir = "0.3" tokio = { version = "1.0", features = ["full"] } diff --git a/src/catalog/src/manager.rs b/src/catalog/src/manager.rs index 8ec2aadbd9..28e336ddee 100644 --- a/src/catalog/src/manager.rs +++ b/src/catalog/src/manager.rs @@ -97,7 +97,7 @@ impl LocalCatalogManager { let table_id = req.create_table_request.id; let table = if let Some(table) = - self.table(catalog_name.as_deref(), schema_name.as_deref(), table_name)? + self.table(Some(catalog_name), Some(schema_name), table_name)? { table } else { @@ -108,15 +108,12 @@ impl LocalCatalogManager { .with_context(|_| CreateTableSnafu { table_info: format!( "{}.{}.{}, id: {}", - catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME), - schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), - table_name, - table_id, + catalog_name, schema_name, table_name, table_id, ), })?; self.register_table(RegisterTableRequest { - catalog: catalog_name.clone(), - schema: schema_name.clone(), + catalog: Some(catalog_name.clone()), + schema: Some(schema_name.clone()), table_name: table_name.clone(), table_id, table: table.clone(), diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index dee3b25d11..a7bc2aa262 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -13,7 +13,7 @@ use datatypes::vectors::{BinaryVector, TimestampVector, UInt8Vector}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfoRef}; use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest}; use table::{Table, TableRef}; @@ -32,7 +32,7 @@ pub const TIMESTAMP_INDEX: usize = 2; pub const VALUE_INDEX: usize = 3; pub struct SystemCatalogTable { - schema: SchemaRef, + table_info: TableInfoRef, pub table: TableRef, } @@ -43,7 +43,7 @@ impl Table for SystemCatalogTable { } fn schema(&self) -> SchemaRef { - self.schema.clone() + self.table_info.meta.schema.clone() } async fn scan( @@ -59,6 +59,10 @@ impl Table for SystemCatalogTable { async fn insert(&self, request: InsertRequest) -> table::error::Result { self.table.insert(request).await } + + fn table_info(&self) -> TableInfoRef { + self.table_info.clone() + } } impl SystemCatalogTable { @@ -77,13 +81,16 @@ impl SystemCatalogTable { .await .context(OpenSystemCatalogSnafu)? { - Ok(Self { table, schema }) + Ok(Self { + table_info: table.table_info(), + table, + }) } else { // system catalog table is not yet created, try to create let request = CreateTableRequest { id: SYSTEM_CATALOG_TABLE_ID, - catalog_name: Some(SYSTEM_CATALOG_NAME.to_string()), - schema_name: Some(INFORMATION_SCHEMA_NAME.to_string()), + catalog_name: SYSTEM_CATALOG_NAME.to_string(), + schema_name: INFORMATION_SCHEMA_NAME.to_string(), table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), desc: Some("System catalog table".to_string()), schema: schema.clone(), @@ -96,7 +103,8 @@ impl SystemCatalogTable { .create_table(&ctx, request) .await .context(CreateSystemCatalogSnafu)?; - Ok(Self { table, schema }) + let table_info = table.table_info(); + Ok(Self { table, table_info }) } } @@ -320,6 +328,22 @@ pub struct TableEntryValue { #[cfg(test)] mod tests { + use datafusion::execution::runtime_env::RuntimeEnv; + use datafusion::field_util::SchemaExt; + use datatypes::arrow; + use log_store::fs::noop::NoopLogStore; + use object_store::ObjectStore; + use storage::config::EngineConfig as StorageEngineConfig; + use storage::EngineImpl; + use table::engine::TableEngine; + use table::metadata::TableType; + use table::metadata::TableType::Base; + use table::requests::{AlterTableRequest, DropTableRequest}; + use table::table::adapter::TableAdapter; + use table_engine::config::EngineConfig; + use table_engine::engine::MitoEngine; + use tempdir::TempDir; + use super::*; #[test] @@ -391,4 +415,120 @@ mod tests { assert_eq!(EntryType::Table, EntryType::try_from(3).unwrap()); assert!(EntryType::try_from(4).is_err()); } + + struct MockTableEngine { + table_name: String, + sole_table: TableRef, + } + + impl Default for MockTableEngine { + fn default() -> Self { + Self { + table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), + sole_table: Arc::new( + TableAdapter::new( + Arc::new(datafusion::datasource::empty::EmptyTable::new(Arc::new( + arrow::datatypes::Schema::empty(), + ))), + Arc::new(RuntimeEnv::default()), + ) + .unwrap(), + ), + } + } + } + + #[async_trait::async_trait] + impl TableEngine for MockTableEngine { + fn name(&self) -> &str { + "MockTableEngine" + } + + async fn create_table( + &self, + _ctx: &EngineContext, + _request: CreateTableRequest, + ) -> table::Result { + unreachable!() + } + + async fn open_table( + &self, + _ctx: &EngineContext, + request: OpenTableRequest, + ) -> table::Result> { + if request.table_name == self.table_name { + Ok(Some(self.sole_table.clone())) + } else { + Ok(None) + } + } + + async fn alter_table( + &self, + _ctx: &EngineContext, + _request: AlterTableRequest, + ) -> table::Result { + unreachable!() + } + + fn get_table(&self, _ctx: &EngineContext, name: &str) -> table::Result> { + if name == self.table_name { + Ok(Some(self.sole_table.clone())) + } else { + Ok(None) + } + } + + fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool { + name == self.table_name + } + + async fn drop_table( + &self, + _ctx: &EngineContext, + _request: DropTableRequest, + ) -> table::Result<()> { + unreachable!() + } + } + + async fn prepare_table_engine() -> (TempDir, TableEngineRef) { + let dir = TempDir::new("system-table-test").unwrap(); + let store_dir = dir.path().to_string_lossy(); + let accessor = opendal::services::fs::Builder::default() + .root(&store_dir) + .build() + .unwrap(); + let object_store = ObjectStore::new(accessor); + let table_engine = Arc::new(MitoEngine::new( + EngineConfig::default(), + EngineImpl::new( + StorageEngineConfig::default(), + Arc::new(NoopLogStore::default()), + object_store.clone(), + ), + object_store, + )); + (dir, table_engine) + } + + #[tokio::test] + async fn test_system_table_type() { + let (_dir, table_engine) = prepare_table_engine().await; + let system_table = SystemCatalogTable::new(table_engine).await.unwrap(); + assert_eq!(Base, system_table.table_type()); + } + + #[tokio::test] + async fn test_system_table_info() { + let (_dir, table_engine) = prepare_table_engine().await; + let system_table = SystemCatalogTable::new(table_engine).await.unwrap(); + let info = system_table.table_info(); + assert_eq!(TableType::Base, info.table_type); + assert_eq!(SYSTEM_CATALOG_TABLE_NAME, info.name); + assert_eq!(SYSTEM_CATALOG_TABLE_ID, info.ident.table_id); + assert_eq!(SYSTEM_CATALOG_NAME, info.catalog_name); + assert_eq!(INFORMATION_SCHEMA_NAME, info.schema_name); + } } diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index a366ccac74..c26c7db286 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -16,7 +16,7 @@ use datatypes::vectors::VectorRef; use futures::Stream; use snafu::ResultExt; use table::engine::TableEngineRef; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfoRef}; use table::{Table, TableRef}; use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME}; @@ -53,6 +53,10 @@ impl Table for Tables { self.schema.clone() } + fn table_info(&self) -> TableInfoRef { + unreachable!("Tables does not support table_info method") + } + async fn scan( &self, _projection: &Option>, diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 2cb04767dd..250df8504f 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -51,13 +51,20 @@ impl Instance { async fn create_table_by_insert_batches( &self, + catalog_name: &str, + schema_name: &str, table_name: &str, insert_batches: &[InsertBatch], ) -> Result<()> { // Create table automatically, build schema from data. let table_id = self.catalog_manager.next_table_id(); - let create_table_request = - insert::build_create_table_request(table_id, table_name, insert_batches)?; + let create_table_request = insert::build_create_table_request( + catalog_name, + schema_name, + table_id, + table_name, + insert_batches, + )?; info!( "Try to create table: {} automatically with request: {:?}", @@ -79,11 +86,15 @@ impl Instance { table_name: &str, values: insert_expr::Values, ) -> Result { + // maybe infer from insert batch? + let catalog_name = DEFAULT_CATALOG_NAME; + let schema_name = DEFAULT_SCHEMA_NAME; + let schema_provider = self .catalog_manager - .catalog(DEFAULT_CATALOG_NAME) + .catalog(catalog_name) .unwrap() - .schema(DEFAULT_SCHEMA_NAME) + .schema(schema_name) .unwrap(); let insert_batches = insert::insert_batches(values.values)?; @@ -98,8 +109,13 @@ impl Instance { table } else { - self.create_table_by_insert_batches(table_name, &insert_batches) - .await?; + self.create_table_by_insert_batches( + catalog_name, + schema_name, + table_name, + &insert_batches, + ) + .await?; schema_provider .table(table_name) diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 06cc6ce20a..dde2b56202 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDef, CreateExpr}; +use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::{ErrorExt, StatusCode}; use common_query::Output; use datatypes::schema::ColumnDefaultConstraint; @@ -77,10 +78,16 @@ impl Instance { let table_id = self.catalog_manager().next_table_id(); + let catalog_name = expr + .catalog_name + .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()); + let schema_name = expr + .schema_name + .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); Ok(CreateTableRequest { id: table_id, - catalog_name: expr.catalog_name, - schema_name: expr.schema_name, + catalog_name, + schema_name, table_name: expr.table_name, desc: expr.desc, schema, @@ -182,8 +189,8 @@ mod tests { let expr = testing_create_expr(); let request = instance.create_expr_to_request(expr).unwrap(); assert_eq!(request.id, MIN_USER_TABLE_ID); - assert_eq!(request.catalog_name, None); - assert_eq!(request.schema_name, None); + assert_eq!(request.catalog_name, "greptime".to_string()); + assert_eq!(request.schema_name, "public".to_string()); assert_eq!(request.table_name, "my-metrics"); assert_eq!(request.desc, Some("blabla".to_string())); assert_eq!(request.schema, expected_table_schema()); diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 77773cb320..dec4f91bc2 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -97,6 +97,8 @@ pub fn build_alter_table_request( /// Try to build create table request from insert data. pub fn build_create_table_request( + catalog_name: &str, + schema_name: &str, table_id: TableId, table_name: &str, insert_batches: &[InsertBatch], @@ -158,8 +160,8 @@ pub fn build_create_table_request( return Ok(CreateTableRequest { id: table_id, - catalog_name: None, - schema_name: None, + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), table_name: table_name.to_string(), desc: None, schema, @@ -371,6 +373,7 @@ mod tests { value::Value, }; use table::error::Result as TableResult; + use table::metadata::TableInfoRef; use table::Table; use super::{ @@ -384,14 +387,13 @@ mod tests { let table_id = 10; let table_name = "test_metric"; - assert!(build_create_table_request(table_id, table_name, &[]).is_err()); + assert!(build_create_table_request("", "", table_id, table_name, &[]).is_err()); let insert_batches = insert_batches(mock_insert_batches()).unwrap(); - let req = build_create_table_request(table_id, table_name, &insert_batches).unwrap(); + let req = + build_create_table_request("", "", table_id, table_name, &insert_batches).unwrap(); assert_eq!(table_id, req.id); - assert!(req.catalog_name.is_none()); - assert!(req.schema_name.is_none()); assert_eq!(table_name, req.table_name); assert!(req.desc.is_none()); assert_eq!(vec![0], req.primary_key_indices); @@ -540,6 +542,11 @@ mod tests { .unwrap(), ) } + + fn table_info(&self) -> TableInfoRef { + unimplemented!() + } + async fn scan( &self, _projection: &Option>, diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index c31aa0dc97..0be1dade92 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -104,6 +104,7 @@ mod tests { use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::error::Result as TableResult; + use table::metadata::TableInfoRef; use table::{Table, TableRef}; use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; @@ -135,6 +136,11 @@ mod tests { .unwrap(), ) } + + fn table_info(&self) -> TableInfoRef { + unimplemented!() + } + async fn scan( &self, _projection: &Option>, diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index 5ff4f87516..7f92c1f2e8 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -44,8 +44,8 @@ impl SqlHandler { }, }; Ok(AlterTableRequest { - catalog_name, - schema_name, + catalog_name: Some(catalog_name), + schema_name: Some(schema_name), table_name, alter_kind, }) @@ -80,8 +80,8 @@ mod tests { let handler = create_mock_sql_handler().await; let alter_table = parse_sql("ALTER TABLE my_metric_1 ADD tagk_i STRING Null;"); let req = handler.alter_to_request(alter_table).unwrap(); - assert_eq!(req.catalog_name, None); - assert_eq!(req.schema_name, None); + assert_eq!(req.catalog_name, Some("greptime".to_string())); + assert_eq!(req.schema_name, Some("public".to_string())); assert_eq!(req.table_name, "my_metric_1"); let alter_kind = req.alter_kind; diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 2c35759311..f096a8aaff 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -37,8 +37,8 @@ impl SqlHandler { })?; let register_req = RegisterTableRequest { - catalog: catalog_name, - schema: schema_name, + catalog: Some(catalog_name.to_string()), + schema: Some(schema_name.to_string()), table_name: table_name.clone(), table_id, table, @@ -203,8 +203,6 @@ mod tests { let c = handler.create_to_request(42, parsed_stmt).unwrap(); assert_eq!("demo_table", c.table_name); assert_eq!(42, c.id); - assert!(c.schema_name.is_none()); - assert!(c.catalog_name.is_none()); assert!(!c.create_if_not_exists); assert_eq!(vec![0], c.primary_key_indices); assert_eq!(1, c.schema.timestamp_index().unwrap()); @@ -302,8 +300,8 @@ mod tests { let request = handler.create_to_request(42, create_table).unwrap(); assert_eq!(42, request.id); - assert_eq!(Some("c".to_string()), request.catalog_name); - assert_eq!(Some("s".to_string()), request.schema_name); + assert_eq!("c".to_string(), request.catalog_name); + assert_eq!("s".to_string(), request.schema_name); assert_eq!("demo".to_string(), request.table_name); assert!(!request.create_if_not_exists); assert_eq!(4, request.schema.column_schemas().len()); diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index b18124038a..9fff98da2c 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -58,8 +58,8 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> { &EngineContext::default(), CreateTableRequest { id: MIN_USER_TABLE_ID, - catalog_name: None, - schema_name: None, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), table_name: table_name.to_string(), desc: Some(" a test table".to_string()), schema: Arc::new( diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 82fb0c43e2..f95f3ade14 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -136,8 +136,8 @@ fn create_to_expr(create: CreateTable) -> Result { table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?; let expr = CreateExpr { - catalog_name, - schema_name, + catalog_name: Some(catalog_name), + schema_name: Some(schema_name), table_name, column_defs: columns_to_expr(&create.columns)?, time_index: find_time_index(&create.constraints)?, diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 953bf539f4..13cd828f61 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -2,7 +2,6 @@ name = "meta-client" version = "0.1.0" edition = "2021" - # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index d08cfc4aed..e058219d34 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -2,7 +2,6 @@ name = "meta-srv" version = "0.1.0" edition = "2021" - # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] @@ -14,8 +13,8 @@ common-telemetry = { path = "../common/telemetry" } etcd-client = "0.10" futures = "0.3" http-body = "0.4" -snafu = { version = "0.7", features = ["backtraces"] } serde = "1.0" +snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.0", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" diff --git a/src/script/src/table.rs b/src/script/src/table.rs index d99af75ffc..8abd93d6a6 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -41,8 +41,8 @@ impl ScriptsTable { // maybe put into system catalog? let request = CreateTableRequest { id: SCRIPTS_TABLE_ID, - catalog_name: Some(DEFAULT_CATALOG_NAME.to_string()), - schema_name: Some(DEFAULT_SCHEMA_NAME.to_string()), + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: SCRIPTS_TABLE_NAME.to_string(), desc: Some("Scripts table".to_string()), schema, diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 07de1893ab..3fe9b2649f 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +catalog = { path = "../catalog" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 53ce293b6b..0f5a89c8a5 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -7,6 +7,7 @@ pub mod statement; use std::str::FromStr; +use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; @@ -24,14 +25,16 @@ use crate::error::{ /// Converts maybe fully-qualified table name (`..` or `
` when /// catalog and schema are default) to tuple. -pub fn table_idents_to_full_name( - obj_name: &ObjectName, -) -> Result<(Option, Option, String)> { +pub fn table_idents_to_full_name(obj_name: &ObjectName) -> Result<(String, String, String)> { match &obj_name.0[..] { - [table] => Ok((None, None, table.value.clone())), + [table] => Ok(( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + table.value.clone(), + )), [catalog, schema, table] => Ok(( - Some(catalog.value.clone()), - Some(schema.value.clone()), + catalog.value.clone(), + schema.value.clone(), table.value.clone(), )), _ => error::InvalidSqlSnafu { diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 6d193c146c..d6ab139463 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -238,13 +238,18 @@ impl MitoEngineInner { _ctx: &EngineContext, request: CreateTableRequest, ) -> Result { + let catalog_name = request.catalog_name; + let schema_name = request.schema_name; let table_name = &request.table_name; if let Some(table) = self.get_table(table_name) { if request.create_if_not_exists { return Ok(table); } else { - return TableExistsSnafu { table_name }.fail(); + return TableExistsSnafu { + table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name), + } + .fail(); } } @@ -313,6 +318,8 @@ impl MitoEngineInner { .ident(table_id) .table_version(INIT_TABLE_VERSION) .table_type(TableType::Base) + .catalog_name(catalog_name.to_string()) + .schema_name(schema_name.to_string()) .desc(request.desc) .build() .context(error::BuildTableInfoSnafu { table_name })?; @@ -479,8 +486,8 @@ mod tests { &EngineContext::default(), CreateTableRequest { id: 1, - catalog_name: None, - schema_name: None, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), table_name: table_name.to_string(), desc: Some("a test table".to_string()), schema: schema.clone(), @@ -713,8 +720,8 @@ mod tests { let request = CreateTableRequest { id: 1, - catalog_name: None, - schema_name: None, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), table_name: table_info.name.to_string(), schema: table_info.meta.schema.clone(), create_if_not_exists: true, @@ -736,8 +743,8 @@ mod tests { // test create_if_not_exists=false let request = CreateTableRequest { id: 1, - catalog_name: None, - schema_name: None, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), table_name: table_info.name.to_string(), schema: table_info.meta.schema.clone(), create_if_not_exists: false, diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 5ee8aa396f..3a20f82001 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -145,6 +145,10 @@ impl Table for MitoTable { self.table_info().table_type } + fn table_info(&self) -> TableInfoRef { + self.table_info.load_full() + } + async fn scan( &self, projection: &Option>, diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index aad856140d..a580183aa6 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -93,8 +93,8 @@ pub async fn setup_test_engine_and_table() -> ( &EngineContext::default(), CreateTableRequest { id: 1, - catalog_name: None, - schema_name: None, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), table_name: TABLE_NAME.to_string(), desc: Some("a test table".to_string()), schema: schema.clone(), @@ -125,8 +125,8 @@ pub async fn setup_mock_engine_and_table( &EngineContext::default(), CreateTableRequest { id: 1, - catalog_name: None, - schema_name: None, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), table_name: TABLE_NAME.to_string(), desc: None, schema: schema.clone(), diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 01ec76fc19..fd00114783 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -101,6 +101,10 @@ pub struct TableInfo { pub name: String, #[builder(default, setter(into))] pub desc: Option, + #[builder(default, setter(into))] + pub catalog_name: String, + #[builder(default, setter(into))] + pub schema_name: String, pub meta: TableMeta, #[builder(default = "TableType::Base")] pub table_type: TableType, diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 09e541cab0..115bf41ee3 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -17,8 +17,8 @@ pub struct InsertRequest { #[derive(Debug, Clone)] pub struct CreateTableRequest { pub id: TableId, - pub catalog_name: Option, - pub schema_name: Option, + pub catalog_name: String, + pub schema_name: String, pub table_name: String, pub desc: Option, pub schema: SchemaRef, diff --git a/src/table/src/table.rs b/src/table/src/table.rs index e7f648ba69..7254df239f 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -9,7 +9,7 @@ use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; use crate::error::Result; -use crate::metadata::{FilterPushDownType, TableType}; +use crate::metadata::{FilterPushDownType, TableInfoRef, TableType}; use crate::requests::{AlterTableRequest, InsertRequest}; /// Table abstraction. @@ -22,6 +22,9 @@ pub trait Table: Send + Sync { /// Get a reference to the schema for this table fn schema(&self) -> SchemaRef; + /// Get a reference to the table info. + fn table_info(&self) -> TableInfoRef; + /// Get the type of this table for metadata/catalog purposes. fn table_type(&self) -> TableType { TableType::Base diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 8f988b80e4..42481fcfc0 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -32,6 +32,7 @@ use futures::Stream; use snafu::prelude::*; use crate::error::{self, Result}; +use crate::metadata::TableInfoRef; use crate::table::{FilterPushDownType, Table, TableRef, TableType}; /// Greptime SendableRecordBatchStream -> datafusion ExecutionPlan. @@ -189,6 +190,10 @@ impl Table for TableAdapter { self.schema.clone() } + fn table_info(&self) -> TableInfoRef { + unreachable!("Should not call table_info of TableAdaptor directly") + } + fn table_type(&self) -> TableType { match self.table_provider.table_type() { DfTableType::Base => TableType::Base, @@ -309,3 +314,28 @@ impl Stream for RecordBatchStreamAdapter { self.stream.size_hint() } } + +#[cfg(test)] +mod tests { + use datafusion::arrow; + use datafusion::datasource::empty::EmptyTable; + use datafusion_common::field_util::SchemaExt; + + use super::*; + use crate::metadata::TableType::Base; + + #[test] + #[should_panic] + fn test_table_adaptor_info() { + let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); + let table_adapter = TableAdapter::new(df_table, Arc::new(RuntimeEnv::default())).unwrap(); + let _ = table_adapter.table_info(); + } + + #[test] + fn test_table_adaptor_type() { + let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); + let table_adapter = TableAdapter::new(df_table, Arc::new(RuntimeEnv::default())).unwrap(); + assert_eq!(Base, table_adapter.table_type()); + } +} diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index cb47404f7c..92b019b6de 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -12,6 +12,7 @@ use futures::task::{Context, Poll}; use futures::Stream; use crate::error::Result; +use crate::metadata::TableInfoRef; use crate::table::{Expr, Table}; /// numbers table for test @@ -43,6 +44,10 @@ impl Table for NumbersTable { self.schema.clone() } + fn table_info(&self) -> TableInfoRef { + unimplemented!() + } + async fn scan( &self, _projection: &Option>, diff --git a/test-util/src/memtable.rs b/test-util/src/memtable.rs index 72a3d43340..1869233395 100644 --- a/test-util/src/memtable.rs +++ b/test-util/src/memtable.rs @@ -13,6 +13,7 @@ use futures::task::{Context, Poll}; use futures::Stream; use snafu::prelude::*; use table::error::{Result, SchemaConversionSnafu, TableProjectionSnafu}; +use table::metadata::TableInfoRef; use table::Table; #[derive(Debug, Clone)] @@ -60,6 +61,10 @@ impl Table for MemTable { self.recordbatch.schema.clone() } + fn table_info(&self) -> TableInfoRef { + unimplemented!() + } + async fn scan( &self, projection: &Option>,