From eb50cee6019ae1a45d65213cdec5b5614a0f4622 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 25 Apr 2023 12:04:02 +0800 Subject: [PATCH] feat: Switch to the procedure framework (#1448) * feat: Remove create_mock_sql_handler() create_to_request() and alter_to_request() don't need `&self`, so we don't need to mock the sql handler to test them * feat: Enable procedure manager by default * docs: Update config example * test: Enable procedure framework in all tests * refactor(datanode): rename methods using procedure * test(catalog): Fix temp dir drops before test finishes * tests: Enable procedure framework in sqlness * test: Fix sqlness standalone rename test * fix: Drop procedure allows table not in engine * test: Change rename table test * fix: add options to table meta when creating table by procedure * test: adjust error message in schema test case * test: Fix test_sql_api error message --- config/datanode.example.toml | 10 +- config/standalone.example.toml | 19 +-- src/catalog/src/error.rs | 2 +- src/catalog/tests/local_catalog_tests.rs | 17 +- src/cmd/src/datanode.rs | 2 +- src/cmd/src/standalone.rs | 4 +- src/datanode/src/datanode.rs | 4 +- src/datanode/src/instance.rs | 50 +++--- src/datanode/src/instance/sql.rs | 6 +- src/datanode/src/sql.rs | 6 +- src/datanode/src/sql/alter.rs | 95 ++--------- src/datanode/src/sql/create.rs | 156 +++++------------- src/datanode/src/sql/drop_table.rs | 76 +-------- src/datanode/src/tests/test_util.rs | 67 +------- src/frontend/src/error.rs | 2 +- src/frontend/src/tests.rs | 8 +- src/mito/src/engine/procedure/create.rs | 1 + src/mito/src/engine/procedure/drop.rs | 26 +-- src/table-procedure/src/alter.rs | 23 ++- src/table-procedure/src/error.rs | 2 +- tests-integration/src/test_util.rs | 9 +- tests-integration/tests/http.rs | 2 +- .../distributed/alter/rename_table.result | 2 +- .../standalone/alter/rename_table.result | 4 +- .../standalone/common/catalog/schema.result | 4 +- tests/conf/datanode-test.toml.template | 6 + tests/conf/standalone-test.toml.template | 6 + tests/runner/src/env.rs | 2 + 28 files changed, 183 insertions(+), 428 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index ba1d04a1ac..2583448cb5 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -53,8 +53,8 @@ gc_duration = '30s' checkpoint_on_startup = false # Procedure storage options, see `standalone.example.toml`. -# [procedure.store] -# type = "File" -# data_dir = "/tmp/greptimedb/procedure/" -# max_retry_times = 3 -# retry_delay = "500ms" +[procedure.store] +type = "File" +data_dir = "/tmp/greptimedb/procedure/" +max_retry_times = 3 +retry_delay = "500ms" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 35531b3b34..3ba676a288 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -118,13 +118,12 @@ gc_duration = '30s' checkpoint_on_startup = false # Procedure storage options. -# Uncomment to enable. -# [procedure.store] -# # Storage type. -# type = "File" -# # Procedure data path. -# data_dir = "/tmp/greptimedb/procedure/" -# # Procedure max retry time. -# max_retry_times = 3 -# # Initial retry delay of procedures, increases exponentially -# retry_delay = "500ms" +[procedure.store] +# Storage type. +type = "File" +# Procedure data path. +data_dir = "/tmp/greptimedb/procedure/" +# Procedure max retry time. +max_retry_times = 3 +# Initial retry delay of procedures, increases exponentially +retry_delay = "500ms" diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 6e65b57166..62e8e203a2 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -106,7 +106,7 @@ pub enum Error { #[snafu(display("Table `{}` already exists", table))] TableExists { table: String, location: Location }, - #[snafu(display("Table `{}` not exist", table))] + #[snafu(display("Table not found: {}", table))] TableNotExist { table: String, location: Location }, #[snafu(display("Schema {} already exists", schema))] diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index ab125f741d..8aec5d80f1 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -20,14 +20,16 @@ mod tests { use catalog::{CatalogManager, RegisterTableRequest, RenameTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_telemetry::{error, info}; + use common_test_util::temp_dir::TempDir; use mito::config::EngineConfig; use table::engine::manager::MemoryTableEngineManager; use table::table::numbers::NumbersTable; use table::TableRef; use tokio::sync::Mutex; - async fn create_local_catalog_manager() -> Result { - let (_dir, object_store) = + async fn create_local_catalog_manager( + ) -> Result<(TempDir, LocalCatalogManager), catalog::error::Error> { + let (dir, object_store) = mito::table::test_util::new_test_object_store("setup_mock_engine_and_table").await; let mock_engine = Arc::new(mito::table::test_util::MockMitoEngine::new( EngineConfig::default(), @@ -37,13 +39,13 @@ mod tests { let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone())); let catalog_manager = LocalCatalogManager::try_new(engine_manager).await.unwrap(); catalog_manager.start().await?; - Ok(catalog_manager) + Ok((dir, catalog_manager)) } #[tokio::test] async fn test_rename_table() { common_telemetry::init_default_ut_logging(); - let catalog_manager = create_local_catalog_manager().await.unwrap(); + let (_dir, catalog_manager) = create_local_catalog_manager().await.unwrap(); // register table let table_name = "test_table"; let table_id = 42; @@ -81,7 +83,7 @@ mod tests { #[tokio::test] async fn test_duplicate_register() { - let catalog_manager = create_local_catalog_manager().await.unwrap(); + let (_dir, catalog_manager) = create_local_catalog_manager().await.unwrap(); let request = RegisterTableRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), @@ -118,8 +120,9 @@ mod tests { fn test_concurrent_register() { common_telemetry::init_default_ut_logging(); let rt = Arc::new(tokio::runtime::Builder::new_multi_thread().build().unwrap()); - let catalog_manager = - Arc::new(rt.block_on(async { create_local_catalog_manager().await.unwrap() })); + let (_dir, catalog_manager) = + rt.block_on(async { create_local_catalog_manager().await.unwrap() }); + let catalog_manager = Arc::new(catalog_manager); let succeed: Arc>> = Arc::new(Mutex::new(None)); diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index e992e3fd2d..ced365c6bc 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -159,7 +159,7 @@ impl TryFrom for DatanodeOptions { opts.wal.dir = wal_dir; } if let Some(procedure_dir) = cmd.procedure_dir { - opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir)); + opts.procedure = ProcedureConfig::from_file_path(procedure_dir); } if let Some(http_addr) = cmd.http_addr { opts.http_opts.addr = http_addr diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 30fed4775f..2148b5a097 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -81,7 +81,7 @@ pub struct StandaloneOptions { pub prom_options: Option, pub wal: WalConfig, pub storage: StorageConfig, - pub procedure: Option, + pub procedure: ProcedureConfig, } impl Default for StandaloneOptions { @@ -99,7 +99,7 @@ impl Default for StandaloneOptions { prom_options: Some(PromOptions::default()), wal: WalConfig::default(), storage: StorageConfig::default(), - procedure: None, + procedure: ProcedureConfig::default(), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index fbf47f7d83..578a14371d 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -237,7 +237,7 @@ pub struct DatanodeOptions { pub meta_client_options: Option, pub wal: WalConfig, pub storage: StorageConfig, - pub procedure: Option, + pub procedure: ProcedureConfig, } impl Default for DatanodeOptions { @@ -255,7 +255,7 @@ impl Default for DatanodeOptions { meta_client_options: None, wal: WalConfig::default(), storage: StorageConfig::default(), - procedure: None, + procedure: ProcedureConfig::default(), } } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 93fcebf8e8..78ff946445 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -76,7 +76,7 @@ pub struct Instance { pub(crate) catalog_manager: CatalogManagerRef, pub(crate) table_id_provider: Option, pub(crate) heartbeat_task: Option, - procedure_manager: Option, + procedure_manager: ProcedureManagerRef, } pub type InstanceRef = Arc; @@ -208,20 +208,17 @@ impl Instance { let procedure_manager = create_procedure_manager(&opts.procedure).await?; // Register all procedures. - if let Some(procedure_manager) = &procedure_manager { - // Register procedures of the mito engine. - mito_engine.register_procedure_loaders(&**procedure_manager); - immutable_file_engine.register_procedure_loaders(&**procedure_manager); - // Register procedures in table-procedure crate. - table_procedure::register_procedure_loaders( - catalog_manager.clone(), - mito_engine.clone(), - mito_engine.clone(), - &**procedure_manager, - ); - // TODO(yingwen): Register procedures of the file table engine once #1372 - // is ready. - } + // Register procedures of the mito engine. + mito_engine.register_procedure_loaders(&*procedure_manager); + // Register procedures of the file table engine. + immutable_file_engine.register_procedure_loaders(&*procedure_manager); + // Register procedures in table-procedure crate. + table_procedure::register_procedure_loaders( + catalog_manager.clone(), + mito_engine.clone(), + mito_engine.clone(), + &*procedure_manager, + ); Ok(Self { query_engine: query_engine.clone(), @@ -248,12 +245,10 @@ impl Instance { // Recover procedures after the catalog manager is started, so we can // ensure we can access all tables from the catalog manager. - if let Some(procedure_manager) = &self.procedure_manager { - procedure_manager - .recover() - .await - .context(RecoverProcedureSnafu)?; - } + self.procedure_manager + .recover() + .await + .context(RecoverProcedureSnafu)?; Ok(()) } @@ -547,12 +542,8 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result, -) -> Result> { - let Some(procedure_config) = procedure_config else { - return Ok(None); - }; - + procedure_config: &ProcedureConfig, +) -> Result { info!( "Creating procedure manager with config: {:?}", procedure_config @@ -566,8 +557,5 @@ pub(crate) async fn create_procedure_manager( retry_delay: procedure_config.retry_delay, }; - Ok(Some(Arc::new(LocalManager::new( - manager_config, - state_store, - )))) + Ok(Arc::new(LocalManager::new(manager_config, state_store))) } diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index fb85adb359..d38e0390b1 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -73,9 +73,7 @@ impl Instance { let name = create_table.name.clone(); let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?; let table_ref = TableReference::full(&catalog, &schema, &table); - let request = - self.sql_handler - .create_to_request(table_id, create_table, &table_ref)?; + let request = SqlHandler::create_to_request(table_id, create_table, &table_ref)?; let table_id = request.id; info!("Creating table: {table_ref}, table id = {table_id}",); @@ -108,7 +106,7 @@ impl Instance { let name = alter_table.table_name().clone(); let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?; let table_ref = TableReference::full(&catalog, &schema, &table); - let req = self.sql_handler.alter_to_request(alter_table, table_ref)?; + let req = SqlHandler::alter_to_request(alter_table, table_ref)?; self.sql_handler .execute(SqlRequest::Alter(req), query_ctx) .await diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 454c1d6ddd..854b7261c5 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -51,14 +51,14 @@ pub enum SqlRequest { pub struct SqlHandler { table_engine_manager: TableEngineManagerRef, catalog_manager: CatalogManagerRef, - procedure_manager: Option, + procedure_manager: ProcedureManagerRef, } impl SqlHandler { pub fn new( table_engine_manager: TableEngineManagerRef, catalog_manager: CatalogManagerRef, - procedure_manager: Option, + procedure_manager: ProcedureManagerRef, ) -> Self { Self { table_engine_manager, @@ -75,7 +75,7 @@ impl SqlHandler { let result = match request { SqlRequest::CreateTable(req) => self.create_table(req).await, SqlRequest::CreateDatabase(req) => self.create_database(req, query_ctx.clone()).await, - SqlRequest::Alter(req) => self.alter(req).await, + SqlRequest::Alter(req) => self.alter_table(req).await, SqlRequest::DropTable(req) => self.drop_table(req).await, SqlRequest::FlushTable(req) => self.flush_table(req).await, }; diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index bb93b4fa7c..88b25c428c 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use catalog::RenameTableRequest; -use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId}; +use common_procedure::{watcher, ProcedureWithId}; use common_query::Output; use common_telemetry::logging::info; use snafu::prelude::*; use sql::statements::alter::{AlterTable, AlterTableOperation}; use sql::statements::column_def_to_schema; -use table::engine::{EngineContext, TableReference}; +use table::engine::TableReference; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest}; use table_procedure::AlterTableProcedure; @@ -27,62 +26,7 @@ use crate::error::{self, Result}; use crate::sql::SqlHandler; impl SqlHandler { - pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result { - if let Some(procedure_manager) = &self.procedure_manager { - return self.alter_table_by_procedure(procedure_manager, req).await; - } - - let ctx = EngineContext {}; - let table_name = req.table_name.clone(); - let table_ref = TableReference { - catalog: &req.catalog_name, - schema: &req.schema_name, - table: &table_name, - }; - - let full_table_name = table_ref.to_string(); - - // fetches table via catalog - let table = self.get_table(&table_ref).await?; - // checks the table engine exist - let table_engine = self.table_engine(table)?; - ensure!( - table_engine.table_exists(&ctx, &table_ref), - error::TableNotFoundSnafu { - table_name: &full_table_name, - } - ); - let is_rename = req.is_rename_table(); - - let table = table_engine - .alter_table(&ctx, req) - .await - .context(error::AlterTableSnafu { - table_name: full_table_name, - })?; - if is_rename { - let table_info = &table.table_info(); - let rename_table_req = RenameTableRequest { - catalog: table_info.catalog_name.clone(), - schema: table_info.schema_name.clone(), - table_name, - new_table_name: table_info.name.clone(), - table_id: table_info.ident.table_id, - }; - self.catalog_manager - .rename_table(rename_table_req) - .await - .context(error::RenameTableSnafu)?; - } - // Tried in MySQL, it really prints "Affected Rows: 0". - Ok(Output::AffectedRows(0)) - } - - pub(crate) async fn alter_table_by_procedure( - &self, - procedure_manager: &ProcedureManagerRef, - req: AlterTableRequest, - ) -> Result { + pub(crate) async fn alter_table(&self, req: AlterTableRequest) -> Result { let table_name = req.table_name.clone(); let table_ref = TableReference { catalog: &req.catalog_name, @@ -100,7 +44,8 @@ impl SqlHandler { info!("Alter table {} by procedure {}", table_name, procedure_id); - let mut watcher = procedure_manager + let mut watcher = self + .procedure_manager .submit(procedure_with_id) .await .context(error::SubmitProcedureSnafu { procedure_id })?; @@ -108,11 +53,11 @@ impl SqlHandler { watcher::wait(&mut watcher) .await .context(error::WaitProcedureSnafu { procedure_id })?; + // Tried in MySQL, it really prints "Affected Rows: 0". Ok(Output::AffectedRows(0)) } pub(crate) fn alter_to_request( - &self, alter_table: AlterTable, table_ref: TableReference, ) -> Result { @@ -160,7 +105,7 @@ mod tests { use sql::statements::statement::Statement; use super::*; - use crate::tests::test_util::{create_mock_sql_handler, MockInstance}; + use crate::tests::test_util::MockInstance; fn parse_sql(sql: &str) -> AlterTable { let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); @@ -175,14 +120,12 @@ mod tests { #[tokio::test] async fn test_alter_to_request_with_adding_column() { - let handler = create_mock_sql_handler().await; let alter_table = parse_sql("ALTER TABLE my_metric_1 ADD tagk_i STRING Null;"); - let req = handler - .alter_to_request( - alter_table, - TableReference::full("greptime", "public", "my_metric_1"), - ) - .unwrap(); + let req = SqlHandler::alter_to_request( + alter_table, + TableReference::full("greptime", "public", "my_metric_1"), + ) + .unwrap(); assert_eq!(req.catalog_name, "greptime"); assert_eq!(req.schema_name, "public"); assert_eq!(req.table_name, "my_metric_1"); @@ -203,14 +146,12 @@ mod tests { #[tokio::test] async fn test_alter_to_request_with_renaming_table() { - let handler = create_mock_sql_handler().await; let alter_table = parse_sql("ALTER TABLE test_table RENAME table_t;"); - let req = handler - .alter_to_request( - alter_table, - TableReference::full("greptime", "public", "test_table"), - ) - .unwrap(); + let req = SqlHandler::alter_to_request( + alter_table, + TableReference::full("greptime", "public", "test_table"), + ) + .unwrap(); assert_eq!(req.catalog_name, "greptime"); assert_eq!(req.schema_name, "public"); assert_eq!(req.table_name, "test_table"); @@ -228,7 +169,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_alter_table_by_procedure() { - let instance = MockInstance::with_procedure_enabled("alter_table_by_procedure").await; + let instance = MockInstance::new("alter_table_by_procedure").await; // Create table first. let sql = r#"create table test_alter( diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 8196fcce92..ef54a183ab 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -14,10 +14,10 @@ use std::collections::HashMap; -use catalog::{RegisterSchemaRequest, RegisterTableRequest}; -use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId}; +use catalog::RegisterSchemaRequest; +use common_procedure::{watcher, ProcedureWithId}; use common_query::Output; -use common_telemetry::tracing::{error, info}; +use common_telemetry::tracing::info; use datatypes::schema::RawSchema; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; @@ -25,17 +25,16 @@ use sql::ast::{ColumnOption, TableConstraint}; use sql::statements::column_def_to_schema; use sql::statements::create::{CreateTable, TIME_INDEX}; use sql::util::to_lowercase_options_map; -use table::engine::{EngineContext, TableReference}; +use table::engine::TableReference; use table::metadata::TableId; use table::requests::*; use table_procedure::CreateTableProcedure; use crate::error::{ - self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu, - EngineProcedureNotFoundSnafu, IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, - KeyColumnNotFoundSnafu, RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, - SubmitProcedureSnafu, TableEngineNotFoundSnafu, UnrecognizedTableOptionSnafu, - WaitProcedureSnafu, + self, CatalogSnafu, ConstraintNotSupportedSnafu, EngineProcedureNotFoundSnafu, + IllegalPrimaryKeysDefSnafu, KeyColumnNotFoundSnafu, RegisterSchemaSnafu, Result, + SchemaExistsSnafu, SubmitProcedureSnafu, TableEngineNotFoundSnafu, + UnrecognizedTableOptionSnafu, WaitProcedureSnafu, }; use crate::sql::SqlHandler; @@ -74,76 +73,6 @@ impl SqlHandler { } pub(crate) async fn create_table(&self, req: CreateTableRequest) -> Result { - if let Some(procedure_manager) = &self.procedure_manager { - return self.create_table_by_procedure(procedure_manager, req).await; - } - - let ctx = EngineContext {}; - // first check if catalog and schema exist - let catalog = self - .catalog_manager - .catalog(&req.catalog_name) - .context(CatalogSnafu)? - .with_context(|| { - error!( - "Failed to create table {}.{}.{}, catalog not found", - &req.catalog_name, &req.schema_name, &req.table_name - ); - CatalogNotFoundSnafu { - name: &req.catalog_name, - } - })?; - catalog - .schema(&req.schema_name) - .context(CatalogSnafu)? - .with_context(|| { - error!( - "Failed to create table {}.{}.{}, schema not found", - &req.catalog_name, &req.schema_name, &req.table_name - ); - SchemaNotFoundSnafu { - name: &req.schema_name, - } - })?; - - // determine catalog and schema from the very beginning - let table_name = req.table_name.clone(); - let table_engine = - self.table_engine_manager - .engine(&req.engine) - .context(TableEngineNotFoundSnafu { - engine_name: &req.engine, - })?; - - let table = table_engine - .create_table(&ctx, req) - .await - .with_context(|_| CreateTableSnafu { - table_name: &table_name, - })?; - - let register_req = RegisterTableRequest { - catalog: table.table_info().catalog_name.clone(), - schema: table.table_info().schema_name.clone(), - table_name: table_name.clone(), - table_id: table.table_info().ident.table_id, - table, - }; - - self.catalog_manager - .register_table(register_req) - .await - .context(InsertSystemCatalogSnafu)?; - info!("Successfully created table: {:?}", table_name); - // TODO(hl): maybe support create multiple tables - Ok(Output::AffectedRows(0)) - } - - pub(crate) async fn create_table_by_procedure( - &self, - procedure_manager: &ProcedureManagerRef, - req: CreateTableRequest, - ) -> Result { let table_name = req.table_name.clone(); let table_engine = self.table_engine_manager @@ -168,7 +97,8 @@ impl SqlHandler { info!("Create table {} by procedure {}", table_name, procedure_id); - let mut watcher = procedure_manager + let mut watcher = self + .procedure_manager .submit(procedure_with_id) .await .context(SubmitProcedureSnafu { procedure_id })?; @@ -182,7 +112,6 @@ impl SqlHandler { /// Converts [CreateTable] to [SqlRequest::CreateTable]. pub(crate) fn create_to_request( - &self, table_id: TableId, stmt: CreateTable, table_ref: &TableReference, @@ -329,7 +258,7 @@ mod tests { use super::*; use crate::error::Error; - use crate::tests::test_util::{create_mock_sql_handler, MockInstance}; + use crate::tests::test_util::MockInstance; fn sql_to_statement(sql: &str) -> CreateTable { let mut res = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); @@ -351,9 +280,7 @@ mod tests { host STRING PRIMARY KEY ) engine=mito with(regions=1, ttl='7days',write_buffer_size='32MB',some='other');"#; let parsed_stmt = sql_to_statement(sql); - let handler = create_mock_sql_handler().await; - let c = handler - .create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) + let c = SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) .unwrap(); assert_eq!(Some(Duration::from_secs(604800)), c.table_options.ttl); @@ -366,7 +293,6 @@ mod tests { #[tokio::test] pub async fn test_create_with_inline_primary_key() { - let handler = create_mock_sql_handler().await; let parsed_stmt = sql_to_statement( r#" CREATE TABLE demo_table( @@ -375,8 +301,7 @@ mod tests { host STRING PRIMARY KEY ) engine=mito with(regions=1);"#, ); - let c = handler - .create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) + let c = SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) .unwrap(); assert_eq!("demo_table", c.table_name); assert_eq!(42, c.id); @@ -386,7 +311,6 @@ mod tests { #[tokio::test] pub async fn test_create_to_request() { - let handler = create_mock_sql_handler().await; let parsed_stmt = sql_to_statement( r#"create table demo_table( host string, @@ -396,8 +320,7 @@ mod tests { TIME INDEX (ts), PRIMARY KEY(host)) engine=mito with(regions=1);"#, ); - let c = handler - .create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) + let c = SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) .unwrap(); assert_eq!("demo_table", c.table_name); assert_eq!(42, c.id); @@ -409,7 +332,6 @@ mod tests { #[tokio::test] pub async fn test_multiple_primary_key_definitions() { - let handler = create_mock_sql_handler().await; let parsed_stmt = sql_to_statement( r#"create table demo_table ( "timestamp" BIGINT TIME INDEX, @@ -417,31 +339,28 @@ mod tests { host STRING PRIMARY KEY, PRIMARY KEY(host)) engine=mito with(regions=1);"#, ); - let error = handler - .create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) - .unwrap_err(); + let error = + SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) + .unwrap_err(); assert_matches!(error, Error::IllegalPrimaryKeysDef { .. }); } #[tokio::test] pub async fn test_multiple_inline_primary_key_definitions() { - let handler = create_mock_sql_handler().await; let parsed_stmt = sql_to_statement( r#"create table demo_table ( "timestamp" BIGINT TIME INDEX, "value" DOUBLE PRIMARY KEY, host STRING PRIMARY KEY) engine=mito with(regions=1);"#, ); - let error = handler - .create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) - .unwrap_err(); + let error = + SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) + .unwrap_err(); assert_matches!(error, Error::IllegalPrimaryKeysDef { .. }); } #[tokio::test] pub async fn test_primary_key_not_specified() { - let handler = create_mock_sql_handler().await; - let parsed_stmt = sql_to_statement( r#"create table demo_table( host string, @@ -450,8 +369,7 @@ mod tests { memory double, TIME INDEX (ts)) engine=mito with(regions=1);"#, ); - let c = handler - .create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) + let c = SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) .unwrap(); assert!(c.primary_key_indices.is_empty()); assert_eq!(c.schema.timestamp_index, Some(1)); @@ -460,17 +378,15 @@ mod tests { /// Constraints specified, not column cannot be found. #[tokio::test] pub async fn test_key_not_found() { - let handler = create_mock_sql_handler().await; - let parsed_stmt = sql_to_statement( r#"create table demo_table( host string, TIME INDEX (ts)) engine=mito with(regions=1);"#, ); - let error = handler - .create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) - .unwrap_err(); + let error = + SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) + .unwrap_err(); assert_matches!(error, Error::KeyColumnNotFound { .. }); } @@ -488,11 +404,12 @@ mod tests { ", ); - let handler = create_mock_sql_handler().await; - - let error = handler - .create_to_request(42, create_table, &TableReference::full("c", "s", "demo")) - .unwrap_err(); + let error = SqlHandler::create_to_request( + 42, + create_table, + &TableReference::full("c", "s", "demo"), + ) + .unwrap_err(); assert_matches!(error, Error::IllegalPrimaryKeysDef { .. }); } @@ -510,11 +427,12 @@ mod tests { ", ); - let handler = create_mock_sql_handler().await; - - let request = handler - .create_to_request(42, create_table, &TableReference::full("c", "s", "demo")) - .unwrap(); + let request = SqlHandler::create_to_request( + 42, + create_table, + &TableReference::full("c", "s", "demo"), + ) + .unwrap(); assert_eq!(42, request.id); assert_eq!("c".to_string(), request.catalog_name); @@ -545,7 +463,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn create_table_by_procedure() { - let instance = MockInstance::with_procedure_enabled("create_table_by_procedure").await; + let instance = MockInstance::new("create_table_by_procedure").await; let sql = r#"create table test_table( host string, diff --git a/src/datanode/src/sql/drop_table.rs b/src/datanode/src/sql/drop_table.rs index 841af8fd73..9daf46aec6 100644 --- a/src/datanode/src/sql/drop_table.rs +++ b/src/datanode/src/sql/drop_table.rs @@ -12,14 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use catalog::error::TableNotExistSnafu; -use catalog::DeregisterTableRequest; -use common_error::prelude::BoxedError; -use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId}; +use common_procedure::{watcher, ProcedureWithId}; use common_query::Output; use common_telemetry::info; -use snafu::{OptionExt, ResultExt}; -use table::engine::{EngineContext, TableReference}; +use snafu::ResultExt; +use table::engine::TableReference; use table::requests::DropTableRequest; use table_procedure::DropTableProcedure; @@ -27,67 +24,7 @@ use crate::error::{self, Result}; use crate::sql::SqlHandler; impl SqlHandler { - pub async fn drop_table(&self, req: DropTableRequest) -> Result { - if let Some(procedure_manager) = &self.procedure_manager { - return self.drop_table_by_procedure(procedure_manager, req).await; - } - - let deregister_table_req = DeregisterTableRequest { - catalog: req.catalog_name.clone(), - schema: req.schema_name.clone(), - table_name: req.table_name.clone(), - }; - - let table_reference = TableReference { - catalog: &req.catalog_name, - schema: &req.schema_name, - table: &req.table_name, - }; - let table_full_name = table_reference.to_string(); - - let table = self - .catalog_manager - .table(&req.catalog_name, &req.schema_name, &req.table_name) - .await - .context(error::CatalogSnafu)? - .context(TableNotExistSnafu { - table: &table_full_name, - }) - .map_err(BoxedError::new) - .context(error::DropTableSnafu { - table_name: &table_full_name, - })?; - - self.catalog_manager - .deregister_table(deregister_table_req) - .await - .map_err(BoxedError::new) - .context(error::DropTableSnafu { - table_name: &table_full_name, - })?; - - let ctx = EngineContext {}; - - let engine = self.table_engine(table)?; - - engine - .drop_table(&ctx, req) - .await - .map_err(BoxedError::new) - .with_context(|_| error::DropTableSnafu { - table_name: table_full_name.clone(), - })?; - - info!("Successfully dropped table: {}", table_full_name); - - Ok(Output::AffectedRows(1)) - } - - pub(crate) async fn drop_table_by_procedure( - &self, - procedure_manager: &ProcedureManagerRef, - req: DropTableRequest, - ) -> Result { + pub(crate) async fn drop_table(&self, req: DropTableRequest) -> Result { let table_name = req.table_name.clone(); let table_ref = TableReference { catalog: &req.catalog_name, @@ -106,7 +43,8 @@ impl SqlHandler { info!("Drop table {} by procedure {}", table_name, procedure_id); - let mut watcher = procedure_manager + let mut watcher = self + .procedure_manager .submit(procedure_with_id) .await .context(error::SubmitProcedureSnafu { procedure_id })?; @@ -130,7 +68,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_drop_table_by_procedure() { - let instance = MockInstance::with_procedure_enabled("alter_table_by_procedure").await; + let instance = MockInstance::new("alter_table_by_procedure").await; // Create table first. let sql = r#"create table test_drop( diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 83ca20ea2a..b447f1336e 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -12,22 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; - use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE, }; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; -use mito::config::EngineConfig; -use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; use servers::Mode; use snafu::ResultExt; -use table::engine::manager::MemoryTableEngineManager; -use table::engine::{EngineContext, TableEngine, TableEngineProcedureRef, TableEngineRef}; +use table::engine::{EngineContext, TableEngineRef}; use table::requests::{CreateTableRequest, TableOptions}; use crate::datanode::{ @@ -35,12 +28,10 @@ use crate::datanode::{ }; use crate::error::{CreateTableSnafu, Result}; use crate::instance::Instance; -use crate::sql::SqlHandler; pub(crate) struct MockInstance { instance: Instance, _guard: TestGuard, - _procedure_dir: Option, } impl MockInstance { @@ -50,32 +41,7 @@ impl MockInstance { let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); - MockInstance { - instance, - _guard, - _procedure_dir: None, - } - } - - pub(crate) async fn with_procedure_enabled(name: &str) -> Self { - let (mut opts, _guard) = create_tmp_dir_and_datanode_opts(name); - let procedure_dir = create_temp_dir(&format!("gt_procedure_{name}")); - opts.procedure = Some(ProcedureConfig { - store: ObjectStoreConfig::File(FileConfig { - data_dir: procedure_dir.path().to_str().unwrap().to_string(), - }), - max_retry_times: 3, - retry_delay: Duration::from_millis(500), - }); - - let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); - instance.start().await.unwrap(); - - MockInstance { - instance, - _guard, - _procedure_dir: Some(procedure_dir), - } + MockInstance { instance, _guard } } pub(crate) fn inner(&self) -> &Instance { @@ -86,11 +52,13 @@ impl MockInstance { struct TestGuard { _wal_tmp_dir: TempDir, _data_tmp_dir: TempDir, + _procedure_tmp_dir: TempDir, } fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) { let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); + let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}")); let opts = DatanodeOptions { wal: WalConfig { dir: wal_tmp_dir.path().to_str().unwrap().to_string(), @@ -103,6 +71,9 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) ..Default::default() }, mode: Mode::Standalone, + procedure: ProcedureConfig::from_file_path( + procedure_tmp_dir.path().to_str().unwrap().to_string(), + ), ..Default::default() }; ( @@ -110,6 +81,7 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) TestGuard { _wal_tmp_dir: wal_tmp_dir, _data_tmp_dir: data_tmp_dir, + _procedure_tmp_dir: procedure_tmp_dir, }, ) } @@ -161,26 +133,3 @@ pub(crate) async fn create_test_table( .unwrap(); Ok(()) } - -pub async fn create_mock_sql_handler() -> SqlHandler { - let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; - let mock_engine = Arc::new(MockMitoEngine::new( - EngineConfig::default(), - MockEngine::default(), - object_store, - )); - let mut engine_procedures = HashMap::new(); - engine_procedures.insert( - mock_engine.name().to_string(), - mock_engine.clone() as TableEngineProcedureRef, - ); - let engine_manager = Arc::new( - MemoryTableEngineManager::new(mock_engine).with_engine_procedures(engine_procedures), - ); - let catalog_manager = Arc::new( - catalog::local::LocalCatalogManager::try_new(engine_manager.clone()) - .await - .unwrap(), - ); - SqlHandler::new(engine_manager, catalog_manager, None) -} diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 154c353621..214ed11b5e 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -121,7 +121,7 @@ pub enum Error { #[snafu(display("Invalid InsertRequest, reason: {}", reason))] InvalidInsertRequest { reason: String, location: Location }, - #[snafu(display("Table `{}` not exist", table_name))] + #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String, location: Location, diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 68fcb20e71..2fc040a41a 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -127,9 +127,9 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) ..Default::default() }, mode: Mode::Standalone, - procedure: Some(ProcedureConfig::from_file_path( + procedure: ProcedureConfig::from_file_path( procedure_tmp_dir.path().to_str().unwrap().to_string(), - )), + ), ..Default::default() }; ( @@ -224,9 +224,9 @@ async fn create_distributed_datanode( ..Default::default() }, mode: Mode::Distributed, - procedure: Some(ProcedureConfig::from_file_path( + procedure: ProcedureConfig::from_file_path( procedure_tmp_dir.path().to_str().unwrap().to_string(), - )), + ), ..Default::default() }; diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index c7f4958844..121cc6fedd 100644 --- a/src/mito/src/engine/procedure/create.rs +++ b/src/mito/src/engine/procedure/create.rs @@ -290,6 +290,7 @@ impl CreateMitoTable { .engine(engine::MITO_ENGINE) .next_column_id(next_column_id) .primary_key_indices(self.data.request.primary_key_indices.clone()) + .options(self.data.request.table_options.clone()) .region_numbers(self.data.request.region_numbers.clone()) .build() .context(BuildTableMetaSnafu { diff --git a/src/mito/src/engine/procedure/drop.rs b/src/mito/src/engine/procedure/drop.rs index abe303d61b..ec93452094 100644 --- a/src/mito/src/engine/procedure/drop.rs +++ b/src/mito/src/engine/procedure/drop.rs @@ -18,21 +18,20 @@ use async_trait::async_trait; use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu}; use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status}; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use store_api::storage::StorageEngine; use table::engine::TableReference; use table::requests::DropTableRequest; use table::Table; use crate::engine::MitoEngineInner; -use crate::error::TableNotFoundSnafu; use crate::table::MitoTable; /// Procedure to drop a [MitoTable]. pub(crate) struct DropMitoTable { data: DropTableData, engine_inner: Arc>, - table: Arc>, + table: Option>>, } #[async_trait] @@ -55,7 +54,8 @@ impl Procedure for DropMitoTable { fn lock_key(&self) -> LockKey { let table_ref = self.data.table_ref(); - let info = self.table.table_info(); + let Some(table) = &self.table else { return LockKey::default() }; + let info = table.table_info(); let keys = info .meta .region_numbers @@ -78,12 +78,7 @@ impl DropMitoTable { request, }; let table_ref = data.table_ref(); - let table = - engine_inner - .get_mito_table(&table_ref) - .with_context(|| TableNotFoundSnafu { - table_name: table_ref.to_string(), - })?; + let table = engine_inner.get_mito_table(&table_ref); Ok(DropMitoTable { data, @@ -114,12 +109,7 @@ impl DropMitoTable { fn from_json(json: &str, engine_inner: Arc>) -> Result { let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?; let table_ref = data.table_ref(); - let table = - engine_inner - .get_mito_table(&table_ref) - .with_context(|| TableNotFoundSnafu { - table_name: table_ref.to_string(), - })?; + let table = engine_inner.get_mito_table(&table_ref); Ok(DropMitoTable { data, @@ -148,7 +138,9 @@ impl DropMitoTable { self.engine_inner.tables.remove(&table_ref.to_string()); // Close the table to close all regions. Closing a region is idempotent. - self.table.close().await.map_err(Error::from_error_ext)?; + if let Some(table) = &self.table { + table.close().await.map_err(Error::from_error_ext)?; + } // TODO(yingwen): Currently, DROP TABLE doesn't remove data. We can // write a drop meta update to the table and remove all files in the diff --git a/src/table-procedure/src/alter.rs b/src/table-procedure/src/alter.rs index 2b33ed87fa..f63de63149 100644 --- a/src/table-procedure/src/alter.rs +++ b/src/table-procedure/src/alter.rs @@ -134,26 +134,30 @@ impl AlterTableProcedure { async fn on_prepare(&mut self) -> Result { // Check whether catalog and schema exist. + let request = &self.data.request; let catalog = self .catalog_manager - .catalog(&self.data.request.catalog_name) + .catalog(&request.catalog_name) .context(AccessCatalogSnafu)? .context(CatalogNotFoundSnafu { - name: &self.data.request.catalog_name, + name: &request.catalog_name, })?; let schema = catalog - .schema(&self.data.request.schema_name) + .schema(&request.schema_name) .context(AccessCatalogSnafu)? .context(SchemaNotFoundSnafu { - name: &self.data.request.schema_name, + name: &request.schema_name, })?; let table = schema - .table(&self.data.request.table_name) + .table(&request.table_name) .await .context(AccessCatalogSnafu)? - .context(TableNotFoundSnafu { - name: &self.data.request.table_name, + .with_context(|| TableNotFoundSnafu { + name: format!( + "{}.{}.{}", + request.catalog_name, request.schema_name, request.table_name + ), })?; if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { ensure!( @@ -161,7 +165,10 @@ impl AlterTableProcedure { .table_exist(new_table_name) .context(AccessCatalogSnafu)?, TableExistsSnafu { - name: new_table_name, + name: format!( + "{}.{}.{}", + request.catalog_name, request.schema_name, new_table_name + ), } ); } diff --git a/src/table-procedure/src/error.rs b/src/table-procedure/src/error.rs index 5fb2e3b517..a38b4217c4 100644 --- a/src/table-procedure/src/error.rs +++ b/src/table-procedure/src/error.rs @@ -62,7 +62,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Table {} already exists", name))] + #[snafu(display("Table already exists: {}", name))] TableExists { name: String }, } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 21ffa7d26e..7c0628c988 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -26,7 +26,8 @@ use common_catalog::consts::{ use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datanode::datanode::{ - DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, StorageConfig, WalConfig, + DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig, S3Config, + StorageConfig, WalConfig, }; use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::Instance; @@ -169,6 +170,7 @@ enum TempDirGuard { pub struct TestGuard { _wal_tmp_dir: TempDir, data_tmp_dir: Option, + _procedure_tmp_dir: TempDir, } impl TestGuard { @@ -187,6 +189,7 @@ pub fn create_tmp_dir_and_datanode_opts( name: &str, ) -> (DatanodeOptions, TestGuard) { let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); + let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}")); let (store, data_tmp_dir) = get_test_store_config(&store_type, name); @@ -200,6 +203,9 @@ pub fn create_tmp_dir_and_datanode_opts( ..Default::default() }, mode: Mode::Standalone, + procedure: ProcedureConfig::from_file_path( + procedure_tmp_dir.path().to_str().unwrap().to_string(), + ), ..Default::default() }; ( @@ -207,6 +213,7 @@ pub fn create_tmp_dir_and_datanode_opts( TestGuard { _wal_tmp_dir: wal_tmp_dir, data_tmp_dir, + _procedure_tmp_dir: procedure_tmp_dir, }, ) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 07db55cd3d..8ee4093ac1 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -197,7 +197,7 @@ pub async fn test_sql_api(store_type: StorageType) { let body = serde_json::from_str::(&res.text().await).unwrap(); assert!(!body.success()); assert!(body.execution_time_ms().is_some()); - assert!(body.error().unwrap().contains("not exist")); + assert!(body.error().unwrap().contains("Table not found")); // test database given let res = client diff --git a/tests/cases/distributed/alter/rename_table.result b/tests/cases/distributed/alter/rename_table.result index a5bf55263b..2599550875 100644 --- a/tests/cases/distributed/alter/rename_table.result +++ b/tests/cases/distributed/alter/rename_table.result @@ -27,7 +27,7 @@ SELECT * from t; ALTER TABLE t RENAME new_table; -Error: 1003(Internal), Operation rename table not implemented yet +Error: 1001(Unsupported), Operation rename table not implemented yet DROP TABLE t; diff --git a/tests/cases/standalone/alter/rename_table.result b/tests/cases/standalone/alter/rename_table.result index 8e9f11fee5..ef456757ea 100644 --- a/tests/cases/standalone/alter/rename_table.result +++ b/tests/cases/standalone/alter/rename_table.result @@ -31,11 +31,11 @@ Affected Rows: 0 DESC TABLE t; -Error: 4001(TableNotFound), Table `t` not exist +Error: 4001(TableNotFound), Table not found: t SELECT * FROM t; -Error: 4001(TableNotFound), Table `greptime.public.t` not exist +Error: 4001(TableNotFound), Table not found: greptime.public.t CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX); diff --git a/tests/cases/standalone/common/catalog/schema.result b/tests/cases/standalone/common/catalog/schema.result index e886ca5bcc..1a18f6de7d 100644 --- a/tests/cases/standalone/common/catalog/schema.result +++ b/tests/cases/standalone/common/catalog/schema.result @@ -81,7 +81,7 @@ Affected Rows: 1 DROP TABLE hello; -Error: 4001(TableNotFound), Table `greptime.test_public_schema.hello` not exist +Error: 4001(TableNotFound), Table not found: greptime.test_public_schema.hello SHOW TABLES FROM test_public_schema; @@ -105,7 +105,7 @@ Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_publ SELECT * FROM test_public_schema.hello; -Error: 4001(TableNotFound), Table `greptime.test_public_schema.hello` not exist +Error: 4001(TableNotFound), Table not found: greptime.test_public_schema.hello USE public; diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template index 3a03784d31..55507aab3a 100644 --- a/tests/conf/datanode-test.toml.template +++ b/tests/conf/datanode-test.toml.template @@ -21,3 +21,9 @@ metasrv_addrs = ['127.0.0.1:3002'] timeout_millis = 3000 connect_timeout_millis = 5000 tcp_nodelay = false + +[procedure.store] +type = "File" +data_dir = "{procedure_dir}" +max_retry_times = 3 +retry_delay = "500ms" diff --git a/tests/conf/standalone-test.toml.template b/tests/conf/standalone-test.toml.template index b27c6d050b..488f877b33 100644 --- a/tests/conf/standalone-test.toml.template +++ b/tests/conf/standalone-test.toml.template @@ -16,3 +16,9 @@ data_dir = '{data_dir}' [grpc_options] addr = '127.0.0.1:4001' runtime_size = 8 + +[procedure.store] +type = "File" +data_dir = "{procedure_dir}" +max_retry_times = 3 +retry_delay = "500ms" diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 6c08f17d4f..9ade459a02 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -214,12 +214,14 @@ impl Env { struct Context { wal_dir: String, data_dir: String, + procedure_dir: String, } let greptimedb_dir = format!("/tmp/greptimedb-{subcommand}-{}", db_ctx.time); let ctx = Context { wal_dir: format!("{greptimedb_dir}/wal/"), data_dir: format!("{greptimedb_dir}/data/"), + procedure_dir: format!("{greptimedb_dir}/procedure/"), }; let rendered = tt.render(subcommand, &ctx).unwrap();