feat: unify FlushRegions instructions (#6819)

* feat: add FlushRegionsV2 instruction with unified semantics

- Add FlushRegionsV2 struct supporting both single and batch operations
- Preserve original FlushRegion(RegionId) API for backward compatibility
- Support configurable FlushStrategy (Sync/Async) and FlushErrorStrategy (FailFast/TryAll)
- Add detailed per-region error reporting in FlushRegionReply
- Update datanode handlers to support both legacy and enhanced flush instructions
- Maintain zero breaking changes through automatic conversion of legacy formats

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>

* chore: run make fmt

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>

* Apply suggestions from code review

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>

* refactor: extract shared perform_region_flush fn

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>

* refactor: use consistent error type across similar methods

see gh copilot suggestion: https://github.com/GreptimeTeam/greptimedb/pull/6819#discussion_r2299603698

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>

* chore: make fmt

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>

* refactor: consolidate FlushRegion instructions

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>

---------

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>
This commit is contained in:
Alex Araujo
2025-09-03 20:18:54 -05:00
committed by GitHub
parent 756856799f
commit 2c019965be
7 changed files with 652 additions and 114 deletions

View File

@@ -81,12 +81,94 @@ pub struct SimpleReply {
pub error: Option<String>,
}
/// Reply for flush region operations with support for batch results.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct FlushRegionReply {
/// Results for each region that was attempted to be flushed.
/// For single region flushes, this will contain one result.
/// For batch flushes, this contains results for all attempted regions.
pub results: Vec<(RegionId, Result<(), String>)>,
/// Overall success: true if all regions were flushed successfully.
pub overall_success: bool,
}
impl FlushRegionReply {
/// Create a successful single region reply.
pub fn success_single(region_id: RegionId) -> Self {
Self {
results: vec![(region_id, Ok(()))],
overall_success: true,
}
}
/// Create a failed single region reply.
pub fn error_single(region_id: RegionId, error: String) -> Self {
Self {
results: vec![(region_id, Err(error))],
overall_success: false,
}
}
/// Create a batch reply from individual results.
pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self {
let overall_success = results.iter().all(|(_, result)| result.is_ok());
Self {
results,
overall_success,
}
}
/// Convert to SimpleReply for backward compatibility.
pub fn to_simple_reply(&self) -> SimpleReply {
if self.overall_success {
SimpleReply {
result: true,
error: None,
}
} else {
let errors: Vec<String> = self
.results
.iter()
.filter_map(|(region_id, result)| {
result
.as_ref()
.err()
.map(|err| format!("{}: {}", region_id, err))
})
.collect();
SimpleReply {
result: false,
error: Some(errors.join("; ")),
}
}
}
}
impl Display for SimpleReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "(result={}, error={:?})", self.result, self.error)
}
}
impl Display for FlushRegionReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let results_str = self
.results
.iter()
.map(|(region_id, result)| match result {
Ok(()) => format!("{}:OK", region_id),
Err(err) => format!("{}:ERR({})", region_id, err),
})
.collect::<Vec<_>>()
.join(", ");
write!(
f,
"(overall_success={}, results=[{}])",
self.overall_success, results_str
)
}
}
impl Display for OpenRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
@@ -219,10 +301,107 @@ pub struct DropFlow {
pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
}
/// Flushes a batch of regions.
/// Strategy for executing flush operations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum FlushStrategy {
/// Synchronous operation that waits for completion and expects a reply
Sync,
/// Asynchronous hint operation (fire-and-forget, no reply expected)
Async,
}
/// Error handling strategy for batch flush operations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum FlushErrorStrategy {
/// Abort on first error (fail-fast)
FailFast,
/// Attempt to flush all regions and collect all errors
TryAll,
}
impl Default for FlushStrategy {
fn default() -> Self {
Self::Sync
}
}
impl Default for FlushErrorStrategy {
fn default() -> Self {
Self::FailFast
}
}
/// Unified flush instruction supporting both single and batch operations
/// with configurable execution strategies and error handling.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlushRegions {
/// List of region IDs to flush. Can contain a single region or multiple regions.
pub region_ids: Vec<RegionId>,
/// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint).
#[serde(default)]
pub strategy: FlushStrategy,
/// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
#[serde(default)]
pub error_strategy: FlushErrorStrategy,
}
impl FlushRegions {
/// Create synchronous single-region flush
pub fn sync_single(region_id: RegionId) -> Self {
Self {
region_ids: vec![region_id],
strategy: FlushStrategy::Sync,
error_strategy: FlushErrorStrategy::FailFast,
}
}
/// Create asynchronous batch flush (fire-and-forget)
pub fn async_batch(region_ids: Vec<RegionId>) -> Self {
Self {
region_ids,
strategy: FlushStrategy::Async,
error_strategy: FlushErrorStrategy::TryAll,
}
}
/// Create synchronous batch flush with error strategy
pub fn sync_batch(region_ids: Vec<RegionId>, error_strategy: FlushErrorStrategy) -> Self {
Self {
region_ids,
strategy: FlushStrategy::Sync,
error_strategy,
}
}
/// Check if this is a single region flush.
pub fn is_single_region(&self) -> bool {
self.region_ids.len() == 1
}
/// Get the single region ID if this is a single region flush.
pub fn single_region_id(&self) -> Option<RegionId> {
if self.is_single_region() {
self.region_ids.first().copied()
} else {
None
}
}
/// Check if this is a hint (asynchronous) operation.
pub fn is_hint(&self) -> bool {
matches!(self.strategy, FlushStrategy::Async)
}
/// Check if this is a synchronous operation.
pub fn is_sync(&self) -> bool {
matches!(self.strategy, FlushStrategy::Sync)
}
}
impl From<RegionId> for FlushRegions {
fn from(region_id: RegionId) -> Self {
Self::sync_single(region_id)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
@@ -243,8 +422,6 @@ pub enum Instruction {
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegions(FlushRegions),
/// Flushes a single region.
FlushRegion(RegionId),
}
/// The reply of [UpgradeRegion].
@@ -275,7 +452,7 @@ pub enum InstructionReply {
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
FlushRegion(SimpleReply),
FlushRegions(FlushRegionReply),
}
impl Display for InstructionReply {
@@ -287,7 +464,7 @@ impl Display for InstructionReply {
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
}
}
}
@@ -373,4 +550,136 @@ mod tests {
};
assert_eq!(expected, deserialized);
}
#[test]
fn test_flush_regions_creation() {
let region_id = RegionId::new(1024, 1);
// Single region sync flush
let single_sync = FlushRegions::sync_single(region_id);
assert_eq!(single_sync.region_ids, vec![region_id]);
assert_eq!(single_sync.strategy, FlushStrategy::Sync);
assert!(!single_sync.is_hint());
assert!(single_sync.is_sync());
assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
assert!(single_sync.is_single_region());
assert_eq!(single_sync.single_region_id(), Some(region_id));
// Batch async flush (hint)
let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
let batch_async = FlushRegions::async_batch(region_ids.clone());
assert_eq!(batch_async.region_ids, region_ids);
assert_eq!(batch_async.strategy, FlushStrategy::Async);
assert!(batch_async.is_hint());
assert!(!batch_async.is_sync());
assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
assert!(!batch_async.is_single_region());
assert_eq!(batch_async.single_region_id(), None);
// Batch sync flush
let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
assert_eq!(batch_sync.region_ids, region_ids);
assert_eq!(batch_sync.strategy, FlushStrategy::Sync);
assert!(!batch_sync.is_hint());
assert!(batch_sync.is_sync());
assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
}
#[test]
fn test_flush_regions_conversion() {
let region_id = RegionId::new(1024, 1);
let from_region_id: FlushRegions = region_id.into();
assert_eq!(from_region_id.region_ids, vec![region_id]);
assert_eq!(from_region_id.strategy, FlushStrategy::Sync);
assert!(!from_region_id.is_hint());
assert!(from_region_id.is_sync());
// Test default construction
let flush_regions = FlushRegions {
region_ids: vec![region_id],
strategy: FlushStrategy::Async,
error_strategy: FlushErrorStrategy::TryAll,
};
assert_eq!(flush_regions.region_ids, vec![region_id]);
assert_eq!(flush_regions.strategy, FlushStrategy::Async);
assert!(flush_regions.is_hint());
assert!(!flush_regions.is_sync());
}
#[test]
fn test_flush_region_reply() {
let region_id = RegionId::new(1024, 1);
// Successful single region reply
let success_reply = FlushRegionReply::success_single(region_id);
assert!(success_reply.overall_success);
assert_eq!(success_reply.results.len(), 1);
assert_eq!(success_reply.results[0].0, region_id);
assert!(success_reply.results[0].1.is_ok());
// Failed single region reply
let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string());
assert!(!error_reply.overall_success);
assert_eq!(error_reply.results.len(), 1);
assert_eq!(error_reply.results[0].0, region_id);
assert!(error_reply.results[0].1.is_err());
// Batch reply
let region_id2 = RegionId::new(1024, 2);
let results = vec![
(region_id, Ok(())),
(region_id2, Err("flush failed".to_string())),
];
let batch_reply = FlushRegionReply::from_results(results);
assert!(!batch_reply.overall_success);
assert_eq!(batch_reply.results.len(), 2);
// Conversion to SimpleReply
let simple_reply = batch_reply.to_simple_reply();
assert!(!simple_reply.result);
assert!(simple_reply.error.is_some());
assert!(simple_reply.error.unwrap().contains("flush failed"));
}
#[test]
fn test_serialize_flush_regions_instruction() {
let region_id = RegionId::new(1024, 1);
let flush_regions = FlushRegions::sync_single(region_id);
let instruction = Instruction::FlushRegions(flush_regions.clone());
let serialized = serde_json::to_string(&instruction).unwrap();
let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
match deserialized {
Instruction::FlushRegions(fr) => {
assert_eq!(fr.region_ids, vec![region_id]);
assert_eq!(fr.strategy, FlushStrategy::Sync);
assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
}
_ => panic!("Expected FlushRegions instruction"),
}
}
#[test]
fn test_serialize_flush_regions_batch_instruction() {
let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
let flush_regions =
FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
let instruction = Instruction::FlushRegions(flush_regions);
let serialized = serde_json::to_string(&instruction).unwrap();
let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
match deserialized {
Instruction::FlushRegions(fr) => {
assert_eq!(fr.region_ids, region_ids);
assert_eq!(fr.strategy, FlushStrategy::Sync);
assert!(!fr.is_hint());
assert!(fr.is_sync());
assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
}
_ => panic!("Expected FlushRegions instruction"),
}
}
}

View File

@@ -102,9 +102,6 @@ impl RegionHeartbeatResponseHandler {
Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_regions_instruction(flush_regions)
})),
Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_region)
})),
}
}
}
@@ -118,7 +115,6 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
| Some((_, Instruction::CloseRegion { .. }))
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
| Some((_, Instruction::FlushRegion { .. }))
| Some((_, Instruction::FlushRegions { .. }))
)
}

View File

@@ -14,99 +14,156 @@
use std::time::Instant;
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
use common_meta::instruction::{
FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
};
use common_telemetry::{debug, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::error;
use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, UnexpectedSnafu};
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
/// Performs the actual region flush operation.
async fn perform_region_flush(&self, region_id: RegionId) -> Result<(), error::Error> {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
self.region_server
.handle_request(region_id, request)
.await?;
Ok(())
}
/// Handles asynchronous flush hints (fire-and-forget).
async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
let start_time = Instant::now();
for region_id in &region_ids {
let result = self.perform_region_flush(*region_id).await;
match result {
Ok(_) => {}
Err(error::Error::RegionNotFound { .. }) => {
warn!(
"Received a flush region hint from meta, but target region: {} is not found.",
region_id
);
}
Err(err) => {
warn!("Failed to flush region: {}, error: {}", region_id, err);
}
}
}
let elapsed = start_time.elapsed();
debug!(
"Flush regions hint: {:?}, elapsed: {:?}",
region_ids, elapsed
);
}
/// Handles synchronous flush operations with proper error handling and replies.
async fn handle_flush_sync(
&self,
region_ids: Vec<RegionId>,
error_strategy: FlushErrorStrategy,
) -> FlushRegionReply {
let mut results = Vec::with_capacity(region_ids.len());
for region_id in region_ids {
let result = self.flush_single_region_sync(region_id).await;
match &result {
Ok(_) => results.push((region_id, Ok(()))),
Err(err) => {
// Convert error::Error to String for FlushRegionReply compatibility
let error_string = err.to_string();
results.push((region_id, Err(error_string)));
// For fail-fast strategy, abort on first error
if matches!(error_strategy, FlushErrorStrategy::FailFast) {
break;
}
}
}
}
FlushRegionReply::from_results(results)
}
/// Flushes a single region synchronously with proper error handling.
async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<(), error::Error> {
// Check if region is leader and writable
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Err(RegionNotFoundSnafu { region_id }.build());
};
if !writable {
return Err(RegionNotReadySnafu { region_id }.build());
}
// Register and execute the flush task
let region_server_moved = self.region_server.clone();
let register_result = self
.flush_tasks
.try_register(
region_id,
Box::pin(async move {
region_server_moved
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another flush task is running for the region: {region_id}");
}
let mut watcher = register_result.into_watcher();
match self.flush_tasks.wait_until_finish(&mut watcher).await {
Ok(()) => Ok(()),
Err(err) => Err(UnexpectedSnafu {
violated: format!("Flush task failed: {err:?}"),
}
.build()),
}
}
/// Unified handler for FlushRegions with all flush semantics.
pub(crate) fn handle_flush_regions_instruction(
self,
flush_regions: FlushRegions,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let start_time = Instant::now();
for region_id in &flush_regions.region_ids {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
let result = self.region_server.handle_request(*region_id, request).await;
match result {
Ok(_) => {}
Err(error::Error::RegionNotFound { .. }) => {
warn!(
"Received a flush region instruction from meta, but target region: {} is not found.",
region_id
);
}
Err(err) => {
warn!("Failed to flush region: {}, error: {}", region_id, err);
}
}
}
let elapsed = start_time.elapsed();
debug!(
"Flush regions: {:?}, elapsed: {:?}",
flush_regions.region_ids, elapsed
);
None
})
}
let strategy = flush_regions.strategy;
let region_ids = flush_regions.region_ids;
let error_strategy = flush_regions.error_strategy;
pub(crate) fn handle_flush_region_instruction(
self,
region_id: RegionId,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not leader".to_string()),
}));
let reply = if matches!(strategy, FlushStrategy::Async) {
// Asynchronous hint mode: fire-and-forget, no reply expected
self.handle_flush_hint(region_ids).await;
None
} else {
// Synchronous mode: return reply with results
let reply = self.handle_flush_sync(region_ids, error_strategy).await;
Some(InstructionReply::FlushRegions(reply))
};
if !writable {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not writable".to_string()),
}));
}
let elapsed = start_time.elapsed();
debug!(
"FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
strategy, elapsed, reply
);
let region_server_moved = self.region_server.clone();
let register_result = self
.flush_tasks
.try_register(
region_id,
Box::pin(async move {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
region_server_moved
.handle_request(region_id, request)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another flush task is running for the region: {region_id}");
}
let mut watcher = register_result.into_watcher();
let result = self.flush_tasks.wait_until_finish(&mut watcher).await;
match result {
Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply {
result: true,
error: None,
})),
Err(err) => Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some(format!("{err:?}")),
})),
}
reply
})
}
}
@@ -115,7 +172,7 @@ impl HandlerContext {
mod tests {
use std::sync::{Arc, RwLock};
use common_meta::instruction::FlushRegions;
use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
use mito2::engine::MITO_ENGINE_NAME;
use store_api::storage::RegionId;
@@ -123,7 +180,7 @@ mod tests {
use crate::tests::{mock_region_server, MockRegionEngine};
#[tokio::test]
async fn test_handle_flush_region_instruction() {
async fn test_handle_flush_region_hint() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let mock_region_server = mock_region_server();
@@ -142,23 +199,138 @@ mod tests {
}
let handler_context = HandlerContext::new_for_test(mock_region_server);
// Async hint mode
let flush_instruction = FlushRegions::async_batch(region_ids.clone());
let reply = handler_context
.clone()
.handle_flush_regions_instruction(FlushRegions {
region_ids: region_ids.clone(),
})
.handle_flush_regions_instruction(flush_instruction)
.await;
assert!(reply.is_none());
assert!(reply.is_none()); // Hint mode returns no reply
assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
// Non-existent regions
flushed_region_ids.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
let reply = handler_context
.handle_flush_regions_instruction(FlushRegions {
region_ids: not_found_region_ids.clone(),
})
.handle_flush_regions_instruction(flush_instruction)
.await;
assert!(reply.is_none());
assert!(flushed_region_ids.read().unwrap().is_empty());
}
#[tokio::test]
async fn test_handle_flush_region_sync_single() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let flushed_region_ids_ref = flushed_region_ids.clone();
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
flushed_region_ids_ref.write().unwrap().push(region_id);
Ok(0)
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let flush_instruction = FlushRegions::sync_single(region_id);
let reply = handler_context
.handle_flush_regions_instruction(flush_instruction)
.await;
assert!(reply.is_some());
if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
assert!(flush_reply.overall_success);
assert_eq!(flush_reply.results.len(), 1);
assert_eq!(flush_reply.results[0].0, region_id);
assert!(flush_reply.results[0].1.is_ok());
} else {
panic!("Expected FlushRegions reply");
}
assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
}
#[tokio::test]
async fn test_handle_flush_region_sync_batch_fail_fast() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let mock_region_server = mock_region_server();
let region_ids = vec![
RegionId::new(1024, 1),
RegionId::new(1024, 2),
RegionId::new(1024, 3),
];
// Register only the first region, others will fail
let flushed_region_ids_ref = flushed_region_ids.clone();
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
flushed_region_ids_ref.write().unwrap().push(region_id);
Ok(0)
}))
});
mock_region_server.register_test_region(region_ids[0], mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
// Sync batch with fail-fast strategy
let flush_instruction =
FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
let reply = handler_context
.handle_flush_regions_instruction(flush_instruction)
.await;
assert!(reply.is_some());
if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
assert!(!flush_reply.overall_success); // Should fail due to non-existent regions
// With fail-fast, only process regions until first failure
assert!(flush_reply.results.len() <= region_ids.len());
} else {
panic!("Expected FlushRegions reply");
}
}
#[tokio::test]
async fn test_handle_flush_region_sync_batch_try_all() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let mock_region_server = mock_region_server();
let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
// Register only the first region
let flushed_region_ids_ref = flushed_region_ids.clone();
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
flushed_region_ids_ref.write().unwrap().push(region_id);
Ok(0)
}))
});
mock_region_server.register_test_region(region_ids[0], mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
// Sync batch with try-all strategy
let flush_instruction =
FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
let reply = handler_context
.handle_flush_regions_instruction(flush_instruction)
.await;
assert!(reply.is_some());
if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
assert!(!flush_reply.overall_success); // Should fail due to one non-existent region
// With try-all, should process all regions
assert_eq!(flush_reply.results.len(), region_ids.len());
// First should succeed, second should fail
assert!(flush_reply.results[0].1.is_ok());
assert!(flush_reply.results[1].1.is_err());
} else {
panic!("Expected FlushRegions reply");
}
}
}

View File

@@ -796,7 +796,7 @@ mod tests {
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::test_util::*;
use crate::procedure::test_util::{
new_downgrade_region_reply, new_flush_region_reply, new_open_region_reply,
new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
new_upgrade_region_reply,
};
use crate::service::mailbox::Channel;
@@ -1316,7 +1316,9 @@ mod tests {
"Should be the flush leader region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(|id| Ok(new_flush_region_reply(id, true, None))),
Arc::new(move |id| {
Ok(new_flush_region_reply_for_region(id, region_id, true, None))
}),
)),
Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
),

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::instruction::{FlushRegions, Instruction, InstructionReply};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
@@ -65,7 +65,7 @@ impl PreFlushRegion {
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::FlushRegion(region_id)
Instruction::FlushRegions(FlushRegions::sync_single(region_id))
}
/// Tries to flush a leader region.
@@ -117,13 +117,37 @@ impl PreFlushRegion {
now.elapsed()
);
let InstructionReply::FlushRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
let reply_result = match reply {
InstructionReply::FlushRegions(flush_reply) => {
if flush_reply.results.len() != 1 {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect single region flush result",
}
.fail();
}
let (reply_region_id, result) = &flush_reply.results[0];
if *reply_region_id != region_id {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "flush reply region ID mismatch",
}
.fail();
}
match result {
Ok(()) => (true, None),
Err(err) => (false, Some(err.clone())),
}
}
_ => {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
}
.fail();
};
let (result, error) = reply_result;
if error.is_some() {
warn!(
@@ -170,7 +194,7 @@ mod tests {
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply, send_mock_reply,
new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
};
fn new_persistent_context() -> PersistentContext {
@@ -245,6 +269,7 @@ mod tests {
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let region_id = persistent_context.region_id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
@@ -253,9 +278,10 @@ mod tests {
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| {
Ok(new_flush_region_reply(
send_mock_reply(mailbox, rx, move |id| {
Ok(new_flush_region_reply_for_region(
id,
region_id,
false,
Some("test mocked".to_string()),
))
@@ -272,6 +298,7 @@ mod tests {
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let region_id = persistent_context.region_id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
@@ -280,7 +307,9 @@ mod tests {
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None)));
send_mock_reply(mailbox, rx, move |id| {
Ok(new_flush_region_reply_for_region(id, region_id, true, None))
});
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

View File

@@ -17,7 +17,7 @@ use std::collections::HashMap;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
use common_meta::instruction::{
DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
DowngradeRegionReply, FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
@@ -101,8 +101,16 @@ pub fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> Ma
}
}
/// Generates a [InstructionReply::FlushRegion] reply.
/// Generates a [InstructionReply::FlushRegions] reply with a single region.
pub fn new_flush_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage {
// Use RegionId(0, 0) as default for backward compatibility in tests
let region_id = RegionId::new(0, 0);
let flush_reply = if result {
FlushRegionReply::success_single(region_id)
} else {
FlushRegionReply::error_single(region_id, error.unwrap_or("Test error".to_string()))
};
MailboxMessage {
id,
subject: "mock".to_string(),
@@ -110,11 +118,32 @@ pub fn new_flush_region_reply(id: u64, result: bool, error: Option<String>) -> M
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::FlushRegion(SimpleReply {
result,
error,
}))
.unwrap(),
serde_json::to_string(&InstructionReply::FlushRegions(flush_reply)).unwrap(),
)),
}
}
/// Generates a [InstructionReply::FlushRegions] reply for a specific region.
pub fn new_flush_region_reply_for_region(
id: u64,
region_id: RegionId,
result: bool,
error: Option<String>,
) -> MailboxMessage {
let flush_reply = if result {
FlushRegionReply::success_single(region_id)
} else {
FlushRegionReply::error_single(region_id, error.unwrap_or("Test error".to_string()))
};
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::FlushRegions(flush_reply)).unwrap(),
)),
}
}

View File

@@ -357,7 +357,8 @@ impl RegionFlushTrigger {
let flush_instructions = leader_to_region_ids
.into_iter()
.map(|(leader, region_ids)| {
let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });
let flush_instruction =
Instruction::FlushRegions(FlushRegions::async_batch(region_ids));
(leader, flush_instruction)
});