mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-01 21:00:38 +00:00
feat: add upgrade candidate region step (#2829)
* feat: add upgrade candidate region step * chore: apply suggestions from CR * chore: apply suggestions from CR
This commit is contained in:
@@ -33,5 +33,8 @@ pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
|
||||
/// The lease seconds of metasrv leader.
|
||||
pub const META_LEASE_SECS: u64 = 3;
|
||||
|
||||
// In a lease, there are two opportunities for renewal.
|
||||
/// In a lease, there are two opportunities for renewal.
|
||||
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
|
||||
|
||||
/// The default mailbox round-trip timeout, in seconds.
|
||||
pub const MAILBOX_RTT_SECS: u64 = 1;
|
||||
|
||||
@@ -111,6 +111,7 @@ impl OpenRegion {
|
||||
/// The instruction to downgrade a leader region.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DowngradeRegion {
|
||||
/// The [RegionId] of the region to downgrade.
|
||||
pub region_id: RegionId,
|
||||
}
|
||||
|
||||
@@ -120,20 +121,67 @@ impl Display for DowngradeRegion {
|
||||
}
|
||||
}
|
||||
|
||||
/// The instruction to upgrade a follower region to a leader region.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct UpgradeRegion {
|
||||
/// The [RegionId] of the region to upgrade.
|
||||
pub region_id: RegionId,
|
||||
/// The `last_entry_id` of the old leader region.
|
||||
pub last_entry_id: Option<u64>,
|
||||
/// The number of seconds to wait for the WAL replay.
|
||||
///
|
||||
/// `None` stands for no waiting,
|
||||
/// which is helpful to verify whether the leader region is ready.
|
||||
pub wait_for_replay_secs: Option<u64>,
|
||||
}
|
||||
|
||||
/// An instruction that can be serialized into the JSON payload of a mailbox message.
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
|
||||
pub enum Instruction {
|
||||
/// Opens a region.
|
||||
///
|
||||
/// - Returns true if the specified region exists.
|
||||
OpenRegion(OpenRegion),
|
||||
/// Closes a region.
|
||||
///
|
||||
/// - Returns true if the specified region does not exist.
|
||||
CloseRegion(RegionIdent),
|
||||
/// Upgrades a region (see [UpgradeRegion]).
|
||||
UpgradeRegion(UpgradeRegion),
|
||||
/// Downgrades a region (see [DowngradeRegion]).
|
||||
DowngradeRegion(DowngradeRegion),
|
||||
/// Invalidates the table cache of the specified table id.
|
||||
InvalidateTableIdCache(TableId),
|
||||
/// Invalidates the table name index cache of the specified table name.
|
||||
InvalidateTableNameCache(TableName),
|
||||
}
|
||||
|
||||
/// The reply of [UpgradeRegion].
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct UpgradeRegionReply {
|
||||
/// `true` if `last_entry_id` has been replayed to the latest.
|
||||
pub ready: bool,
|
||||
/// Indicates whether the region exists.
|
||||
pub exists: bool,
|
||||
/// The error message, if any.
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
impl Display for UpgradeRegionReply {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"(ready={}, exists={}, error={:?})",
|
||||
self.ready, self.exists, self.error
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// The reply to an [Instruction].
///
/// Serialized with an internal `type` tag holding the snake_case variant name.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum InstructionReply {
|
||||
/// The reply of an open region instruction.
OpenRegion(SimpleReply),
|
||||
/// The reply of a close region instruction.
CloseRegion(SimpleReply),
|
||||
/// The reply of an upgrade region instruction.
UpgradeRegion(UpgradeRegionReply),
|
||||
/// The reply of an invalidate table cache instruction.
InvalidateTableCache(SimpleReply),
|
||||
/// The reply of a downgrade region instruction.
DowngradeRegion(DowngradeRegionReply),
|
||||
}
|
||||
@@ -143,6 +191,7 @@ impl Display for InstructionReply {
|
||||
match self {
|
||||
Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
|
||||
Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
|
||||
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
|
||||
Self::InvalidateTableCache(reply) => {
|
||||
write!(f, "InstructionReply::Invalidate({})", reply)
|
||||
}
|
||||
|
||||
@@ -62,6 +62,9 @@ impl RegionHeartbeatResponseHandler {
|
||||
let close_region_req = RegionRequest::Close(RegionCloseRequest {});
|
||||
Ok((region_id, close_region_req))
|
||||
}
|
||||
Instruction::UpgradeRegion(_) => {
|
||||
todo!()
|
||||
}
|
||||
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
|
||||
InvalidHeartbeatResponseSnafu.fail()
|
||||
}
|
||||
@@ -86,6 +89,9 @@ impl RegionHeartbeatResponseHandler {
|
||||
result: false,
|
||||
error: None,
|
||||
}),
|
||||
Instruction::UpgradeRegion(_) => {
|
||||
todo!()
|
||||
}
|
||||
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
|
||||
InstructionReply::InvalidateTableCache(SimpleReply {
|
||||
result: false,
|
||||
@@ -118,6 +124,9 @@ impl RegionHeartbeatResponseHandler {
|
||||
reply.error = error;
|
||||
}
|
||||
},
|
||||
InstructionReply::UpgradeRegion(_) => {
|
||||
todo!()
|
||||
}
|
||||
InstructionReply::InvalidateTableCache(reply) => {
|
||||
reply.result = success;
|
||||
reply.error = error;
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::any::Any;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
|
||||
use common_meta::distributed_time_constants::{MAILBOX_RTT_SECS, REGION_LEASE_SECS};
|
||||
use common_meta::instruction::{
|
||||
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
|
||||
};
|
||||
@@ -31,7 +31,7 @@ use crate::handler::HeartbeatMailbox;
|
||||
use crate::procedure::region_migration::{Context, State};
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
const DOWNGRADE_LEADER_REGION_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
const DOWNGRADE_LEADER_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct DowngradeLeaderRegion {
|
||||
@@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion {
|
||||
tokio::time::sleep_until(*deadline).await;
|
||||
}
|
||||
|
||||
Ok(Box::new(UpgradeCandidateRegion))
|
||||
Ok(Box::<UpgradeCandidateRegion>::default())
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
@@ -159,7 +159,7 @@ impl DowngradeLeaderRegion {
|
||||
}
|
||||
Err(error::Error::MailboxTimeout { .. }) => {
|
||||
let reason = format!(
|
||||
"Mailbox received timeout for downgrade leader region {region_id} on Datanode {:?}",
|
||||
"Mailbox received timeout for downgrade leader region {region_id} on datanode {:?}",
|
||||
leader,
|
||||
);
|
||||
error::RetryLaterSnafu { reason }.fail()
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::time::Duration;
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::ddl::utils::region_storage_path;
|
||||
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
|
||||
use common_meta::RegionIdent;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -29,7 +30,7 @@ use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeader
|
||||
use crate::procedure::region_migration::{Context, State};
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct OpenCandidateRegion;
|
||||
@@ -152,7 +153,7 @@ impl OpenCandidateRegion {
|
||||
} else {
|
||||
error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Region {region_id} is not opened by Datanode {:?}, error: {error:?}",
|
||||
"Region {region_id} is not opened by datanode {:?}, error: {error:?}",
|
||||
candidate,
|
||||
),
|
||||
}
|
||||
@@ -161,7 +162,7 @@ impl OpenCandidateRegion {
|
||||
}
|
||||
Err(error::Error::MailboxTimeout { .. }) => {
|
||||
let reason = format!(
|
||||
"Mailbox received timeout for open candidate region {region_id} on Datanode {:?}",
|
||||
"Mailbox received timeout for open candidate region {region_id} on datanode {:?}",
|
||||
candidate,
|
||||
);
|
||||
error::RetryLaterSnafu { reason }.fail()
|
||||
|
||||
@@ -32,9 +32,9 @@ use crate::procedure::region_migration::{Context, State};
|
||||
pub enum UpdateMetadata {
|
||||
/// Downgrades the leader region.
|
||||
Downgrade,
|
||||
/// Upgrade the candidate region.
|
||||
/// Upgrades the candidate region.
|
||||
Upgrade,
|
||||
/// Rollback the downgraded leader region.
|
||||
/// Rolls back the downgraded region.
|
||||
Rollback,
|
||||
}
|
||||
|
||||
|
||||
@@ -13,19 +13,55 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
|
||||
use common_telemetry::warn;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::error::Result;
|
||||
use super::update_metadata::UpdateMetadata;
|
||||
use crate::error::{self, Result};
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
use crate::procedure::region_migration::{Context, State};
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct UpgradeCandidateRegion;
|
||||
/// The region migration state that upgrades the candidate region.
pub struct UpgradeCandidateRegion {
|
||||
/// The optimistic retry times; it bounds the total number of upgrade attempts.
|
||||
optimistic_retry: usize,
|
||||
/// The retry initial interval, slept before each retry.
|
||||
retry_initial_interval: Duration,
|
||||
/// The replay timeout of an instruction.
|
||||
replay_timeout: Duration,
|
||||
/// If it's true, the candidate region MUST replay the WAL to the latest entry id.
|
||||
/// Otherwise, it will roll back to the old leader region.
|
||||
require_ready: bool,
|
||||
}
|
||||
|
||||
impl Default for UpgradeCandidateRegion {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
optimistic_retry: 3,
|
||||
retry_initial_interval: Duration::from_millis(500),
|
||||
replay_timeout: Duration::from_millis(1000),
|
||||
require_ready: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[typetag::serde]
|
||||
impl State for UpgradeCandidateRegion {
|
||||
async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
|
||||
todo!();
|
||||
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
|
||||
if self.upgrade_region_with_retry(ctx).await {
|
||||
Ok(Box::new(UpdateMetadata::Upgrade))
|
||||
} else {
|
||||
Ok(Box::new(UpdateMetadata::Rollback))
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
@@ -33,4 +69,494 @@ impl State for UpgradeCandidateRegion {
|
||||
}
|
||||
}
|
||||
|
||||
impl UpgradeCandidateRegion {}
|
||||
impl UpgradeCandidateRegion {
|
||||
const UPGRADE_CANDIDATE_REGION_RTT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);
|
||||
|
||||
/// Returns the timeout of the upgrade candidate region.
|
||||
///
|
||||
/// Equals `replay_timeout` + RTT
|
||||
fn send_upgrade_candidate_region_timeout(&self) -> Duration {
|
||||
self.replay_timeout + UpgradeCandidateRegion::UPGRADE_CANDIDATE_REGION_RTT
|
||||
}
|
||||
|
||||
/// Builds upgrade region instruction.
|
||||
fn build_upgrade_region_instruction(&self, ctx: &Context) -> Instruction {
|
||||
let pc = &ctx.persistent_ctx;
|
||||
let region_id = pc.region_id;
|
||||
let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
|
||||
|
||||
Instruction::UpgradeRegion(UpgradeRegion {
|
||||
region_id,
|
||||
last_entry_id,
|
||||
wait_for_replay_secs: Some(self.replay_timeout.as_secs()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Tries to upgrade a candidate region.
|
||||
///
|
||||
/// Retry:
|
||||
/// - If `require_ready` is true, but the candidate region returns `ready` is false.
|
||||
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
|
||||
///
|
||||
/// Abort:
|
||||
/// - The candidate region doesn't exist.
|
||||
/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
|
||||
/// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
|
||||
/// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
|
||||
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
|
||||
/// - Invalid JSON (impossible).
|
||||
async fn upgrade_region(&self, ctx: &Context, upgrade_instruction: &Instruction) -> Result<()> {
|
||||
let pc = &ctx.persistent_ctx;
|
||||
let region_id = pc.region_id;
|
||||
let candidate = &pc.to_peer;
|
||||
|
||||
let msg = MailboxMessage::json_message(
|
||||
&format!("Upgrade candidate region: {}", region_id),
|
||||
&format!("Meta@{}", ctx.server_addr()),
|
||||
&format!("Datanode-{}@{}", candidate.id, candidate.addr),
|
||||
common_time::util::current_time_millis(),
|
||||
upgrade_instruction,
|
||||
)
|
||||
.with_context(|_| error::SerializeToJsonSnafu {
|
||||
input: upgrade_instruction.to_string(),
|
||||
})?;
|
||||
|
||||
let ch = Channel::Datanode(candidate.id);
|
||||
let receiver = ctx
|
||||
.mailbox
|
||||
.send(&ch, msg, self.send_upgrade_candidate_region_timeout())
|
||||
.await?;
|
||||
|
||||
match receiver.await? {
|
||||
Ok(msg) => {
|
||||
let reply = HeartbeatMailbox::json_reply(&msg)?;
|
||||
let InstructionReply::UpgradeRegion(UpgradeRegionReply {
|
||||
ready,
|
||||
exists,
|
||||
error,
|
||||
}) = reply
|
||||
else {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: "Unexpected reply of the upgrade region instruction",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
// Notes: The order of handling is important.
|
||||
if error.is_some() {
|
||||
return error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Failed to upgrade the region {} on datanode {:?}, error: {:?}",
|
||||
region_id, candidate, error
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
ensure!(
|
||||
exists,
|
||||
error::UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Expected region {} doesn't exist on datanode {:?}",
|
||||
region_id, candidate
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
if self.require_ready && !ready {
|
||||
return error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Candidate region {} still replaying the wal on datanode {:?}",
|
||||
region_id, candidate
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(error::Error::MailboxTimeout { .. }) => {
|
||||
let reason = format!(
|
||||
"Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}",
|
||||
candidate,
|
||||
);
|
||||
error::RetryLaterSnafu { reason }.fail()
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
/// Upgrades a candidate region.
|
||||
///
|
||||
/// Returns true if the candidate region is upgraded successfully.
|
||||
async fn upgrade_region_with_retry(&self, ctx: &Context) -> bool {
|
||||
let upgrade_instruction = self.build_upgrade_region_instruction(ctx);
|
||||
|
||||
let mut retry = 0;
|
||||
let mut upgraded = false;
|
||||
|
||||
loop {
|
||||
if let Err(err) = self.upgrade_region(ctx, &upgrade_instruction).await {
|
||||
retry += 1;
|
||||
if err.is_retryable() && retry < self.optimistic_retry {
|
||||
warn!("Failed to upgrade region, error: {err:?}, retry later");
|
||||
sleep(self.retry_initial_interval).await;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
upgraded = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
upgraded
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use api::v1::meta::mailbox_message::Payload;
|
||||
use common_meta::peer::Peer;
|
||||
use common_time::util::current_time_millis;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::procedure::region_migration::test_util::{
|
||||
new_close_region_reply, send_mock_reply, TestingEnv,
|
||||
};
|
||||
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
|
||||
|
||||
fn new_persistent_context() -> PersistentContext {
|
||||
PersistentContext {
|
||||
from_peer: Peer::empty(1),
|
||||
to_peer: Peer::empty(2),
|
||||
region_id: RegionId::new(1024, 1),
|
||||
cluster_id: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn new_upgrade_region_reply(
|
||||
id: u64,
|
||||
ready: bool,
|
||||
|
||||
exists: bool,
|
||||
|
||||
error: Option<String>,
|
||||
) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply {
|
||||
ready,
|
||||
exists,
|
||||
error,
|
||||
}))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_datanode_is_unreachable() {
|
||||
let state = UpgradeCandidateRegion::default();
|
||||
let persistent_context = new_persistent_context();
|
||||
let env = TestingEnv::new();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let instruction = &state.build_upgrade_region_instruction(&ctx);
|
||||
let err = state.upgrade_region(&ctx, instruction).await.unwrap_err();
|
||||
|
||||
assert_matches!(err, Error::PusherNotFound { .. });
|
||||
assert!(!err.is_retryable());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_pusher_dropped() {
|
||||
let state = UpgradeCandidateRegion::default();
|
||||
let persistent_context = new_persistent_context();
|
||||
let to_peer_id = persistent_context.to_peer.id;
|
||||
|
||||
let mut env = TestingEnv::new();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
let mailbox_ctx = env.mailbox_context();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
|
||||
.await;
|
||||
|
||||
drop(rx);
|
||||
|
||||
let instruction = &state.build_upgrade_region_instruction(&ctx);
|
||||
let err = state.upgrade_region(&ctx, instruction).await.unwrap_err();
|
||||
|
||||
assert_matches!(err, Error::PushMessage { .. });
|
||||
assert!(!err.is_retryable());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unexpected_instruction_reply() {
|
||||
let state = UpgradeCandidateRegion::default();
|
||||
let persistent_context = new_persistent_context();
|
||||
let to_peer_id = persistent_context.to_peer.id;
|
||||
|
||||
let mut env = TestingEnv::new();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
let mailbox_ctx = env.mailbox_context();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
|
||||
.await;
|
||||
|
||||
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
|
||||
|
||||
let instruction = &state.build_upgrade_region_instruction(&ctx);
|
||||
let err = state.upgrade_region(&ctx, instruction).await.unwrap_err();
|
||||
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
|
||||
assert!(!err.is_retryable());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_upgrade_region_failed() {
|
||||
let state = UpgradeCandidateRegion::default();
|
||||
let persistent_context = new_persistent_context();
|
||||
let to_peer_id = persistent_context.to_peer.id;
|
||||
|
||||
let mut env = TestingEnv::new();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
let mailbox_ctx = env.mailbox_context();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
|
||||
.await;
|
||||
|
||||
// A reply contains an error.
|
||||
send_mock_reply(mailbox, rx, |id| {
|
||||
Ok(new_upgrade_region_reply(
|
||||
id,
|
||||
true,
|
||||
true,
|
||||
Some("test mocked".to_string()),
|
||||
))
|
||||
});
|
||||
|
||||
let instruction = &state.build_upgrade_region_instruction(&ctx);
|
||||
let err = state.upgrade_region(&ctx, instruction).await.unwrap_err();
|
||||
|
||||
assert_matches!(err, Error::RetryLater { .. });
|
||||
assert!(err.is_retryable());
|
||||
assert!(err.to_string().contains("test mocked"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_upgrade_region_not_found() {
|
||||
let state = UpgradeCandidateRegion::default();
|
||||
let persistent_context = new_persistent_context();
|
||||
let to_peer_id = persistent_context.to_peer.id;
|
||||
|
||||
let mut env = TestingEnv::new();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
let mailbox_ctx = env.mailbox_context();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
|
||||
.await;
|
||||
|
||||
send_mock_reply(mailbox, rx, |id| {
|
||||
Ok(new_upgrade_region_reply(id, true, false, None))
|
||||
});
|
||||
|
||||
let instruction = &state.build_upgrade_region_instruction(&ctx);
|
||||
let err = state.upgrade_region(&ctx, instruction).await.unwrap_err();
|
||||
|
||||
assert_matches!(err, Error::Unexpected { .. });
|
||||
assert!(!err.is_retryable());
|
||||
assert!(err.to_string().contains("doesn't exist"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_upgrade_region_require_ready() {
|
||||
let mut state = UpgradeCandidateRegion {
|
||||
require_ready: true,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let persistent_context = new_persistent_context();
|
||||
let to_peer_id = persistent_context.to_peer.id;
|
||||
|
||||
let mut env = TestingEnv::new();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
let mailbox_ctx = env.mailbox_context();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
|
||||
.await;
|
||||
|
||||
send_mock_reply(mailbox, rx, |id| {
|
||||
Ok(new_upgrade_region_reply(id, false, true, None))
|
||||
});
|
||||
|
||||
let instruction = &state.build_upgrade_region_instruction(&ctx);
|
||||
let err = state.upgrade_region(&ctx, instruction).await.unwrap_err();
|
||||
|
||||
assert_matches!(err, Error::RetryLater { .. });
|
||||
assert!(err.is_retryable());
|
||||
assert!(err.to_string().contains("still replaying the wal"));
|
||||
|
||||
// Sets the `require_ready` to false.
|
||||
state.require_ready = false;
|
||||
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
|
||||
.await;
|
||||
|
||||
send_mock_reply(mailbox, rx, |id| {
|
||||
Ok(new_upgrade_region_reply(id, false, true, None))
|
||||
});
|
||||
|
||||
let instruction = &state.build_upgrade_region_instruction(&ctx);
|
||||
state.upgrade_region(&ctx, instruction).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_upgrade_region_with_retry_ok() {
|
||||
let mut state = Box::<UpgradeCandidateRegion>::default();
|
||||
state.retry_initial_interval = Duration::from_millis(100);
|
||||
let persistent_context = new_persistent_context();
|
||||
let to_peer_id = persistent_context.to_peer.id;
|
||||
|
||||
let mut env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let mailbox_ctx = env.mailbox_context();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
|
||||
.await;
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let resp = rx.recv().await.unwrap().unwrap();
|
||||
let reply_id = resp.mailbox_message.unwrap().id;
|
||||
mailbox
|
||||
.on_recv(
|
||||
reply_id,
|
||||
Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// retry: 1
|
||||
let resp = rx.recv().await.unwrap().unwrap();
|
||||
let reply_id = resp.mailbox_message.unwrap().id;
|
||||
mailbox
|
||||
.on_recv(
|
||||
reply_id,
|
||||
Ok(new_upgrade_region_reply(reply_id, false, true, None)),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// retry: 2
|
||||
let resp = rx.recv().await.unwrap().unwrap();
|
||||
let reply_id = resp.mailbox_message.unwrap().id;
|
||||
mailbox
|
||||
.on_recv(
|
||||
reply_id,
|
||||
Ok(new_upgrade_region_reply(reply_id, true, true, None)),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let next = state.next(&mut ctx).await.unwrap();
|
||||
|
||||
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
|
||||
|
||||
assert_matches!(update_metadata, UpdateMetadata::Upgrade);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_upgrade_region_with_retry_failed() {
|
||||
let mut state = Box::<UpgradeCandidateRegion>::default();
|
||||
state.retry_initial_interval = Duration::from_millis(100);
|
||||
let persistent_context = new_persistent_context();
|
||||
let to_peer_id = persistent_context.to_peer.id;
|
||||
|
||||
let mut env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let mailbox_ctx = env.mailbox_context();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
|
||||
.await;
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let resp = rx.recv().await.unwrap().unwrap();
|
||||
let reply_id = resp.mailbox_message.unwrap().id;
|
||||
mailbox
|
||||
.on_recv(
|
||||
reply_id,
|
||||
Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// retry: 1
|
||||
let resp = rx.recv().await.unwrap().unwrap();
|
||||
let reply_id = resp.mailbox_message.unwrap().id;
|
||||
mailbox
|
||||
.on_recv(
|
||||
reply_id,
|
||||
Ok(new_upgrade_region_reply(reply_id, false, true, None)),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// retry: 2
|
||||
let resp = rx.recv().await.unwrap().unwrap();
|
||||
let reply_id = resp.mailbox_message.unwrap().id;
|
||||
mailbox
|
||||
.on_recv(
|
||||
reply_id,
|
||||
Ok(new_upgrade_region_reply(reply_id, false, false, None)),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let next = state.next(&mut ctx).await.unwrap();
|
||||
|
||||
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
|
||||
assert_matches!(update_metadata, UpdateMetadata::Rollback);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user