feat(drop_table): support to rollback table metadata (#3692)

* feat: support to rollback table metadata

* refactor: store table route value instead of physical table route

* feat(drop_table): support to rollback table metadata

* test: add rollback tests for drop table

* fix: do not set region to readonly

* test: add sqlness tests

* feat: implement TombstoneManager

* test: add tests for TombstoneManager

* refactor: using TombstoneManager

* chore: remove unused code

* fix: fix typo

* refactor: using `on_restore_metadata`

* refactor: add `executor` to `DropTableProcedure`

* refactor: simplify the `TombstoneManager`

* refactor: refactor `Key`

* refactor: carry more info

* feat: add `destroy_table_metadata`

* refactor: remove redundant table_route_value

* feat: ensure the key is empty

* feat: introduce `table_metadata_keys`

* chore: carry more info

* chore: remove clone

* chore: apply suggestions from CR

* feat: delete metadata tombstone
Author: Weny Xu
Date: 2024-04-16 17:22:41 +08:00
Committed by: GitHub
Parent: 64941d848e
Commit: d12379106e
21 changed files with 1305 additions and 218 deletions
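
In short, the patch replaces permanent deletion of table metadata with a tombstone scheme: `DeleteMetadata` moves each metadata key under a `__tombstone/` prefix, rollback moves it back, and `DeleteTombstone` purges it once the regions are actually dropped. A minimal sketch of the idea over a plain map; the real `TombstoneManager` (below) does this transactionally against the kv backend, with compare-and-swap guards on contended keys:

use std::collections::BTreeMap;

const TOMBSTONE_PREFIX: &str = "__tombstone/";

// Logical delete: move the value under the tombstone prefix.
fn create_tombstone(kv: &mut BTreeMap<String, String>, key: &str) {
    if let Some(value) = kv.remove(key) {
        kv.insert(format!("{TOMBSTONE_PREFIX}{key}"), value);
    }
}

// Rollback: move the value back to its origin key.
fn restore(kv: &mut BTreeMap<String, String>, key: &str) {
    if let Some(value) = kv.remove(&format!("{TOMBSTONE_PREFIX}{key}")) {
        kv.insert(key.to_string(), value);
    }
}

// Permanent delete: drop the tombstone itself.
fn delete_tombstone(kv: &mut BTreeMap<String, String>, key: &str) {
    kv.remove(&format!("{TOMBSTONE_PREFIX}{key}"));
}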


@@ -107,14 +107,11 @@ impl TableMetadataBencher {
.unwrap();
let start = Instant::now();
let table_info = table_info.unwrap();
let table_route = table_route.unwrap();
let table_id = table_info.table_info.ident.table_id;
let _ = self
.table_metadata_manager
.delete_table_metadata(
table_id,
&table_info.table_name(),
table_route.unwrap().region_routes().unwrap(),
)
.delete_table_metadata(table_id, &table_info.table_name(), &table_route)
.await;
start.elapsed()
},


@@ -76,6 +76,7 @@ impl DropDatabaseCursor {
.await?;
Ok((
Box::new(DropDatabaseExecutor::new(
table_id,
table_id,
TableName::new(&ctx.catalog, &ctx.schema, &table_name),
table_route.region_routes,
@@ -86,6 +87,7 @@ impl DropDatabaseCursor {
}
(DropTableTarget::Physical, TableRouteValue::Physical(table_route)) => Ok((
Box::new(DropDatabaseExecutor::new(
table_id,
table_id,
TableName::new(&ctx.catalog, &ctx.schema, &table_name),
table_route.region_routes,
@@ -220,7 +222,7 @@ mod tests {
.get_physical_table_route(physical_table_id)
.await
.unwrap();
assert_eq!(table_route.region_routes, executor.region_routes);
assert_eq!(table_route.region_routes, executor.physical_region_routes);
assert_eq!(executor.target, DropTableTarget::Logical);
}


@@ -26,6 +26,7 @@ use crate::ddl::drop_database::State;
use crate::ddl::drop_table::executor::DropTableExecutor;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::router::{operating_leader_regions, RegionRoute};
use crate::table_name::TableName;
@@ -33,8 +34,10 @@ use crate::table_name::TableName;
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropDatabaseExecutor {
table_id: TableId,
physical_table_id: TableId,
table_name: TableName,
pub(crate) region_routes: Vec<RegionRoute>,
/// The physical table region routes.
pub(crate) physical_region_routes: Vec<RegionRoute>,
pub(crate) target: DropTableTarget,
#[serde(skip)]
dropping_regions: Vec<OperatingRegionGuard>,
@@ -44,14 +47,16 @@ impl DropDatabaseExecutor {
/// Returns a new [DropDatabaseExecutor].
pub fn new(
table_id: TableId,
physical_table_id: TableId,
table_name: TableName,
region_routes: Vec<RegionRoute>,
physical_region_routes: Vec<RegionRoute>,
target: DropTableTarget,
) -> Self {
Self {
table_name,
table_id,
region_routes,
physical_table_id,
table_name,
physical_region_routes,
target,
dropping_regions: vec![],
}
@@ -60,7 +65,7 @@ impl DropDatabaseExecutor {
impl DropDatabaseExecutor {
fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
let dropping_regions = operating_leader_regions(&self.region_routes);
let dropping_regions = operating_leader_regions(&self.physical_region_routes);
let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
for (region_id, datanode_id) in dropping_regions {
let guard = ddl_ctx
@@ -87,12 +92,18 @@ impl State for DropDatabaseExecutor {
) -> Result<(Box<dyn State>, Status)> {
self.register_dropping_regions(ddl_ctx)?;
let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
// Deletes the table metadata permanently.
let table_route_value = TableRouteValue::new(
self.table_id,
self.physical_table_id,
self.physical_region_routes.clone(),
);
executor
.on_remove_metadata(ddl_ctx, &self.region_routes)
.on_destroy_metadata(ddl_ctx, &table_route_value)
.await?;
executor.invalidate_table_cache(ddl_ctx).await?;
executor
.on_drop_regions(ddl_ctx, &self.region_routes)
.on_drop_regions(ddl_ctx, &self.physical_region_routes)
.await?;
info!("Table: {}({}) is dropped", self.table_name, self.table_id);
@@ -122,7 +133,9 @@ mod tests {
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::error::{self, Error, Result};
use crate::key::datanode_table::DatanodeTableKey;
use crate::peer::Peer;
use crate::rpc::router::region_distribution;
use crate::table_name::TableName;
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
@@ -157,6 +170,7 @@ mod tests {
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
physical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes.clone(),
@@ -181,9 +195,10 @@ mod tests {
tables: None,
};
let mut state = DropDatabaseExecutor::new(
physical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,
table_route.region_routes.clone(),
DropTableTarget::Physical,
);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
@@ -207,6 +222,7 @@ mod tests {
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
logical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
table_route.region_routes.clone(),
@@ -231,8 +247,9 @@ mod tests {
tables: None,
};
let mut state = DropDatabaseExecutor::new(
logical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
table_route.region_routes,
DropTableTarget::Logical,
);
@@ -240,6 +257,33 @@ mod tests {
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Logical);
// Checks table info
ddl_context
.table_metadata_manager
.table_info_manager()
.get(physical_table_id)
.await
.unwrap()
.unwrap();
// Checks table route
let table_route = ddl_context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(physical_table_id)
.await
.unwrap()
.unwrap();
let region_routes = table_route.region_routes().unwrap();
for datanode_id in region_distribution(region_routes).into_keys() {
ddl_context
.table_metadata_manager
.datanode_table_manager()
.get(&DatanodeTableKey::new(datanode_id, physical_table_id))
.await
.unwrap()
.unwrap();
}
}
#[derive(Clone)]
@@ -279,6 +323,7 @@ mod tests {
.await
.unwrap();
let mut state = DropDatabaseExecutor::new(
physical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,


@@ -18,9 +18,11 @@ mod metadata;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use common_telemetry::info;
use common_telemetry::tracing::warn;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use strum::AsRefStr;
@@ -31,9 +33,7 @@ use self::executor::DropTableExecutor;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
@@ -47,46 +47,50 @@ pub struct DropTableProcedure {
pub data: DropTableData,
/// The guards of opening regions.
pub dropping_regions: Vec<OperatingRegionGuard>,
/// The drop table executor.
executor: DropTableExecutor,
}
impl DropTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
pub fn new(cluster_id: u64, task: DropTableTask, context: DdlContext) -> Self {
let data = DropTableData::new(cluster_id, task);
let executor = data.build_executor();
Self {
context,
data: DropTableData::new(cluster_id, task),
data,
dropping_regions: vec![],
executor,
}
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let executor = data.build_executor();
Ok(Self {
context,
data,
dropping_regions: vec![],
executor,
})
}
pub(crate) async fn on_prepare<'a>(&mut self, executor: &DropTableExecutor) -> Result<Status> {
if executor.on_prepare(&self.context).await?.stop() {
pub(crate) async fn on_prepare<'a>(&mut self) -> Result<Status> {
if self.executor.on_prepare(&self.context).await?.stop() {
return Ok(Status::done());
}
self.fill_table_metadata().await?;
self.data.state = DropTableState::RemoveMetadata;
self.data.state = DropTableState::DeleteMetadata;
Ok(Status::executing(true))
}
/// Registers the dropping regions if they don't exist.
fn register_dropping_regions(&mut self) -> Result<()> {
// Safety: filled in `on_prepare`.
let region_routes = self.data.region_routes().unwrap()?;
let dropping_regions = operating_leader_regions(&self.data.physical_region_routes);
let dropping_regions = operating_leader_regions(region_routes);
if self.dropping_regions.len() == dropping_regions.len() {
if !self.dropping_regions.is_empty() {
return Ok(());
}
@@ -109,7 +113,7 @@ impl DropTableProcedure {
}
/// Removes the table metadata.
async fn on_remove_metadata(&mut self, executor: &DropTableExecutor) -> Result<Status> {
pub(crate) async fn on_delete_metadata(&mut self) -> Result<Status> {
self.register_dropping_regions()?;
// NOTES: If the meta server crashes after `DeleteMetadata`,
// the corresponding regions of this table on the Datanode will be closed automatically.
@@ -117,12 +121,15 @@ impl DropTableProcedure {
// TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping.
let table_id = self.data.table_id();
executor
.on_remove_metadata(
&self.context,
// Safety: filled in `on_prepare`.
self.data.region_routes().unwrap()?,
)
let table_route_value = &TableRouteValue::new(
self.data.task.table_id,
// Safety: checked
self.data.physical_table_id.unwrap(),
self.data.physical_region_routes.clone(),
);
// Deletes table metadata logically.
self.executor
.on_delete_metadata(&self.context, table_route_value)
.await?;
info!("Deleted table metadata for table {table_id}");
self.data.state = DropTableState::InvalidateTableCache;
@@ -130,30 +137,33 @@ impl DropTableProcedure {
}
/// Broadcasts invalidate table cache instruction.
async fn on_broadcast(&mut self, executor: &DropTableExecutor) -> Result<Status> {
executor.invalidate_table_cache(&self.context).await?;
async fn on_broadcast(&mut self) -> Result<Status> {
self.executor.invalidate_table_cache(&self.context).await?;
self.data.state = DropTableState::DatanodeDropRegions;
Ok(Status::executing(true))
}
pub async fn on_datanode_drop_regions(&self, executor: &DropTableExecutor) -> Result<Status> {
executor
.on_drop_regions(
&self.context,
// Safety: filled in `on_prepare`.
self.data.region_routes().unwrap()?,
)
pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
self.executor
.on_drop_regions(&self.context, &self.data.physical_region_routes)
.await?;
Ok(Status::done())
self.data.state = DropTableState::DeleteTombstone;
Ok(Status::executing(true))
}
pub(crate) fn executor(&self) -> DropTableExecutor {
DropTableExecutor::new(
self.data.task.table_name(),
self.data.table_id(),
self.data.task.drop_if_exists,
)
/// Deletes metadata tombstone.
async fn on_delete_metadata_tombstone(&self) -> Result<Status> {
let table_route_value = &TableRouteValue::new(
self.data.task.table_id,
// Safety: checked
self.data.physical_table_id.unwrap(),
self.data.physical_region_routes.clone(),
);
self.executor
.on_delete_metadata_tombstone(&self.context, table_route_value)
.await?;
Ok(Status::done())
}
}
@@ -164,17 +174,17 @@ impl Procedure for DropTableProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let executor = self.executor();
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
.with_label_values(&[state.as_ref()])
.start_timer();
match self.data.state {
DropTableState::Prepare => self.on_prepare(&executor).await,
DropTableState::RemoveMetadata => self.on_remove_metadata(&executor).await,
DropTableState::InvalidateTableCache => self.on_broadcast(&executor).await,
DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions(&executor).await,
DropTableState::Prepare => self.on_prepare().await,
DropTableState::DeleteMetadata => self.on_delete_metadata().await,
DropTableState::InvalidateTableCache => self.on_broadcast().await,
DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
}
.map_err(handle_retry_error)
}
@@ -194,6 +204,28 @@ impl Procedure for DropTableProcedure {
LockKey::new(lock_key)
}
fn rollback_supported(&self) -> bool {
!matches!(self.data.state, DropTableState::Prepare)
}
async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
warn!(
"Rolling back the drop table procedure, table: {}",
self.data.table_id()
);
let table_route_value = &TableRouteValue::new(
self.data.task.table_id,
// Safety: checked
self.data.physical_table_id.unwrap(),
self.data.physical_region_routes.clone(),
);
self.executor
.on_restore_metadata(&self.context, table_route_value)
.await
.map_err(ProcedureError::external)
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -201,8 +233,8 @@ pub struct DropTableData {
pub state: DropTableState,
pub cluster_id: u64,
pub task: DropTableTask,
pub table_route_value: Option<DeserializedValueWithBytes<TableRouteValue>>,
pub table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
pub physical_region_routes: Vec<RegionRoute>,
pub physical_table_id: Option<TableId>,
}
impl DropTableData {
@@ -211,8 +243,8 @@ impl DropTableData {
state: DropTableState::Prepare,
cluster_id,
task,
table_route_value: None,
table_info_value: None,
physical_region_routes: vec![],
physical_table_id: None,
}
}
@@ -220,13 +252,17 @@ impl DropTableData {
self.task.table_ref()
}
fn region_routes(&self) -> Option<Result<&Vec<RegionRoute>>> {
self.table_route_value.as_ref().map(|v| v.region_routes())
}
fn table_id(&self) -> TableId {
self.task.table_id
}
fn build_executor(&self) -> DropTableExecutor {
DropTableExecutor::new(
self.task.table_name(),
self.task.table_id,
self.task.drop_if_exists,
)
}
}
/// The state of drop table.
@@ -234,10 +270,12 @@ impl DropTableData {
pub enum DropTableState {
/// Prepares to drop the table
Prepare,
/// Removes metadata
RemoveMetadata,
/// Deletes metadata logically
DeleteMetadata,
/// Invalidates Table Cache
InvalidateTableCache,
/// Drops regions on Datanode
DatanodeDropRegions,
/// Deletes metadata tombstone permanently
DeleteTombstone,
}
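
For orientation, the happy path now walks five states instead of four. A hypothetical helper (not part of the patch) spelling out the transition order that `execute` above drives; rollback is supported from every state except `Prepare` and restores the tombstoned metadata:

// Hypothetical sketch of the happy-path transitions.
fn next_state(state: DropTableState) -> Option<DropTableState> {
    use DropTableState::*;
    match state {
        Prepare => Some(DeleteMetadata),
        DeleteMetadata => Some(InvalidateTableCache),
        InvalidateTableCache => Some(DatanodeDropRegions),
        DatanodeDropRegions => Some(DeleteTombstone),
        DeleteTombstone => None, // Status::done()
    }
}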


@@ -30,6 +30,7 @@ use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::table_name::TableName;
@@ -99,14 +100,73 @@ impl DropTableExecutor {
Ok(Control::Continue(()))
}
/// Removes the table metadata.
pub async fn on_remove_metadata(
/// Deletes the table metadata **logically**.
pub async fn on_delete_metadata(
&self,
ctx: &DdlContext,
region_routes: &[RegionRoute],
table_route_value: &TableRouteValue,
) -> Result<()> {
let table_name_key = TableNameKey::new(
&self.table.catalog_name,
&self.table.schema_name,
&self.table.table_name,
);
if !ctx
.table_metadata_manager
.table_name_manager()
.exists(table_name_key)
.await?
{
return Ok(());
}
ctx.table_metadata_manager
.delete_table_metadata(self.table_id, &self.table, table_route_value)
.await
}
/// Deletes the table metadata tombstone **permanently**.
pub async fn on_delete_metadata_tombstone(
&self,
ctx: &DdlContext,
table_route_value: &TableRouteValue,
) -> Result<()> {
ctx.table_metadata_manager
.delete_table_metadata(self.table_id, &self.table, region_routes)
.delete_table_metadata_tombstone(self.table_id, &self.table, table_route_value)
.await
}
/// Deletes the table metadata **permanently**.
pub async fn on_destroy_metadata(
&self,
ctx: &DdlContext,
table_route_value: &TableRouteValue,
) -> Result<()> {
ctx.table_metadata_manager
.destroy_table_metadata(self.table_id, &self.table, table_route_value)
.await
}
/// Restores the table metadata.
pub async fn on_restore_metadata(
&self,
ctx: &DdlContext,
table_route_value: &TableRouteValue,
) -> Result<()> {
let table_name_key = TableNameKey::new(
&self.table.catalog_name,
&self.table.schema_name,
&self.table.table_name,
);
if ctx
.table_metadata_manager
.table_name_manager()
.exists(table_name_key)
.await?
{
return Ok(());
}
ctx.table_metadata_manager
.restore_table_metadata(self.table_id, &self.table, table_route_value)
.await
}
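
Note the mirrored existence checks: `on_delete_metadata` returns early once the `TableNameKey` is gone, and `on_restore_metadata` returns early once it is back. This makes both operations idempotent under procedure retries. A minimal sketch of the guard pattern, with a plain set standing in for the table name manager (hypothetical helpers, not part of the patch):

use std::collections::BTreeSet;

fn delete_once(names: &mut BTreeSet<String>, name: &str) {
    if !names.contains(name) {
        return; // already deleted; a retry is a no-op
    }
    names.remove(name);
    // ... tombstone the remaining metadata keys here ...
}

fn restore_once(names: &mut BTreeSet<String>, name: &str) {
    if names.contains(name) {
        return; // already restored; a retry is a no-op
    }
    names.insert(name.to_string());
    // ... restore the remaining metadata keys here ...
}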


@@ -12,35 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::format_full_table_name;
use snafu::OptionExt;
use crate::ddl::drop_table::DropTableProcedure;
use crate::error::{self, Result};
use crate::error::Result;
impl DropTableProcedure {
/// Fetches the table info and table route.
/// Fetches the table info and physical table route.
pub(crate) async fn fill_table_metadata(&mut self) -> Result<()> {
let task = &self.data.task;
let table_info_value = self
.context
.table_metadata_manager
.table_info_manager()
.get(task.table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: format_full_table_name(&task.catalog, &task.schema, &task.table),
})?;
let (_, table_route_value) = self
let (physical_table_id, physical_table_route_value) = self
.context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw_physical_table_route(task.table_id)
.get_physical_table_route(task.table_id)
.await?;
self.data.table_info_value = Some(table_info_value);
self.data.table_route_value = Some(table_route_value);
self.data.physical_region_routes = physical_table_route_value.region_routes;
self.data.physical_table_id = Some(physical_table_id);
Ok(())
}
}


@@ -19,17 +19,25 @@ use api::v1::region::{region_request, RegionRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::datanode_handler::DatanodeWatcher;
use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler};
use crate::ddl::test_util::{
create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task,
};
use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
use crate::key::table_route::TableRouteValue;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::ddl::DropTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
use crate::test_util::{new_ddl_context, new_ddl_context_with_kv_backend, MockDatanodeManager};
#[tokio::test]
async fn test_on_prepare_table_not_exists_err() {
@@ -59,8 +67,7 @@ async fn test_on_prepare_table_not_exists_err() {
};
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context);
let executor = procedure.executor();
let err = procedure.on_prepare(&executor).await.unwrap_err();
let err = procedure.on_prepare().await.unwrap_err();
assert_eq!(err.status_code(), StatusCode::TableNotFound);
}
@@ -93,8 +100,7 @@ async fn test_on_prepare_table() {
// Drop if exists
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
let executor = procedure.executor();
procedure.on_prepare(&executor).await.unwrap();
procedure.on_prepare().await.unwrap();
let task = DropTableTask {
catalog: DEFAULT_CATALOG_NAME.to_string(),
@@ -106,8 +112,7 @@ async fn test_on_prepare_table() {
// Drop table
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context);
let executor = procedure.executor();
procedure.on_prepare(&executor).await.unwrap();
procedure.on_prepare().await.unwrap();
}
#[tokio::test]
@@ -162,9 +167,8 @@ async fn test_on_datanode_drop_regions() {
};
// Drop table
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context);
let executor = procedure.executor();
procedure.on_prepare(&executor).await.unwrap();
procedure.on_datanode_drop_regions(&executor).await.unwrap();
procedure.on_prepare().await.unwrap();
procedure.on_datanode_drop_regions().await.unwrap();
let check = |peer: Peer,
request: RegionRequest,
@@ -191,3 +195,97 @@ async fn test_on_datanode_drop_regions() {
let (peer, request) = results.remove(0);
check(peer, request, 3, RegionId::new(table_id, 3));
}
#[tokio::test]
async fn test_on_rollback() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let kv_backend = Arc::new(MemoryKvBackend::new());
let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend.clone());
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
TableRouteValue::Physical(table_route),
)
.await;
// The create logical table procedure.
let physical_table_id = table_id;
// Creates the logical table metadata.
let task = test_create_logical_table_task("foo");
let mut procedure = CreateLogicalTablesProcedure::new(
cluster_id,
vec![task],
physical_table_id,
ddl_context.clone(),
);
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
// Triggers procedure to create table metadata
let status = procedure.execute(&ctx).await.unwrap();
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
assert_eq!(*table_ids, vec![1025]);
let expected_kvs = kv_backend.dump();
// Drops the physical table
{
let task = DropTableTask {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: "phy_table".to_string(),
table_id: physical_table_id,
drop_if_exists: false,
};
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
procedure.on_prepare().await.unwrap();
procedure.on_delete_metadata().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.rollback(&ctx).await.unwrap();
// Rollback again
procedure.rollback(&ctx).await.unwrap();
let kvs = kv_backend.dump();
assert_eq!(kvs, expected_kvs);
}
// Drops the logical table
let task = DropTableTask {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: "foo".to_string(),
table_id: table_ids[0],
drop_if_exists: false,
};
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
procedure.on_prepare().await.unwrap();
procedure.on_delete_metadata().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.rollback(&ctx).await.unwrap();
// Rollback again
procedure.rollback(&ctx).await.unwrap();
let kvs = kv_backend.dump();
assert_eq!(kvs, expected_kvs);
}


@@ -421,6 +421,9 @@ pub enum Error {
#[snafu(display("Invalid role: {}", role))]
InvalidRole { role: i32, location: Location },
#[snafu(display("Atomic key changed: {err_msg}"))]
CasKeyChanged { err_msg: String, location: Location },
#[snafu(display("Failed to parse {} from utf8", name))]
FromUtf8 {
name: String,
@@ -440,7 +443,8 @@ impl ErrorExt for Error {
| EtcdTxnOpResponse { .. }
| EtcdFailed { .. }
| EtcdTxnFailed { .. }
| ConnectEtcd { .. } => StatusCode::Internal,
| ConnectEtcd { .. }
| CasKeyChanged { .. } => StatusCode::Internal,
SerdeJson { .. }
| ParseOption { .. }


@@ -56,9 +56,12 @@ pub mod table_region;
pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;
// TODO(weny): remove it.
#[allow(dead_code)]
mod tombstone;
mod txn_helper;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;
@@ -83,9 +86,13 @@ use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
use self::datanode_table::RegionInfo;
use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use self::tombstone::TombstoneManager;
use crate::ddl::utils::region_storage_path;
use crate::error::{self, Result, SerdeJsonSnafu};
use crate::kv_backend::txn::{Txn, TxnOpResponse};
use crate::error::{self, Result, SerdeJsonSnafu, UnexpectedSnafu};
use crate::key::table_route::TableRouteKey;
use crate::key::tombstone::Key;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
use crate::table_name::TableName;
@@ -97,7 +104,6 @@ pub const MAINTENANCE_KEY: &str = "maintenance";
const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
pub const REMOVED_PREFIX: &str = "__removed";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
@@ -145,6 +151,33 @@ pub trait TableMetaKey {
fn as_raw_key(&self) -> Vec<u8>;
}
pub(crate) trait TableMetaKeyGetTxnOp {
fn build_get_op(
&self,
) -> (
TxnOp,
impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
);
}
impl TableMetaKey for String {
fn as_raw_key(&self) -> Vec<u8> {
self.as_bytes().to_vec()
}
}
impl TableMetaKeyGetTxnOp for String {
fn build_get_op(
&self,
) -> (
TxnOp,
impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
let key = self.as_raw_key();
(TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
}
}
pub trait TableMetaValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
where
@@ -162,6 +195,7 @@ pub struct TableMetadataManager {
catalog_manager: CatalogManager,
schema_manager: SchemaManager,
table_route_manager: TableRouteManager,
tombstone_manager: TombstoneManager,
kv_backend: KvBackendRef,
}
@@ -303,6 +337,7 @@ impl TableMetadataManager {
catalog_manager: CatalogManager::new(kv_backend.clone()),
schema_manager: SchemaManager::new(kv_backend.clone()),
table_route_manager: TableRouteManager::new(kv_backend.clone()),
tombstone_manager: TombstoneManager::new(kv_backend.clone()),
kv_backend,
}
}
@@ -363,19 +398,16 @@ impl TableMetadataManager {
Option<DeserializedValueWithBytes<TableInfoValue>>,
Option<DeserializedValueWithBytes<TableRouteValue>>,
)> {
let (get_table_route_txn, table_route_decoder) = self
.table_route_manager
.table_route_storage()
.build_get_txn(table_id);
let (get_table_info_txn, table_info_decoder) =
self.table_info_manager.build_get_txn(table_id);
let txn = Txn::merge_all(vec![get_table_route_txn, get_table_info_txn]);
let res = self.kv_backend.txn(txn).await?;
let table_info_value = table_info_decoder(&res.responses)?;
let table_route_value = table_route_decoder(&res.responses)?;
let table_info_key = TableInfoKey::new(table_id);
let table_route_key = TableRouteKey::new(table_id);
let (table_info_txn, table_info_filter) = table_info_key.build_get_op();
let (table_route_txn, table_route_filter) = table_route_key.build_get_op();
let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]);
let mut res = self.kv_backend.txn(txn).await?;
let mut set = TxnOpGetResponseSet::from(&mut res.responses);
let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?;
let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?;
Ok((table_info_value, table_route_value))
}
@@ -545,47 +577,106 @@ impl TableMetadataManager {
Ok(())
}
/// Deletes the table metadata.
/// The caller MUST ensure it has exclusive access to `TableNameKey`.
pub async fn delete_table_metadata(
fn table_metadata_keys(
&self,
table_id: TableId,
table_name: &TableName,
region_routes: &[RegionRoute],
) -> Result<()> {
// Deletes table name.
table_route_value: &TableRouteValue,
) -> Result<Vec<Key>> {
// Builds keys
let datanode_ids = if table_route_value.is_physical() {
region_distribution(table_route_value.region_routes()?)
.into_keys()
.collect()
} else {
vec![]
};
let mut keys = Vec::with_capacity(3 + datanode_ids.len());
let table_name = TableNameKey::new(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
let table_info_key = TableInfoKey::new(table_id);
let table_route_key = TableRouteKey::new(table_id);
let datanode_table_keys = datanode_ids
.into_iter()
.map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
.collect::<HashSet<_>>();
let delete_table_name_txn = self.table_name_manager().build_delete_txn(&table_name)?;
keys.push(Key::compare_and_swap(table_name.as_raw_key()));
keys.push(Key::new(table_info_key.as_raw_key()));
keys.push(Key::new(table_route_key.as_raw_key()));
for key in &datanode_table_keys {
keys.push(Key::new(key.as_raw_key()));
}
Ok(keys)
}
// Deletes table info.
let delete_table_info_txn = self.table_info_manager().build_delete_txn(table_id)?;
/// Deletes the table metadata **logically**.
/// The caller MUST ensure it has exclusive access to `TableNameKey`.
pub async fn delete_table_metadata(
&self,
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
self.tombstone_manager.create(keys).await?;
Ok(())
}
// Deletes datanode table key value pairs.
let distribution = region_distribution(region_routes);
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
/// Deletes the table metadata tombstone **permanently**.
/// The caller MUST ensure it has exclusive access to `TableNameKey`.
pub async fn delete_table_metadata_tombstone(
&self,
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
) -> Result<()> {
let keys = self
.table_metadata_keys(table_id, table_name, table_route_value)?
.into_iter()
.map(|key| key.into_bytes())
.collect::<Vec<_>>();
self.tombstone_manager.delete(keys).await
}
// Deletes table route.
let delete_table_route_txn = self
.table_route_manager()
.table_route_storage()
.build_delete_txn(table_id)?;
/// Restores the table metadata.
/// The caller MUST ensure it has exclusive access to `TableNameKey`.
pub async fn restore_table_metadata(
&self,
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
self.tombstone_manager.restore(keys).await?;
Ok(())
}
let txn = Txn::merge_all(vec![
delete_table_name_txn,
delete_table_info_txn,
delete_datanode_txn,
delete_table_route_txn,
]);
/// Deletes the table metadata **permanently**.
/// The caller MUST ensure it has exclusive access to `TableNameKey`.
pub async fn destroy_table_metadata(
&self,
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
) -> Result<()> {
let operations = self
.table_metadata_keys(table_id, table_name, table_route_value)?
.into_iter()
.map(|key| TxnOp::Delete(key.into_bytes()))
.collect::<Vec<_>>();
// This always succeeds.
let _ = self.kv_backend.txn(txn).await?;
let txn = Txn::new().and_then(operations);
let resp = self.kv_backend.txn(txn).await?;
ensure!(
resp.succeeded,
UnexpectedSnafu {
err_msg: format!("Failed to destroy table metadata: {table_id}")
}
);
Ok(())
}
@@ -873,6 +964,38 @@ macro_rules! impl_table_meta_value {
}
}
macro_rules! impl_table_meta_key_get_txn_op {
($($key: ty), *) => {
$(
impl $crate::key::TableMetaKeyGetTxnOp for $key {
/// Returns a [TxnOp] to retrieve the corresponding value
/// and a filter to retrieve the value from the [TxnOpGetResponseSet].
fn build_get_op(
&self,
) -> (
TxnOp,
impl for<'a> FnMut(
&'a mut TxnOpGetResponseSet,
) -> Option<Vec<u8>>,
) {
let raw_key = self.as_raw_key();
(
TxnOp::Get(raw_key.clone()),
TxnOpGetResponseSet::filter(raw_key),
)
}
}
)*
}
}
impl_table_meta_key_get_txn_op! {
TableNameKey<'_>,
TableInfoKey,
TableRouteKey,
DatanodeTableKey
}
#[macro_export]
macro_rules! impl_optional_meta_value {
($($val_ty: ty), *) => {
@@ -907,6 +1030,7 @@ mod tests {
use std::sync::Arc;
use bytes::Bytes;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::util::current_time_millis;
use futures::TryStreamExt;
use store_api::storage::RegionId;
@@ -914,6 +1038,7 @@ mod tests {
use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
use crate::key::datanode_table::RegionInfo;
@@ -1155,15 +1280,10 @@ mod tests {
table_info.schema_name,
table_info.name,
);
let table_route_value = &TableRouteValue::physical(region_routes.clone());
// deletes metadata.
table_metadata_manager
.delete_table_metadata(table_id, &table_name, region_routes)
.await
.unwrap();
// if metadata was already deleted, it should be ok.
table_metadata_manager
.delete_table_metadata(table_id, &table_name, region_routes)
.delete_table_metadata(table_id, &table_name, table_route_value)
.await
.unwrap();
@@ -1559,4 +1679,118 @@ mod tests {
.await
.is_err());
}
#[tokio::test]
async fn test_destroy_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let table_id = 1025;
let table_name = "foo";
let task = test_create_table_task(table_name, table_id);
let options = [(0, "test".to_string())].into();
table_metadata_manager
.create_table_metadata(
task.table_info,
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
]),
options,
)
.await
.unwrap();
let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
let table_route_value = table_metadata_manager
.table_route_manager
.table_route_storage()
.get_raw(table_id)
.await
.unwrap()
.unwrap();
table_metadata_manager
.destroy_table_metadata(table_id, &table_name, &table_route_value)
.await
.unwrap();
assert!(mem_kv.is_empty());
}
#[tokio::test]
async fn test_restore_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let table_id = 1025;
let table_name = "foo";
let task = test_create_table_task(table_name, table_id);
let options = [(0, "test".to_string())].into();
table_metadata_manager
.create_table_metadata(
task.table_info,
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
]),
options,
)
.await
.unwrap();
let expected_result = mem_kv.dump();
let table_route_value = table_metadata_manager
.table_route_manager
.table_route_storage()
.get_raw(table_id)
.await
.unwrap()
.unwrap();
let region_routes = table_route_value.region_routes().unwrap();
let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
let table_route_value = TableRouteValue::physical(region_routes.clone());
table_metadata_manager
.delete_table_metadata(table_id, &table_name, &table_route_value)
.await
.unwrap();
table_metadata_manager
.restore_table_metadata(table_id, &table_name, &table_route_value)
.await
.unwrap();
let kvs = mem_kv.dump();
assert_eq!(kvs, expected_result);
}
}
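
Taken together, a table's metadata consists of four kinds of keys: the table name key (tombstoned with a compare-and-swap guard, since it is the key concurrent DDL races on), the table info key, the table route key, and one datanode table key per datanode hosting a region of a physical table. A rough sketch of the layout; the separators and exact formats here are assumptions for illustration, not the crate's real encoding:

// Hypothetical key formats, for illustration only.
fn sketch_table_metadata_keys(
    catalog: &str,
    schema: &str,
    table: &str,
    table_id: u32,
    datanode_ids: &[u64],
) -> Vec<String> {
    let mut keys = vec![
        format!("__table_name/{catalog}/{schema}/{table}"), // CAS-guarded tombstone
        format!("__table_info/{table_id}"),
        format!("__table_route/{table_id}"),
    ];
    for datanode_id in datanode_ids {
        keys.push(format!("__dn_table/{datanode_id}/{table_id}"));
    }
    keys
}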


@@ -55,6 +55,7 @@ pub struct RegionInfo {
pub region_wal_options: HashMap<RegionNumber, String>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct DatanodeTableKey {
pub datanode_id: DatanodeId,
pub table_id: TableId,


@@ -18,10 +18,11 @@ use serde::{Deserialize, Serialize};
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use super::{txn_helper, DeserializedValueWithBytes, TableMetaValue, TABLE_INFO_KEY_PREFIX};
use crate::error::Result;
use crate::key::TableMetaKey;
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse};
use crate::key::{
txn_helper, DeserializedValueWithBytes, TableMetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX,
};
use crate::kv_backend::txn::{Txn, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
use crate::table_name::TableName;
@@ -101,20 +102,6 @@ impl TableInfoManager {
Self { kv_backend }
}
pub(crate) fn build_get_txn(
&self,
table_id: TableId,
) -> (
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
) {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]);
(txn, txn_helper::build_txn_response_decoder_fn(raw_key))
}
/// Builds a create table info transaction; it expects that `__table_info/{table_id}` isn't occupied.
pub(crate) fn build_create_txn(
&self,
@@ -156,16 +143,6 @@ impl TableInfoManager {
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a delete table info transaction.
pub(crate) fn build_delete_txn(&self, table_id: TableId) -> Result<Txn> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = Txn::new().and_then(vec![TxnOp::Delete(raw_key)]);
Ok(txn)
}
pub async fn get(
&self,
table_id: TableId,


@@ -194,14 +194,6 @@ impl TableNameManager {
Ok(txn)
}
/// Builds a delete table name transaction. It only executes when the primary key comparison succeeds.
pub(crate) fn build_delete_txn(&self, key: &TableNameKey<'_>) -> Result<Txn> {
let raw_key = key.as_raw_key();
let txn = Txn::new().and_then(vec![TxnOp::Delete(raw_key)]);
Ok(txn)
}
pub async fn get(&self, key: TableNameKey<'_>) -> Result<Option<TableNameValue>> {
let raw_key = key.as_raw_key();
self.kv_backend


@@ -26,7 +26,7 @@ use crate::error::{
UnexpectedLogicalRouteTableSnafu,
};
use crate::key::{RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::txn::{Txn, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute};
use crate::rpc::store::BatchGetRequest;
@@ -61,6 +61,27 @@ pub struct LogicalTableRouteValue {
}
impl TableRouteValue {
/// Returns a [TableRouteValue::Physical] if `table_id` equals `physical_table_id`.
/// Otherwise returns a [TableRouteValue::Logical].
pub(crate) fn new(
table_id: TableId,
physical_table_id: TableId,
region_routes: Vec<RegionRoute>,
) -> Self {
if table_id == physical_table_id {
TableRouteValue::physical(region_routes)
} else {
let region_routes = region_routes
.into_iter()
.map(|region| {
debug_assert_eq!(region.region.id.table_id(), physical_table_id);
RegionId::new(table_id, region.region.id.region_number())
})
.collect::<Vec<_>>();
TableRouteValue::logical(physical_table_id, region_routes)
}
}
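A note on the constructor above: for a logical table it keeps each region number but re-parents the region id under the logical table id, so the logical route value stores `RegionId`s rather than full routes. A minimal sketch of that remapping, with a simplified stand-in for `store_api::storage::RegionId`:

// Simplified stand-in for store_api::storage::RegionId, for illustration only.
#[derive(Debug, PartialEq)]
struct SketchRegionId {
    table_id: u32,
    region_number: u32,
}

fn remap_to_logical(physical: &[SketchRegionId], logical_table_id: u32) -> Vec<SketchRegionId> {
    physical
        .iter()
        .map(|r| SketchRegionId {
            // re-parent under the logical table id...
            table_id: logical_table_id,
            // ...while keeping the region number
            region_number: r.region_number,
        })
        .collect()
}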
pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
Self::Physical(PhysicalTableRouteValue::new(region_routes))
}
@@ -425,21 +446,6 @@ impl TableRouteStorage {
Self { kv_backend }
}
/// Builds a get table route transaction (readonly).
pub(crate) fn build_get_txn(
&self,
table_id: TableId,
) -> (
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
) {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]);
(txn, txn_helper::build_txn_response_decoder_fn(raw_key))
}
/// Builds a create table route transaction;
/// it expects that `__table_route/{table_id}` isn't occupied.
pub fn build_create_txn(
@@ -483,17 +489,6 @@ impl TableRouteStorage {
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a delete table route transaction;
/// it expects that the remote value equals the `table_route_value`.
pub(crate) fn build_delete_txn(&self, table_id: TableId) -> Result<Txn> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = Txn::new().and_then(vec![TxnOp::Delete(raw_key)]);
Ok(txn)
}
/// Returns the [`TableRouteValue`].
pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
let key = TableRouteKey::new(table_id);


@@ -0,0 +1,544 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::{ensure, OptionExt};
use super::TableMetaKeyGetTxnOp;
use crate::error::{self, Result};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
/// [TombstoneManager] provides the ability to:
/// - logically delete values
/// - restore the deleted values
pub(crate) struct TombstoneManager {
kv_backend: KvBackendRef,
}
const TOMBSTONE_PREFIX: &str = "__tombstone/";
pub(crate) struct TombstoneKey<T>(T);
fn to_tombstone(key: &[u8]) -> Vec<u8> {
[TOMBSTONE_PREFIX.as_bytes(), key].concat()
}
impl TombstoneKey<&Vec<u8>> {
/// Returns the origin key and tombstone key.
fn to_keys(&self) -> (Vec<u8>, Vec<u8>) {
let key = self.0;
let tombstone_key = to_tombstone(key);
(key.clone(), tombstone_key)
}
/// Returns the origin key and tombstone key.
fn into_keys(self) -> (Vec<u8>, Vec<u8>) {
self.to_keys()
}
/// Returns the tombstone key.
fn to_tombstone_key(&self) -> Vec<u8> {
let key = self.0;
to_tombstone(key)
}
}
impl TableMetaKeyGetTxnOp for TombstoneKey<&Vec<u8>> {
fn build_get_op(
&self,
) -> (
TxnOp,
impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
TxnOpGetResponseSet::build_get_op(to_tombstone(self.0))
}
}
/// The key used in the [TombstoneManager].
pub(crate) struct Key {
bytes: Vec<u8>,
// Atomic key:
// the manager verifies that the value of the key stays unchanged
// between the read transaction and the write transaction.
atomic: bool,
}
impl Key {
/// Returns a new atomic key.
pub(crate) fn compare_and_swap<T: Into<Vec<u8>>>(key: T) -> Self {
Self {
bytes: key.into(),
atomic: true,
}
}
/// Returns a new normal key.
pub(crate) fn new<T: Into<Vec<u8>>>(key: T) -> Self {
Self {
bytes: key.into(),
atomic: false,
}
}
/// Converts the key into bytes.
pub(crate) fn into_bytes(self) -> Vec<u8> {
self.bytes
}
fn get_inner(&self) -> &Vec<u8> {
&self.bytes
}
fn is_atomic(&self) -> bool {
self.atomic
}
}
impl TableMetaKeyGetTxnOp for Key {
fn build_get_op(
&self,
) -> (
TxnOp,
impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
let key = self.get_inner().clone();
(TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
}
}
fn format_on_failure_error_message<F: FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>>(
mut set: TxnOpGetResponseSet,
on_failure_kv_and_filters: Vec<(Vec<u8>, Vec<u8>, F)>,
) -> String {
on_failure_kv_and_filters
.into_iter()
.flat_map(|(key, value, mut filter)| {
let got = filter(&mut set);
let Some(got) = got else {
return Some(format!(
"For key: {} was expected: {}, but value does not exists",
String::from_utf8_lossy(&key),
String::from_utf8_lossy(&value),
));
};
if got != value {
Some(format!(
"For key: {} was expected: {}, but got: {}",
String::from_utf8_lossy(&key),
String::from_utf8_lossy(&value),
String::from_utf8_lossy(&got),
))
} else {
None
}
})
.collect::<Vec<_>>()
.join("; ")
}
fn format_keys(keys: &[Key]) -> String {
keys.iter()
.map(|key| String::from_utf8_lossy(&key.bytes))
.collect::<Vec<_>>()
.join(", ")
}
impl TombstoneManager {
/// Returns [TombstoneManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Creates tombstones for keys.
///
/// Performs the following steps:
/// - retrieves all values corresponding to `keys`;
/// - replaces the origin values with tombstone values.
pub(crate) async fn create(&self, keys: Vec<Key>) -> Result<()> {
// Builds a transaction to retrieve all values
let (operations, mut filters): (Vec<_>, Vec<_>) =
keys.iter().map(|key| key.build_get_op()).unzip();
let txn = Txn::new().and_then(operations);
let mut resp = self.kv_backend.txn(txn).await?;
ensure!(
resp.succeeded,
error::UnexpectedSnafu {
err_msg: format!(
"Failed to retrieves the metadata, keys: {}",
format_keys(&keys)
),
}
);
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
// Builds the create tombstone transaction.
let mut tombstone_operations = Vec::with_capacity(keys.len() * 2);
let mut tombstone_comparison = vec![];
let mut on_failure_operations = vec![];
let mut on_failure_kv_and_filters = vec![];
for (idx, key) in keys.iter().enumerate() {
let filter = &mut filters[idx];
let value = filter(&mut set).with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Missing value, key: {}",
String::from_utf8_lossy(key.get_inner())
),
})?;
let (origin_key, tombstone_key) = TombstoneKey(key.get_inner()).into_keys();
// Compares the atomic key.
if key.is_atomic() {
tombstone_comparison.push(Compare::with_not_exist_value(
tombstone_key.clone(),
CompareOp::Equal,
));
tombstone_comparison.push(Compare::with_value(
origin_key.clone(),
CompareOp::Equal,
value.clone(),
));
let (op, filter) = TxnOpGetResponseSet::build_get_op(origin_key.clone());
on_failure_operations.push(op);
on_failure_kv_and_filters.push((origin_key.clone(), value.clone(), filter));
}
tombstone_operations.push(TxnOp::Delete(origin_key));
tombstone_operations.push(TxnOp::Put(tombstone_key, value));
}
let txn = if !tombstone_comparison.is_empty() {
Txn::new().when(tombstone_comparison)
} else {
Txn::new()
}
.and_then(tombstone_operations);
let txn = if !on_failure_operations.is_empty() {
txn.or_else(on_failure_operations)
} else {
txn
};
let mut resp = self.kv_backend.txn(txn).await?;
// TODO(weny): add tests for atomic key changed.
if !resp.succeeded {
let set = TxnOpGetResponseSet::from(&mut resp.responses);
let err_msg = format_on_failure_error_message(set, on_failure_kv_and_filters);
return error::CasKeyChangedSnafu { err_msg }.fail();
}
Ok(())
}
/// Restores tombstones for keys.
///
/// Performs the following steps:
/// - retrieves all tombstone values corresponding to `keys`;
/// - restores the origin values from the tombstone values.
pub(crate) async fn restore(&self, keys: Vec<Key>) -> Result<()> {
// Builds a transaction to retrieve all tombstone values
let tombstone_keys = keys
.iter()
.map(|key| TombstoneKey(key.get_inner()))
.collect::<Vec<_>>();
let (operations, mut filters): (Vec<_>, Vec<_>) =
tombstone_keys.iter().map(|key| key.build_get_op()).unzip();
let txn = Txn::new().and_then(operations);
let mut resp = self.kv_backend.txn(txn).await?;
ensure!(
resp.succeeded,
error::UnexpectedSnafu {
err_msg: format!(
"Failed to retrieves the metadata, keys: {}",
format_keys(&keys)
),
}
);
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
// Builds the restore tombstone transaction.
let mut tombstone_operations = Vec::with_capacity(keys.len() * 2);
let mut tombstone_comparison = vec![];
let mut on_failure_operations = vec![];
let mut on_failure_kv_and_filters = vec![];
for (idx, key) in keys.iter().enumerate() {
let filter = &mut filters[idx];
let value = filter(&mut set).with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Missing value, key: {}",
String::from_utf8_lossy(key.get_inner())
),
})?;
let (origin_key, tombstone_key) = tombstone_keys[idx].to_keys();
// Compares the atomic key.
if key.is_atomic() {
tombstone_comparison.push(Compare::with_not_exist_value(
origin_key.clone(),
CompareOp::Equal,
));
tombstone_comparison.push(Compare::with_value(
tombstone_key.clone(),
CompareOp::Equal,
value.clone(),
));
let (op, filter) = tombstone_keys[idx].build_get_op();
on_failure_operations.push(op);
on_failure_kv_and_filters.push((tombstone_key.clone(), value.clone(), filter));
}
tombstone_operations.push(TxnOp::Delete(tombstone_key));
tombstone_operations.push(TxnOp::Put(origin_key, value));
}
let txn = if !tombstone_comparison.is_empty() {
Txn::new().when(tombstone_comparison)
} else {
Txn::new()
}
.and_then(tombstone_operations);
let txn = if !on_failure_operations.is_empty() {
txn.or_else(on_failure_operations)
} else {
txn
};
let mut resp = self.kv_backend.txn(txn).await?;
// TODO(weny): add tests for atomic key changed.
if !resp.succeeded {
let set = TxnOpGetResponseSet::from(&mut resp.responses);
let err_msg = format_on_failure_error_message(set, on_failure_kv_and_filters);
return error::CasKeyChangedSnafu { err_msg }.fail();
}
Ok(())
}
/// Deletes tombstones for keys.
pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
let operations = keys
.iter()
.map(|key| TxnOp::Delete(TombstoneKey(key).to_tombstone_key()))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(operations);
// This always succeeds.
let _ = self.kv_backend.txn(txn).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::key::tombstone::{Key, TombstoneKey, TombstoneManager};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
#[tokio::test]
async fn test_create_tombstone() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.await
.unwrap();
assert!(!kv_backend.exists(b"bar").await.unwrap());
assert!(!kv_backend.exists(b"foo").await.unwrap());
assert_eq!(
kv_backend
.get(&TombstoneKey(&"bar".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
.value,
b"baz"
);
assert_eq!(
kv_backend
.get(&TombstoneKey(&"foo".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
.value,
b"hi"
);
assert_eq!(kv_backend.len(), 2);
}
#[tokio::test]
async fn test_create_tombstone_without_atomic_key() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
tombstone_manager
.create(vec![Key::new("bar"), Key::new("foo")])
.await
.unwrap();
assert!(!kv_backend.exists(b"bar").await.unwrap());
assert!(!kv_backend.exists(b"foo").await.unwrap());
assert_eq!(
kv_backend
.get(&TombstoneKey(&"bar".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
.value,
b"baz"
);
assert_eq!(
kv_backend
.get(&TombstoneKey(&"foo".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
.value,
b"hi"
);
assert_eq!(kv_backend.len(), 2);
}
#[tokio::test]
async fn test_create_tombstone_origin_value_not_found_err() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
let err = tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("baz")])
.await
.unwrap_err();
assert!(err.to_string().contains("Missing value"));
}
#[tokio::test]
async fn test_restore_tombstone() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
let expected_kvs = kv_backend.dump();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.await
.unwrap();
tombstone_manager
.restore(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.await
.unwrap();
assert_eq!(expected_kvs, kv_backend.dump());
}
#[tokio::test]
async fn test_restore_tombstone_without_atomic_key() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
let expected_kvs = kv_backend.dump();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.await
.unwrap();
tombstone_manager
.restore(vec![Key::new("bar"), Key::new("foo")])
.await
.unwrap();
assert_eq!(expected_kvs, kv_backend.dump());
}
#[tokio::test]
async fn test_restore_tombstone_origin_value_not_found_err() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.await
.unwrap();
let err = tombstone_manager
.restore(vec![Key::new("bar"), Key::new("baz")])
.await
.unwrap_err();
assert!(err.to_string().contains("Missing value"));
}
#[tokio::test]
async fn test_delete_tombstone() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.await
.unwrap();
tombstone_manager
.delete(vec![b"bar".to_vec(), b"foo".to_vec()])
.await
.unwrap();
assert!(kv_backend.is_empty());
}
}
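
The tests above cover the happy paths; what makes `create` and `restore` safe against concurrent mutation are the compare guards attached for atomic keys. A sketch of the transaction `create` assembles for a single atomic key, using the crate's txn types as imported at the top of this file (`restore` is the mirror image, with the roles of the origin key and the tombstone key swapped):

use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};

// Sketch only: the guarded move of one atomic key into its tombstone.
fn tombstone_txn(origin_key: Vec<u8>, tombstone_key: Vec<u8>, value: Vec<u8>) -> Txn {
    Txn::new()
        .when(vec![
            // the tombstone must not exist yet
            Compare::with_not_exist_value(tombstone_key.clone(), CompareOp::Equal),
            // the origin value must be unchanged since it was read
            Compare::with_value(origin_key.clone(), CompareOp::Equal, value.clone()),
        ])
        .and_then(vec![
            TxnOp::Delete(origin_key.clone()),
            TxnOp::Put(tombstone_key, value),
        ])
        // on failure, read the origin key back to report what changed
        .or_else(vec![TxnOp::Get(origin_key)])
}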


@@ -18,7 +18,69 @@ use serde::Serialize;
use crate::error::Result;
use crate::key::{DeserializedValueWithBytes, TableMetaValue};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::rpc::KeyValue;
/// The response set of [TxnOpResponse::ResponseGet]
pub(crate) struct TxnOpGetResponseSet(Vec<KeyValue>);
impl TxnOpGetResponseSet {
/// Returns a [TxnOp] to retrieve the value corresponding to `key` and
/// a filter to consume the corresponding [KeyValue] from the [TxnOpGetResponseSet].
pub(crate) fn build_get_op<T: Into<Vec<u8>>>(
key: T,
) -> (
TxnOp,
impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
let key = key.into();
(TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
}
/// Returns a filter to consume a [KeyValue] where the key equals `key`.
pub(crate) fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
move |set| {
let pos = set.0.iter().position(|kv| kv.key == key);
match pos {
Some(pos) => Some(set.0.remove(pos).value),
None => None,
}
}
}
/// Returns a decoder to decode bytes to `DeserializedValueWithBytes<T>`.
pub(crate) fn decode_with<F, T>(
mut f: F,
) -> impl FnMut(&mut TxnOpGetResponseSet) -> Result<Option<DeserializedValueWithBytes<T>>>
where
F: FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
T: Serialize + DeserializeOwned + TableMetaValue,
{
move |set| {
f(set)
.map(|value| DeserializedValueWithBytes::from_inner_slice(&value))
.transpose()
}
}
}
impl From<&mut Vec<TxnOpResponse>> for TxnOpGetResponseSet {
fn from(value: &mut Vec<TxnOpResponse>) -> Self {
let value = value
.extract_if(|resp| matches!(resp, TxnOpResponse::ResponseGet(_)))
.flat_map(|resp| {
// Safety: checked
let TxnOpResponse::ResponseGet(r) = resp else {
unreachable!()
};
r.kvs
})
.collect::<Vec<_>>();
TxnOpGetResponseSet(value)
}
}
// TODO(weny): use `TxnOpGetResponseSet`.
pub(crate) fn build_txn_response_decoder_fn<T>(
raw_key: Vec<u8>,
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<T>>>
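
The filter returned by `TxnOpGetResponseSet::filter` above consumes the matching entry, so each get response can be claimed by exactly one decoder. A self-contained sketch of the same consume-on-match pattern over a plain vector of key/value pairs:

// Stand-in for TxnOpGetResponseSet::filter over a plain Vec of pairs.
fn consume_match(key: Vec<u8>) -> impl FnMut(&mut Vec<(Vec<u8>, Vec<u8>)>) -> Option<Vec<u8>> {
    move |set| {
        set.iter()
            .position(|(k, _)| *k == key)
            // remove the entry so it cannot be claimed twice
            .map(|pos| set.remove(pos).1)
    }
}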


@@ -70,6 +70,25 @@ impl<T> MemoryKvBackend<T> {
let mut kvs = self.kvs.write().unwrap();
kvs.clear();
}
#[cfg(test)]
/// Returns true if the `kvs` is empty.
pub fn is_empty(&self) -> bool {
self.kvs.read().unwrap().is_empty()
}
#[cfg(test)]
/// Returns the `kvs`.
pub fn dump(&self) -> BTreeMap<Vec<u8>, Vec<u8>> {
let kvs = self.kvs.read().unwrap();
kvs.clone()
}
#[cfg(test)]
/// Returns the length of `kvs`
pub fn len(&self) -> usize {
self.kvs.read().unwrap().len()
}
}
#[async_trait]


@@ -16,6 +16,7 @@
#![feature(btree_extract_if)]
#![feature(async_closure)]
#![feature(let_chains)]
#![feature(extract_if)]
pub mod cache_invalidator;
pub mod cluster;


@@ -27,6 +27,7 @@ use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
@@ -86,6 +87,14 @@ impl<T: MockDatanodeHandler + 'static> DatanodeManager for MockDatanodeManager<T
/// Returns a test purpose [DdlContext].
pub fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext {
let kv_backend = Arc::new(MemoryKvBackend::new());
new_ddl_context_with_kv_backend(datanode_manager, kv_backend)
}
/// Returns a test purpose [DdlContext] with a specified [KvBackendRef].
pub fn new_ddl_context_with_kv_backend(
datanode_manager: DatanodeManagerRef,
kv_backend: KvBackendRef,
) -> DdlContext {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
DdlContext {


@@ -545,9 +545,7 @@ impl RegionServerInner {
match region_change {
RegionChange::None => {}
RegionChange::Register(_, _) | RegionChange::Deregisters => {
self.region_map
.remove(&region_id)
.map(|(id, engine)| engine.set_writable(id, false));
self.region_map.remove(&region_id);
}
}
}


@@ -71,7 +71,26 @@ DESC TABLE t2;
| val | Float64 | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+
-- TODO(ruihang): add a case that drops phy before t1
-- should fail
-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region
DROP TABLE phy;
Error: 1004(InvalidArguments), Physical region is busy, there are still some logical regions using it
-- metadata should be restored
DESC TABLE phy;
+------------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+------------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | | NO | | FIELD |
| val | Float64 | | YES | | FIELD |
| __table_id | UInt32 | PRI | NO | | TAG |
| __tsid | UInt64 | PRI | NO | | TAG |
| host | String | PRI | YES | | TAG |
| job | String | PRI | YES | | TAG |
+------------+----------------------+-----+------+---------+---------------+
DROP TABLE t1;
Affected Rows: 0


@@ -16,7 +16,11 @@ DESC TABLE t1;
DESC TABLE t2;
-- TODO(ruihang): add a case that drops phy before t1
-- should fail
-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region
DROP TABLE phy;
-- metadata should be restored
DESC TABLE phy;
DROP TABLE t1;