test(meta): cover operating region role propagation

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
WenyXu
2026-04-15 09:46:37 +00:00
parent f80827ea36
commit 77eb0752d7
5 changed files with 104 additions and 24 deletions

View File

@@ -159,6 +159,7 @@ impl State for DropDatabaseExecutor {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use api::region::RegionResponse;
@@ -167,6 +168,8 @@ mod tests {
use common_error::ext::BoxedError;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use table::table_name::TableName;
use crate::ddl::drop_database::cursor::DropDatabaseCursor;
@@ -179,7 +182,7 @@ mod tests {
use crate::error::{self, Error, Result};
use crate::key::datanode_table::DatanodeTableKey;
use crate::peer::Peer;
use crate::rpc::router::region_distribution;
use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
use crate::test_util::{MockDatanodeHandler, MockDatanodeManager, new_ddl_context};
#[derive(Clone)]
@@ -423,6 +426,34 @@ mod tests {
}
}
#[tokio::test]
async fn test_recover_registers_region_role_from_routes() {
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(node_manager);
let region_id = RegionId::new(1024, 1);
let mut state = DropDatabaseExecutor::new(
1024,
1024,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(7)),
follower_peers: vec![],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: None,
write_route_policy: None,
}],
DropTableTarget::Physical,
);
state.recover(&ddl_context).unwrap();
let roles = ddl_context
.memory_region_keeper
.extract_operating_region_roles(7, &mut HashSet::from([region_id]));
assert_eq!(roles.get(&region_id), Some(&RegionRole::DowngradingLeader));
}
#[tokio::test]
async fn test_next_remaps_addresses_when_retrying() {
let (tx, mut rx) = tokio::sync::mpsc::channel(8);

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::assert_matches;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::region::RegionResponse;
@@ -30,6 +30,7 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
@@ -351,6 +352,10 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
.memory_region_keeper
.contains(datanode_id, region_id)
);
let roles = ddl_context
.memory_region_keeper
.extract_operating_region_roles(datanode_id, &mut HashSet::from([region_id]));
assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));
execute_procedure_until_done(&mut procedure).await;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::region::{RegionRequest, region_request};
@@ -23,6 +23,7 @@ use common_procedure::Procedure;
use common_procedure_test::{
execute_procedure_until, execute_procedure_until_done, new_test_procedure_context,
};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc;
@@ -328,6 +329,10 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
.memory_region_keeper
.contains(datanode_id, region_id)
);
let roles = ddl_context
.memory_region_keeper
.extract_operating_region_roles(datanode_id, &mut HashSet::from([region_id]));
assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));
execute_procedure_until_done(&mut procedure).await;

View File

@@ -85,18 +85,6 @@ impl MemoryRegionKeeper {
inner.contains_key(&(datanode_id, region_id))
}
/// Extracts all operating regions from `region_ids` and returns operating regions.
pub fn extract_operating_regions(
&self,
datanode_id: DatanodeId,
region_ids: &mut HashSet<RegionId>,
) -> HashSet<RegionId> {
let inner = self.inner.read().unwrap();
region_ids
.extract_if(|region_id| inner.contains_key(&(datanode_id, *region_id)))
.collect()
}
/// Extracts all operating regions with roles from `region_ids`.
pub fn extract_operating_region_roles(
&self,
@@ -104,12 +92,8 @@ impl MemoryRegionKeeper {
region_ids: &mut HashSet<RegionId>,
) -> HashMap<RegionId, RegionRole> {
let inner = self.inner.read().unwrap();
let operating_regions = region_ids
region_ids
.extract_if(|region_id| inner.contains_key(&(datanode_id, *region_id)))
.collect::<Vec<_>>();
operating_regions
.into_iter()
.map(|region_id| {
let role = *inner
.get(&(datanode_id, region_id))
@@ -167,11 +151,11 @@ mod tests {
RegionId::from_u64(2),
RegionId::from_u64(3),
]);
let output = keeper.extract_operating_regions(1, &mut regions);
let output = keeper.extract_operating_region_roles(1, &mut regions);
assert_eq!(output.len(), 2);
assert!(output.contains(&RegionId::from_u64(1)));
assert!(output.contains(&RegionId::from_u64(2)));
assert!(output.contains_key(&RegionId::from_u64(1)));
assert!(output.contains_key(&RegionId::from_u64(2)));
assert_eq!(regions, HashSet::from([RegionId::from_u64(3)]));
assert_eq!(keeper.len(), 2);

View File

@@ -837,9 +837,11 @@ mod tests {
};
use common_meta::error;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use common_meta::test_util::MockDatanodeManager;
use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::sync::mpsc;
@@ -993,6 +995,59 @@ mod tests {
assert!(!procedure.should_rollback_allocated_regions());
}
#[test]
fn test_register_operating_regions_preserves_route_roles() {
let keeper = Arc::new(MemoryRegionKeeper::new());
let region_routes = vec![
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![],
leader_state: Some(LeaderState::Staging),
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: None,
write_route_policy: None,
},
];
let _guards = Context::register_operating_regions(&keeper, &region_routes).unwrap();
let leader_roles =
keeper.extract_operating_region_roles(1, &mut HashSet::from([RegionId::new(1024, 1)]));
let staging_roles =
keeper.extract_operating_region_roles(2, &mut HashSet::from([RegionId::new(1024, 2)]));
let downgrading_roles =
keeper.extract_operating_region_roles(3, &mut HashSet::from([RegionId::new(1024, 3)]));
assert_eq!(
leader_roles.get(&RegionId::new(1024, 1)),
Some(&RegionRole::Leader)
);
assert_eq!(
staging_roles.get(&RegionId::new(1024, 2)),
Some(&RegionRole::StagingLeader)
);
assert_eq!(
downgrading_roles.get(&RegionId::new(1024, 3)),
Some(&RegionRole::DowngradingLeader)
);
}
#[tokio::test]
async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
let env = TestingEnv::new();