mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-21 15:30:40 +00:00
refactor!: Uses table id to locate tables in table engines (#1817)
* refactor: add table_id to get_table()/table_exists() * refactor: Add table_id to alter table request * refactor: Add table id to DropTableRequest * refactor: add table id to DropTableRequest * refactor: Use table id as key for the tables map * refactor: use table id as file engine's map key * refactor: Remove table reference from engine's get_table/table_exists * style: remove unused imports * feat!: Add table id to TableRegionalValue * style: fix cilppy * chore: add comments and logs
This commit is contained in:
@@ -201,6 +201,9 @@ impl TableRegionalKey {
|
||||
/// region ids allocated by metasrv.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TableRegionalValue {
|
||||
// We can remove the `Option` from the table id once all regional values
|
||||
// stored in meta have table ids.
|
||||
pub table_id: Option<TableId>,
|
||||
pub version: TableVersion,
|
||||
pub regions_ids: Vec<u32>,
|
||||
pub engine_name: Option<String>,
|
||||
|
||||
@@ -984,21 +984,29 @@ impl SchemaProvider for RemoteSchemaProvider {
|
||||
.get(key.as_bytes())
|
||||
.await?
|
||||
.map(|Kv(_, v)| {
|
||||
let TableRegionalValue { engine_name, .. } =
|
||||
TableRegionalValue::parse(String::from_utf8_lossy(&v))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
let reference = TableReference {
|
||||
catalog: &self.catalog_name,
|
||||
schema: &self.schema_name,
|
||||
table: name,
|
||||
let TableRegionalValue {
|
||||
table_id,
|
||||
engine_name,
|
||||
..
|
||||
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
|
||||
let Some(table_id) = table_id else {
|
||||
warn!("Cannot find table id for {key}, the value has an old format");
|
||||
return Ok(None);
|
||||
};
|
||||
let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE);
|
||||
let engine = self
|
||||
.engine_manager
|
||||
.engine(engine_name)
|
||||
.context(TableEngineNotFoundSnafu { engine_name })?;
|
||||
let reference = TableReference {
|
||||
catalog: &self.catalog_name,
|
||||
schema: &self.schema_name,
|
||||
table: name,
|
||||
};
|
||||
let table = engine
|
||||
.get_table(&EngineContext {}, &reference)
|
||||
.get_table(&EngineContext {}, table_id)
|
||||
.with_context(|_| OpenTableSnafu {
|
||||
table_info: reference.to_string(),
|
||||
})?;
|
||||
@@ -1011,9 +1019,12 @@ impl SchemaProvider for RemoteSchemaProvider {
|
||||
}
|
||||
|
||||
async fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
|
||||
// Currently, initiate_tables() always call this method to register the table to the schema thus we
|
||||
// always update the region value.
|
||||
let table_info = table.table_info();
|
||||
let table_version = table_info.ident.version;
|
||||
let table_value = TableRegionalValue {
|
||||
table_id: Some(table_info.ident.table_id),
|
||||
version: table_version,
|
||||
regions_ids: table.table_info().meta.region_numbers.clone(),
|
||||
engine_name: Some(table_info.meta.engine.clone()),
|
||||
@@ -1061,25 +1072,27 @@ impl SchemaProvider for RemoteSchemaProvider {
|
||||
.get(table_key.as_bytes())
|
||||
.await?
|
||||
.map(|Kv(_, v)| {
|
||||
let TableRegionalValue { engine_name, .. } =
|
||||
TableRegionalValue::parse(String::from_utf8_lossy(&v))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
Ok(engine_name)
|
||||
let TableRegionalValue {
|
||||
table_id,
|
||||
engine_name,
|
||||
..
|
||||
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
Ok(engine_name.and_then(|name| table_id.map(|id| (name, id))))
|
||||
})
|
||||
.transpose()?
|
||||
.flatten();
|
||||
|
||||
let engine_name = engine_opt.as_deref().unwrap_or_else(|| {
|
||||
warn!("Cannot find table engine name for {table_key}");
|
||||
MITO_ENGINE
|
||||
});
|
||||
|
||||
self.backend.delete(table_key.as_bytes()).await?;
|
||||
debug!(
|
||||
"Successfully deleted catalog table entry, key: {}",
|
||||
table_key
|
||||
);
|
||||
|
||||
let Some((engine_name, table_id)) = engine_opt else {
|
||||
warn!("Cannot find table id and engine name for {table_key}");
|
||||
return Ok(None);
|
||||
};
|
||||
let reference = TableReference {
|
||||
catalog: &self.catalog_name,
|
||||
schema: &self.schema_name,
|
||||
@@ -1088,9 +1101,9 @@ impl SchemaProvider for RemoteSchemaProvider {
|
||||
// deregistering table does not necessarily mean dropping the table
|
||||
let table = self
|
||||
.engine_manager
|
||||
.engine(engine_name)
|
||||
.engine(&engine_name)
|
||||
.context(TableEngineNotFoundSnafu { engine_name })?
|
||||
.get_table(&EngineContext {}, &reference)
|
||||
.get_table(&EngineContext {}, table_id)
|
||||
.with_context(|_| OpenTableSnafu {
|
||||
table_info: reference.to_string(),
|
||||
})?;
|
||||
|
||||
@@ -16,8 +16,7 @@ use std::any::Any;
|
||||
use std::collections::btree_map::Entry;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, RwLock as StdRwLock};
|
||||
|
||||
use async_stream::stream;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
@@ -27,7 +26,7 @@ use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::vectors::StringVector;
|
||||
use serde::Serializer;
|
||||
use table::engine::{CloseTableResult, EngineContext, TableEngine, TableReference};
|
||||
use table::engine::{CloseTableResult, EngineContext, TableEngine};
|
||||
use table::metadata::TableId;
|
||||
use table::requests::{
|
||||
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
|
||||
@@ -167,7 +166,7 @@ impl KvBackend for MockKvBackend {
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct MockTableEngine {
|
||||
tables: RwLock<HashMap<String, TableRef>>,
|
||||
tables: StdRwLock<HashMap<TableId, TableRef>>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -182,21 +181,8 @@ impl TableEngine for MockTableEngine {
|
||||
_ctx: &EngineContext,
|
||||
request: CreateTableRequest,
|
||||
) -> table::Result<TableRef> {
|
||||
let table_name = request.table_name.clone();
|
||||
let catalog_name = request.catalog_name.clone();
|
||||
let schema_name = request.schema_name.clone();
|
||||
let table_full_name =
|
||||
TableReference::full(&catalog_name, &schema_name, &table_name).to_string();
|
||||
let table_id = request.id;
|
||||
|
||||
let default_table_id = "0".to_owned();
|
||||
let table_id = TableId::from_str(
|
||||
request
|
||||
.table_options
|
||||
.extra_options
|
||||
.get("table_id")
|
||||
.unwrap_or(&default_table_id),
|
||||
)
|
||||
.unwrap();
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
"name",
|
||||
ConcreteDataType::string_datatype(),
|
||||
@@ -206,16 +192,16 @@ impl TableEngine for MockTableEngine {
|
||||
let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _];
|
||||
let record_batch = RecordBatch::new(schema, data).unwrap();
|
||||
let table: TableRef = Arc::new(MemTable::new_with_catalog(
|
||||
&table_name,
|
||||
&request.table_name,
|
||||
record_batch,
|
||||
table_id,
|
||||
catalog_name,
|
||||
schema_name,
|
||||
request.catalog_name,
|
||||
request.schema_name,
|
||||
vec![0],
|
||||
)) as Arc<_>;
|
||||
|
||||
let mut tables = self.tables.write().await;
|
||||
tables.insert(table_full_name, table.clone() as TableRef);
|
||||
let mut tables = self.tables.write().unwrap();
|
||||
tables.insert(table_id, table.clone() as TableRef);
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
@@ -224,7 +210,7 @@ impl TableEngine for MockTableEngine {
|
||||
_ctx: &EngineContext,
|
||||
request: OpenTableRequest,
|
||||
) -> table::Result<Option<TableRef>> {
|
||||
Ok(self.tables.read().await.get(&request.table_name).cloned())
|
||||
Ok(self.tables.read().unwrap().get(&request.table_id).cloned())
|
||||
}
|
||||
|
||||
async fn alter_table(
|
||||
@@ -238,25 +224,13 @@ impl TableEngine for MockTableEngine {
|
||||
fn get_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
table_ref: &TableReference,
|
||||
table_id: TableId,
|
||||
) -> table::Result<Option<TableRef>> {
|
||||
futures::executor::block_on(async {
|
||||
Ok(self
|
||||
.tables
|
||||
.read()
|
||||
.await
|
||||
.get(&table_ref.to_string())
|
||||
.cloned())
|
||||
})
|
||||
Ok(self.tables.read().unwrap().get(&table_id).cloned())
|
||||
}
|
||||
|
||||
fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
|
||||
futures::executor::block_on(async {
|
||||
self.tables
|
||||
.read()
|
||||
.await
|
||||
.contains_key(&table_ref.to_string())
|
||||
})
|
||||
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
|
||||
self.tables.read().unwrap().contains_key(&table_id)
|
||||
}
|
||||
|
||||
async fn drop_table(
|
||||
@@ -272,11 +246,7 @@ impl TableEngine for MockTableEngine {
|
||||
_ctx: &EngineContext,
|
||||
request: CloseTableRequest,
|
||||
) -> table::Result<CloseTableResult> {
|
||||
let _ = self
|
||||
.tables
|
||||
.write()
|
||||
.await
|
||||
.remove(&request.table_ref().to_string());
|
||||
let _ = self.tables.write().unwrap().remove(&request.table_id);
|
||||
Ok(CloseTableResult::Released(vec![]))
|
||||
}
|
||||
|
||||
|
||||
@@ -475,6 +475,7 @@ impl CountdownTask {
|
||||
catalog_name: table_ident.catalog.clone(),
|
||||
schema_name: table_ident.schema.clone(),
|
||||
table_name: table_ident.table.clone(),
|
||||
table_id: table_ident.table_id,
|
||||
region_numbers: vec![region],
|
||||
flush: true,
|
||||
};
|
||||
@@ -499,7 +500,7 @@ mod test {
|
||||
use common_meta::heartbeat::mailbox::HeartbeatMailbox;
|
||||
use datatypes::schema::RawSchema;
|
||||
use table::engine::manager::MemoryTableEngineManager;
|
||||
use table::engine::{TableEngine, TableReference};
|
||||
use table::engine::TableEngine;
|
||||
use table::requests::{CreateTableRequest, TableOptions};
|
||||
use table::test_util::EmptyTable;
|
||||
|
||||
@@ -751,8 +752,9 @@ mod test {
|
||||
let catalog = "my_catalog";
|
||||
let schema = "my_schema";
|
||||
let table = "my_table";
|
||||
let table_id = 1;
|
||||
let request = CreateTableRequest {
|
||||
id: 1,
|
||||
id: table_id,
|
||||
catalog_name: catalog.to_string(),
|
||||
schema_name: schema.to_string(),
|
||||
table_name: table.to_string(),
|
||||
@@ -768,7 +770,6 @@ mod test {
|
||||
table_options: TableOptions::default(),
|
||||
engine: "mito".to_string(),
|
||||
};
|
||||
let table_ref = TableReference::full(catalog, schema, table);
|
||||
|
||||
let table_engine = Arc::new(MockTableEngine::default());
|
||||
table_engine.create_table(ctx, request).await.unwrap();
|
||||
@@ -777,7 +778,7 @@ mod test {
|
||||
catalog: catalog.to_string(),
|
||||
schema: schema.to_string(),
|
||||
table: table.to_string(),
|
||||
table_id: 1024,
|
||||
table_id,
|
||||
engine: "mito".to_string(),
|
||||
};
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
@@ -813,9 +814,9 @@ mod test {
|
||||
.unwrap();
|
||||
|
||||
// assert the table is closed after deadline is reached
|
||||
assert!(table_engine.table_exists(ctx, &table_ref));
|
||||
assert!(table_engine.table_exists(ctx, table_id));
|
||||
// spare 500ms for the task to close the table
|
||||
tokio::time::sleep(Duration::from_millis(2000)).await;
|
||||
assert!(!table_engine.table_exists(ctx, &table_ref));
|
||||
assert!(!table_engine.table_exists(ctx, table_id));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
|
||||
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
|
||||
|
||||
/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
|
||||
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
|
||||
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
|
||||
let catalog_name = expr.catalog_name;
|
||||
let schema_name = expr.schema_name;
|
||||
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
|
||||
@@ -69,6 +69,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name: expr.table_name,
|
||||
table_id,
|
||||
alter_kind,
|
||||
};
|
||||
Ok(request)
|
||||
@@ -82,6 +83,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name: expr.table_name,
|
||||
table_id,
|
||||
alter_kind,
|
||||
};
|
||||
Ok(request)
|
||||
@@ -92,6 +94,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name: expr.table_name,
|
||||
table_id,
|
||||
alter_kind,
|
||||
};
|
||||
Ok(request)
|
||||
@@ -239,7 +242,7 @@ mod tests {
|
||||
})),
|
||||
};
|
||||
|
||||
let alter_request = alter_expr_to_request(expr).unwrap();
|
||||
let alter_request = alter_expr_to_request(1, expr).unwrap();
|
||||
assert_eq!(alter_request.catalog_name, "");
|
||||
assert_eq!(alter_request.schema_name, "");
|
||||
assert_eq!("monitor".to_string(), alter_request.table_name);
|
||||
@@ -296,7 +299,7 @@ mod tests {
|
||||
})),
|
||||
};
|
||||
|
||||
let alter_request = alter_expr_to_request(expr).unwrap();
|
||||
let alter_request = alter_expr_to_request(1, expr).unwrap();
|
||||
assert_eq!(alter_request.catalog_name, "");
|
||||
assert_eq!(alter_request.schema_name, "");
|
||||
assert_eq!("monitor".to_string(), alter_request.table_name);
|
||||
@@ -344,7 +347,7 @@ mod tests {
|
||||
})),
|
||||
};
|
||||
|
||||
let alter_request = alter_expr_to_request(expr).unwrap();
|
||||
let alter_request = alter_expr_to_request(1, expr).unwrap();
|
||||
assert_eq!(alter_request.catalog_name, "test_catalog");
|
||||
assert_eq!(alter_request.schema_name, "test_schema");
|
||||
assert_eq!("monitor".to_string(), alter_request.table_name);
|
||||
|
||||
@@ -164,7 +164,7 @@ impl CloseRegionHandler {
|
||||
}
|
||||
|
||||
if engine
|
||||
.get_table(&ctx, table_ref)
|
||||
.get_table(&ctx, region_ident.table_ident.table_id)
|
||||
.with_context(|_| error::GetTableSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?
|
||||
@@ -178,6 +178,7 @@ impl CloseRegionHandler {
|
||||
schema_name: table_ref.schema.to_string(),
|
||||
table_name: table_ref.table.to_string(),
|
||||
region_numbers: region_numbers.clone(),
|
||||
table_id: region_ident.table_ident.table_id,
|
||||
flush: true,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -106,7 +106,13 @@ impl Instance {
|
||||
let name = alter_table.table_name().clone();
|
||||
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
|
||||
let table_ref = TableReference::full(&catalog, &schema, &table);
|
||||
let req = SqlHandler::alter_to_request(alter_table, table_ref)?;
|
||||
// Currently, we have to get the table multiple times. Consider remove the sql handler in the future.
|
||||
let table = self.sql_handler.get_table(&table_ref).await?;
|
||||
let req = SqlHandler::alter_to_request(
|
||||
alter_table,
|
||||
table_ref,
|
||||
table.table_info().ident.table_id,
|
||||
)?;
|
||||
self.sql_handler
|
||||
.execute(SqlRequest::Alter(req), query_ctx)
|
||||
.await
|
||||
@@ -114,10 +120,13 @@ impl Instance {
|
||||
Statement::DropTable(drop_table) => {
|
||||
let (catalog_name, schema_name, table_name) =
|
||||
table_idents_to_full_name(drop_table.table_name(), query_ctx.clone())?;
|
||||
let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
|
||||
let table = self.sql_handler.get_table(&table_ref).await?;
|
||||
let req = DropTableRequest {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
table_id: table.table_info().ident.table_id,
|
||||
};
|
||||
self.sql_handler
|
||||
.execute(SqlRequest::DropTable(req), query_ctx)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, FlushTableExpr};
|
||||
use common_catalog::consts::IMMUTABLE_FILE_ENGINE;
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
|
||||
use common_query::Output;
|
||||
use common_telemetry::info;
|
||||
@@ -22,8 +23,8 @@ use snafu::prelude::*;
|
||||
use table::requests::{DropTableRequest, FlushTableRequest};
|
||||
|
||||
use crate::error::{
|
||||
AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu,
|
||||
IncorrectInternalStateSnafu, Result,
|
||||
AlterExprToRequestSnafu, BumpTableIdSnafu, CatalogSnafu, CreateExprToRequestSnafu,
|
||||
IncorrectInternalStateSnafu, Result, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::instance::Instance;
|
||||
use crate::sql::SqlRequest;
|
||||
@@ -69,17 +70,45 @@ impl Instance {
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
|
||||
let request = alter_expr_to_request(expr).context(AlterExprToRequestSnafu)?;
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(
|
||||
&expr.catalog_name,
|
||||
&expr.schema_name,
|
||||
&expr.table_name,
|
||||
),
|
||||
})?;
|
||||
|
||||
let request = alter_expr_to_request(table.table_info().ident.table_id, expr)
|
||||
.context(AlterExprToRequestSnafu)?;
|
||||
self.sql_handler()
|
||||
.execute(SqlRequest::Alter(request), QueryContext::arc())
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> Result<Output> {
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(
|
||||
&expr.catalog_name,
|
||||
&expr.schema_name,
|
||||
&expr.table_name,
|
||||
),
|
||||
})?;
|
||||
|
||||
let req = DropTableRequest {
|
||||
catalog_name: expr.catalog_name,
|
||||
schema_name: expr.schema_name,
|
||||
table_name: expr.table_name,
|
||||
table_id: table.table_info().ident.table_id,
|
||||
};
|
||||
self.sql_handler()
|
||||
.execute(SqlRequest::DropTable(req), QueryContext::arc())
|
||||
|
||||
@@ -19,6 +19,7 @@ use snafu::prelude::*;
|
||||
use sql::statements::alter::{AlterTable, AlterTableOperation};
|
||||
use sql::statements::column_def_to_schema;
|
||||
use table::engine::TableReference;
|
||||
use table::metadata::TableId;
|
||||
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
|
||||
use table_procedure::AlterTableProcedure;
|
||||
|
||||
@@ -60,6 +61,7 @@ impl SqlHandler {
|
||||
pub(crate) fn alter_to_request(
|
||||
alter_table: AlterTable,
|
||||
table_ref: TableReference,
|
||||
table_id: TableId,
|
||||
) -> Result<AlterTableRequest> {
|
||||
let alter_kind = match &alter_table.alter_operation() {
|
||||
AlterTableOperation::AddConstraint(table_constraint) => {
|
||||
@@ -91,6 +93,7 @@ impl SqlHandler {
|
||||
catalog_name: table_ref.catalog.to_string(),
|
||||
schema_name: table_ref.schema.to_string(),
|
||||
table_name: table_ref.table.to_string(),
|
||||
table_id,
|
||||
alter_kind,
|
||||
})
|
||||
}
|
||||
@@ -128,6 +131,7 @@ mod tests {
|
||||
let req = SqlHandler::alter_to_request(
|
||||
alter_table,
|
||||
TableReference::full("greptime", "public", "my_metric_1"),
|
||||
1,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(req.catalog_name, "greptime");
|
||||
@@ -154,6 +158,7 @@ mod tests {
|
||||
let req = SqlHandler::alter_to_request(
|
||||
alter_table,
|
||||
TableReference::full("greptime", "public", "test_table"),
|
||||
1,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(req.catalog_name, "greptime");
|
||||
|
||||
@@ -25,7 +25,7 @@ use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
use table::engine::{table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference};
|
||||
use table::error::TableOperationSnafu;
|
||||
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
|
||||
use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
|
||||
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
|
||||
use table::{error as table_error, Result as TableResult, Table, TableRef};
|
||||
use tokio::sync::Mutex;
|
||||
@@ -88,16 +88,12 @@ impl TableEngine for ImmutableFileTableEngine {
|
||||
.fail()
|
||||
}
|
||||
|
||||
fn get_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
table_ref: &TableReference,
|
||||
) -> TableResult<Option<TableRef>> {
|
||||
Ok(self.inner.get_table(table_ref))
|
||||
fn get_table(&self, _ctx: &EngineContext, table_id: TableId) -> TableResult<Option<TableRef>> {
|
||||
Ok(self.inner.get_table(table_id))
|
||||
}
|
||||
|
||||
fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
|
||||
self.inner.get_table(table_ref).is_some()
|
||||
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
|
||||
self.inner.get_table(table_id).is_some()
|
||||
}
|
||||
|
||||
async fn drop_table(
|
||||
@@ -151,8 +147,8 @@ impl TableEngineProcedure for ImmutableFileTableEngine {
|
||||
|
||||
#[cfg(test)]
|
||||
impl ImmutableFileTableEngine {
|
||||
pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> {
|
||||
self.inner.close_table(table_ref).await
|
||||
pub async fn close_table(&self, table_id: TableId) -> TableResult<()> {
|
||||
self.inner.close_table(table_id).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,10 +169,10 @@ impl ImmutableFileTableEngine {
|
||||
}
|
||||
|
||||
struct EngineInner {
|
||||
/// All tables opened by the engine. Map key is formatted [TableReference].
|
||||
/// All tables opened by the engine.
|
||||
///
|
||||
/// Writing to `tables` should also hold the `table_mutex`.
|
||||
tables: RwLock<HashMap<String, ImmutableFileTableRef>>,
|
||||
tables: RwLock<HashMap<TableId, ImmutableFileTableRef>>,
|
||||
object_store: ObjectStore,
|
||||
|
||||
/// Table mutex is used to protect the operations such as creating/opening/closing
|
||||
@@ -199,6 +195,7 @@ impl EngineInner {
|
||||
request: CreateTableRequest,
|
||||
) -> Result<TableRef> {
|
||||
let CreateTableRequest {
|
||||
id: table_id,
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
@@ -212,7 +209,7 @@ impl EngineInner {
|
||||
table: &table_name,
|
||||
};
|
||||
|
||||
if let Some(table) = self.get_table(&table_ref) {
|
||||
if let Some(table) = self.get_table(table_id) {
|
||||
return if create_if_not_exists {
|
||||
Ok(table)
|
||||
} else {
|
||||
@@ -223,14 +220,13 @@ impl EngineInner {
|
||||
let table_schema =
|
||||
Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?);
|
||||
|
||||
let table_id = request.id;
|
||||
let table_dir = table_dir(&catalog_name, &schema_name, table_id);
|
||||
|
||||
let table_full_name = table_ref.to_string();
|
||||
|
||||
let _lock = self.table_mutex.lock().await;
|
||||
// Checks again, read lock should be enough since we are guarded by the mutex.
|
||||
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
|
||||
if let Some(table) = self.get_table(table_id) {
|
||||
return if request.create_if_not_exists {
|
||||
Ok(table)
|
||||
} else {
|
||||
@@ -279,27 +275,20 @@ impl EngineInner {
|
||||
table_id
|
||||
);
|
||||
|
||||
self.tables
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(table_full_name, table.clone());
|
||||
self.tables.write().unwrap().insert(table_id, table.clone());
|
||||
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
fn get_table_by_full_name(&self, full_name: &str) -> Option<TableRef> {
|
||||
fn get_table(&self, table_id: TableId) -> Option<TableRef> {
|
||||
self.tables
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(full_name)
|
||||
.get(&table_id)
|
||||
.cloned()
|
||||
.map(|table| table as _)
|
||||
}
|
||||
|
||||
fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
|
||||
self.get_table_by_full_name(&table_ref.to_string())
|
||||
}
|
||||
|
||||
async fn open_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
@@ -309,6 +298,7 @@ impl EngineInner {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
table_id,
|
||||
..
|
||||
} = request;
|
||||
let table_ref = TableReference {
|
||||
@@ -317,16 +307,15 @@ impl EngineInner {
|
||||
table: &table_name,
|
||||
};
|
||||
|
||||
let table_full_name = table_ref.to_string();
|
||||
|
||||
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
|
||||
if let Some(table) = self.get_table(table_id) {
|
||||
return Ok(Some(table));
|
||||
}
|
||||
|
||||
let table_full_name = table_ref.to_string();
|
||||
let table = {
|
||||
let _lock = self.table_mutex.lock().await;
|
||||
// Checks again, read lock should be enough since we are guarded by the mutex.
|
||||
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
|
||||
if let Some(table) = self.get_table(table_id) {
|
||||
return Ok(Some(table));
|
||||
}
|
||||
|
||||
@@ -350,10 +339,7 @@ impl EngineInner {
|
||||
.context(table_error::TableOperationSnafu)?,
|
||||
);
|
||||
|
||||
self.tables
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(table_full_name, table.clone());
|
||||
self.tables.write().unwrap().insert(table_id, table.clone());
|
||||
Some(table as _)
|
||||
};
|
||||
|
||||
@@ -375,7 +361,7 @@ impl EngineInner {
|
||||
|
||||
let table_full_name = table_ref.to_string();
|
||||
let _lock = self.table_mutex.lock().await;
|
||||
if let Some(table) = self.get_table_by_full_name(&table_full_name) {
|
||||
if let Some(table) = self.get_table(req.table_id) {
|
||||
let table_id = table.table_info().ident.table_id;
|
||||
let table_dir = table_dir(&req.catalog_name, &req.schema_name, table_id);
|
||||
|
||||
@@ -389,7 +375,7 @@ impl EngineInner {
|
||||
.context(DropTableSnafu {
|
||||
table_name: &table_full_name,
|
||||
})?;
|
||||
self.tables.write().unwrap().remove(&table_full_name);
|
||||
self.tables.write().unwrap().remove(&req.table_id);
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
@@ -429,12 +415,10 @@ impl EngineInner {
|
||||
|
||||
#[cfg(test)]
|
||||
impl EngineInner {
|
||||
pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> {
|
||||
let full_name = table_ref.to_string();
|
||||
|
||||
pub async fn close_table(&self, table_id: TableId) -> TableResult<()> {
|
||||
let _lock = self.table_mutex.lock().await;
|
||||
|
||||
if let Some(table) = self.get_table_by_full_name(&full_name) {
|
||||
if let Some(table) = self.get_table(table_id) {
|
||||
let regions = Vec::new();
|
||||
table
|
||||
.close(®ions)
|
||||
@@ -443,7 +427,7 @@ impl EngineInner {
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
}
|
||||
|
||||
self.tables.write().unwrap().remove(&full_name);
|
||||
self.tables.write().unwrap().remove(&table_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -92,14 +92,13 @@ impl CreateImmutableFileTable {
|
||||
|
||||
fn on_prepare(&mut self) -> Result<Status> {
|
||||
let engine_ctx = EngineContext::default();
|
||||
let table_ref = self.data.table_ref();
|
||||
// Safety: Current get_table implementation always returns Ok.
|
||||
if self.engine.table_exists(&engine_ctx, &table_ref) {
|
||||
if self.engine.table_exists(&engine_ctx, self.data.request.id) {
|
||||
// The table already exists.
|
||||
ensure!(
|
||||
self.data.request.create_if_not_exists,
|
||||
TableExistsSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
table_name: self.data.table_ref().to_string(),
|
||||
}
|
||||
);
|
||||
|
||||
@@ -113,8 +112,7 @@ impl CreateImmutableFileTable {
|
||||
|
||||
async fn on_create_table(&mut self) -> Result<Status> {
|
||||
let engine_ctx = EngineContext::default();
|
||||
let table_ref = self.data.table_ref();
|
||||
if self.engine.table_exists(&engine_ctx, &table_ref) {
|
||||
if self.engine.table_exists(&engine_ctx, self.data.request.id) {
|
||||
// Table already created. We don't need to check create_if_not_exists as
|
||||
// we have checked it in prepare state.
|
||||
return Ok(Status::Done);
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::assert_matches::assert_matches;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, IMMUTABLE_FILE_ENGINE};
|
||||
use table::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference};
|
||||
use table::engine::{EngineContext, TableEngine, TableEngineProcedure};
|
||||
use table::requests::{AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest};
|
||||
use table::{error as table_error, Table};
|
||||
|
||||
@@ -35,14 +35,9 @@ async fn test_get_table() {
|
||||
..
|
||||
} = test_util::setup_test_engine_and_table("test_get_table").await;
|
||||
let table_info = table.table_info();
|
||||
let table_ref = TableReference {
|
||||
catalog: &table_info.catalog_name,
|
||||
schema: &table_info.schema_name,
|
||||
table: &table_info.name,
|
||||
};
|
||||
|
||||
let got = table_engine
|
||||
.get_table(&EngineContext::default(), &table_ref)
|
||||
.get_table(&EngineContext::default(), table_info.ident.table_id)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
@@ -53,21 +48,17 @@ async fn test_get_table() {
|
||||
async fn test_open_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let ctx = EngineContext::default();
|
||||
// the test table id is 1
|
||||
let table_id = 1;
|
||||
let open_req = OpenTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: test_util::TEST_TABLE_NAME.to_string(),
|
||||
// the test table id is 1
|
||||
table_id: 1,
|
||||
table_id,
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
|
||||
let table_ref = TableReference {
|
||||
catalog: DEFAULT_CATALOG_NAME,
|
||||
schema: DEFAULT_SCHEMA_NAME,
|
||||
table: test_util::TEST_TABLE_NAME,
|
||||
};
|
||||
|
||||
let TestEngineComponents {
|
||||
table_engine,
|
||||
table_ref: table,
|
||||
@@ -77,7 +68,7 @@ async fn test_open_table() {
|
||||
|
||||
assert_eq!(IMMUTABLE_FILE_ENGINE, table_engine.name());
|
||||
|
||||
table_engine.close_table(&table_ref).await.unwrap();
|
||||
table_engine.close_table(table_id).await.unwrap();
|
||||
|
||||
let reopened = table_engine
|
||||
.open_table(&ctx, open_req.clone())
|
||||
@@ -101,21 +92,17 @@ async fn test_open_table() {
|
||||
async fn test_close_all_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let table_ref = TableReference {
|
||||
catalog: DEFAULT_CATALOG_NAME,
|
||||
schema: DEFAULT_SCHEMA_NAME,
|
||||
table: test_util::TEST_TABLE_NAME,
|
||||
};
|
||||
|
||||
let TestEngineComponents {
|
||||
table_engine,
|
||||
dir: _dir,
|
||||
table_ref: table,
|
||||
..
|
||||
} = test_util::setup_test_engine_and_table("test_close_all_table").await;
|
||||
|
||||
table_engine.close().await.unwrap();
|
||||
|
||||
let exist = table_engine.table_exists(&EngineContext::default(), &table_ref);
|
||||
let table_id = table.table_info().ident.table_id;
|
||||
let exist = table_engine.table_exists(&EngineContext::default(), table_id);
|
||||
|
||||
assert!(!exist);
|
||||
}
|
||||
@@ -126,6 +113,7 @@ async fn test_alter_table() {
|
||||
let TestEngineComponents {
|
||||
table_engine,
|
||||
dir: _dir,
|
||||
table_ref,
|
||||
..
|
||||
} = test_util::setup_test_engine_and_table("test_alter_table").await;
|
||||
|
||||
@@ -133,6 +121,7 @@ async fn test_alter_table() {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TEST_TABLE_NAME.to_string(),
|
||||
table_id: table_ref.table_info().ident.table_id,
|
||||
alter_kind: AlterKind::RenameTable {
|
||||
new_table_name: "foo".to_string(),
|
||||
},
|
||||
@@ -151,12 +140,6 @@ async fn test_alter_table() {
|
||||
async fn test_drop_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let drop_req = DropTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TEST_TABLE_NAME.to_string(),
|
||||
};
|
||||
|
||||
let TestEngineComponents {
|
||||
table_engine,
|
||||
object_store,
|
||||
@@ -167,12 +150,13 @@ async fn test_drop_table() {
|
||||
} = test_util::setup_test_engine_and_table("test_drop_table").await;
|
||||
|
||||
let table_info = table.table_info();
|
||||
let table_ref = TableReference {
|
||||
catalog: &table_info.catalog_name,
|
||||
schema: &table_info.schema_name,
|
||||
table: &table_info.name,
|
||||
};
|
||||
|
||||
let drop_req = DropTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TEST_TABLE_NAME.to_string(),
|
||||
table_id: table_info.ident.table_id,
|
||||
};
|
||||
let dropped = table_engine
|
||||
.drop_table(&EngineContext::default(), drop_req)
|
||||
.await
|
||||
@@ -180,7 +164,7 @@ async fn test_drop_table() {
|
||||
|
||||
assert!(dropped);
|
||||
|
||||
let exist = table_engine.table_exists(&EngineContext::default(), &table_ref);
|
||||
let exist = table_engine.table_exists(&EngineContext::default(), table_info.ident.table_id);
|
||||
assert!(!exist);
|
||||
|
||||
// check table_dir manifest
|
||||
@@ -203,13 +187,14 @@ async fn test_create_drop_table_procedure() {
|
||||
let engine_ctx = EngineContext::default();
|
||||
// Test create table by procedure.
|
||||
let create_request = test_util::new_create_request(schema);
|
||||
let table_id = create_request.id;
|
||||
let mut procedure = table_engine
|
||||
.create_table_procedure(&engine_ctx, create_request.clone())
|
||||
.unwrap();
|
||||
common_procedure_test::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &create_request.table_ref())
|
||||
.get_table(&engine_ctx, table_id)
|
||||
.unwrap()
|
||||
.is_some());
|
||||
|
||||
@@ -218,6 +203,7 @@ async fn test_create_drop_table_procedure() {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TEST_TABLE_NAME.to_string(),
|
||||
table_id,
|
||||
};
|
||||
let mut procedure = table_engine
|
||||
.drop_table_procedure(&engine_ctx, drop_request)
|
||||
@@ -225,7 +211,7 @@ async fn test_create_drop_table_procedure() {
|
||||
common_procedure_test::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &create_request.table_ref())
|
||||
.get_table(&engine_ctx, table_id)
|
||||
.unwrap()
|
||||
.is_none());
|
||||
}
|
||||
|
||||
@@ -543,8 +543,11 @@ impl DistInstance {
|
||||
table_name: format_full_table_name(catalog_name, schema_name, table_name),
|
||||
})?;
|
||||
|
||||
let request = common_grpc_expr::alter_expr_to_request(expr.clone())
|
||||
.context(AlterExprToRequestSnafu)?;
|
||||
let request = common_grpc_expr::alter_expr_to_request(
|
||||
table.table_info().ident.table_id,
|
||||
expr.clone(),
|
||||
)
|
||||
.context(AlterExprToRequestSnafu)?;
|
||||
|
||||
let mut context = AlterContext::with_capacity(1);
|
||||
|
||||
|
||||
@@ -382,6 +382,7 @@ impl DistTable {
|
||||
schema_name,
|
||||
table_name,
|
||||
alter_kind,
|
||||
table_id: _table_id,
|
||||
} = request;
|
||||
|
||||
let alter_expr = context
|
||||
|
||||
@@ -42,8 +42,7 @@ use table::engine::{
|
||||
};
|
||||
use table::metadata::{TableId, TableInfo, TableVersion};
|
||||
use table::requests::{
|
||||
AlterKind, AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest,
|
||||
OpenTableRequest,
|
||||
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
|
||||
};
|
||||
use table::{error as table_error, Result as TableResult, Table, TableRef};
|
||||
|
||||
@@ -102,9 +101,8 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
|
||||
let table_ref = request.table_ref();
|
||||
let _lock = self.inner.table_mutex.lock(table_ref.to_string()).await;
|
||||
if let Some(table) = self.inner.get_mito_table(&table_ref) {
|
||||
let _lock = self.inner.table_mutex.lock(request.id).await;
|
||||
if let Some(table) = self.inner.get_mito_table(request.id) {
|
||||
if request.create_if_not_exists {
|
||||
return Ok(table);
|
||||
} else {
|
||||
@@ -148,26 +146,10 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
|
||||
) -> TableResult<TableRef> {
|
||||
let _timer = common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED);
|
||||
|
||||
if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
|
||||
let mut table_ref = req.table_ref();
|
||||
table_ref.table = new_table_name;
|
||||
if self.inner.get_mito_table(&table_ref).is_some() {
|
||||
return TableExistsSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
}
|
||||
.fail()
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
}
|
||||
}
|
||||
|
||||
let mut procedure = AlterMitoTable::new(req, self.inner.clone())
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
|
||||
// TODO(yingwen): Rename has concurrent issue without the procedure runtime. But
|
||||
// users can't use this method to alter a table so it is still safe. We should
|
||||
// refactor the table engine to avoid using table name as key.
|
||||
procedure
|
||||
.engine_alter_table()
|
||||
.await
|
||||
@@ -175,16 +157,12 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
|
||||
.context(table_error::TableOperationSnafu)
|
||||
}
|
||||
|
||||
fn get_table(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
table_ref: &TableReference,
|
||||
) -> TableResult<Option<TableRef>> {
|
||||
Ok(self.inner.get_table(table_ref))
|
||||
fn get_table(&self, _ctx: &EngineContext, table_id: TableId) -> TableResult<Option<TableRef>> {
|
||||
Ok(self.inner.get_table(table_id))
|
||||
}
|
||||
|
||||
fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
|
||||
self.inner.get_table(table_ref).is_some()
|
||||
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
|
||||
self.inner.get_table(table_id).is_some()
|
||||
}
|
||||
|
||||
async fn drop_table(
|
||||
@@ -254,16 +232,16 @@ impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
|
||||
}
|
||||
|
||||
pub(crate) struct MitoEngineInner<S: StorageEngine> {
|
||||
/// All tables opened by the engine. Map key is formatted [TableReference].
|
||||
/// All tables opened by the engine.
|
||||
///
|
||||
/// Writing to `tables` should also hold the `table_mutex`.
|
||||
tables: DashMap<String, Arc<MitoTable<S::Region>>>,
|
||||
tables: DashMap<TableId, Arc<MitoTable<S::Region>>>,
|
||||
object_store: ObjectStore,
|
||||
compress_type: CompressionType,
|
||||
storage_engine: S,
|
||||
/// Table mutex is used to protect the operations such as creating/opening/closing
|
||||
/// a table, to avoid things like opening the same table simultaneously.
|
||||
table_mutex: Arc<KeyLock<String>>,
|
||||
table_mutex: Arc<KeyLock<TableId>>,
|
||||
}
|
||||
|
||||
fn build_row_key_desc(
|
||||
@@ -429,11 +407,6 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
let catalog_name = &request.catalog_name;
|
||||
let schema_name = &request.schema_name;
|
||||
let table_name = &request.table_name;
|
||||
let table_ref = TableReference {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
table: table_name,
|
||||
};
|
||||
|
||||
let table_id = request.table_id;
|
||||
let engine_ctx = StorageEngineContext::default();
|
||||
@@ -463,6 +436,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
|
||||
let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len());
|
||||
|
||||
let table_ref = TableReference {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
table: table_name,
|
||||
};
|
||||
for region_number in &request.region_numbers {
|
||||
let region = self
|
||||
.open_region(&engine_ctx, table_id, *region_number, &table_ref, &opts)
|
||||
@@ -556,16 +534,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
ctx: &EngineContext,
|
||||
request: OpenTableRequest,
|
||||
) -> TableResult<Option<TableRef>> {
|
||||
let catalog_name = &request.catalog_name;
|
||||
let schema_name = &request.schema_name;
|
||||
let table_name = &request.table_name;
|
||||
let table_ref = TableReference {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
table: table_name,
|
||||
};
|
||||
|
||||
if let Some(table) = self.get_table(&table_ref) {
|
||||
if let Some(table) = self.get_table(request.table_id) {
|
||||
if let Some(table) = self.check_regions(table, &request.region_numbers)? {
|
||||
return Ok(Some(table));
|
||||
}
|
||||
@@ -573,11 +542,10 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
|
||||
// Acquires the mutex before opening a new table.
|
||||
let table = {
|
||||
let table_name_key = table_ref.to_string();
|
||||
let _lock = self.table_mutex.lock(table_name_key.clone()).await;
|
||||
let _lock = self.table_mutex.lock(request.table_id).await;
|
||||
|
||||
// Checks again, read lock should be enough since we are guarded by the mutex.
|
||||
if let Some(table) = self.get_mito_table(&table_ref) {
|
||||
if let Some(table) = self.get_mito_table(request.table_id) {
|
||||
// Contains all regions or target region
|
||||
if let Some(table) = self.check_regions(table.clone(), &request.region_numbers)? {
|
||||
Some(table)
|
||||
@@ -593,7 +561,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
let table = self.recover_table(ctx, request.clone()).await?;
|
||||
if let Some(table) = table {
|
||||
// already locked
|
||||
self.tables.insert(table_ref.to_string(), table.clone());
|
||||
self.tables.insert(request.table_id, table.clone());
|
||||
|
||||
Some(table as _)
|
||||
} else {
|
||||
@@ -604,8 +572,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
|
||||
logging::info!(
|
||||
"Mito engine opened table: {} in schema: {}",
|
||||
table_name,
|
||||
schema_name
|
||||
request.table_name,
|
||||
request.schema_name
|
||||
);
|
||||
|
||||
Ok(table)
|
||||
@@ -613,10 +581,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
|
||||
async fn drop_table(&self, request: DropTableRequest) -> TableResult<bool> {
|
||||
// Remove the table from the engine to avoid further access from users.
|
||||
let table_ref = request.table_ref();
|
||||
|
||||
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
|
||||
let removed_table = self.tables.remove(&table_ref.to_string());
|
||||
let _lock = self.table_mutex.lock(request.table_id).await;
|
||||
let removed_table = self.tables.remove(&request.table_id);
|
||||
// Close the table to close all regions. Closing a region is idempotent.
|
||||
if let Some((_, table)) = &removed_table {
|
||||
let regions = table.region_ids();
|
||||
@@ -663,17 +629,13 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
Ok(Some((manifest, table_info)))
|
||||
}
|
||||
|
||||
fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
|
||||
self.tables
|
||||
.get(&table_ref.to_string())
|
||||
.map(|en| en.value().clone() as _)
|
||||
fn get_table(&self, table_id: TableId) -> Option<TableRef> {
|
||||
self.tables.get(&table_id).map(|en| en.value().clone() as _)
|
||||
}
|
||||
|
||||
/// Returns the [MitoTable].
|
||||
fn get_mito_table(&self, table_ref: &TableReference) -> Option<Arc<MitoTable<S::Region>>> {
|
||||
self.tables
|
||||
.get(&table_ref.to_string())
|
||||
.map(|en| en.value().clone())
|
||||
fn get_mito_table(&self, table_id: TableId) -> Option<Arc<MitoTable<S::Region>>> {
|
||||
self.tables.get(&table_id).map(|en| en.value().clone())
|
||||
}
|
||||
|
||||
async fn close(&self) -> TableResult<()> {
|
||||
@@ -696,8 +658,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
}
|
||||
|
||||
async fn close_table(&self, request: CloseTableRequest) -> TableResult<CloseTableResult> {
|
||||
let table_ref = request.table_ref();
|
||||
if let Some(table) = self.get_mito_table(&table_ref) {
|
||||
if let Some(table) = self.get_mito_table(request.table_id) {
|
||||
return self
|
||||
.close_table_inner(table, Some(&request.region_numbers), request.flush)
|
||||
.await;
|
||||
@@ -713,13 +674,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
flush: bool,
|
||||
) -> TableResult<CloseTableResult> {
|
||||
let info = table.table_info();
|
||||
let table_ref = TableReference {
|
||||
catalog: &info.catalog_name,
|
||||
schema: &info.schema_name,
|
||||
table: &info.name,
|
||||
};
|
||||
let table_id = info.ident.table_id;
|
||||
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
|
||||
let _lock = self.table_mutex.lock(table_id).await;
|
||||
|
||||
let all_regions = table.region_ids();
|
||||
let regions = regions.unwrap_or(&all_regions);
|
||||
@@ -738,12 +694,12 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
}
|
||||
|
||||
if table.is_releasable() {
|
||||
self.tables.remove(&table_ref.to_string());
|
||||
self.tables.remove(&table_id);
|
||||
|
||||
logging::info!(
|
||||
"Mito engine closed table: {} in schema: {}",
|
||||
table_ref.table,
|
||||
table_ref.schema,
|
||||
info.name,
|
||||
info.schema_name,
|
||||
);
|
||||
return Ok(CloseTableResult::Released(removed_regions));
|
||||
}
|
||||
|
||||
@@ -29,9 +29,7 @@ use table::requests::{AlterKind, AlterTableRequest};
|
||||
use table::{Table, TableRef};
|
||||
|
||||
use crate::engine::MitoEngineInner;
|
||||
use crate::error::{
|
||||
TableExistsSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu,
|
||||
};
|
||||
use crate::error::{TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu};
|
||||
use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList};
|
||||
use crate::metrics;
|
||||
use crate::table::MitoTable;
|
||||
@@ -39,7 +37,6 @@ use crate::table::MitoTable;
|
||||
/// Procedure to alter a [MitoTable].
|
||||
pub(crate) struct AlterMitoTable<S: StorageEngine> {
|
||||
data: AlterTableData,
|
||||
engine_inner: Arc<MitoEngineInner<S>>,
|
||||
table: Arc<MitoTable<S::Region>>,
|
||||
/// The table info after alteration.
|
||||
new_info: Option<TableInfo>,
|
||||
@@ -107,18 +104,16 @@ impl<S: StorageEngine> AlterMitoTable<S> {
|
||||
table_version: 0,
|
||||
};
|
||||
let table_ref = data.table_ref();
|
||||
let table =
|
||||
engine_inner
|
||||
.get_mito_table(&table_ref)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
let table = engine_inner
|
||||
.get_mito_table(data.request.table_id)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
let info = table.table_info();
|
||||
data.table_version = info.ident.version;
|
||||
|
||||
Ok(AlterMitoTable {
|
||||
data,
|
||||
engine_inner,
|
||||
table,
|
||||
new_info: None,
|
||||
alter_op: None,
|
||||
@@ -148,16 +143,14 @@ impl<S: StorageEngine> AlterMitoTable<S> {
|
||||
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
|
||||
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
let table_ref = data.table_ref();
|
||||
let table =
|
||||
engine_inner
|
||||
.get_mito_table(&table_ref)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
let table = engine_inner
|
||||
.get_mito_table(data.request.table_id)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
|
||||
Ok(AlterMitoTable {
|
||||
data,
|
||||
engine_inner,
|
||||
table,
|
||||
new_info: None,
|
||||
alter_op: None,
|
||||
@@ -176,17 +169,8 @@ impl<S: StorageEngine> AlterMitoTable<S> {
|
||||
}
|
||||
);
|
||||
|
||||
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
|
||||
let mut table_ref = self.data.table_ref();
|
||||
table_ref.table = new_table_name;
|
||||
ensure!(
|
||||
self.engine_inner.get_mito_table(&table_ref).is_none(),
|
||||
TableExistsSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// We don't check the table name in the table engine as it is the catalog
|
||||
// manager's duty to ensure the table name is unused.
|
||||
self.data.state = AlterTableState::EngineAlterTable;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
@@ -252,22 +236,6 @@ impl<S: StorageEngine> AlterMitoTable<S> {
|
||||
// Update in memory metadata of the table.
|
||||
self.table.set_table_info(new_info.clone());
|
||||
|
||||
// Rename key in tables map.
|
||||
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
|
||||
let mut table_ref = self.data.table_ref();
|
||||
let removed = {
|
||||
let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
|
||||
self.engine_inner.tables.remove(&table_ref.to_string())
|
||||
};
|
||||
ensure!(removed.is_some(), TableNotFoundSnafu { table_name });
|
||||
|
||||
table_ref.table = new_table_name.as_str();
|
||||
let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
|
||||
self.engine_inner
|
||||
.tables
|
||||
.insert(table_ref.to_string(), self.table.clone());
|
||||
}
|
||||
|
||||
Ok(self.table.clone())
|
||||
}
|
||||
|
||||
@@ -363,19 +331,15 @@ mod tests {
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Get metadata of the created table.
|
||||
let table_ref = TableReference {
|
||||
catalog: &request.catalog_name,
|
||||
schema: &request.schema_name,
|
||||
table: &request.table_name,
|
||||
};
|
||||
let table = table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, request.id)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let old_info = table.table_info();
|
||||
let old_meta = &old_info.meta;
|
||||
|
||||
// Alter the table.
|
||||
let table_id = request.id;
|
||||
let request = new_add_columns_req();
|
||||
let mut procedure = table_engine
|
||||
.alter_table_procedure(&engine_ctx, request.clone())
|
||||
@@ -384,7 +348,7 @@ mod tests {
|
||||
|
||||
// Validate.
|
||||
let table = table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, table_id)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let new_info = table.table_info();
|
||||
@@ -405,7 +369,7 @@ mod tests {
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
);
|
||||
let request = new_add_columns_req_with_location(&new_tag, &new_field);
|
||||
let request = new_add_columns_req_with_location(table_id, &new_tag, &new_field);
|
||||
let mut procedure = table_engine
|
||||
.alter_table_procedure(&engine_ctx, request.clone())
|
||||
.unwrap();
|
||||
@@ -413,7 +377,7 @@ mod tests {
|
||||
|
||||
// Validate.
|
||||
let table = table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, table_id)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let new_info = table.table_info();
|
||||
@@ -456,6 +420,7 @@ mod tests {
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Add columns.
|
||||
let table_id = request.id;
|
||||
let request = new_add_columns_req();
|
||||
let mut procedure = table_engine
|
||||
.alter_table_procedure(&engine_ctx, request.clone())
|
||||
@@ -463,13 +428,8 @@ mod tests {
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Get metadata.
|
||||
let table_ref = TableReference {
|
||||
catalog: &request.catalog_name,
|
||||
schema: &request.schema_name,
|
||||
table: &request.table_name,
|
||||
};
|
||||
let table = table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, table_id)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let old_info = table.table_info();
|
||||
@@ -521,13 +481,8 @@ mod tests {
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Get metadata of the created table.
|
||||
let mut table_ref = TableReference {
|
||||
catalog: &create_request.catalog_name,
|
||||
schema: &create_request.schema_name,
|
||||
table: &create_request.table_name,
|
||||
};
|
||||
let table = table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, create_request.id)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
@@ -546,12 +501,7 @@ mod tests {
|
||||
let info = table.table_info();
|
||||
assert_eq!(new_name, info.name);
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.unwrap()
|
||||
.is_none());
|
||||
table_ref.table = &new_name;
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, create_request.id)
|
||||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
|
||||
@@ -131,7 +131,12 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
let table_ref = self.creator.data.table_ref();
|
||||
logging::debug!("on prepare create table {}", table_ref);
|
||||
|
||||
if self.creator.engine_inner.get_table(&table_ref).is_some() {
|
||||
if self
|
||||
.creator
|
||||
.engine_inner
|
||||
.get_table(self.creator.data.request.id)
|
||||
.is_some()
|
||||
{
|
||||
// If the table already exists.
|
||||
ensure!(
|
||||
self.creator.data.request.create_if_not_exists,
|
||||
@@ -152,14 +157,14 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
async fn on_engine_create_table(&mut self) -> Result<Status> {
|
||||
// In this state, we can ensure we are able to create a new table.
|
||||
let table_ref = self.creator.data.table_ref();
|
||||
logging::debug!("on engine create table {}", table_ref);
|
||||
let table_id = self.creator.data.request.id;
|
||||
logging::debug!(
|
||||
"on engine create table {}, table_id: {}",
|
||||
table_ref,
|
||||
table_id
|
||||
);
|
||||
|
||||
let _lock = self
|
||||
.creator
|
||||
.engine_inner
|
||||
.table_mutex
|
||||
.lock(table_ref.to_string())
|
||||
.await;
|
||||
let _lock = self.creator.engine_inner.table_mutex.lock(table_id).await;
|
||||
self.creator.create_table().await?;
|
||||
|
||||
Ok(Status::Done)
|
||||
@@ -209,14 +214,13 @@ impl<S: StorageEngine> TableCreator<S> {
|
||||
self.data.request.id,
|
||||
);
|
||||
|
||||
let table_ref = self.data.table_ref();
|
||||
// It is possible that the procedure retries `CREATE TABLE` many times, so we
|
||||
// return the table if it exists.
|
||||
if let Some(table) = self.engine_inner.get_table(&table_ref) {
|
||||
if let Some(table) = self.engine_inner.get_table(self.data.request.id) {
|
||||
return Ok(table.clone());
|
||||
}
|
||||
|
||||
logging::debug!("Creator create table {}", table_ref);
|
||||
logging::debug!("Creator create table {}", self.data.table_ref());
|
||||
|
||||
self.create_regions(&table_dir).await?;
|
||||
|
||||
@@ -313,7 +317,6 @@ impl<S: StorageEngine> TableCreator<S> {
|
||||
/// Writes metadata to the table manifest.
|
||||
async fn write_table_manifest(&mut self, table_dir: &str) -> Result<TableRef> {
|
||||
// Try to open the table first, as the table manifest might already exist.
|
||||
let table_ref = self.data.table_ref();
|
||||
if let Some((manifest, table_info)) = self
|
||||
.engine_inner
|
||||
.recover_table_manifest_and_info(&self.data.request.table_name, table_dir)
|
||||
@@ -323,7 +326,7 @@ impl<S: StorageEngine> TableCreator<S> {
|
||||
|
||||
self.engine_inner
|
||||
.tables
|
||||
.insert(table_ref.to_string(), table.clone());
|
||||
.insert(self.data.request.id, table.clone());
|
||||
return Ok(table);
|
||||
}
|
||||
|
||||
@@ -333,7 +336,7 @@ impl<S: StorageEngine> TableCreator<S> {
|
||||
|
||||
self.engine_inner
|
||||
.tables
|
||||
.insert(table_ref.to_string(), table.clone());
|
||||
.insert(self.data.request.id, table.clone());
|
||||
|
||||
Ok(table)
|
||||
}
|
||||
@@ -432,13 +435,8 @@ mod tests {
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
let table_ref = TableReference {
|
||||
catalog: &request.catalog_name,
|
||||
schema: &request.schema_name,
|
||||
table: &request.table_name,
|
||||
};
|
||||
assert!(table_engine
|
||||
.get_table(&EngineContext::default(), &table_ref)
|
||||
.get_table(&EngineContext::default(), request.id)
|
||||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
|
||||
@@ -84,8 +84,7 @@ impl<S: StorageEngine> DropMitoTable<S> {
|
||||
state: DropTableState::Prepare,
|
||||
request,
|
||||
};
|
||||
let table_ref = data.table_ref();
|
||||
let table = engine_inner.get_mito_table(&table_ref);
|
||||
let table = engine_inner.get_mito_table(data.request.table_id);
|
||||
|
||||
Ok(DropMitoTable {
|
||||
data,
|
||||
@@ -115,8 +114,7 @@ impl<S: StorageEngine> DropMitoTable<S> {
|
||||
/// Recover the procedure from json.
|
||||
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
|
||||
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
let table_ref = data.table_ref();
|
||||
let table = engine_inner.get_mito_table(&table_ref);
|
||||
let table = engine_inner.get_mito_table(data.request.table_id);
|
||||
|
||||
Ok(DropMitoTable {
|
||||
data,
|
||||
@@ -182,6 +180,7 @@ mod tests {
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Drop the table.
|
||||
let table_id = request.id;
|
||||
let request = test_util::new_drop_request();
|
||||
let mut procedure = table_engine
|
||||
.drop_table_procedure(&engine_ctx, request.clone())
|
||||
@@ -189,13 +188,8 @@ mod tests {
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// The table is dropped.
|
||||
let table_ref = TableReference {
|
||||
catalog: &request.catalog_name,
|
||||
schema: &request.schema_name,
|
||||
table: &request.table_name,
|
||||
};
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, table_id)
|
||||
.unwrap()
|
||||
.is_none());
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ use table::Table;
|
||||
|
||||
use super::*;
|
||||
use crate::table::test_util::{
|
||||
self, new_insert_request, schema_for_test, setup_table, TestEngineComponents, TABLE_NAME,
|
||||
self, new_insert_request, setup_table, TestEngineComponents, TABLE_NAME,
|
||||
};
|
||||
|
||||
pub fn has_parquet_file(sst_dir: &str) -> bool {
|
||||
@@ -537,11 +537,16 @@ fn test_region_id() {
|
||||
assert_eq!(18446744069414584330, region_id(u32::MAX, 10));
|
||||
}
|
||||
|
||||
fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> AlterTableRequest {
|
||||
fn new_add_columns_req(
|
||||
table_id: TableId,
|
||||
new_tag: &ColumnSchema,
|
||||
new_field: &ColumnSchema,
|
||||
) -> AlterTableRequest {
|
||||
AlterTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
table_id,
|
||||
alter_kind: AlterKind::AddColumns {
|
||||
columns: vec![
|
||||
AddColumnRequest {
|
||||
@@ -560,6 +565,7 @@ fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> Alte
|
||||
}
|
||||
|
||||
pub(crate) fn new_add_columns_req_with_location(
|
||||
table_id: TableId,
|
||||
new_tag: &ColumnSchema,
|
||||
new_field: &ColumnSchema,
|
||||
) -> AlterTableRequest {
|
||||
@@ -567,6 +573,7 @@ pub(crate) fn new_add_columns_req_with_location(
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
table_id,
|
||||
alter_kind: AlterKind::AddColumns {
|
||||
columns: vec![
|
||||
AddColumnRequest {
|
||||
@@ -597,7 +604,7 @@ async fn test_alter_table_add_column() {
|
||||
|
||||
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
|
||||
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
|
||||
let req = new_add_columns_req(&new_tag, &new_field);
|
||||
let req = new_add_columns_req(table.table_info().ident.table_id, &new_tag, &new_field);
|
||||
let table = table_engine
|
||||
.alter_table(&EngineContext::default(), req)
|
||||
.await
|
||||
@@ -633,7 +640,7 @@ async fn test_alter_table_add_column() {
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
);
|
||||
let req = new_add_columns_req_with_location(&new_tag, &new_field);
|
||||
let req = new_add_columns_req_with_location(new_info.ident.table_id, &new_tag, &new_field);
|
||||
let table = table_engine
|
||||
.alter_table(&EngineContext::default(), req)
|
||||
.await
|
||||
@@ -653,13 +660,13 @@ async fn test_alter_table_add_column() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_table_remove_column() {
|
||||
let (_engine, table_engine, _table, _object_store, _dir) =
|
||||
let (_engine, table_engine, table, _object_store, _dir) =
|
||||
test_util::setup_mock_engine_and_table().await;
|
||||
|
||||
// Add two columns to the table first.
|
||||
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
|
||||
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
|
||||
let req = new_add_columns_req(&new_tag, &new_field);
|
||||
let req = new_add_columns_req(table.table_info().ident.table_id, &new_tag, &new_field);
|
||||
let table = table_engine
|
||||
.alter_table(&EngineContext::default(), req)
|
||||
.await
|
||||
@@ -674,6 +681,7 @@ async fn test_alter_table_remove_column() {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
table_id: table.table_info().ident.table_id,
|
||||
alter_kind: AlterKind::DropColumns {
|
||||
names: vec![String::from("memory"), String::from("my_field")],
|
||||
},
|
||||
@@ -706,45 +714,13 @@ async fn test_alter_rename_table() {
|
||||
let TestEngineComponents {
|
||||
table_engine,
|
||||
storage_engine,
|
||||
table_ref,
|
||||
object_store,
|
||||
dir: _dir,
|
||||
..
|
||||
} = test_util::setup_test_engine_and_table().await;
|
||||
let ctx = EngineContext::default();
|
||||
|
||||
// register another table
|
||||
let another_name = "another_table";
|
||||
let req = CreateTableRequest {
|
||||
id: 1024,
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: another_name.to_string(),
|
||||
desc: Some("another test table".to_string()),
|
||||
schema: RawSchema::from(&schema_for_test()),
|
||||
region_numbers: vec![0],
|
||||
primary_key_indices: vec![0],
|
||||
create_if_not_exists: true,
|
||||
table_options: TableOptions::default(),
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
};
|
||||
table_engine
|
||||
.create_table(&ctx, req)
|
||||
.await
|
||||
.expect("create table must succeed");
|
||||
// test renaming a table with an existing name.
|
||||
let req = AlterTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
alter_kind: AlterKind::RenameTable {
|
||||
new_table_name: another_name.to_string(),
|
||||
},
|
||||
};
|
||||
let err = table_engine.alter_table(&ctx, req).await.err().unwrap();
|
||||
assert!(
|
||||
err.to_string().contains("Table already exists"),
|
||||
"Unexpected error: {err}"
|
||||
);
|
||||
let table_id = table_ref.table_info().ident.table_id;
|
||||
|
||||
let new_table_name = "test_table";
|
||||
// test rename table
|
||||
@@ -752,6 +728,7 @@ async fn test_alter_rename_table() {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
table_id,
|
||||
alter_kind: AlterKind::RenameTable {
|
||||
new_table_name: new_table_name.to_string(),
|
||||
},
|
||||
@@ -765,7 +742,7 @@ async fn test_alter_rename_table() {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: new_table_name.to_string(),
|
||||
table_id: 1,
|
||||
table_id,
|
||||
region_numbers: vec![0],
|
||||
};
|
||||
|
||||
@@ -794,17 +771,13 @@ async fn test_drop_table() {
|
||||
let engine_ctx = EngineContext {};
|
||||
|
||||
let table_info = table.table_info();
|
||||
let table_reference = TableReference {
|
||||
catalog: DEFAULT_CATALOG_NAME,
|
||||
schema: DEFAULT_SCHEMA_NAME,
|
||||
table: &table_info.name,
|
||||
};
|
||||
|
||||
let table_id = 1;
|
||||
let create_table_request = CreateTableRequest {
|
||||
id: 1,
|
||||
id: table_id,
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_info.name.to_string(),
|
||||
table_name: table_info.name.clone(),
|
||||
schema: RawSchema::from(&*table_info.meta.schema),
|
||||
create_if_not_exists: true,
|
||||
desc: None,
|
||||
@@ -819,23 +792,25 @@ async fn test_drop_table() {
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table_info, created_table.table_info());
|
||||
assert!(table_engine.table_exists(&engine_ctx, &table_reference));
|
||||
assert!(table_engine.table_exists(&engine_ctx, table_id));
|
||||
|
||||
let drop_table_request = DropTableRequest {
|
||||
catalog_name: table_reference.catalog.to_string(),
|
||||
schema_name: table_reference.schema.to_string(),
|
||||
table_name: table_reference.table.to_string(),
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_info.name.clone(),
|
||||
table_id,
|
||||
};
|
||||
let table_dropped = table_engine
|
||||
.drop_table(&engine_ctx, drop_table_request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(table_dropped);
|
||||
assert!(!table_engine.table_exists(&engine_ctx, &table_reference));
|
||||
assert!(!table_engine.table_exists(&engine_ctx, table_id));
|
||||
|
||||
// should be able to re-create
|
||||
let table_id = 2;
|
||||
let request = CreateTableRequest {
|
||||
id: 2,
|
||||
id: table_id,
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_info.name.to_string(),
|
||||
@@ -848,7 +823,7 @@ async fn test_drop_table() {
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
};
|
||||
table_engine.create_table(&ctx, request).await.unwrap();
|
||||
assert!(table_engine.table_exists(&engine_ctx, &table_reference));
|
||||
assert!(table_engine.table_exists(&engine_ctx, table_id));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -28,7 +28,7 @@ use storage::compaction::noop::NoopCompactionScheduler;
|
||||
use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use table::engine::{EngineContext, TableEngine};
|
||||
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
|
||||
use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
|
||||
use table::requests::{
|
||||
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest, TableOptions,
|
||||
};
|
||||
@@ -39,6 +39,7 @@ use crate::engine::{MitoEngine, MITO_ENGINE};
|
||||
pub use crate::table::test_util::mock_engine::{MockEngine, MockRegion};
|
||||
|
||||
pub const TABLE_NAME: &str = "demo";
|
||||
pub const TABLE_ID: TableId = 1;
|
||||
|
||||
/// Create a InsertRequest with default catalog and schema.
|
||||
pub fn new_insert_request(
|
||||
@@ -107,7 +108,7 @@ pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
|
||||
|
||||
pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
|
||||
CreateTableRequest {
|
||||
id: 1,
|
||||
id: TABLE_ID,
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
@@ -126,6 +127,7 @@ pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
table_id: TABLE_ID,
|
||||
alter_kind,
|
||||
}
|
||||
}
|
||||
@@ -135,6 +137,7 @@ pub fn new_drop_request() -> DropTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
table_id: TABLE_ID,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -315,13 +315,14 @@ mod tests {
|
||||
async fn test_alter_table_procedure_rename() {
|
||||
let env = TestEnv::new("rename");
|
||||
let table_name = "test_old";
|
||||
env.create_table(table_name).await;
|
||||
let table_id = env.create_table(table_name).await;
|
||||
|
||||
let new_table_name = "test_new";
|
||||
let request = AlterTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
table_id,
|
||||
alter_kind: AlterKind::RenameTable {
|
||||
new_table_name: new_table_name.to_string(),
|
||||
},
|
||||
|
||||
@@ -349,14 +349,9 @@ mod tests {
|
||||
table_engine.clone(),
|
||||
);
|
||||
|
||||
let table_ref = TableReference {
|
||||
catalog: &request.catalog_name,
|
||||
schema: &request.schema_name,
|
||||
table: &request.table_name,
|
||||
};
|
||||
let engine_ctx = EngineContext::default();
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, request.id)
|
||||
.unwrap()
|
||||
.is_none());
|
||||
|
||||
@@ -365,7 +360,7 @@ mod tests {
|
||||
watcher.changed().await.unwrap();
|
||||
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, request.id)
|
||||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
@@ -390,14 +385,9 @@ mod tests {
|
||||
table_engine.clone(),
|
||||
);
|
||||
|
||||
let table_ref = TableReference {
|
||||
catalog: &request.catalog_name,
|
||||
schema: &request.schema_name,
|
||||
table: &request.table_name,
|
||||
};
|
||||
let engine_ctx = EngineContext::default();
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, request.id)
|
||||
.unwrap()
|
||||
.is_none());
|
||||
|
||||
@@ -452,7 +442,7 @@ mod tests {
|
||||
|
||||
// The table is created.
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.get_table(&engine_ctx, request.id)
|
||||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
|
||||
@@ -277,12 +277,13 @@ mod tests {
|
||||
async fn test_drop_table_procedure() {
|
||||
let env = TestEnv::new("drop");
|
||||
let table_name = "test_drop";
|
||||
env.create_table(table_name).await;
|
||||
let table_id = env.create_table(table_name).await;
|
||||
|
||||
let request = DropTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
table_id,
|
||||
};
|
||||
let TestEnv {
|
||||
dir: _dir,
|
||||
@@ -305,13 +306,6 @@ mod tests {
|
||||
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap();
|
||||
assert!(schema.table(table_name).await.unwrap().is_none());
|
||||
let ctx = EngineContext::default();
|
||||
assert!(!table_engine.table_exists(
|
||||
&ctx,
|
||||
&TableReference {
|
||||
catalog: DEFAULT_CATALOG_NAME,
|
||||
schema: DEFAULT_SCHEMA_NAME,
|
||||
table: table_name,
|
||||
}
|
||||
));
|
||||
assert!(!table_engine.table_exists(&ctx, table_id,));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ use object_store::ObjectStore;
|
||||
use storage::compaction::noop::NoopCompactionScheduler;
|
||||
use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use table::metadata::TableId;
|
||||
use table::requests::CreateTableRequest;
|
||||
|
||||
use crate::CreateTableProcedure;
|
||||
@@ -92,8 +93,9 @@ impl TestEnv {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_table(&self, table_name: &str) {
|
||||
pub async fn create_table(&self, table_name: &str) -> TableId {
|
||||
let request = new_create_request(table_name);
|
||||
let table_id = request.id;
|
||||
let procedure = CreateTableProcedure::new(
|
||||
request,
|
||||
self.catalog_manager.clone(),
|
||||
@@ -108,6 +110,8 @@ impl TestEnv {
|
||||
.await
|
||||
.unwrap();
|
||||
watcher.changed().await.unwrap();
|
||||
|
||||
table_id
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -108,14 +108,10 @@ pub trait TableEngine: Send + Sync {
|
||||
) -> Result<TableRef>;
|
||||
|
||||
/// Returns the table by it's name.
|
||||
fn get_table(
|
||||
&self,
|
||||
ctx: &EngineContext,
|
||||
table_ref: &TableReference,
|
||||
) -> Result<Option<TableRef>>;
|
||||
fn get_table(&self, ctx: &EngineContext, table_id: TableId) -> Result<Option<TableRef>>;
|
||||
|
||||
/// Returns true when the given table is exists.
|
||||
fn table_exists(&self, ctx: &EngineContext, table_ref: &TableReference) -> bool;
|
||||
fn table_exists(&self, ctx: &EngineContext, table_id: TableId) -> bool;
|
||||
|
||||
/// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist.
|
||||
async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result<bool>;
|
||||
|
||||
@@ -162,6 +162,7 @@ pub struct AlterTableRequest {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub table_id: TableId,
|
||||
pub alter_kind: AlterKind,
|
||||
}
|
||||
|
||||
@@ -200,6 +201,7 @@ pub struct DropTableRequest {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub table_id: TableId,
|
||||
}
|
||||
|
||||
impl DropTableRequest {
|
||||
@@ -218,6 +220,7 @@ pub struct CloseTableRequest {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub table_id: TableId,
|
||||
/// Do nothing if region_numbers is empty
|
||||
pub region_numbers: Vec<RegionNumber>,
|
||||
/// flush regions
|
||||
|
||||
@@ -19,7 +19,8 @@ use async_trait::async_trait;
|
||||
use common_procedure::BoxedProcedure;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference};
|
||||
use crate::engine::{EngineContext, TableEngine, TableEngineProcedure};
|
||||
use crate::metadata::TableId;
|
||||
use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
|
||||
use crate::test_util::EmptyTable;
|
||||
use crate::{Result, TableRef};
|
||||
@@ -86,11 +87,11 @@ impl TableEngine for MockTableEngine {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn get_table(&self, _ctx: &EngineContext, _ref: &TableReference) -> Result<Option<TableRef>> {
|
||||
fn get_table(&self, _ctx: &EngineContext, _table_id: TableId) -> Result<Option<TableRef>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn table_exists(&self, _ctx: &EngineContext, _name: &TableReference) -> bool {
|
||||
fn table_exists(&self, _ctx: &EngineContext, _table_id: TableId) -> bool {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user