feat: flush leader region before downgrading (#5995)

* feat: flush leader region before downgrading

* test: add unit tests

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-04-29 11:28:00 +08:00
committed by GitHub
parent 789f585a7f
commit a3ae2d7b52
11 changed files with 444 additions and 27 deletions

View File

@@ -217,7 +217,9 @@ pub enum Instruction {
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegion(FlushRegions),
FlushRegions(FlushRegions),
/// Flushes a single region.
FlushRegion(RegionId),
}
/// The reply of [UpgradeRegion].
@@ -248,6 +250,7 @@ pub enum InstructionReply {
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
FlushRegion(SimpleReply),
}
impl Display for InstructionReply {
@@ -259,6 +262,7 @@ impl Display for InstructionReply {
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
}
}
}

View File

@@ -39,6 +39,7 @@ pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
/// Handler of the instruction.
@@ -50,6 +51,7 @@ pub struct HandlerContext {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
impl HandlerContext {
@@ -63,6 +65,7 @@ impl HandlerContext {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
}
@@ -74,6 +77,7 @@ impl RegionHeartbeatResponseHandler {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
@@ -95,8 +99,11 @@ impl RegionHeartbeatResponseHandler {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_regions)
Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_regions_instruction(flush_regions)
})),
Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_region)
})),
}
}
@@ -111,6 +118,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
| Some((_, Instruction::CloseRegion { .. }))
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
| Some((_, Instruction::FlushRegion { .. }))
)
}
@@ -124,12 +132,14 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
downgrade_tasks,
flush_tasks,
})
.await;

View File

@@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{FlushRegions, InstructionReply};
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::error;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_flush_region_instruction(
pub(crate) fn handle_flush_regions_instruction(
self,
flush_regions: FlushRegions,
) -> BoxFuture<'static, Option<InstructionReply>> {
@@ -49,6 +50,59 @@ impl HandlerContext {
None
})
}
pub(crate) fn handle_flush_region_instruction(
self,
region_id: RegionId,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not leader".to_string()),
}));
};
if !writable {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not writable".to_string()),
}));
}
let region_server_moved = self.region_server.clone();
let register_result = self
.flush_tasks
.try_register(
region_id,
Box::pin(async move {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
region_server_moved
.handle_request(region_id, request)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another flush task is running for the region: {region_id}");
}
let mut watcher = register_result.into_watcher();
let result = self.flush_tasks.wait_until_finish(&mut watcher).await;
match result {
Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply {
result: true,
error: None,
})),
Err(err) => Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some(format!("{err:?}")),
})),
}
})
}
}
#[cfg(test)]
@@ -84,7 +138,7 @@ mod tests {
let reply = handler_context
.clone()
.handle_flush_region_instruction(FlushRegions {
.handle_flush_regions_instruction(FlushRegions {
region_ids: region_ids.clone(),
})
.await;
@@ -94,7 +148,7 @@ mod tests {
flushed_region_ids.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let reply = handler_context
.handle_flush_region_instruction(FlushRegions {
.handle_flush_regions_instruction(FlushRegions {
region_ids: not_found_region_ids.clone(),
})
.await;

View File

@@ -144,6 +144,11 @@ impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
}
}
/// Waits for a [RegisterResult] and returns a [WaitResult].
pub(crate) async fn wait_until_finish(&self, watcher: &mut TaskWatcher<T>) -> Result<T> {
wait(watcher).await
}
/// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running.
pub(crate) async fn try_register(
&self,

View File

@@ -14,6 +14,7 @@
pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod flush_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
@@ -111,6 +112,8 @@ impl PersistentContext {
pub struct Metrics {
/// Elapsed time of downgrading region and upgrading region.
operations_elapsed: Duration,
/// Elapsed time of flushing leader region.
flush_leader_region_elapsed: Duration,
/// Elapsed time of downgrading leader region.
downgrade_leader_region_elapsed: Duration,
/// Elapsed time of open candidate region.
@@ -121,10 +124,15 @@ pub struct Metrics {
impl Display for Metrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let total = self.flush_leader_region_elapsed
+ self.downgrade_leader_region_elapsed
+ self.open_candidate_region_elapsed
+ self.upgrade_candidate_region_elapsed;
write!(
f,
"operations_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
self.operations_elapsed,
"total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
total,
self.flush_leader_region_elapsed,
self.downgrade_leader_region_elapsed,
self.open_candidate_region_elapsed,
self.upgrade_candidate_region_elapsed
@@ -138,6 +146,11 @@ impl Metrics {
self.operations_elapsed += elapsed;
}
/// Updates the elapsed time of flushing leader region.
pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
self.flush_leader_region_elapsed += elapsed;
}
/// Updates the elapsed time of downgrading leader region.
pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
self.downgrade_leader_region_elapsed += elapsed;
@@ -156,10 +169,18 @@ impl Metrics {
impl Drop for Metrics {
fn drop(&mut self) {
if !self.operations_elapsed.is_zero() {
let total = self.flush_leader_region_elapsed
+ self.downgrade_leader_region_elapsed
+ self.open_candidate_region_elapsed
+ self.upgrade_candidate_region_elapsed;
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(total.as_secs_f64());
if !self.flush_leader_region_elapsed.is_zero() {
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
.with_label_values(&["operations"])
.observe(self.operations_elapsed.as_secs_f64());
.with_label_values(&["flush_leader_region"])
.observe(self.flush_leader_region_elapsed.as_secs_f64());
}
if !self.downgrade_leader_region_elapsed.is_zero() {
@@ -320,6 +341,13 @@ impl Context {
.update_operations_elapsed(instant.elapsed());
}
/// Updates the elapsed time of flushing leader region.
pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
self.volatile_ctx
.metrics
.update_flush_leader_region_elapsed(instant.elapsed());
}
/// Updates the elapsed time of downgrading leader region.
pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
self.volatile_ctx
@@ -700,7 +728,8 @@ mod tests {
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::test_util::*;
use crate::procedure::test_util::{
new_downgrade_region_reply, new_open_region_reply, new_upgrade_region_reply,
new_downgrade_region_reply, new_flush_region_reply, new_open_region_reply,
new_upgrade_region_reply,
};
use crate::service::mailbox::Channel;
@@ -1208,6 +1237,15 @@ mod tests {
to_peer_id,
Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
)),
Assertion::simple(assert_flush_leader_region, assert_no_persist),
),
// Flush Leader Region
Step::next(
"Should be the flush leader region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(|id| Ok(new_flush_region_reply(id, true, None))),
)),
Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
),
// UpdateMetadata::Downgrade

View File

@@ -170,7 +170,7 @@ impl DowngradeLeaderRegion {
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to downgrade the region {} on Datanode {:?}, error: {:?}, elapsed: {:?}",
"Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id, leader, error, now.elapsed()
),
}
@@ -179,13 +179,14 @@ impl DowngradeLeaderRegion {
if !exists {
warn!(
"Trying to downgrade the region {} on Datanode {}, but region doesn't exist!, elapsed: {:?}",
"Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
region_id, leader, now.elapsed()
);
} else {
info!(
"Region {} leader is downgraded, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
"Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
region_id,
leader,
last_entry_id,
metadata_last_entry_id,
now.elapsed()

View File

@@ -0,0 +1,285 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::Instant;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Flushes the leader region before downgrading it.
///
/// This can minimize the time window where the region is not writable.
#[derive(Debug, Serialize, Deserialize)]
pub struct PreFlushRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for PreFlushRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let timer = Instant::now();
self.flush_region(ctx).await?;
ctx.update_flush_leader_region_elapsed(timer);
// We intentionally don't update `operations_elapsed` here to prevent
// the `next_operation_timeout` from being reduced by the flush operation.
// This ensures sufficient time for subsequent critical operations.
Ok((
Box::new(UpdateMetadata::Downgrade),
Status::executing(false),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl PreFlushRegion {
/// Builds flush leader region instruction.
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::FlushRegion(region_id)
}
/// Tries to flush a leader region.
///
/// Ignore:
/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
/// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
/// - Failed to flush region on the Datanode.
///
/// Abort:
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
/// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
/// - [ExceededDeadline](error::Error::ExceededDeadline)
/// - Invalid JSON.
async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush leader region",
})?;
let flush_instruction = self.build_flush_leader_region_instruction(ctx);
let region_id = ctx.persistent_ctx.region_id;
let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
&format!("Flush leader region: {}", region_id),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(leader.id);
let now = Instant::now();
let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
match result {
Ok(receiver) => match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
reply,
region_id,
now.elapsed()
);
let InstructionReply::FlushRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
};
if error.is_some() {
warn!(
"Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
region_id, leader, error
);
} else if result {
info!(
"The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
region_id,
leader,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush leader region",
}
.fail(),
Err(err) => Err(err),
},
Err(Error::PusherNotFound { .. }) => {
warn!(
"Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_id,
leader
);
Ok(())
}
Err(err) => Err(err),
}
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use store_api::storage::RegionId;
use super::*;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply, send_mock_reply,
};
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}
#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
// Should be ok, if leader region is unreachable. it will skip flush operation.
state.flush_region(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_unexpected_instruction_reply() {
common_telemetry::init_default_ut_logging();
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
// Sends an incorrect reply.
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
let err = state.flush_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_instruction_exceeded_deadline() {
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
// Sends an timeout error.
send_mock_reply(mailbox, rx, |id| {
Err(error::MailboxTimeoutSnafu { id }.build())
});
let err = state.flush_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::ExceededDeadline { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_flush_region_failed() {
common_telemetry::init_default_ut_logging();
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| {
Ok(new_flush_region_reply(
id,
false,
Some("test mocked".to_string()),
))
});
// Should be ok, if flush leader region failed. it will skip flush operation.
state.flush_region(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_next_update_metadata_downgrade_state() {
common_telemetry::init_default_ut_logging();
let mut state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None)));
let (next, _) = state.next(&mut ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
}
}

View File

@@ -28,7 +28,7 @@ use tokio::time::Instant;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
@@ -47,10 +47,7 @@ impl State for OpenCandidateRegion {
self.open_candidate_region(ctx, instruction).await?;
ctx.update_open_candidate_region_elapsed(now);
Ok((
Box::new(UpdateMetadata::Downgrade),
Status::executing(false),
))
Ok((Box::new(PreFlushRegion), Status::executing(false)))
}
fn as_any(&self) -> &dyn Any {
@@ -399,7 +396,7 @@ mod tests {
}
#[tokio::test]
async fn test_next_update_metadata_downgrade_state() {
async fn test_next_flush_leader_region_state() {
let mut state = Box::new(OpenCandidateRegion);
// from_peer: 1
// to_peer: 2
@@ -445,8 +442,7 @@ mod tests {
(to_peer_id, region_id)
);
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
assert_matches!(flush_leader_region, PreFlushRegion);
}
}

View File

@@ -44,6 +44,7 @@ use crate::error::{self, Error, Result};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
@@ -415,6 +416,11 @@ pub(crate) fn assert_open_candidate_region(next: &dyn State) {
let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}
/// Asserts the [State] should be [FlushLeaderRegion].
pub(crate) fn assert_flush_leader_region(next: &dyn State) {
let _ = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
}
/// Asserts the [State] should be [UpdateMetadata::Downgrade].
pub(crate) fn assert_update_metadata_downgrade(next: &dyn State) {
let state = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

View File

@@ -101,6 +101,24 @@ pub fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> Ma
}
}
/// Generates a [InstructionReply::FlushRegion] reply.
pub fn new_flush_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::FlushRegion(SimpleReply {
result,
error,
}))
.unwrap(),
)),
}
}
/// Generates a [InstructionReply::CloseRegion] reply.
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {

View File

@@ -181,7 +181,7 @@ impl WalPruneProcedure {
let peer_and_instructions = peer_region_ids_map
.into_iter()
.map(|(peer, region_ids)| {
let flush_instruction = Instruction::FlushRegion(FlushRegions { region_ids });
let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });
(peer.clone(), flush_instruction)
})
.collect();
@@ -536,7 +536,7 @@ mod tests {
let msg = resp.mailbox_message.unwrap();
let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
let mut flush_requested_region_ids = match flush_instruction {
Instruction::FlushRegion(FlushRegions { region_ids, .. }) => region_ids,
Instruction::FlushRegions(FlushRegions { region_ids, .. }) => region_ids,
_ => unreachable!(),
};
let sorted_region_ids = region_ids