fix: keep region failover state not changed upon failure (#2261)

This commit is contained in:
LFC
2023-08-28 12:40:47 +08:00
committed by GitHub
parent 63b22b2403
commit d8f851bef2
7 changed files with 110 additions and 51 deletions

View File

@@ -214,7 +214,7 @@ impl RegionFailoverManager {
#[derive(Serialize, Deserialize, Debug)]
struct Node {
failed_region: RegionIdent,
state: Option<Box<dyn State>>,
state: Box<dyn State>,
}
/// The "Context" of region failover procedure state machine.
@@ -233,7 +233,7 @@ pub struct RegionFailoverContext {
#[typetag::serde(tag = "region_failover_state")]
trait State: Sync + Send + Debug {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>>;
@@ -304,7 +304,7 @@ impl RegionFailoverProcedure {
let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Some(Box::new(state)),
state: Box::new(state),
};
Self { node, context }
}
@@ -322,25 +322,18 @@ impl Procedure for RegionFailoverProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
if let Some(state) = self.node.state.take() {
let next_state = state
.next(&self.context, &self.node.failed_region)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
self.node.state = Some(next_state);
}
Ok(self
.node
.state
.as_ref()
.map(|s| s.status())
.unwrap_or(Status::Done))
let state = &mut self.node.state;
*state = state
.next(&self.context, &self.node.failed_region)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
}
fn dump(&self) -> ProcedureResult<String> {
@@ -362,6 +355,7 @@ impl Procedure for RegionFailoverProcedure {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Mutex;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
@@ -370,7 +364,8 @@ mod tests {
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::TableMetadataManager;
use common_meta::DatanodeId;
use common_procedure::BoxedProcedure;
use common_procedure::{BoxedProcedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use rand::prelude::SliceRandom;
use tokio::sync::mpsc::Receiver;
@@ -452,6 +447,11 @@ mod tests {
Self { selector: None }
}
fn with_selector(mut self, selector: SelectorRef) -> Self {
self.selector = Some(selector);
self
}
pub async fn build(self) -> TestingEnv {
let in_memory = Arc::new(MemStore::new());
let kv_store: KvStoreRef = Arc::new(MemStore::new());
@@ -531,8 +531,6 @@ mod tests {
#[tokio::test]
async fn test_region_failover_procedure() {
common_telemetry::init_default_ut_logging();
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
@@ -662,7 +660,7 @@ mod tests {
let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Some(Box::new(state)),
state: Box::new(state),
};
let procedure = RegionFailoverProcedure {
node,
@@ -677,7 +675,76 @@ mod tests {
let n: Node = serde_json::from_str(&s).unwrap();
assert_eq!(
format!("{n:?}"),
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: Some(RegionFailoverStart { failover_candidate: None }) }"#
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: RegionFailoverStart { failover_candidate: None } }"#
);
}
#[tokio::test]
async fn test_state_not_changed_upon_failure() {
struct MySelector {
peers: Arc<Mutex<Vec<Option<Peer>>>>,
}
#[async_trait]
impl Selector for MySelector {
type Context = SelectorContext;
type Output = Vec<Peer>;
async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
let mut peers = self.peers.lock().unwrap();
Ok(if let Some(Some(peer)) = peers.pop() {
vec![peer]
} else {
vec![]
})
}
}
// Returns a valid peer the second time called "select".
let selector = MySelector {
peers: Arc::new(Mutex::new(vec![
Some(Peer {
id: 42,
addr: "".to_string(),
}),
None,
])),
};
let env = TestingEnvBuilder::new()
.with_selector(Arc::new(selector))
.build()
.await;
let failed_region = env.failed_region(1).await;
let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Box::new(state),
};
let mut procedure = RegionFailoverProcedure {
node,
context: env.context,
};
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
let result = procedure.execute(&ctx).await;
assert!(result.is_err());
assert!(result.unwrap_err().is_retry_later());
assert_eq!(
r#"{"region_failover_state":"RegionFailoverStart","failover_candidate":null}"#,
serde_json::to_string(&procedure.node.state).unwrap()
);
let result = procedure.execute(&ctx).await;
assert!(matches!(result, Ok(Status::Executing { persist: true })));
assert_eq!(
r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""},"region_lease_expiry_seconds":40}"#,
serde_json::to_string(&procedure.node.state).unwrap()
);
}
}

View File

@@ -85,7 +85,7 @@ impl ActivateRegion {
}
async fn handle_response(
self,
&self,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -102,7 +102,7 @@ impl ActivateRegion {
.fail();
};
if result {
Ok(Box::new(UpdateRegionMetadata::new(self.candidate)))
Ok(Box::new(UpdateRegionMetadata::new(self.candidate.clone())))
} else {
// The region could be just indeed cannot be opened by the candidate, retry
// would be in vain. Then why not just end the failover procedure? Because we
@@ -131,7 +131,7 @@ impl ActivateRegion {
#[typetag::serde]
impl State for ActivateRegion {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {

View File

@@ -76,7 +76,7 @@ impl DeactivateRegion {
}
async fn handle_response(
self,
&self,
ctx: &RegionFailoverContext,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
@@ -98,7 +98,7 @@ impl DeactivateRegion {
.deregister_inactive_region(failed_region)
.await?;
Ok(Box::new(ActivateRegion::new(self.candidate)))
Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
} else {
// Under rare circumstances would a Datanode fail to close a Region.
// So simply retry.
@@ -114,7 +114,7 @@ impl DeactivateRegion {
// resides might be unreachable. So we wait for the region lease to expire. The
// region would be closed by its own [RegionAliveKeeper].
self.wait_for_region_lease_expiry().await;
Ok(Box::new(ActivateRegion::new(self.candidate)))
Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
}
Err(e) => Err(e),
}
@@ -132,7 +132,7 @@ impl DeactivateRegion {
#[typetag::serde]
impl State for DeactivateRegion {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -144,7 +144,7 @@ impl State for DeactivateRegion {
Err(Error::PusherNotFound { .. }) => {
// See the mailbox received timeout situation comments above.
self.wait_for_region_lease_expiry().await;
return Ok(Box::new(ActivateRegion::new(self.candidate)));
return Ok(Box::new(ActivateRegion::new(self.candidate.clone())));
}
Err(e) => return Err(e),
};

View File

@@ -26,12 +26,8 @@ pub(super) struct RegionFailoverEnd;
#[async_trait]
#[typetag::serde]
impl State for RegionFailoverEnd {
async fn next(
mut self: Box<Self>,
_: &RegionFailoverContext,
_: &RegionIdent,
) -> Result<Box<dyn State>> {
Ok(self)
async fn next(&mut self, _: &RegionFailoverContext, _: &RegionIdent) -> Result<Box<dyn State>> {
Ok(Box::new(RegionFailoverEnd))
}
fn status(&self) -> Status {

View File

@@ -91,7 +91,7 @@ impl RegionFailoverStart {
#[typetag::serde]
impl State for RegionFailoverStart {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {

View File

@@ -58,7 +58,7 @@ impl InvalidateCache {
#[typetag::serde]
impl State for InvalidateCache {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -108,12 +108,11 @@ mod tests {
let _ = heartbeat_receivers.insert(frontend_id, rx);
}
let state = InvalidateCache;
let table_ident: TableIdent = failed_region.clone().into();
// lexicographical order
// frontend-4,5,6,7
let next_state = Box::new(state)
let next_state = InvalidateCache
.next(&context, &failed_region)
.await
.unwrap();

View File

@@ -131,7 +131,7 @@ fn pretty_log_table_route_change(
#[typetag::serde]
impl State for UpdateRegionMetadata {
async fn next(
mut self: Box<Self>,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
@@ -165,12 +165,9 @@ mod tests {
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let state = UpdateRegionMetadata::new(Peer::new(2, ""));
let mut state = UpdateRegionMetadata::new(Peer::new(2, ""));
let next_state = Box::new(state)
.next(&env.context, &failed_region)
.await
.unwrap();
let next_state = state.next(&env.context, &failed_region).await.unwrap();
assert_eq!(format!("{next_state:?}"), "InvalidateCache");
}