refactor: clarify region flush reasons (#8146)

* refactor: clarify region flush reasons

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-05-21 15:35:46 +08:00
committed by GitHub
parent ba679dddfa
commit 15fc148e40
5 changed files with 34 additions and 40 deletions

View File

@@ -206,8 +206,6 @@ impl WriteBufferManager for WriteBufferManagerImpl {
/// Reason of a flush task.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
/// Other reasons.
Others,
/// Engine reaches flush threshold.
EngineFull,
/// Manual flush.
@@ -243,6 +241,8 @@ impl From<RegionFlushReason> for FlushReason {
RegionFlushReason::RegionMigration => FlushReason::RegionMigration,
RegionFlushReason::Repartition => FlushReason::Repartition,
RegionFlushReason::RemoteWalPrune => FlushReason::RemoteWalPrune,
RegionFlushReason::Closing => FlushReason::Closing,
RegionFlushReason::Downgrading => FlushReason::Downgrading,
}
}
}
@@ -1355,7 +1355,7 @@ mod tests {
let (output_tx, output_rx) = oneshot::channel();
let mut task = RegionFlushTask {
region_id: builder.region_id(),
reason: FlushReason::Others,
reason: FlushReason::Manual,
senders: Vec::new(),
request_sender: tx,
access_layer: env.access_layer.clone(),
@@ -1401,7 +1401,7 @@ mod tests {
let mut tasks: Vec<_> = (0..3)
.map(|_| RegionFlushTask {
region_id: builder.region_id(),
reason: FlushReason::Others,
reason: FlushReason::Manual,
senders: Vec::new(),
request_sender: tx.clone(),
access_layer: env.access_layer.clone(),
@@ -1587,7 +1587,7 @@ mod tests {
let mut tasks: Vec<_> = (0..2)
.map(|_| RegionFlushTask {
region_id: builder.region_id(),
reason: FlushReason::Others,
reason: FlushReason::Manual,
senders: Vec::new(),
request_sender: tx.clone(),
access_layer: env.access_layer.clone(),

View File

@@ -1153,7 +1153,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
continue;
}
DdlRequest::Flush(req) => {
self.handle_flush_request(ddl.region_id, req, None, ddl.sender);
self.handle_flush_request(ddl.region_id, req, ddl.sender);
continue;
}
DdlRequest::Compact(req) => {

View File

@@ -17,10 +17,9 @@
use common_telemetry::info;
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::region_request::RegionFlushRequest;
use store_api::region_request::{RegionFlushReason, RegionFlushRequest};
use store_api::storage::RegionId;
use crate::flush::FlushReason;
use crate::request::OptionOutputTx;
use crate::worker::RegionWorkerLoop;
@@ -51,8 +50,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
info!("Region {} has pending data, waiting for flush", region_id);
self.handle_flush_request(
region_id,
RegionFlushRequest::default(),
Some(FlushReason::Closing),
RegionFlushRequest {
reason: Some(RegionFlushReason::Closing),
..Default::default()
},
sender,
);
return;

View File

@@ -31,19 +31,14 @@ use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;
fn resolve_flush_reason(
explicit_reason: Option<FlushReason>,
request_reason: Option<RegionFlushReason>,
is_downgrading: bool,
) -> FlushReason {
explicit_reason
.or_else(|| request_reason.map(FlushReason::from))
.unwrap_or({
if is_downgrading {
FlushReason::Downgrading
} else {
FlushReason::Manual
}
})
match request_reason {
Some(reason) => FlushReason::from(reason),
None if is_downgrading => FlushReason::Downgrading,
None => FlushReason::Manual,
}
}
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -181,7 +176,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
request: RegionFlushRequest,
reason: Option<FlushReason>,
sender: OptionOutputTx,
) {
let region = match self.regions.flushable_region(region_id) {
@@ -197,7 +191,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// when handling flush request instead of in `schedule_flush` or `flush_finished`.
self.update_topic_latest_entry_id(&region);
let reason = resolve_flush_reason(reason, request.reason, region.is_downgrading());
let reason = resolve_flush_reason(request.reason, region.is_downgrading());
let mut task =
self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
task.push_sender(sender);
@@ -369,38 +363,33 @@ impl<S: LogStore> RegionWorkerLoop<S> {
mod tests {
use super::*;
#[test]
fn test_resolve_flush_reason_prefers_explicit_reason() {
let reason = resolve_flush_reason(
Some(FlushReason::Closing),
Some(RegionFlushReason::RemoteWalPrune),
true,
);
assert_eq!(reason, FlushReason::Closing);
}
#[test]
fn test_resolve_flush_reason_uses_request_reason() {
assert_eq!(
resolve_flush_reason(None, Some(RegionFlushReason::RegionMigration), true),
resolve_flush_reason(Some(RegionFlushReason::RegionMigration), true),
FlushReason::RegionMigration
);
assert_eq!(
resolve_flush_reason(None, Some(RegionFlushReason::Repartition), false),
resolve_flush_reason(Some(RegionFlushReason::Repartition), false),
FlushReason::Repartition
);
assert_eq!(
resolve_flush_reason(None, Some(RegionFlushReason::RemoteWalPrune), false),
resolve_flush_reason(Some(RegionFlushReason::RemoteWalPrune), false),
FlushReason::RemoteWalPrune
);
assert_eq!(
resolve_flush_reason(Some(RegionFlushReason::Closing), false),
FlushReason::Closing
);
assert_eq!(
resolve_flush_reason(Some(RegionFlushReason::Downgrading), false),
FlushReason::Downgrading
);
}
#[test]
fn test_resolve_flush_reason_fallback_unchanged() {
assert_eq!(
resolve_flush_reason(None, None, true),
FlushReason::Downgrading
);
assert_eq!(resolve_flush_reason(None, None, false), FlushReason::Manual);
assert_eq!(resolve_flush_reason(None, true), FlushReason::Downgrading);
assert_eq!(resolve_flush_reason(None, false), FlushReason::Manual);
}
}

View File

@@ -1414,6 +1414,10 @@ pub enum RegionFlushReason {
Repartition,
/// Flush triggered by remote WAL pruning.
RemoteWalPrune,
/// Flush region before closing region.
Closing,
/// Flush region before downgrading region.
Downgrading,
}
#[derive(Debug, Clone, Default)]