From 2c019965beb6a95edff652467c462172d72f4514 Mon Sep 17 00:00:00 2001 From: Alex Araujo Date: Wed, 3 Sep 2025 20:18:54 -0500 Subject: [PATCH] feat: unify FlushRegions instructions (#6819) * feat: add FlushRegionsV2 instruction with unified semantics - Add FlushRegionsV2 struct supporting both single and batch operations - Preserve original FlushRegion(RegionId) API for backward compatibility - Support configurable FlushStrategy (Sync/Async) and FlushErrorStrategy (FailFast/TryAll) - Add detailed per-region error reporting in FlushRegionReply - Update datanode handlers to support both legacy and enhanced flush instructions - Maintain zero breaking changes through automatic conversion of legacy formats Signed-off-by: Alex Araujo * chore: run make fmt Signed-off-by: Alex Araujo * Apply suggestions from code review Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com> Signed-off-by: Alex Araujo * refactor: extract shared perform_region_flush fn Signed-off-by: Alex Araujo * refactor: use consistent error type across similar methods see gh copilot suggestion: https://github.com/GreptimeTeam/greptimedb/pull/6819#discussion_r2299603698 Signed-off-by: Alex Araujo * chore: make fmt Signed-off-by: Alex Araujo * refactor: consolidate FlushRegion instructions Signed-off-by: Alex Araujo --------- Signed-off-by: Alex Araujo --- src/common/meta/src/instruction.rs | 319 +++++++++++++++- src/datanode/src/heartbeat/handler.rs | 4 - .../src/heartbeat/handler/flush_region.rs | 340 +++++++++++++----- .../src/procedure/region_migration.rs | 6 +- .../region_migration/flush_leader_region.rs | 51 ++- src/meta-srv/src/procedure/test_util.rs | 43 ++- src/meta-srv/src/region/flush_trigger.rs | 3 +- 7 files changed, 652 insertions(+), 114 deletions(-) diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index a6e2cb05c0..b2e4e261cd 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -81,12 +81,94 @@ pub struct SimpleReply { pub error: Option, } +/// Reply for flush region operations with support for batch results. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct FlushRegionReply { + /// Results for each region that was attempted to be flushed. + /// For single region flushes, this will contain one result. + /// For batch flushes, this contains results for all attempted regions. + pub results: Vec<(RegionId, Result<(), String>)>, + /// Overall success: true if all regions were flushed successfully. + pub overall_success: bool, +} + +impl FlushRegionReply { + /// Create a successful single region reply. + pub fn success_single(region_id: RegionId) -> Self { + Self { + results: vec![(region_id, Ok(()))], + overall_success: true, + } + } + + /// Create a failed single region reply. + pub fn error_single(region_id: RegionId, error: String) -> Self { + Self { + results: vec![(region_id, Err(error))], + overall_success: false, + } + } + + /// Create a batch reply from individual results. + pub fn from_results(results: Vec<(RegionId, Result<(), String>)>) -> Self { + let overall_success = results.iter().all(|(_, result)| result.is_ok()); + Self { + results, + overall_success, + } + } + + /// Convert to SimpleReply for backward compatibility. + pub fn to_simple_reply(&self) -> SimpleReply { + if self.overall_success { + SimpleReply { + result: true, + error: None, + } + } else { + let errors: Vec = self + .results + .iter() + .filter_map(|(region_id, result)| { + result + .as_ref() + .err() + .map(|err| format!("{}: {}", region_id, err)) + }) + .collect(); + SimpleReply { + result: false, + error: Some(errors.join("; ")), + } + } + } +} + impl Display for SimpleReply { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "(result={}, error={:?})", self.result, self.error) } } +impl Display for FlushRegionReply { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let results_str = self + .results + .iter() + .map(|(region_id, result)| match result { + Ok(()) => format!("{}:OK", region_id), + Err(err) => format!("{}:ERR({})", region_id, err), + }) + .collect::>() + .join(", "); + write!( + f, + "(overall_success={}, results=[{}])", + self.overall_success, results_str + ) + } +} + impl Display for OpenRegion { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( @@ -219,10 +301,107 @@ pub struct DropFlow { pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>, } -/// Flushes a batch of regions. +/// Strategy for executing flush operations. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum FlushStrategy { + /// Synchronous operation that waits for completion and expects a reply + Sync, + /// Asynchronous hint operation (fire-and-forget, no reply expected) + Async, +} + +/// Error handling strategy for batch flush operations. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum FlushErrorStrategy { + /// Abort on first error (fail-fast) + FailFast, + /// Attempt to flush all regions and collect all errors + TryAll, +} + +impl Default for FlushStrategy { + fn default() -> Self { + Self::Sync + } +} + +impl Default for FlushErrorStrategy { + fn default() -> Self { + Self::FailFast + } +} + +/// Unified flush instruction supporting both single and batch operations +/// with configurable execution strategies and error handling. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct FlushRegions { + /// List of region IDs to flush. Can contain a single region or multiple regions. pub region_ids: Vec, + /// Execution strategy: Sync (expects reply) or Async (fire-and-forget hint). + #[serde(default)] + pub strategy: FlushStrategy, + /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy). + #[serde(default)] + pub error_strategy: FlushErrorStrategy, +} + +impl FlushRegions { + /// Create synchronous single-region flush + pub fn sync_single(region_id: RegionId) -> Self { + Self { + region_ids: vec![region_id], + strategy: FlushStrategy::Sync, + error_strategy: FlushErrorStrategy::FailFast, + } + } + + /// Create asynchronous batch flush (fire-and-forget) + pub fn async_batch(region_ids: Vec) -> Self { + Self { + region_ids, + strategy: FlushStrategy::Async, + error_strategy: FlushErrorStrategy::TryAll, + } + } + + /// Create synchronous batch flush with error strategy + pub fn sync_batch(region_ids: Vec, error_strategy: FlushErrorStrategy) -> Self { + Self { + region_ids, + strategy: FlushStrategy::Sync, + error_strategy, + } + } + + /// Check if this is a single region flush. + pub fn is_single_region(&self) -> bool { + self.region_ids.len() == 1 + } + + /// Get the single region ID if this is a single region flush. + pub fn single_region_id(&self) -> Option { + if self.is_single_region() { + self.region_ids.first().copied() + } else { + None + } + } + + /// Check if this is a hint (asynchronous) operation. + pub fn is_hint(&self) -> bool { + matches!(self.strategy, FlushStrategy::Async) + } + + /// Check if this is a synchronous operation. + pub fn is_sync(&self) -> bool { + matches!(self.strategy, FlushStrategy::Sync) + } +} + +impl From for FlushRegions { + fn from(region_id: RegionId) -> Self { + Self::sync_single(region_id) + } } #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] @@ -243,8 +422,6 @@ pub enum Instruction { InvalidateCaches(Vec), /// Flushes regions. FlushRegions(FlushRegions), - /// Flushes a single region. - FlushRegion(RegionId), } /// The reply of [UpgradeRegion]. @@ -275,7 +452,7 @@ pub enum InstructionReply { CloseRegion(SimpleReply), UpgradeRegion(UpgradeRegionReply), DowngradeRegion(DowngradeRegionReply), - FlushRegion(SimpleReply), + FlushRegions(FlushRegionReply), } impl Display for InstructionReply { @@ -287,7 +464,7 @@ impl Display for InstructionReply { Self::DowngradeRegion(reply) => { write!(f, "InstructionReply::DowngradeRegion({})", reply) } - Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply), + Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply), } } } @@ -373,4 +550,136 @@ mod tests { }; assert_eq!(expected, deserialized); } + + #[test] + fn test_flush_regions_creation() { + let region_id = RegionId::new(1024, 1); + + // Single region sync flush + let single_sync = FlushRegions::sync_single(region_id); + assert_eq!(single_sync.region_ids, vec![region_id]); + assert_eq!(single_sync.strategy, FlushStrategy::Sync); + assert!(!single_sync.is_hint()); + assert!(single_sync.is_sync()); + assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast); + assert!(single_sync.is_single_region()); + assert_eq!(single_sync.single_region_id(), Some(region_id)); + + // Batch async flush (hint) + let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)]; + let batch_async = FlushRegions::async_batch(region_ids.clone()); + assert_eq!(batch_async.region_ids, region_ids); + assert_eq!(batch_async.strategy, FlushStrategy::Async); + assert!(batch_async.is_hint()); + assert!(!batch_async.is_sync()); + assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll); + assert!(!batch_async.is_single_region()); + assert_eq!(batch_async.single_region_id(), None); + + // Batch sync flush + let batch_sync = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast); + assert_eq!(batch_sync.region_ids, region_ids); + assert_eq!(batch_sync.strategy, FlushStrategy::Sync); + assert!(!batch_sync.is_hint()); + assert!(batch_sync.is_sync()); + assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast); + } + + #[test] + fn test_flush_regions_conversion() { + let region_id = RegionId::new(1024, 1); + + let from_region_id: FlushRegions = region_id.into(); + assert_eq!(from_region_id.region_ids, vec![region_id]); + assert_eq!(from_region_id.strategy, FlushStrategy::Sync); + assert!(!from_region_id.is_hint()); + assert!(from_region_id.is_sync()); + + // Test default construction + let flush_regions = FlushRegions { + region_ids: vec![region_id], + strategy: FlushStrategy::Async, + error_strategy: FlushErrorStrategy::TryAll, + }; + assert_eq!(flush_regions.region_ids, vec![region_id]); + assert_eq!(flush_regions.strategy, FlushStrategy::Async); + assert!(flush_regions.is_hint()); + assert!(!flush_regions.is_sync()); + } + + #[test] + fn test_flush_region_reply() { + let region_id = RegionId::new(1024, 1); + + // Successful single region reply + let success_reply = FlushRegionReply::success_single(region_id); + assert!(success_reply.overall_success); + assert_eq!(success_reply.results.len(), 1); + assert_eq!(success_reply.results[0].0, region_id); + assert!(success_reply.results[0].1.is_ok()); + + // Failed single region reply + let error_reply = FlushRegionReply::error_single(region_id, "test error".to_string()); + assert!(!error_reply.overall_success); + assert_eq!(error_reply.results.len(), 1); + assert_eq!(error_reply.results[0].0, region_id); + assert!(error_reply.results[0].1.is_err()); + + // Batch reply + let region_id2 = RegionId::new(1024, 2); + let results = vec![ + (region_id, Ok(())), + (region_id2, Err("flush failed".to_string())), + ]; + let batch_reply = FlushRegionReply::from_results(results); + assert!(!batch_reply.overall_success); + assert_eq!(batch_reply.results.len(), 2); + + // Conversion to SimpleReply + let simple_reply = batch_reply.to_simple_reply(); + assert!(!simple_reply.result); + assert!(simple_reply.error.is_some()); + assert!(simple_reply.error.unwrap().contains("flush failed")); + } + + #[test] + fn test_serialize_flush_regions_instruction() { + let region_id = RegionId::new(1024, 1); + let flush_regions = FlushRegions::sync_single(region_id); + let instruction = Instruction::FlushRegions(flush_regions.clone()); + + let serialized = serde_json::to_string(&instruction).unwrap(); + let deserialized: Instruction = serde_json::from_str(&serialized).unwrap(); + + match deserialized { + Instruction::FlushRegions(fr) => { + assert_eq!(fr.region_ids, vec![region_id]); + assert_eq!(fr.strategy, FlushStrategy::Sync); + assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast); + } + _ => panic!("Expected FlushRegions instruction"), + } + } + + #[test] + fn test_serialize_flush_regions_batch_instruction() { + let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)]; + let flush_regions = + FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll); + let instruction = Instruction::FlushRegions(flush_regions); + + let serialized = serde_json::to_string(&instruction).unwrap(); + let deserialized: Instruction = serde_json::from_str(&serialized).unwrap(); + + match deserialized { + Instruction::FlushRegions(fr) => { + assert_eq!(fr.region_ids, region_ids); + assert_eq!(fr.strategy, FlushStrategy::Sync); + assert!(!fr.is_hint()); + assert!(fr.is_sync()); + assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll); + } + _ => panic!("Expected FlushRegions instruction"), + } + } } diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index f47fe78225..5783498819 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -102,9 +102,6 @@ impl RegionHeartbeatResponseHandler { Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| { handler_context.handle_flush_regions_instruction(flush_regions) })), - Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| { - handler_context.handle_flush_region_instruction(flush_region) - })), } } } @@ -118,7 +115,6 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { | Some((_, Instruction::CloseRegion { .. })) | Some((_, Instruction::DowngradeRegion { .. })) | Some((_, Instruction::UpgradeRegion { .. })) - | Some((_, Instruction::FlushRegion { .. })) | Some((_, Instruction::FlushRegions { .. })) ) } diff --git a/src/datanode/src/heartbeat/handler/flush_region.rs b/src/datanode/src/heartbeat/handler/flush_region.rs index 09617aa37b..8f8ea2389a 100644 --- a/src/datanode/src/heartbeat/handler/flush_region.rs +++ b/src/datanode/src/heartbeat/handler/flush_region.rs @@ -14,99 +14,156 @@ use std::time::Instant; -use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply}; +use common_meta::instruction::{ + FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply, +}; use common_telemetry::{debug, warn}; use futures_util::future::BoxFuture; use store_api::region_request::{RegionFlushRequest, RegionRequest}; use store_api::storage::RegionId; -use crate::error; +use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, UnexpectedSnafu}; use crate::heartbeat::handler::HandlerContext; impl HandlerContext { + /// Performs the actual region flush operation. + async fn perform_region_flush(&self, region_id: RegionId) -> Result<(), error::Error> { + let request = RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }); + self.region_server + .handle_request(region_id, request) + .await?; + Ok(()) + } + + /// Handles asynchronous flush hints (fire-and-forget). + async fn handle_flush_hint(&self, region_ids: Vec) { + let start_time = Instant::now(); + for region_id in ®ion_ids { + let result = self.perform_region_flush(*region_id).await; + match result { + Ok(_) => {} + Err(error::Error::RegionNotFound { .. }) => { + warn!( + "Received a flush region hint from meta, but target region: {} is not found.", + region_id + ); + } + Err(err) => { + warn!("Failed to flush region: {}, error: {}", region_id, err); + } + } + } + let elapsed = start_time.elapsed(); + debug!( + "Flush regions hint: {:?}, elapsed: {:?}", + region_ids, elapsed + ); + } + + /// Handles synchronous flush operations with proper error handling and replies. + async fn handle_flush_sync( + &self, + region_ids: Vec, + error_strategy: FlushErrorStrategy, + ) -> FlushRegionReply { + let mut results = Vec::with_capacity(region_ids.len()); + + for region_id in region_ids { + let result = self.flush_single_region_sync(region_id).await; + + match &result { + Ok(_) => results.push((region_id, Ok(()))), + Err(err) => { + // Convert error::Error to String for FlushRegionReply compatibility + let error_string = err.to_string(); + results.push((region_id, Err(error_string))); + + // For fail-fast strategy, abort on first error + if matches!(error_strategy, FlushErrorStrategy::FailFast) { + break; + } + } + } + } + + FlushRegionReply::from_results(results) + } + + /// Flushes a single region synchronously with proper error handling. + async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<(), error::Error> { + // Check if region is leader and writable + let Some(writable) = self.region_server.is_region_leader(region_id) else { + return Err(RegionNotFoundSnafu { region_id }.build()); + }; + + if !writable { + return Err(RegionNotReadySnafu { region_id }.build()); + } + + // Register and execute the flush task + let region_server_moved = self.region_server.clone(); + let register_result = self + .flush_tasks + .try_register( + region_id, + Box::pin(async move { + region_server_moved + .handle_request( + region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await?; + Ok(()) + }), + ) + .await; + + if register_result.is_busy() { + warn!("Another flush task is running for the region: {region_id}"); + } + + let mut watcher = register_result.into_watcher(); + match self.flush_tasks.wait_until_finish(&mut watcher).await { + Ok(()) => Ok(()), + Err(err) => Err(UnexpectedSnafu { + violated: format!("Flush task failed: {err:?}"), + } + .build()), + } + } + + /// Unified handler for FlushRegions with all flush semantics. pub(crate) fn handle_flush_regions_instruction( self, flush_regions: FlushRegions, ) -> BoxFuture<'static, Option> { Box::pin(async move { let start_time = Instant::now(); - for region_id in &flush_regions.region_ids { - let request = RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }); - let result = self.region_server.handle_request(*region_id, request).await; - match result { - Ok(_) => {} - Err(error::Error::RegionNotFound { .. }) => { - warn!( - "Received a flush region instruction from meta, but target region: {} is not found.", - region_id - ); - } - Err(err) => { - warn!("Failed to flush region: {}, error: {}", region_id, err); - } - } - } - let elapsed = start_time.elapsed(); - debug!( - "Flush regions: {:?}, elapsed: {:?}", - flush_regions.region_ids, elapsed - ); - None - }) - } + let strategy = flush_regions.strategy; + let region_ids = flush_regions.region_ids; + let error_strategy = flush_regions.error_strategy; - pub(crate) fn handle_flush_region_instruction( - self, - region_id: RegionId, - ) -> BoxFuture<'static, Option> { - Box::pin(async move { - let Some(writable) = self.region_server.is_region_leader(region_id) else { - return Some(InstructionReply::FlushRegion(SimpleReply { - result: false, - error: Some("Region is not leader".to_string()), - })); + let reply = if matches!(strategy, FlushStrategy::Async) { + // Asynchronous hint mode: fire-and-forget, no reply expected + self.handle_flush_hint(region_ids).await; + None + } else { + // Synchronous mode: return reply with results + let reply = self.handle_flush_sync(region_ids, error_strategy).await; + Some(InstructionReply::FlushRegions(reply)) }; - if !writable { - return Some(InstructionReply::FlushRegion(SimpleReply { - result: false, - error: Some("Region is not writable".to_string()), - })); - } + let elapsed = start_time.elapsed(); + debug!( + "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}", + strategy, elapsed, reply + ); - let region_server_moved = self.region_server.clone(); - let register_result = self - .flush_tasks - .try_register( - region_id, - Box::pin(async move { - let request = RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }); - region_server_moved - .handle_request(region_id, request) - .await?; - Ok(()) - }), - ) - .await; - if register_result.is_busy() { - warn!("Another flush task is running for the region: {region_id}"); - } - let mut watcher = register_result.into_watcher(); - let result = self.flush_tasks.wait_until_finish(&mut watcher).await; - match result { - Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply { - result: true, - error: None, - })), - Err(err) => Some(InstructionReply::FlushRegion(SimpleReply { - result: false, - error: Some(format!("{err:?}")), - })), - } + reply }) } } @@ -115,7 +172,7 @@ impl HandlerContext { mod tests { use std::sync::{Arc, RwLock}; - use common_meta::instruction::FlushRegions; + use common_meta::instruction::{FlushErrorStrategy, FlushRegions}; use mito2::engine::MITO_ENGINE_NAME; use store_api::storage::RegionId; @@ -123,7 +180,7 @@ mod tests { use crate::tests::{mock_region_server, MockRegionEngine}; #[tokio::test] - async fn test_handle_flush_region_instruction() { + async fn test_handle_flush_region_hint() { let flushed_region_ids: Arc>> = Arc::new(RwLock::new(Vec::new())); let mock_region_server = mock_region_server(); @@ -142,23 +199,138 @@ mod tests { } let handler_context = HandlerContext::new_for_test(mock_region_server); + // Async hint mode + let flush_instruction = FlushRegions::async_batch(region_ids.clone()); let reply = handler_context .clone() - .handle_flush_regions_instruction(FlushRegions { - region_ids: region_ids.clone(), - }) + .handle_flush_regions_instruction(flush_instruction) .await; - assert!(reply.is_none()); + assert!(reply.is_none()); // Hint mode returns no reply assert_eq!(*flushed_region_ids.read().unwrap(), region_ids); + // Non-existent regions flushed_region_ids.write().unwrap().clear(); let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::>(); + let flush_instruction = FlushRegions::async_batch(not_found_region_ids); let reply = handler_context - .handle_flush_regions_instruction(FlushRegions { - region_ids: not_found_region_ids.clone(), - }) + .handle_flush_regions_instruction(flush_instruction) .await; assert!(reply.is_none()); assert!(flushed_region_ids.read().unwrap().is_empty()); } + + #[tokio::test] + async fn test_handle_flush_region_sync_single() { + let flushed_region_ids: Arc>> = Arc::new(RwLock::new(Vec::new())); + + let mock_region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + + let flushed_region_ids_ref = flushed_region_ids.clone(); + let (mock_engine, _) = + MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| { + region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| { + flushed_region_ids_ref.write().unwrap().push(region_id); + Ok(0) + })) + }); + mock_region_server.register_test_region(region_id, mock_engine); + let handler_context = HandlerContext::new_for_test(mock_region_server); + + let flush_instruction = FlushRegions::sync_single(region_id); + let reply = handler_context + .handle_flush_regions_instruction(flush_instruction) + .await; + + assert!(reply.is_some()); + if let Some(InstructionReply::FlushRegions(flush_reply)) = reply { + assert!(flush_reply.overall_success); + assert_eq!(flush_reply.results.len(), 1); + assert_eq!(flush_reply.results[0].0, region_id); + assert!(flush_reply.results[0].1.is_ok()); + } else { + panic!("Expected FlushRegions reply"); + } + + assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]); + } + + #[tokio::test] + async fn test_handle_flush_region_sync_batch_fail_fast() { + let flushed_region_ids: Arc>> = Arc::new(RwLock::new(Vec::new())); + + let mock_region_server = mock_region_server(); + let region_ids = vec![ + RegionId::new(1024, 1), + RegionId::new(1024, 2), + RegionId::new(1024, 3), + ]; + + // Register only the first region, others will fail + let flushed_region_ids_ref = flushed_region_ids.clone(); + let (mock_engine, _) = + MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| { + region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| { + flushed_region_ids_ref.write().unwrap().push(region_id); + Ok(0) + })) + }); + mock_region_server.register_test_region(region_ids[0], mock_engine); + let handler_context = HandlerContext::new_for_test(mock_region_server); + + // Sync batch with fail-fast strategy + let flush_instruction = + FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast); + let reply = handler_context + .handle_flush_regions_instruction(flush_instruction) + .await; + + assert!(reply.is_some()); + if let Some(InstructionReply::FlushRegions(flush_reply)) = reply { + assert!(!flush_reply.overall_success); // Should fail due to non-existent regions + // With fail-fast, only process regions until first failure + assert!(flush_reply.results.len() <= region_ids.len()); + } else { + panic!("Expected FlushRegions reply"); + } + } + + #[tokio::test] + async fn test_handle_flush_region_sync_batch_try_all() { + let flushed_region_ids: Arc>> = Arc::new(RwLock::new(Vec::new())); + + let mock_region_server = mock_region_server(); + let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)]; + + // Register only the first region + let flushed_region_ids_ref = flushed_region_ids.clone(); + let (mock_engine, _) = + MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| { + region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| { + flushed_region_ids_ref.write().unwrap().push(region_id); + Ok(0) + })) + }); + mock_region_server.register_test_region(region_ids[0], mock_engine); + let handler_context = HandlerContext::new_for_test(mock_region_server); + + // Sync batch with try-all strategy + let flush_instruction = + FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll); + let reply = handler_context + .handle_flush_regions_instruction(flush_instruction) + .await; + + assert!(reply.is_some()); + if let Some(InstructionReply::FlushRegions(flush_reply)) = reply { + assert!(!flush_reply.overall_success); // Should fail due to one non-existent region + // With try-all, should process all regions + assert_eq!(flush_reply.results.len(), region_ids.len()); + // First should succeed, second should fail + assert!(flush_reply.results[0].1.is_ok()); + assert!(flush_reply.results[1].1.is_err()); + } else { + panic!("Expected FlushRegions reply"); + } + } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index ec295f98d1..2e4ce0c043 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -796,7 +796,7 @@ mod tests { use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; use crate::procedure::region_migration::test_util::*; use crate::procedure::test_util::{ - new_downgrade_region_reply, new_flush_region_reply, new_open_region_reply, + new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply, new_upgrade_region_reply, }; use crate::service::mailbox::Channel; @@ -1316,7 +1316,9 @@ mod tests { "Should be the flush leader region", Some(mock_datanode_reply( from_peer_id, - Arc::new(|id| Ok(new_flush_region_reply(id, true, None))), + Arc::new(move |id| { + Ok(new_flush_region_reply_for_region(id, region_id, true, None)) + }), )), Assertion::simple(assert_update_metadata_downgrade, assert_no_persist), ), diff --git a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs index 8014c5fdca..e1cd76cc3b 100644 --- a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs @@ -15,7 +15,7 @@ use std::any::Any; use api::v1::meta::MailboxMessage; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::instruction::{FlushRegions, Instruction, InstructionReply}; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; @@ -65,7 +65,7 @@ impl PreFlushRegion { fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction { let pc = &ctx.persistent_ctx; let region_id = pc.region_id; - Instruction::FlushRegion(region_id) + Instruction::FlushRegions(FlushRegions::sync_single(region_id)) } /// Tries to flush a leader region. @@ -117,13 +117,37 @@ impl PreFlushRegion { now.elapsed() ); - let InstructionReply::FlushRegion(SimpleReply { result, error }) = reply else { - return error::UnexpectedInstructionReplySnafu { - mailbox_message: msg.to_string(), - reason: "expect flush region reply", + let reply_result = match reply { + InstructionReply::FlushRegions(flush_reply) => { + if flush_reply.results.len() != 1 { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect single region flush result", + } + .fail(); + } + let (reply_region_id, result) = &flush_reply.results[0]; + if *reply_region_id != region_id { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "flush reply region ID mismatch", + } + .fail(); + } + match result { + Ok(()) => (true, None), + Err(err) => (false, Some(err.clone())), + } + } + _ => { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect flush region reply", + } + .fail(); } - .fail(); }; + let (result, error) = reply_result; if error.is_some() { warn!( @@ -170,7 +194,7 @@ mod tests { use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv}; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; use crate::procedure::test_util::{ - new_close_region_reply, new_flush_region_reply, send_mock_reply, + new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply, }; fn new_persistent_context() -> PersistentContext { @@ -245,6 +269,7 @@ mod tests { // to_peer: 2 let persistent_context = new_persistent_context(); let from_peer_id = persistent_context.from_peer.id; + let region_id = persistent_context.region_id; let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); let mailbox_ctx = env.mailbox_context(); @@ -253,9 +278,10 @@ mod tests { mailbox_ctx .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; - send_mock_reply(mailbox, rx, |id| { - Ok(new_flush_region_reply( + send_mock_reply(mailbox, rx, move |id| { + Ok(new_flush_region_reply_for_region( id, + region_id, false, Some("test mocked".to_string()), )) @@ -272,6 +298,7 @@ mod tests { // to_peer: 2 let persistent_context = new_persistent_context(); let from_peer_id = persistent_context.from_peer.id; + let region_id = persistent_context.region_id; let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); let mailbox_ctx = env.mailbox_context(); @@ -280,7 +307,9 @@ mod tests { mailbox_ctx .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; - send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None))); + send_mock_reply(mailbox, rx, move |id| { + Ok(new_flush_region_reply_for_region(id, region_id, true, None)) + }); let procedure_ctx = new_procedure_context(); let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index ab89808770..9f8b40692d 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage}; use common_meta::instruction::{ - DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, + DowngradeRegionReply, FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, }; use common_meta::key::table_route::TableRouteValue; use common_meta::key::test_utils::new_test_table_info; @@ -101,8 +101,16 @@ pub fn new_open_region_reply(id: u64, result: bool, error: Option) -> Ma } } -/// Generates a [InstructionReply::FlushRegion] reply. +/// Generates a [InstructionReply::FlushRegions] reply with a single region. pub fn new_flush_region_reply(id: u64, result: bool, error: Option) -> MailboxMessage { + // Use RegionId(0, 0) as default for backward compatibility in tests + let region_id = RegionId::new(0, 0); + let flush_reply = if result { + FlushRegionReply::success_single(region_id) + } else { + FlushRegionReply::error_single(region_id, error.unwrap_or("Test error".to_string())) + }; + MailboxMessage { id, subject: "mock".to_string(), @@ -110,11 +118,32 @@ pub fn new_flush_region_reply(id: u64, result: bool, error: Option) -> M to: "meta".to_string(), timestamp_millis: current_time_millis(), payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::FlushRegion(SimpleReply { - result, - error, - })) - .unwrap(), + serde_json::to_string(&InstructionReply::FlushRegions(flush_reply)).unwrap(), + )), + } +} + +/// Generates a [InstructionReply::FlushRegions] reply for a specific region. +pub fn new_flush_region_reply_for_region( + id: u64, + region_id: RegionId, + result: bool, + error: Option, +) -> MailboxMessage { + let flush_reply = if result { + FlushRegionReply::success_single(region_id) + } else { + FlushRegionReply::error_single(region_id, error.unwrap_or("Test error".to_string())) + }; + + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::FlushRegions(flush_reply)).unwrap(), )), } } diff --git a/src/meta-srv/src/region/flush_trigger.rs b/src/meta-srv/src/region/flush_trigger.rs index 2fe918dfd9..e04402bd10 100644 --- a/src/meta-srv/src/region/flush_trigger.rs +++ b/src/meta-srv/src/region/flush_trigger.rs @@ -357,7 +357,8 @@ impl RegionFlushTrigger { let flush_instructions = leader_to_region_ids .into_iter() .map(|(leader, region_ids)| { - let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids }); + let flush_instruction = + Instruction::FlushRegions(FlushRegions::async_batch(region_ids)); (leader, flush_instruction) });