mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-09 06:42:57 +00:00
feat: support renaming table in the catalog manger (#824)
* feat: support renaming table in the catalog manger * feat: implement rename table for local catalog manager * chore: fmt code * fix: update system catalog when renaming table in local catalog manager * chore: add instance test for rename table * chore: fix frontend test * chore: fix comment * chore: fix rename table test * fix: renaming a table with an existing name * fix: improve the system catalog's renaming process * chore: improve the code * chore: improve the comment Co-authored-by: Yingwen <realevenyag@gmail.com> * chore: improve the code * chore: fix tests * chore: fix instance_test * chore: improve the code Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
@@ -97,6 +97,9 @@ pub trait CatalogManager: CatalogList {
|
||||
/// schema registered.
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
|
||||
|
||||
/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
|
||||
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;
|
||||
|
||||
/// Register a system table, should be called before starting the manager.
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest)
|
||||
-> error::Result<()>;
|
||||
@@ -142,6 +145,15 @@ impl Debug for RegisterTableRequest {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RenameTableRequest {
|
||||
pub catalog: String,
|
||||
pub schema: String,
|
||||
pub table_name: String,
|
||||
pub new_table_name: String,
|
||||
pub table_id: TableId,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DeregisterTableRequest {
|
||||
pub catalog: String,
|
||||
|
||||
@@ -47,7 +47,8 @@ use crate::tables::SystemCatalog;
|
||||
use crate::{
|
||||
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
|
||||
CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider,
|
||||
SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
|
||||
@@ -379,6 +380,45 @@ impl CatalogManager for LocalCatalogManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
|
||||
let started = self.init_lock.lock().await;
|
||||
|
||||
ensure!(
|
||||
*started,
|
||||
IllegalManagerStateSnafu {
|
||||
msg: "Catalog manager not started",
|
||||
}
|
||||
);
|
||||
|
||||
let catalog_name = &request.catalog;
|
||||
let schema_name = &request.schema;
|
||||
|
||||
let catalog = self
|
||||
.catalogs
|
||||
.catalog(catalog_name)?
|
||||
.context(CatalogNotFoundSnafu { catalog_name })?;
|
||||
|
||||
let schema = catalog
|
||||
.schema(schema_name)?
|
||||
.with_context(|| SchemaNotFoundSnafu {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
})?;
|
||||
|
||||
// rename table in system catalog
|
||||
self.system
|
||||
.register_table(
|
||||
catalog_name.clone(),
|
||||
schema_name.clone(),
|
||||
request.new_table_name.clone(),
|
||||
request.table_id,
|
||||
)
|
||||
.await?;
|
||||
Ok(schema
|
||||
.rename_table(&request.table_name, request.new_table_name)
|
||||
.is_ok())
|
||||
}
|
||||
|
||||
async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
|
||||
UnimplementedSnafu {
|
||||
operation: "deregister table",
|
||||
|
||||
@@ -25,11 +25,14 @@ use table::metadata::TableId;
|
||||
use table::table::TableIdProvider;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::schema::SchemaProvider;
|
||||
use crate::{
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
|
||||
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef,
|
||||
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
|
||||
SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// Simple in-memory list of catalogs
|
||||
@@ -89,6 +92,25 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
.map(|v| v.is_none())
|
||||
}
|
||||
|
||||
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
|
||||
let catalogs = self.catalogs.write().unwrap();
|
||||
let catalog = catalogs
|
||||
.get(&request.catalog)
|
||||
.context(CatalogNotFoundSnafu {
|
||||
catalog_name: &request.catalog,
|
||||
})?
|
||||
.clone();
|
||||
let schema = catalog
|
||||
.schema(&request.schema)?
|
||||
.with_context(|| SchemaNotFoundSnafu {
|
||||
catalog: &request.catalog,
|
||||
schema: &request.schema,
|
||||
})?;
|
||||
Ok(schema
|
||||
.rename_table(&request.table_name, request.new_table_name)
|
||||
.is_ok())
|
||||
}
|
||||
|
||||
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
|
||||
let catalogs = self.catalogs.write().unwrap();
|
||||
let catalog = catalogs
|
||||
@@ -290,6 +312,20 @@ impl SchemaProvider for MemorySchemaProvider {
|
||||
}
|
||||
}
|
||||
|
||||
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
|
||||
let mut tables = self.tables.write().unwrap();
|
||||
if tables.get(name).is_some() {
|
||||
let table = tables.remove(name).unwrap();
|
||||
tables.insert(new_name, table.clone());
|
||||
Ok(table)
|
||||
} else {
|
||||
TableNotFoundSnafu {
|
||||
table_info: name.to_string(),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
}
|
||||
|
||||
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
|
||||
let mut tables = self.tables.write().unwrap();
|
||||
Ok(tables.remove(name))
|
||||
@@ -354,6 +390,85 @@ mod tests {
|
||||
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mem_provider_rename_table() {
|
||||
let provider = MemorySchemaProvider::new();
|
||||
let table_name = "num";
|
||||
assert!(!provider.table_exist(table_name).unwrap());
|
||||
let test_table: TableRef = Arc::new(NumbersTable::default());
|
||||
// register test table
|
||||
assert!(provider
|
||||
.register_table(table_name.to_string(), test_table.clone())
|
||||
.unwrap()
|
||||
.is_none());
|
||||
assert!(provider.table_exist(table_name).unwrap());
|
||||
|
||||
// rename test table
|
||||
let new_table_name = "numbers";
|
||||
provider
|
||||
.rename_table(table_name, new_table_name.to_string())
|
||||
.unwrap();
|
||||
|
||||
// test old table name not exist
|
||||
assert!(!provider.table_exist(table_name).unwrap());
|
||||
assert!(provider.deregister_table(table_name).unwrap().is_none());
|
||||
|
||||
// test new table name exists
|
||||
assert!(provider.table_exist(new_table_name).unwrap());
|
||||
let registered_table = provider.table(new_table_name).unwrap().unwrap();
|
||||
assert_eq!(
|
||||
registered_table.table_info().ident.table_id,
|
||||
test_table.table_info().ident.table_id
|
||||
);
|
||||
|
||||
let other_table = Arc::new(NumbersTable::new(2));
|
||||
let result = provider.register_table(new_table_name.to_string(), other_table);
|
||||
let err = result.err().unwrap();
|
||||
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_catalog_rename_table() {
|
||||
let catalog = MemoryCatalogManager::default();
|
||||
let schema = catalog
|
||||
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
// register table
|
||||
let table_name = "num";
|
||||
let table_id = 2333;
|
||||
let table: TableRef = Arc::new(NumbersTable::new(table_id));
|
||||
let register_table_req = RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
table_id,
|
||||
table,
|
||||
};
|
||||
assert!(catalog.register_table(register_table_req).await.unwrap());
|
||||
assert!(schema.table_exist(table_name).unwrap());
|
||||
|
||||
// rename table
|
||||
let new_table_name = "numbers";
|
||||
let rename_table_req = RenameTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
new_table_name: new_table_name.to_string(),
|
||||
table_id,
|
||||
};
|
||||
assert!(catalog.rename_table(rename_table_req).await.unwrap());
|
||||
assert!(!schema.table_exist(table_name).unwrap());
|
||||
assert!(schema.table_exist(new_table_name).unwrap());
|
||||
|
||||
let registered_table = catalog
|
||||
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(registered_table.table_info().ident.table_id, table_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_register_if_absent() {
|
||||
let list = MemoryCatalogManager::default();
|
||||
|
||||
@@ -43,7 +43,7 @@ use crate::remote::{Kv, KvBackendRef};
|
||||
use crate::{
|
||||
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
|
||||
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
|
||||
RegisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// Catalog manager based on metasrv.
|
||||
@@ -449,6 +449,13 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn rename_table(&self, _request: RenameTableRequest) -> Result<bool> {
|
||||
UnimplementedSnafu {
|
||||
operation: "rename table",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
|
||||
let mut requests = self.system_table_requests.lock().await;
|
||||
requests.push(request);
|
||||
@@ -739,6 +746,13 @@ impl SchemaProvider for RemoteSchemaProvider {
|
||||
prev
|
||||
}
|
||||
|
||||
fn rename_table(&self, _name: &str, _new_name: String) -> Result<TableRef> {
|
||||
UnimplementedSnafu {
|
||||
operation: "rename table",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
|
||||
let table_name = name.to_string();
|
||||
let table_key = self.build_regional_table_key(&table_name).to_string();
|
||||
|
||||
@@ -35,6 +35,10 @@ pub trait SchemaProvider: Sync + Send {
|
||||
/// If a table of the same name existed before, it returns "Table already exists" error.
|
||||
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>>;
|
||||
|
||||
/// If supported by the implementation, renames an existing table from this schema and returns it.
|
||||
/// If no table of that name exists, returns "Table not found" error.
|
||||
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef>;
|
||||
|
||||
/// If supported by the implementation, removes an existing table from this schema and returns it.
|
||||
/// If no table of that name exists, returns Ok(None).
|
||||
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>>;
|
||||
|
||||
@@ -186,11 +186,23 @@ fn build_system_catalog_schema() -> Schema {
|
||||
SchemaBuilder::try_from(cols).unwrap().build().unwrap()
|
||||
}
|
||||
|
||||
pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest {
|
||||
/// Formats key string for table entry in system catalog
|
||||
#[inline]
|
||||
pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String {
|
||||
format!("{catalog}.{schema}.{table_id}")
|
||||
}
|
||||
|
||||
pub fn build_table_insert_request(
|
||||
catalog: String,
|
||||
schema: String,
|
||||
table_name: String,
|
||||
table_id: TableId,
|
||||
) -> InsertRequest {
|
||||
let entry_key = format_table_entry_key(&catalog, &schema, table_id);
|
||||
build_insert_request(
|
||||
EntryType::Table,
|
||||
full_table_name.as_bytes(),
|
||||
serde_json::to_string(&TableEntryValue { table_id })
|
||||
entry_key.as_bytes(),
|
||||
serde_json::to_string(&TableEntryValue { table_name })
|
||||
.unwrap()
|
||||
.as_bytes(),
|
||||
)
|
||||
@@ -285,8 +297,8 @@ pub fn decode_system_catalog(
|
||||
}
|
||||
|
||||
EntryType::Table => {
|
||||
// As for table entry, the key is a string with format: `<catalog_name>.<schema_name>.<table_name>`
|
||||
// and the value is a JSON string with format: `{"table_id": <table_id>}`
|
||||
// As for table entry, the key is a string with format: `<catalog_name>.<schema_name>.<table_id>`
|
||||
// and the value is a JSON string with format: `{"table_name": <table_name>}`
|
||||
let table_parts = key.split('.').collect::<Vec<_>>();
|
||||
ensure!(
|
||||
table_parts.len() >= 3,
|
||||
@@ -298,11 +310,12 @@ pub fn decode_system_catalog(
|
||||
debug!("Table meta value: {}", String::from_utf8_lossy(value));
|
||||
let table_meta: TableEntryValue =
|
||||
serde_json::from_slice(value).context(ValueDeserializeSnafu)?;
|
||||
let table_id = table_parts[2].parse::<TableId>().unwrap();
|
||||
Ok(Entry::Table(TableEntry {
|
||||
catalog_name: table_parts[0].to_string(),
|
||||
schema_name: table_parts[1].to_string(),
|
||||
table_name: table_parts[2].to_string(),
|
||||
table_id: table_meta.table_id,
|
||||
table_name: table_meta.table_name,
|
||||
table_id,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -362,7 +375,7 @@ pub struct TableEntry {
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct TableEntryValue {
|
||||
pub table_id: TableId,
|
||||
pub table_name: String,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -415,8 +428,8 @@ mod tests {
|
||||
pub fn test_decode_table() {
|
||||
let entry = decode_system_catalog(
|
||||
Some(EntryType::Table as u8),
|
||||
Some("some_catalog.some_schema.some_table".as_bytes()),
|
||||
Some("{\"table_id\":42}".as_bytes()),
|
||||
Some("some_catalog.some_schema.42".as_bytes()),
|
||||
Some("{\"table_name\":\"some_table\"}".as_bytes()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -435,7 +448,7 @@ mod tests {
|
||||
pub fn test_decode_mismatch() {
|
||||
decode_system_catalog(
|
||||
Some(EntryType::Table as u8),
|
||||
Some("some_catalog.some_schema.some_table".as_bytes()),
|
||||
Some("some_catalog.some_schema.42".as_bytes()),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -40,9 +40,7 @@ use table::{Table, TableRef};
|
||||
|
||||
use crate::error::{Error, InsertCatalogRecordSnafu};
|
||||
use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable};
|
||||
use crate::{
|
||||
format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef};
|
||||
|
||||
/// Tables holds all tables created by user.
|
||||
pub struct Tables {
|
||||
@@ -233,6 +231,10 @@ impl SchemaProvider for InformationSchema {
|
||||
panic!("System catalog & schema does not support register table")
|
||||
}
|
||||
|
||||
fn rename_table(&self, _name: &str, _new_name: String) -> crate::error::Result<TableRef> {
|
||||
unimplemented!("System catalog & schema does not support rename table")
|
||||
}
|
||||
|
||||
fn deregister_table(&self, _name: &str) -> crate::error::Result<Option<TableRef>> {
|
||||
panic!("System catalog & schema does not support deregister table")
|
||||
}
|
||||
@@ -269,8 +271,7 @@ impl SystemCatalog {
|
||||
table_name: String,
|
||||
table_id: TableId,
|
||||
) -> crate::error::Result<usize> {
|
||||
let full_table_name = format_full_table_name(&catalog, &schema, &table_name);
|
||||
let request = build_table_insert_request(full_table_name, table_id);
|
||||
let request = build_table_insert_request(catalog, schema, table_name, table_id);
|
||||
self.information_schema
|
||||
.system
|
||||
.insert(request)
|
||||
|
||||
@@ -17,7 +17,7 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::local::LocalCatalogManager;
|
||||
use catalog::{CatalogManager, RegisterTableRequest};
|
||||
use catalog::{CatalogManager, RegisterTableRequest, RenameTableRequest};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_telemetry::{error, info};
|
||||
use mito::config::EngineConfig;
|
||||
@@ -38,6 +38,44 @@ mod tests {
|
||||
Ok(catalog_manager)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rename_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let catalog_manager = create_local_catalog_manager().await.unwrap();
|
||||
// register table
|
||||
let table_name = "test_table";
|
||||
let table_id = 42;
|
||||
let table = Arc::new(NumbersTable::new(table_id));
|
||||
let request = RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
table_id,
|
||||
table: table.clone(),
|
||||
};
|
||||
assert!(catalog_manager.register_table(request).await.unwrap());
|
||||
|
||||
// rename table
|
||||
let new_table_name = "table_t";
|
||||
let rename_table_req = RenameTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
new_table_name: new_table_name.to_string(),
|
||||
table_id,
|
||||
};
|
||||
assert!(catalog_manager
|
||||
.rename_table(rename_table_req)
|
||||
.await
|
||||
.unwrap());
|
||||
|
||||
let registered_table = catalog_manager
|
||||
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(registered_table.table_info().ident.table_id, table_id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_duplicate_register() {
|
||||
let catalog_manager = create_local_catalog_manager().await.unwrap();
|
||||
|
||||
@@ -193,6 +193,12 @@ pub enum Error {
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to rename table, source: {}", source))]
|
||||
RenameTable {
|
||||
#[snafu(backtrace)]
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to register a new schema, source: {}", source))]
|
||||
RegisterSchema {
|
||||
#[snafu(backtrace)]
|
||||
@@ -341,6 +347,7 @@ impl ErrorExt for Error {
|
||||
| Error::StartGrpc { .. }
|
||||
| Error::CreateDir { .. }
|
||||
| Error::InsertSystemCatalog { .. }
|
||||
| Error::RenameTable { .. }
|
||||
| Error::RegisterSchema { .. }
|
||||
| Error::Catalog { .. }
|
||||
| Error::MissingRequiredField { .. }
|
||||
|
||||
@@ -16,18 +16,21 @@ use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::remote::MetaKvBackend;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use meta_client::client::{MetaClient, MetaClientBuilder};
|
||||
use meta_srv::mocks::MockInfo;
|
||||
use mito::config::EngineConfig as TableEngineConfig;
|
||||
use query::QueryEngineFactory;
|
||||
use servers::Mode;
|
||||
use snafu::ResultExt;
|
||||
use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use table::metadata::TableId;
|
||||
use table::table::TableIdProvider;
|
||||
|
||||
use crate::datanode::DatanodeOptions;
|
||||
use crate::error::Result;
|
||||
use crate::error::{CatalogSnafu, Result};
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
use crate::instance::{create_log_store, new_object_store, DefaultEngine, Instance};
|
||||
use crate::script::ScriptExecutor;
|
||||
@@ -53,16 +56,29 @@ impl Instance {
|
||||
object_store,
|
||||
));
|
||||
|
||||
// create remote catalog manager
|
||||
let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new(
|
||||
table_engine.clone(),
|
||||
opts.node_id.unwrap_or(42),
|
||||
Arc::new(MetaKvBackend {
|
||||
client: meta_client.clone(),
|
||||
}),
|
||||
));
|
||||
|
||||
let factory = QueryEngineFactory::new(catalog_manager.clone());
|
||||
// By default, catalog manager and factory are created in standalone mode
|
||||
let (catalog_manager, factory) = match opts.mode {
|
||||
Mode::Standalone => {
|
||||
let catalog = Arc::new(
|
||||
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
|
||||
.await
|
||||
.context(CatalogSnafu)?,
|
||||
);
|
||||
let factory = QueryEngineFactory::new(catalog.clone());
|
||||
(catalog as CatalogManagerRef, factory)
|
||||
}
|
||||
Mode::Distributed => {
|
||||
let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new(
|
||||
table_engine.clone(),
|
||||
opts.node_id.unwrap_or(42),
|
||||
Arc::new(MetaKvBackend {
|
||||
client: meta_client.clone(),
|
||||
}),
|
||||
));
|
||||
let factory = QueryEngineFactory::new(catalog.clone());
|
||||
(catalog as CatalogManagerRef, factory)
|
||||
}
|
||||
};
|
||||
let query_engine = factory.query_engine();
|
||||
let script_executor =
|
||||
ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?;
|
||||
|
||||
@@ -208,9 +208,15 @@ mod tests {
|
||||
) -> catalog::error::Result<Option<TableRef>> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn rename_table(&self, _name: &str, _new_name: String) -> catalog::error::Result<TableRef> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn deregister_table(&self, _name: &str) -> catalog::error::Result<Option<TableRef>> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn table_exist(&self, name: &str) -> catalog::error::Result<bool> {
|
||||
Ok(name == "demo")
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use catalog::RenameTableRequest;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_query::Output;
|
||||
use snafu::prelude::*;
|
||||
@@ -28,11 +29,11 @@ impl SqlHandler {
|
||||
let ctx = EngineContext {};
|
||||
let catalog_name = req.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
|
||||
let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
|
||||
let table_name = &req.table_name.to_string();
|
||||
let table_name = req.table_name.clone();
|
||||
let table_ref = TableReference {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
table: table_name,
|
||||
table: &table_name,
|
||||
};
|
||||
|
||||
let full_table_name = table_ref.to_string();
|
||||
@@ -43,12 +44,28 @@ impl SqlHandler {
|
||||
table_name: &full_table_name,
|
||||
}
|
||||
);
|
||||
self.table_engine
|
||||
.alter_table(&ctx, req)
|
||||
.await
|
||||
.context(error::AlterTableSnafu {
|
||||
table_name: full_table_name,
|
||||
})?;
|
||||
let is_rename = req.is_rename_table();
|
||||
let table =
|
||||
self.table_engine
|
||||
.alter_table(&ctx, req)
|
||||
.await
|
||||
.context(error::AlterTableSnafu {
|
||||
table_name: full_table_name,
|
||||
})?;
|
||||
if is_rename {
|
||||
let table_info = &table.table_info();
|
||||
let rename_table_req = RenameTableRequest {
|
||||
catalog: table_info.catalog_name.clone(),
|
||||
schema: table_info.schema_name.clone(),
|
||||
table_name,
|
||||
new_table_name: table_info.name.clone(),
|
||||
table_id: table_info.ident.table_id,
|
||||
};
|
||||
self.catalog_manager
|
||||
.rename_table(rename_table_req)
|
||||
.await
|
||||
.context(error::RenameTableSnafu)?;
|
||||
}
|
||||
// Tried in MySQL, it really prints "Affected Rows: 0".
|
||||
Ok(Output::AffectedRows(0))
|
||||
}
|
||||
|
||||
@@ -342,6 +342,58 @@ pub async fn test_execute_create() {
|
||||
assert!(matches!(output, Output::AffectedRows(0)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rename_table() {
|
||||
let instance = MockInstance::new("test_rename_table_local").await;
|
||||
|
||||
let output = execute_sql(&instance, "create database db").await;
|
||||
assert!(matches!(output, Output::AffectedRows(1)));
|
||||
|
||||
let output = execute_sql_in_db(
|
||||
&instance,
|
||||
"create table demo(host string, cpu double, memory double, ts timestamp, time index(ts))",
|
||||
"db",
|
||||
)
|
||||
.await;
|
||||
assert!(matches!(output, Output::AffectedRows(0)));
|
||||
|
||||
// make sure table insertion is ok before altering table name
|
||||
let output = execute_sql_in_db(
|
||||
&instance,
|
||||
"insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000), ('host2', 2.2, 200, 2000)",
|
||||
"db",
|
||||
)
|
||||
.await;
|
||||
assert!(matches!(output, Output::AffectedRows(2)));
|
||||
|
||||
// rename table
|
||||
let output = execute_sql_in_db(&instance, "alter table demo rename test_table", "db").await;
|
||||
assert!(matches!(output, Output::AffectedRows(0)));
|
||||
|
||||
let output = execute_sql_in_db(&instance, "show tables", "db").await;
|
||||
let expect = "\
|
||||
+------------+
|
||||
| Tables |
|
||||
+------------+
|
||||
| test_table |
|
||||
+------------+\
|
||||
"
|
||||
.to_string();
|
||||
check_output_stream(output, expect).await;
|
||||
|
||||
let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await;
|
||||
let expected = "\
|
||||
+-------+-----+--------+---------------------+
|
||||
| host | cpu | memory | ts |
|
||||
+-------+-----+--------+---------------------+
|
||||
| host1 | 1.1 | 100 | 1970-01-01T00:00:01 |
|
||||
| host2 | 2.2 | 200 | 1970-01-01T00:00:02 |
|
||||
+-------+-----+--------+---------------------+\
|
||||
"
|
||||
.to_string();
|
||||
check_output_stream(output, expected).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_alter_table() {
|
||||
let instance = setup_test_instance("test_alter_table").await;
|
||||
|
||||
@@ -24,8 +24,8 @@ use catalog::helper::{
|
||||
use catalog::remote::{Kv, KvBackendRef};
|
||||
use catalog::{
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
|
||||
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider,
|
||||
SchemaProviderRef,
|
||||
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
|
||||
SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use meta_client::rpc::TableName;
|
||||
@@ -97,6 +97,10 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result<bool> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn register_system_table(
|
||||
&self,
|
||||
_request: RegisterSystemTableRequest,
|
||||
@@ -322,6 +326,10 @@ impl SchemaProvider for FrontendSchemaProvider {
|
||||
unimplemented!("Frontend schema provider does not support register table")
|
||||
}
|
||||
|
||||
fn rename_table(&self, _name: &str, _new_name: String) -> catalog_err::Result<TableRef> {
|
||||
unimplemented!("Frontend schema provider does not support rename table")
|
||||
}
|
||||
|
||||
fn deregister_table(&self, _name: &str) -> catalog::error::Result<Option<TableRef>> {
|
||||
unimplemented!("Frontend schema provider does not support deregister table")
|
||||
}
|
||||
|
||||
@@ -270,7 +270,7 @@ impl DistInstance {
|
||||
let mut context = AlterContext::with_capacity(1);
|
||||
context.insert(expr);
|
||||
|
||||
table.alter(context, request).await.context(TableSnafu)?;
|
||||
table.alter(context, &request).await.context(TableSnafu)?;
|
||||
|
||||
Ok(Output::AffectedRows(0))
|
||||
}
|
||||
|
||||
@@ -163,7 +163,7 @@ impl Table for DistTable {
|
||||
Ok(FilterPushDownType::Inexact)
|
||||
}
|
||||
|
||||
async fn alter(&self, context: AlterContext, request: AlterTableRequest) -> table::Result<()> {
|
||||
async fn alter(&self, context: AlterContext, request: &AlterTableRequest) -> table::Result<()> {
|
||||
self.handle_alter(context, request)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
@@ -414,7 +414,7 @@ impl DistTable {
|
||||
.context(CatalogSnafu)
|
||||
}
|
||||
|
||||
async fn handle_alter(&self, context: AlterContext, request: AlterTableRequest) -> Result<()> {
|
||||
async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> {
|
||||
let alter_expr = context
|
||||
.get::<AlterExpr>()
|
||||
.context(ContextValueNotFoundSnafu { key: "AlterExpr" })?;
|
||||
|
||||
@@ -170,6 +170,7 @@ async fn create_distributed_datanode(
|
||||
storage: ObjectStoreConfig::File {
|
||||
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
},
|
||||
mode: Mode::Distributed,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -29,7 +29,9 @@ use store_api::storage::{
|
||||
};
|
||||
use table::engine::{EngineContext, TableEngine, TableReference};
|
||||
use table::metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
|
||||
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
|
||||
use table::requests::{
|
||||
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
|
||||
};
|
||||
use table::table::{AlterContext, TableRef};
|
||||
use table::{error as table_error, Result as TableResult, Table};
|
||||
use tokio::sync::Mutex;
|
||||
@@ -491,7 +493,22 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
|
||||
let table_name = &req.table_name.clone();
|
||||
|
||||
let table_ref = TableReference {
|
||||
if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
|
||||
let table_ref = TableReference {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
table: new_table_name,
|
||||
};
|
||||
|
||||
if self.get_table(&table_ref).is_some() {
|
||||
return TableExistsSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
|
||||
let mut table_ref = TableReference {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
table: table_name,
|
||||
@@ -502,9 +519,17 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
|
||||
logging::info!("start altering table {} with request {:?}", table_name, req);
|
||||
table
|
||||
.alter(AlterContext::new(), req)
|
||||
.alter(AlterContext::new(), &req)
|
||||
.await
|
||||
.context(error::AlterTableSnafu { table_name })?;
|
||||
|
||||
if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
|
||||
table_ref.table = new_table_name.as_str();
|
||||
let full_table_name = table_ref.to_string();
|
||||
let mut tables = self.tables.write().unwrap();
|
||||
tables.remove(&full_table_name);
|
||||
tables.insert(full_table_name, table.clone());
|
||||
}
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
@@ -556,7 +581,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::table::test_util;
|
||||
use crate::table::test_util::{new_insert_request, MockRegion, TABLE_NAME};
|
||||
use crate::table::test_util::{new_insert_request, schema_for_test, MockRegion, TABLE_NAME};
|
||||
|
||||
async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) {
|
||||
let table_name = "test_default_constraint";
|
||||
@@ -1070,8 +1095,42 @@ mod tests {
|
||||
async fn test_alter_rename_table() {
|
||||
let (engine, table_engine, _table, object_store, _dir) =
|
||||
test_util::setup_mock_engine_and_table().await;
|
||||
let ctx = EngineContext::default();
|
||||
|
||||
let new_table_name = "table_t";
|
||||
// register another table
|
||||
let another_name = "another_table";
|
||||
let req = CreateTableRequest {
|
||||
id: 1024,
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: another_name.to_string(),
|
||||
desc: Some("another test table".to_string()),
|
||||
schema: Arc::new(schema_for_test()),
|
||||
region_numbers: vec![0],
|
||||
primary_key_indices: vec![0],
|
||||
create_if_not_exists: true,
|
||||
table_options: HashMap::new(),
|
||||
};
|
||||
table_engine
|
||||
.create_table(&ctx, req)
|
||||
.await
|
||||
.expect("create table must succeed");
|
||||
// test renaming a table with an existing name.
|
||||
let req = AlterTableRequest {
|
||||
catalog_name: None,
|
||||
schema_name: None,
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
alter_kind: AlterKind::RenameTable {
|
||||
new_table_name: another_name.to_string(),
|
||||
},
|
||||
};
|
||||
let err = table_engine.alter_table(&ctx, req).await.err().unwrap();
|
||||
assert!(
|
||||
err.to_string().contains("Table already exists"),
|
||||
"Unexpected error: {err}"
|
||||
);
|
||||
|
||||
let new_table_name = "test_table";
|
||||
// test rename table
|
||||
let req = AlterTableRequest {
|
||||
catalog_name: None,
|
||||
@@ -1081,7 +1140,6 @@ mod tests {
|
||||
new_table_name: new_table_name.to_string(),
|
||||
},
|
||||
};
|
||||
let ctx = EngineContext::default();
|
||||
let table = table_engine.alter_table(&ctx, req).await.unwrap();
|
||||
|
||||
assert_eq!(table.table_info().name, new_table_name);
|
||||
|
||||
@@ -168,7 +168,7 @@ impl<R: Region> Table for MitoTable<R> {
|
||||
}
|
||||
|
||||
/// Alter table changes the schemas of the table.
|
||||
async fn alter(&self, _context: AlterContext, req: AlterTableRequest) -> TableResult<()> {
|
||||
async fn alter(&self, _context: AlterContext, req: &AlterTableRequest) -> TableResult<()> {
|
||||
let _lock = self.alter_lock.lock().await;
|
||||
|
||||
let table_info = self.table_info();
|
||||
|
||||
@@ -231,6 +231,10 @@ impl SchemaProvider for SchemaProviderAdapter {
|
||||
.map(|_| table))
|
||||
}
|
||||
|
||||
fn rename_table(&self, _name: &str, _new_name: String) -> catalog_error::Result<TableRef> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn deregister_table(&self, name: &str) -> catalog::error::Result<Option<TableRef>> {
|
||||
self.df_schema_provider
|
||||
.deregister_table(name)
|
||||
|
||||
@@ -70,14 +70,20 @@ pub struct AlterTableRequest {
|
||||
pub alter_kind: AlterKind,
|
||||
}
|
||||
|
||||
impl AlterTableRequest {
|
||||
pub fn is_rename_table(&self) -> bool {
|
||||
matches!(self.alter_kind, AlterKind::RenameTable { .. })
|
||||
}
|
||||
}
|
||||
|
||||
/// Add column request
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AddColumnRequest {
|
||||
pub column_schema: ColumnSchema,
|
||||
pub is_key: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AlterKind {
|
||||
AddColumns { columns: Vec<AddColumnRequest> },
|
||||
DropColumns { names: Vec<String> },
|
||||
|
||||
@@ -77,7 +77,7 @@ pub trait Table: Send + Sync {
|
||||
}
|
||||
|
||||
/// Alter table.
|
||||
async fn alter(&self, _context: AlterContext, _request: AlterTableRequest) -> Result<()> {
|
||||
async fn alter(&self, _context: AlterContext, _request: &AlterTableRequest) -> Result<()> {
|
||||
UnsupportedSnafu {
|
||||
operation: "ALTER TABLE",
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user