// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
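
//! Integration tests for the region failover procedure. The test spins up a
//! multi-datanode `GreptimeDbCluster`, marks one region as failed, drives a
//! `RegionFailoverProcedure` through the metasrv's procedure manager, and then
//! checks that the region moved to another datanode, that the frontend's table
//! route cache was invalidated, and that the data written before the failover
//! is still readable.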

use std::sync::Arc;
use std::time::Duration;

use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use client::OutputData;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::{RegionDistribution, TableMetaKey};
use common_meta::peer::Peer;
use common_meta::{distributed_time_constants, RegionIdent};
use common_procedure::{watcher, ProcedureWithId};
use common_query::Output;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use frontend::error::Result as FrontendResult;
use frontend::instance::Instance;
use futures::TryStreamExt;
use meta_srv::error::Result as MetaResult;
use meta_srv::metasrv::{SelectorContext, SelectorRef};
use meta_srv::procedure::region_failover::{RegionFailoverContext, RegionFailoverProcedure};
use meta_srv::selector::{Namespace, Selector, SelectorOptions};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use table::metadata::TableId;
use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
use tests_integration::test_util::{get_test_store_config, StorageType};
use tokio::time;
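
/// Expands to a `#[tokio::test(flavor = "multi_thread")]` wrapper for each
/// listed test function, scoped to one storage backend (`$service`). The
/// wrapper only runs the test when that backend is enabled in the current
/// test environment (`store_type.test_on()`).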
#[macro_export]
macro_rules! region_failover_test {
    ($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
        paste::item! {
            mod [<integration_region_failover_ $service:lower _test>] {
                $(
                    #[tokio::test(flavor = "multi_thread")]
                    $(
                        #[$meta]
                    )*
                    async fn [< $test >]() {
                        let store_type = tests_integration::test_util::StorageType::$service;
                        if store_type.test_on() {
                            let _ = $crate::region_failover::$test(store_type).await;
                        }
                    }
                )*
            }
        }
    };
}
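
/// Instantiates `region_failover_test!` for every storage backend passed in,
/// registering `test_region_failover` under each of them. A caller would
/// invoke it with `StorageType` variant names, for example (illustrative
/// variants, not a prescription of which backends to enable):
///
/// ```ignore
/// region_failover_tests!(S3, Oss);
/// ```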
#[macro_export]
macro_rules! region_failover_tests {
    ($($service:ident),*) => {
        $(
            region_failover_test!(
                $service,
                test_region_failover,
            );
        )*
    };
}
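
/// End-to-end exercise of region failover: create a partitioned table on a
/// 5-datanode cluster, insert one row per partition, pick a region to "fail"
/// and a foreign datanode (one that holds no region of the table) as the
/// target, run the failover procedure, and assert that the region
/// distribution changed, the frontend's route cache was dropped, and all rows
/// remain queryable.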
pub async fn test_region_failover(store_type: StorageType) {
    if store_type == StorageType::File {
        // Region failover doesn't make sense when using local file storage.
        return;
    }
    common_telemetry::init_default_ut_logging();
    info!("Running region failover test for {}", store_type);

    let mut logical_timer = 1685508715000;

    let cluster_name = "test_region_failover";

    let (store_config, _guard) = get_test_store_config(&store_type);

    let datanodes = 5u64;
    let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
    let cluster = builder
        .with_datanodes(datanodes as u32)
        .with_store_config(store_config)
        .build()
        .await;

    let frontend = cluster.frontend.clone();

    let table_id = prepare_testing_table(&cluster).await;

    let results = insert_values(&frontend, logical_timer).await;
    logical_timer += 1000;
    for result in results {
        assert!(matches!(result.unwrap().data, OutputData::AffectedRows(1)));
    }

    assert!(has_route_cache(&frontend, table_id).await);

    let distribution = find_region_distribution(&cluster, table_id).await;
    info!("Find region distribution: {distribution:?}");

    // Find a datanode that holds no region of the table; it will be the failover target.
    let mut foreign = 0;
    for dn in 1..=datanodes {
        if !distribution.contains_key(&dn) {
            foreign = dn;
        }
    }

    let selector = Arc::new(ForeignNodeSelector {
        foreign: Peer {
            id: foreign,
            // "127.0.0.1:3001" is just a placeholder; the test never actually connects to it.
            addr: "127.0.0.1:3001".to_string(),
        },
    });

    let failed_region = choose_failed_region(distribution);
    info!("Simulating failed region: {failed_region:#?}");

    run_region_failover_procedure(&cluster, failed_region.clone(), selector).await;

    let distribution = find_region_distribution(&cluster, table_id).await;
    info!("Find region distribution again: {distribution:?}");

    // Waits for the table cache invalidation.
    time::sleep(Duration::from_millis(100)).await;

    assert!(!has_route_cache(&frontend, table_id).await);

    // Inserts data to each datanode again after the failover.
    let frontend = cluster.frontend.clone();
    let results = insert_values(&frontend, logical_timer).await;
    for result in results {
        assert!(matches!(result.unwrap().data, OutputData::AffectedRows(1)));
    }

    assert_values(&frontend).await;

    // The failed region's original datanode should no longer appear in the distribution ...
    assert!(!distribution.contains_key(&failed_region.datanode_id));

    // ... and the failed region should now be served by one of the remaining datanodes.
    let mut success = false;
    let values = distribution.values();
    for val in values {
        success = success || val.contains(&failed_region.region_number);
    }
    assert!(success);
}
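
/// Returns whether the frontend's cached meta KV backend still holds a
/// `TableRouteKey` entry for the given table, i.e. whether the table route is
/// currently cached.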
async fn has_route_cache(instance: &Arc<Instance>, table_id: TableId) -> bool {
    let catalog_manager = instance
        .catalog_manager()
        .as_any()
        .downcast_ref::<KvBackendCatalogManager>()
        .unwrap();

    let kv_backend = catalog_manager.table_metadata_manager_ref().kv_backend();

    let cache = kv_backend
        .as_any()
        .downcast_ref::<CachedMetaKvBackend>()
        .unwrap()
        .cache();

    cache
        .get(TableRouteKey::new(table_id).as_raw_key().as_slice())
        .await
        .is_some()
}
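
/// Inserts one row per partition (i = 5, 15, 25, 55) with the given timestamp
/// and returns the per-statement results.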
async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
    let query_ctx = QueryContext::arc();

    let mut results = Vec::new();
    for range in [5, 15, 25, 55] {
        let result = insert_value(
            instance,
            &format!("INSERT INTO my_table VALUES ({},{})", range, ts),
            query_ctx.clone(),
        )
        .await;
        results.push(result);
    }

    results
}
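
/// Runs a single SQL statement through the frontend and returns its first result.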
async fn insert_value(
    instance: &Arc<Instance>,
    sql: &str,
    query_ctx: QueryContextRef,
) -> FrontendResult<Output> {
    instance.do_query(sql, query_ctx).await.remove(0)
}
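
/// Queries `my_table` and asserts that both rounds of inserted rows (two
/// timestamps per partition key) are present.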
async fn assert_values(instance: &Arc<Instance>) {
    let query_ctx = QueryContext::arc();

    let result = instance
        .do_query("select * from my_table order by i, ts", query_ctx)
        .await
        .remove(0);

    let expected = "\
+----+---------------------+
| i  | ts                  |
+----+---------------------+
| 5  | 2023-05-31T04:51:55 |
| 5  | 2023-05-31T04:51:56 |
| 15 | 2023-05-31T04:51:55 |
| 15 | 2023-05-31T04:51:56 |
| 25 | 2023-05-31T04:51:55 |
| 25 | 2023-05-31T04:51:56 |
| 55 | 2023-05-31T04:51:55 |
| 55 | 2023-05-31T04:51:56 |
+----+---------------------+";
    check_output_stream(result.unwrap().data, expected).await;
}
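
/// Creates the partitioned `my_table` (four range partitions on `i`) through
/// the frontend and returns its table id.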
async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId {
    let sql = r"
CREATE TABLE my_table (
    i INT PRIMARY KEY,
    ts TIMESTAMP TIME INDEX,
) PARTITION BY RANGE COLUMNS (i) (
    PARTITION r0 VALUES LESS THAN (10),
    PARTITION r1 VALUES LESS THAN (20),
    PARTITION r2 VALUES LESS THAN (50),
    PARTITION r3 VALUES LESS THAN (MAXVALUE),
)";
    let result = cluster.frontend.do_query(sql, QueryContext::arc()).await;
    result.first().unwrap().as_ref().unwrap();

    let table = cluster
        .frontend
        .catalog_manager()
        .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table")
        .await
        .unwrap()
        .unwrap();
    table.table_info().table_id()
}
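
/// Fetches the region distribution (datanode id -> region numbers) of the
/// table from the metasrv, cross-checking it against each datanode's
/// `DatanodeTableValue`s along the way.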
async fn find_region_distribution(
    cluster: &GreptimeDbCluster,
    table_id: TableId,
) -> RegionDistribution {
    let manager = cluster.metasrv.table_metadata_manager();
    let region_distribution = manager
        .table_route_manager()
        .get_region_distribution(table_id)
        .await
        .unwrap()
        .unwrap();

    // Test that the DatanodeTableValues match the table's region distribution.
    for datanode_id in cluster.datanode_instances.keys() {
        let mut actual = manager
            .datanode_table_manager()
            .tables(*datanode_id)
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
            .into_iter()
            .filter_map(|x| {
                if x.table_id == table_id {
                    Some(x.regions)
                } else {
                    None
                }
            })
            .flatten()
            .collect::<Vec<_>>();
        actual.sort();

        if let Some(mut expected) = region_distribution.get(datanode_id).cloned() {
            expected.sort();
            assert_eq!(expected, actual);
        } else {
            assert!(actual.is_empty());
        }
    }

    region_distribution
}
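
/// Picks the first non-empty (datanode, region) pair from the distribution and
/// wraps it in a `RegionIdent` describing the region to "fail".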
fn choose_failed_region(distribution: RegionDistribution) -> RegionIdent {
    let (failed_datanode, failed_region) = distribution
        .iter()
        .filter_map(|(datanode_id, regions)| {
            if !regions.is_empty() {
                Some((*datanode_id, regions[0]))
            } else {
                None
            }
        })
        .next()
        .unwrap();
    RegionIdent {
        cluster_id: 1000,
        datanode_id: failed_datanode,
        table_id: 1025,
        region_number: failed_region,
        engine: "mito2".to_string(),
    }
}
// The "foreign" means the Datanode is not containing any regions to the table before.
|
|
pub struct ForeignNodeSelector {
|
|
pub foreign: Peer,
|
|
}
|
|
|
|
#[async_trait::async_trait]
|
|
impl Selector for ForeignNodeSelector {
|
|
type Context = SelectorContext;
|
|
type Output = Vec<Peer>;
|
|
|
|
async fn select(
|
|
&self,
|
|
_ns: Namespace,
|
|
_ctx: &Self::Context,
|
|
_opts: SelectorOptions,
|
|
) -> MetaResult<Self::Output> {
|
|
Ok(vec![self.foreign.clone()])
|
|
}
|
|
}
|
|
|
|
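
/// Builds a `RegionFailoverProcedure` for the failed region, submits it to the
/// metasrv's procedure manager, and waits for it to finish.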
async fn run_region_failover_procedure(
    cluster: &GreptimeDbCluster,
    failed_region: RegionIdent,
    selector: SelectorRef,
) {
    let metasrv = &cluster.metasrv;
    let procedure_manager = metasrv.procedure_manager();
    let procedure = RegionFailoverProcedure::new(
        "greptime".into(),
        "public".into(),
        failed_region.clone(),
        RegionFailoverContext {
            region_lease_secs: 10,
            in_memory: metasrv.in_memory().clone(),
            kv_backend: metasrv.kv_backend().clone(),
            mailbox: metasrv.mailbox().clone(),
            selector,
            selector_ctx: SelectorContext {
                datanode_lease_secs: distributed_time_constants::REGION_LEASE_SECS,
                server_addr: metasrv.options().server_addr.clone(),
                kv_backend: metasrv.kv_backend().clone(),
                meta_peer_client: metasrv.meta_peer_client().clone(),
                table_id: None,
            },
            dist_lock: metasrv.lock().clone(),
            table_metadata_manager: metasrv.table_metadata_manager().clone(),
        },
    );
    let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
    let procedure_id = procedure_with_id.id;
    info!("Starting region failover procedure {procedure_id} for region {failed_region:?}");

    let watcher = &mut procedure_manager.submit(procedure_with_id).await.unwrap();
    watcher::wait(watcher).await.unwrap();
}