diff --git a/Cargo.lock b/Cargo.lock index a82f6fe7ef..d23309fff1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9381,6 +9381,7 @@ name = "tests-integration" version = "0.2.0" dependencies = [ "api", + "async-trait", "axum", "axum-test-helper", "catalog", diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 92349ef309..a409dbc136 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -375,21 +375,6 @@ mod tests { } } - // The "foreign" means the Datanode is not containing any regions to the table before. - pub struct ForeignNodeSelector { - pub foreign: Peer, - } - - #[async_trait] - impl Selector for ForeignNodeSelector { - type Context = SelectorContext; - type Output = Vec; - - async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result { - Ok(vec![self.foreign.clone()]) - } - } - pub struct TestingEnv { pub context: RegionFailoverContext, pub heartbeat_receivers: HashMap>>, diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 1ceaca7be7..e1fdbcb481 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -11,6 +11,7 @@ dashboard = [] api = { path = "../src/api" } axum = "0.6" axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" } +async-trait = "0.1" catalog = { path = "../src/catalog" } client = { path = "../src/client" } common-base = { path = "../src/common/base" } diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 43180db2fb..869faebe46 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use api::v1::meta::Peer; use catalog::helper::TableGlobalKey; use catalog::remote::{CachedMetaKvBackend, Kv}; use 
common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; @@ -29,8 +30,10 @@ use common_telemetry::info; use frontend::catalog::FrontendCatalogManager; use frontend::error::Result as FrontendResult; use frontend::instance::Instance; -use meta_srv::metasrv::SelectorContext; +use meta_srv::error::Result as MetaResult; +use meta_srv::metasrv::{SelectorContext, SelectorRef}; use meta_srv::procedure::region_failover::{RegionFailoverContext, RegionFailoverProcedure}; +use meta_srv::selector::{Namespace, Selector}; use meta_srv::table_routes; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; @@ -83,8 +86,9 @@ pub async fn test_region_failover(store_type: StorageType) { let (store_config, _guard) = get_test_store_config(&store_type, cluster_name); + let datanodes = 5u64; let cluster = GreptimeDbClusterBuilder::new(cluster_name) - .with_datanodes(4) + .with_datanodes(datanodes as u32) .with_store_config(store_config) .build() .await; @@ -120,10 +124,25 @@ pub async fn test_region_failover(store_type: StorageType) { let distribution = find_region_distribution(&cluster).await; info!("Find region distribution: {distribution:?}"); + let mut foreign = 0; + for dn in 1..=datanodes { + if !&distribution.contains_key(&dn) { + foreign = dn + } + } + + let selector = Arc::new(ForeignNodeSelector { + foreign: Peer { + id: foreign, + // "127.0.0.1:3001" is just a placeholder; the test does not actually connect to it. 
+ addr: "127.0.0.1:3001".to_string(), + }, + }); + let failed_region = choose_failed_region(distribution); info!("Simulating failed region: {failed_region:#?}"); - run_region_failover_procedure(&cluster, failed_region.clone()).await; + run_region_failover_procedure(&cluster, failed_region.clone(), selector).await; let distribution = find_region_distribution(&cluster).await; info!("Find region distribution again: {distribution:?}"); @@ -287,14 +306,33 @@ fn choose_failed_region(distribution: HashMap>) -> RegionIdent { } } -async fn run_region_failover_procedure(cluster: &GreptimeDbCluster, failed_region: RegionIdent) { +// A "foreign" Datanode is one that has not held any regions of the table before. +pub struct ForeignNodeSelector { + pub foreign: Peer, +} + +#[async_trait::async_trait] +impl Selector for ForeignNodeSelector { + type Context = SelectorContext; + type Output = Vec; + + async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> MetaResult { + Ok(vec![self.foreign.clone()]) + } +} + +async fn run_region_failover_procedure( + cluster: &GreptimeDbCluster, + failed_region: RegionIdent, + selector: SelectorRef, +) { let meta_srv = &cluster.meta_srv; let procedure_manager = meta_srv.procedure_manager(); let procedure = RegionFailoverProcedure::new( failed_region.clone(), RegionFailoverContext { mailbox: meta_srv.mailbox(), - selector: meta_srv.selector(), + selector, selector_ctx: SelectorContext { datanode_lease_secs: meta_srv.options().datanode_lease_secs, server_addr: meta_srv.options().server_addr.clone(),