mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -9381,6 +9381,7 @@ name = "tests-integration"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"axum-test-helper",
|
||||
"catalog",
|
||||
|
||||
@@ -375,21 +375,6 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
// The "foreign" means the Datanode is not containing any regions to the table before.
|
||||
pub struct ForeignNodeSelector {
|
||||
pub foreign: Peer,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Selector for ForeignNodeSelector {
|
||||
type Context = SelectorContext;
|
||||
type Output = Vec<Peer>;
|
||||
|
||||
async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
|
||||
Ok(vec![self.foreign.clone()])
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestingEnv {
|
||||
pub context: RegionFailoverContext,
|
||||
pub heartbeat_receivers: HashMap<DatanodeId, Receiver<tonic::Result<HeartbeatResponse>>>,
|
||||
|
||||
@@ -11,6 +11,7 @@ dashboard = []
|
||||
api = { path = "../src/api" }
|
||||
axum = "0.6"
|
||||
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
|
||||
async-trait = "0.1"
|
||||
catalog = { path = "../src/catalog" }
|
||||
client = { path = "../src/client" }
|
||||
common-base = { path = "../src/common/base" }
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::Peer;
|
||||
use catalog::helper::TableGlobalKey;
|
||||
use catalog::remote::{CachedMetaKvBackend, Kv};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
|
||||
@@ -29,8 +30,10 @@ use common_telemetry::info;
|
||||
use frontend::catalog::FrontendCatalogManager;
|
||||
use frontend::error::Result as FrontendResult;
|
||||
use frontend::instance::Instance;
|
||||
use meta_srv::metasrv::SelectorContext;
|
||||
use meta_srv::error::Result as MetaResult;
|
||||
use meta_srv::metasrv::{SelectorContext, SelectorRef};
|
||||
use meta_srv::procedure::region_failover::{RegionFailoverContext, RegionFailoverProcedure};
|
||||
use meta_srv::selector::{Namespace, Selector};
|
||||
use meta_srv::table_routes;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use session::context::{QueryContext, QueryContextRef};
|
||||
@@ -83,8 +86,9 @@ pub async fn test_region_failover(store_type: StorageType) {
|
||||
|
||||
let (store_config, _guard) = get_test_store_config(&store_type, cluster_name);
|
||||
|
||||
let datanodes = 5u64;
|
||||
let cluster = GreptimeDbClusterBuilder::new(cluster_name)
|
||||
.with_datanodes(4)
|
||||
.with_datanodes(datanodes as u32)
|
||||
.with_store_config(store_config)
|
||||
.build()
|
||||
.await;
|
||||
@@ -120,10 +124,25 @@ pub async fn test_region_failover(store_type: StorageType) {
|
||||
let distribution = find_region_distribution(&cluster).await;
|
||||
info!("Find region distribution: {distribution:?}");
|
||||
|
||||
let mut foreign = 0;
|
||||
for dn in 1..=datanodes {
|
||||
if !&distribution.contains_key(&dn) {
|
||||
foreign = dn
|
||||
}
|
||||
}
|
||||
|
||||
let selector = Arc::new(ForeignNodeSelector {
|
||||
foreign: Peer {
|
||||
id: foreign,
|
||||
// "127.0.0.1:3001" is just a placeholder, does not actually connect to it.
|
||||
addr: "127.0.0.1:3001".to_string(),
|
||||
},
|
||||
});
|
||||
|
||||
let failed_region = choose_failed_region(distribution);
|
||||
info!("Simulating failed region: {failed_region:#?}");
|
||||
|
||||
run_region_failover_procedure(&cluster, failed_region.clone()).await;
|
||||
run_region_failover_procedure(&cluster, failed_region.clone(), selector).await;
|
||||
|
||||
let distribution = find_region_distribution(&cluster).await;
|
||||
info!("Find region distribution again: {distribution:?}");
|
||||
@@ -287,14 +306,33 @@ fn choose_failed_region(distribution: HashMap<u64, Vec<u32>>) -> RegionIdent {
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_region_failover_procedure(cluster: &GreptimeDbCluster, failed_region: RegionIdent) {
|
||||
// The "foreign" means the Datanode is not containing any regions to the table before.
|
||||
pub struct ForeignNodeSelector {
|
||||
pub foreign: Peer,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Selector for ForeignNodeSelector {
|
||||
type Context = SelectorContext;
|
||||
type Output = Vec<Peer>;
|
||||
|
||||
async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> MetaResult<Self::Output> {
|
||||
Ok(vec![self.foreign.clone()])
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_region_failover_procedure(
|
||||
cluster: &GreptimeDbCluster,
|
||||
failed_region: RegionIdent,
|
||||
selector: SelectorRef,
|
||||
) {
|
||||
let meta_srv = &cluster.meta_srv;
|
||||
let procedure_manager = meta_srv.procedure_manager();
|
||||
let procedure = RegionFailoverProcedure::new(
|
||||
failed_region.clone(),
|
||||
RegionFailoverContext {
|
||||
mailbox: meta_srv.mailbox(),
|
||||
selector: meta_srv.selector(),
|
||||
selector,
|
||||
selector_ctx: SelectorContext {
|
||||
datanode_lease_secs: meta_srv.options().datanode_lease_secs,
|
||||
server_addr: meta_srv.options().server_addr.clone(),
|
||||
|
||||
Reference in New Issue
Block a user