feat: expose the region migration replay_timeout argument (#3129)

* feat: expose region migration args

* fix: fix ci
This commit is contained in:
Weny Xu
2024-01-10 18:47:59 +09:00
committed by GitHub
parent ec8266b969
commit 3ab370265a
10 changed files with 75 additions and 8 deletions

1
Cargo.lock generated
View File

@@ -4832,6 +4832,7 @@ dependencies = [
"futures",
"h2",
"http-body",
"humantime",
"humantime-serde",
"itertools 0.10.5",
"lazy_static",

View File

@@ -33,6 +33,7 @@ etcd-client.workspace = true
futures.workspace = true
h2 = "0.3"
http-body = "0.4"
humantime = "2.1"
humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true

View File

@@ -210,6 +210,12 @@ pub enum Error {
location: Location,
source: servers::error::Error,
},
#[snafu(display("Failed to parse duration {}", duration))]
ParseDuration {
duration: String,
#[snafu(source)]
error: humantime::DurationError,
},
#[snafu(display("Failed to parse address {}", addr))]
ParseAddr {
addr: String,
@@ -652,7 +658,6 @@ impl ErrorExt for Error {
| Error::LockNotConfig { .. }
| Error::ExceededRetryLimit { .. }
| Error::SendShutdownSignal { .. }
| Error::ParseAddr { .. }
| Error::SchemaAlreadyExists { .. }
| Error::PusherNotFound { .. }
| Error::PushMessage { .. }
@@ -678,6 +683,8 @@ impl ErrorExt for Error {
| Error::InvalidStatKey { .. }
| Error::InvalidInactiveRegionKey { .. }
| Error::ParseNum { .. }
| Error::ParseAddr { .. }
| Error::ParseDuration { .. }
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
| Error::InitExportMetricsTask { .. }

View File

@@ -68,6 +68,13 @@ pub struct PersistentContext {
to_peer: Peer,
/// The [RegionId] of the migration region.
region_id: RegionId,
/// The timeout of waiting for a candidate to replay the WAL.
#[serde(with = "humantime_serde", default = "default_replay_timeout")]
replay_timeout: Duration,
}
/// Fallback WAL replay timeout (1 second) for a migration candidate.
///
/// Used by serde (`default = "default_replay_timeout"`) when the
/// `replay_timeout` field is absent from a serialized `PersistentContext`,
/// keeping contexts persisted before this field existed deserializable.
fn default_replay_timeout() -> Duration {
    Duration::from_millis(1000)
}
impl PersistentContext {
@@ -479,7 +486,7 @@ mod tests {
let serialized = procedure.dump().unwrap();
let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}

View File

@@ -55,6 +55,7 @@ impl Default for DowngradeLeaderRegion {
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let replay_timeout = ctx.persistent_ctx.replay_timeout;
// Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
@@ -69,7 +70,10 @@ impl State for DowngradeLeaderRegion {
}
Ok((
Box::<UpgradeCandidateRegion>::default(),
Box::new(UpgradeCandidateRegion {
replay_timeout,
..Default::default()
}),
Status::executing(false),
))
}
@@ -226,6 +230,7 @@ mod tests {
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
replay_timeout: Duration::from_millis(1000),
}
}

View File

@@ -16,6 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use common_meta::key::table_route::TableRouteValue;
use common_meta::peer::Peer;
@@ -61,15 +62,23 @@ pub struct RegionMigrationProcedureTask {
pub(crate) region_id: RegionId,
pub(crate) from_peer: Peer,
pub(crate) to_peer: Peer,
pub(crate) replay_timeout: Duration,
}
impl RegionMigrationProcedureTask {
pub fn new(cluster_id: ClusterId, region_id: RegionId, from_peer: Peer, to_peer: Peer) -> Self {
pub fn new(
cluster_id: ClusterId,
region_id: RegionId,
from_peer: Peer,
to_peer: Peer,
replay_timeout: Duration,
) -> Self {
Self {
cluster_id,
region_id,
from_peer,
to_peer,
replay_timeout,
}
}
}
@@ -91,6 +100,7 @@ impl From<RegionMigrationProcedureTask> for PersistentContext {
region_id,
from_peer,
to_peer,
replay_timeout,
}: RegionMigrationProcedureTask,
) -> Self {
PersistentContext {
@@ -98,6 +108,7 @@ impl From<RegionMigrationProcedureTask> for PersistentContext {
from_peer,
to_peer,
region_id,
replay_timeout,
}
}
}
@@ -319,6 +330,7 @@ mod test {
region_id,
from_peer: Peer::empty(2),
to_peer: Peer::empty(1),
replay_timeout: Duration::from_millis(1000),
};
// Inserts one
manager
@@ -342,6 +354,7 @@ mod test {
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(1),
replay_timeout: Duration::from_millis(1000),
};
let err = manager.submit_procedure(task).await.unwrap_err();
@@ -359,6 +372,7 @@ mod test {
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
replay_timeout: Duration::from_millis(1000),
};
let err = manager.submit_procedure(task).await.unwrap_err();
@@ -376,6 +390,7 @@ mod test {
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
replay_timeout: Duration::from_millis(1000),
};
let table_info = new_test_table_info(1024, vec![1]).into();
@@ -403,6 +418,7 @@ mod test {
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
replay_timeout: Duration::from_millis(1000),
};
let table_info = new_test_table_info(1024, vec![1]).into();
@@ -434,6 +450,7 @@ mod test {
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
replay_timeout: Duration::from_millis(1000),
};
let table_info = new_test_table_info(1024, vec![1]).into();
@@ -460,6 +477,7 @@ mod test {
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
replay_timeout: Duration::from_millis(1000),
};
let err = manager

View File

@@ -16,6 +16,7 @@ use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
@@ -281,6 +282,7 @@ pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> Persis
to_peer: Peer::empty(to),
region_id,
cluster_id: 0,
replay_timeout: Duration::from_millis(1000),
}
}

View File

@@ -33,14 +33,14 @@ use crate::service::mailbox::Channel;
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion {
// The optimistic retry times.
optimistic_retry: usize,
pub(crate) optimistic_retry: usize,
// The retry initial interval.
retry_initial_interval: Duration,
pub(crate) retry_initial_interval: Duration,
// The replay timeout of an instruction.
replay_timeout: Duration,
pub(crate) replay_timeout: Duration,
// If true, the candidate region MUST replay the WAL up to the latest entry id;
// otherwise, the procedure rolls back to the old leader region.
require_ready: bool,
pub(crate) require_ready: bool,
}
impl Default for UpgradeCandidateRegion {
@@ -236,6 +236,7 @@ mod tests {
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
replay_timeout: Duration::from_millis(1000),
}
}

View File

@@ -15,9 +15,11 @@
use std::collections::HashMap;
use std::num::ParseIntError;
use std::str::FromStr;
use std::time::Duration;
use common_meta::peer::Peer;
use common_meta::{distributed_time_constants, ClusterId};
use humantime::parse_duration;
use serde::Serialize;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -43,6 +45,7 @@ struct SubmitRegionMigrationTaskRequest {
region_id: RegionId,
from_peer_id: u64,
to_peer_id: u64,
replay_timeout: Duration,
}
#[derive(Debug, Serialize)]
@@ -71,6 +74,8 @@ where
Ok(parse_result)
}
const DEFAULT_REPLAY_TIMEOUT: Duration = Duration::from_millis(1000);
impl TryFrom<&HashMap<String, String>> for SubmitRegionMigrationTaskRequest {
type Error = Error;
@@ -89,11 +94,18 @@ impl TryFrom<&HashMap<String, String>> for SubmitRegionMigrationTaskRequest {
error::MissingRequiredParameterSnafu { param: key }.fail()
})?;
let replay_timeout = if let Some(duration) = params.get("replay_timeout") {
parse_duration(duration).context(error::ParseDurationSnafu { duration })?
} else {
DEFAULT_REPLAY_TIMEOUT
};
Ok(SubmitRegionMigrationTaskRequest {
cluster_id,
region_id: RegionId::from_u64(region_id),
from_peer_id,
to_peer_id,
replay_timeout,
})
}
}
@@ -131,6 +143,7 @@ impl SubmitRegionMigrationTaskHandler {
region_id,
from_peer_id,
to_peer_id,
replay_timeout,
} = task;
let from_peer = self.lookup_peer(cluster_id, from_peer_id).await?.context(
@@ -150,6 +163,7 @@ impl SubmitRegionMigrationTaskHandler {
region_id,
from_peer,
to_peer,
replay_timeout,
})
.await?;
@@ -187,6 +201,7 @@ mod tests {
use std::collections::HashMap;
use crate::error;
use crate::service::admin::region_migration::DEFAULT_REPLAY_TIMEOUT;
#[test]
fn test_parse_migration_task_req() {
@@ -212,6 +227,7 @@ mod tests {
region_id: RegionId::new(1024, 1),
from_peer_id: 1,
to_peer_id: 2,
replay_timeout: DEFAULT_REPLAY_TIMEOUT
},
task_req
);
@@ -233,6 +249,7 @@ mod tests {
region_id: RegionId::new(1024, 1),
from_peer_id: 1,
to_peer_id: 2,
replay_timeout: DEFAULT_REPLAY_TIMEOUT
},
task_req
);

View File

@@ -161,6 +161,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
region_id,
peer_factory(from_peer_id),
peer_factory(to_peer_id),
Duration::from_millis(1000),
))
.await
.unwrap();
@@ -207,6 +208,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
region_id,
peer_factory(from_peer_id),
peer_factory(to_peer_id),
Duration::from_millis(1000),
))
.await
.unwrap();
@@ -299,6 +301,7 @@ pub async fn test_region_migration_multiple_regions(
region_id,
peer_factory(from_peer_id),
peer_factory(to_peer_id),
Duration::from_millis(1000),
))
.await
.unwrap();
@@ -345,6 +348,7 @@ pub async fn test_region_migration_multiple_regions(
region_id,
peer_factory(from_peer_id),
peer_factory(to_peer_id),
Duration::from_millis(1000),
))
.await
.unwrap();
@@ -426,6 +430,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
region_id,
peer_factory(from_peer_id),
peer_factory(to_peer_id),
Duration::from_millis(1000),
))
.await
.unwrap();
@@ -473,6 +478,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
region_id,
peer_factory(from_peer_id),
peer_factory(to_peer_id),
Duration::from_millis(1000),
))
.await
.unwrap();
@@ -543,6 +549,7 @@ pub async fn test_region_migration_incorrect_from_peer(
region_id,
peer_factory(5),
peer_factory(1),
Duration::from_millis(1000),
))
.await
.unwrap_err();
@@ -617,6 +624,7 @@ pub async fn test_region_migration_incorrect_region_id(
region_id,
peer_factory(2),
peer_factory(1),
Duration::from_millis(1000),
))
.await
.unwrap_err();