From a3ae2d7b52703a4cbb1190a4261b607c21418c4e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 29 Apr 2025 11:28:00 +0800 Subject: [PATCH] feat: flush leader region before downgrading (#5995) * feat: flush leader region before downgrading * test: add unit tests * chore: apply suggestions from CR --- src/common/meta/src/instruction.rs | 6 +- src/datanode/src/heartbeat/handler.rs | 14 +- .../src/heartbeat/handler/flush_region.rs | 62 +++- src/datanode/src/heartbeat/task_tracker.rs | 5 + .../src/procedure/region_migration.rs | 50 ++- .../downgrade_leader_region.rs | 7 +- .../region_migration/flush_leader_region.rs | 285 ++++++++++++++++++ .../region_migration/open_candidate_region.rs | 14 +- .../procedure/region_migration/test_util.rs | 6 + src/meta-srv/src/procedure/test_util.rs | 18 ++ src/meta-srv/src/procedure/wal_prune.rs | 4 +- 11 files changed, 444 insertions(+), 27 deletions(-) create mode 100644 src/meta-srv/src/procedure/region_migration/flush_leader_region.rs diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 5e00437332..937329232e 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -217,7 +217,9 @@ pub enum Instruction { /// Invalidates batch cache. InvalidateCaches(Vec), /// Flushes regions. - FlushRegion(FlushRegions), + FlushRegions(FlushRegions), + /// Flushes a single region. + FlushRegion(RegionId), } /// The reply of [UpgradeRegion]. @@ -248,6 +250,7 @@ pub enum InstructionReply { CloseRegion(SimpleReply), UpgradeRegion(UpgradeRegionReply), DowngradeRegion(DowngradeRegionReply), + FlushRegion(SimpleReply), } impl Display for InstructionReply { @@ -259,6 +262,7 @@ impl Display for InstructionReply { Self::DowngradeRegion(reply) => { write!(f, "InstructionReply::DowngradeRegion({})", reply) } + Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply), } } } diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 17950847ed..1aff1b7f47 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -39,6 +39,7 @@ pub struct RegionHeartbeatResponseHandler { region_server: RegionServer, catchup_tasks: TaskTracker<()>, downgrade_tasks: TaskTracker<()>, + flush_tasks: TaskTracker<()>, } /// Handler of the instruction. @@ -50,6 +51,7 @@ pub struct HandlerContext { region_server: RegionServer, catchup_tasks: TaskTracker<()>, downgrade_tasks: TaskTracker<()>, + flush_tasks: TaskTracker<()>, } impl HandlerContext { @@ -63,6 +65,7 @@ impl HandlerContext { region_server, catchup_tasks: TaskTracker::new(), downgrade_tasks: TaskTracker::new(), + flush_tasks: TaskTracker::new(), } } } @@ -74,6 +77,7 @@ impl RegionHeartbeatResponseHandler { region_server, catchup_tasks: TaskTracker::new(), downgrade_tasks: TaskTracker::new(), + flush_tasks: TaskTracker::new(), } } @@ -95,8 +99,11 @@ impl RegionHeartbeatResponseHandler { handler_context.handle_upgrade_region_instruction(upgrade_region) })), Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(), - Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| { - handler_context.handle_flush_region_instruction(flush_regions) + Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| { + handler_context.handle_flush_regions_instruction(flush_regions) + })), + Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| { + handler_context.handle_flush_region_instruction(flush_region) })), } } @@ -111,6 +118,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { | Some((_, Instruction::CloseRegion { .. })) | Some((_, Instruction::DowngradeRegion { .. })) | Some((_, Instruction::UpgradeRegion { .. })) + | Some((_, Instruction::FlushRegion { .. })) ) } @@ -124,12 +132,14 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { let region_server = self.region_server.clone(); let catchup_tasks = self.catchup_tasks.clone(); let downgrade_tasks = self.downgrade_tasks.clone(); + let flush_tasks = self.flush_tasks.clone(); let handler = Self::build_handler(instruction)?; let _handle = common_runtime::spawn_global(async move { let reply = handler(HandlerContext { region_server, catchup_tasks, downgrade_tasks, + flush_tasks, }) .await; diff --git a/src/datanode/src/heartbeat/handler/flush_region.rs b/src/datanode/src/heartbeat/handler/flush_region.rs index 04feca27a2..20867d645c 100644 --- a/src/datanode/src/heartbeat/handler/flush_region.rs +++ b/src/datanode/src/heartbeat/handler/flush_region.rs @@ -12,16 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_meta::instruction::{FlushRegions, InstructionReply}; +use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply}; use common_telemetry::warn; use futures_util::future::BoxFuture; use store_api::region_request::{RegionFlushRequest, RegionRequest}; +use store_api::storage::RegionId; use crate::error; use crate::heartbeat::handler::HandlerContext; impl HandlerContext { - pub(crate) fn handle_flush_region_instruction( + pub(crate) fn handle_flush_regions_instruction( self, flush_regions: FlushRegions, ) -> BoxFuture<'static, Option> { @@ -49,6 +50,59 @@ impl HandlerContext { None }) } + + pub(crate) fn handle_flush_region_instruction( + self, + region_id: RegionId, + ) -> BoxFuture<'static, Option> { + Box::pin(async move { + let Some(writable) = self.region_server.is_region_leader(region_id) else { + return Some(InstructionReply::FlushRegion(SimpleReply { + result: false, + error: Some("Region is not leader".to_string()), + })); + }; + + if !writable { + return Some(InstructionReply::FlushRegion(SimpleReply { + result: false, + error: Some("Region is not writable".to_string()), + })); + } + + let region_server_moved = self.region_server.clone(); + let register_result = self + .flush_tasks + .try_register( + region_id, + Box::pin(async move { + let request = RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }); + region_server_moved + .handle_request(region_id, request) + .await?; + Ok(()) + }), + ) + .await; + if register_result.is_busy() { + warn!("Another flush task is running for the region: {region_id}"); + } + let mut watcher = register_result.into_watcher(); + let result = self.flush_tasks.wait_until_finish(&mut watcher).await; + match result { + Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply { + result: true, + error: None, + })), + Err(err) => Some(InstructionReply::FlushRegion(SimpleReply { + result: false, + error: Some(format!("{err:?}")), + })), + } + }) + } } #[cfg(test)] @@ -84,7 +138,7 @@ mod tests { let reply = handler_context .clone() - .handle_flush_region_instruction(FlushRegions { + .handle_flush_regions_instruction(FlushRegions { region_ids: region_ids.clone(), }) .await; @@ -94,7 +148,7 @@ mod tests { flushed_region_ids.write().unwrap().clear(); let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::>(); let reply = handler_context - .handle_flush_region_instruction(FlushRegions { + .handle_flush_regions_instruction(FlushRegions { region_ids: not_found_region_ids.clone(), }) .await; diff --git a/src/datanode/src/heartbeat/task_tracker.rs b/src/datanode/src/heartbeat/task_tracker.rs index 977054661c..1fff51f897 100644 --- a/src/datanode/src/heartbeat/task_tracker.rs +++ b/src/datanode/src/heartbeat/task_tracker.rs @@ -144,6 +144,11 @@ impl TaskTracker { } } + /// Waits for a [RegisterResult] and returns a [WaitResult]. + pub(crate) async fn wait_until_finish(&self, watcher: &mut TaskWatcher) -> Result { + wait(watcher).await + } + /// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running. pub(crate) async fn try_register( &self, diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 43b444d3b1..a83bf5dfab 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -14,6 +14,7 @@ pub(crate) mod close_downgraded_region; pub(crate) mod downgrade_leader_region; +pub(crate) mod flush_leader_region; pub(crate) mod manager; pub(crate) mod migration_abort; pub(crate) mod migration_end; @@ -111,6 +112,8 @@ impl PersistentContext { pub struct Metrics { /// Elapsed time of downgrading region and upgrading region. operations_elapsed: Duration, + /// Elapsed time of flushing leader region. + flush_leader_region_elapsed: Duration, /// Elapsed time of downgrading leader region. downgrade_leader_region_elapsed: Duration, /// Elapsed time of open candidate region. @@ -121,10 +124,15 @@ pub struct Metrics { impl Display for Metrics { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let total = self.flush_leader_region_elapsed + + self.downgrade_leader_region_elapsed + + self.open_candidate_region_elapsed + + self.upgrade_candidate_region_elapsed; write!( f, - "operations_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}", - self.operations_elapsed, + "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}", + total, + self.flush_leader_region_elapsed, self.downgrade_leader_region_elapsed, self.open_candidate_region_elapsed, self.upgrade_candidate_region_elapsed @@ -138,6 +146,11 @@ impl Metrics { self.operations_elapsed += elapsed; } + /// Updates the elapsed time of flushing leader region. + pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) { + self.flush_leader_region_elapsed += elapsed; + } + /// Updates the elapsed time of downgrading leader region. pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) { self.downgrade_leader_region_elapsed += elapsed; @@ -156,10 +169,18 @@ impl Metrics { impl Drop for Metrics { fn drop(&mut self) { - if !self.operations_elapsed.is_zero() { + let total = self.flush_leader_region_elapsed + + self.downgrade_leader_region_elapsed + + self.open_candidate_region_elapsed + + self.upgrade_candidate_region_elapsed; + METRIC_META_REGION_MIGRATION_STAGE_ELAPSED + .with_label_values(&["total"]) + .observe(total.as_secs_f64()); + + if !self.flush_leader_region_elapsed.is_zero() { METRIC_META_REGION_MIGRATION_STAGE_ELAPSED - .with_label_values(&["operations"]) - .observe(self.operations_elapsed.as_secs_f64()); + .with_label_values(&["flush_leader_region"]) + .observe(self.flush_leader_region_elapsed.as_secs_f64()); } if !self.downgrade_leader_region_elapsed.is_zero() { @@ -320,6 +341,13 @@ impl Context { .update_operations_elapsed(instant.elapsed()); } + /// Updates the elapsed time of flushing leader region. + pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) { + self.volatile_ctx + .metrics + .update_flush_leader_region_elapsed(instant.elapsed()); + } + /// Updates the elapsed time of downgrading leader region. pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) { self.volatile_ctx @@ -700,7 +728,8 @@ mod tests { use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; use crate::procedure::region_migration::test_util::*; use crate::procedure::test_util::{ - new_downgrade_region_reply, new_open_region_reply, new_upgrade_region_reply, + new_downgrade_region_reply, new_flush_region_reply, new_open_region_reply, + new_upgrade_region_reply, }; use crate::service::mailbox::Channel; @@ -1208,6 +1237,15 @@ mod tests { to_peer_id, Arc::new(|id| Ok(new_open_region_reply(id, true, None))), )), + Assertion::simple(assert_flush_leader_region, assert_no_persist), + ), + // Flush Leader Region + Step::next( + "Should be the flush leader region", + Some(mock_datanode_reply( + from_peer_id, + Arc::new(|id| Ok(new_flush_region_reply(id, true, None))), + )), Assertion::simple(assert_update_metadata_downgrade, assert_no_persist), ), // UpdateMetadata::Downgrade diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 93481adc54..a298c5541d 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -170,7 +170,7 @@ impl DowngradeLeaderRegion { if error.is_some() { return error::RetryLaterSnafu { reason: format!( - "Failed to downgrade the region {} on Datanode {:?}, error: {:?}, elapsed: {:?}", + "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}", region_id, leader, error, now.elapsed() ), } @@ -179,13 +179,14 @@ impl DowngradeLeaderRegion { if !exists { warn!( - "Trying to downgrade the region {} on Datanode {}, but region doesn't exist!, elapsed: {:?}", + "Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}", region_id, leader, now.elapsed() ); } else { info!( - "Region {} leader is downgraded, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}", + "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}", region_id, + leader, last_entry_id, metadata_last_entry_id, now.elapsed() diff --git a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs new file mode 100644 index 0000000000..aeeeb836f6 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs @@ -0,0 +1,285 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use api::v1::meta::MailboxMessage; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_procedure::Status; +use common_telemetry::{info, warn}; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use tokio::time::Instant; + +use crate::error::{self, Error, Result}; +use crate::handler::HeartbeatMailbox; +use crate::procedure::region_migration::update_metadata::UpdateMetadata; +use crate::procedure::region_migration::{Context, State}; +use crate::service::mailbox::Channel; + +/// Flushes the leader region before downgrading it. +/// +/// This can minimize the time window where the region is not writable. +#[derive(Debug, Serialize, Deserialize)] +pub struct PreFlushRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for PreFlushRegion { + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + let timer = Instant::now(); + self.flush_region(ctx).await?; + ctx.update_flush_leader_region_elapsed(timer); + // We intentionally don't update `operations_elapsed` here to prevent + // the `next_operation_timeout` from being reduced by the flush operation. + // This ensures sufficient time for subsequent critical operations. + + Ok(( + Box::new(UpdateMetadata::Downgrade), + Status::executing(false), + )) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl PreFlushRegion { + /// Builds flush leader region instruction. + fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction { + let pc = &ctx.persistent_ctx; + let region_id = pc.region_id; + Instruction::FlushRegion(region_id) + } + + /// Tries to flush a leader region. + /// + /// Ignore: + /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable. + /// - [PushMessage](error::Error::PushMessage), The receiver is dropped. + /// - Failed to flush region on the Datanode. + /// + /// Abort: + /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout. + /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible). + /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply). + /// - [ExceededDeadline](error::Error::ExceededDeadline) + /// - Invalid JSON. + async fn flush_region(&self, ctx: &mut Context) -> Result<()> { + let operation_timeout = + ctx.next_operation_timeout() + .context(error::ExceededDeadlineSnafu { + operation: "Flush leader region", + })?; + let flush_instruction = self.build_flush_leader_region_instruction(ctx); + let region_id = ctx.persistent_ctx.region_id; + let leader = &ctx.persistent_ctx.from_peer; + + let msg = MailboxMessage::json_message( + &format!("Flush leader region: {}", region_id), + &format!("Metasrv@{}", ctx.server_addr()), + &format!("Datanode-{}@{}", leader.id, leader.addr), + common_time::util::current_time_millis(), + &flush_instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: flush_instruction.to_string(), + })?; + + let ch = Channel::Datanode(leader.id); + let now = Instant::now(); + let result = ctx.mailbox.send(&ch, msg, operation_timeout).await; + + match result { + Ok(receiver) => match receiver.await? { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received flush leader region reply: {:?}, region: {}, elapsed: {:?}", + reply, + region_id, + now.elapsed() + ); + + let InstructionReply::FlushRegion(SimpleReply { result, error }) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect flush region reply", + } + .fail(); + }; + + if error.is_some() { + warn!( + "Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.", + region_id, leader, error + ); + } else if result { + info!( + "The flush leader region {} on datanode {:?} is successful, elapsed: {:?}", + region_id, + leader, + now.elapsed() + ); + } + + Ok(()) + } + Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu { + operation: "Flush leader region", + } + .fail(), + Err(err) => Err(err), + }, + Err(Error::PusherNotFound { .. }) => { + warn!( + "Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.", + region_id, + leader + ); + Ok(()) + } + Err(err) => Err(err), + } + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use store_api::storage::RegionId; + + use super::*; + use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::{ContextFactory, PersistentContext}; + use crate::procedure::test_util::{ + new_close_region_reply, new_flush_region_reply, send_mock_reply, + }; + + fn new_persistent_context() -> PersistentContext { + test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) + } + + #[tokio::test] + async fn test_datanode_is_unreachable() { + let state = PreFlushRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + // Should be ok, if leader region is unreachable. it will skip flush operation. + state.flush_region(&mut ctx).await.unwrap(); + } + + #[tokio::test] + async fn test_unexpected_instruction_reply() { + common_telemetry::init_default_ut_logging(); + let state = PreFlushRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) + .await; + // Sends an incorrect reply. + send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id))); + let err = state.flush_region(&mut ctx).await.unwrap_err(); + assert_matches!(err, Error::UnexpectedInstructionReply { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_instruction_exceeded_deadline() { + let state = PreFlushRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) + .await; + // Sends an timeout error. + send_mock_reply(mailbox, rx, |id| { + Err(error::MailboxTimeoutSnafu { id }.build()) + }); + + let err = state.flush_region(&mut ctx).await.unwrap_err(); + assert_matches!(err, Error::ExceededDeadline { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_flush_region_failed() { + common_telemetry::init_default_ut_logging(); + let state = PreFlushRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) + .await; + send_mock_reply(mailbox, rx, |id| { + Ok(new_flush_region_reply( + id, + false, + Some("test mocked".to_string()), + )) + }); + // Should be ok, if flush leader region failed. it will skip flush operation. + state.flush_region(&mut ctx).await.unwrap(); + } + + #[tokio::test] + async fn test_next_update_metadata_downgrade_state() { + common_telemetry::init_default_ut_logging(); + let mut state = PreFlushRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) + .await; + send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None))); + let (next, _) = state.next(&mut ctx).await.unwrap(); + + let update_metadata = next.as_any().downcast_ref::().unwrap(); + assert_matches!(update_metadata, UpdateMetadata::Downgrade); + } +} diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 6d1c81d3ed..be466467b4 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -28,7 +28,7 @@ use tokio::time::Instant; use crate::error::{self, Result}; use crate::handler::HeartbeatMailbox; -use crate::procedure::region_migration::update_metadata::UpdateMetadata; +use crate::procedure::region_migration::flush_leader_region::PreFlushRegion; use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; @@ -47,10 +47,7 @@ impl State for OpenCandidateRegion { self.open_candidate_region(ctx, instruction).await?; ctx.update_open_candidate_region_elapsed(now); - Ok(( - Box::new(UpdateMetadata::Downgrade), - Status::executing(false), - )) + Ok((Box::new(PreFlushRegion), Status::executing(false))) } fn as_any(&self) -> &dyn Any { @@ -399,7 +396,7 @@ mod tests { } #[tokio::test] - async fn test_next_update_metadata_downgrade_state() { + async fn test_next_flush_leader_region_state() { let mut state = Box::new(OpenCandidateRegion); // from_peer: 1 // to_peer: 2 @@ -445,8 +442,7 @@ mod tests { (to_peer_id, region_id) ); - let update_metadata = next.as_any().downcast_ref::().unwrap(); - - assert_matches!(update_metadata, UpdateMetadata::Downgrade); + let flush_leader_region = next.as_any().downcast_ref::().unwrap(); + assert_matches!(flush_leader_region, PreFlushRegion); } } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index c229f1934a..810c1cbb33 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -44,6 +44,7 @@ use crate::error::{self, Error, Result}; use crate::metasrv::MetasrvInfo; use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; +use crate::procedure::region_migration::flush_leader_region::PreFlushRegion; use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker; use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; use crate::procedure::region_migration::migration_end::RegionMigrationEnd; @@ -415,6 +416,11 @@ pub(crate) fn assert_open_candidate_region(next: &dyn State) { let _ = next.as_any().downcast_ref::().unwrap(); } +/// Asserts the [State] should be [FlushLeaderRegion]. +pub(crate) fn assert_flush_leader_region(next: &dyn State) { + let _ = next.as_any().downcast_ref::().unwrap(); +} + /// Asserts the [State] should be [UpdateMetadata::Downgrade]. pub(crate) fn assert_update_metadata_downgrade(next: &dyn State) { let state = next.as_any().downcast_ref::().unwrap(); diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index ca6da59f2a..1189f1a02b 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -101,6 +101,24 @@ pub fn new_open_region_reply(id: u64, result: bool, error: Option) -> Ma } } +/// Generates a [InstructionReply::FlushRegion] reply. +pub fn new_flush_region_reply(id: u64, result: bool, error: Option) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::FlushRegion(SimpleReply { + result, + error, + })) + .unwrap(), + )), + } +} + /// Generates a [InstructionReply::CloseRegion] reply. pub fn new_close_region_reply(id: u64) -> MailboxMessage { MailboxMessage { diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs index e9b6403942..19258f1636 100644 --- a/src/meta-srv/src/procedure/wal_prune.rs +++ b/src/meta-srv/src/procedure/wal_prune.rs @@ -181,7 +181,7 @@ impl WalPruneProcedure { let peer_and_instructions = peer_region_ids_map .into_iter() .map(|(peer, region_ids)| { - let flush_instruction = Instruction::FlushRegion(FlushRegions { region_ids }); + let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids }); (peer.clone(), flush_instruction) }) .collect(); @@ -536,7 +536,7 @@ mod tests { let msg = resp.mailbox_message.unwrap(); let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap(); let mut flush_requested_region_ids = match flush_instruction { - Instruction::FlushRegion(FlushRegions { region_ids, .. }) => region_ids, + Instruction::FlushRegions(FlushRegions { region_ids, .. }) => region_ids, _ => unreachable!(), }; let sorted_region_ids = region_ids