diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index c863d15d08..16260327f5 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -206,8 +206,6 @@ impl WriteBufferManager for WriteBufferManagerImpl { /// Reason of a flush task. #[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)] pub enum FlushReason { - /// Other reasons. - Others, /// Engine reaches flush threshold. EngineFull, /// Manual flush. @@ -243,6 +241,8 @@ impl From for FlushReason { RegionFlushReason::RegionMigration => FlushReason::RegionMigration, RegionFlushReason::Repartition => FlushReason::Repartition, RegionFlushReason::RemoteWalPrune => FlushReason::RemoteWalPrune, + RegionFlushReason::Closing => FlushReason::Closing, + RegionFlushReason::Downgrading => FlushReason::Downgrading, } } } @@ -1355,7 +1355,7 @@ mod tests { let (output_tx, output_rx) = oneshot::channel(); let mut task = RegionFlushTask { region_id: builder.region_id(), - reason: FlushReason::Others, + reason: FlushReason::Manual, senders: Vec::new(), request_sender: tx, access_layer: env.access_layer.clone(), @@ -1401,7 +1401,7 @@ mod tests { let mut tasks: Vec<_> = (0..3) .map(|_| RegionFlushTask { region_id: builder.region_id(), - reason: FlushReason::Others, + reason: FlushReason::Manual, senders: Vec::new(), request_sender: tx.clone(), access_layer: env.access_layer.clone(), @@ -1587,7 +1587,7 @@ mod tests { let mut tasks: Vec<_> = (0..2) .map(|_| RegionFlushTask { region_id: builder.region_id(), - reason: FlushReason::Others, + reason: FlushReason::Manual, senders: Vec::new(), request_sender: tx.clone(), access_layer: env.access_layer.clone(), diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 51b3594b95..6e9573a9bb 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -1153,7 +1153,7 @@ impl RegionWorkerLoop { continue; } DdlRequest::Flush(req) => { - self.handle_flush_request(ddl.region_id, req, None, ddl.sender); + self.handle_flush_request(ddl.region_id, req, ddl.sender); continue; } DdlRequest::Compact(req) => { diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 3e0f5e52a6..4f0e388f7f 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -17,10 +17,9 @@ use common_telemetry::info; use store_api::logstore::LogStore; use store_api::logstore::provider::Provider; -use store_api::region_request::RegionFlushRequest; +use store_api::region_request::{RegionFlushReason, RegionFlushRequest}; use store_api::storage::RegionId; -use crate::flush::FlushReason; use crate::request::OptionOutputTx; use crate::worker::RegionWorkerLoop; @@ -51,8 +50,10 @@ impl RegionWorkerLoop { info!("Region {} has pending data, waiting for flush", region_id); self.handle_flush_request( region_id, - RegionFlushRequest::default(), - Some(FlushReason::Closing), + RegionFlushRequest { + reason: Some(RegionFlushReason::Closing), + ..Default::default() + }, sender, ); return; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index cec922a6e1..6ad0e037e1 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -31,19 +31,14 @@ use crate::sst::index::IndexBuildType; use crate::worker::RegionWorkerLoop; fn resolve_flush_reason( - explicit_reason: Option, request_reason: Option, is_downgrading: bool, ) -> FlushReason { - explicit_reason - .or_else(|| request_reason.map(FlushReason::from)) - .unwrap_or({ - if is_downgrading { - FlushReason::Downgrading - } else { - FlushReason::Manual - } - }) + match request_reason { + Some(reason) => FlushReason::from(reason), + None if is_downgrading => FlushReason::Downgrading, + None => FlushReason::Manual, + } } impl RegionWorkerLoop { @@ -181,7 +176,6 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionFlushRequest, - reason: Option, sender: OptionOutputTx, ) { let region = match self.regions.flushable_region(region_id) { @@ -197,7 +191,7 @@ impl RegionWorkerLoop { // when handling flush request instead of in `schedule_flush` or `flush_finished`. self.update_topic_latest_entry_id(®ion); - let reason = resolve_flush_reason(reason, request.reason, region.is_downgrading()); + let reason = resolve_flush_reason(request.reason, region.is_downgrading()); let mut task = self.new_flush_task(®ion, reason, request.row_group_size, self.config.clone()); task.push_sender(sender); @@ -369,38 +363,33 @@ impl RegionWorkerLoop { mod tests { use super::*; - #[test] - fn test_resolve_flush_reason_prefers_explicit_reason() { - let reason = resolve_flush_reason( - Some(FlushReason::Closing), - Some(RegionFlushReason::RemoteWalPrune), - true, - ); - assert_eq!(reason, FlushReason::Closing); - } - #[test] fn test_resolve_flush_reason_uses_request_reason() { assert_eq!( - resolve_flush_reason(None, Some(RegionFlushReason::RegionMigration), true), + resolve_flush_reason(Some(RegionFlushReason::RegionMigration), true), FlushReason::RegionMigration ); assert_eq!( - resolve_flush_reason(None, Some(RegionFlushReason::Repartition), false), + resolve_flush_reason(Some(RegionFlushReason::Repartition), false), FlushReason::Repartition ); assert_eq!( - resolve_flush_reason(None, Some(RegionFlushReason::RemoteWalPrune), false), + resolve_flush_reason(Some(RegionFlushReason::RemoteWalPrune), false), FlushReason::RemoteWalPrune ); + assert_eq!( + resolve_flush_reason(Some(RegionFlushReason::Closing), false), + FlushReason::Closing + ); + assert_eq!( + resolve_flush_reason(Some(RegionFlushReason::Downgrading), false), + FlushReason::Downgrading + ); } #[test] fn test_resolve_flush_reason_fallback_unchanged() { - assert_eq!( - resolve_flush_reason(None, None, true), - FlushReason::Downgrading - ); - assert_eq!(resolve_flush_reason(None, None, false), FlushReason::Manual); + assert_eq!(resolve_flush_reason(None, true), FlushReason::Downgrading); + assert_eq!(resolve_flush_reason(None, false), FlushReason::Manual); } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index e33e7613c5..951abca1be 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -1414,6 +1414,10 @@ pub enum RegionFlushReason { Repartition, /// Flush triggered by remote WAL pruning. RemoteWalPrune, + /// Flush region before closing region. + Closing, + /// Flush region before downgrading region. + Downgrading, } #[derive(Debug, Clone, Default)]