diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 4056597bf5..fd8d17bdff 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -18,6 +18,7 @@ use std::time::Duration; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use store_api::region_engine::SyncRegionFromRequest; +use store_api::region_request::RegionFlushReason; use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber}; use strum::Display; use table::metadata::TableId; @@ -338,14 +339,17 @@ pub struct FlushRegions { /// Error handling strategy for batch operations (only applies when multiple regions and sync strategy). #[serde(default)] pub error_strategy: FlushErrorStrategy, + /// The source that triggered this flush. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, } impl Display for FlushRegions { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})", - self.region_ids, self.strategy, self.error_strategy + "FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?}, reason={:?})", + self.region_ids, self.strategy, self.error_strategy, self.reason ) } } @@ -357,6 +361,7 @@ impl FlushRegions { region_ids: vec![region_id], strategy: FlushStrategy::Sync, error_strategy: FlushErrorStrategy::FailFast, + reason: None, } } @@ -366,6 +371,7 @@ impl FlushRegions { region_ids, strategy: FlushStrategy::Async, error_strategy: FlushErrorStrategy::TryAll, + reason: None, } } @@ -375,9 +381,15 @@ impl FlushRegions { region_ids, strategy: FlushStrategy::Sync, error_strategy, + reason: None, } } + pub fn with_reason(mut self, reason: RegionFlushReason) -> Self { + self.reason = Some(reason); + self + } + /// Check if this is a single region flush. pub fn is_single_region(&self) -> bool { self.region_ids.len() == 1 @@ -1363,6 +1375,7 @@ mod tests { assert!(!single_sync.is_hint()); assert!(single_sync.is_sync()); assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast); + assert_eq!(single_sync.reason, None); assert!(single_sync.is_single_region()); assert_eq!(single_sync.single_region_id(), Some(region_id)); @@ -1374,6 +1387,7 @@ mod tests { assert!(batch_async.is_hint()); assert!(!batch_async.is_sync()); assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll); + assert_eq!(batch_async.reason, None); assert!(!batch_async.is_single_region()); assert_eq!(batch_async.single_region_id(), None); @@ -1384,6 +1398,10 @@ mod tests { assert!(!batch_sync.is_hint()); assert!(batch_sync.is_sync()); assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast); + assert_eq!(batch_sync.reason, None); + + let with_reason = batch_sync.with_reason(RegionFlushReason::RemoteWalPrune); + assert_eq!(with_reason.reason, Some(RegionFlushReason::RemoteWalPrune)); } #[test] @@ -1401,6 +1419,7 @@ mod tests { region_ids: vec![region_id], strategy: FlushStrategy::Async, error_strategy: FlushErrorStrategy::TryAll, + reason: None, }; assert_eq!(flush_regions.region_ids, vec![region_id]); assert_eq!(flush_regions.strategy, FlushStrategy::Async); @@ -1450,6 +1469,7 @@ mod tests { let instruction = Instruction::FlushRegions(flush_regions.clone()); let serialized = serde_json::to_string(&instruction).unwrap(); + assert!(!serialized.contains("reason")); let deserialized: Instruction = serde_json::from_str(&serialized).unwrap(); match deserialized { @@ -1457,6 +1477,32 @@ mod tests { assert_eq!(fr.region_ids, vec![region_id]); assert_eq!(fr.strategy, FlushStrategy::Sync); assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast); + assert_eq!(fr.reason, None); + } + _ => panic!("Expected FlushRegions instruction"), + } + + let legacy = r#"{"FlushRegions":{"region_ids":[4398046511105],"strategy":"Sync","error_strategy":"FailFast"}}"#; + let deserialized: Instruction = serde_json::from_str(legacy).unwrap(); + match deserialized { + Instruction::FlushRegions(fr) => { + assert_eq!(fr.region_ids, vec![region_id]); + assert_eq!(fr.strategy, FlushStrategy::Sync); + assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast); + assert_eq!(fr.reason, None); + } + _ => panic!("Expected FlushRegions instruction"), + } + + let flush_regions = FlushRegions::async_batch(vec![region_id]) + .with_reason(RegionFlushReason::RemoteWalPrune); + let instruction = Instruction::FlushRegions(flush_regions); + let serialized = serde_json::to_string(&instruction).unwrap(); + assert!(serialized.contains(r#""reason":"RemoteWalPrune""#)); + let deserialized: Instruction = serde_json::from_str(&serialized).unwrap(); + match deserialized { + Instruction::FlushRegions(fr) => { + assert_eq!(fr.reason, Some(RegionFlushReason::RemoteWalPrune)); } _ => panic!("Expected FlushRegions instruction"), } @@ -1479,6 +1525,7 @@ mod tests { assert!(!fr.is_hint()); assert!(fr.is_sync()); assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll); + assert_eq!(fr.reason, None); } _ => panic!("Expected FlushRegions instruction"), } diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs index f0ec37c844..98ace5661d 100644 --- a/src/datanode/src/heartbeat/handler/downgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs @@ -113,9 +113,7 @@ impl DowngradeRegionsHandler { region_server_moved .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await?; diff --git a/src/datanode/src/heartbeat/handler/flush_region.rs b/src/datanode/src/heartbeat/handler/flush_region.rs index dafc92ff0f..ca5eac1c15 100644 --- a/src/datanode/src/heartbeat/handler/flush_region.rs +++ b/src/datanode/src/heartbeat/handler/flush_region.rs @@ -18,7 +18,7 @@ use common_meta::instruction::{ FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply, }; use common_telemetry::{debug, warn}; -use store_api::region_request::{RegionFlushRequest, RegionRequest}; +use store_api::region_request::{RegionFlushReason, RegionFlushRequest, RegionRequest}; use store_api::storage::RegionId; use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu}; @@ -39,14 +39,17 @@ impl InstructionHandler for FlushRegionsHandler { let strategy = flush_regions.strategy; let region_ids = flush_regions.region_ids; let error_strategy = flush_regions.error_strategy; + let reason = flush_regions.reason; let reply = if matches!(strategy, FlushStrategy::Async) { // Asynchronous hint mode: fire-and-forget, no reply expected - ctx.handle_flush_hint(region_ids).await; + ctx.handle_flush_hint(region_ids, reason).await; None } else { // Synchronous mode: return reply with results - let reply = ctx.handle_flush_sync(region_ids, error_strategy).await; + let reply = ctx + .handle_flush_sync(region_ids, error_strategy, reason) + .await; Some(InstructionReply::FlushRegions(reply)) }; @@ -62,9 +65,14 @@ impl InstructionHandler for FlushRegionsHandler { impl HandlerContext { /// Performs the actual region flush operation. - async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> { + async fn perform_region_flush( + &self, + region_id: RegionId, + reason: Option, + ) -> Result<()> { let request = RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, + reason, + ..Default::default() }); self.region_server .handle_request(region_id, request) @@ -73,10 +81,14 @@ impl HandlerContext { } /// Handles asynchronous flush hints (fire-and-forget). - async fn handle_flush_hint(&self, region_ids: Vec) { + async fn handle_flush_hint( + &self, + region_ids: Vec, + reason: Option, + ) { let start_time = Instant::now(); for region_id in ®ion_ids { - let result = self.perform_region_flush(*region_id).await; + let result = self.perform_region_flush(*region_id, reason).await; match result { Ok(_) => {} Err(error::Error::RegionNotFound { .. }) => { @@ -102,11 +114,12 @@ impl HandlerContext { &self, region_ids: Vec, error_strategy: FlushErrorStrategy, + reason: Option, ) -> FlushRegionReply { let mut results = Vec::with_capacity(region_ids.len()); for region_id in region_ids { - let result = self.flush_single_region_sync(region_id).await; + let result = self.flush_single_region_sync(region_id, reason).await; match &result { Ok(_) => results.push((region_id, Ok(()))), @@ -127,7 +140,11 @@ impl HandlerContext { } /// Flushes a single region synchronously with proper error handling. - async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> { + async fn flush_single_region_sync( + &self, + region_id: RegionId, + reason: Option, + ) -> Result<()> { // Check if region is leader and writable let Some(writable) = self.region_server.is_region_leader(region_id) else { return Err(RegionNotFoundSnafu { region_id }.build()); @@ -148,7 +165,8 @@ impl HandlerContext { .handle_request( region_id, RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, + reason, + ..Default::default() }), ) .await?; @@ -184,19 +202,27 @@ mod tests { use super::*; use crate::tests::{MockRegionEngine, mock_region_server}; + type FlushedRequests = Arc)>>>; + #[tokio::test] async fn test_handle_flush_region_hint() { - let flushed_region_ids: Arc>> = Arc::new(RwLock::new(Vec::new())); + let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new())); let mock_region_server = mock_region_server(); let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::>(); for region_id in ®ion_ids { - let flushed_region_ids_ref = flushed_region_ids.clone(); + let flushed_requests_ref = flushed_requests.clone(); let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| { region_engine.handle_request_mock_fn = - Some(Box::new(move |region_id, _request| { - flushed_region_ids_ref.write().unwrap().push(region_id); + Some(Box::new(move |region_id, request| { + let RegionRequest::Flush(request) = request else { + panic!("Expected flush request"); + }; + flushed_requests_ref + .write() + .unwrap() + .push((region_id, request.reason)); Ok(0) })) }); @@ -206,36 +232,47 @@ mod tests { let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); // Async hint mode - let flush_instruction = FlushRegions::async_batch(region_ids.clone()); + let flush_instruction = FlushRegions::async_batch(region_ids.clone()) + .with_reason(RegionFlushReason::RemoteWalPrune); let reply = FlushRegionsHandler .handle(&handler_context, flush_instruction) .await; assert!(reply.is_none()); // Hint mode returns no reply - assert_eq!(*flushed_region_ids.read().unwrap(), region_ids); + let expected = region_ids + .iter() + .map(|region_id| (*region_id, Some(RegionFlushReason::RemoteWalPrune))) + .collect::>(); + assert_eq!(*flushed_requests.read().unwrap(), expected); // Non-existent regions - flushed_region_ids.write().unwrap().clear(); + flushed_requests.write().unwrap().clear(); let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::>(); let flush_instruction = FlushRegions::async_batch(not_found_region_ids); let reply = FlushRegionsHandler .handle(&handler_context, flush_instruction) .await; assert!(reply.is_none()); - assert!(flushed_region_ids.read().unwrap().is_empty()); + assert!(flushed_requests.read().unwrap().is_empty()); } #[tokio::test] async fn test_handle_flush_region_sync_single() { - let flushed_region_ids: Arc>> = Arc::new(RwLock::new(Vec::new())); + let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new())); let mock_region_server = mock_region_server(); let region_id = RegionId::new(1024, 1); - let flushed_region_ids_ref = flushed_region_ids.clone(); + let flushed_requests_ref = flushed_requests.clone(); let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| { - region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| { - flushed_region_ids_ref.write().unwrap().push(region_id); + region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, request| { + let RegionRequest::Flush(request) = request else { + panic!("Expected flush request"); + }; + flushed_requests_ref + .write() + .unwrap() + .push((region_id, request.reason)); Ok(0) })) }); @@ -243,7 +280,8 @@ mod tests { let kv_backend = Arc::new(MemoryKvBackend::new()); let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); - let flush_instruction = FlushRegions::sync_single(region_id); + let flush_instruction = + FlushRegions::sync_single(region_id).with_reason(RegionFlushReason::Repartition); let reply = FlushRegionsHandler .handle(&handler_context, flush_instruction) .await; @@ -252,7 +290,10 @@ mod tests { assert_eq!(flush_reply.results.len(), 1); assert_eq!(flush_reply.results[0].0, region_id); assert!(flush_reply.results[0].1.is_ok()); - assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]); + assert_eq!( + *flushed_requests.read().unwrap(), + vec![(region_id, Some(RegionFlushReason::Repartition))] + ); } #[tokio::test] @@ -333,7 +374,7 @@ mod tests { let display = format!("{}", flush_regions); assert_eq!( display, - "FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast)" + "FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast, reason=None)" ); } } diff --git a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs index 9dee05373b..860849ba6e 100644 --- a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_procedure::{Context as ProcedureContext, Status}; use serde::{Deserialize, Serialize}; use snafu::OptionExt; +use store_api::region_request::RegionFlushReason; use tokio::time::Instant; use crate::error::{self, Result}; @@ -86,6 +87,7 @@ impl PreFlushRegion { leader, operation_timeout, utils::ErrorStrategy::Ignore, + Some(RegionFlushReason::RegionMigration), ) .await } diff --git a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs index 911e881ac3..c1957031d5 100644 --- a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs @@ -28,6 +28,7 @@ use common_telemetry::tracing_context::TracingContext; use futures::future::{join_all, try_join_all}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; +use store_api::region_request::RegionFlushReason; use store_api::storage::RegionId; use crate::error::{self, Error, Result}; @@ -418,6 +419,7 @@ impl EnterStagingRegion { peer, operation_timeout, ErrorStrategy::Retry, + Some(RegionFlushReason::Repartition), ) }) .collect::>(); diff --git a/src/meta-srv/src/procedure/repartition/group/sync_region.rs b/src/meta-srv/src/procedure/repartition/group/sync_region.rs index 7422ae8607..f95b20e95f 100644 --- a/src/meta-srv/src/procedure/repartition/group/sync_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/sync_region.rs @@ -27,6 +27,7 @@ use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::region_engine::SyncRegionFromRequest; +use store_api::region_request::RegionFlushReason; use store_api::storage::RegionId; use crate::error::{self, Error, Result}; @@ -85,6 +86,7 @@ impl SyncRegion { &prepare_result.central_region_datanode, operation_timeout, ErrorStrategy::Retry, + Some(RegionFlushReason::Repartition), ) .await } diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 5ea8e00038..0ac48cfc4f 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -20,6 +20,7 @@ use common_meta::peer::Peer; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{info, warn}; use snafu::ResultExt; +use store_api::region_request::RegionFlushReason; use store_api::storage::RegionId; use tokio::time::Instant; @@ -99,11 +100,12 @@ pub(crate) async fn flush_region( datanode: &Peer, timeout: Duration, error_strategy: ErrorStrategy, + reason: Option, ) -> Result<()> { - let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch( - region_ids.to_vec(), - FlushErrorStrategy::TryAll, - )); + let mut flush_regions = + FlushRegions::sync_batch(region_ids.to_vec(), FlushErrorStrategy::TryAll); + flush_regions.reason = reason; + let flush_instruction = Instruction::FlushRegions(flush_regions); let tracing_ctx = TracingContext::from_current_span(); let msg = MailboxMessage::json_message( @@ -308,6 +310,70 @@ pub mod mock { } } +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_meta::instruction::{FlushStrategy, Instruction}; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::sequence::SequenceBuilder; + + use super::*; + use crate::procedure::test_util::{MailboxContext, new_flush_region_reply_for_region}; + + #[tokio::test] + async fn test_flush_region_payload_includes_reason() { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let mailbox_sequence = SequenceBuilder::new("test_flush_region_reason", kv_backend).build(); + let mut mailbox_ctx = MailboxContext::new(mailbox_sequence); + + let datanode = Peer::new(1, "127.0.0.1:4001"); + let region_id = RegionId::new(1024, 1); + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(datanode.id), tx) + .await; + + let mailbox = mailbox_ctx.mailbox().clone(); + let reply_mailbox = mailbox.clone(); + let reply_task = tokio::spawn(async move { + let response = rx.recv().await.unwrap().unwrap(); + let msg = response.mailbox_message.unwrap(); + let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap(); + let Instruction::FlushRegions(flush_regions) = instruction else { + panic!("Expected FlushRegions instruction"); + }; + + assert_eq!(flush_regions.region_ids, vec![region_id]); + assert_eq!(flush_regions.strategy, FlushStrategy::Sync); + assert_eq!(flush_regions.reason, Some(RegionFlushReason::Repartition)); + + reply_mailbox + .on_recv( + msg.id, + Ok(new_flush_region_reply_for_region( + msg.id, region_id, true, None, + )), + ) + .await + .unwrap(); + }); + + flush_region( + &mailbox, + "127.0.0.1:3002", + &[region_id], + &datanode, + Duration::from_secs(5), + ErrorStrategy::Retry, + Some(RegionFlushReason::Repartition), + ) + .await + .unwrap(); + reply_task.await.unwrap(); + } +} + #[cfg(test)] pub mod test_data { use std::sync::Arc; diff --git a/src/meta-srv/src/region/flush_trigger.rs b/src/meta-srv/src/region/flush_trigger.rs index 4000d6dc13..cdaa4339f4 100644 --- a/src/meta-srv/src/region/flush_trigger.rs +++ b/src/meta-srv/src/region/flush_trigger.rs @@ -31,6 +31,7 @@ use common_wal::config::kafka::common::{ DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, }; use snafu::{OptionExt, ResultExt}; +use store_api::region_request::RegionFlushReason; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; @@ -358,8 +359,10 @@ impl RegionFlushTrigger { let flush_instructions = leader_to_region_ids .into_iter() .map(|(leader, region_ids)| { - let flush_instruction = - Instruction::FlushRegions(FlushRegions::async_batch(region_ids)); + let flush_instruction = Instruction::FlushRegions( + FlushRegions::async_batch(region_ids) + .with_reason(RegionFlushReason::RemoteWalPrune), + ); (leader, flush_instruction) }); @@ -502,11 +505,20 @@ fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool { #[cfg(test)] mod tests { + use std::sync::Arc; + use common_base::readable_size::ReadableSize; - use common_meta::region_registry::LeaderRegionManifestInfo; + use common_meta::instruction::FlushStrategy; + use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry}; + use common_meta::sequence::SequenceBuilder; + use common_meta::stats::topic::TopicStatsRegistry; use store_api::storage::RegionId; use super::*; + use crate::handler::HeartbeatMailbox; + use crate::procedure::test_util::{MailboxContext, new_wal_prune_metadata}; #[test] fn test_is_recent() { @@ -722,4 +734,58 @@ mod tests { should_persist_region_checkpoint(¤t, Some(ReplayCheckpoint::new(90, Some(10)))); assert!(result.is_none()); } + + #[tokio::test] + async fn test_send_flush_instructions_payload_includes_remote_wal_prune_reason() { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let leader_region_registry = Arc::new(LeaderRegionRegistry::new()); + let topic_stats_registry = Arc::new(TopicStatsRegistry::default()); + let mailbox_sequence = + SequenceBuilder::new("test_remote_wal_prune_flush_reason", kv_backend).build(); + let mut mailbox_ctx = MailboxContext::new(mailbox_sequence); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(1), tx) + .await; + + let (trigger, _ticker) = RegionFlushTrigger::new( + table_metadata_manager.clone(), + leader_region_registry.clone(), + topic_stats_registry, + mailbox_ctx.mailbox().clone(), + "127.0.0.1:3002".to_string(), + ReadableSize(1), + ReadableSize(1), + ); + + let topic = "test_topic".to_string(); + new_wal_prune_metadata( + table_metadata_manager, + leader_region_registry, + 1, + 1, + &[0], + topic, + ) + .await; + + let region_id = RegionId::new(0, 0); + trigger.send_flush_instructions(&[region_id]).await.unwrap(); + + let response = rx.recv().await.unwrap().unwrap(); + let msg = response.mailbox_message.unwrap(); + let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap(); + let Instruction::FlushRegions(flush_regions) = instruction else { + panic!("Expected FlushRegions instruction"); + }; + + assert_eq!(flush_regions.region_ids, vec![region_id]); + assert_eq!(flush_regions.strategy, FlushStrategy::Async); + assert_eq!( + flush_regions.reason, + Some(RegionFlushReason::RemoteWalPrune) + ); + } } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index c2c39951cc..ef4d802cfc 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -688,9 +688,7 @@ mod test { metric_engine .handle_request( physical_region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); diff --git a/src/metric-engine/src/engine/sync/region.rs b/src/metric-engine/src/engine/sync/region.rs index 3fe51d1b68..cbe6515a19 100644 --- a/src/metric-engine/src/engine/sync/region.rs +++ b/src/metric-engine/src/engine/sync/region.rs @@ -231,9 +231,7 @@ mod tests { metric_engine .handle_request( source_physical_region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); diff --git a/src/metric-engine/src/repeated_task.rs b/src/metric-engine/src/repeated_task.rs index 4354511039..d0299035d9 100644 --- a/src/metric-engine/src/repeated_task.rs +++ b/src/metric-engine/src/repeated_task.rs @@ -64,9 +64,7 @@ impl TaskFunction for FlushMetadataRegionTask { .mito .handle_request( metadata_region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await { diff --git a/src/mito2/src/engine/apply_staging_manifest_test.rs b/src/mito2/src/engine/apply_staging_manifest_test.rs index efa0713cfc..e10ed0cbf0 100644 --- a/src/mito2/src/engine/apply_staging_manifest_test.rs +++ b/src/mito2/src/engine/apply_staging_manifest_test.rs @@ -230,9 +230,7 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) { engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -503,9 +501,7 @@ async fn test_apply_staging_manifest_change_edit_different_columns_fails_with_fo engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -624,9 +620,7 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -733,9 +727,7 @@ async fn test_split_repartition_causes_duplicate_data() { engine .handle_request( source_region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -928,9 +920,7 @@ async fn test_merge_repartition_data_integrity() { engine .handle_request( source_region_id_1, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -944,9 +934,7 @@ async fn test_merge_repartition_data_integrity() { engine .handle_request( source_region_id_2, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 8493dc6a36..c85c540488 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -932,9 +932,7 @@ async fn test_list_ssts_with_format( engine .handle_request( *region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -1124,9 +1122,7 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index dc8aa94fdf..f76e9f8bf9 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -55,9 +55,7 @@ pub(crate) async fn put_and_flush( let result = engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -68,9 +66,7 @@ async fn flush(engine: &MitoEngine, region_id: RegionId) { let result = engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -116,9 +112,7 @@ pub(crate) async fn delete_and_flush( let result = engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); diff --git a/src/mito2/src/engine/copy_region_from_test.rs b/src/mito2/src/engine/copy_region_from_test.rs index 0cf2686fca..97966af670 100644 --- a/src/mito2/src/engine/copy_region_from_test.rs +++ b/src/mito2/src/engine/copy_region_from_test.rs @@ -71,9 +71,7 @@ async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index: engine .handle_request( source_region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -184,9 +182,7 @@ async fn test_engine_copy_region_failure_with_format(flat_format: bool) { engine .handle_request( source_region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); diff --git a/src/mito2/src/engine/partition_filter_test.rs b/src/mito2/src/engine/partition_filter_test.rs index ff247d0a21..33f1d93aee 100644 --- a/src/mito2/src/engine/partition_filter_test.rs +++ b/src/mito2/src/engine/partition_filter_test.rs @@ -89,9 +89,7 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) { engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -122,9 +120,7 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) { engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); diff --git a/src/mito2/src/engine/remap_manifests_test.rs b/src/mito2/src/engine/remap_manifests_test.rs index b893eb5b97..467d14a1ef 100644 --- a/src/mito2/src/engine/remap_manifests_test.rs +++ b/src/mito2/src/engine/remap_manifests_test.rs @@ -194,9 +194,7 @@ async fn test_remap_manifests_success_with_format(flat_format: bool) { engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index 1b5a6538d8..71abd67eb2 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -402,9 +402,7 @@ async fn test_staging_write_partition_expr_version_with_format(flat_format: bool engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -610,9 +608,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) { engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -703,9 +699,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -720,9 +714,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 818da17faa..8c3fdad75d 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -217,9 +217,7 @@ async fn test_engine_truncate_after_flush_with_format(flat_format: bool) { engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await .unwrap(); @@ -395,9 +393,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) { engine_cloned .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), + RegionRequest::Flush(RegionFlushRequest::default()), ) .await }); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 3af94c08e6..c5ad2276eb 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -26,6 +26,7 @@ use datatypes::arrow::datatypes::SchemaRef; use partition::expr::PartitionExpr; use smallvec::{SmallVec, smallvec}; use snafu::ResultExt; +use store_api::region_request::RegionFlushReason; use store_api::storage::{RegionId, SequenceNumber}; use strum::IntoStaticStr; use tokio::sync::{Semaphore, mpsc, watch}; @@ -221,6 +222,12 @@ pub enum FlushReason { EnterStaging, /// Flush when region is closing. Closing, + /// Flush triggered before region migration. + RegionMigration, + /// Flush triggered by repartition procedure. + Repartition, + /// Flush triggered by remote WAL pruning. + RemoteWalPrune, } impl FlushReason { @@ -230,6 +237,16 @@ impl FlushReason { } } +impl From for FlushReason { + fn from(reason: RegionFlushReason) -> Self { + match reason { + RegionFlushReason::RegionMigration => FlushReason::RegionMigration, + RegionFlushReason::Repartition => FlushReason::Repartition, + RegionFlushReason::RemoteWalPrune => FlushReason::RemoteWalPrune, + } + } +} + /// Task to flush a region. pub(crate) struct RegionFlushTask { /// Region to flush. diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 15785d8ac7..c4ca7cc7b3 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -1247,7 +1247,10 @@ pub async fn flush_region(engine: &MitoEngine, region_id: RegionId, row_group_si let result = engine .handle_request( region_id, - RegionRequest::Flush(RegionFlushRequest { row_group_size }), + RegionRequest::Flush(RegionFlushRequest { + row_group_size, + reason: None, + }), ) .await .unwrap(); diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 84efc283de..52fe068f15 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -50,9 +50,7 @@ impl RegionWorkerLoop { info!("Region {} has pending data, waiting for flush", region_id); self.handle_flush_request( region_id, - RegionFlushRequest { - row_group_size: None, - }, + RegionFlushRequest::default(), Some(FlushReason::Closing), sender, ); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index a8d88ed846..6778ef6cf2 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -19,7 +19,7 @@ use std::sync::atomic::Ordering; use common_telemetry::{debug, error, info}; use store_api::logstore::LogStore; -use store_api::region_request::RegionFlushRequest; +use store_api::region_request::{RegionFlushReason, RegionFlushRequest}; use store_api::storage::RegionId; use crate::config::{IndexBuildMode, MitoConfig}; @@ -30,6 +30,22 @@ use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, O use crate::sst::index::IndexBuildType; use crate::worker::RegionWorkerLoop; +fn resolve_flush_reason( + explicit_reason: Option, + request_reason: Option, + is_downgrading: bool, +) -> FlushReason { + explicit_reason + .or_else(|| request_reason.map(FlushReason::from)) + .unwrap_or({ + if is_downgrading { + FlushReason::Downgrading + } else { + FlushReason::Manual + } + }) +} + impl RegionWorkerLoop { /// On region flush job failed. pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) { @@ -176,13 +192,7 @@ impl RegionWorkerLoop { // when handling flush request instead of in `schedule_flush` or `flush_finished`. self.update_topic_latest_entry_id(®ion); - let reason = reason.unwrap_or_else(|| { - if region.is_downgrading() { - FlushReason::Downgrading - } else { - FlushReason::Manual - } - }); + let reason = resolve_flush_reason(reason, request.reason, region.is_downgrading()); let mut task = self.new_flush_task(®ion, reason, request.row_group_size, self.config.clone()); task.push_sender(sender); @@ -349,3 +359,43 @@ impl RegionWorkerLoop { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_resolve_flush_reason_prefers_explicit_reason() { + let reason = resolve_flush_reason( + Some(FlushReason::Closing), + Some(RegionFlushReason::RemoteWalPrune), + true, + ); + assert_eq!(reason, FlushReason::Closing); + } + + #[test] + fn test_resolve_flush_reason_uses_request_reason() { + assert_eq!( + resolve_flush_reason(None, Some(RegionFlushReason::RegionMigration), true), + FlushReason::RegionMigration + ); + assert_eq!( + resolve_flush_reason(None, Some(RegionFlushReason::Repartition), false), + FlushReason::Repartition + ); + assert_eq!( + resolve_flush_reason(None, Some(RegionFlushReason::RemoteWalPrune), false), + FlushReason::RemoteWalPrune + ); + } + + #[test] + fn test_resolve_flush_reason_fallback_unchanged() { + assert_eq!( + resolve_flush_reason(None, None, true), + FlushReason::Downgrading + ); + assert_eq!(resolve_flush_reason(None, None, false), FlushReason::Manual); + } +} diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 99d3a87dd3..0759fccf67 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -346,9 +346,7 @@ fn make_region_flush(flush: FlushRequest) -> Result, + pub reason: Option, } #[derive(Debug)]