feat(tests-integration): add more region migration integration tests (#3094)
@@ -175,6 +175,7 @@ impl TableMetadataAllocator
        };
        Ok(table_route)
    }

    pub async fn create(
        &self,
        ctx: &TableMetadataAllocatorContext,
@@ -18,6 +18,8 @@ mod grpc;
 mod http;
 #[macro_use]
 mod sql;
+#[macro_use]
+mod region_migration;
 // #[macro_use]
 // mod region_failover;

@@ -26,4 +28,6 @@ http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
 // region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
 sql_tests!(File);

+region_migration_tests!(File);

 // TODO(niebayes): add integration tests for remote wal.
@@ -38,22 +38,57 @@ use store_api::storage::RegionId;
use table::metadata::TableId;
use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
use tests_integration::test_util::{
-    check_output_stream, get_test_store_config, run_test_with_kafka_wal, StorageType,
-    PEER_PLACEHOLDER_ADDR,
+    check_output_stream, get_test_store_config, StorageType, PEER_PLACEHOLDER_ADDR,
};
use uuid::Uuid;

const TEST_TABLE_NAME: &str = "migration_target";

-#[tokio::test(flavor = "multi_thread")]
-async fn test_region_migration_fs() {
-    common_telemetry::init_default_ut_logging();
-    run_test_with_kafka_wal(|endpoints| {
-        Box::pin(async move { test_region_migration(StorageType::File, endpoints).await })
-    })
-    .await
-}
#[macro_export]
macro_rules! region_migration_test {
    ($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
        paste::item! {
            mod [<integration_region_migration_ $service:lower _test>] {
                $(
                    #[tokio::test(flavor = "multi_thread")]
                    $(
                        #[$meta]
                    )*
                    async fn [< $test >]() {
                        let store_type = tests_integration::test_util::StorageType::$service;
                        if store_type.test_on() {
                            common_telemetry::init_default_ut_logging();
                            tests_integration::test_util::run_test_with_kafka_wal(|endpoints| {
                                Box::pin(async move { $crate::region_migration::$test(store_type, endpoints).await })
                            })
                            .await
                        }
                    }
                )*
            }
        }
    };
}

#[macro_export]
macro_rules! region_migration_tests {
    ($($service:ident),*) => {
        $(
            region_migration_test!(
                $service,

                test_region_migration,
                test_region_migration_multiple_regions,
                test_region_migration_all_regions,
                test_region_migration_incorrect_from_peer,
                test_region_migration_incorrect_region_id,
            );
        )*
    };
}
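
For reference, this is roughly what `region_migration_tests!(File)` expands to for a single test case; a hand-written sketch, not part of the commit:

mod integration_region_migration_file_test {
    #[tokio::test(flavor = "multi_thread")]
    async fn test_region_migration() {
        let store_type = tests_integration::test_util::StorageType::File;
        if store_type.test_on() {
            common_telemetry::init_default_ut_logging();
            tests_integration::test_util::run_test_with_kafka_wal(|endpoints| {
                Box::pin(async move {
                    crate::region_migration::test_region_migration(store_type, endpoints).await
                })
            })
            .await
        }
    }
}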

/// A naive region migration test.
pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<String>) {
    let cluster_name = "test_region_migration";
    let peer_factory = |id| Peer {
@@ -66,9 +101,11 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<String>)
     let home_dir = create_temp_dir("test_migration_data_home");
     let datanodes = 5u64;
     let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
-    let const_selector = Arc::new(ConstNodeSelector {
-        peers: vec![peer_factory(1), peer_factory(2), peer_factory(3)],
-    });
+    let const_selector = Arc::new(ConstNodeSelector::new(vec![
+        peer_factory(1),
+        peer_factory(2),
+        peer_factory(3),
+    ]));
     let cluster = builder
         .with_datanodes(datanodes as u32)
         .with_store_config(store_config)
@@ -112,7 +149,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<String>)
     );
     let (to_peer_id, mut to_regions) = distribution.pop_first().unwrap();
     info!(
-        "Selecting to peer: {from_peer_id}, and regions: {:?}",
+        "Selecting to peer: {to_peer_id}, and regions: {:?}",
         to_regions
     );
@@ -176,10 +213,430 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<String>)
    assert!(procedure.is_none());
}

-pub struct ConstNodeSelector {
/// A region migration test for a region server that contains multiple regions of the table.
pub async fn test_region_migration_multiple_regions(
    store_type: StorageType,
    endpoints: Vec<String>,
) {
    let cluster_name = "test_region_migration_multiple_regions";
    let peer_factory = |id| Peer {
        id,
        addr: PEER_PLACEHOLDER_ADDR.to_string(),
    };

    // Prepares test cluster.
    let (store_config, _guard) = get_test_store_config(&store_type);
    let home_dir = create_temp_dir("test_region_migration_multiple_regions_data_home");
    let datanodes = 5u64;
    let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
    let const_selector = Arc::new(ConstNodeSelector::new(vec![
        peer_factory(1),
        peer_factory(2),
        peer_factory(2),
    ]));
    let cluster = builder
        .with_datanodes(datanodes as u32)
        .with_store_config(store_config)
        .with_wal_config(WalConfig::Kafka(KafkaConfig {
            broker_endpoints: endpoints.clone(),
            linger: Duration::from_millis(25),
            ..Default::default()
        }))
        .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
            broker_endpoints: endpoints,
            num_topics: 3,
            topic_name_prefix: Uuid::new_v4().to_string(),
            ..Default::default()
        }))
        .with_shared_home_dir(Arc::new(home_dir))
        .with_meta_selector(const_selector.clone())
        .build()
        .await;
    let mut logical_timer = 1685508715000;
    let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();

    // Prepares test table.
    let table_id = prepare_testing_table(&cluster).await;

    // Inserts data.
    let results = insert_values(&cluster.frontend, logical_timer).await;
    logical_timer += 1000;
    for result in results {
        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
    }

    // The region distribution: the const selector returns peers [1, 2, 2],
    // so the table's three regions land on two datanodes.
    let mut distribution = find_region_distribution(&table_metadata_manager, table_id).await;
    assert_eq!(distribution.len(), 2);

    // Selects the target of region migration.
    let region_migration_manager = cluster.meta_srv.region_migration_manager();
    let (peer_1, peer_1_regions) = distribution.pop_first().unwrap();
    let (peer_2, peer_2_regions) = distribution.pop_first().unwrap();

    // Picks the peer that holds only one region as the from peer.
    let ((from_peer_id, from_regions), (to_peer_id, mut to_regions)) = if peer_1_regions.len() == 1
    {
        ((peer_1, peer_1_regions), (peer_2, peer_2_regions))
    } else {
        ((peer_2, peer_2_regions), (peer_1, peer_1_regions))
    };

    info!(
        "Selecting from peer: {from_peer_id}, and regions: {:?}",
        from_regions
    );
    info!(
        "Selecting to peer: {to_peer_id}, and regions: {:?}",
        to_regions
    );

    let region_id = RegionId::new(table_id, from_regions[0]);
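
    // Aside, not part of the commit: RegionId packs the table id and the region
    // number into a single u64 (table id in the high 32 bits, region number in
    // the low 32 bits); a small sketch, assuming store_api's usual accessors:
    //
    //     let region_id = RegionId::new(1024, 1);
    //     assert_eq!(region_id.table_id(), 1024);
    //     assert_eq!(region_id.region_number(), 1);
    //     assert_eq!(region_id.as_u64(), (1024u64 << 32) | 1);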
    // Triggers region migration.
    let procedure = region_migration_manager
        .submit_procedure(RegionMigrationProcedureTask::new(
            0,
            region_id,
            peer_factory(from_peer_id),
            peer_factory(to_peer_id),
        ))
        .await
        .unwrap();
    info!("Started region procedure: {}!", procedure.unwrap());

    // Prepares the expected region distribution.
    to_regions.extend(from_regions);
    // Keeps ascending order.
    to_regions.sort();
    distribution.insert(to_peer_id, to_regions);
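
The bookkeeping above relies on ordered-map semantics (`pop_first`, keyed re-insertion). A standalone sketch with made-up ids, assuming the distribution behaves like a `BTreeMap`; this is an illustration, not code from the commit:

use std::collections::BTreeMap;

// Move every region of the first peer onto the second one and record the
// expected distribution, mirroring the steps in the test above.
let mut distribution: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
distribution.insert(1, vec![0]);
distribution.insert(2, vec![1, 2]);

let (_from_peer, from_regions) = distribution.pop_first().unwrap();
let (to_peer, mut to_regions) = distribution.pop_first().unwrap();
to_regions.extend(from_regions);
to_regions.sort();
distribution.insert(to_peer, to_regions);
assert_eq!(distribution[&2], vec![0, 1, 2]);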

    // Waits until the region distribution converges to the expectation.
    wait_condition(
        Duration::from_secs(10),
        Box::pin(async move {
            loop {
                let region_migration =
                    find_region_distribution(&table_metadata_manager, table_id).await;
                if region_migration == distribution {
                    info!("Found new distribution: {region_migration:?}");
                    break;
                } else {
                    info!("Found the unexpected distribution: {region_migration:?}, expected: {distribution:?}");
                    tokio::time::sleep(Duration::from_millis(200)).await;
                }
            }
        }),
    )
    .await;
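
`wait_condition` is defined elsewhere in this test module; a minimal compatible sketch, assuming it merely bounds the polling future with a deadline (the helper below is hypothetical, matched to the call sites above):

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

// Runs the polling future, panicking if it does not finish within `timeout`.
async fn wait_condition(timeout: Duration, condition: Pin<Box<dyn Future<Output = ()> + Send>>) {
    tokio::time::timeout(timeout, condition)
        .await
        .expect("condition was not met within the timeout");
}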

    // Inserts more rows.
    let results = insert_values(&cluster.frontend, logical_timer).await;
    for result in results {
        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
    }

    // Asserts the writes.
    assert_values(&cluster.frontend).await;

    // Triggers the same migration again; the region already lives on the target
    // peer, so no new procedure is submitted.
    let procedure = region_migration_manager
        .submit_procedure(RegionMigrationProcedureTask::new(
            0,
            region_id,
            peer_factory(from_peer_id),
            peer_factory(to_peer_id),
        ))
        .await
        .unwrap();
    assert!(procedure.is_none());
}

/// A region migration test for a region server that contains all regions of the table.
pub async fn test_region_migration_all_regions(store_type: StorageType, endpoints: Vec<String>) {
    let cluster_name = "test_region_migration_all_regions";
    let peer_factory = |id| Peer {
        id,
        addr: PEER_PLACEHOLDER_ADDR.to_string(),
    };

    // Prepares test cluster.
    let (store_config, _guard) = get_test_store_config(&store_type);
    let home_dir = create_temp_dir("test_region_migration_all_regions_data_home");
    let datanodes = 5u64;
    let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
    let const_selector = Arc::new(ConstNodeSelector::new(vec![
        peer_factory(2),
        peer_factory(2),
        peer_factory(2),
    ]));
    let cluster = builder
        .with_datanodes(datanodes as u32)
        .with_store_config(store_config)
        .with_wal_config(WalConfig::Kafka(KafkaConfig {
            broker_endpoints: endpoints.clone(),
            linger: Duration::from_millis(25),
            ..Default::default()
        }))
        .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
            broker_endpoints: endpoints,
            num_topics: 3,
            topic_name_prefix: Uuid::new_v4().to_string(),
            ..Default::default()
        }))
        .with_shared_home_dir(Arc::new(home_dir))
        .with_meta_selector(const_selector.clone())
        .build()
        .await;
    let mut logical_timer = 1685508715000;
    let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();

    // Prepares test table.
    let table_id = prepare_testing_table(&cluster).await;

    // Inserts data.
    let results = insert_values(&cluster.frontend, logical_timer).await;
    logical_timer += 1000;
    for result in results {
        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
    }

    // The region distribution: the const selector returns peer 2 three times,
    // so a single datanode holds all regions of the table.
    let mut distribution = find_region_distribution(&table_metadata_manager, table_id).await;
    assert_eq!(distribution.len(), 1);

    // Selects the target of region migration.
    let region_migration_manager = cluster.meta_srv.region_migration_manager();
    let (from_peer_id, mut from_regions) = distribution.pop_first().unwrap();
    let to_peer_id = 1;
    let mut to_regions = Vec::new();
    info!(
        "Selecting from peer: {from_peer_id}, and regions: {:?}",
        from_regions
    );
    info!(
        "Selecting to peer: {to_peer_id}, and regions: {:?}",
        to_regions
    );

    let region_id = RegionId::new(table_id, from_regions[0]);
    // Triggers region migration.
    let procedure = region_migration_manager
        .submit_procedure(RegionMigrationProcedureTask::new(
            0,
            region_id,
            peer_factory(from_peer_id),
            peer_factory(to_peer_id),
        ))
        .await
        .unwrap();
    info!("Started region procedure: {}!", procedure.unwrap());

    // Prepares the expected region distribution.
    to_regions.push(from_regions.remove(0));
    // Keeps ascending order.
    to_regions.sort();
    distribution.insert(to_peer_id, to_regions);
    distribution.insert(from_peer_id, from_regions);

    // Waits until the region distribution converges to the expectation.
    wait_condition(
        Duration::from_secs(10),
        Box::pin(async move {
            loop {
                let region_migration =
                    find_region_distribution(&table_metadata_manager, table_id).await;
                if region_migration == distribution {
                    info!("Found new distribution: {region_migration:?}");
                    break;
                } else {
                    info!("Found the unexpected distribution: {region_migration:?}, expected: {distribution:?}");
                    tokio::time::sleep(Duration::from_millis(200)).await;
                }
            }
        }),
    )
    .await;

    // Inserts more rows.
    let results = insert_values(&cluster.frontend, logical_timer).await;
    for result in results {
        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
    }

    // Asserts the writes.
    assert_values(&cluster.frontend).await;

    // Triggers the same migration again; the region already lives on the target
    // peer, so no new procedure is submitted.
    let procedure = region_migration_manager
        .submit_procedure(RegionMigrationProcedureTask::new(
            0,
            region_id,
            peer_factory(from_peer_id),
            peer_factory(to_peer_id),
        ))
        .await
        .unwrap();
    assert!(procedure.is_none());
}

/// A region migration test with an incorrect from peer.
pub async fn test_region_migration_incorrect_from_peer(
    store_type: StorageType,
    endpoints: Vec<String>,
) {
    let cluster_name = "test_region_migration_incorrect_from_peer";
    let peer_factory = |id| Peer {
        id,
        addr: PEER_PLACEHOLDER_ADDR.to_string(),
    };

    // Prepares test cluster.
    let (store_config, _guard) = get_test_store_config(&store_type);
    let home_dir = create_temp_dir("test_region_migration_incorrect_from_peer_data_home");
    let datanodes = 5u64;
    let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
    let const_selector = Arc::new(ConstNodeSelector::new(vec![
        peer_factory(1),
        peer_factory(2),
        peer_factory(3),
    ]));
    let cluster = builder
        .with_datanodes(datanodes as u32)
        .with_store_config(store_config)
        .with_wal_config(WalConfig::Kafka(KafkaConfig {
            broker_endpoints: endpoints.clone(),
            linger: Duration::from_millis(25),
            ..Default::default()
        }))
        .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
            broker_endpoints: endpoints,
            num_topics: 3,
            topic_name_prefix: Uuid::new_v4().to_string(),
            ..Default::default()
        }))
        .with_shared_home_dir(Arc::new(home_dir))
        .with_meta_selector(const_selector.clone())
        .build()
        .await;
    let logical_timer = 1685508715000;
    let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();

    // Prepares test table.
    let table_id = prepare_testing_table(&cluster).await;

    // Inserts data.
    let results = insert_values(&cluster.frontend, logical_timer).await;
    for result in results {
        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
    }

    // The region distribution: the const selector returns peers [1, 2, 3],
    // so each of the three regions lands on a different datanode.
    let distribution = find_region_distribution(&table_metadata_manager, table_id).await;
    assert_eq!(distribution.len(), 3);
    let region_migration_manager = cluster.meta_srv.region_migration_manager();

    let region_id = RegionId::new(table_id, 1);

    // Triggers region migration with an incorrect from peer: datanode 5 holds
    // no region of the table.
    let err = region_migration_manager
        .submit_procedure(RegionMigrationProcedureTask::new(
            0,
            region_id,
            peer_factory(5),
            peer_factory(1),
        ))
        .await
        .unwrap_err();

    assert!(matches!(
        err,
        meta_srv::error::Error::InvalidArguments { .. }
    ));
}

/// A region migration test with an incorrect region id.
pub async fn test_region_migration_incorrect_region_id(
    store_type: StorageType,
    endpoints: Vec<String>,
) {
    let cluster_name = "test_region_migration_incorrect_region_id";
    let peer_factory = |id| Peer {
        id,
        addr: PEER_PLACEHOLDER_ADDR.to_string(),
    };

    // Prepares test cluster.
    let (store_config, _guard) = get_test_store_config(&store_type);
    let home_dir = create_temp_dir("test_region_migration_incorrect_region_id_data_home");
    let datanodes = 5u64;
    let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
    let const_selector = Arc::new(ConstNodeSelector::new(vec![
        peer_factory(1),
        peer_factory(2),
        peer_factory(3),
    ]));
    let cluster = builder
        .with_datanodes(datanodes as u32)
        .with_store_config(store_config)
        .with_wal_config(WalConfig::Kafka(KafkaConfig {
            broker_endpoints: endpoints.clone(),
            linger: Duration::from_millis(25),
            ..Default::default()
        }))
        .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
            broker_endpoints: endpoints,
            num_topics: 3,
            topic_name_prefix: Uuid::new_v4().to_string(),
            ..Default::default()
        }))
        .with_shared_home_dir(Arc::new(home_dir))
        .with_meta_selector(const_selector.clone())
        .build()
        .await;
    let logical_timer = 1685508715000;
    let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();

    // Prepares test table.
    let table_id = prepare_testing_table(&cluster).await;

    // Inserts data.
    let results = insert_values(&cluster.frontend, logical_timer).await;
    for result in results {
        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
    }

    // The region distribution: the const selector returns peers [1, 2, 3],
    // so each of the three regions lands on a different datanode.
    let distribution = find_region_distribution(&table_metadata_manager, table_id).await;
    assert_eq!(distribution.len(), 3);
    let region_migration_manager = cluster.meta_srv.region_migration_manager();

    let region_id = RegionId::new(table_id, 5);

    // Triggers region migration with an incorrect region id: the table has no
    // region numbered 5, so no region route can be found.
    let err = region_migration_manager
        .submit_procedure(RegionMigrationProcedureTask::new(
            0,
            region_id,
            peer_factory(2),
            peer_factory(1),
        ))
        .await
        .unwrap_err();

    assert!(matches!(
        err,
        meta_srv::error::Error::RegionRouteNotFound { .. }
    ));
}

struct ConstNodeSelector {
    peers: Vec<Peer>,
}

impl ConstNodeSelector {
    fn new(peers: Vec<Peer>) -> Self {
        Self { peers }
    }
}

#[async_trait::async_trait]
impl Selector for ConstNodeSelector {
    type Context = SelectorContext;
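
The diff is cut off at this point. Presumably the remainder of the impl hands back the constant peer list; a sketch under that assumption, where the `Output` type and the `select` signature are assumptions rather than lines from this commit:

    // Assumed remainder (sketch, not from the diff): always select the
    // configured peers, ignoring the namespace and context.
    type Output = Vec<Peer>;

    async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
        Ok(self.peers.clone())
    }
}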