feat(tests-integration): add a naive region migration integration test (#3078)

* fix: fix heartbeat handler ignoring the upgrade candidate instruction

* fix: fix handler not injecting wal options

* feat: expose `RegionMigrationProcedureTask`

* feat(tests-integration): add a naive region migration test

* chore: apply suggestions from CR

* feat: add a test for when the target region has already been migrated

* chore: apply suggestions from CR
Author: Weny Xu
Date: 2024-01-03 16:12:59 +09:00
Committed by: GitHub
Parent: e4c71843e6
Commit: 611a8aa2fe
9 changed files with 421 additions and 31 deletions

View File

@@ -96,6 +96,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
Some((_, Instruction::OpenRegion { .. }))
| Some((_, Instruction::CloseRegion { .. }))
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
)
}
@@ -134,7 +135,7 @@ mod tests {
use common_meta::heartbeat::mailbox::{
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
};
-use common_meta::instruction::{DowngradeRegion, OpenRegion};
+use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
@@ -175,6 +176,44 @@ mod tests {
}
}
#[test]
fn test_is_acceptable() {
common_telemetry::init_default_ut_logging();
let region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let heartbeat_env = HeartbeatResponseTestEnv::new();
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
// Open region
let region_id = RegionId::new(1024, 1);
let storage_path = "test";
let instruction = open_region_instruction(region_id, storage_path);
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
// Close region
let instruction = close_region_instruction(region_id);
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
// Downgrade region
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
});
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
// Upgrade region
let instruction = Instruction::UpgradeRegion(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
});
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
);
}
fn close_region_instruction(region_id: RegionId) -> Instruction {
Instruction::CloseRegion(RegionIdent {
table_id: region_id.table_id(),

View File

@@ -14,6 +14,7 @@
use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal::prepare_wal_option;
use futures_util::future::BoxFuture;
use store_api::path_utils::region_dir;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
@@ -26,15 +27,14 @@ impl HandlerContext {
OpenRegion {
region_ident,
region_storage_path,
-region_options,
+mut region_options,
region_wal_options,
skip_wal_replay,
}: OpenRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
-// TODO(niebayes): extends region options with region_wal_options.
-let _ = region_wal_options;
+prepare_wal_option(&mut region_options, region_id, &region_wal_options);
let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
@@ -42,10 +42,8 @@ impl HandlerContext {
skip_wal_replay,
});
let result = self.region_server.handle_request(region_id, request).await;
let success = result.is_ok();
let error = result.as_ref().map_err(|e| e.output_msg()).err();
InstructionReply::OpenRegion(SimpleReply {
result: success,
error,
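For readers unfamiliar with `prepare_wal_option`: judging from the call above, it folds the WAL option assigned to this region into the generic region options before the open request is built. A rough sketch of that behavior; the helper name, option key, and map types below are guesses for illustration, not GreptimeDB's actual implementation:

use std::collections::HashMap;
use store_api::storage::{RegionId, RegionNumber};

// Hypothetical sketch: copy the per-region WAL option (if any) into the
// string-keyed options map consumed by RegionOpenRequest.
fn prepare_wal_option_sketch(
    region_options: &mut HashMap<String, String>,
    region_id: RegionId,
    region_wal_options: &HashMap<RegionNumber, String>,
) {
    if let Some(wal_option) = region_wal_options.get(&region_id.region_number()) {
        // "wal_options" is an assumed key name; the real constant lives elsewhere.
        region_options.insert("wal_options".to_string(), wal_option.clone());
    }
}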

View File

@@ -42,6 +42,7 @@ use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
pub use manager::RegionMigrationProcedureTask;
use serde::{Deserialize, Serialize};
use snafu::{location, Location, OptionExt, ResultExt};
use store_api::storage::RegionId;

View File

@@ -56,13 +56,24 @@ impl Drop for RegionMigrationProcedureGuard {
}
#[derive(Debug, Clone)]
-pub(crate) struct RegionMigrationProcedureTask {
+pub struct RegionMigrationProcedureTask {
pub(crate) cluster_id: ClusterId,
pub(crate) region_id: RegionId,
pub(crate) from_peer: Peer,
pub(crate) to_peer: Peer,
}
impl RegionMigrationProcedureTask {
pub fn new(cluster_id: ClusterId, region_id: RegionId, from_peer: Peer, to_peer: Peer) -> Self {
Self {
cluster_id,
region_id,
from_peer,
to_peer,
}
}
}
impl Display for RegionMigrationProcedureTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
@@ -223,7 +234,7 @@ impl RegionMigrationManager {
}
/// Submits a new region migration procedure.
-pub(crate) async fn submit_procedure(
+pub async fn submit_procedure(
&self,
task: RegionMigrationProcedureTask,
) -> Result<Option<ProcedureId>> {
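With both the task struct and `submit_procedure` now public, code outside the procedure module (such as the integration test added below) can trigger a migration along these lines; a minimal sketch where the cluster id, peers, table id, and manager handle are placeholders and error handling is elided:

// Sketch only: mirrors the integration test further down.
let task = RegionMigrationProcedureTask::new(
    0,                          // cluster_id
    RegionId::new(table_id, 1), // region to move
    from_peer.clone(),          // current holder
    to_peer.clone(),            // migration target
);
let procedure_id = region_migration_manager.submit_procedure(task).await?;
// A `None` result means the region already lives on the target peer, so no
// procedure was submitted (see the re-submission assertion in the test below).
assert!(procedure_id.is_some());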

View File

@@ -43,17 +43,18 @@ use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
-use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
+use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use servers::grpc::GrpcServer;
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use tempfile::TempDir;
use tonic::transport::Server;
use tower::service_fn;
use crate::test_util::{
self, create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageGuard,
-StorageType,
+StorageType, PEER_PLACEHOLDER_ADDR,
};
pub struct GreptimeDbCluster {
@@ -75,6 +76,8 @@ pub struct GreptimeDbClusterBuilder {
datanodes: Option<u32>,
wal_config: WalConfig,
meta_wal_config: MetaWalConfig,
shared_home_dir: Option<Arc<TempDir>>,
meta_selector: Option<SelectorRef>,
}
impl GreptimeDbClusterBuilder {
@@ -102,34 +105,53 @@ impl GreptimeDbClusterBuilder {
datanodes: None,
wal_config: WalConfig::default(),
meta_wal_config: MetaWalConfig::default(),
shared_home_dir: None,
meta_selector: None,
}
}
#[must_use]
pub fn with_store_config(mut self, store_config: ObjectStoreConfig) -> Self {
self.store_config = Some(store_config);
self
}
#[must_use]
pub fn with_store_providers(mut self, store_providers: Vec<StorageType>) -> Self {
self.store_providers = Some(store_providers);
self
}
#[must_use]
pub fn with_datanodes(mut self, datanodes: u32) -> Self {
self.datanodes = Some(datanodes);
self
}
#[must_use]
pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
self.wal_config = wal_config;
self
}
#[must_use]
pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
self.meta_wal_config = wal_meta;
self
}
#[must_use]
pub fn with_shared_home_dir(mut self, shared_home_dir: Arc<TempDir>) -> Self {
self.shared_home_dir = Some(shared_home_dir);
self
}
#[must_use]
pub fn with_meta_selector(mut self, selector: SelectorRef) -> Self {
self.meta_selector = Some(selector);
self
}
pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);
@@ -147,7 +169,13 @@ impl GreptimeDbClusterBuilder {
..Default::default()
};
-let meta_srv = self.build_metasrv(opt, datanode_clients.clone()).await;
+let meta_srv = meta_srv::mocks::mock(
+opt,
+self.kv_backend.clone(),
+self.meta_selector.clone(),
+Some(datanode_clients.clone()),
+)
+.await;
let (datanode_instances, storage_guards, dir_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await;
@@ -175,14 +203,6 @@ impl GreptimeDbClusterBuilder {
}
}
-async fn build_metasrv(
-&self,
-opt: MetaSrvOptions,
-datanode_clients: Arc<DatanodeClients>,
-) -> MockInfo {
-meta_srv::mocks::mock(opt, self.kv_backend.clone(), None, Some(datanode_clients)).await
-}
async fn build_datanodes(
&self,
meta_srv: MockInfo,
@@ -200,10 +220,15 @@ impl GreptimeDbClusterBuilder {
let datanode_id = i as u64 + 1;
let mode = Mode::Distributed;
let mut opts = if let Some(store_config) = &self.store_config {
-let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
-let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
-dir_guards.push(FileDirGuard::new(home_tmp_dir));
+let home_dir = if let Some(home_dir) = &self.shared_home_dir {
+home_dir.path().to_str().unwrap().to_string()
+} else {
+let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
+let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
+dir_guards.push(FileDirGuard::new(home_tmp_dir));
+home_dir
+};
create_datanode_opts(
mode,
@@ -370,8 +395,8 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
-// "127.0.0.1:3001" is just a placeholder, does not actually connect to it.
-let addr = "127.0.0.1:3001";
+// `PEER_PLACEHOLDER_ADDR` is just a placeholder, does not actually connect to it.
+let addr = PEER_PLACEHOLDER_ADDR;
let channel_manager = ChannelManager::new();
let _ = channel_manager
.reset_with_connector(
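Taken together, the new builder knobs let a test pin every datanode to one shared data home and force the metasrv to place regions on a fixed set of peers. A condensed usage sketch, assuming a `ConstNodeSelector` like the one defined in the new test below (`peers` and the names are placeholders):

// Sketch: values are illustrative; see the region migration test below for the real setup.
let cluster = GreptimeDbClusterBuilder::new("my_cluster_test")
    .await
    .with_datanodes(5)
    .with_shared_home_dir(Arc::new(create_temp_dir("my_cluster_test_home")))
    .with_meta_selector(Arc::new(ConstNodeSelector { peers }))
    .build()
    .await;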

View File

@@ -65,6 +65,7 @@ impl GreptimeDbStandaloneBuilder {
}
}
#[must_use]
pub fn with_default_store_type(self, store_type: StorageType) -> Self {
Self {
default_store: Some(store_type),
@@ -73,6 +74,7 @@ impl GreptimeDbStandaloneBuilder {
}
#[cfg(test)]
#[must_use]
pub fn with_store_providers(self, store_providers: Vec<StorageType>) -> Self {
Self {
store_providers: Some(store_providers),
@@ -81,6 +83,7 @@ impl GreptimeDbStandaloneBuilder {
}
#[cfg(test)]
#[must_use]
pub fn with_plugin(self, plugin: Plugins) -> Self {
Self {
plugin: Some(plugin),
@@ -88,11 +91,13 @@ impl GreptimeDbStandaloneBuilder {
}
}
#[must_use]
pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
self.wal_config = wal_config;
self
}
#[must_use]
pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
self.meta_wal_config = wal_meta;
self

View File

@@ -27,6 +27,7 @@ use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
use common_recordbatch::util;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datanode::config::{
@@ -36,6 +37,7 @@ use datanode::config::{
use frontend::frontend::TomlSerializable;
use frontend::instance::Instance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use futures::future::BoxFuture;
use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
@@ -54,6 +56,8 @@ use session::context::QueryContext;
use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001";
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StorageType {
S3,
@@ -662,3 +666,22 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.await
.unwrap();
}
pub async fn run_test_with_kafka_wal<F>(test: F)
where
F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
if endpoints.is_empty() {
warn!("The endpoints are empty, skipping the test");
return;
}
let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
test(endpoints).await
}
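A test opts into the Kafka-backed WAL by wrapping its body in this helper: when GT_KAFKA_ENDPOINTS is unset the test is skipped instead of failing. A minimal usage sketch, mirroring the new region migration test below (`run_my_test` is a placeholder):

#[tokio::test(flavor = "multi_thread")]
async fn my_kafka_wal_test() {
    run_test_with_kafka_wal(|endpoints| {
        // `endpoints` holds the broker addresses parsed from GT_KAFKA_ENDPOINTS.
        Box::pin(async move { run_my_test(endpoints).await })
    })
    .await
}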

View File

@@ -100,7 +100,7 @@ pub async fn test_region_failover(store_type: StorageType) {
let table_id = prepare_testing_table(&cluster).await;
-let results = write_datas(&frontend, logical_timer).await;
+let results = insert_values(&frontend, logical_timer).await;
logical_timer += 1000;
for result in results {
assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
@@ -141,12 +141,12 @@ pub async fn test_region_failover(store_type: StorageType) {
// Inserts data to each datanode after failover
let frontend = cluster.frontend.clone();
-let results = write_datas(&frontend, logical_timer).await;
+let results = insert_values(&frontend, logical_timer).await;
for result in results {
assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
}
-assert_writes(&frontend).await;
+assert_values(&frontend).await;
assert!(!distribution.contains_key(&failed_region.datanode_id));
@@ -179,12 +179,12 @@ async fn has_route_cache(instance: &Arc<Instance>, table_id: TableId) -> bool {
.is_some()
}
-async fn write_datas(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
+async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
let query_ctx = QueryContext::arc();
let mut results = Vec::new();
for range in [5, 15, 25, 55] {
-let result = write_data(
+let result = insert_value(
instance,
&format!("INSERT INTO my_table VALUES ({},{})", range, ts),
query_ctx.clone(),
@@ -196,7 +196,7 @@ async fn write_datas(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Ou
results
}
-async fn write_data(
+async fn insert_value(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
@@ -204,7 +204,7 @@ async fn write_data(
instance.do_query(sql, query_ctx).await.remove(0)
}
-async fn assert_writes(instance: &Arc<Instance>) {
+async fn assert_values(instance: &Arc<Instance>) {
let query_ctx = QueryContext::arc();
let result = instance

View File

@@ -0,0 +1,288 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_config::wal::KafkaConfig;
use common_config::WalConfig;
use common_meta::key::{RegionDistribution, TableMetadataManagerRef};
use common_meta::peer::Peer;
use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig;
use common_meta::wal::WalConfig as MetaWalConfig;
use common_query::Output;
use common_telemetry::info;
use common_test_util::temp_dir::create_temp_dir;
use frontend::error::Result as FrontendResult;
use frontend::instance::Instance;
use futures::future::BoxFuture;
use meta_srv::error::Result as MetaResult;
use meta_srv::metasrv::SelectorContext;
use meta_srv::procedure::region_migration::RegionMigrationProcedureTask;
use meta_srv::selector::{Namespace, Selector, SelectorOptions};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
use tests_integration::test_util::{
check_output_stream, get_test_store_config, run_test_with_kafka_wal, StorageType,
PEER_PLACEHOLDER_ADDR,
};
use uuid::Uuid;
const TEST_TABLE_NAME: &str = "migration_target";
#[tokio::test(flavor = "multi_thread")]
async fn test_region_migration_fs() {
common_telemetry::init_default_ut_logging();
run_test_with_kafka_wal(|endpoints| {
Box::pin(async move { test_region_migration(StorageType::File, endpoints).await })
})
.await
}
pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<String>) {
let cluster_name = "test_region_migration";
let peer_factory = |id| Peer {
id,
addr: PEER_PLACEHOLDER_ADDR.to_string(),
};
// Prepares test cluster.
let (store_config, _guard) = get_test_store_config(&store_type);
let home_dir = create_temp_dir("test_migration_data_home");
let datanodes = 5u64;
let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
let const_selector = Arc::new(ConstNodeSelector {
peers: vec![peer_factory(1), peer_factory(2), peer_factory(3)],
});
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_wal_config(WalConfig::Kafka(KafkaConfig {
broker_endpoints: endpoints.clone(),
linger: Duration::from_millis(25),
..Default::default()
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
.build()
.await;
let mut logical_timer = 1685508715000;
let table_metadata_manager = cluster.meta_srv.table_metadata_manager().clone();
// Prepares test table.
let table_id = prepare_testing_table(&cluster).await;
// Inserts data
let results = insert_values(&cluster.frontend, logical_timer).await;
logical_timer += 1000;
for result in results {
assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
}
// The region distribution
let mut distribution = find_region_distribution(&table_metadata_manager, table_id).await;
// Selecting target of region migration.
let region_migration_manager = cluster.meta_srv.region_migration_manager();
let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
info!(
"Selecting from peer: {from_peer_id}, and regions: {:?}",
from_regions
);
let (to_peer_id, mut to_regions) = distribution.pop_first().unwrap();
info!(
"Selecting to peer: {to_peer_id}, and regions: {:?}",
to_regions
);
let region_id = RegionId::new(table_id, from_regions[0]);
// Trigger region migration.
let procedure = region_migration_manager
.submit_procedure(RegionMigrationProcedureTask::new(
0,
region_id,
peer_factory(from_peer_id),
peer_factory(to_peer_id),
))
.await
.unwrap();
info!("Started region procedure: {}!", procedure.unwrap());
// Prepares expected region distribution.
to_regions.extend(from_regions);
// Keeps asc order.
to_regions.sort();
distribution.insert(to_peer_id, to_regions);
// Waits until the region distribution matches the expectation.
wait_condition(
Duration::from_secs(10),
Box::pin(async move {
loop {
let region_migration =
find_region_distribution(&table_metadata_manager, table_id).await;
if region_migration == distribution {
info!("Found new distribution: {region_migration:?}");
break;
} else {
info!("Found the unexpected distribution: {region_migration:?}, expected: {distribution:?}");
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
}),
)
.await;
// Inserts more data.
let results = insert_values(&cluster.frontend, logical_timer).await;
for result in results {
assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
}
// Asserts the writes.
assert_values(&cluster.frontend).await;
// Triggers again.
let procedure = region_migration_manager
.submit_procedure(RegionMigrationProcedureTask::new(
0,
region_id,
peer_factory(from_peer_id),
peer_factory(to_peer_id),
))
.await
.unwrap();
assert!(procedure.is_none());
}
pub struct ConstNodeSelector {
peers: Vec<Peer>,
}
#[async_trait::async_trait]
impl Selector for ConstNodeSelector {
type Context = SelectorContext;
type Output = Vec<Peer>;
async fn select(
&self,
_ns: Namespace,
_ctx: &Self::Context,
_opts: SelectorOptions,
) -> MetaResult<Self::Output> {
Ok(self.peers.clone())
}
}
async fn wait_condition(timeout: Duration, condition: BoxFuture<'static, ()>) {
tokio::time::timeout(timeout, condition).await.unwrap();
}
async fn assert_values(instance: &Arc<Instance>) {
let query_ctx = QueryContext::arc();
let result = instance
.do_query(
&format!("select * from {TEST_TABLE_NAME} order by i, ts"),
query_ctx,
)
.await
.remove(0);
let expected = "\
+----+---------------------+
| i | ts |
+----+---------------------+
| 5 | 2023-05-31T04:51:55 |
| 5 | 2023-05-31T04:51:56 |
| 15 | 2023-05-31T04:51:55 |
| 15 | 2023-05-31T04:51:56 |
| 55 | 2023-05-31T04:51:55 |
| 55 | 2023-05-31T04:51:56 |
+----+---------------------+";
check_output_stream(result.unwrap(), expected).await;
}
async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId {
let sql = format!(
r"
CREATE TABLE {TEST_TABLE_NAME} (
i INT PRIMARY KEY,
ts TIMESTAMP TIME INDEX,
) PARTITION BY RANGE COLUMNS (i) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)"
);
let mut result = cluster.frontend.do_query(&sql, QueryContext::arc()).await;
let output = result.remove(0).unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let table = cluster
.frontend
.catalog_manager()
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, TEST_TABLE_NAME)
.await
.unwrap()
.unwrap();
table.table_info().table_id()
}
async fn find_region_distribution(
table_metadata_manager: &TableMetadataManagerRef,
table_id: TableId,
) -> RegionDistribution {
table_metadata_manager
.table_route_manager()
.get_region_distribution(table_id)
.await
.unwrap()
.unwrap()
}
async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
let query_ctx = QueryContext::arc();
let mut results = Vec::new();
for range in [5, 15, 55] {
let result = insert_value(
instance,
&format!("INSERT INTO {TEST_TABLE_NAME} VALUES ({},{})", range, ts),
query_ctx.clone(),
)
.await;
results.push(result);
}
results
}
async fn insert_value(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> FrontendResult<Output> {
instance.do_query(sql, query_ctx).await.remove(0)
}