From 0819582a26f02aad0cedd48f76a19d508a560af5 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Thu, 13 Apr 2023 14:05:53 +0800 Subject: [PATCH] feat: Add alter table procedure (#1354) * feat: Implement AlterTableProcedure * test: Test alter table procedure * feat: support alter table by procedure in datanode * chore: update comment --- Cargo.lock | 1 + src/datanode/src/sql.rs | 2 + src/datanode/src/sql/alter.rs | 69 +++++- src/table-procedure/Cargo.toml | 1 + src/table-procedure/src/alter.rs | 343 +++++++++++++++++++++++++++ src/table-procedure/src/create.rs | 54 +---- src/table-procedure/src/error.rs | 11 +- src/table-procedure/src/lib.rs | 7 +- src/table-procedure/src/test_util.rs | 63 ++++- 9 files changed, 499 insertions(+), 52 deletions(-) create mode 100644 src/table-procedure/src/alter.rs diff --git a/Cargo.lock b/Cargo.lock index 404723b51b..ff82495aeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7876,6 +7876,7 @@ version = "0.1.1" dependencies = [ "async-trait", "catalog", + "common-catalog", "common-error", "common-procedure", "common-telemetry", diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 67547d7e9a..c78e4a7f76 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -54,6 +54,8 @@ pub enum SqlRequest { pub struct SqlHandler { table_engine_manager: TableEngineManagerRef, catalog_manager: CatalogManagerRef, + // TODO(yingwen): Support multiple table engine. We need to add a method + // to TableEngineManagerRef to return engine procedure by engine name. engine_procedure: TableEngineProcedureRef, procedure_manager: Option, } diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index f9159f0de1..a23dac6b32 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -13,18 +13,25 @@ // limitations under the License. use catalog::RenameTableRequest; +use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId}; use common_query::Output; +use common_telemetry::logging::info; use snafu::prelude::*; use sql::statements::alter::{AlterTable, AlterTableOperation}; use sql::statements::column_def_to_schema; use table::engine::{EngineContext, TableReference}; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest}; +use table_procedure::AlterTableProcedure; use crate::error::{self, Result}; use crate::sql::SqlHandler; impl SqlHandler { pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result { + if let Some(procedure_manager) = &self.procedure_manager { + return self.alter_table_by_procedure(procedure_manager, req).await; + } + let ctx = EngineContext {}; let table_name = req.table_name.clone(); let table_ref = TableReference { @@ -71,6 +78,33 @@ impl SqlHandler { Ok(Output::AffectedRows(0)) } + pub(crate) async fn alter_table_by_procedure( + &self, + procedure_manager: &ProcedureManagerRef, + req: AlterTableRequest, + ) -> Result { + let table_name = req.table_name.clone(); + let procedure = AlterTableProcedure::new( + req, + self.catalog_manager.clone(), + self.engine_procedure.clone(), + ); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let procedure_id = procedure_with_id.id; + + info!("Alter table {} by procedure {}", table_name, procedure_id); + + let mut watcher = procedure_manager + .submit(procedure_with_id) + .await + .context(error::SubmitProcedureSnafu { procedure_id })?; + + watcher::wait(&mut watcher) + .await + .context(error::WaitProcedureSnafu { procedure_id })?; + Ok(Output::AffectedRows(0)) + } + pub(crate) fn alter_to_request( &self, alter_table: AlterTable, @@ -112,12 +146,14 @@ mod tests { use std::assert_matches::assert_matches; use datatypes::prelude::ConcreteDataType; + use query::parser::QueryLanguageParser; + use session::context::QueryContext; use sql::dialect::GenericDialect; use sql::parser::ParserContext; use sql::statements::statement::Statement; use super::*; - use crate::tests::test_util::create_mock_sql_handler; + use crate::tests::test_util::{create_mock_sql_handler, MockInstance}; fn parse_sql(sql: &str) -> AlterTable { let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); @@ -182,4 +218,35 @@ mod tests { _ => unreachable!(), } } + + #[tokio::test(flavor = "multi_thread")] + async fn alter_table_by_procedure() { + let instance = MockInstance::with_procedure_enabled("alter_table_by_procedure").await; + + // Create table first. + let sql = r#"create table test_alter( + host string, + ts timestamp, + cpu double default 0, + TIME INDEX (ts), + PRIMARY KEY(host) + ) engine=mito with(regions=1);"#; + let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); + let output = instance + .inner() + .execute_stmt(stmt, QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + // Alter table. + let sql = r#"alter table test_alter add column memory double"#; + let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); + let output = instance + .inner() + .execute_stmt(stmt, QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + } } diff --git a/src/table-procedure/Cargo.toml b/src/table-procedure/Cargo.toml index d37d98ac3b..e1927afd2e 100644 --- a/src/table-procedure/Cargo.toml +++ b/src/table-procedure/Cargo.toml @@ -17,6 +17,7 @@ snafu.workspace = true table = { path = "../table" } [dev-dependencies] +common-catalog = { path = "../common/catalog" } common-test-util = { path = "../common/test-util" } log-store = { path = "../log-store" } mito = { path = "../mito" } diff --git a/src/table-procedure/src/alter.rs b/src/table-procedure/src/alter.rs new file mode 100644 index 0000000000..81cf202edd --- /dev/null +++ b/src/table-procedure/src/alter.rs @@ -0,0 +1,343 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Procedure to alter a table. + +use async_trait::async_trait; +use catalog::{CatalogManagerRef, RenameTableRequest}; +use common_procedure::{ + Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, + ProcedureWithId, Result, Status, +}; +use common_telemetry::logging; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; +use table::engine::{EngineContext, TableEngineProcedureRef, TableReference}; +use table::metadata::TableId; +use table::requests::{AlterKind, AlterTableRequest}; + +use crate::error::{ + AccessCatalogSnafu, CatalogNotFoundSnafu, DeserializeProcedureSnafu, SchemaNotFoundSnafu, + SerializeProcedureSnafu, SubprocedureFailedSnafu, TableExistsSnafu, TableNotFoundSnafu, +}; + +pub struct AlterTableProcedure { + data: AlterTableData, + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, +} + +#[async_trait] +impl Procedure for AlterTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, ctx: &Context) -> Result { + match self.data.state { + AlterTableState::Prepare => self.on_prepare().await, + AlterTableState::EngineAlterTable => self.on_engine_alter_table(ctx).await, + AlterTableState::RenameInCatalog => self.on_rename_in_catalog().await, + } + } + + fn dump(&self) -> Result { + let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?; + Ok(json) + } + + fn lock_key(&self) -> LockKey { + // We lock the whole table. + let table_name = self.data.table_ref().to_string(); + // If alter kind is rename, we also need to lock the renamed table. + if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { + let new_table_name = TableReference { + catalog: &self.data.request.catalog_name, + schema: &self.data.request.schema_name, + table: new_table_name, + } + .to_string(); + LockKey::new([table_name, new_table_name]) + } else { + LockKey::single(table_name) + } + } +} + +impl AlterTableProcedure { + const TYPE_NAME: &str = "table-procedure:AlterTableProcedure"; + + /// Returns a new [AlterTableProcedure]. + pub fn new( + request: AlterTableRequest, + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, + ) -> AlterTableProcedure { + AlterTableProcedure { + data: AlterTableData { + state: AlterTableState::Prepare, + request, + subprocedure_id: None, + table_id: None, + }, + catalog_manager, + engine_procedure, + } + } + + /// Register the loader of this procedure to the `procedure_manager`. + /// + /// # Panics + /// Panics on error. + pub fn register_loader( + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, + procedure_manager: &dyn ProcedureManager, + ) { + procedure_manager + .register_loader( + Self::TYPE_NAME, + Box::new(move |data| { + Self::from_json(data, catalog_manager.clone(), engine_procedure.clone()) + .map(|p| Box::new(p) as _) + }), + ) + .unwrap() + } + + /// Recover the procedure from json. + fn from_json( + json: &str, + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, + ) -> Result { + let data: AlterTableData = serde_json::from_str(json).context(DeserializeProcedureSnafu)?; + + Ok(AlterTableProcedure { + data, + catalog_manager, + engine_procedure, + }) + } + + async fn on_prepare(&mut self) -> Result { + // Check whether catalog and schema exist. + let catalog = self + .catalog_manager + .catalog(&self.data.request.catalog_name) + .context(AccessCatalogSnafu)? + .with_context(|| CatalogNotFoundSnafu { + name: &self.data.request.catalog_name, + })?; + let schema = catalog + .schema(&self.data.request.schema_name) + .context(AccessCatalogSnafu)? + .with_context(|| SchemaNotFoundSnafu { + name: &self.data.request.schema_name, + })?; + + let table = schema + .table(&self.data.request.table_name) + .await + .context(AccessCatalogSnafu)? + .context(TableNotFoundSnafu { + name: &self.data.request.table_name, + })?; + if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { + ensure!( + !schema + .table_exist(new_table_name) + .context(AccessCatalogSnafu)?, + TableExistsSnafu { + name: new_table_name, + } + ); + } + + self.data.state = AlterTableState::EngineAlterTable; + // Assign procedure id to the subprocedure. + self.data.subprocedure_id = Some(ProcedureId::random()); + // Set the table id. + self.data.table_id = Some(table.table_info().ident.table_id); + + Ok(Status::executing(true)) + } + + async fn on_engine_alter_table(&mut self, ctx: &Context) -> Result { + // Safety: subprocedure id is always set in this state. + let sub_id = self.data.subprocedure_id.unwrap(); + + // Query subprocedure state. + let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else { + logging::info!( + "On engine alter table {}, subprocedure not found, sub_id: {}", + self.data.request.table_name, + sub_id + ); + + // If the subprocedure is not found, we create a new subprocedure with the same id. + let engine_ctx = EngineContext::default(); + let procedure = self + .engine_procedure + .alter_table_procedure(&engine_ctx, self.data.request.clone()) + .map_err(Error::from_error_ext)?; + return Ok(Status::Suspended { + subprocedures: vec![ProcedureWithId { + id: sub_id, + procedure, + }], + persist: true, + }); + }; + + match sub_state { + ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended { + subprocedures: Vec::new(), + persist: false, + }), + ProcedureState::Done => { + logging::info!( + "On engine alter table {}, done, sub_id: {}", + self.data.request.table_name, + sub_id + ); + // The sub procedure is done, we can execute next step. + if self.data.request.is_rename_table() { + // We also need to rename the table in the catalog. + self.data.state = AlterTableState::RenameInCatalog; + Ok(Status::executing(true)) + } else { + // If this isn't a rename operation, we are done. + Ok(Status::Done) + } + } + ProcedureState::Failed { .. } => { + // Return error if the subprocedure is failed. + SubprocedureFailedSnafu { + subprocedure_id: sub_id, + } + .fail()? + } + } + } + + async fn on_rename_in_catalog(&mut self) -> Result { + // Safety: table id is available in this state. + let table_id = self.data.table_id.unwrap(); + if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { + let rename_req = RenameTableRequest { + catalog: self.data.request.catalog_name.clone(), + schema: self.data.request.schema_name.clone(), + table_name: self.data.request.table_name.clone(), + new_table_name: new_table_name.clone(), + table_id, + }; + + self.catalog_manager + .rename_table(rename_req) + .await + .map_err(Error::from_error_ext)?; + } + + Ok(Status::Done) + } +} + +/// Represents each step while altering a table in the datanode. +#[derive(Debug, Serialize, Deserialize)] +enum AlterTableState { + /// Validate request and prepare to alter table. + Prepare, + /// Alter table in the table engine. + EngineAlterTable, + /// Rename the table in the catalog (optional). + RenameInCatalog, +} + +/// Serializable data of [AlterTableProcedure]. +#[derive(Debug, Serialize, Deserialize)] +struct AlterTableData { + /// Current state. + state: AlterTableState, + /// Request to alter this table. + request: AlterTableRequest, + /// Id of the table. + /// + /// Available after [AlterTableState::Prepare] state. + table_id: Option, + /// Id of the subprocedure to create this table in the engine. + /// + /// This id is `Some` while the procedure is in [AlterTableState::EngineAlterTable] + /// state. + subprocedure_id: Option, +} + +impl AlterTableData { + fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.request.catalog_name, + schema: &self.request.schema_name, + table: &self.request.table_name, + } + } +} + +#[cfg(test)] +mod tests { + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + + use super::*; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn test_alter_table_procedure_rename() { + let env = TestEnv::new("rename"); + let table_name = "test_old"; + env.create_table(table_name).await; + + let new_table_name = "test_new"; + let request = AlterTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + alter_kind: AlterKind::RenameTable { + new_table_name: new_table_name.to_string(), + }, + }; + + let TestEnv { + dir: _dir, + table_engine, + procedure_manager, + catalog_manager, + } = env; + let procedure = + AlterTableProcedure::new(request, catalog_manager.clone(), table_engine.clone()); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap(); + watcher.changed().await.unwrap(); + + let catalog = catalog_manager + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .unwrap(); + let schema = catalog.schema(DEFAULT_SCHEMA_NAME).unwrap().unwrap(); + let table = schema.table(new_table_name).await.unwrap().unwrap(); + let table_info = table.table_info(); + assert_eq!(new_table_name, table_info.name); + + assert!(schema.table(table_name).await.unwrap().is_none()); + } +} diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs index 62b686b4ff..c81201987c 100644 --- a/src/table-procedure/src/create.rs +++ b/src/table-procedure/src/create.rs @@ -66,7 +66,7 @@ impl Procedure for CreateTableProcedure { } impl CreateTableProcedure { - const TYPE_NAME: &str = "table-procedures::CreateTableProcedure"; + const TYPE_NAME: &str = "table-procedure::CreateTableProcedure"; /// Returns a new [CreateTableProcedure]. pub fn new( @@ -176,7 +176,7 @@ impl CreateTableProcedure { // do this check as we might not submitted the subprocedure yet when the manager // recover this procedure from procedure store. logging::info!( - "On engine create table {}, not found, sub_id: {}", + "On engine create table {}, subprocedure not found, sub_id: {}", self.data.request.table_name, sub_id ); @@ -186,7 +186,7 @@ impl CreateTableProcedure { let procedure = self .engine_procedure .create_table_procedure(&engine_ctx, self.data.request.clone()) - .map_err(Error::external)?; + .map_err(Error::from_error_ext)?; return Ok(Status::Suspended { subprocedures: vec![ProcedureWithId { id: sub_id, @@ -238,7 +238,7 @@ impl CreateTableProcedure { let table_exists = schema .table(&self.data.request.table_name) .await - .map_err(Error::external)? + .map_err(Error::from_error_ext)? .is_some(); if table_exists { // Table already exists. @@ -251,7 +251,7 @@ impl CreateTableProcedure { let table = self .table_engine .get_table(&engine_ctx, &table_ref) - .map_err(Error::external)? + .map_err(Error::from_error_ext)? .unwrap(); let register_req = RegisterTableRequest { @@ -264,7 +264,7 @@ impl CreateTableProcedure { self.catalog_manager .register_table(register_req) .await - .map_err(Error::external)?; + .map_err(Error::from_error_ext)?; Ok(Status::Done) } @@ -307,48 +307,10 @@ impl CreateTableData { #[cfg(test)] mod tests { - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, RawSchema}; - use mito::engine::MITO_ENGINE; use table::engine::{EngineContext, TableEngine}; use super::*; - use crate::test_util::TestEnv; - - fn schema_for_test() -> RawSchema { - let column_schemas = vec![ - // Key - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - // Nullable value column: cpu - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - // Non-null value column: memory - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true), - ]; - - RawSchema::new(column_schemas) - } - - fn new_create_request(table_name: &str) -> CreateTableRequest { - CreateTableRequest { - id: 1, - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - desc: Some("a test table".to_string()), - schema: schema_for_test(), - region_numbers: vec![0, 1], - create_if_not_exists: true, - primary_key_indices: vec![0], - table_options: Default::default(), - engine: MITO_ENGINE.to_string(), - } - } + use crate::test_util::{self, TestEnv}; #[tokio::test] async fn test_create_table_procedure() { @@ -360,7 +322,7 @@ mod tests { } = TestEnv::new("create"); let table_name = "test_create"; - let request = new_create_request(table_name); + let request = test_util::new_create_request(table_name); let procedure = CreateTableProcedure::new( request.clone(), catalog_manager, diff --git a/src/table-procedure/src/error.rs b/src/table-procedure/src/error.rs index 84bc49a542..9ed582a6a6 100644 --- a/src/table-procedure/src/error.rs +++ b/src/table-procedure/src/error.rs @@ -51,11 +51,17 @@ pub enum Error { #[snafu(display("Schema {} not found", name))] SchemaNotFound { name: String }, + #[snafu(display("Table {} not found", name))] + TableNotFound { name: String }, + #[snafu(display("Subprocedure {} failed", subprocedure_id))] SubprocedureFailed { subprocedure_id: ProcedureId, location: Location, }, + + #[snafu(display("Table {} already exists", name))] + TableExists { name: String }, } pub type Result = std::result::Result; @@ -70,7 +76,10 @@ impl ErrorExt for Error { } InvalidRawSchema { source, .. } => source.status_code(), AccessCatalog { source } => source.status_code(), - CatalogNotFound { .. } | SchemaNotFound { .. } => StatusCode::InvalidArguments, + CatalogNotFound { .. } | SchemaNotFound { .. } | TableExists { .. } => { + StatusCode::InvalidArguments + } + TableNotFound { .. } => StatusCode::TableNotFound, } } diff --git a/src/table-procedure/src/lib.rs b/src/table-procedure/src/lib.rs index 9915376808..7c1b7eaeac 100644 --- a/src/table-procedure/src/lib.rs +++ b/src/table-procedure/src/lib.rs @@ -14,11 +14,13 @@ //! Procedures for table operations. +mod alter; mod create; pub mod error; #[cfg(test)] mod test_util; +pub use alter::AlterTableProcedure; use catalog::CatalogManagerRef; use common_procedure::ProcedureManager; pub use create::CreateTableProcedure; @@ -35,9 +37,10 @@ pub fn register_procedure_loaders( procedure_manager: &dyn ProcedureManager, ) { CreateTableProcedure::register_loader( - catalog_manager, - engine_procedure, + catalog_manager.clone(), + engine_procedure.clone(), table_engine, procedure_manager, ); + AlterTableProcedure::register_loader(catalog_manager, engine_procedure, procedure_manager); } diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs index c86433284e..59615a1a78 100644 --- a/src/table-procedure/src/test_util.rs +++ b/src/table-procedure/src/test_util.rs @@ -17,18 +17,24 @@ use std::time::Duration; use catalog::local::MemoryCatalogManager; use catalog::CatalogManagerRef; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::store::state_store::ObjectStateStore; -use common_procedure::ProcedureManagerRef; +use common_procedure::{ProcedureManagerRef, ProcedureWithId}; use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, RawSchema}; use log_store::NoopLogStore; use mito::config::EngineConfig; -use mito::engine::MitoEngine; +use mito::engine::{MitoEngine, MITO_ENGINE}; use object_store::services::Fs; use object_store::ObjectStore; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; +use table::requests::CreateTableRequest; + +use crate::CreateTableProcedure; pub struct TestEnv { pub dir: TempDir, @@ -79,4 +85,57 @@ impl TestEnv { catalog_manager, } } + + pub async fn create_table(&self, table_name: &str) { + let request = new_create_request(table_name); + let procedure = CreateTableProcedure::new( + request, + self.catalog_manager.clone(), + self.table_engine.clone(), + self.table_engine.clone(), + ); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let mut watcher = self + .procedure_manager + .submit(procedure_with_id) + .await + .unwrap(); + watcher.changed().await.unwrap(); + } +} + +pub fn schema_for_test() -> RawSchema { + let column_schemas = vec![ + // Key + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + // Nullable value column: cpu + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + // Non-null value column: memory + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true), + ]; + + RawSchema::new(column_schemas) +} + +pub fn new_create_request(table_name: &str) -> CreateTableRequest { + CreateTableRequest { + id: 1, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + desc: Some("a test table".to_string()), + schema: schema_for_test(), + region_numbers: vec![0, 1], + create_if_not_exists: true, + primary_key_indices: vec![0], + table_options: Default::default(), + engine: MITO_ENGINE.to_string(), + } }