fix: skip flush when closing follower region (#8143)

* fix/flush-on-close: Guard close flush by region state

- `close flush`: skip pending-data flush for non-flushable regions in `src/mito2/src/worker/handle_close.rs`
- `flushable region`: surface `FlushableRegionState` errors from `RegionMap::flushable_region` in `src/mito2/src/error.rs`, `src/mito2/src/region.rs`, and `src/mito2/src/worker/handle_flush.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/flush-on-close: Cover follower close with pending data

- `regression test`: cover closing a Noop WAL follower region with pending memtable data in `src/mito2/src/engine/skip_wal_test.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: comments and logs

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-05-21 11:51:14 +08:00
committed by GitHub
parent 28bed396e2
commit 1e987e66c8
5 changed files with 78 additions and 32 deletions

View File

@@ -171,6 +171,48 @@ async fn test_close_follower_region_skip_wal() {
assert_eq!(0, total_rows);
}
#[tokio::test]
async fn test_close_follower_region_skip_wal_with_pending_data() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("close-follower-skip-wal-pending-data").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let mut request = CreateRequestBuilder::new().build();
let wal_options = WalOptions::Noop;
request.options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
let rows = Rows {
schema: rows_schema(&request),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
let region = engine.get_region(region_id).unwrap();
assert!(!region.version().memtables.is_empty());
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
assert!(region.is_follower());
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
assert!(!engine.is_region_exists(region_id));
}
#[tokio::test]
async fn test_close_region_after_truncate_skip_wal() {
common_telemetry::init_default_ut_logging();

View File

@@ -1241,6 +1241,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Region {} is in {:?} state, expect: Writable, Staging or Downgrading",
region_id,
state
))]
FlushableRegionState {
region_id: RegionId,
state: RegionRoleState,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1430,6 +1442,8 @@ impl ErrorExt for Error {
TooManyFilesToRead { .. } | TooManyGcJobs { .. } => StatusCode::RateLimited,
PruneFile { source, .. } => source.status_code(),
FlushableRegionState { .. } => StatusCode::RegionNotReady,
}
}

View File

@@ -45,8 +45,8 @@ pub use utils::*;
use crate::access_layer::AccessLayerRef;
use crate::error::{
InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
UnexpectedSnafu, UpdateManifestSnafu,
FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
};
use crate::manifest::action::{
RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
@@ -1435,35 +1435,19 @@ impl RegionMap {
/// Gets flushable region by region id.
///
/// Returns error if the region does not exist.
/// Returns None if the region exists but not operatable.
fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
/// Returns error if the region does not exist or not flushable.
pub(crate) fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
let region = self
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
if region.is_flushable() {
Ok(Some(region))
} else {
Ok(None)
}
}
/// Gets flushable region by region id.
///
/// Calls the callback if the region does not exist.
/// Returns None if the region exists but not operatable.
pub(crate) fn flushable_region_or<F: OnFailure>(
&self,
region_id: RegionId,
cb: &mut F,
) -> Option<MitoRegionRef> {
match self.flushable_region(region_id) {
Ok(region) => region,
Err(e) => {
cb.on_failure(e);
None
ensure!(
region.is_flushable(),
FlushableRegionStateSnafu {
region_id,
state: region.state(),
}
}
);
Ok(region)
}
/// Remove region by id.

View File

@@ -37,8 +37,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
info!("Try to close region {}, worker: {}", region_id, self.id);
// If the region is using Noop WAL and has data in memtable,
// we should flush it before closing to ensure durability.
// If the region is using Noop WAL and has data in memtable and region is flushable (like,
// not in follower state), we should flush it before closing to ensure durability.
if region.provider == Provider::Noop
&& !region
.version_control
@@ -46,6 +46,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.version
.memtables
.is_empty()
&& region.is_flushable()
{
info!("Region {} has pending data, waiting for flush", region_id);
self.handle_flush_request(

View File

@@ -182,11 +182,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
request: RegionFlushRequest,
reason: Option<FlushReason>,
mut sender: OptionOutputTx,
sender: OptionOutputTx,
) {
let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
return;
let region = match self.regions.flushable_region(region_id) {
Ok(region) => region,
Err(e) => {
sender.send(Err(e));
return;
}
};
// `update_topic_latest_entry_id` updates `topic_latest_entry_id` when memtables are empty.
// But the flush is skipped if memtables are empty. Thus should update the `topic_latest_entry_id`
// when handling flush request instead of in `schedule_flush` or `flush_finished`.