mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-05 13:30:44 +00:00
fix(soft-drop): open regions before restoring undrop metadata
Restore undropped table metadata only after physical regions have been reopened, keeping the table hidden while regions are still closed. Preserve the live-name conflict check before opening regions and cover the ordering with a regression test. Signed-off-by: Lei, HUANG <ratuthomm@gmail.com>
This commit is contained in:
@@ -602,6 +602,63 @@ async fn test_undrop_table_restores_metadata_and_reopens_regions() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_undrop_table_opens_regions_before_restoring_live_metadata() {
|
||||
let (tx, mut rx) = mpsc::channel(8);
|
||||
let datanode_handler = DatanodeWatcher::new(tx);
|
||||
let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
|
||||
let mut ddl_context = new_ddl_context(node_manager);
|
||||
ddl_context.soft_drop_enabled = true;
|
||||
let table_id = 1024;
|
||||
let table_name = "foo";
|
||||
let task = test_create_table_task(table_name, table_id);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![RegionRoute {
|
||||
region: Region::new_test(RegionId::new(table_id, 1)),
|
||||
leader_peer: Some(Peer::empty(1)),
|
||||
follower_peers: vec![],
|
||||
leader_state: None,
|
||||
leader_down_since: None,
|
||||
write_route_policy: None,
|
||||
}]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut drop_procedure = DropTableProcedure::new(
|
||||
new_drop_table_task(table_name, table_id, false),
|
||||
ddl_context.clone(),
|
||||
);
|
||||
execute_procedure_until_done(&mut drop_procedure).await;
|
||||
while rx.try_recv().is_ok() {}
|
||||
|
||||
let mut procedure =
|
||||
UndropTableProcedure::new(new_undrop_table_task(table_id), ddl_context.clone());
|
||||
let ctx = new_test_procedure_context();
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
|
||||
let (_, request) = rx.try_recv().unwrap();
|
||||
assert_matches!(request.body, Some(region_request::Body::Open(_)));
|
||||
assert!(rx.try_recv().is_err());
|
||||
assert!(
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.get(TableNameKey::new(
|
||||
DEFAULT_CATALOG_NAME,
|
||||
DEFAULT_SCHEMA_NAME,
|
||||
table_name,
|
||||
))
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_undrop_logical_table_skips_datanode_open() {
|
||||
let (tx, mut rx) = mpsc::channel(8);
|
||||
|
||||
@@ -103,7 +103,7 @@ impl UndropTableProcedure {
|
||||
}
|
||||
);
|
||||
self.data.table_info = Some(dropped_table.table_info_value.table_info);
|
||||
self.data.state = UndropTableState::RestoreMetadata;
|
||||
self.data.state = UndropTableState::OpenRegions;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
@@ -127,13 +127,14 @@ impl UndropTableProcedure {
|
||||
}
|
||||
err => err,
|
||||
})?;
|
||||
self.data.state = UndropTableState::OpenRegions;
|
||||
self.data.state = UndropTableState::InvalidateTableCache;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
async fn on_open_regions(&mut self) -> Result<Status> {
|
||||
self.ensure_live_table_not_exists().await?;
|
||||
let TableRouteValue::Physical(route) = self.data.table_route_value() else {
|
||||
self.data.state = UndropTableState::InvalidateTableCache;
|
||||
self.data.state = UndropTableState::RestoreMetadata;
|
||||
return Ok(Status::executing(true));
|
||||
};
|
||||
|
||||
@@ -146,10 +147,25 @@ impl UndropTableProcedure {
|
||||
&self.data.region_wal_options,
|
||||
)
|
||||
.await?;
|
||||
self.data.state = UndropTableState::InvalidateTableCache;
|
||||
self.data.state = UndropTableState::RestoreMetadata;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
async fn ensure_live_table_not_exists(&self) -> Result<()> {
|
||||
ensure!(
|
||||
!self
|
||||
.context
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.exists(TableNameKey::from(self.data.table_name()))
|
||||
.await?,
|
||||
error::TableAlreadyExistsSnafu {
|
||||
table_name: self.data.table_name().to_string()
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_broadcast(&mut self) -> Result<Status> {
|
||||
let ctx = crate::cache_invalidator::Context {
|
||||
subject: Some(format!(
|
||||
|
||||
Reference in New Issue
Block a user