feat: Switch to the procedure framework (#1448)

* feat: Remove create_mock_sql_handler()

create_to_request() and alter_to_request() don't need `&self`, so
we don't need to mock the sql handler to test them

* feat: Enable procedure manager by default

* docs: Update config example

* test: Enable procedure framework in all tests

* refactor(datanode): rename methods using procedure

* test(catalog): Fix temp dir drops before test finishes

* tests: Enable procedure framework in sqlness

* test: Fix sqlness standalone rename test

* fix: Drop procedure allows table not in engine

* test: Change rename table test

* fix: add options to table meta when creating table by procedure

* test: adjust error message in schema test case

* test: Fix test_sql_api error message
This commit is contained in:
Yingwen
2023-04-25 12:04:02 +08:00
committed by GitHub
parent 92c0808766
commit eb50cee601
28 changed files with 183 additions and 428 deletions

View File

@@ -53,8 +53,8 @@ gc_duration = '30s'
checkpoint_on_startup = false
# Procedure storage options, see `standalone.example.toml`.
# [procedure.store]
# type = "File"
# data_dir = "/tmp/greptimedb/procedure/"
# max_retry_times = 3
# retry_delay = "500ms"
[procedure.store]
type = "File"
data_dir = "/tmp/greptimedb/procedure/"
max_retry_times = 3
retry_delay = "500ms"

View File

@@ -118,13 +118,12 @@ gc_duration = '30s'
checkpoint_on_startup = false
# Procedure storage options.
# Uncomment to enable.
# [procedure.store]
# # Storage type.
# type = "File"
# # Procedure data path.
# data_dir = "/tmp/greptimedb/procedure/"
# # Procedure max retry time.
# max_retry_times = 3
# # Initial retry delay of procedures, increases exponentially
# retry_delay = "500ms"
[procedure.store]
# Storage type.
type = "File"
# Procedure data path.
data_dir = "/tmp/greptimedb/procedure/"
# Procedure max retry time.
max_retry_times = 3
# Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"

View File

@@ -106,7 +106,7 @@ pub enum Error {
#[snafu(display("Table `{}` already exists", table))]
TableExists { table: String, location: Location },
#[snafu(display("Table `{}` not exist", table))]
#[snafu(display("Table not found: {}", table))]
TableNotExist { table: String, location: Location },
#[snafu(display("Schema {} already exists", schema))]

View File

@@ -20,14 +20,16 @@ mod tests {
use catalog::{CatalogManager, RegisterTableRequest, RenameTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::{error, info};
use common_test_util::temp_dir::TempDir;
use mito::config::EngineConfig;
use table::engine::manager::MemoryTableEngineManager;
use table::table::numbers::NumbersTable;
use table::TableRef;
use tokio::sync::Mutex;
async fn create_local_catalog_manager() -> Result<LocalCatalogManager, catalog::error::Error> {
let (_dir, object_store) =
async fn create_local_catalog_manager(
) -> Result<(TempDir, LocalCatalogManager), catalog::error::Error> {
let (dir, object_store) =
mito::table::test_util::new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(mito::table::test_util::MockMitoEngine::new(
EngineConfig::default(),
@@ -37,13 +39,13 @@ mod tests {
let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone()));
let catalog_manager = LocalCatalogManager::try_new(engine_manager).await.unwrap();
catalog_manager.start().await?;
Ok(catalog_manager)
Ok((dir, catalog_manager))
}
#[tokio::test]
async fn test_rename_table() {
common_telemetry::init_default_ut_logging();
let catalog_manager = create_local_catalog_manager().await.unwrap();
let (_dir, catalog_manager) = create_local_catalog_manager().await.unwrap();
// register table
let table_name = "test_table";
let table_id = 42;
@@ -81,7 +83,7 @@ mod tests {
#[tokio::test]
async fn test_duplicate_register() {
let catalog_manager = create_local_catalog_manager().await.unwrap();
let (_dir, catalog_manager) = create_local_catalog_manager().await.unwrap();
let request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
@@ -118,8 +120,9 @@ mod tests {
fn test_concurrent_register() {
common_telemetry::init_default_ut_logging();
let rt = Arc::new(tokio::runtime::Builder::new_multi_thread().build().unwrap());
let catalog_manager =
Arc::new(rt.block_on(async { create_local_catalog_manager().await.unwrap() }));
let (_dir, catalog_manager) =
rt.block_on(async { create_local_catalog_manager().await.unwrap() });
let catalog_manager = Arc::new(catalog_manager);
let succeed: Arc<Mutex<Option<TableRef>>> = Arc::new(Mutex::new(None));

View File

@@ -159,7 +159,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
opts.wal.dir = wal_dir;
}
if let Some(procedure_dir) = cmd.procedure_dir {
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
opts.procedure = ProcedureConfig::from_file_path(procedure_dir);
}
if let Some(http_addr) = cmd.http_addr {
opts.http_opts.addr = http_addr

View File

@@ -81,7 +81,7 @@ pub struct StandaloneOptions {
pub prom_options: Option<PromOptions>,
pub wal: WalConfig,
pub storage: StorageConfig,
pub procedure: Option<ProcedureConfig>,
pub procedure: ProcedureConfig,
}
impl Default for StandaloneOptions {
@@ -99,7 +99,7 @@ impl Default for StandaloneOptions {
prom_options: Some(PromOptions::default()),
wal: WalConfig::default(),
storage: StorageConfig::default(),
procedure: None,
procedure: ProcedureConfig::default(),
}
}
}

View File

@@ -237,7 +237,7 @@ pub struct DatanodeOptions {
pub meta_client_options: Option<MetaClientOptions>,
pub wal: WalConfig,
pub storage: StorageConfig,
pub procedure: Option<ProcedureConfig>,
pub procedure: ProcedureConfig,
}
impl Default for DatanodeOptions {
@@ -255,7 +255,7 @@ impl Default for DatanodeOptions {
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
procedure: None,
procedure: ProcedureConfig::default(),
}
}
}

View File

@@ -76,7 +76,7 @@ pub struct Instance {
pub(crate) catalog_manager: CatalogManagerRef,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
procedure_manager: Option<ProcedureManagerRef>,
procedure_manager: ProcedureManagerRef,
}
pub type InstanceRef = Arc<Instance>;
@@ -208,20 +208,17 @@ impl Instance {
let procedure_manager = create_procedure_manager(&opts.procedure).await?;
// Register all procedures.
if let Some(procedure_manager) = &procedure_manager {
// Register procedures of the mito engine.
mito_engine.register_procedure_loaders(&**procedure_manager);
immutable_file_engine.register_procedure_loaders(&**procedure_manager);
// Register procedures in table-procedure crate.
table_procedure::register_procedure_loaders(
catalog_manager.clone(),
mito_engine.clone(),
mito_engine.clone(),
&**procedure_manager,
);
// TODO(yingwen): Register procedures of the file table engine once #1372
// is ready.
}
// Register procedures of the mito engine.
mito_engine.register_procedure_loaders(&*procedure_manager);
// Register procedures of the file table engine.
immutable_file_engine.register_procedure_loaders(&*procedure_manager);
// Register procedures in table-procedure crate.
table_procedure::register_procedure_loaders(
catalog_manager.clone(),
mito_engine.clone(),
mito_engine.clone(),
&*procedure_manager,
);
Ok(Self {
query_engine: query_engine.clone(),
@@ -248,12 +245,10 @@ impl Instance {
// Recover procedures after the catalog manager is started, so we can
// ensure we can access all tables from the catalog manager.
if let Some(procedure_manager) = &self.procedure_manager {
procedure_manager
.recover()
.await
.context(RecoverProcedureSnafu)?;
}
self.procedure_manager
.recover()
.await
.context(RecoverProcedureSnafu)?;
Ok(())
}
@@ -547,12 +542,8 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngin
}
pub(crate) async fn create_procedure_manager(
procedure_config: &Option<ProcedureConfig>,
) -> Result<Option<ProcedureManagerRef>> {
let Some(procedure_config) = procedure_config else {
return Ok(None);
};
procedure_config: &ProcedureConfig,
) -> Result<ProcedureManagerRef> {
info!(
"Creating procedure manager with config: {:?}",
procedure_config
@@ -566,8 +557,5 @@ pub(crate) async fn create_procedure_manager(
retry_delay: procedure_config.retry_delay,
};
Ok(Some(Arc::new(LocalManager::new(
manager_config,
state_store,
))))
Ok(Arc::new(LocalManager::new(manager_config, state_store)))
}

View File

@@ -73,9 +73,7 @@ impl Instance {
let name = create_table.name.clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let request =
self.sql_handler
.create_to_request(table_id, create_table, &table_ref)?;
let request = SqlHandler::create_to_request(table_id, create_table, &table_ref)?;
let table_id = request.id;
info!("Creating table: {table_ref}, table id = {table_id}",);
@@ -108,7 +106,7 @@ impl Instance {
let name = alter_table.table_name().clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let req = self.sql_handler.alter_to_request(alter_table, table_ref)?;
let req = SqlHandler::alter_to_request(alter_table, table_ref)?;
self.sql_handler
.execute(SqlRequest::Alter(req), query_ctx)
.await

View File

@@ -51,14 +51,14 @@ pub enum SqlRequest {
pub struct SqlHandler {
table_engine_manager: TableEngineManagerRef,
catalog_manager: CatalogManagerRef,
procedure_manager: Option<ProcedureManagerRef>,
procedure_manager: ProcedureManagerRef,
}
impl SqlHandler {
pub fn new(
table_engine_manager: TableEngineManagerRef,
catalog_manager: CatalogManagerRef,
procedure_manager: Option<ProcedureManagerRef>,
procedure_manager: ProcedureManagerRef,
) -> Self {
Self {
table_engine_manager,
@@ -75,7 +75,7 @@ impl SqlHandler {
let result = match request {
SqlRequest::CreateTable(req) => self.create_table(req).await,
SqlRequest::CreateDatabase(req) => self.create_database(req, query_ctx.clone()).await,
SqlRequest::Alter(req) => self.alter(req).await,
SqlRequest::Alter(req) => self.alter_table(req).await,
SqlRequest::DropTable(req) => self.drop_table(req).await,
SqlRequest::FlushTable(req) => self.flush_table(req).await,
};

View File

@@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::RenameTableRequest;
use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
use common_procedure::{watcher, ProcedureWithId};
use common_query::Output;
use common_telemetry::logging::info;
use snafu::prelude::*;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::column_def_to_schema;
use table::engine::{EngineContext, TableReference};
use table::engine::TableReference;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
use table_procedure::AlterTableProcedure;
@@ -27,62 +26,7 @@ use crate::error::{self, Result};
use crate::sql::SqlHandler;
impl SqlHandler {
pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result<Output> {
if let Some(procedure_manager) = &self.procedure_manager {
return self.alter_table_by_procedure(procedure_manager, req).await;
}
let ctx = EngineContext {};
let table_name = req.table_name.clone();
let table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
table: &table_name,
};
let full_table_name = table_ref.to_string();
// fetches table via catalog
let table = self.get_table(&table_ref).await?;
// checks the table engine exist
let table_engine = self.table_engine(table)?;
ensure!(
table_engine.table_exists(&ctx, &table_ref),
error::TableNotFoundSnafu {
table_name: &full_table_name,
}
);
let is_rename = req.is_rename_table();
let table = table_engine
.alter_table(&ctx, req)
.await
.context(error::AlterTableSnafu {
table_name: full_table_name,
})?;
if is_rename {
let table_info = &table.table_info();
let rename_table_req = RenameTableRequest {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table_name,
new_table_name: table_info.name.clone(),
table_id: table_info.ident.table_id,
};
self.catalog_manager
.rename_table(rename_table_req)
.await
.context(error::RenameTableSnafu)?;
}
// Tried in MySQL, it really prints "Affected Rows: 0".
Ok(Output::AffectedRows(0))
}
pub(crate) async fn alter_table_by_procedure(
&self,
procedure_manager: &ProcedureManagerRef,
req: AlterTableRequest,
) -> Result<Output> {
pub(crate) async fn alter_table(&self, req: AlterTableRequest) -> Result<Output> {
let table_name = req.table_name.clone();
let table_ref = TableReference {
catalog: &req.catalog_name,
@@ -100,7 +44,8 @@ impl SqlHandler {
info!("Alter table {} by procedure {}", table_name, procedure_id);
let mut watcher = procedure_manager
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu { procedure_id })?;
@@ -108,11 +53,11 @@ impl SqlHandler {
watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu { procedure_id })?;
// Tried in MySQL, it really prints "Affected Rows: 0".
Ok(Output::AffectedRows(0))
}
pub(crate) fn alter_to_request(
&self,
alter_table: AlterTable,
table_ref: TableReference,
) -> Result<AlterTableRequest> {
@@ -160,7 +105,7 @@ mod tests {
use sql::statements::statement::Statement;
use super::*;
use crate::tests::test_util::{create_mock_sql_handler, MockInstance};
use crate::tests::test_util::MockInstance;
fn parse_sql(sql: &str) -> AlterTable {
let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
@@ -175,14 +120,12 @@ mod tests {
#[tokio::test]
async fn test_alter_to_request_with_adding_column() {
let handler = create_mock_sql_handler().await;
let alter_table = parse_sql("ALTER TABLE my_metric_1 ADD tagk_i STRING Null;");
let req = handler
.alter_to_request(
alter_table,
TableReference::full("greptime", "public", "my_metric_1"),
)
.unwrap();
let req = SqlHandler::alter_to_request(
alter_table,
TableReference::full("greptime", "public", "my_metric_1"),
)
.unwrap();
assert_eq!(req.catalog_name, "greptime");
assert_eq!(req.schema_name, "public");
assert_eq!(req.table_name, "my_metric_1");
@@ -203,14 +146,12 @@ mod tests {
#[tokio::test]
async fn test_alter_to_request_with_renaming_table() {
let handler = create_mock_sql_handler().await;
let alter_table = parse_sql("ALTER TABLE test_table RENAME table_t;");
let req = handler
.alter_to_request(
alter_table,
TableReference::full("greptime", "public", "test_table"),
)
.unwrap();
let req = SqlHandler::alter_to_request(
alter_table,
TableReference::full("greptime", "public", "test_table"),
)
.unwrap();
assert_eq!(req.catalog_name, "greptime");
assert_eq!(req.schema_name, "public");
assert_eq!(req.table_name, "test_table");
@@ -228,7 +169,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_alter_table_by_procedure() {
let instance = MockInstance::with_procedure_enabled("alter_table_by_procedure").await;
let instance = MockInstance::new("alter_table_by_procedure").await;
// Create table first.
let sql = r#"create table test_alter(

View File

@@ -14,10 +14,10 @@
use std::collections::HashMap;
use catalog::{RegisterSchemaRequest, RegisterTableRequest};
use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
use catalog::RegisterSchemaRequest;
use common_procedure::{watcher, ProcedureWithId};
use common_query::Output;
use common_telemetry::tracing::{error, info};
use common_telemetry::tracing::info;
use datatypes::schema::RawSchema;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
@@ -25,17 +25,16 @@ use sql::ast::{ColumnOption, TableConstraint};
use sql::statements::column_def_to_schema;
use sql::statements::create::{CreateTable, TIME_INDEX};
use sql::util::to_lowercase_options_map;
use table::engine::{EngineContext, TableReference};
use table::engine::TableReference;
use table::metadata::TableId;
use table::requests::*;
use table_procedure::CreateTableProcedure;
use crate::error::{
self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu,
EngineProcedureNotFoundSnafu, IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu,
KeyColumnNotFoundSnafu, RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu,
SubmitProcedureSnafu, TableEngineNotFoundSnafu, UnrecognizedTableOptionSnafu,
WaitProcedureSnafu,
self, CatalogSnafu, ConstraintNotSupportedSnafu, EngineProcedureNotFoundSnafu,
IllegalPrimaryKeysDefSnafu, KeyColumnNotFoundSnafu, RegisterSchemaSnafu, Result,
SchemaExistsSnafu, SubmitProcedureSnafu, TableEngineNotFoundSnafu,
UnrecognizedTableOptionSnafu, WaitProcedureSnafu,
};
use crate::sql::SqlHandler;
@@ -74,76 +73,6 @@ impl SqlHandler {
}
pub(crate) async fn create_table(&self, req: CreateTableRequest) -> Result<Output> {
if let Some(procedure_manager) = &self.procedure_manager {
return self.create_table_by_procedure(procedure_manager, req).await;
}
let ctx = EngineContext {};
// first check if catalog and schema exist
let catalog = self
.catalog_manager
.catalog(&req.catalog_name)
.context(CatalogSnafu)?
.with_context(|| {
error!(
"Failed to create table {}.{}.{}, catalog not found",
&req.catalog_name, &req.schema_name, &req.table_name
);
CatalogNotFoundSnafu {
name: &req.catalog_name,
}
})?;
catalog
.schema(&req.schema_name)
.context(CatalogSnafu)?
.with_context(|| {
error!(
"Failed to create table {}.{}.{}, schema not found",
&req.catalog_name, &req.schema_name, &req.table_name
);
SchemaNotFoundSnafu {
name: &req.schema_name,
}
})?;
// determine catalog and schema from the very beginning
let table_name = req.table_name.clone();
let table_engine =
self.table_engine_manager
.engine(&req.engine)
.context(TableEngineNotFoundSnafu {
engine_name: &req.engine,
})?;
let table = table_engine
.create_table(&ctx, req)
.await
.with_context(|_| CreateTableSnafu {
table_name: &table_name,
})?;
let register_req = RegisterTableRequest {
catalog: table.table_info().catalog_name.clone(),
schema: table.table_info().schema_name.clone(),
table_name: table_name.clone(),
table_id: table.table_info().ident.table_id,
table,
};
self.catalog_manager
.register_table(register_req)
.await
.context(InsertSystemCatalogSnafu)?;
info!("Successfully created table: {:?}", table_name);
// TODO(hl): maybe support create multiple tables
Ok(Output::AffectedRows(0))
}
pub(crate) async fn create_table_by_procedure(
&self,
procedure_manager: &ProcedureManagerRef,
req: CreateTableRequest,
) -> Result<Output> {
let table_name = req.table_name.clone();
let table_engine =
self.table_engine_manager
@@ -168,7 +97,8 @@ impl SqlHandler {
info!("Create table {} by procedure {}", table_name, procedure_id);
let mut watcher = procedure_manager
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(SubmitProcedureSnafu { procedure_id })?;
@@ -182,7 +112,6 @@ impl SqlHandler {
/// Converts [CreateTable] to [SqlRequest::CreateTable].
pub(crate) fn create_to_request(
&self,
table_id: TableId,
stmt: CreateTable,
table_ref: &TableReference,
@@ -329,7 +258,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::tests::test_util::{create_mock_sql_handler, MockInstance};
use crate::tests::test_util::MockInstance;
fn sql_to_statement(sql: &str) -> CreateTable {
let mut res = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
@@ -351,9 +280,7 @@ mod tests {
host STRING PRIMARY KEY
) engine=mito with(regions=1, ttl='7days',write_buffer_size='32MB',some='other');"#;
let parsed_stmt = sql_to_statement(sql);
let handler = create_mock_sql_handler().await;
let c = handler
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
let c = SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap();
assert_eq!(Some(Duration::from_secs(604800)), c.table_options.ttl);
@@ -366,7 +293,6 @@ mod tests {
#[tokio::test]
pub async fn test_create_with_inline_primary_key() {
let handler = create_mock_sql_handler().await;
let parsed_stmt = sql_to_statement(
r#"
CREATE TABLE demo_table(
@@ -375,8 +301,7 @@ mod tests {
host STRING PRIMARY KEY
) engine=mito with(regions=1);"#,
);
let c = handler
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
let c = SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap();
assert_eq!("demo_table", c.table_name);
assert_eq!(42, c.id);
@@ -386,7 +311,6 @@ mod tests {
#[tokio::test]
pub async fn test_create_to_request() {
let handler = create_mock_sql_handler().await;
let parsed_stmt = sql_to_statement(
r#"create table demo_table(
host string,
@@ -396,8 +320,7 @@ mod tests {
TIME INDEX (ts),
PRIMARY KEY(host)) engine=mito with(regions=1);"#,
);
let c = handler
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
let c = SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap();
assert_eq!("demo_table", c.table_name);
assert_eq!(42, c.id);
@@ -409,7 +332,6 @@ mod tests {
#[tokio::test]
pub async fn test_multiple_primary_key_definitions() {
let handler = create_mock_sql_handler().await;
let parsed_stmt = sql_to_statement(
r#"create table demo_table (
"timestamp" BIGINT TIME INDEX,
@@ -417,31 +339,28 @@ mod tests {
host STRING PRIMARY KEY,
PRIMARY KEY(host)) engine=mito with(regions=1);"#,
);
let error = handler
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap_err();
let error =
SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap_err();
assert_matches!(error, Error::IllegalPrimaryKeysDef { .. });
}
#[tokio::test]
pub async fn test_multiple_inline_primary_key_definitions() {
let handler = create_mock_sql_handler().await;
let parsed_stmt = sql_to_statement(
r#"create table demo_table (
"timestamp" BIGINT TIME INDEX,
"value" DOUBLE PRIMARY KEY,
host STRING PRIMARY KEY) engine=mito with(regions=1);"#,
);
let error = handler
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap_err();
let error =
SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap_err();
assert_matches!(error, Error::IllegalPrimaryKeysDef { .. });
}
#[tokio::test]
pub async fn test_primary_key_not_specified() {
let handler = create_mock_sql_handler().await;
let parsed_stmt = sql_to_statement(
r#"create table demo_table(
host string,
@@ -450,8 +369,7 @@ mod tests {
memory double,
TIME INDEX (ts)) engine=mito with(regions=1);"#,
);
let c = handler
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
let c = SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap();
assert!(c.primary_key_indices.is_empty());
assert_eq!(c.schema.timestamp_index, Some(1));
@@ -460,17 +378,15 @@ mod tests {
/// Constraints specified, not column cannot be found.
#[tokio::test]
pub async fn test_key_not_found() {
let handler = create_mock_sql_handler().await;
let parsed_stmt = sql_to_statement(
r#"create table demo_table(
host string,
TIME INDEX (ts)) engine=mito with(regions=1);"#,
);
let error = handler
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap_err();
let error =
SqlHandler::create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap_err();
assert_matches!(error, Error::KeyColumnNotFound { .. });
}
@@ -488,11 +404,12 @@ mod tests {
",
);
let handler = create_mock_sql_handler().await;
let error = handler
.create_to_request(42, create_table, &TableReference::full("c", "s", "demo"))
.unwrap_err();
let error = SqlHandler::create_to_request(
42,
create_table,
&TableReference::full("c", "s", "demo"),
)
.unwrap_err();
assert_matches!(error, Error::IllegalPrimaryKeysDef { .. });
}
@@ -510,11 +427,12 @@ mod tests {
",
);
let handler = create_mock_sql_handler().await;
let request = handler
.create_to_request(42, create_table, &TableReference::full("c", "s", "demo"))
.unwrap();
let request = SqlHandler::create_to_request(
42,
create_table,
&TableReference::full("c", "s", "demo"),
)
.unwrap();
assert_eq!(42, request.id);
assert_eq!("c".to_string(), request.catalog_name);
@@ -545,7 +463,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn create_table_by_procedure() {
let instance = MockInstance::with_procedure_enabled("create_table_by_procedure").await;
let instance = MockInstance::new("create_table_by_procedure").await;
let sql = r#"create table test_table(
host string,

View File

@@ -12,14 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::error::TableNotExistSnafu;
use catalog::DeregisterTableRequest;
use common_error::prelude::BoxedError;
use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
use common_procedure::{watcher, ProcedureWithId};
use common_query::Output;
use common_telemetry::info;
use snafu::{OptionExt, ResultExt};
use table::engine::{EngineContext, TableReference};
use snafu::ResultExt;
use table::engine::TableReference;
use table::requests::DropTableRequest;
use table_procedure::DropTableProcedure;
@@ -27,67 +24,7 @@ use crate::error::{self, Result};
use crate::sql::SqlHandler;
impl SqlHandler {
pub async fn drop_table(&self, req: DropTableRequest) -> Result<Output> {
if let Some(procedure_manager) = &self.procedure_manager {
return self.drop_table_by_procedure(procedure_manager, req).await;
}
let deregister_table_req = DeregisterTableRequest {
catalog: req.catalog_name.clone(),
schema: req.schema_name.clone(),
table_name: req.table_name.clone(),
};
let table_reference = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
table: &req.table_name,
};
let table_full_name = table_reference.to_string();
let table = self
.catalog_manager
.table(&req.catalog_name, &req.schema_name, &req.table_name)
.await
.context(error::CatalogSnafu)?
.context(TableNotExistSnafu {
table: &table_full_name,
})
.map_err(BoxedError::new)
.context(error::DropTableSnafu {
table_name: &table_full_name,
})?;
self.catalog_manager
.deregister_table(deregister_table_req)
.await
.map_err(BoxedError::new)
.context(error::DropTableSnafu {
table_name: &table_full_name,
})?;
let ctx = EngineContext {};
let engine = self.table_engine(table)?;
engine
.drop_table(&ctx, req)
.await
.map_err(BoxedError::new)
.with_context(|_| error::DropTableSnafu {
table_name: table_full_name.clone(),
})?;
info!("Successfully dropped table: {}", table_full_name);
Ok(Output::AffectedRows(1))
}
pub(crate) async fn drop_table_by_procedure(
&self,
procedure_manager: &ProcedureManagerRef,
req: DropTableRequest,
) -> Result<Output> {
pub(crate) async fn drop_table(&self, req: DropTableRequest) -> Result<Output> {
let table_name = req.table_name.clone();
let table_ref = TableReference {
catalog: &req.catalog_name,
@@ -106,7 +43,8 @@ impl SqlHandler {
info!("Drop table {} by procedure {}", table_name, procedure_id);
let mut watcher = procedure_manager
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu { procedure_id })?;
@@ -130,7 +68,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_drop_table_by_procedure() {
let instance = MockInstance::with_procedure_enabled("alter_table_by_procedure").await;
let instance = MockInstance::new("alter_table_by_procedure").await;
// Create table first.
let sql = r#"create table test_drop(

View File

@@ -12,22 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE,
};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use mito::config::EngineConfig;
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use servers::Mode;
use snafu::ResultExt;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::{EngineContext, TableEngine, TableEngineProcedureRef, TableEngineRef};
use table::engine::{EngineContext, TableEngineRef};
use table::requests::{CreateTableRequest, TableOptions};
use crate::datanode::{
@@ -35,12 +28,10 @@ use crate::datanode::{
};
use crate::error::{CreateTableSnafu, Result};
use crate::instance::Instance;
use crate::sql::SqlHandler;
pub(crate) struct MockInstance {
instance: Instance,
_guard: TestGuard,
_procedure_dir: Option<TempDir>,
}
impl MockInstance {
@@ -50,32 +41,7 @@ impl MockInstance {
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
MockInstance {
instance,
_guard,
_procedure_dir: None,
}
}
pub(crate) async fn with_procedure_enabled(name: &str) -> Self {
let (mut opts, _guard) = create_tmp_dir_and_datanode_opts(name);
let procedure_dir = create_temp_dir(&format!("gt_procedure_{name}"));
opts.procedure = Some(ProcedureConfig {
store: ObjectStoreConfig::File(FileConfig {
data_dir: procedure_dir.path().to_str().unwrap().to_string(),
}),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
});
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
MockInstance {
instance,
_guard,
_procedure_dir: Some(procedure_dir),
}
MockInstance { instance, _guard }
}
pub(crate) fn inner(&self) -> &Instance {
@@ -86,11 +52,13 @@ impl MockInstance {
struct TestGuard {
_wal_tmp_dir: TempDir,
_data_tmp_dir: TempDir,
_procedure_tmp_dir: TempDir,
}
fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}"));
let opts = DatanodeOptions {
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
@@ -103,6 +71,9 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
..Default::default()
},
mode: Mode::Standalone,
procedure: ProcedureConfig::from_file_path(
procedure_tmp_dir.path().to_str().unwrap().to_string(),
),
..Default::default()
};
(
@@ -110,6 +81,7 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
_data_tmp_dir: data_tmp_dir,
_procedure_tmp_dir: procedure_tmp_dir,
},
)
}
@@ -161,26 +133,3 @@ pub(crate) async fn create_test_table(
.unwrap();
Ok(())
}
pub async fn create_mock_sql_handler() -> SqlHandler {
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(MockMitoEngine::new(
EngineConfig::default(),
MockEngine::default(),
object_store,
));
let mut engine_procedures = HashMap::new();
engine_procedures.insert(
mock_engine.name().to_string(),
mock_engine.clone() as TableEngineProcedureRef,
);
let engine_manager = Arc::new(
MemoryTableEngineManager::new(mock_engine).with_engine_procedures(engine_procedures),
);
let catalog_manager = Arc::new(
catalog::local::LocalCatalogManager::try_new(engine_manager.clone())
.await
.unwrap(),
);
SqlHandler::new(engine_manager, catalog_manager, None)
}

View File

@@ -121,7 +121,7 @@ pub enum Error {
#[snafu(display("Invalid InsertRequest, reason: {}", reason))]
InvalidInsertRequest { reason: String, location: Location },
#[snafu(display("Table `{}` not exist", table_name))]
#[snafu(display("Table not found: {}", table_name))]
TableNotFound {
table_name: String,
location: Location,

View File

@@ -127,9 +127,9 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
..Default::default()
},
mode: Mode::Standalone,
procedure: Some(ProcedureConfig::from_file_path(
procedure: ProcedureConfig::from_file_path(
procedure_tmp_dir.path().to_str().unwrap().to_string(),
)),
),
..Default::default()
};
(
@@ -224,9 +224,9 @@ async fn create_distributed_datanode(
..Default::default()
},
mode: Mode::Distributed,
procedure: Some(ProcedureConfig::from_file_path(
procedure: ProcedureConfig::from_file_path(
procedure_tmp_dir.path().to_str().unwrap().to_string(),
)),
),
..Default::default()
};

View File

@@ -290,6 +290,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
.engine(engine::MITO_ENGINE)
.next_column_id(next_column_id)
.primary_key_indices(self.data.request.primary_key_indices.clone())
.options(self.data.request.table_options.clone())
.region_numbers(self.data.request.region_numbers.clone())
.build()
.context(BuildTableMetaSnafu {

View File

@@ -18,21 +18,20 @@ use async_trait::async_trait;
use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use store_api::storage::StorageEngine;
use table::engine::TableReference;
use table::requests::DropTableRequest;
use table::Table;
use crate::engine::MitoEngineInner;
use crate::error::TableNotFoundSnafu;
use crate::table::MitoTable;
/// Procedure to drop a [MitoTable].
pub(crate) struct DropMitoTable<S: StorageEngine> {
data: DropTableData,
engine_inner: Arc<MitoEngineInner<S>>,
table: Arc<MitoTable<S::Region>>,
table: Option<Arc<MitoTable<S::Region>>>,
}
#[async_trait]
@@ -55,7 +54,8 @@ impl<S: StorageEngine> Procedure for DropMitoTable<S> {
fn lock_key(&self) -> LockKey {
let table_ref = self.data.table_ref();
let info = self.table.table_info();
let Some(table) = &self.table else { return LockKey::default() };
let info = table.table_info();
let keys = info
.meta
.region_numbers
@@ -78,12 +78,7 @@ impl<S: StorageEngine> DropMitoTable<S> {
request,
};
let table_ref = data.table_ref();
let table =
engine_inner
.get_mito_table(&table_ref)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table = engine_inner.get_mito_table(&table_ref);
Ok(DropMitoTable {
data,
@@ -114,12 +109,7 @@ impl<S: StorageEngine> DropMitoTable<S> {
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let table_ref = data.table_ref();
let table =
engine_inner
.get_mito_table(&table_ref)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table = engine_inner.get_mito_table(&table_ref);
Ok(DropMitoTable {
data,
@@ -148,7 +138,9 @@ impl<S: StorageEngine> DropMitoTable<S> {
self.engine_inner.tables.remove(&table_ref.to_string());
// Close the table to close all regions. Closing a region is idempotent.
self.table.close().await.map_err(Error::from_error_ext)?;
if let Some(table) = &self.table {
table.close().await.map_err(Error::from_error_ext)?;
}
// TODO(yingwen): Currently, DROP TABLE doesn't remove data. We can
// write a drop meta update to the table and remove all files in the

View File

@@ -134,26 +134,30 @@ impl AlterTableProcedure {
async fn on_prepare(&mut self) -> Result<Status> {
// Check whether catalog and schema exist.
let request = &self.data.request;
let catalog = self
.catalog_manager
.catalog(&self.data.request.catalog_name)
.catalog(&request.catalog_name)
.context(AccessCatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: &self.data.request.catalog_name,
name: &request.catalog_name,
})?;
let schema = catalog
.schema(&self.data.request.schema_name)
.schema(&request.schema_name)
.context(AccessCatalogSnafu)?
.context(SchemaNotFoundSnafu {
name: &self.data.request.schema_name,
name: &request.schema_name,
})?;
let table = schema
.table(&self.data.request.table_name)
.table(&request.table_name)
.await
.context(AccessCatalogSnafu)?
.context(TableNotFoundSnafu {
name: &self.data.request.table_name,
.with_context(|| TableNotFoundSnafu {
name: format!(
"{}.{}.{}",
request.catalog_name, request.schema_name, request.table_name
),
})?;
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
ensure!(
@@ -161,7 +165,10 @@ impl AlterTableProcedure {
.table_exist(new_table_name)
.context(AccessCatalogSnafu)?,
TableExistsSnafu {
name: new_table_name,
name: format!(
"{}.{}.{}",
request.catalog_name, request.schema_name, new_table_name
),
}
);
}

View File

@@ -62,7 +62,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("Table {} already exists", name))]
#[snafu(display("Table already exists: {}", name))]
TableExists { name: String },
}

View File

@@ -26,7 +26,8 @@ use common_catalog::consts::{
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datanode::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, StorageConfig, WalConfig,
DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig, S3Config,
StorageConfig, WalConfig,
};
use datanode::error::{CreateTableSnafu, Result};
use datanode::instance::Instance;
@@ -169,6 +170,7 @@ enum TempDirGuard {
pub struct TestGuard {
_wal_tmp_dir: TempDir,
data_tmp_dir: Option<TempDirGuard>,
_procedure_tmp_dir: TempDir,
}
impl TestGuard {
@@ -187,6 +189,7 @@ pub fn create_tmp_dir_and_datanode_opts(
name: &str,
) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}"));
let (store, data_tmp_dir) = get_test_store_config(&store_type, name);
@@ -200,6 +203,9 @@ pub fn create_tmp_dir_and_datanode_opts(
..Default::default()
},
mode: Mode::Standalone,
procedure: ProcedureConfig::from_file_path(
procedure_tmp_dir.path().to_str().unwrap().to_string(),
),
..Default::default()
};
(
@@ -207,6 +213,7 @@ pub fn create_tmp_dir_and_datanode_opts(
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
data_tmp_dir,
_procedure_tmp_dir: procedure_tmp_dir,
},
)
}

View File

@@ -197,7 +197,7 @@ pub async fn test_sql_api(store_type: StorageType) {
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
assert!(!body.success());
assert!(body.execution_time_ms().is_some());
assert!(body.error().unwrap().contains("not exist"));
assert!(body.error().unwrap().contains("Table not found"));
// test database given
let res = client

View File

@@ -27,7 +27,7 @@ SELECT * from t;
ALTER TABLE t RENAME new_table;
Error: 1003(Internal), Operation rename table not implemented yet
Error: 1001(Unsupported), Operation rename table not implemented yet
DROP TABLE t;

View File

@@ -31,11 +31,11 @@ Affected Rows: 0
DESC TABLE t;
Error: 4001(TableNotFound), Table `t` not exist
Error: 4001(TableNotFound), Table not found: t
SELECT * FROM t;
Error: 4001(TableNotFound), Table `greptime.public.t` not exist
Error: 4001(TableNotFound), Table not found: greptime.public.t
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);

View File

@@ -81,7 +81,7 @@ Affected Rows: 1
DROP TABLE hello;
Error: 4001(TableNotFound), Table `greptime.test_public_schema.hello` not exist
Error: 4001(TableNotFound), Table not found: greptime.test_public_schema.hello
SHOW TABLES FROM test_public_schema;
@@ -105,7 +105,7 @@ Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_publ
SELECT * FROM test_public_schema.hello;
Error: 4001(TableNotFound), Table `greptime.test_public_schema.hello` not exist
Error: 4001(TableNotFound), Table not found: greptime.test_public_schema.hello
USE public;

View File

@@ -21,3 +21,9 @@ metasrv_addrs = ['127.0.0.1:3002']
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = false
[procedure.store]
type = "File"
data_dir = "{procedure_dir}"
max_retry_times = 3
retry_delay = "500ms"

View File

@@ -16,3 +16,9 @@ data_dir = '{data_dir}'
[grpc_options]
addr = '127.0.0.1:4001'
runtime_size = 8
[procedure.store]
type = "File"
data_dir = "{procedure_dir}"
max_retry_times = 3
retry_delay = "500ms"

View File

@@ -214,12 +214,14 @@ impl Env {
struct Context {
wal_dir: String,
data_dir: String,
procedure_dir: String,
}
let greptimedb_dir = format!("/tmp/greptimedb-{subcommand}-{}", db_ctx.time);
let ctx = Context {
wal_dir: format!("{greptimedb_dir}/wal/"),
data_dir: format!("{greptimedb_dir}/data/"),
procedure_dir: format!("{greptimedb_dir}/procedure/"),
};
let rendered = tt.render(subcommand, &ctx).unwrap();