feat: flush region before close when skip-wal is enabled (#7549)

* feat: flush region before close when skip-wal is enabled

When closing a region with Noop WAL provider, the region is now flushed
before closing to ensure data durability. This prevents data loss for
regions configured with skip_wal.

Changes:
- Add `Closing` variant to `FlushReason` enum
- Modify `handle_close_request` to trigger flush for Noop WAL regions
- Pass flush reason through the flush pipeline
- Add test to verify data persistence after close with skip-wal

The flush-on-close flow completes the region cleanup after the flush
finishes, ensuring the region is properly removed from all schedulers.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: extract region cleanup logic into dedicated method

Extracts common region cleanup logic (stop, remove, and scheduler cleanup) into a new `remove_region` method to avoid duplication between `handle_close` and `handle_flush_request`. This improves code maintainability and reduces redundancy.

Also updates `RegionMap::remove_region` to return the removed region reference, allowing the caller to perform cleanup operations.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* test: split skip-wal region close test into pending and no-pending cases

Split the test_close_region_skip_wal test into two separate test cases:
- test_close_region_skip_wal_with_pending_data: Tests the scenario where
  data is inserted before closing a region with skip-wal enabled
- test_close_region_skip_wal_without_pending_data: Tests the scenario
  where a region with skip-wal is closed without any data insertion

This improves test clarity and ensures both scenarios are properly covered.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: skip request handling and compaction for flush-on-close regions

When a region is flushed as part of the close operation (flush_on_close=true),
the region is immediately removed from the server. Therefore, there's no need
to handle pending requests or schedule compactions for such regions.

This fix moves the on_flush_success listener call outside the conditional
block and wraps all post-flush operations (request handling, compaction
scheduling) in an else branch, ensuring they only execute for normal flush
operations where the region remains active.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* test: add close follower region test with skip-wal

Adds a test case for closing a follower region with skip-wal enabled.
The test verifies that when a region transitions from Follower to Leader
before closing, the flush mechanism works correctly even with WAL disabled.

Also refactors flushable_region() to return Option instead of erroring
when region is not operable, allowing more flexible handling of region
states during flush operations.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: fmt

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* revise test logic for closing a follower region

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-01-15 17:15:14 +08:00
committed by GitHub
parent 2ae20daa62
commit e0285209cb
9 changed files with 272 additions and 65 deletions

View File

@@ -65,6 +65,8 @@ mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod skip_wal_test;
#[cfg(test)]
mod staging_test;
#[cfg(test)]
mod sync_test;

View File

@@ -0,0 +1,170 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::Rows;
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
#[tokio::test]
async fn test_close_region_skip_wal_with_pending_data() {
test_close_region_skip_wal(true).await;
}
#[tokio::test]
async fn test_close_region_skip_wal_without_pending_data() {
test_close_region_skip_wal(false).await;
}
async fn test_close_region_skip_wal(insert: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix(&format!("close-skip-wal-{}", insert)).await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let mut request = CreateRequestBuilder::new().build();
// Set skip_wal = true via WalOptions::Noop
let wal_options = WalOptions::Noop;
request.options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
if insert {
let column_schemas = rows_schema(&request);
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
}
// The region should have data in memtable.
let region = engine.get_region(region_id).unwrap();
if insert {
assert!(!region.version().memtables.is_empty());
} else {
assert!(region.version().memtables.is_empty());
}
// Close the region. This should trigger a flush.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
// After closing, we reopen it and check if data is persisted.
engine
.handle_request(
region_id,
RegionRequest::Open(store_api::region_request::RegionOpenRequest {
engine: String::new(),
table_dir: request.table_dir.clone(),
path_type: store_api::region_request::PathType::Bare,
options: request.options.clone(),
skip_wal_replay: false,
checkpoint: None,
}),
)
.await
.unwrap();
let scan_request = ScanRequest::default();
let stream = engine
.scan_to_stream(region_id, scan_request)
.await
.unwrap();
let batches = common_recordbatch::RecordBatches::try_collect(stream)
.await
.unwrap();
// If flush was triggered, data should be there even though WAL was skipped.
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
if insert {
assert_eq!(3, total_rows);
} else {
assert_eq!(0, total_rows);
}
}
#[tokio::test]
async fn test_close_follower_region_skip_wal() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("close-follower-skip-wal").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let mut request = CreateRequestBuilder::new().build();
// Set skip_wal = true via WalOptions::Noop
let wal_options = WalOptions::Noop;
request.options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
// Set the region to Follower state.
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
assert!(region.is_follower());
// Close the region. This should trigger a flush.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
// After closing, we reopen it and check if data is persisted.
engine
.handle_request(
region_id,
RegionRequest::Open(store_api::region_request::RegionOpenRequest {
engine: String::new(),
table_dir: request.table_dir.clone(),
path_type: store_api::region_request::PathType::Bare,
options: request.options.clone(),
skip_wal_replay: false,
checkpoint: None,
}),
)
.await
.unwrap();
let scan_request = ScanRequest::default();
let stream = engine
.scan_to_stream(region_id, scan_request)
.await
.unwrap();
let batches = common_recordbatch::RecordBatches::try_collect(stream)
.await
.unwrap();
// If flush was triggered, data should be there even though WAL was skipped.
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(0, total_rows);
}

View File

@@ -553,18 +553,6 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Region {} is in {:?} state, expect: Leader or Leader(Downgrading)",
region_id,
state
))]
FlushableRegionState {
region_id: RegionId,
state: RegionRoleState,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid options"))]
JsonOptions {
#[snafu(source)]
@@ -1312,7 +1300,6 @@ impl ErrorExt for Error {
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady,
FlushableRegionState { .. } => StatusCode::RegionNotReady,
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,

View File

@@ -219,6 +219,8 @@ pub enum FlushReason {
Downgrading,
/// Enter staging mode.
EnterStaging,
/// Flush when region is closing.
Closing,
}
impl FlushReason {
@@ -320,6 +322,7 @@ impl RegionFlushTask {
edit,
memtables_to_remove,
is_staging: self.is_staging,
flush_reason: self.reason,
};
WorkerRequest::Background {
region_id: self.region_id,

View File

@@ -44,8 +44,8 @@ pub use utils::*;
use crate::access_layer::AccessLayerRef;
use crate::error::{
FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
UnexpectedSnafu, UpdateManifestSnafu,
};
use crate::manifest::action::{
RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
@@ -1163,31 +1163,30 @@ impl RegionMap {
/// Gets flushable region by region id.
///
/// Returns error if the region does not exist or is not operable.
fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
/// Returns error if the region does not exist.
/// Returns None if the region exists but not operatable.
fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
let region = self
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
ensure!(
region.is_flushable(),
FlushableRegionStateSnafu {
region_id,
state: region.state(),
}
);
Ok(region)
if region.is_flushable() {
Ok(Some(region))
} else {
Ok(None)
}
}
/// Gets flushable region by region id.
///
/// Calls the callback if the region does not exist or is not operable.
/// Calls the callback if the region does not exist.
/// Returns None if the region exists but not operatable.
pub(crate) fn flushable_region_or<F: OnFailure>(
&self,
region_id: RegionId,
cb: &mut F,
) -> Option<MitoRegionRef> {
match self.flushable_region(region_id) {
Ok(region) => Some(region),
Ok(region) => region,
Err(e) => {
cb.on_failure(e);
None
@@ -1196,9 +1195,9 @@ impl RegionMap {
}
/// Remove region by id.
pub(crate) fn remove_region(&self, region_id: RegionId) {
pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
let mut regions = self.regions.write().unwrap();
regions.remove(&region_id);
regions.remove(&region_id)
}
/// List all regions.

View File

@@ -50,6 +50,7 @@ use crate::error::{
FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
Result, UnexpectedSnafu,
};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionEdit, TruncateKind};
use crate::memtable::MemtableId;
use crate::memtable::bulk::part::BulkPart;
@@ -914,6 +915,8 @@ pub(crate) struct FlushFinished {
pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
/// Whether the region is in staging mode.
pub(crate) is_staging: bool,
/// Reason for flush.
pub(crate) flush_reason: FlushReason,
}
impl FlushFinished {

View File

@@ -1069,14 +1069,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
DdlRequest::Close(_) => {
self.handle_close_request(ddl.region_id, ddl.sender).await;
continue;
}
DdlRequest::Alter(req) => {
self.handle_alter_request(ddl.region_id, req, ddl.sender)
.await;
continue;
}
DdlRequest::Flush(req) => {
self.handle_flush_request(ddl.region_id, req, ddl.sender);
self.handle_flush_request(ddl.region_id, req, None, ddl.sender);
continue;
}
DdlRequest::Compact(req) => {

View File

@@ -15,36 +15,68 @@
//! Handling close request.
use common_telemetry::info;
use store_api::region_request::AffectedRows;
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::flush::FlushReason;
use crate::request::OptionOutputTx;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_close_request(
&mut self,
region_id: RegionId,
) -> Result<AffectedRows> {
sender: OptionOutputTx,
) {
let Some(region) = self.regions.get_region(region_id) else {
return Ok(0);
sender.send(Ok(0));
return;
};
info!("Try to close region {}, worker: {}", region_id, self.id);
// If the region is using Noop WAL and has data in memtable,
// we should flush it before closing to ensure durability.
if region.provider == Provider::Noop
&& !region
.version_control
.current()
.version
.memtables
.is_empty()
{
info!("Region {} has pending data, waiting for flush", region_id);
self.handle_flush_request(
region_id,
RegionFlushRequest {
row_group_size: None,
},
Some(FlushReason::Closing),
sender,
);
return;
}
// WAL configured or memtable is empty, flush is not necessary.
self.remove_region(region_id).await;
info!("Region {} closed, worker: {}", region_id, self.id);
sender.send(Ok(0))
}
/// Remove a region and stop all related tasks.
pub(crate) async fn remove_region(&mut self, region_id: RegionId) {
let Some(region) = self.regions.remove_region(region_id) else {
return;
};
region.stop().await;
self.regions.remove_region(region_id);
// Clean flush status.
self.flush_scheduler.on_region_closed(region_id);
// Clean compaction status.
self.compaction_scheduler.on_region_closed(region_id);
// clean index build status.
self.index_build_scheduler.on_region_closed(region_id).await;
info!("Region {} closed, worker: {}", region_id, self.id);
self.region_count.dec();
Ok(0)
}
}

View File

@@ -165,6 +165,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
request: RegionFlushRequest,
reason: Option<FlushReason>,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else {
@@ -175,11 +176,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// when handling flush request instead of in `schedule_flush` or `flush_finished`.
self.update_topic_latest_entry_id(&region);
let reason = if region.is_downgrading() {
FlushReason::Downgrading
} else {
FlushReason::Manual
};
let reason = reason.unwrap_or_else(|| {
if region.is_downgrading() {
FlushReason::Downgrading
} else {
FlushReason::Manual
}
});
let mut task =
self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
task.push_sender(sender);
@@ -277,6 +280,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
let flush_on_close = request.flush_reason == FlushReason::Closing;
let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
// Notifies waiters and observes the flush timer.
@@ -295,26 +299,30 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
}
// Handle pending requests for the region.
if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
self.flush_scheduler.on_flush_success(region_id)
{
// Perform DDLs first because they require empty memtables.
self.handle_ddl_requests(&mut ddl_requests).await;
// Handle pending write requests, we don't stall these requests.
self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
.await;
if flush_on_close {
// Remove region from server for flush on closing,
// no need to handle requests and schedule compactions.
self.remove_region(region_id).await;
info!("Region {} closed after flush", region_id);
} else {
// Handle pending requests for the region.
if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
self.flush_scheduler.on_flush_success(region_id)
{
// Perform DDLs first because they require empty memtables.
self.handle_ddl_requests(&mut ddl_requests).await;
// Handle pending write requests, we don't stall these requests.
self.handle_write_requests(&mut write_requests, &mut bulk_writes, false)
.await;
}
// Maybe flush worker again.
self.maybe_flush_worker();
// Handle stalled requests.
self.handle_stalled_requests().await;
// Schedules compaction.
self.schedule_compaction(&region).await;
}
// Maybe flush worker again.
self.maybe_flush_worker();
// Handle stalled requests.
self.handle_stalled_requests().await;
// Schedules compaction.
self.schedule_compaction(&region).await;
self.listener.on_flush_success(region_id);
}