diff --git a/Cargo.lock b/Cargo.lock
index aa0bbd3e11..0a83a7d086 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4832,6 +4832,7 @@ dependencies = [
  "futures",
  "h2",
  "http-body",
+ "humantime",
  "humantime-serde",
  "itertools 0.10.5",
  "lazy_static",
diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml
index 01450f861d..62da072a10 100644
--- a/src/meta-srv/Cargo.toml
+++ b/src/meta-srv/Cargo.toml
@@ -33,6 +33,7 @@ etcd-client.workspace = true
 futures.workspace = true
 h2 = "0.3"
 http-body = "0.4"
+humantime = "2.1"
 humantime-serde.workspace = true
 itertools.workspace = true
 lazy_static.workspace = true
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index f0c29a46df..5f185f5f0d 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -210,6 +210,12 @@ pub enum Error {
         location: Location,
         source: servers::error::Error,
     },
+    #[snafu(display("Failed to parse duration {}", duration))]
+    ParseDuration {
+        duration: String,
+        #[snafu(source)]
+        error: humantime::DurationError,
+    },
     #[snafu(display("Failed to parse address {}", addr))]
     ParseAddr {
         addr: String,
@@ -652,7 +658,6 @@ impl ErrorExt for Error {
             | Error::LockNotConfig { .. }
             | Error::ExceededRetryLimit { .. }
             | Error::SendShutdownSignal { .. }
-            | Error::ParseAddr { .. }
             | Error::SchemaAlreadyExists { .. }
             | Error::PusherNotFound { .. }
             | Error::PushMessage { .. }
@@ -678,6 +683,8 @@ impl ErrorExt for Error {
             | Error::InvalidStatKey { .. }
            | Error::InvalidInactiveRegionKey { .. }
            | Error::ParseNum { .. }
+            | Error::ParseAddr { .. }
+            | Error::ParseDuration { .. }
             | Error::UnsupportedSelectorType { .. }
             | Error::InvalidArguments { .. }
             | Error::InitExportMetricsTask { .. }
diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs
index 8be3ecfc98..ff5d0ecc45 100644
--- a/src/meta-srv/src/procedure/region_migration.rs
+++ b/src/meta-srv/src/procedure/region_migration.rs
@@ -68,6 +68,13 @@ pub struct PersistentContext {
     to_peer: Peer,
     /// The [RegionId] of migration region.
     region_id: RegionId,
+    /// The timeout of waiting for a candidate to replay the WAL.
+    #[serde(with = "humantime_serde", default = "default_replay_timeout")]
+    replay_timeout: Duration,
+}
+
+fn default_replay_timeout() -> Duration {
+    Duration::from_secs(1)
 }
 
 impl PersistentContext {
@@ -479,7 +486,7 @@ mod tests {
 
         let serialized = procedure.dump().unwrap();
 
-        let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
+        let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
         assert_eq!(expected, serialized);
     }
diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
index 6cb6939c93..73ded29365 100644
--- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
@@ -55,6 +55,7 @@ impl Default for DowngradeLeaderRegion {
 #[typetag::serde]
 impl State for DowngradeLeaderRegion {
     async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
+        let replay_timeout = ctx.persistent_ctx.replay_timeout;
         // Ensures the `leader_region_lease_deadline` must exist after recovering.
         ctx.volatile_ctx
             .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
@@ -69,7 +70,10 @@ impl State for DowngradeLeaderRegion {
         }
 
         Ok((
-            Box::<UpgradeCandidateRegion>::default(),
+            Box::new(UpgradeCandidateRegion {
+                replay_timeout,
+                ..Default::default()
+            }),
             Status::executing(false),
         ))
     }
@@ -226,6 +230,7 @@ mod tests {
             to_peer: Peer::empty(2),
             region_id: RegionId::new(1024, 1),
             cluster_id: 0,
+            replay_timeout: Duration::from_millis(1000),
         }
     }
diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs
index 753f9268e6..ad1b88efe0 100644
--- a/src/meta-srv/src/procedure/region_migration/manager.rs
+++ b/src/meta-srv/src/procedure/region_migration/manager.rs
@@ -16,6 +16,7 @@ use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::fmt::Display;
 use std::sync::{Arc, RwLock};
+use std::time::Duration;
 
 use common_meta::key::table_route::TableRouteValue;
 use common_meta::peer::Peer;
@@ -61,15 +62,23 @@ pub struct RegionMigrationProcedureTask {
     pub(crate) region_id: RegionId,
     pub(crate) from_peer: Peer,
     pub(crate) to_peer: Peer,
+    pub(crate) replay_timeout: Duration,
 }
 
 impl RegionMigrationProcedureTask {
-    pub fn new(cluster_id: ClusterId, region_id: RegionId, from_peer: Peer, to_peer: Peer) -> Self {
+    pub fn new(
+        cluster_id: ClusterId,
+        region_id: RegionId,
+        from_peer: Peer,
+        to_peer: Peer,
+        replay_timeout: Duration,
+    ) -> Self {
         Self {
             cluster_id,
             region_id,
             from_peer,
             to_peer,
+            replay_timeout,
         }
     }
 }
@@ -91,6 +100,7 @@ impl From<RegionMigrationProcedureTask> for PersistentContext {
             region_id,
             from_peer,
             to_peer,
+            replay_timeout,
         }: RegionMigrationProcedureTask,
     ) -> Self {
         PersistentContext {
@@ -98,6 +108,7 @@ impl From<RegionMigrationProcedureTask> for PersistentContext {
             from_peer,
             to_peer,
             region_id,
+            replay_timeout,
         }
     }
 }
@@ -319,6 +330,7 @@ mod test {
             region_id,
             from_peer: Peer::empty(2),
             to_peer: Peer::empty(1),
+            replay_timeout: Duration::from_millis(1000),
         };
         // Inserts one
         manager
@@ -342,6 +354,7 @@ mod test {
             region_id,
             from_peer: Peer::empty(1),
             to_peer: Peer::empty(1),
+            replay_timeout: Duration::from_millis(1000),
         };
 
         let err = manager.submit_procedure(task).await.unwrap_err();
@@ -359,6 +372,7 @@ mod test {
             region_id,
             from_peer: Peer::empty(1),
             to_peer: Peer::empty(2),
+            replay_timeout: Duration::from_millis(1000),
         };
 
         let err = manager.submit_procedure(task).await.unwrap_err();
@@ -376,6 +390,7 @@ mod test {
             region_id,
             from_peer: Peer::empty(1),
             to_peer: Peer::empty(2),
+            replay_timeout: Duration::from_millis(1000),
         };
 
         let table_info = new_test_table_info(1024, vec![1]).into();
@@ -403,6 +418,7 @@ mod test {
             region_id,
             from_peer: Peer::empty(1),
             to_peer: Peer::empty(2),
+            replay_timeout: Duration::from_millis(1000),
         };
 
         let table_info = new_test_table_info(1024, vec![1]).into();
@@ -434,6 +450,7 @@ mod test {
             region_id,
             from_peer: Peer::empty(1),
             to_peer: Peer::empty(2),
+            replay_timeout: Duration::from_millis(1000),
         };
 
         let table_info = new_test_table_info(1024, vec![1]).into();
@@ -460,6 +477,7 @@ mod test {
             region_id,
             from_peer: Peer::empty(1),
             to_peer: Peer::empty(2),
+            replay_timeout: Duration::from_millis(1000),
         };
 
         let err = manager
diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs
index 4e9bb39395..c311977838 100644
--- a/src/meta-srv/src/procedure/region_migration/test_util.rs
+++ b/src/meta-srv/src/procedure/region_migration/test_util.rs
@@ -16,6 +16,7 @@ use std::assert_matches::assert_matches;
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::time::Duration;
 
 use api::v1::meta::mailbox_message::Payload;
 use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
@@ -281,6 +282,7 @@ pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext {
         to_peer: Peer::empty(to),
         region_id,
         cluster_id: 0,
+        replay_timeout: Duration::from_millis(1000),
     }
 }
diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
index 47c8c7f1d1..514be461a8 100644
--- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
@@ -33,14 +33,14 @@ use crate::service::mailbox::Channel;
 #[derive(Debug, Serialize, Deserialize)]
 pub struct UpgradeCandidateRegion {
     // The optimistic retry times.
-    optimistic_retry: usize,
+    pub(crate) optimistic_retry: usize,
     // The retry initial interval.
-    retry_initial_interval: Duration,
+    pub(crate) retry_initial_interval: Duration,
     // The replay timeout of a instruction.
-    replay_timeout: Duration,
+    pub(crate) replay_timeout: Duration,
     // If it's true it requires the candidate region MUST replay the WAL to the latest entry id.
     // Otherwise, it will rollback to the old leader region.
-    require_ready: bool,
+    pub(crate) require_ready: bool,
 }
 
 impl Default for UpgradeCandidateRegion {
@@ -236,6 +237,7 @@ mod tests {
             to_peer: Peer::empty(2),
             region_id: RegionId::new(1024, 1),
             cluster_id: 0,
+            replay_timeout: Duration::from_millis(1000),
         }
     }
diff --git a/src/meta-srv/src/service/admin/region_migration.rs b/src/meta-srv/src/service/admin/region_migration.rs
index 544a6d4085..f8bbba51d4 100644
--- a/src/meta-srv/src/service/admin/region_migration.rs
+++ b/src/meta-srv/src/service/admin/region_migration.rs
@@ -15,9 +15,11 @@
 use std::collections::HashMap;
 use std::num::ParseIntError;
 use std::str::FromStr;
+use std::time::Duration;
 
 use common_meta::peer::Peer;
 use common_meta::{distributed_time_constants, ClusterId};
+use humantime::parse_duration;
 use serde::Serialize;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::RegionId;
@@ -43,6 +45,7 @@ struct SubmitRegionMigrationTaskRequest {
     region_id: RegionId,
     from_peer_id: u64,
     to_peer_id: u64,
+    replay_timeout: Duration,
 }
 
 #[derive(Debug, Serialize)]
@@ -71,6 +74,8 @@ where
     Ok(parse_result)
 }
 
+const DEFAULT_REPLAY_TIMEOUT: Duration = Duration::from_millis(1000);
+
 impl TryFrom<&HashMap<String, String>> for SubmitRegionMigrationTaskRequest {
     type Error = Error;
 
@@ -89,11 +94,18 @@ impl TryFrom<&HashMap<String, String>> for SubmitRegionMigrationTaskRequest {
             error::MissingRequiredParameterSnafu { param: key }.fail()
         })?;
 
+        let replay_timeout = if let Some(duration) = params.get("replay_timeout") {
+            parse_duration(duration).context(error::ParseDurationSnafu { duration })?
+        } else {
+            DEFAULT_REPLAY_TIMEOUT
+        };
+
         Ok(SubmitRegionMigrationTaskRequest {
             cluster_id,
             region_id: RegionId::from_u64(region_id),
             from_peer_id,
             to_peer_id,
+            replay_timeout,
         })
     }
 }
@@ -131,6 +143,7 @@ impl SubmitRegionMigrationTaskHandler {
             region_id,
             from_peer_id,
             to_peer_id,
+            replay_timeout,
         } = task;
 
         let from_peer = self.lookup_peer(cluster_id, from_peer_id).await?.context(
@@ -150,6 +163,7 @@ impl SubmitRegionMigrationTaskHandler {
             region_id,
             from_peer,
             to_peer,
+            replay_timeout,
         })
         .await?;
 
@@ -187,6 +201,7 @@ mod tests {
     use std::collections::HashMap;
 
     use crate::error;
+    use crate::service::admin::region_migration::DEFAULT_REPLAY_TIMEOUT;
 
     #[test]
     fn test_parse_migration_task_req() {
@@ -212,6 +227,7 @@ mod tests {
                 region_id: RegionId::new(1024, 1),
                 from_peer_id: 1,
                 to_peer_id: 2,
+                replay_timeout: DEFAULT_REPLAY_TIMEOUT
             },
             task_req
         );
@@ -233,6 +249,7 @@ mod tests {
                 region_id: RegionId::new(1024, 1),
                 from_peer_id: 1,
                 to_peer_id: 2,
+                replay_timeout: DEFAULT_REPLAY_TIMEOUT
             },
             task_req
        );
diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs
index cd17c4f4d7..11a5595c16 100644
--- a/tests-integration/tests/region_migration.rs
+++ b/tests-integration/tests/region_migration.rs
@@ -161,6 +161,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec
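Note: the optional `replay_timeout` query parameter introduced above is parsed with `humantime::parse_duration`, so values such as `1s`, `500ms`, or `2m 30s` are accepted, and the request falls back to `DEFAULT_REPLAY_TIMEOUT` (1000 ms) when the parameter is absent. The following is a minimal standalone sketch of that fallback behavior, not the meta-srv code itself: the helper name is hypothetical and it returns a plain `Result` instead of the snafu `ParseDuration` error used in the diff.

```rust
use std::collections::HashMap;
use std::time::Duration;

use humantime::parse_duration;

const DEFAULT_REPLAY_TIMEOUT: Duration = Duration::from_millis(1000);

/// Hypothetical helper mirroring the fallback logic added to
/// `SubmitRegionMigrationTaskRequest::try_from`: an absent
/// `replay_timeout` yields the default, an invalid one is an error.
fn replay_timeout_from_params(
    params: &HashMap<String, String>,
) -> Result<Duration, humantime::DurationError> {
    match params.get("replay_timeout") {
        Some(value) => parse_duration(value),
        None => Ok(DEFAULT_REPLAY_TIMEOUT),
    }
}

fn main() {
    let mut params = HashMap::new();
    params.insert("replay_timeout".to_string(), "30s".to_string());
    // Prints a 30-second Duration.
    println!("{:?}", replay_timeout_from_params(&params).unwrap());

    // Without the parameter, the 1000 ms default is used.
    assert_eq!(
        replay_timeout_from_params(&HashMap::new()).unwrap(),
        DEFAULT_REPLAY_TIMEOUT
    );
}
```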