From fd412b7b07206097dcb0b648bff2918c522caa83 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sun, 25 Jun 2023 16:05:20 +0900 Subject: [PATCH] refactor!: Uses table id to locate tables in table engines (#1817) * refactor: add table_id to get_table()/table_exists() * refactor: Add table_id to alter table request * refactor: Add table id to DropTableRequest * refactor: add table id to DropTableRequest * refactor: Use table id as key for the tables map * refactor: use table id as file engine's map key * refactor: Remove table reference from engine's get_table/table_exists * style: remove unused imports * feat!: Add table id to TableRegionalValue * style: fix cilppy * chore: add comments and logs --- src/catalog/src/helper.rs | 3 + src/catalog/src/remote/manager.rs | 51 ++++++--- src/catalog/src/remote/mock.rs | 60 +++------- src/catalog/src/remote/region_alive_keeper.rs | 13 ++- src/common/grpc-expr/src/alter.rs | 11 +- .../src/heartbeat/handler/close_region.rs | 3 +- src/datanode/src/instance/sql.rs | 11 +- src/datanode/src/server/grpc.rs | 35 +++++- src/datanode/src/sql/alter.rs | 5 + src/file-table-engine/src/engine/immutable.rs | 66 ++++------- .../src/engine/procedure/create.rs | 8 +- src/file-table-engine/src/engine/tests.rs | 58 ++++------ src/frontend/src/instance/distributed.rs | 7 +- src/frontend/src/table.rs | 1 + src/mito/src/engine.rs | 108 ++++++------------ src/mito/src/engine/procedure/alter.rs | 94 ++++----------- src/mito/src/engine/procedure/create.rs | 38 +++--- src/mito/src/engine/procedure/drop.rs | 14 +-- src/mito/src/engine/tests.rs | 85 +++++--------- src/mito/src/table/test_util.rs | 7 +- src/table-procedure/src/alter.rs | 3 +- src/table-procedure/src/create.rs | 18 +-- src/table-procedure/src/drop.rs | 12 +- src/table-procedure/src/test_util.rs | 6 +- src/table/src/engine.rs | 8 +- src/table/src/requests.rs | 3 + src/table/src/test_util/mock_engine.rs | 7 +- 27 files changed, 303 insertions(+), 432 deletions(-) diff --git a/src/catalog/src/helper.rs b/src/catalog/src/helper.rs index 8520bc3953..c95fc1b4d0 100644 --- a/src/catalog/src/helper.rs +++ b/src/catalog/src/helper.rs @@ -201,6 +201,9 @@ impl TableRegionalKey { /// region ids allocated by metasrv. #[derive(Debug, Serialize, Deserialize, Clone)] pub struct TableRegionalValue { + // We can remove the `Option` from the table id once all regional values + // stored in meta have table ids. + pub table_id: Option, pub version: TableVersion, pub regions_ids: Vec, pub engine_name: Option, diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 36545df835..4bc5d4c7d8 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -984,21 +984,29 @@ impl SchemaProvider for RemoteSchemaProvider { .get(key.as_bytes()) .await? .map(|Kv(_, v)| { - let TableRegionalValue { engine_name, .. } = - TableRegionalValue::parse(String::from_utf8_lossy(&v)) - .context(InvalidCatalogValueSnafu)?; - let reference = TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: name, + let TableRegionalValue { + table_id, + engine_name, + .. + } = TableRegionalValue::parse(String::from_utf8_lossy(&v)) + .context(InvalidCatalogValueSnafu)?; + + let Some(table_id) = table_id else { + warn!("Cannot find table id for {key}, the value has an old format"); + return Ok(None); }; let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE); let engine = self .engine_manager .engine(engine_name) .context(TableEngineNotFoundSnafu { engine_name })?; + let reference = TableReference { + catalog: &self.catalog_name, + schema: &self.schema_name, + table: name, + }; let table = engine - .get_table(&EngineContext {}, &reference) + .get_table(&EngineContext {}, table_id) .with_context(|_| OpenTableSnafu { table_info: reference.to_string(), })?; @@ -1011,9 +1019,12 @@ impl SchemaProvider for RemoteSchemaProvider { } async fn register_table(&self, name: String, table: TableRef) -> Result> { + // Currently, initiate_tables() always call this method to register the table to the schema thus we + // always update the region value. let table_info = table.table_info(); let table_version = table_info.ident.version; let table_value = TableRegionalValue { + table_id: Some(table_info.ident.table_id), version: table_version, regions_ids: table.table_info().meta.region_numbers.clone(), engine_name: Some(table_info.meta.engine.clone()), @@ -1061,25 +1072,27 @@ impl SchemaProvider for RemoteSchemaProvider { .get(table_key.as_bytes()) .await? .map(|Kv(_, v)| { - let TableRegionalValue { engine_name, .. } = - TableRegionalValue::parse(String::from_utf8_lossy(&v)) - .context(InvalidCatalogValueSnafu)?; - Ok(engine_name) + let TableRegionalValue { + table_id, + engine_name, + .. + } = TableRegionalValue::parse(String::from_utf8_lossy(&v)) + .context(InvalidCatalogValueSnafu)?; + Ok(engine_name.and_then(|name| table_id.map(|id| (name, id)))) }) .transpose()? .flatten(); - let engine_name = engine_opt.as_deref().unwrap_or_else(|| { - warn!("Cannot find table engine name for {table_key}"); - MITO_ENGINE - }); - self.backend.delete(table_key.as_bytes()).await?; debug!( "Successfully deleted catalog table entry, key: {}", table_key ); + let Some((engine_name, table_id)) = engine_opt else { + warn!("Cannot find table id and engine name for {table_key}"); + return Ok(None); + }; let reference = TableReference { catalog: &self.catalog_name, schema: &self.schema_name, @@ -1088,9 +1101,9 @@ impl SchemaProvider for RemoteSchemaProvider { // deregistering table does not necessarily mean dropping the table let table = self .engine_manager - .engine(engine_name) + .engine(&engine_name) .context(TableEngineNotFoundSnafu { engine_name })? - .get_table(&EngineContext {}, &reference) + .get_table(&EngineContext {}, table_id) .with_context(|_| OpenTableSnafu { table_info: reference.to_string(), })?; diff --git a/src/catalog/src/remote/mock.rs b/src/catalog/src/remote/mock.rs index a975ab64a4..c23e1fa757 100644 --- a/src/catalog/src/remote/mock.rs +++ b/src/catalog/src/remote/mock.rs @@ -16,8 +16,7 @@ use std::any::Any; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::fmt::{Display, Formatter}; -use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, RwLock as StdRwLock}; use async_stream::stream; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -27,7 +26,7 @@ use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::StringVector; use serde::Serializer; -use table::engine::{CloseTableResult, EngineContext, TableEngine, TableReference}; +use table::engine::{CloseTableResult, EngineContext, TableEngine}; use table::metadata::TableId; use table::requests::{ AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, @@ -167,7 +166,7 @@ impl KvBackend for MockKvBackend { #[derive(Default)] pub struct MockTableEngine { - tables: RwLock>, + tables: StdRwLock>, } #[async_trait::async_trait] @@ -182,21 +181,8 @@ impl TableEngine for MockTableEngine { _ctx: &EngineContext, request: CreateTableRequest, ) -> table::Result { - let table_name = request.table_name.clone(); - let catalog_name = request.catalog_name.clone(); - let schema_name = request.schema_name.clone(); - let table_full_name = - TableReference::full(&catalog_name, &schema_name, &table_name).to_string(); + let table_id = request.id; - let default_table_id = "0".to_owned(); - let table_id = TableId::from_str( - request - .table_options - .extra_options - .get("table_id") - .unwrap_or(&default_table_id), - ) - .unwrap(); let schema = Arc::new(Schema::new(vec![ColumnSchema::new( "name", ConcreteDataType::string_datatype(), @@ -206,16 +192,16 @@ impl TableEngine for MockTableEngine { let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _]; let record_batch = RecordBatch::new(schema, data).unwrap(); let table: TableRef = Arc::new(MemTable::new_with_catalog( - &table_name, + &request.table_name, record_batch, table_id, - catalog_name, - schema_name, + request.catalog_name, + request.schema_name, vec![0], )) as Arc<_>; - let mut tables = self.tables.write().await; - tables.insert(table_full_name, table.clone() as TableRef); + let mut tables = self.tables.write().unwrap(); + tables.insert(table_id, table.clone() as TableRef); Ok(table) } @@ -224,7 +210,7 @@ impl TableEngine for MockTableEngine { _ctx: &EngineContext, request: OpenTableRequest, ) -> table::Result> { - Ok(self.tables.read().await.get(&request.table_name).cloned()) + Ok(self.tables.read().unwrap().get(&request.table_id).cloned()) } async fn alter_table( @@ -238,25 +224,13 @@ impl TableEngine for MockTableEngine { fn get_table( &self, _ctx: &EngineContext, - table_ref: &TableReference, + table_id: TableId, ) -> table::Result> { - futures::executor::block_on(async { - Ok(self - .tables - .read() - .await - .get(&table_ref.to_string()) - .cloned()) - }) + Ok(self.tables.read().unwrap().get(&table_id).cloned()) } - fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool { - futures::executor::block_on(async { - self.tables - .read() - .await - .contains_key(&table_ref.to_string()) - }) + fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool { + self.tables.read().unwrap().contains_key(&table_id) } async fn drop_table( @@ -272,11 +246,7 @@ impl TableEngine for MockTableEngine { _ctx: &EngineContext, request: CloseTableRequest, ) -> table::Result { - let _ = self - .tables - .write() - .await - .remove(&request.table_ref().to_string()); + let _ = self.tables.write().unwrap().remove(&request.table_id); Ok(CloseTableResult::Released(vec![])) } diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index 9b64e35559..130b7536fd 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -475,6 +475,7 @@ impl CountdownTask { catalog_name: table_ident.catalog.clone(), schema_name: table_ident.schema.clone(), table_name: table_ident.table.clone(), + table_id: table_ident.table_id, region_numbers: vec![region], flush: true, }; @@ -499,7 +500,7 @@ mod test { use common_meta::heartbeat::mailbox::HeartbeatMailbox; use datatypes::schema::RawSchema; use table::engine::manager::MemoryTableEngineManager; - use table::engine::{TableEngine, TableReference}; + use table::engine::TableEngine; use table::requests::{CreateTableRequest, TableOptions}; use table::test_util::EmptyTable; @@ -751,8 +752,9 @@ mod test { let catalog = "my_catalog"; let schema = "my_schema"; let table = "my_table"; + let table_id = 1; let request = CreateTableRequest { - id: 1, + id: table_id, catalog_name: catalog.to_string(), schema_name: schema.to_string(), table_name: table.to_string(), @@ -768,7 +770,6 @@ mod test { table_options: TableOptions::default(), engine: "mito".to_string(), }; - let table_ref = TableReference::full(catalog, schema, table); let table_engine = Arc::new(MockTableEngine::default()); table_engine.create_table(ctx, request).await.unwrap(); @@ -777,7 +778,7 @@ mod test { catalog: catalog.to_string(), schema: schema.to_string(), table: table.to_string(), - table_id: 1024, + table_id, engine: "mito".to_string(), }; let (tx, rx) = mpsc::channel(10); @@ -813,9 +814,9 @@ mod test { .unwrap(); // assert the table is closed after deadline is reached - assert!(table_engine.table_exists(ctx, &table_ref)); + assert!(table_engine.table_exists(ctx, table_id)); // spare 500ms for the task to close the table tokio::time::sleep(Duration::from_millis(2000)).await; - assert!(!table_engine.table_exists(ctx, &table_ref)); + assert!(!table_engine.table_exists(ctx, table_id)); } } diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 1d9b7ca905..c5345a72b3 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -34,7 +34,7 @@ const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32; const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32; /// Convert an [`AlterExpr`] to an [`AlterTableRequest`] -pub fn alter_expr_to_request(expr: AlterExpr) -> Result { +pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result { let catalog_name = expr.catalog_name; let schema_name = expr.schema_name; let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?; @@ -69,6 +69,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result { catalog_name, schema_name, table_name: expr.table_name, + table_id, alter_kind, }; Ok(request) @@ -82,6 +83,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result { catalog_name, schema_name, table_name: expr.table_name, + table_id, alter_kind, }; Ok(request) @@ -92,6 +94,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result { catalog_name, schema_name, table_name: expr.table_name, + table_id, alter_kind, }; Ok(request) @@ -239,7 +242,7 @@ mod tests { })), }; - let alter_request = alter_expr_to_request(expr).unwrap(); + let alter_request = alter_expr_to_request(1, expr).unwrap(); assert_eq!(alter_request.catalog_name, ""); assert_eq!(alter_request.schema_name, ""); assert_eq!("monitor".to_string(), alter_request.table_name); @@ -296,7 +299,7 @@ mod tests { })), }; - let alter_request = alter_expr_to_request(expr).unwrap(); + let alter_request = alter_expr_to_request(1, expr).unwrap(); assert_eq!(alter_request.catalog_name, ""); assert_eq!(alter_request.schema_name, ""); assert_eq!("monitor".to_string(), alter_request.table_name); @@ -344,7 +347,7 @@ mod tests { })), }; - let alter_request = alter_expr_to_request(expr).unwrap(); + let alter_request = alter_expr_to_request(1, expr).unwrap(); assert_eq!(alter_request.catalog_name, "test_catalog"); assert_eq!(alter_request.schema_name, "test_schema"); assert_eq!("monitor".to_string(), alter_request.table_name); diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index abc492d40f..69b9e3e665 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -164,7 +164,7 @@ impl CloseRegionHandler { } if engine - .get_table(&ctx, table_ref) + .get_table(&ctx, region_ident.table_ident.table_id) .with_context(|_| error::GetTableSnafu { table_name: table_ref.to_string(), })? @@ -178,6 +178,7 @@ impl CloseRegionHandler { schema_name: table_ref.schema.to_string(), table_name: table_ref.table.to_string(), region_numbers: region_numbers.clone(), + table_id: region_ident.table_ident.table_id, flush: true, }, ) diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 3b7b5f88be..c7257da043 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -106,7 +106,13 @@ impl Instance { let name = alter_table.table_name().clone(); let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?; let table_ref = TableReference::full(&catalog, &schema, &table); - let req = SqlHandler::alter_to_request(alter_table, table_ref)?; + // Currently, we have to get the table multiple times. Consider remove the sql handler in the future. + let table = self.sql_handler.get_table(&table_ref).await?; + let req = SqlHandler::alter_to_request( + alter_table, + table_ref, + table.table_info().ident.table_id, + )?; self.sql_handler .execute(SqlRequest::Alter(req), query_ctx) .await @@ -114,10 +120,13 @@ impl Instance { Statement::DropTable(drop_table) => { let (catalog_name, schema_name, table_name) = table_idents_to_full_name(drop_table.table_name(), query_ctx.clone())?; + let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name); + let table = self.sql_handler.get_table(&table_ref).await?; let req = DropTableRequest { catalog_name, schema_name, table_name, + table_id: table.table_info().ident.table_id, }; self.sql_handler .execute(SqlRequest::DropTable(req), query_ctx) diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index e6c3121632..89096c5e17 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -14,6 +14,7 @@ use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, FlushTableExpr}; use common_catalog::consts::IMMUTABLE_FILE_ENGINE; +use common_catalog::format_full_table_name; use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; use common_query::Output; use common_telemetry::info; @@ -22,8 +23,8 @@ use snafu::prelude::*; use table::requests::{DropTableRequest, FlushTableRequest}; use crate::error::{ - AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu, - IncorrectInternalStateSnafu, Result, + AlterExprToRequestSnafu, BumpTableIdSnafu, CatalogSnafu, CreateExprToRequestSnafu, + IncorrectInternalStateSnafu, Result, TableNotFoundSnafu, }; use crate::instance::Instance; use crate::sql::SqlRequest; @@ -69,17 +70,45 @@ impl Instance { } pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> Result { - let request = alter_expr_to_request(expr).context(AlterExprToRequestSnafu)?; + let table = self + .catalog_manager + .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &expr.catalog_name, + &expr.schema_name, + &expr.table_name, + ), + })?; + + let request = alter_expr_to_request(table.table_info().ident.table_id, expr) + .context(AlterExprToRequestSnafu)?; self.sql_handler() .execute(SqlRequest::Alter(request), QueryContext::arc()) .await } pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> Result { + let table = self + .catalog_manager + .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &expr.catalog_name, + &expr.schema_name, + &expr.table_name, + ), + })?; + let req = DropTableRequest { catalog_name: expr.catalog_name, schema_name: expr.schema_name, table_name: expr.table_name, + table_id: table.table_info().ident.table_id, }; self.sql_handler() .execute(SqlRequest::DropTable(req), QueryContext::arc()) diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index 32537af9ea..c786c7f321 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -19,6 +19,7 @@ use snafu::prelude::*; use sql::statements::alter::{AlterTable, AlterTableOperation}; use sql::statements::column_def_to_schema; use table::engine::TableReference; +use table::metadata::TableId; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest}; use table_procedure::AlterTableProcedure; @@ -60,6 +61,7 @@ impl SqlHandler { pub(crate) fn alter_to_request( alter_table: AlterTable, table_ref: TableReference, + table_id: TableId, ) -> Result { let alter_kind = match &alter_table.alter_operation() { AlterTableOperation::AddConstraint(table_constraint) => { @@ -91,6 +93,7 @@ impl SqlHandler { catalog_name: table_ref.catalog.to_string(), schema_name: table_ref.schema.to_string(), table_name: table_ref.table.to_string(), + table_id, alter_kind, }) } @@ -128,6 +131,7 @@ mod tests { let req = SqlHandler::alter_to_request( alter_table, TableReference::full("greptime", "public", "my_metric_1"), + 1, ) .unwrap(); assert_eq!(req.catalog_name, "greptime"); @@ -154,6 +158,7 @@ mod tests { let req = SqlHandler::alter_to_request( alter_table, TableReference::full("greptime", "public", "test_table"), + 1, ) .unwrap(); assert_eq!(req.catalog_name, "greptime"); diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs index 1814a33066..0101824b8c 100644 --- a/src/file-table-engine/src/engine/immutable.rs +++ b/src/file-table-engine/src/engine/immutable.rs @@ -25,7 +25,7 @@ use object_store::ObjectStore; use snafu::ResultExt; use table::engine::{table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference}; use table::error::TableOperationSnafu; -use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; +use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; use table::{error as table_error, Result as TableResult, Table, TableRef}; use tokio::sync::Mutex; @@ -88,16 +88,12 @@ impl TableEngine for ImmutableFileTableEngine { .fail() } - fn get_table( - &self, - _ctx: &EngineContext, - table_ref: &TableReference, - ) -> TableResult> { - Ok(self.inner.get_table(table_ref)) + fn get_table(&self, _ctx: &EngineContext, table_id: TableId) -> TableResult> { + Ok(self.inner.get_table(table_id)) } - fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool { - self.inner.get_table(table_ref).is_some() + fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool { + self.inner.get_table(table_id).is_some() } async fn drop_table( @@ -151,8 +147,8 @@ impl TableEngineProcedure for ImmutableFileTableEngine { #[cfg(test)] impl ImmutableFileTableEngine { - pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> { - self.inner.close_table(table_ref).await + pub async fn close_table(&self, table_id: TableId) -> TableResult<()> { + self.inner.close_table(table_id).await } } @@ -173,10 +169,10 @@ impl ImmutableFileTableEngine { } struct EngineInner { - /// All tables opened by the engine. Map key is formatted [TableReference]. + /// All tables opened by the engine. /// /// Writing to `tables` should also hold the `table_mutex`. - tables: RwLock>, + tables: RwLock>, object_store: ObjectStore, /// Table mutex is used to protect the operations such as creating/opening/closing @@ -199,6 +195,7 @@ impl EngineInner { request: CreateTableRequest, ) -> Result { let CreateTableRequest { + id: table_id, catalog_name, schema_name, table_name, @@ -212,7 +209,7 @@ impl EngineInner { table: &table_name, }; - if let Some(table) = self.get_table(&table_ref) { + if let Some(table) = self.get_table(table_id) { return if create_if_not_exists { Ok(table) } else { @@ -223,14 +220,13 @@ impl EngineInner { let table_schema = Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?); - let table_id = request.id; let table_dir = table_dir(&catalog_name, &schema_name, table_id); let table_full_name = table_ref.to_string(); let _lock = self.table_mutex.lock().await; // Checks again, read lock should be enough since we are guarded by the mutex. - if let Some(table) = self.get_table_by_full_name(&table_full_name) { + if let Some(table) = self.get_table(table_id) { return if request.create_if_not_exists { Ok(table) } else { @@ -279,27 +275,20 @@ impl EngineInner { table_id ); - self.tables - .write() - .unwrap() - .insert(table_full_name, table.clone()); + self.tables.write().unwrap().insert(table_id, table.clone()); Ok(table) } - fn get_table_by_full_name(&self, full_name: &str) -> Option { + fn get_table(&self, table_id: TableId) -> Option { self.tables .read() .unwrap() - .get(full_name) + .get(&table_id) .cloned() .map(|table| table as _) } - fn get_table(&self, table_ref: &TableReference) -> Option { - self.get_table_by_full_name(&table_ref.to_string()) - } - async fn open_table( &self, _ctx: &EngineContext, @@ -309,6 +298,7 @@ impl EngineInner { catalog_name, schema_name, table_name, + table_id, .. } = request; let table_ref = TableReference { @@ -317,16 +307,15 @@ impl EngineInner { table: &table_name, }; - let table_full_name = table_ref.to_string(); - - if let Some(table) = self.get_table_by_full_name(&table_full_name) { + if let Some(table) = self.get_table(table_id) { return Ok(Some(table)); } + let table_full_name = table_ref.to_string(); let table = { let _lock = self.table_mutex.lock().await; // Checks again, read lock should be enough since we are guarded by the mutex. - if let Some(table) = self.get_table_by_full_name(&table_full_name) { + if let Some(table) = self.get_table(table_id) { return Ok(Some(table)); } @@ -350,10 +339,7 @@ impl EngineInner { .context(table_error::TableOperationSnafu)?, ); - self.tables - .write() - .unwrap() - .insert(table_full_name, table.clone()); + self.tables.write().unwrap().insert(table_id, table.clone()); Some(table as _) }; @@ -375,7 +361,7 @@ impl EngineInner { let table_full_name = table_ref.to_string(); let _lock = self.table_mutex.lock().await; - if let Some(table) = self.get_table_by_full_name(&table_full_name) { + if let Some(table) = self.get_table(req.table_id) { let table_id = table.table_info().ident.table_id; let table_dir = table_dir(&req.catalog_name, &req.schema_name, table_id); @@ -389,7 +375,7 @@ impl EngineInner { .context(DropTableSnafu { table_name: &table_full_name, })?; - self.tables.write().unwrap().remove(&table_full_name); + self.tables.write().unwrap().remove(&req.table_id); Ok(true) } else { @@ -429,12 +415,10 @@ impl EngineInner { #[cfg(test)] impl EngineInner { - pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> { - let full_name = table_ref.to_string(); - + pub async fn close_table(&self, table_id: TableId) -> TableResult<()> { let _lock = self.table_mutex.lock().await; - if let Some(table) = self.get_table_by_full_name(&full_name) { + if let Some(table) = self.get_table(table_id) { let regions = Vec::new(); table .close(®ions) @@ -443,7 +427,7 @@ impl EngineInner { .context(table_error::TableOperationSnafu)?; } - self.tables.write().unwrap().remove(&full_name); + self.tables.write().unwrap().remove(&table_id); Ok(()) } diff --git a/src/file-table-engine/src/engine/procedure/create.rs b/src/file-table-engine/src/engine/procedure/create.rs index 836ae84364..b99aacea17 100644 --- a/src/file-table-engine/src/engine/procedure/create.rs +++ b/src/file-table-engine/src/engine/procedure/create.rs @@ -92,14 +92,13 @@ impl CreateImmutableFileTable { fn on_prepare(&mut self) -> Result { let engine_ctx = EngineContext::default(); - let table_ref = self.data.table_ref(); // Safety: Current get_table implementation always returns Ok. - if self.engine.table_exists(&engine_ctx, &table_ref) { + if self.engine.table_exists(&engine_ctx, self.data.request.id) { // The table already exists. ensure!( self.data.request.create_if_not_exists, TableExistsSnafu { - table_name: table_ref.to_string(), + table_name: self.data.table_ref().to_string(), } ); @@ -113,8 +112,7 @@ impl CreateImmutableFileTable { async fn on_create_table(&mut self) -> Result { let engine_ctx = EngineContext::default(); - let table_ref = self.data.table_ref(); - if self.engine.table_exists(&engine_ctx, &table_ref) { + if self.engine.table_exists(&engine_ctx, self.data.request.id) { // Table already created. We don't need to check create_if_not_exists as // we have checked it in prepare state. return Ok(Status::Done); diff --git a/src/file-table-engine/src/engine/tests.rs b/src/file-table-engine/src/engine/tests.rs index eb98d0fdf5..3ad89fcca8 100644 --- a/src/file-table-engine/src/engine/tests.rs +++ b/src/file-table-engine/src/engine/tests.rs @@ -16,7 +16,7 @@ use std::assert_matches::assert_matches; use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, IMMUTABLE_FILE_ENGINE}; -use table::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference}; +use table::engine::{EngineContext, TableEngine, TableEngineProcedure}; use table::requests::{AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest}; use table::{error as table_error, Table}; @@ -35,14 +35,9 @@ async fn test_get_table() { .. } = test_util::setup_test_engine_and_table("test_get_table").await; let table_info = table.table_info(); - let table_ref = TableReference { - catalog: &table_info.catalog_name, - schema: &table_info.schema_name, - table: &table_info.name, - }; let got = table_engine - .get_table(&EngineContext::default(), &table_ref) + .get_table(&EngineContext::default(), table_info.ident.table_id) .unwrap() .unwrap(); @@ -53,21 +48,17 @@ async fn test_get_table() { async fn test_open_table() { common_telemetry::init_default_ut_logging(); let ctx = EngineContext::default(); + // the test table id is 1 + let table_id = 1; let open_req = OpenTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: test_util::TEST_TABLE_NAME.to_string(), // the test table id is 1 - table_id: 1, + table_id, region_numbers: vec![0], }; - let table_ref = TableReference { - catalog: DEFAULT_CATALOG_NAME, - schema: DEFAULT_SCHEMA_NAME, - table: test_util::TEST_TABLE_NAME, - }; - let TestEngineComponents { table_engine, table_ref: table, @@ -77,7 +68,7 @@ async fn test_open_table() { assert_eq!(IMMUTABLE_FILE_ENGINE, table_engine.name()); - table_engine.close_table(&table_ref).await.unwrap(); + table_engine.close_table(table_id).await.unwrap(); let reopened = table_engine .open_table(&ctx, open_req.clone()) @@ -101,21 +92,17 @@ async fn test_open_table() { async fn test_close_all_table() { common_telemetry::init_default_ut_logging(); - let table_ref = TableReference { - catalog: DEFAULT_CATALOG_NAME, - schema: DEFAULT_SCHEMA_NAME, - table: test_util::TEST_TABLE_NAME, - }; - let TestEngineComponents { table_engine, dir: _dir, + table_ref: table, .. } = test_util::setup_test_engine_and_table("test_close_all_table").await; table_engine.close().await.unwrap(); - let exist = table_engine.table_exists(&EngineContext::default(), &table_ref); + let table_id = table.table_info().ident.table_id; + let exist = table_engine.table_exists(&EngineContext::default(), table_id); assert!(!exist); } @@ -126,6 +113,7 @@ async fn test_alter_table() { let TestEngineComponents { table_engine, dir: _dir, + table_ref, .. } = test_util::setup_test_engine_and_table("test_alter_table").await; @@ -133,6 +121,7 @@ async fn test_alter_table() { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TEST_TABLE_NAME.to_string(), + table_id: table_ref.table_info().ident.table_id, alter_kind: AlterKind::RenameTable { new_table_name: "foo".to_string(), }, @@ -151,12 +140,6 @@ async fn test_alter_table() { async fn test_drop_table() { common_telemetry::init_default_ut_logging(); - let drop_req = DropTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: TEST_TABLE_NAME.to_string(), - }; - let TestEngineComponents { table_engine, object_store, @@ -167,12 +150,13 @@ async fn test_drop_table() { } = test_util::setup_test_engine_and_table("test_drop_table").await; let table_info = table.table_info(); - let table_ref = TableReference { - catalog: &table_info.catalog_name, - schema: &table_info.schema_name, - table: &table_info.name, - }; + let drop_req = DropTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TEST_TABLE_NAME.to_string(), + table_id: table_info.ident.table_id, + }; let dropped = table_engine .drop_table(&EngineContext::default(), drop_req) .await @@ -180,7 +164,7 @@ async fn test_drop_table() { assert!(dropped); - let exist = table_engine.table_exists(&EngineContext::default(), &table_ref); + let exist = table_engine.table_exists(&EngineContext::default(), table_info.ident.table_id); assert!(!exist); // check table_dir manifest @@ -203,13 +187,14 @@ async fn test_create_drop_table_procedure() { let engine_ctx = EngineContext::default(); // Test create table by procedure. let create_request = test_util::new_create_request(schema); + let table_id = create_request.id; let mut procedure = table_engine .create_table_procedure(&engine_ctx, create_request.clone()) .unwrap(); common_procedure_test::execute_procedure_until_done(&mut procedure).await; assert!(table_engine - .get_table(&engine_ctx, &create_request.table_ref()) + .get_table(&engine_ctx, table_id) .unwrap() .is_some()); @@ -218,6 +203,7 @@ async fn test_create_drop_table_procedure() { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TEST_TABLE_NAME.to_string(), + table_id, }; let mut procedure = table_engine .drop_table_procedure(&engine_ctx, drop_request) @@ -225,7 +211,7 @@ async fn test_create_drop_table_procedure() { common_procedure_test::execute_procedure_until_done(&mut procedure).await; assert!(table_engine - .get_table(&engine_ctx, &create_request.table_ref()) + .get_table(&engine_ctx, table_id) .unwrap() .is_none()); } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index f20d0596b0..cbe26a2642 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -543,8 +543,11 @@ impl DistInstance { table_name: format_full_table_name(catalog_name, schema_name, table_name), })?; - let request = common_grpc_expr::alter_expr_to_request(expr.clone()) - .context(AlterExprToRequestSnafu)?; + let request = common_grpc_expr::alter_expr_to_request( + table.table_info().ident.table_id, + expr.clone(), + ) + .context(AlterExprToRequestSnafu)?; let mut context = AlterContext::with_capacity(1); diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 87fa2622fb..384c7dab6e 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -382,6 +382,7 @@ impl DistTable { schema_name, table_name, alter_kind, + table_id: _table_id, } = request; let alter_expr = context diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index dbdf80ee1d..d64ed4bb4e 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -42,8 +42,7 @@ use table::engine::{ }; use table::metadata::{TableId, TableInfo, TableVersion}; use table::requests::{ - AlterKind, AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, - OpenTableRequest, + AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, }; use table::{error as table_error, Result as TableResult, Table, TableRef}; @@ -102,9 +101,8 @@ impl TableEngine for MitoEngine { .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; - let table_ref = request.table_ref(); - let _lock = self.inner.table_mutex.lock(table_ref.to_string()).await; - if let Some(table) = self.inner.get_mito_table(&table_ref) { + let _lock = self.inner.table_mutex.lock(request.id).await; + if let Some(table) = self.inner.get_mito_table(request.id) { if request.create_if_not_exists { return Ok(table); } else { @@ -148,26 +146,10 @@ impl TableEngine for MitoEngine { ) -> TableResult { let _timer = common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED); - if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { - let mut table_ref = req.table_ref(); - table_ref.table = new_table_name; - if self.inner.get_mito_table(&table_ref).is_some() { - return TableExistsSnafu { - table_name: table_ref.to_string(), - } - .fail() - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; - } - } - let mut procedure = AlterMitoTable::new(req, self.inner.clone()) .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; - // TODO(yingwen): Rename has concurrent issue without the procedure runtime. But - // users can't use this method to alter a table so it is still safe. We should - // refactor the table engine to avoid using table name as key. procedure .engine_alter_table() .await @@ -175,16 +157,12 @@ impl TableEngine for MitoEngine { .context(table_error::TableOperationSnafu) } - fn get_table( - &self, - _ctx: &EngineContext, - table_ref: &TableReference, - ) -> TableResult> { - Ok(self.inner.get_table(table_ref)) + fn get_table(&self, _ctx: &EngineContext, table_id: TableId) -> TableResult> { + Ok(self.inner.get_table(table_id)) } - fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool { - self.inner.get_table(table_ref).is_some() + fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool { + self.inner.get_table(table_id).is_some() } async fn drop_table( @@ -254,16 +232,16 @@ impl TableEngineProcedure for MitoEngine { } pub(crate) struct MitoEngineInner { - /// All tables opened by the engine. Map key is formatted [TableReference]. + /// All tables opened by the engine. /// /// Writing to `tables` should also hold the `table_mutex`. - tables: DashMap>>, + tables: DashMap>>, object_store: ObjectStore, compress_type: CompressionType, storage_engine: S, /// Table mutex is used to protect the operations such as creating/opening/closing /// a table, to avoid things like opening the same table simultaneously. - table_mutex: Arc>, + table_mutex: Arc>, } fn build_row_key_desc( @@ -429,11 +407,6 @@ impl MitoEngineInner { let catalog_name = &request.catalog_name; let schema_name = &request.schema_name; let table_name = &request.table_name; - let table_ref = TableReference { - catalog: catalog_name, - schema: schema_name, - table: table_name, - }; let table_id = request.table_id; let engine_ctx = StorageEngineContext::default(); @@ -463,6 +436,11 @@ impl MitoEngineInner { let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len()); + let table_ref = TableReference { + catalog: catalog_name, + schema: schema_name, + table: table_name, + }; for region_number in &request.region_numbers { let region = self .open_region(&engine_ctx, table_id, *region_number, &table_ref, &opts) @@ -556,16 +534,7 @@ impl MitoEngineInner { ctx: &EngineContext, request: OpenTableRequest, ) -> TableResult> { - let catalog_name = &request.catalog_name; - let schema_name = &request.schema_name; - let table_name = &request.table_name; - let table_ref = TableReference { - catalog: catalog_name, - schema: schema_name, - table: table_name, - }; - - if let Some(table) = self.get_table(&table_ref) { + if let Some(table) = self.get_table(request.table_id) { if let Some(table) = self.check_regions(table, &request.region_numbers)? { return Ok(Some(table)); } @@ -573,11 +542,10 @@ impl MitoEngineInner { // Acquires the mutex before opening a new table. let table = { - let table_name_key = table_ref.to_string(); - let _lock = self.table_mutex.lock(table_name_key.clone()).await; + let _lock = self.table_mutex.lock(request.table_id).await; // Checks again, read lock should be enough since we are guarded by the mutex. - if let Some(table) = self.get_mito_table(&table_ref) { + if let Some(table) = self.get_mito_table(request.table_id) { // Contains all regions or target region if let Some(table) = self.check_regions(table.clone(), &request.region_numbers)? { Some(table) @@ -593,7 +561,7 @@ impl MitoEngineInner { let table = self.recover_table(ctx, request.clone()).await?; if let Some(table) = table { // already locked - self.tables.insert(table_ref.to_string(), table.clone()); + self.tables.insert(request.table_id, table.clone()); Some(table as _) } else { @@ -604,8 +572,8 @@ impl MitoEngineInner { logging::info!( "Mito engine opened table: {} in schema: {}", - table_name, - schema_name + request.table_name, + request.schema_name ); Ok(table) @@ -613,10 +581,8 @@ impl MitoEngineInner { async fn drop_table(&self, request: DropTableRequest) -> TableResult { // Remove the table from the engine to avoid further access from users. - let table_ref = request.table_ref(); - - let _lock = self.table_mutex.lock(table_ref.to_string()).await; - let removed_table = self.tables.remove(&table_ref.to_string()); + let _lock = self.table_mutex.lock(request.table_id).await; + let removed_table = self.tables.remove(&request.table_id); // Close the table to close all regions. Closing a region is idempotent. if let Some((_, table)) = &removed_table { let regions = table.region_ids(); @@ -663,17 +629,13 @@ impl MitoEngineInner { Ok(Some((manifest, table_info))) } - fn get_table(&self, table_ref: &TableReference) -> Option { - self.tables - .get(&table_ref.to_string()) - .map(|en| en.value().clone() as _) + fn get_table(&self, table_id: TableId) -> Option { + self.tables.get(&table_id).map(|en| en.value().clone() as _) } /// Returns the [MitoTable]. - fn get_mito_table(&self, table_ref: &TableReference) -> Option>> { - self.tables - .get(&table_ref.to_string()) - .map(|en| en.value().clone()) + fn get_mito_table(&self, table_id: TableId) -> Option>> { + self.tables.get(&table_id).map(|en| en.value().clone()) } async fn close(&self) -> TableResult<()> { @@ -696,8 +658,7 @@ impl MitoEngineInner { } async fn close_table(&self, request: CloseTableRequest) -> TableResult { - let table_ref = request.table_ref(); - if let Some(table) = self.get_mito_table(&table_ref) { + if let Some(table) = self.get_mito_table(request.table_id) { return self .close_table_inner(table, Some(&request.region_numbers), request.flush) .await; @@ -713,13 +674,8 @@ impl MitoEngineInner { flush: bool, ) -> TableResult { let info = table.table_info(); - let table_ref = TableReference { - catalog: &info.catalog_name, - schema: &info.schema_name, - table: &info.name, - }; let table_id = info.ident.table_id; - let _lock = self.table_mutex.lock(table_ref.to_string()).await; + let _lock = self.table_mutex.lock(table_id).await; let all_regions = table.region_ids(); let regions = regions.unwrap_or(&all_regions); @@ -738,12 +694,12 @@ impl MitoEngineInner { } if table.is_releasable() { - self.tables.remove(&table_ref.to_string()); + self.tables.remove(&table_id); logging::info!( "Mito engine closed table: {} in schema: {}", - table_ref.table, - table_ref.schema, + info.name, + info.schema_name, ); return Ok(CloseTableResult::Released(removed_regions)); } diff --git a/src/mito/src/engine/procedure/alter.rs b/src/mito/src/engine/procedure/alter.rs index 6f53ee5585..fadd14e49b 100644 --- a/src/mito/src/engine/procedure/alter.rs +++ b/src/mito/src/engine/procedure/alter.rs @@ -29,9 +29,7 @@ use table::requests::{AlterKind, AlterTableRequest}; use table::{Table, TableRef}; use crate::engine::MitoEngineInner; -use crate::error::{ - TableExistsSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu, -}; +use crate::error::{TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu}; use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList}; use crate::metrics; use crate::table::MitoTable; @@ -39,7 +37,6 @@ use crate::table::MitoTable; /// Procedure to alter a [MitoTable]. pub(crate) struct AlterMitoTable { data: AlterTableData, - engine_inner: Arc>, table: Arc>, /// The table info after alteration. new_info: Option, @@ -107,18 +104,16 @@ impl AlterMitoTable { table_version: 0, }; let table_ref = data.table_ref(); - let table = - engine_inner - .get_mito_table(&table_ref) - .with_context(|| TableNotFoundSnafu { - table_name: table_ref.to_string(), - })?; + let table = engine_inner + .get_mito_table(data.request.table_id) + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; let info = table.table_info(); data.table_version = info.ident.version; Ok(AlterMitoTable { data, - engine_inner, table, new_info: None, alter_op: None, @@ -148,16 +143,14 @@ impl AlterMitoTable { fn from_json(json: &str, engine_inner: Arc>) -> Result { let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?; let table_ref = data.table_ref(); - let table = - engine_inner - .get_mito_table(&table_ref) - .with_context(|| TableNotFoundSnafu { - table_name: table_ref.to_string(), - })?; + let table = engine_inner + .get_mito_table(data.request.table_id) + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; Ok(AlterMitoTable { data, - engine_inner, table, new_info: None, alter_op: None, @@ -176,17 +169,8 @@ impl AlterMitoTable { } ); - if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { - let mut table_ref = self.data.table_ref(); - table_ref.table = new_table_name; - ensure!( - self.engine_inner.get_mito_table(&table_ref).is_none(), - TableExistsSnafu { - table_name: table_ref.to_string(), - } - ); - } - + // We don't check the table name in the table engine as it is the catalog + // manager's duty to ensure the table name is unused. self.data.state = AlterTableState::EngineAlterTable; Ok(Status::executing(true)) @@ -252,22 +236,6 @@ impl AlterMitoTable { // Update in memory metadata of the table. self.table.set_table_info(new_info.clone()); - // Rename key in tables map. - if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { - let mut table_ref = self.data.table_ref(); - let removed = { - let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string()); - self.engine_inner.tables.remove(&table_ref.to_string()) - }; - ensure!(removed.is_some(), TableNotFoundSnafu { table_name }); - - table_ref.table = new_table_name.as_str(); - let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string()); - self.engine_inner - .tables - .insert(table_ref.to_string(), self.table.clone()); - } - Ok(self.table.clone()) } @@ -363,19 +331,15 @@ mod tests { procedure_test_util::execute_procedure_until_done(&mut procedure).await; // Get metadata of the created table. - let table_ref = TableReference { - catalog: &request.catalog_name, - schema: &request.schema_name, - table: &request.table_name, - }; let table = table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, request.id) .unwrap() .unwrap(); let old_info = table.table_info(); let old_meta = &old_info.meta; // Alter the table. + let table_id = request.id; let request = new_add_columns_req(); let mut procedure = table_engine .alter_table_procedure(&engine_ctx, request.clone()) @@ -384,7 +348,7 @@ mod tests { // Validate. let table = table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, table_id) .unwrap() .unwrap(); let new_info = table.table_info(); @@ -405,7 +369,7 @@ mod tests { ConcreteDataType::string_datatype(), true, ); - let request = new_add_columns_req_with_location(&new_tag, &new_field); + let request = new_add_columns_req_with_location(table_id, &new_tag, &new_field); let mut procedure = table_engine .alter_table_procedure(&engine_ctx, request.clone()) .unwrap(); @@ -413,7 +377,7 @@ mod tests { // Validate. let table = table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, table_id) .unwrap() .unwrap(); let new_info = table.table_info(); @@ -456,6 +420,7 @@ mod tests { procedure_test_util::execute_procedure_until_done(&mut procedure).await; // Add columns. + let table_id = request.id; let request = new_add_columns_req(); let mut procedure = table_engine .alter_table_procedure(&engine_ctx, request.clone()) @@ -463,13 +428,8 @@ mod tests { procedure_test_util::execute_procedure_until_done(&mut procedure).await; // Get metadata. - let table_ref = TableReference { - catalog: &request.catalog_name, - schema: &request.schema_name, - table: &request.table_name, - }; let table = table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, table_id) .unwrap() .unwrap(); let old_info = table.table_info(); @@ -521,13 +481,8 @@ mod tests { procedure_test_util::execute_procedure_until_done(&mut procedure).await; // Get metadata of the created table. - let mut table_ref = TableReference { - catalog: &create_request.catalog_name, - schema: &create_request.schema_name, - table: &create_request.table_name, - }; let table = table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, create_request.id) .unwrap() .unwrap(); @@ -546,12 +501,7 @@ mod tests { let info = table.table_info(); assert_eq!(new_name, info.name); assert!(table_engine - .get_table(&engine_ctx, &table_ref) - .unwrap() - .is_none()); - table_ref.table = &new_name; - assert!(table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, create_request.id) .unwrap() .is_some()); } diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index 563278430b..93c75ff606 100644 --- a/src/mito/src/engine/procedure/create.rs +++ b/src/mito/src/engine/procedure/create.rs @@ -131,7 +131,12 @@ impl CreateMitoTable { let table_ref = self.creator.data.table_ref(); logging::debug!("on prepare create table {}", table_ref); - if self.creator.engine_inner.get_table(&table_ref).is_some() { + if self + .creator + .engine_inner + .get_table(self.creator.data.request.id) + .is_some() + { // If the table already exists. ensure!( self.creator.data.request.create_if_not_exists, @@ -152,14 +157,14 @@ impl CreateMitoTable { async fn on_engine_create_table(&mut self) -> Result { // In this state, we can ensure we are able to create a new table. let table_ref = self.creator.data.table_ref(); - logging::debug!("on engine create table {}", table_ref); + let table_id = self.creator.data.request.id; + logging::debug!( + "on engine create table {}, table_id: {}", + table_ref, + table_id + ); - let _lock = self - .creator - .engine_inner - .table_mutex - .lock(table_ref.to_string()) - .await; + let _lock = self.creator.engine_inner.table_mutex.lock(table_id).await; self.creator.create_table().await?; Ok(Status::Done) @@ -209,14 +214,13 @@ impl TableCreator { self.data.request.id, ); - let table_ref = self.data.table_ref(); // It is possible that the procedure retries `CREATE TABLE` many times, so we // return the table if it exists. - if let Some(table) = self.engine_inner.get_table(&table_ref) { + if let Some(table) = self.engine_inner.get_table(self.data.request.id) { return Ok(table.clone()); } - logging::debug!("Creator create table {}", table_ref); + logging::debug!("Creator create table {}", self.data.table_ref()); self.create_regions(&table_dir).await?; @@ -313,7 +317,6 @@ impl TableCreator { /// Writes metadata to the table manifest. async fn write_table_manifest(&mut self, table_dir: &str) -> Result { // Try to open the table first, as the table manifest might already exist. - let table_ref = self.data.table_ref(); if let Some((manifest, table_info)) = self .engine_inner .recover_table_manifest_and_info(&self.data.request.table_name, table_dir) @@ -323,7 +326,7 @@ impl TableCreator { self.engine_inner .tables - .insert(table_ref.to_string(), table.clone()); + .insert(self.data.request.id, table.clone()); return Ok(table); } @@ -333,7 +336,7 @@ impl TableCreator { self.engine_inner .tables - .insert(table_ref.to_string(), table.clone()); + .insert(self.data.request.id, table.clone()); Ok(table) } @@ -432,13 +435,8 @@ mod tests { .unwrap(); procedure_test_util::execute_procedure_until_done(&mut procedure).await; - let table_ref = TableReference { - catalog: &request.catalog_name, - schema: &request.schema_name, - table: &request.table_name, - }; assert!(table_engine - .get_table(&EngineContext::default(), &table_ref) + .get_table(&EngineContext::default(), request.id) .unwrap() .is_some()); } diff --git a/src/mito/src/engine/procedure/drop.rs b/src/mito/src/engine/procedure/drop.rs index 2beb92088b..c447db10e5 100644 --- a/src/mito/src/engine/procedure/drop.rs +++ b/src/mito/src/engine/procedure/drop.rs @@ -84,8 +84,7 @@ impl DropMitoTable { state: DropTableState::Prepare, request, }; - let table_ref = data.table_ref(); - let table = engine_inner.get_mito_table(&table_ref); + let table = engine_inner.get_mito_table(data.request.table_id); Ok(DropMitoTable { data, @@ -115,8 +114,7 @@ impl DropMitoTable { /// Recover the procedure from json. fn from_json(json: &str, engine_inner: Arc>) -> Result { let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?; - let table_ref = data.table_ref(); - let table = engine_inner.get_mito_table(&table_ref); + let table = engine_inner.get_mito_table(data.request.table_id); Ok(DropMitoTable { data, @@ -182,6 +180,7 @@ mod tests { procedure_test_util::execute_procedure_until_done(&mut procedure).await; // Drop the table. + let table_id = request.id; let request = test_util::new_drop_request(); let mut procedure = table_engine .drop_table_procedure(&engine_ctx, request.clone()) @@ -189,13 +188,8 @@ mod tests { procedure_test_util::execute_procedure_until_done(&mut procedure).await; // The table is dropped. - let table_ref = TableReference { - catalog: &request.catalog_name, - schema: &request.schema_name, - table: &request.table_name, - }; assert!(table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, table_id) .unwrap() .is_none()); } diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index e7f0178279..816ccc68dc 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -40,7 +40,7 @@ use table::Table; use super::*; use crate::table::test_util::{ - self, new_insert_request, schema_for_test, setup_table, TestEngineComponents, TABLE_NAME, + self, new_insert_request, setup_table, TestEngineComponents, TABLE_NAME, }; pub fn has_parquet_file(sst_dir: &str) -> bool { @@ -537,11 +537,16 @@ fn test_region_id() { assert_eq!(18446744069414584330, region_id(u32::MAX, 10)); } -fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> AlterTableRequest { +fn new_add_columns_req( + table_id: TableId, + new_tag: &ColumnSchema, + new_field: &ColumnSchema, +) -> AlterTableRequest { AlterTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), + table_id, alter_kind: AlterKind::AddColumns { columns: vec![ AddColumnRequest { @@ -560,6 +565,7 @@ fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> Alte } pub(crate) fn new_add_columns_req_with_location( + table_id: TableId, new_tag: &ColumnSchema, new_field: &ColumnSchema, ) -> AlterTableRequest { @@ -567,6 +573,7 @@ pub(crate) fn new_add_columns_req_with_location( catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), + table_id, alter_kind: AlterKind::AddColumns { columns: vec![ AddColumnRequest { @@ -597,7 +604,7 @@ async fn test_alter_table_add_column() { let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true); let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true); - let req = new_add_columns_req(&new_tag, &new_field); + let req = new_add_columns_req(table.table_info().ident.table_id, &new_tag, &new_field); let table = table_engine .alter_table(&EngineContext::default(), req) .await @@ -633,7 +640,7 @@ async fn test_alter_table_add_column() { ConcreteDataType::string_datatype(), true, ); - let req = new_add_columns_req_with_location(&new_tag, &new_field); + let req = new_add_columns_req_with_location(new_info.ident.table_id, &new_tag, &new_field); let table = table_engine .alter_table(&EngineContext::default(), req) .await @@ -653,13 +660,13 @@ async fn test_alter_table_add_column() { #[tokio::test] async fn test_alter_table_remove_column() { - let (_engine, table_engine, _table, _object_store, _dir) = + let (_engine, table_engine, table, _object_store, _dir) = test_util::setup_mock_engine_and_table().await; // Add two columns to the table first. let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true); let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true); - let req = new_add_columns_req(&new_tag, &new_field); + let req = new_add_columns_req(table.table_info().ident.table_id, &new_tag, &new_field); let table = table_engine .alter_table(&EngineContext::default(), req) .await @@ -674,6 +681,7 @@ async fn test_alter_table_remove_column() { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), + table_id: table.table_info().ident.table_id, alter_kind: AlterKind::DropColumns { names: vec![String::from("memory"), String::from("my_field")], }, @@ -706,45 +714,13 @@ async fn test_alter_rename_table() { let TestEngineComponents { table_engine, storage_engine, + table_ref, object_store, dir: _dir, .. } = test_util::setup_test_engine_and_table().await; let ctx = EngineContext::default(); - - // register another table - let another_name = "another_table"; - let req = CreateTableRequest { - id: 1024, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: another_name.to_string(), - desc: Some("another test table".to_string()), - schema: RawSchema::from(&schema_for_test()), - region_numbers: vec![0], - primary_key_indices: vec![0], - create_if_not_exists: true, - table_options: TableOptions::default(), - engine: MITO_ENGINE.to_string(), - }; - table_engine - .create_table(&ctx, req) - .await - .expect("create table must succeed"); - // test renaming a table with an existing name. - let req = AlterTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: TABLE_NAME.to_string(), - alter_kind: AlterKind::RenameTable { - new_table_name: another_name.to_string(), - }, - }; - let err = table_engine.alter_table(&ctx, req).await.err().unwrap(); - assert!( - err.to_string().contains("Table already exists"), - "Unexpected error: {err}" - ); + let table_id = table_ref.table_info().ident.table_id; let new_table_name = "test_table"; // test rename table @@ -752,6 +728,7 @@ async fn test_alter_rename_table() { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), + table_id, alter_kind: AlterKind::RenameTable { new_table_name: new_table_name.to_string(), }, @@ -765,7 +742,7 @@ async fn test_alter_rename_table() { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: new_table_name.to_string(), - table_id: 1, + table_id, region_numbers: vec![0], }; @@ -794,17 +771,13 @@ async fn test_drop_table() { let engine_ctx = EngineContext {}; let table_info = table.table_info(); - let table_reference = TableReference { - catalog: DEFAULT_CATALOG_NAME, - schema: DEFAULT_SCHEMA_NAME, - table: &table_info.name, - }; + let table_id = 1; let create_table_request = CreateTableRequest { - id: 1, + id: table_id, catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_info.name.to_string(), + table_name: table_info.name.clone(), schema: RawSchema::from(&*table_info.meta.schema), create_if_not_exists: true, desc: None, @@ -819,23 +792,25 @@ async fn test_drop_table() { .await .unwrap(); assert_eq!(table_info, created_table.table_info()); - assert!(table_engine.table_exists(&engine_ctx, &table_reference)); + assert!(table_engine.table_exists(&engine_ctx, table_id)); let drop_table_request = DropTableRequest { - catalog_name: table_reference.catalog.to_string(), - schema_name: table_reference.schema.to_string(), - table_name: table_reference.table.to_string(), + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_info.name.clone(), + table_id, }; let table_dropped = table_engine .drop_table(&engine_ctx, drop_table_request) .await .unwrap(); assert!(table_dropped); - assert!(!table_engine.table_exists(&engine_ctx, &table_reference)); + assert!(!table_engine.table_exists(&engine_ctx, table_id)); // should be able to re-create + let table_id = 2; let request = CreateTableRequest { - id: 2, + id: table_id, catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_info.name.to_string(), @@ -848,7 +823,7 @@ async fn test_drop_table() { engine: MITO_ENGINE.to_string(), }; table_engine.create_table(&ctx, request).await.unwrap(); - assert!(table_engine.table_exists(&engine_ctx, &table_reference)); + assert!(table_engine.table_exists(&engine_ctx, table_id)); } #[tokio::test] diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index adf0196302..b23cd8d4f5 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -28,7 +28,7 @@ use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::engine::{EngineContext, TableEngine}; -use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; +use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::{ AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest, TableOptions, }; @@ -39,6 +39,7 @@ use crate::engine::{MitoEngine, MITO_ENGINE}; pub use crate::table::test_util::mock_engine::{MockEngine, MockRegion}; pub const TABLE_NAME: &str = "demo"; +pub const TABLE_ID: TableId = 1; /// Create a InsertRequest with default catalog and schema. pub fn new_insert_request( @@ -107,7 +108,7 @@ pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) { pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest { CreateTableRequest { - id: 1, + id: TABLE_ID, catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), @@ -126,6 +127,7 @@ pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), + table_id: TABLE_ID, alter_kind, } } @@ -135,6 +137,7 @@ pub fn new_drop_request() -> DropTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), + table_id: TABLE_ID, } } diff --git a/src/table-procedure/src/alter.rs b/src/table-procedure/src/alter.rs index f96f99e9ac..9674dbd6be 100644 --- a/src/table-procedure/src/alter.rs +++ b/src/table-procedure/src/alter.rs @@ -315,13 +315,14 @@ mod tests { async fn test_alter_table_procedure_rename() { let env = TestEnv::new("rename"); let table_name = "test_old"; - env.create_table(table_name).await; + let table_id = env.create_table(table_name).await; let new_table_name = "test_new"; let request = AlterTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), + table_id, alter_kind: AlterKind::RenameTable { new_table_name: new_table_name.to_string(), }, diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs index 6fb8da5e59..e6ab7ef210 100644 --- a/src/table-procedure/src/create.rs +++ b/src/table-procedure/src/create.rs @@ -349,14 +349,9 @@ mod tests { table_engine.clone(), ); - let table_ref = TableReference { - catalog: &request.catalog_name, - schema: &request.schema_name, - table: &request.table_name, - }; let engine_ctx = EngineContext::default(); assert!(table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, request.id) .unwrap() .is_none()); @@ -365,7 +360,7 @@ mod tests { watcher.changed().await.unwrap(); assert!(table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, request.id) .unwrap() .is_some()); } @@ -390,14 +385,9 @@ mod tests { table_engine.clone(), ); - let table_ref = TableReference { - catalog: &request.catalog_name, - schema: &request.schema_name, - table: &request.table_name, - }; let engine_ctx = EngineContext::default(); assert!(table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, request.id) .unwrap() .is_none()); @@ -452,7 +442,7 @@ mod tests { // The table is created. assert!(table_engine - .get_table(&engine_ctx, &table_ref) + .get_table(&engine_ctx, request.id) .unwrap() .is_some()); } diff --git a/src/table-procedure/src/drop.rs b/src/table-procedure/src/drop.rs index 9a8c5314b4..cfd8c4fa9f 100644 --- a/src/table-procedure/src/drop.rs +++ b/src/table-procedure/src/drop.rs @@ -277,12 +277,13 @@ mod tests { async fn test_drop_table_procedure() { let env = TestEnv::new("drop"); let table_name = "test_drop"; - env.create_table(table_name).await; + let table_id = env.create_table(table_name).await; let request = DropTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), + table_id, }; let TestEnv { dir: _dir, @@ -305,13 +306,6 @@ mod tests { let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap(); assert!(schema.table(table_name).await.unwrap().is_none()); let ctx = EngineContext::default(); - assert!(!table_engine.table_exists( - &ctx, - &TableReference { - catalog: DEFAULT_CATALOG_NAME, - schema: DEFAULT_SCHEMA_NAME, - table: table_name, - } - )); + assert!(!table_engine.table_exists(&ctx, table_id,)); } } diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs index 66e99b1936..7e3b028d05 100644 --- a/src/table-procedure/src/test_util.rs +++ b/src/table-procedure/src/test_util.rs @@ -32,6 +32,7 @@ use object_store::ObjectStore; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; +use table::metadata::TableId; use table::requests::CreateTableRequest; use crate::CreateTableProcedure; @@ -92,8 +93,9 @@ impl TestEnv { } } - pub async fn create_table(&self, table_name: &str) { + pub async fn create_table(&self, table_name: &str) -> TableId { let request = new_create_request(table_name); + let table_id = request.id; let procedure = CreateTableProcedure::new( request, self.catalog_manager.clone(), @@ -108,6 +110,8 @@ impl TestEnv { .await .unwrap(); watcher.changed().await.unwrap(); + + table_id } } diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index d0fc69cf21..5b4115c6b2 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -108,14 +108,10 @@ pub trait TableEngine: Send + Sync { ) -> Result; /// Returns the table by it's name. - fn get_table( - &self, - ctx: &EngineContext, - table_ref: &TableReference, - ) -> Result>; + fn get_table(&self, ctx: &EngineContext, table_id: TableId) -> Result>; /// Returns true when the given table is exists. - fn table_exists(&self, ctx: &EngineContext, table_ref: &TableReference) -> bool; + fn table_exists(&self, ctx: &EngineContext, table_id: TableId) -> bool; /// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist. async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result; diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 57fbcb3a1c..7e76ad8ba2 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -162,6 +162,7 @@ pub struct AlterTableRequest { pub catalog_name: String, pub schema_name: String, pub table_name: String, + pub table_id: TableId, pub alter_kind: AlterKind, } @@ -200,6 +201,7 @@ pub struct DropTableRequest { pub catalog_name: String, pub schema_name: String, pub table_name: String, + pub table_id: TableId, } impl DropTableRequest { @@ -218,6 +220,7 @@ pub struct CloseTableRequest { pub catalog_name: String, pub schema_name: String, pub table_name: String, + pub table_id: TableId, /// Do nothing if region_numbers is empty pub region_numbers: Vec, /// flush regions diff --git a/src/table/src/test_util/mock_engine.rs b/src/table/src/test_util/mock_engine.rs index a59c04548a..741af85f6a 100644 --- a/src/table/src/test_util/mock_engine.rs +++ b/src/table/src/test_util/mock_engine.rs @@ -19,7 +19,8 @@ use async_trait::async_trait; use common_procedure::BoxedProcedure; use tokio::sync::Mutex; -use crate::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference}; +use crate::engine::{EngineContext, TableEngine, TableEngineProcedure}; +use crate::metadata::TableId; use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; use crate::test_util::EmptyTable; use crate::{Result, TableRef}; @@ -86,11 +87,11 @@ impl TableEngine for MockTableEngine { unimplemented!() } - fn get_table(&self, _ctx: &EngineContext, _ref: &TableReference) -> Result> { + fn get_table(&self, _ctx: &EngineContext, _table_id: TableId) -> Result> { unimplemented!() } - fn table_exists(&self, _ctx: &EngineContext, _name: &TableReference) -> bool { + fn table_exists(&self, _ctx: &EngineContext, _table_id: TableId) -> bool { unimplemented!() }