mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 15:00:40 +00:00
feat: impl DROP TABLE on memory catalog based standalone mode (#630)
* feat: implement drop table for standalone mode Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * update integration test Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * enhancement test Signed-off-by: Ruihang Xia <waynestxia@gmail.com> Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -20,6 +20,7 @@ message AdminExpr {
|
||||
CreateExpr create = 2;
|
||||
AlterExpr alter = 3;
|
||||
CreateDatabaseExpr create_database = 4;
|
||||
DropTableExpr drop_table = 5;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,6 +56,12 @@ message AlterExpr {
|
||||
}
|
||||
}
|
||||
|
||||
message DropTableExpr {
|
||||
string catalog_name = 1;
|
||||
string schema_name = 2;
|
||||
string table_name = 3;
|
||||
}
|
||||
|
||||
message AddColumns {
|
||||
repeated AddColumn add_columns = 1;
|
||||
}
|
||||
|
||||
@@ -146,7 +146,6 @@ pub struct DeregisterTableRequest {
|
||||
pub catalog: String,
|
||||
pub schema: String,
|
||||
pub table_name: String,
|
||||
pub table_id: TableId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -393,7 +393,6 @@ mod tests {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "numbers".to_string(),
|
||||
table_id: 2333,
|
||||
};
|
||||
catalog
|
||||
.deregister_table(deregister_table_req)
|
||||
|
||||
@@ -217,7 +217,7 @@ impl TableEngine for MockTableEngine {
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
_request: DropTableRequest,
|
||||
) -> table::Result<()> {
|
||||
) -> table::Result<bool> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,7 +58,19 @@ impl Admin {
|
||||
header: Some(header),
|
||||
expr: Some(admin_expr::Expr::Alter(expr)),
|
||||
};
|
||||
Ok(self.do_requests(vec![expr]).await?.remove(0))
|
||||
self.do_request(expr).await
|
||||
}
|
||||
|
||||
pub async fn drop_table(&self, expr: DropTableExpr) -> Result<AdminResult> {
|
||||
let header = ExprHeader {
|
||||
version: PROTOCOL_VERSION,
|
||||
};
|
||||
let expr = AdminExpr {
|
||||
header: Some(header),
|
||||
expr: Some(admin_expr::Expr::DropTable(expr)),
|
||||
};
|
||||
|
||||
self.do_request(expr).await
|
||||
}
|
||||
|
||||
/// Invariants: the lengths of input vec (`Vec<AdminExpr>`) and output vec (`Vec<AdminResult>`) are equal.
|
||||
|
||||
@@ -73,6 +73,13 @@ pub enum Error {
|
||||
source: TableError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to drop table {}, source: {}", table_name, source))]
|
||||
DropTable {
|
||||
table_name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Table not found: {}", table_name))]
|
||||
TableNotFound { table_name: String },
|
||||
|
||||
@@ -298,6 +305,7 @@ impl ErrorExt for Error {
|
||||
Error::CreateTable { source, .. }
|
||||
| Error::GetTable { source, .. }
|
||||
| Error::AlterTable { source, .. } => source.status_code(),
|
||||
Error::DropTable { source, .. } => source.status_code(),
|
||||
|
||||
Error::Insert { source, .. } => source.status_code(),
|
||||
|
||||
|
||||
@@ -195,6 +195,9 @@ impl GrpcAdminHandler for Instance {
|
||||
Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => {
|
||||
self.execute_create_database(create_database_expr).await
|
||||
}
|
||||
Some(admin_expr::Expr::DropTable(drop_table_expr)) => {
|
||||
self.handle_drop_table(drop_table_expr).await
|
||||
}
|
||||
other => {
|
||||
return servers::error::NotSupportedSnafu {
|
||||
feat: format!("{:?}", other),
|
||||
|
||||
@@ -107,6 +107,10 @@ impl Instance {
|
||||
let req = self.sql_handler.alter_to_request(alter_table)?;
|
||||
self.sql_handler.execute(SqlRequest::Alter(req)).await
|
||||
}
|
||||
Statement::DropTable(drop_table) => {
|
||||
let req = self.sql_handler.drop_table_to_request(drop_table);
|
||||
self.sql_handler.execute(SqlRequest::DropTable(req)).await
|
||||
}
|
||||
Statement::ShowDatabases(stmt) => {
|
||||
self.sql_handler
|
||||
.execute(SqlRequest::ShowDatabases(stmt))
|
||||
|
||||
@@ -13,13 +13,14 @@
|
||||
// limitations under the License.
|
||||
|
||||
use api::result::AdminResultBuilder;
|
||||
use api::v1::{AdminResult, AlterExpr, CreateExpr};
|
||||
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr};
|
||||
use common_error::prelude::{ErrorExt, StatusCode};
|
||||
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
|
||||
use common_query::Output;
|
||||
use common_telemetry::{error, info};
|
||||
use futures::TryFutureExt;
|
||||
use snafu::prelude::*;
|
||||
use table::requests::DropTableRequest;
|
||||
|
||||
use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu};
|
||||
use crate::instance::Instance;
|
||||
@@ -116,6 +117,26 @@ impl Instance {
|
||||
.build(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult {
|
||||
let req = DropTableRequest {
|
||||
catalog_name: expr.catalog_name,
|
||||
schema_name: expr.schema_name,
|
||||
table_name: expr.table_name,
|
||||
};
|
||||
let result = self.sql_handler().execute(SqlRequest::DropTable(req)).await;
|
||||
match result {
|
||||
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
|
||||
.status_code(StatusCode::Success as u32)
|
||||
.mutate_result(rows as _, 0)
|
||||
.build(),
|
||||
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
|
||||
Err(err) => AdminResultBuilder::default()
|
||||
.status_code(err.status_code() as u32)
|
||||
.err_msg(err.to_string())
|
||||
.build(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_query::Output;
|
||||
use common_telemetry::error;
|
||||
use query::query_engine::QueryEngineRef;
|
||||
use query::sql::{describe_table, explain, show_databases, show_tables};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -26,10 +27,11 @@ use table::engine::{EngineContext, TableEngineRef, TableReference};
|
||||
use table::requests::*;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{self, GetTableSnafu, Result, TableNotFoundSnafu};
|
||||
use crate::error::{ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu};
|
||||
|
||||
mod alter;
|
||||
mod create;
|
||||
mod drop_table;
|
||||
mod insert;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -38,6 +40,7 @@ pub enum SqlRequest {
|
||||
CreateTable(CreateTableRequest),
|
||||
CreateDatabase(CreateDatabaseRequest),
|
||||
Alter(AlterTableRequest),
|
||||
DropTable(DropTableRequest),
|
||||
ShowDatabases(ShowDatabases),
|
||||
ShowTables(ShowTables),
|
||||
DescribeTable(DescribeTable),
|
||||
@@ -65,24 +68,29 @@ impl SqlHandler {
|
||||
}
|
||||
|
||||
pub async fn execute(&self, request: SqlRequest) -> Result<Output> {
|
||||
match request {
|
||||
let result = match request {
|
||||
SqlRequest::Insert(req) => self.insert(req).await,
|
||||
SqlRequest::CreateTable(req) => self.create_table(req).await,
|
||||
SqlRequest::CreateDatabase(req) => self.create_database(req).await,
|
||||
SqlRequest::Alter(req) => self.alter(req).await,
|
||||
SqlRequest::DropTable(req) => self.drop_table(req).await,
|
||||
SqlRequest::ShowDatabases(stmt) => {
|
||||
show_databases(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu)
|
||||
show_databases(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
|
||||
}
|
||||
SqlRequest::ShowTables(stmt) => {
|
||||
show_tables(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu)
|
||||
show_tables(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
|
||||
}
|
||||
SqlRequest::DescribeTable(stmt) => {
|
||||
describe_table(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu)
|
||||
describe_table(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
|
||||
}
|
||||
SqlRequest::Explain(stmt) => explain(stmt, self.query_engine.clone())
|
||||
.await
|
||||
.context(error::ExecuteSqlSnafu),
|
||||
.context(ExecuteSqlSnafu),
|
||||
};
|
||||
if let Err(e) = &result {
|
||||
error!("Datanode execution error: {:?}", e);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
pub(crate) fn get_table<'a>(&self, table_ref: &'a TableReference) -> Result<TableRef> {
|
||||
|
||||
71
src/datanode/src/sql/drop_table.rs
Normal file
71
src/datanode/src/sql/drop_table.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
// Copyright 2022 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use catalog::DeregisterTableRequest;
|
||||
use common_error::prelude::BoxedError;
|
||||
use common_query::Output;
|
||||
use common_telemetry::info;
|
||||
use snafu::ResultExt;
|
||||
use sql::statements::drop::DropTable;
|
||||
use table::engine::{EngineContext, TableReference};
|
||||
use table::requests::DropTableRequest;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::sql::SqlHandler;
|
||||
|
||||
impl SqlHandler {
|
||||
pub async fn drop_table(&self, req: DropTableRequest) -> Result<Output> {
|
||||
let deregister_table_req = DeregisterTableRequest {
|
||||
catalog: req.catalog_name.clone(),
|
||||
schema: req.schema_name.clone(),
|
||||
table_name: req.table_name.clone(),
|
||||
};
|
||||
|
||||
let table_reference = TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
table: &req.table_name,
|
||||
};
|
||||
let table_full_name = table_reference.to_string();
|
||||
|
||||
self.catalog_manager
|
||||
.deregister_table(deregister_table_req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::DropTableSnafu {
|
||||
table_name: table_full_name.clone(),
|
||||
})?;
|
||||
|
||||
let ctx = EngineContext {};
|
||||
self.table_engine()
|
||||
.drop_table(&ctx, req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::DropTableSnafu {
|
||||
table_name: table_full_name.clone(),
|
||||
})?;
|
||||
|
||||
info!("Successfully dropped table: {}", table_full_name);
|
||||
|
||||
Ok(Output::AffectedRows(1))
|
||||
}
|
||||
|
||||
pub fn drop_table_to_request(&self, drop_table: DropTable) -> DropTableRequest {
|
||||
DropTableRequest {
|
||||
catalog_name: drop_table.catalog_name,
|
||||
schema_name: drop_table.schema_name,
|
||||
table_name: drop_table.table_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -250,6 +250,12 @@ pub enum Error {
|
||||
source: client::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to drop table, source: {}", source))]
|
||||
DropTable {
|
||||
#[snafu(backtrace)]
|
||||
source: client::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to insert values to table, source: {}", source))]
|
||||
Insert {
|
||||
#[snafu(backtrace)]
|
||||
@@ -509,19 +515,20 @@ impl ErrorExt for Error {
|
||||
Error::BumpTableId { source, .. } => source.status_code(),
|
||||
Error::SchemaNotFound { .. } => StatusCode::InvalidArguments,
|
||||
Error::CatalogNotFound { .. } => StatusCode::InvalidArguments,
|
||||
Error::CreateTable { source, .. } => source.status_code(),
|
||||
Error::AlterTable { source, .. } => source.status_code(),
|
||||
Error::Insert { source, .. } => source.status_code(),
|
||||
Error::CreateTable { source, .. }
|
||||
| Error::AlterTable { source, .. }
|
||||
| Error::DropTable { source }
|
||||
| Error::Select { source, .. }
|
||||
| Error::CreateDatabase { source, .. }
|
||||
| Error::CreateTableOnInsertion { source, .. }
|
||||
| Error::AlterTableOnInsertion { source, .. }
|
||||
| Error::Insert { source, .. } => source.status_code(),
|
||||
Error::BuildCreateExprOnInsertion { source, .. } => source.status_code(),
|
||||
Error::CreateTableOnInsertion { source, .. } => source.status_code(),
|
||||
Error::AlterTableOnInsertion { source, .. } => source.status_code(),
|
||||
Error::Select { source, .. } => source.status_code(),
|
||||
Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
|
||||
Error::DeserializeInsertBatch { source, .. } => source.status_code(),
|
||||
Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
|
||||
Error::ExecuteSql { source, .. } => source.status_code(),
|
||||
Error::InsertBatchToRequest { source, .. } => source.status_code(),
|
||||
Error::CreateDatabase { source, .. } => source.status_code(),
|
||||
Error::CollectRecordbatchStream { source } | Error::CreateRecordbatches { source } => {
|
||||
source.status_code()
|
||||
}
|
||||
|
||||
@@ -25,7 +25,8 @@ use api::v1::alter_expr::Kind;
|
||||
use api::v1::object_expr::Expr;
|
||||
use api::v1::{
|
||||
admin_expr, select_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column,
|
||||
CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
|
||||
CreateDatabaseExpr, CreateExpr, DropTableExpr, InsertExpr, ObjectExpr,
|
||||
ObjectResult as GrpcObjectResult,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use catalog::remote::MetaKvBackend;
|
||||
@@ -58,8 +59,9 @@ use crate::catalog::FrontendCatalogManager;
|
||||
use crate::datanode::DatanodeClients;
|
||||
use crate::error::{
|
||||
self, AlterTableOnInsertionSnafu, AlterTableSnafu, CatalogNotFoundSnafu, CatalogSnafu,
|
||||
CreateDatabaseSnafu, CreateTableSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu,
|
||||
MissingMetasrvOptsSnafu, Result, SchemaNotFoundSnafu, SelectSnafu,
|
||||
CreateDatabaseSnafu, CreateTableSnafu, DropTableSnafu, FindNewColumnsOnInsertionSnafu,
|
||||
InsertSnafu, MissingMetasrvOptsSnafu, Result, SchemaNotFoundSnafu, SelectSnafu,
|
||||
UnsupportedExprSnafu,
|
||||
};
|
||||
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
|
||||
use crate::frontend::FrontendOptions;
|
||||
@@ -278,6 +280,23 @@ impl Instance {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle drop table expr
|
||||
pub async fn handle_drop_table(&self, expr: DropTableExpr) -> Result<Output> {
|
||||
match self.mode {
|
||||
Mode::Standalone => self
|
||||
.admin(&expr.schema_name)
|
||||
.drop_table(expr)
|
||||
.await
|
||||
.and_then(admin_result_to_output)
|
||||
.context(DropTableSnafu),
|
||||
// TODO(ruihang): support drop table in distributed mode
|
||||
Mode::Distributed => UnsupportedExprSnafu {
|
||||
name: "Distributed DROP TABLE",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle explain expr
|
||||
pub async fn handle_explain(&self, sql: &str, explain_stmt: Explain) -> Result<Output> {
|
||||
if let Some(dist_instance) = &self.dist_instance {
|
||||
@@ -615,6 +634,17 @@ impl SqlQueryHandler for Instance {
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(server_error::ExecuteQuerySnafu { query }),
|
||||
Statement::DropTable(drop_stmt) => {
|
||||
let expr = DropTableExpr {
|
||||
catalog_name: drop_stmt.catalog_name,
|
||||
schema_name: drop_stmt.schema_name,
|
||||
table_name: drop_stmt.table_name,
|
||||
};
|
||||
self.handle_drop_table(expr)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(server_error::ExecuteQuerySnafu { query })
|
||||
}
|
||||
Statement::Explain(explain_stmt) => self
|
||||
.handle_explain(query, explain_stmt)
|
||||
.await
|
||||
@@ -726,6 +756,7 @@ fn get_schema_name(expr: &AdminExpr) -> &str {
|
||||
Some(admin_expr::Expr::Create(expr)) => expr.schema_name.as_deref(),
|
||||
Some(admin_expr::Expr::Alter(expr)) => expr.schema_name.as_deref(),
|
||||
Some(admin_expr::Expr::CreateDatabase(_)) | None => Some(DEFAULT_SCHEMA_NAME),
|
||||
Some(admin_expr::Expr::DropTable(expr)) => Some(expr.schema_name.as_ref()),
|
||||
};
|
||||
schema_name.unwrap_or(DEFAULT_SCHEMA_NAME)
|
||||
}
|
||||
|
||||
@@ -123,14 +123,14 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
|
||||
async fn drop_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
_request: DropTableRequest,
|
||||
) -> TableResult<()> {
|
||||
unimplemented!();
|
||||
request: DropTableRequest,
|
||||
) -> TableResult<bool> {
|
||||
Ok(self.inner.drop_table(request).await?)
|
||||
}
|
||||
}
|
||||
|
||||
struct MitoEngineInner<S: StorageEngine> {
|
||||
/// All tables opened by the engine.
|
||||
/// All tables opened by the engine. Map key is formatted [TableReference].
|
||||
///
|
||||
/// Writing to `tables` should also hold the `table_mutex`.
|
||||
tables: RwLock<HashMap<String, TableRef>>,
|
||||
@@ -464,6 +464,22 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
.context(error::AlterTableSnafu { table_name })?;
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
/// Drop table. Returns whether a table is dropped (true) or not exist (false).
|
||||
async fn drop_table(&self, req: DropTableRequest) -> Result<bool> {
|
||||
let table_reference = TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
table: &req.table_name,
|
||||
};
|
||||
// todo(ruihang): reclaim persisted data
|
||||
Ok(self
|
||||
.tables
|
||||
.write()
|
||||
.unwrap()
|
||||
.remove(&table_reference.to_string())
|
||||
.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
@@ -961,4 +977,69 @@ mod tests {
|
||||
assert_eq!(new_schema.timestamp_column(), old_schema.timestamp_column());
|
||||
assert_eq!(new_schema.version(), old_schema.version() + 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let ctx = EngineContext::default();
|
||||
|
||||
let (_engine, table_engine, table, _object_store, _dir) =
|
||||
test_util::setup_mock_engine_and_table().await;
|
||||
let engine_ctx = EngineContext {};
|
||||
|
||||
let table_info = table.table_info();
|
||||
let table_reference = TableReference {
|
||||
catalog: DEFAULT_CATALOG_NAME,
|
||||
schema: DEFAULT_SCHEMA_NAME,
|
||||
table: &table_info.name,
|
||||
};
|
||||
|
||||
let create_table_request = CreateTableRequest {
|
||||
id: 1,
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_info.name.to_string(),
|
||||
schema: table_info.meta.schema.clone(),
|
||||
create_if_not_exists: true,
|
||||
desc: None,
|
||||
primary_key_indices: Vec::default(),
|
||||
table_options: HashMap::new(),
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
|
||||
let created_table = table_engine
|
||||
.create_table(&ctx, create_table_request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table_info, created_table.table_info());
|
||||
assert!(table_engine.table_exists(&engine_ctx, &table_reference));
|
||||
|
||||
let drop_table_request = DropTableRequest {
|
||||
catalog_name: table_reference.catalog.to_string(),
|
||||
schema_name: table_reference.schema.to_string(),
|
||||
table_name: table_reference.table.to_string(),
|
||||
};
|
||||
let table_dropped = table_engine
|
||||
.drop_table(&engine_ctx, drop_table_request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(table_dropped);
|
||||
assert!(!table_engine.table_exists(&engine_ctx, &table_reference));
|
||||
|
||||
// should be able to re-create
|
||||
let request = CreateTableRequest {
|
||||
id: 2,
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_info.name.to_string(),
|
||||
schema: table_info.meta.schema.clone(),
|
||||
create_if_not_exists: false,
|
||||
desc: None,
|
||||
primary_key_indices: Vec::default(),
|
||||
table_options: HashMap::new(),
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
table_engine.create_table(&ctx, request).await.unwrap();
|
||||
assert!(table_engine.table_exists(&engine_ctx, &table_reference));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +84,8 @@ where
|
||||
| Statement::CreateTable(_)
|
||||
| Statement::CreateDatabase(_)
|
||||
| Statement::Alter(_)
|
||||
| Statement::Insert(_) => unreachable!(),
|
||||
| Statement::Insert(_)
|
||||
| Statement::DropTable(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ use crate::error::{
|
||||
self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result, SyntaxSnafu, TokenizerSnafu,
|
||||
};
|
||||
use crate::statements::describe::DescribeTable;
|
||||
use crate::statements::drop::DropTable;
|
||||
use crate::statements::explain::Explain;
|
||||
use crate::statements::show::{ShowCreateTable, ShowDatabases, ShowKind, ShowTables};
|
||||
use crate::statements::statement::Statement;
|
||||
@@ -99,6 +100,8 @@ impl<'a> ParserContext<'a> {
|
||||
|
||||
Keyword::ALTER => self.parse_alter(),
|
||||
|
||||
Keyword::DROP => self.parse_drop(),
|
||||
|
||||
// todo(hl) support more statements.
|
||||
_ => self.unsupported(self.peek_token_as_string()),
|
||||
}
|
||||
@@ -271,6 +274,36 @@ impl<'a> ParserContext<'a> {
|
||||
Ok(Statement::Explain(Explain::try_from(explain_statement)?))
|
||||
}
|
||||
|
||||
fn parse_drop(&mut self) -> Result<Statement> {
|
||||
self.parser.next_token();
|
||||
if !self.matches_keyword(Keyword::TABLE) {
|
||||
return self.unsupported(self.peek_token_as_string());
|
||||
}
|
||||
self.parser.next_token();
|
||||
|
||||
let table_ident =
|
||||
self.parser
|
||||
.parse_object_name()
|
||||
.with_context(|_| error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "a table name",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
ensure!(
|
||||
!table_ident.0.is_empty(),
|
||||
InvalidTableNameSnafu {
|
||||
name: table_ident.to_string()
|
||||
}
|
||||
);
|
||||
|
||||
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_ident)?;
|
||||
Ok(Statement::DropTable(DropTable {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
}))
|
||||
}
|
||||
|
||||
// Report unexpected token
|
||||
pub(crate) fn expected<T>(&self, expected: &str, found: Token) -> Result<T> {
|
||||
Err(ParserError::ParserError(format!(
|
||||
@@ -338,6 +371,7 @@ impl<'a> ParserContext<'a> {
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use sqlparser::ast::{Query as SpQuery, Statement as SpStatement};
|
||||
use sqlparser::dialect::GenericDialect;
|
||||
|
||||
@@ -532,4 +566,43 @@ mod tests {
|
||||
|
||||
assert_eq!(stmts[0], Statement::Explain(explain))
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_drop_table() {
|
||||
let sql = "DROP TABLE foo";
|
||||
let result = ParserContext::create_with_dialect(sql, &GenericDialect {});
|
||||
let mut stmts = result.unwrap();
|
||||
assert_eq!(
|
||||
stmts.pop().unwrap(),
|
||||
Statement::DropTable(DropTable {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "foo".to_string()
|
||||
})
|
||||
);
|
||||
|
||||
let sql = "DROP TABLE my_schema.foo";
|
||||
let result = ParserContext::create_with_dialect(sql, &GenericDialect {});
|
||||
let mut stmts = result.unwrap();
|
||||
assert_eq!(
|
||||
stmts.pop().unwrap(),
|
||||
Statement::DropTable(DropTable {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: "my_schema".to_string(),
|
||||
table_name: "foo".to_string()
|
||||
})
|
||||
);
|
||||
|
||||
let sql = "DROP TABLE my_catalog.my_schema.foo";
|
||||
let result = ParserContext::create_with_dialect(sql, &GenericDialect {});
|
||||
let mut stmts = result.unwrap();
|
||||
assert_eq!(
|
||||
stmts.pop().unwrap(),
|
||||
Statement::DropTable(DropTable {
|
||||
catalog_name: "my_catalog".to_string(),
|
||||
schema_name: "my_schema".to_string(),
|
||||
table_name: "foo".to_string()
|
||||
})
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
pub mod alter;
|
||||
pub mod create;
|
||||
pub mod describe;
|
||||
pub mod drop;
|
||||
pub mod explain;
|
||||
pub mod insert;
|
||||
pub mod query;
|
||||
|
||||
32
src/sql/src/statements/drop.rs
Normal file
32
src/sql/src/statements/drop.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
// Copyright 2022 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/// DROP TABLE statement.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct DropTable {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
}
|
||||
|
||||
impl DropTable {
|
||||
/// Creates a statement for `DROP TABLE`
|
||||
pub fn new(catalog_name: String, schema_name: String, table_name: String) -> Self {
|
||||
DropTable {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,7 @@ use sqlparser::parser::ParserError;
|
||||
use crate::statements::alter::AlterTable;
|
||||
use crate::statements::create::{CreateDatabase, CreateTable};
|
||||
use crate::statements::describe::DescribeTable;
|
||||
use crate::statements::drop::DropTable;
|
||||
use crate::statements::explain::Explain;
|
||||
use crate::statements::insert::Insert;
|
||||
use crate::statements::query::Query;
|
||||
@@ -33,6 +34,8 @@ pub enum Statement {
|
||||
Insert(Box<Insert>),
|
||||
/// CREATE TABLE
|
||||
CreateTable(CreateTable),
|
||||
// DROP TABLE
|
||||
DropTable(DropTable),
|
||||
// CREATE DATABASE
|
||||
CreateDatabase(CreateDatabase),
|
||||
/// ALTER TABLE
|
||||
@@ -67,6 +70,9 @@ impl TryFrom<Statement> for SpStatement {
|
||||
Statement::DescribeTable(_) => Err(ParserError::ParserError(
|
||||
"sqlparser does not support DESCRIBE TABLE query.".to_string(),
|
||||
)),
|
||||
Statement::DropTable(_) => Err(ParserError::ParserError(
|
||||
"sqlparser does not support DROP TABLE query.".to_string(),
|
||||
)),
|
||||
Statement::Query(s) => Ok(SpStatement::Query(Box::new(s.inner))),
|
||||
Statement::Insert(i) => Ok(i.inner),
|
||||
Statement::CreateDatabase(_) | Statement::CreateTable(_) | Statement::Alter(_) => {
|
||||
|
||||
@@ -74,8 +74,8 @@ pub trait TableEngine: Send + Sync {
|
||||
/// Returns true when the given table is exists.
|
||||
fn table_exists<'a>(&self, ctx: &EngineContext, table_ref: &'a TableReference) -> bool;
|
||||
|
||||
/// Drops the given table.
|
||||
async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result<()>;
|
||||
/// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist.
|
||||
async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result<bool>;
|
||||
}
|
||||
|
||||
pub type TableEngineRef = Arc<dyn TableEngine>;
|
||||
|
||||
@@ -84,4 +84,8 @@ pub enum AlterKind {
|
||||
|
||||
/// Drop table request
|
||||
#[derive(Debug)]
|
||||
pub struct DropTableRequest {}
|
||||
pub struct DropTableRequest {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ impl TableEngine for MockTableEngine {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn drop_table(&self, _ctx: &EngineContext, _request: DropTableRequest) -> Result<()> {
|
||||
async fn drop_table(&self, _ctx: &EngineContext, _request: DropTableRequest) -> Result<bool> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,5 +56,5 @@ SELECT idc, avg(memory_util) FROM system_metrics GROUP BY idc ORDER BY idc;
|
||||
|
||||
DROP TABLE system_metrics;
|
||||
|
||||
Failed to execute, error: Datanode { code: 1001, msg: "Failed to execute sql, source: Cannot parse SQL, source: SQL statement is not supported: DROP TABLE system_metrics;, keyword: DROP" }
|
||||
MutateResult { success: 1, failure: 0 }
|
||||
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
select 1;
|
||||
|
||||
+--------------------------+
|
||||
| Int64(1), #Field, #Int64 |
|
||||
+--------------------------+
|
||||
| 1 |
|
||||
+--------------------------+
|
||||
|
||||
select 2 + 3;
|
||||
|
||||
+----------------------------------------+
|
||||
| Int64(2) Plus Int64(3), #Field, #Int64 |
|
||||
+----------------------------------------+
|
||||
| 5 |
|
||||
+----------------------------------------+
|
||||
|
||||
select 4 + 0.5;
|
||||
|
||||
+----------------------------------------------+
|
||||
| Int64(4) Plus Float64(0.5), #Field, #Float64 |
|
||||
+----------------------------------------------+
|
||||
| 4.5 |
|
||||
+----------------------------------------------+
|
||||
|
||||
select "a";
|
||||
|
||||
Failed to execute, error: Datanode { code: 3000, msg: "Failed to execute sql, source: Cannot plan SQL: SELECT \"a\", source: Error during planning: Invalid identifier '#a' for schema fields:[], metadata:{}" }
|
||||
|
||||
select "A";
|
||||
|
||||
Failed to execute, error: Datanode { code: 3000, msg: "Failed to execute sql, source: Cannot plan SQL: SELECT \"A\", source: Error during planning: Invalid identifier '#A' for schema fields:[], metadata:{}" }
|
||||
|
||||
select * where "a" = "A";
|
||||
|
||||
Failed to execute, error: Datanode { code: 3000, msg: "Failed to execute sql, source: Cannot plan SQL: SELECT * WHERE \"a\" = \"A\", source: Error during planning: Invalid identifier '#a' for schema fields:[], metadata:{}" }
|
||||
|
||||
36
tests/cases/standalone/select/dummy.result
Normal file
36
tests/cases/standalone/select/dummy.result
Normal file
@@ -0,0 +1,36 @@
|
||||
select 1;
|
||||
|
||||
+--------------------------+
|
||||
| Int64(1), #Field, #Int64 |
|
||||
+--------------------------+
|
||||
| 1 |
|
||||
+--------------------------+
|
||||
|
||||
select 2 + 3;
|
||||
|
||||
+----------------------------------------+
|
||||
| Int64(2) Plus Int64(3), #Field, #Int64 |
|
||||
+----------------------------------------+
|
||||
| 5 |
|
||||
+----------------------------------------+
|
||||
|
||||
select 4 + 0.5;
|
||||
|
||||
+----------------------------------------------+
|
||||
| Int64(4) Plus Float64(0.5), #Field, #Float64 |
|
||||
+----------------------------------------------+
|
||||
| 4.5 |
|
||||
+----------------------------------------------+
|
||||
|
||||
select "a";
|
||||
|
||||
Failed to execute, error: Datanode { code: 1003, msg: "Failed to execute query: select \"a\";, source: Failed to execute query: select \"a\";, source: Failed to select from table, source: Error occurred on the data node, code: 3000, msg: Failed to execute sql, source: Cannot plan SQL: SELECT \"a\", source: Error during planning: Invalid identifier '#a' for schema fields:[], metadata:{}" }
|
||||
|
||||
select "A";
|
||||
|
||||
Failed to execute, error: Datanode { code: 1003, msg: "Failed to execute query: select \"A\";, source: Failed to execute query: select \"A\";, source: Failed to select from table, source: Error occurred on the data node, code: 3000, msg: Failed to execute sql, source: Cannot plan SQL: SELECT \"A\", source: Error during planning: Invalid identifier '#A' for schema fields:[], metadata:{}" }
|
||||
|
||||
select * where "a" = "A";
|
||||
|
||||
Failed to execute, error: Datanode { code: 1003, msg: "Failed to execute query: select * where \"a\" = \"A\";, source: Failed to execute query: select * where \"a\" = \"A\";, source: Failed to select from table, source: Error occurred on the data node, code: 3000, msg: Failed to execute sql, source: Cannot plan SQL: SELECT * WHERE \"a\" = \"A\", source: Error during planning: Invalid identifier '#a' for schema fields:[], metadata:{}" }
|
||||
|
||||
@@ -51,13 +51,13 @@ impl Env {
|
||||
pub async fn start_standalone() -> GreptimeDB {
|
||||
let server_process = Command::new("cargo")
|
||||
.current_dir("../")
|
||||
.args(["run", "--", "standalone", "start"])
|
||||
.args(["run", "--", "standalone", "start", "-m"])
|
||||
.spawn()
|
||||
.unwrap_or_else(|_| panic!("Failed to start GreptimeDB"));
|
||||
|
||||
time::sleep(Duration::from_secs(3)).await;
|
||||
|
||||
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
|
||||
let client = Client::with_urls(vec!["127.0.0.1:4001"]);
|
||||
let db = DB::new("greptime", client.clone());
|
||||
|
||||
GreptimeDB {
|
||||
|
||||
Reference in New Issue
Block a user