mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-03 20:40:37 +00:00
feat(soft-drop): deregister failure detectors and handle replayed open-regions during purge
- `src/common/meta/src/ddl/drop_table.rs`: deregister failure detectors before transitioning to the DeleteTombstone state
- `src/common/meta/src/ddl/undrop_table.rs`: refactor `open_regions` into `open_regions_inner` with an `ignore_region_not_found` flag; expose `open_regions_ignore_region_not_found` for the purge replayer
- `src/common/meta/src/ddl/purge_dropped_table.rs`: use `open_regions_ignore_region_not_found` in replayed purge procedures
- `src/common/meta/src/ddl/tests/drop_table.rs`: add tests for undrop idempotency and purge replay tolerance of dropped regions

Signed-off-by: Lei, HUANG <ratuthomm@gmail.com>
This commit is contained in:
@@ -200,6 +200,11 @@ impl DropTableProcedure {
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
self.context
|
||||
.deregister_failure_detectors(convert_region_routes_to_detecting_regions(
|
||||
&self.data.physical_region_routes,
|
||||
))
|
||||
.await;
|
||||
|
||||
self.data.state = DropTableState::DeleteTombstone;
|
||||
Ok(Status::executing(true))
|
||||
|
||||
@@ -29,7 +29,7 @@ use table::table_name::TableName;
|
||||
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::ddl::drop_table::executor::DropTableExecutor;
|
||||
use crate::ddl::undrop_table::open_regions;
|
||||
use crate::ddl::undrop_table::open_regions_ignore_region_not_found;
|
||||
use crate::ddl::utils::{
|
||||
convert_region_routes_to_detecting_regions, is_metric_engine_logical_table,
|
||||
map_to_procedure_error,
|
||||
@@ -90,7 +90,7 @@ impl PurgeDroppedTableProcedure {
|
||||
|
||||
async fn on_open_regions(&mut self) -> Result<Status> {
|
||||
if let Some(region_routes) = self.data.physical_region_routes() {
|
||||
open_regions(
|
||||
open_regions_ignore_region_not_found(
|
||||
&self.context,
|
||||
self.data.table_id(),
|
||||
self.data.table_name(),
|
||||
|
||||
@@ -14,17 +14,19 @@
|
||||
|
||||
use std::assert_matches;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex as StdMutex};
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::region::{RegionRequest, region_request};
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::ext::{BoxedError, ErrorExt, StackError};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_procedure::Procedure;
|
||||
use common_procedure_test::{
|
||||
execute_procedure_until, execute_procedure_until_done, new_test_procedure_context,
|
||||
};
|
||||
use snafu::ResultExt;
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
@@ -42,7 +44,7 @@ use crate::ddl::test_util::{
|
||||
};
|
||||
use crate::ddl::undrop_table::UndropTableProcedure;
|
||||
use crate::ddl::{DetectingRegion, RegionFailureDetectorController, TableMetadata};
|
||||
use crate::error::Error;
|
||||
use crate::error::{self, Error};
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
@@ -333,7 +335,9 @@ async fn test_soft_drop_closes_regions_and_keeps_tombstone() {
|
||||
#[tokio::test]
|
||||
async fn test_hard_drop_keeps_delete_tombstone_flow() {
|
||||
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(node_manager);
|
||||
let detector_controller = Arc::new(RecordingRegionFailureDetectorController::default());
|
||||
let mut ddl_context = new_ddl_context(node_manager);
|
||||
ddl_context.region_failure_detector_controller = detector_controller.clone();
|
||||
let table_id = 1024;
|
||||
let table_name = "foo";
|
||||
let task = test_create_table_task(table_name, table_id);
|
||||
@@ -372,6 +376,10 @@ async fn test_hard_drop_keeps_delete_tombstone_flow() {
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(dropped_table.is_none());
|
||||
assert_eq!(
|
||||
detector_controller.deregistered().await,
|
||||
vec![(1, RegionId::new(table_id, 1))]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -807,6 +815,54 @@ async fn test_undrop_table_fails_when_live_name_is_created_after_prepare() {
|
||||
assert_eq!(live_table.table_id(), live_table_id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_undrop_table_replayed_restore_metadata_is_idempotent() {
|
||||
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let mut ddl_context = new_ddl_context(node_manager);
|
||||
ddl_context.soft_drop_enabled = true;
|
||||
let table_id = 1024;
|
||||
let table_name = "foo";
|
||||
let task = test_create_table_task(table_name, table_id);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut drop_procedure = DropTableProcedure::new(
|
||||
new_drop_table_task(table_name, table_id, false),
|
||||
ddl_context.clone(),
|
||||
);
|
||||
execute_procedure_until_done(&mut drop_procedure).await;
|
||||
|
||||
let mut procedure =
|
||||
UndropTableProcedure::new(new_undrop_table_task(table_id), ddl_context.clone());
|
||||
let ctx = new_test_procedure_context();
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
let restore_metadata_data = procedure.dump().unwrap();
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
|
||||
let mut replayed =
|
||||
UndropTableProcedure::from_json(&restore_metadata_data, ddl_context.clone()).unwrap();
|
||||
execute_procedure_until_done(&mut replayed).await;
|
||||
|
||||
let live_table = ddl_context
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.get(TableNameKey::new(
|
||||
DEFAULT_CATALOG_NAME,
|
||||
DEFAULT_SCHEMA_NAME,
|
||||
table_name,
|
||||
))
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(live_table.table_id(), table_id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_purge_dropped_table_drops_regions_and_deletes_tombstone() {
|
||||
let (tx, mut rx) = mpsc::channel(8);
|
||||
@@ -950,6 +1006,106 @@ async fn test_purge_dropped_table_by_id_selects_tombstone_when_live_table_exists
|
||||
assert!(rx.try_recv().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_purge_dropped_table_replayed_open_regions_ignores_dropped_regions() {
|
||||
let (tx, mut rx) = mpsc::channel(8);
|
||||
let dropped_regions = Arc::new(StdMutex::new(HashSet::new()));
|
||||
let datanode_handler = DatanodeWatcher::new(tx).with_handler({
|
||||
let dropped_regions = dropped_regions.clone();
|
||||
move |_peer, request| {
|
||||
let Some(body) = request.body.as_ref() else {
|
||||
return Ok(RegionResponse::new(0));
|
||||
};
|
||||
match body {
|
||||
region_request::Body::Open(req)
|
||||
if dropped_regions.lock().unwrap().contains(&req.region_id) =>
|
||||
{
|
||||
Err::<RegionResponse, _>(BoxedError::new(MockRegionNotFoundError))
|
||||
.context(error::ExternalSnafu)
|
||||
}
|
||||
region_request::Body::Drop(req) => {
|
||||
dropped_regions.lock().unwrap().insert(req.region_id);
|
||||
Ok(RegionResponse::new(0))
|
||||
}
|
||||
_ => Ok(RegionResponse::new(0)),
|
||||
}
|
||||
}
|
||||
});
|
||||
let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
|
||||
let mut ddl_context = new_ddl_context(node_manager);
|
||||
ddl_context.soft_drop_enabled = true;
|
||||
let table_id = 1024;
|
||||
let table_name = "foo";
|
||||
let task = test_create_table_task(table_name, table_id);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(table_id, 1)),
|
||||
leader_peer: Some(Peer::empty(1)),
|
||||
follower_peers: vec![],
|
||||
leader_state: None,
|
||||
leader_down_since: None,
|
||||
write_route_policy: None,
|
||||
}]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut drop_procedure = DropTableProcedure::new(
|
||||
new_drop_table_task(table_name, table_id, false),
|
||||
ddl_context.clone(),
|
||||
);
|
||||
execute_procedure_until_done(&mut drop_procedure).await;
|
||||
while rx.try_recv().is_ok() {}
|
||||
|
||||
let mut procedure = PurgeDroppedTableProcedure::new(
|
||||
new_purge_dropped_table_task(table_id),
|
||||
ddl_context.clone(),
|
||||
);
|
||||
let ctx = new_test_procedure_context();
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
let open_regions_data = procedure.dump().unwrap();
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
|
||||
let mut replayed =
|
||||
PurgeDroppedTableProcedure::from_json(&open_regions_data, ddl_context.clone()).unwrap();
|
||||
execute_procedure_until_done(&mut replayed).await;
|
||||
|
||||
assert!(
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.get_dropped_table(&drop_procedure.data.task.table_name())
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Debug, snafu::Snafu)]
|
||||
#[snafu(display("mock region not found"))]
|
||||
struct MockRegionNotFoundError;
|
||||
|
||||
impl StackError for MockRegionNotFoundError {
|
||||
fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
|
||||
|
||||
fn next(&self) -> Option<&dyn StackError> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl ErrorExt for MockRegionNotFoundError {
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn status_code(&self) -> StatusCode {
|
||||
StatusCode::RegionNotFound
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_rollback() {
|
||||
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
|
||||
@@ -18,6 +18,8 @@ use api::v1::region::{
|
||||
OpenRequest as PbOpenRegionRequest, RegionRequest, RegionRequestHeader, region_request,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
|
||||
use common_procedure::{
|
||||
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
|
||||
@@ -206,6 +208,47 @@ pub(crate) async fn open_regions(
|
||||
table_info: &table::metadata::TableInfo,
|
||||
region_routes: &[RegionRoute],
|
||||
region_wal_options: &HashMap<RegionNumber, WalOptions>,
|
||||
) -> Result<()> {
|
||||
open_regions_inner(
|
||||
context,
|
||||
table_id,
|
||||
table_name,
|
||||
table_info,
|
||||
region_routes,
|
||||
region_wal_options,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn open_regions_ignore_region_not_found(
|
||||
context: &DdlContext,
|
||||
table_id: TableId,
|
||||
table_name: &TableName,
|
||||
table_info: &table::metadata::TableInfo,
|
||||
region_routes: &[RegionRoute],
|
||||
region_wal_options: &HashMap<RegionNumber, WalOptions>,
|
||||
) -> Result<()> {
|
||||
open_regions_inner(
|
||||
context,
|
||||
table_id,
|
||||
table_name,
|
||||
table_info,
|
||||
region_routes,
|
||||
region_wal_options,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn open_regions_inner(
|
||||
context: &DdlContext,
|
||||
table_id: TableId,
|
||||
table_name: &TableName,
|
||||
table_info: &table::metadata::TableInfo,
|
||||
region_routes: &[RegionRoute],
|
||||
region_wal_options: &HashMap<RegionNumber, WalOptions>,
|
||||
ignore_region_not_found: bool,
|
||||
) -> Result<()> {
|
||||
let template = build_template_from_raw_table_info(table_info)?;
|
||||
let builder = CreateRequestBuilder::new(template, None);
|
||||
@@ -244,10 +287,12 @@ pub(crate) async fn open_regions(
|
||||
let datanode = datanode.clone();
|
||||
let requester = requester.clone();
|
||||
tasks.push(async move {
|
||||
requester
|
||||
.handle(request)
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(datanode))
|
||||
if let Err(err) = requester.handle(request).await
|
||||
&& !(ignore_region_not_found && err.status_code() == StatusCode::RegionNotFound)
|
||||
{
|
||||
return Err(add_peer_context_if_needed(datanode)(err));
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user